diff --git a/reportservice/manual_run.py b/reportservice/manual_run.py index 87f442f196429cc3dbe9c22e1f8b46878b4c8ce8..4e67c8fd70a59a24dac1189e82b1a83fad9aae7a 100644 --- a/reportservice/manual_run.py +++ b/reportservice/manual_run.py @@ -37,7 +37,7 @@ arg_parser.add_argument('--instrument', default=['all'], nargs='+', help='select the requested instruments. ' 'Default=all') arg_parser.add_argument('--testing', dest='testing', action='store_true', - help='required for testing with different' + help='required for testing with different ' 'config files') arg_parser.set_defaults(testing=False) arg_parser.add_argument('--config-file', type=str, diff --git a/reportservice/report_service.py b/reportservice/report_service.py index 03b318df3286356c98324f1b097b64c230d257f0..34530622a91aebb4d336463baae8a0a4f18e7583 100644 --- a/reportservice/report_service.py +++ b/reportservice/report_service.py @@ -1,5 +1,6 @@ import argparse import asyncio +from asyncio.subprocess import PIPE, STDOUT import copy import getpass import glob @@ -99,15 +100,21 @@ async def del_folder(fpath): logging.info('tmp file has been deleted') -async def cp_with_conc_lim(f, path, sem): +async def copy_files(f, path, sem): """ Copying with concurrency limitation :param f: the main file with it's current path. :param path: the path, where f is copied to. :param sem: Semaphore is a variable that controls access to common resources. + sem can be give a None value, to indicate + a copy of low numbers of file e.g. 1 file. """ - async with sem: + if sem: + async with sem: + cmd = ["rsync", "-a", f, path] + await asyncio.subprocess.create_subprocess_shell(" ".join(cmd)) + else: cmd = ["rsync", "-a", f, path] await asyncio.subprocess.create_subprocess_shell(" ".join(cmd)) @@ -128,7 +135,7 @@ async def push_figures(repo_master, addf): logging.info('Pushed to git') -async def server_runner(config): +async def server_runner(conf_file, jobs_timeout): """ The main server loop. After pulling the latest changes of the DC project, it awaits receiving configurations @@ -141,6 +148,8 @@ async def server_runner(config): update the remote project and build reports that can be accessed through ReadTheDocs. """ + with open(conf_file, "r") as f: + config = yaml.load(f.read()) # perform git-dir checks and pull the project for updates. init_config_repo(config['GLOBAL']['git']) @@ -155,6 +164,10 @@ async def server_runner(config): socket.bind("{}:{}".format(config['GLOBAL']['report-service']['bind-to'], config['GLOBAL']['report-service']['port'])) + asyncio.ensure_future(copy_files(conf_file, + config['GLOBAL']['git']['figures-local'], + sem=None)) + while True: response = await socket.recv_pyobj() await socket.send_pyobj('Build DC reports through -->') @@ -188,72 +201,82 @@ async def server_runner(config): logging.info('Requested Configuration: {}'.format(req_cfg)) - async def do_action(cfg): + async def do_action(cfg, jobs_timeout = 3000): logging.info('Run plot production') all_new_files = [] + all_new_files.append('{}/{}'.format(config['GLOBAL'] + ['git'] + ['figures-local'], + conf_file.split('/')[-1])) for instr_name, instrument in cfg.items(): if instr_name == "GLOBAL": continue - for det_name, detector in instrument.items(): + launched_jobs = [] + for det_name, det_conf in instrument.items(): logging.info('Process detector: {}'.format(det_name)) - logging.debug('Config information: {}'.format(detector)) + logging.debug('Config information: {}'.format(det_conf)) - run_base = await get_run_base(detector) + run_base = await get_run_base(det_conf) try: - output = subprocess.check_output(run_base) \ - .decode('utf8') - logging.info( - 'Submission information: {}'.format(output)) - logging.info('Run plots {}'.format(run_base)) + output = await asyncio.create_subprocess_shell( + " ".join(run_base), stdout=PIPE, stderr=PIPE) + + launched_jobs.append(output.communicate()) + + logging.info('Submission information: {}' + .format(run_base)) except Exception as e: logging.error('Submission failed: {}'.format(e)) exit(1) - - joblist = await parse_output(output) - - try: - # make sure to timeout if waiting for jobs - # took more than 10 mins. - await asyncio.wait_for(wait_jobs(joblist), timeout=600) - logging.info('Jobs finished') - - except asyncio.TimeoutError: - asyncio.ensure_future(del_folder('./tmp')) - logging.error('Jobs timeout!') - - # Copy all plots - out_folder = detector['out-folder'] - path = cfg['GLOBAL']['git']['figures-local'] - figures = glob.glob("{}/*png".format(out_folder)) - logging.info('Copy figures to: {}'.format(path)) - - det_new_files = {} - - for f in figures: - const = f.split('/')[-1].split('_')[0] - fpath = '{}/{}/{}'.format(path, det_name, const) - - os.makedirs(fpath, exist_ok=True) - det_new_files[f] = fpath - - # set concurrency limitation. - # 50 have been chosen by trial - # (this is not the max limitation). - sem = asyncio.Semaphore(50) - all_new_files.append( - '{}/{}'.format(fpath, f.split('/')[-1])) - - await asyncio.gather(*[cp_with_conc_lim(k, v, sem) - for k, v in det_new_files.items()]) - - logging.info('Figures Copied') + outputs = await asyncio.gather(*launched_jobs) + + job_list = [] + + for output in outputs: + job_list += await parse_output(output[0].decode('utf8')) + try: + # make sure to timeout if waiting for jobs + # took more than 10 mins. + await asyncio.wait_for(wait_jobs(job_list), timeout=jobs_timeout) + logging.info('All jobs are finished') + + except asyncio.TimeoutError: + asyncio.ensure_future(del_folder('./tmp')) + logging.error('Jobs timeout!') + + # Copy all plots + out_folder = det_conf['out-folder'] + path = cfg['GLOBAL']['git']['figures-local'] + figures = glob.glob("{}/*png".format(out_folder)) + logging.info('Copy figures to: {}'.format(path)) + + det_new_files = {} + + for f in figures: + const = f.split('/')[-1].split('_')[0] + fpath = '{}/{}/{}'.format(path, det_name, const) + + os.makedirs(fpath, exist_ok=True) + det_new_files[f] = fpath + + # Set concurrency limitation. + # 50 have been chosen by trial + # Note: This is not the max limitation. + sem = asyncio.Semaphore(50) + all_new_files.append( + '{}/{}'.format(fpath, f.split('/')[-1])) + + await asyncio.gather(*[copy_files(k, v, sem) + for k, v in det_new_files.items()]) + + logging.info('Figures Copied') asyncio.ensure_future( push_figures(cfg['GLOBAL']['git']['repo-local'], @@ -262,19 +285,24 @@ async def server_runner(config): logging.info('All done') return - # TODO delete out-folder - + try: + # Delete out-folder + del_folder(out_folder) + except: + logging.error(str(e)) try: - asyncio.ensure_future(do_action(copy.copy(req_cfg))) + asyncio.ensure_future(do_action(copy.copy(req_cfg), jobs_timeout)) except Exception as e: # actions that fail are only error logged logging.error(str(e)) break arg_parser = argparse.ArgumentParser(description='Start the report service') -arg_parser.add_argument('--config-file', type=str, - default='./report_conf.yaml', +arg_parser.add_argument('--config-file', type=str, + default='./report_conf.yaml', help='config file path with reportservice port. ' 'Default=./report_conf.yaml') +arg_parser.add_argument('--jobs-timeout', type=int, default=3000, + help='Maximum time to wait for jobs. Default=3000') arg_parser.add_argument('--log-file', type=str, default='./report.log', help='The report log file path. Default=./report.log') arg_parser.add_argument('--logging', type=str, default="INFO", @@ -285,8 +313,6 @@ arg_parser.add_argument('--logging', type=str, default="INFO", if __name__ == "__main__": args = vars(arg_parser.parse_args()) conf_file = args["config_file"] - with open(conf_file, "r") as f: - config = yaml.load(f.read()) logfile = args["log_file"] fmt = '%(asctime)s - %(name)s - %(levelname)s - %(message)s' @@ -297,5 +323,5 @@ if __name__ == "__main__": datefmt='%Y-%m-%d %H:%M:%S') loop = asyncio.get_event_loop() - loop.run_until_complete(server_runner(config)) + loop.run_until_complete(server_runner(conf_file, args["jobs_timeout"])) loop.close()