diff --git a/reportservice/automatic_run.py b/reportservice/automatic_run.py index 8d1074d2f1f8f74a31fc845566b274a6e246a8a1..35a8bdb2a8356fa3da1eca650a2f92574d9f4517 100644 --- a/reportservice/automatic_run.py +++ b/reportservice/automatic_run.py @@ -5,7 +5,7 @@ import logging from dateutil import parser, tz import yaml import zmq - +import zmq.asyncio async def auto_run(): """ @@ -42,12 +42,12 @@ async def auto_run(): # Begin-IF a run-timestamp passed, # do automatic-run. if time_now > sched_time: - con = zmq.Context() + con = zmq.asyncio.Context() sock = con.socket(zmq.REQ) port = cfg['GLOBAL']['server-port'] sock.connect(port) - sock.send_pyobj(['all']) - msg = sock.recv_pyobj() + await sock.send_pyobj(['all']) + msg = await sock.recv_pyobj() logging.info('{} Automatic Run' .format(msg)) diff --git a/reportservice/manual_run.py b/reportservice/manual_run.py index 4ba643ad637c6fe1827a08db0a4719d021429878..719b28ac5c86fb39cf366969afe14631b159478c 100644 --- a/reportservice/manual_run.py +++ b/reportservice/manual_run.py @@ -1,9 +1,11 @@ +import asyncio import logging import os import sys import yaml import zmq +import zmq.asyncio logfile = './cal.log' logging.basicConfig(filename=logfile, filemode='a+', @@ -12,7 +14,7 @@ logging.basicConfig(filename=logfile, filemode='a+', datefmt='%Y-%m-%d %H:%M:%S') -async def manual_launch(req_conf): +async def manual_run(req_conf): """ Run the report service manually from any machine and provide the requested configuration for @@ -32,11 +34,11 @@ async def manual_launch(req_conf): cfg = yaml.load(ymlfile) port = cfg['GLOBAL']['server-port'] - con = zmq.Context() + con = zmq.asyncio.Context() socket = con.socket(zmq.REQ) con = socket.connect(port) - socket.send_pyobj(req_conf) - msg = socket.recv_pyobj() + await socket.send_pyobj(req_conf) + msg = await socket.recv_pyobj() logging.info('{} Manual Run'.format(msg)) diff --git a/reportservice/report_service.py b/reportservice/report_service.py index 894ceb39d860298126b81475da6e97dbd832c663..ec4a853965cbc7926d00cee4da07707d7d97202c 100644 --- a/reportservice/report_service.py +++ b/reportservice/report_service.py @@ -42,6 +42,7 @@ def init_config_repo(config): async def parse_output(output): + joblist = [] for line in output.split('\n'): if 'Submitted' in line: @@ -51,6 +52,7 @@ async def parse_output(output): async def wait_jobs(joblist): + counter = 0 while True: found_jobs = set() @@ -68,6 +70,7 @@ async def wait_jobs(joblist): async def get_run_base(detector): + run_base = ['xfel-calibrate'] + detector['det-type'] for key, item in detector.items(): @@ -85,7 +88,7 @@ async def get_run_base(detector): return run_base -async def delete_tmp_folder(fpath): +async def del_folder(fpath): """ Delete temporary folders e.g. the out-folder of new generated plots. @@ -113,8 +116,8 @@ async def push_figures(repo_master, addf): :param repo_master: the local git-repository. :param addf: the generated figures to be added. - """ + repo = Repo(repo_master) repo.index.add(addf) repo.index.commit("Add new {} figures".format(len(addf))) @@ -138,7 +141,7 @@ async def server_runner(config): # perform git-dir checks and pull the project for updates. init_config_repo(config['GLOBAL']['git']) - + logging.info("report service port: {}:{}" .format(config['GLOBAL']['report-service']['bind-to'], config['GLOBAL']['report-service']['port'])) @@ -161,7 +164,7 @@ async def server_runner(config): # reports config file req_cfg = {} - + #Begin-IF response dict or list if isinstance(response, dict): req_cfg = response @@ -203,12 +206,12 @@ async def server_runner(config): logging.debug('Config information: {}'.format(detector)) run_base = await get_run_base(detector) - + try: output = subprocess.check_output(run_base) \ .decode('utf8') logging.info( - 'Submission information: \n{}'.format(output)) + 'Submission information: {}'.format(output)) logging.info('Run plots {}'.format(run_base)) except Exception as e: @@ -224,6 +227,7 @@ async def server_runner(config): logging.info('Jobs finished') except asyncio.TimeoutError: + asyncio.ensure_future(del_folder('./tmp')) logging.error('Jobs timeout!') # Copy all plots @@ -241,7 +245,10 @@ async def server_runner(config): os.makedirs(fpath, exist_ok=True) det_new_files[f] = fpath - + + # set concurrency limitation. + # 50 have been chosen by trial + # (this is not the max limitation). sem = asyncio.Semaphore(50) all_new_files.append( '{}/{}'.format(fpath, f.split('/')[-1])) @@ -255,14 +262,14 @@ async def server_runner(config): # End-for-loop over instruments asyncio.ensure_future( - push_figures(cfg['GLOBAL']['git']['repo-local'], - all_new_files)) - + push_figures(cfg['GLOBAL']['git']['repo-local'], + all_new_files)) + logging.info('All done') return # End-asyncio-function(do_action) - # TODO delete tmp file + # TODO delete out-folder try: asyncio.ensure_future(do_action(copy.copy(req_cfg)))