diff --git a/.gitignore b/.gitignore
index bd850f309e83e3260c0866586288d0e525541d94..fe665f186efdfad4bcc081d3b40e10e2e613a0c9 100644
--- a/.gitignore
+++ b/.gitignore
@@ -33,3 +33,4 @@ docs/source/_static/reports
 webservice/webservice.yaml
 webservice/*.log
 webservice/*sqlite
+reportservice/*.log
diff --git a/reportservice/automatic_run.py b/reportservice/automatic_run.py
new file mode 100644
index 0000000000000000000000000000000000000000..99a3293d352fac9e9a6cb6fa9bd0478e48ce598b
--- /dev/null
+++ b/reportservice/automatic_run.py
@@ -0,0 +1,88 @@
+import argparse
+import asyncio
+from datetime import datetime, timedelta
+import logging
+import urllib.parse
+
+from dateutil import parser, tz
+import yaml
+import zmq
+import zmq.asyncio
+
+
+async def auto_run(cfg, timeout=3000):
+    """
+    Run the report service automatically depending on the scheduled times
+    in the run_time list, read from the config yaml file (report_conf.yaml)
+    """
+
+    # time index that points at a timestamp, when the next
+    # report service run takes place.
+    tidx = 0
+
+    # list of timestamps for the report service runs
+    run_time = cfg['GLOBAL']['run-on']
+
+    for i, ti in enumerate(run_time):
+        run_time[i] = parser.parse(ti)
+
+    while True:
+
+        time_now = datetime.utcnow().replace(tzinfo=tz.tzutc())
+        sched_time = run_time[tidx]
+
+        if sched_time.tzinfo is None:
+            sched_time = sched_time.replace(tzinfo=tz.tzutc())
+
+        # do automatic-run.
+        if time_now > sched_time:
+            con = zmq.asyncio.Context()
+            sock = con.socket(zmq.REQ)
+            port = cfg['GLOBAL']['server-port']
+            sock.SNDTIMEO = timeout
+            sock.RCVTIMEO = timeout
+            sock.connect(port)
+            await sock.send_pyobj(['all'])
+            msg = await sock.recv_pyobj()
+            logging.info('{} Automatic Run'
+                         .format(msg))
+
+            # Changing run_time to the same time next week
+            run_time[tidx] = sched_time + timedelta(weeks=1)
+
+            tidx = tidx + 1 if tidx != len(run_time)-1 else 0
+
+        # check periodically, whether there is
+        # a need for an automatic-run.
+        await asyncio.sleep(3000)
+
+
+arg_parser = argparse.ArgumentParser(description='Automatic Launch')
+arg_parser.add_argument('--config-file', type=str,
+                        default='./report_conf.yaml',
+                        help='config file path with reportservice port. '
+                             'Default=./report_conf.yaml')
+arg_parser.add_argument('--log-file', type=str, default='./report.log',
+                        help='The report log file path. Default=./report.log')
+arg_parser.add_argument('--logging', type=str, default="INFO",
+                        help='logging modes: INFO, DEBUG or ERROR. '
+                             'Default=INFO',
+                        choices=['INFO', 'DEBUG', 'ERROR'])
+
+if __name__ == "__main__":
+    args = vars(arg_parser.parse_args())
+    conf_file = args["config_file"]
+    with open(conf_file, "r") as f:
+        cfg = yaml.load(f.read())
+
+    logfile = args["log_file"]
+    fmt = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
+
+    logging.basicConfig(filename=logfile, filemode='a+',
+                        level=getattr(logging, args['logging']),
+                        format='%(levelname)-6s: %(asctime)s %(message)s',
+                        datefmt='%Y-%m-%d %H:%M:%S')
+
+    loop = asyncio.get_event_loop()
+    loop.run_until_complete(auto_run(cfg))
+    loop.close()
diff --git a/reportservice/manual_run.py b/reportservice/manual_run.py
new file mode 100644
index 0000000000000000000000000000000000000000..4e67c8fd70a59a24dac1189e82b1a83fad9aae7a
--- /dev/null
+++ b/reportservice/manual_run.py
@@ -0,0 +1,72 @@
+import argparse
+import logging
+import os
+import sys
+import urllib.parse
+
+import yaml
+import zmq
+
+
+def manual_run(request, cfg):
+    """
+    Run the report service manually from any machine
+    and provide the requested configuration for
+    reports generation.
+
+    :param request: a list for generating reports for the
+                    requested Instruments. This list can
+                    contain the Instruments names e.g. ['SPB']
+                    or ['all'] for generating reports for all
+                    instruments in the "report_conf.yaml".
+
+                    This can also be a customized conf file (dict)
+                    for test purposes.
+    """
+
+    port = cfg['GLOBAL']['server-port']
+    con = zmq.Context()
+    socket = con.socket(zmq.REQ)
+    con = socket.connect(port)
+    socket.send_pyobj(request)
+    msg = socket.recv_pyobj()
+    logging.info('{} Manual Run'.format(msg))
+
+arg_parser = argparse.ArgumentParser(description='Manual Launch')
+arg_parser.add_argument('--instrument', default=['all'], nargs='+',
+                        help='select the requested instruments. '
+                             'Default=all')
+arg_parser.add_argument('--testing', dest='testing', action='store_true',
+                        help='required for testing with different '
+                             'config files')
+arg_parser.set_defaults(testing=False)
+arg_parser.add_argument('--config-file', type=str,
+                        default='./report_conf.yaml',
+                        help='config file path with reportservice port. '
+                             'Default=./report_conf.yaml')
+arg_parser.add_argument('--log-file', type=str, default='./report.log',
+                        help='The report log file path. Default=./report.log')
+arg_parser.add_argument('--logging', type=str, default="INFO",
+                        help='logging modes: INFO, DEBUG or ERROR. '
+                             'Default=INFO',
+                        choices=['INFO', 'DEBUG', 'ERROR'])
+
+if __name__ == "__main__":
+    args = vars(arg_parser.parse_args())
+    conf_file = args["config_file"]
+    with open(conf_file, "r") as f:
+        cfg = yaml.load(f.read())
+
+    logfile = args["log_file"]
+    fmt = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
+
+    logging.basicConfig(filename=logfile, filemode='a+',
+                        level=getattr(logging, args['logging']),
+                        format='%(levelname)-6s: %(asctime)s %(message)s',
+                        datefmt='%Y-%m-%d %H:%M:%S')
+    if args["testing"]:
+        request = cfg
+    else:
+        request = args["instrument"]
+
+    manual_run(request, cfg)
diff --git a/reportservice/messages.py b/reportservice/messages.py
new file mode 100644
index 0000000000000000000000000000000000000000..901795b5c8bcaafc84bc8b4ec2e53cd3d19ef830
--- /dev/null
+++ b/reportservice/messages.py
@@ -0,0 +1,5 @@
+class Errors:
+    REQUEST_MALFORMED = "FAILED: request {} is malformed, please contact det-support@xfel.eu"
+    INSTRUMENT_NOT_FOUND = "FAILED: Instrument {} is not known!, please contact det-support@xfel.eu"
+
+
diff --git a/reportservice/report_conf.yaml b/reportservice/report_conf.yaml
new file mode 100644
index 0000000000000000000000000000000000000000..87e51687d5b62b59caaf35cb8f9cf40bed971d59
--- /dev/null
+++ b/reportservice/report_conf.yaml
@@ -0,0 +1,43 @@
+GLOBAL:
+    git:
+        figures-local: "/gpfs/exfel/data/scratch/ahmedk/calibration/DetectorCharacterization/figures"
+        repo-local: "/gpfs/exfel/data/scratch/ahmedk/calibration/DetectorCharacterization/"
+        figures-remote: "http://git@git.xfel.eu/gitlab/detectors/DetectorCharacterization.git"
+    server-port: "tcp://max-exfl015:5555"
+
+    run-on:
+        - Friday 08:30:00 UTC
+        - Friday 10:33:00 UTC
+
+    report-service:
+        port: 5555
+        bind-to: tcp://*
+        allowed-ips:
+        job-db: ./webservice_jobs.sqlite
+        job-update-interval: 30
+        job-timeout: 3600
+
+SPB:
+    AGIPD1M1:
+        det-type:
+            - "AGIPD"
+            - "STATS_FROM_DB"
+        start-date: "2019-01-01"
+        end-date: "2019-12-12"
+        constants:
+            - "Noise"
+            - "SlopesFF"
+            - "SlopesPC"
+            - "Offset"
+        modules: "0-15"
+        bias-voltages:
+            - 300
+            - 500
+        mem-cells:
+            - 128
+            - 176
+        photon-energy: 9.2
+        use-existing: "/gpfs/exfel/data/scratch/karnem/testAGIPD16_21/"
+        out-folder: "/gpfs/exfel/data/scratch/karnem/testAGIPD16_21/"
+        cal-db-timeout: 180000
+        cal-db-interface: "tcp://max-exfl016:8015#8025"
diff --git a/reportservice/report_service.py b/reportservice/report_service.py
new file mode 100644
index 0000000000000000000000000000000000000000..88bd8de704b1668af17eb317741dd1ccfd081d53
--- /dev/null
+++ b/reportservice/report_service.py
@@ -0,0 +1,339 @@
+import argparse
+import asyncio
+from asyncio.subprocess import PIPE
+import copy
+import getpass
+import glob
+import logging
+import os
+import subprocess
+import shutil
+import sqlite3
+import sys
+from time import sleep
+import urllib.parse
+
+from git import Repo, InvalidGitRepositoryError
+import yaml
+import zmq
+import zmq.asyncio
+
+from messages import Errors
+
+
+loop = asyncio.get_event_loop()
+
+
+def init_config_repo(config):
+    """ Make sure the configuration repo. is present and up-to-date
+
+    :param config: the configuration defined in the `config-repo` section
+    """
+    os.makedirs(config['repo-local'], exist_ok=True)
+
+    try:
+        # Check if it is a git-repo.
+        repo = Repo(config['repo-local'])
+    except InvalidGitRepositoryError:
+        # clone the repo.
+        repo = Repo.clone_from(config['figures-remote'],
+                               config['repo-local'])
+        logging.info("Clonning the repository")
+    # make sure it is updated
+    repo.remote().pull()
+    logging.info("Config repo is initialized")
+
+
+async def parse_output(output):
+
+    joblist = []
+    for line in output.split('\n'):
+        if 'Submitted' in line:
+            joblist.append(line.split()[2])
+    logging.info('joblist: {}'.format(joblist))
+    return joblist
+
+
+async def wait_jobs(joblist):
+
+    counter = 0
+    while True:
+        found_jobs = set()
+        output = subprocess.check_output(['squeue']).decode('utf8')
+        for line in output.split("\n"):
+            for job in joblist:
+                if str(job) in line:
+                    found_jobs.add(job)
+        if len(found_jobs) == 0:
+            logging.info('Plot modules is done')
+            break
+        await asyncio.sleep(10)
+        counter += 10
+
+
+async def get_run_base(detector):
+
+    run_base = ['xfel-calibrate'] + detector['det-type']
+
+    for key, item in detector.items():
+
+        if key in ['det-type']:
+            continue
+
+        run_base += ['--{}'.format(str(key))]
+        if not isinstance(item, list):
+            run_base += [str(item)]
+        else:
+            for val in item:
+                run_base += [str(val)]
+
+    return run_base
+
+
+async def del_folder(fpath):
+    """ Delete temporary folders e.g. the out-folder of new generated
+    plots.
+
+    :param fpath: the folder path that needs to be deleted.
+    """
+    cmd = ["rm", "-rf", fpath]
+    await asyncio.subprocess.create_subprocess_shell(" ".join(cmd))
+    logging.info('tmp file has been deleted')
+
+
+async def copy_files(f, path, sem):
+    """ Copying with concurrency limitation
+
+    :param f: the main file with its current path.
+    :param path: the path, where f is copied to.
+    :param sem: Semaphore is a variable that controls
+                access to common resources.
+                sem can be given a None value, to indicate
+                a copy of low numbers of file e.g. 1 file.
+    """
+    if sem:
+        async with sem:
+            cmd = ["rsync", "-a", f, path]
+            await asyncio.subprocess.create_subprocess_shell(" ".join(cmd))
+    else:
+        cmd = ["rsync", "-a", f, path]
+        await asyncio.subprocess.create_subprocess_shell(" ".join(cmd))
+
+
+async def push_figures(repo_master, addf):
+    """ Upload new figures
+
+    :param repo_master: the local git-repository.
+    :param addf: the generated figures to be added.
+    """
+
+    repo = Repo(repo_master)
+    repo.index.add(addf)
+    repo.index.commit("Add {} new figures".format(len(addf)))
+    # TODO: create an async function for pushing new figures
+    # to avoid blocking the report service.
+    repo.remote().push()
+    logging.info('Pushed to git')
+
+
+async def server_runner(conf_file, jobs_timeout):
+    """
+    The main server loop. After pulling the latest changes
+    of the DC project, it awaits receiving configurations
+    on how to generate the requested reports.
+
+    Depending on receiving a conf yaml file or a list of
+    instruments, the server proceeds by generating figures.
+    Figures are copied to the corresponding folder in the
+    DC project and an add-commit-push is performed to
+    update the remote project and build reports that can
+    be accessed through ReadTheDocs.
+    """
+    with open(conf_file, "r") as f:
+        config = yaml.load(f.read())
+
+    # perform git-dir checks and pull the project for updates.
+    init_config_repo(config['GLOBAL']['git'])
+
+    logging.info("report service port: {}:{}"
+                 .format(config['GLOBAL']['report-service']['bind-to'],
+                         config['GLOBAL']['report-service']['port']))
+
+    context = zmq.asyncio.Context()
+    socket = context.socket(zmq.REP)
+
+    socket.bind("{}:{}".format(config['GLOBAL']['report-service']['bind-to'],
+                               config['GLOBAL']['report-service']['port']))
+
+    asyncio.ensure_future(copy_files(conf_file,
+                                     config['GLOBAL']['git']['figures-local'],
+                                     sem=None))
+
+    while True:
+        response = await socket.recv_pyobj()
+        await socket.send_pyobj('Build DC reports through -->')
+        logging.info("response: {}".format(response))
+
+        # Check if response is a list or a dict.
+        # if list, it should either have instrument names or ['all'].
+        # if a dict, it should contain the details of the requested reports
+        # for generation. As it will be used instead of report_conf.yaml
+
+        # reports config file
+        req_cfg = {}
+
+        if isinstance(response, dict):
+            req_cfg = response
+        elif isinstance(response, list):
+            if len(response) == 1 and response[0] == 'all':
+                req_cfg = config
+            else:
+                req_cfg['GLOBAL'] = config['GLOBAL']
+                for instr in response:
+                    try:
+                        req_cfg[instr] = config[instr]
+                    except:
+                        logging.error(
+                            Errors.INSTRUMENT_NOT_FOUND.format(instr))
+                        continue
+        else:
+            logging.error(Errors.REQUEST_MALFORMED.format(response))
+            continue
+
+        logging.info('Requested Configuration: {}'.format(req_cfg))
+
+        async def do_action(cfg, jobs_timeout=3000):
+            logging.info('Run plot production')
+
+            all_new_files = []
+            all_new_files.append('{}/{}'.format(config['GLOBAL']
+                                                ['git']
+                                                ['figures-local'],
+                                                conf_file.split('/')[-1]))
+
+            for instr_name, instrument in cfg.items():
+
+                if instr_name == "GLOBAL":
+                    continue
+
+                launched_jobs = []
+                for det_name, det_conf in instrument.items():
+
+                    logging.info('Process detector: {}'.format(det_name))
+                    logging.debug('Config information: {}'.format(det_conf))
+
+                    run_base = await get_run_base(det_conf)
+
+                    try:
+                        output = await asyncio.create_subprocess_shell(
+                            " ".join(run_base), stdout=PIPE, stderr=PIPE)
+
+                        launched_jobs.append(output.communicate())
+
+                        logging.info('Submission information: {}:'
+                                     .format(run_base))
+
+                    except Exception as e:
+                        logging.error('Submission failed: {}'.format(e))
+                        exit(1)
+
+                outputs = await asyncio.gather(*launched_jobs)
+
+                job_list = []
+                for output in outputs:
+                    if output[0]:
+                        logging.info('Submission Output: {}'
+                                     .format(output[0].decode('utf8')))
+                    if output[1]:
+                        logging.error('Submission Error: {}'
+                                      .format(output[1].decode('utf8')))
+                    job_list += await parse_output(output[0].decode('utf8'))
+
+                try:
+                    # make sure to time out if waiting for jobs
+                    # takes longer than the configured jobs timeout.
+                    await asyncio.wait_for(wait_jobs(job_list),
+                                           timeout=jobs_timeout)
+
+                    logging.info('All jobs are finished')
+
+                except asyncio.TimeoutError:
+                    asyncio.ensure_future(del_folder('./tmp'))
+                    logging.error('Jobs timeout!')
+
+                # Copy all plots
+                out_folder = det_conf['out-folder']
+                path = cfg['GLOBAL']['git']['figures-local']
+                figures = glob.glob("{}/*png".format(out_folder))
+                logging.info('Copy figures to: {}'.format(path))
+
+                det_new_files = {}
+
+                for f in figures:
+                    const = f.split('/')[-1].split('_')[0]
+                    fpath = '{}/{}/{}'.format(path, det_name, const)
+
+                    os.makedirs(fpath, exist_ok=True)
+                    det_new_files[f] = fpath
+
+                    # Set concurrency limitation.
+                    # 50 have been chosen by trial
+                    # Note: This is not the max limitation.
+                    sem = asyncio.Semaphore(50)
+                    all_new_files.append(
+                        '{}/{}'.format(fpath, f.split('/')[-1]))
+
+                await asyncio.gather(*[copy_files(k, v, sem)
+                                       for k, v in det_new_files.items()])
+
+                logging.info('Figures Copied')
+
+            asyncio.ensure_future(
+                push_figures(cfg['GLOBAL']['git']['repo-local'],
+                             all_new_files))
+            # TODO: delete out-folder
+            #try:
+            #    asyncio.ensure_future(del_folder(out_folder))
+            #except:
+            #    logging.error(str(e))
+
+            logging.info('All done')
+
+            return
+
+
+        try:
+            asyncio.ensure_future(do_action(copy.copy(req_cfg), jobs_timeout))
+        except Exception as e:  # actions that fail are only error logged
+            logging.error(str(e))
+            break
+
+arg_parser = argparse.ArgumentParser(description='Start the report service')
+arg_parser.add_argument('--config-file', type=str,
+                        default='./report_conf.yaml',
+                        help='config file path with reportservice port. '
+                             'Default=./report_conf.yaml')
+arg_parser.add_argument('--jobs-timeout', type=int, default=3000,
+                        help='Maximum time to wait for jobs. Default=3000')
+arg_parser.add_argument('--log-file', type=str, default='./report.log',
+                        help='The report log file path. Default=./report.log')
+arg_parser.add_argument('--logging', type=str, default="INFO",
+                        help='logging modes: INFO, DEBUG or ERROR. '
+                             'Default=INFO',
+                        choices=['INFO', 'DEBUG', 'ERROR'])
+
+if __name__ == "__main__":
+    args = vars(arg_parser.parse_args())
+    conf_file = args["config_file"]
+
+    logfile = args["log_file"]
+    fmt = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
+
+    logging.basicConfig(filename=logfile, filemode='a+',
+                        level=getattr(logging, args['logging']),
+                        format='%(levelname)-6s: %(asctime)s %(message)s',
+                        datefmt='%Y-%m-%d %H:%M:%S')
+
+    loop = asyncio.get_event_loop()
+    loop.run_until_complete(server_runner(conf_file, args["jobs_timeout"]))
+    loop.close()