import argparse
import asyncio
from asyncio.subprocess import PIPE, STDOUT
import copy
import getpass
import glob
import logging
import os
import subprocess
import shutil
import sqlite3
import sys
from time import sleep
import urllib.parse

from git import Repo, InvalidGitRepositoryError
import yaml
import zmq
import zmq.asyncio

from messages import Errors

loop = asyncio.get_event_loop()


def init_config_repo(config):
    """ Make sure the configuration repo. is present and up-to-date

    :param config: the configuration defined in the `config-repo` section
    """
    os.makedirs(config['repo-local'], exist_ok=True)
    try:
        # Check if it is a git-repo.
        repo = Repo(config['repo-local'])
    except InvalidGitRepositoryError:
        # Clone the repo.
        repo = Repo.clone_from(config['figures-remote'],
                               config['repo-local'])
    # Make sure it is updated.
    repo.remote().pull()
    logging.info("Config repo is initialized")


async def parse_output(output):
    """ Collect the IDs of submitted jobs from the submission output.

    Lines containing 'Submitted' are expected to carry the job ID as
    their third whitespace-separated token.
    """
    joblist = []
    for line in output.split('\n'):
        if 'Submitted' in line:
            joblist.append(line.split()[2])
    logging.info('joblist: {}'.format(joblist))
    return joblist


async def wait_jobs(joblist):
    """ Poll squeue every 10 seconds until none of the jobs is listed. """
    counter = 0
    while True:
        found_jobs = set()
        output = subprocess.check_output(['squeue']).decode('utf8')
        for line in output.split("\n"):
            for job in joblist:
                if str(job) in line:
                    found_jobs.add(job)
        if len(found_jobs) == 0:
            logging.info('Plot modules are done')
            break
        await asyncio.sleep(10)
        counter += 10


async def get_run_base(detector):
    """ Build the xfel-calibrate command line from a detector config. """
    run_base = ['xfel-calibrate'] + detector['det-type']
    for key, item in detector.items():
        if key in ['det-type']:
            continue
        run_base += ['--{}'.format(str(key))]
        if not isinstance(item, list):
            run_base += [str(item)]
        else:
            for val in item:
                run_base += [str(val)]
    return run_base


async def del_folder(fpath):
    """ Delete temporary folders, e.g. the out-folder of newly
        generated plots.

    :param fpath: the folder path that needs to be deleted.
    """
    cmd = ["rm", "-rf", fpath]
    proc = await asyncio.subprocess.create_subprocess_shell(" ".join(cmd))
    await proc.wait()
    logging.info('tmp folder has been deleted')


async def copy_files(f, path, sem):
    """ Copy a file with an optional concurrency limit.

    :param f: the file to copy, given with its current path.
    :param path: the destination path f is copied to.
    :param sem: a Semaphore that limits access to common resources.
                sem can be None when only a small number of files,
                e.g. a single file, is copied.
    """
    cmd = ["rsync", "-a", f, path]
    if sem:
        async with sem:
            proc = await asyncio.subprocess.create_subprocess_shell(
                " ".join(cmd))
            await proc.wait()
    else:
        proc = await asyncio.subprocess.create_subprocess_shell(" ".join(cmd))
        await proc.wait()


async def push_figures(repo_master, addf):
    """ Upload new figures

    :param repo_master: the local git-repository.
    :param addf: the generated figures to be added.
    """
    repo = Repo(repo_master)
    repo.index.add(addf)
    repo.index.commit("Add {} new figures".format(len(addf)))
    # TODO: create an async function for pushing new figures
    # to avoid blocking the report service.
    repo.remote().push()
    logging.info('Pushed to git')


async def server_runner(conf_file, jobs_timeout):
    """ The main server loop.

    After pulling the latest changes of the DC project, it awaits
    configurations describing how to generate the requested reports.

    Depending on whether a conf yaml dict or a list of instruments is
    received, the server proceeds by generating figures. Figures are
    copied to the corresponding folder in the DC project and an
    add-commit-push is performed to update the remote project and build
    reports that can be accessed through ReadTheDocs.
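
    Expected request formats (the instrument names below are only
    illustrative placeholders; valid names are the top-level keys of
    the report configuration file):

        ['all']            # build reports for every configured instrument
        ['SPB', 'FXE']     # build reports for selected instruments
        {...}              # a full configuration dict, used instead of
                           # report_conf.yaml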
""" with open(conf_file, "r") as f: config = yaml.load(f.read()) # perform git-dir checks and pull the project for updates. init_config_repo(config['GLOBAL']['git']) logging.info("report service port: {}:{}" .format(config['GLOBAL']['report-service']['bind-to'], config['GLOBAL']['report-service']['port'])) context = zmq.asyncio.Context() socket = context.socket(zmq.REP) socket.bind("{}:{}".format(config['GLOBAL']['report-service']['bind-to'], config['GLOBAL']['report-service']['port'])) asyncio.ensure_future(copy_files(conf_file, config['GLOBAL']['git']['figures-local'], sem=None)) while True: response = await socket.recv_pyobj() await socket.send_pyobj('Build DC reports through -->') logging.info("response: {}".format(response)) # Check if response is a list or a dict. # if list, it should either have instrument names or ['all']. # if dict, it should acquires the details of the requested reports # for generation. As it will be used instead of report_conf.yaml # reports config file req_cfg = {} if isinstance(response, dict): req_cfg = response elif isinstance(response, list): if len(response) == 1 and response[0] == 'all': req_cfg = config else: req_cfg['GLOBAL'] = config['GLOBAL'] for instr in response: try: req_cfg[instr] = config[instr] except: logging.error( Errors.INSTRUMENT_NOT_FOUND.format(instr)) continue else: logging.error(Errors.REQUEST_MALFORMED.format(response)) continue logging.info('Requested Configuration: {}'.format(req_cfg)) async def do_action(cfg, jobs_timeout = 3000): logging.info('Run plot production') all_new_files = [] all_new_files.append('{}/{}'.format(config['GLOBAL'] ['git'] ['figures-local'], conf_file.split('/')[-1])) for instr_name, instrument in cfg.items(): if instr_name == "GLOBAL": continue launched_jobs = [] for det_name, det_conf in instrument.items(): logging.info('Process detector: {}'.format(det_name)) logging.debug('Config information: {}'.format(det_conf)) run_base = await get_run_base(det_conf) try: output = await asyncio.create_subprocess_shell( " ".join(run_base), stdout=PIPE, stderr=PIPE) launched_jobs.append(output.communicate()) logging.info('Submission information: {}' .format(run_base)) except Exception as e: logging.error('Submission failed: {}'.format(e)) exit(1) outputs = await asyncio.gather(*launched_jobs) job_list = [] for output in outputs: job_list += await parse_output(output[0].decode('utf8')) try: # make sure to timeout if waiting for jobs # took more than 10 mins. await asyncio.wait_for(wait_jobs(job_list), timeout=jobs_timeout) logging.info('All jobs are finished') except asyncio.TimeoutError: asyncio.ensure_future(del_folder('./tmp')) logging.error('Jobs timeout!') # Copy all plots out_folder = det_conf['out-folder'] path = cfg['GLOBAL']['git']['figures-local'] figures = glob.glob("{}/*png".format(out_folder)) logging.info('Copy figures to: {}'.format(path)) det_new_files = {} for f in figures: const = f.split('/')[-1].split('_')[0] fpath = '{}/{}/{}'.format(path, det_name, const) os.makedirs(fpath, exist_ok=True) det_new_files[f] = fpath # Set concurrency limitation. # 50 have been chosen by trial # Note: This is not the max limitation. 
                sem = asyncio.Semaphore(50)
                await asyncio.gather(*[copy_files(k, v, sem)
                                       for k, v in det_new_files.items()])
                logging.info('Figures copied')

                try:
                    # Delete the out-folder of the generated plots.
                    await del_folder(out_folder)
                except Exception as e:
                    logging.error(str(e))

            asyncio.ensure_future(
                push_figures(cfg['GLOBAL']['git']['repo-local'],
                             all_new_files))
            logging.info('All done')
            return

        try:
            asyncio.ensure_future(do_action(copy.copy(req_cfg),
                                            jobs_timeout))
        except Exception as e:
            # Actions that fail are only error logged.
            logging.error(str(e))
            break


arg_parser = argparse.ArgumentParser(description='Start the report service')
arg_parser.add_argument('--config-file', type=str,
                        default='./report_conf.yaml',
                        help='config file path with reportservice port. '
                             'Default=./report_conf.yaml')
arg_parser.add_argument('--jobs-timeout', type=int, default=3000,
                        help='Maximum time to wait for jobs. Default=3000')
arg_parser.add_argument('--log-file', type=str, default='./report.log',
                        help='The report log file path. '
                             'Default=./report.log')
arg_parser.add_argument('--logging', type=str, default="INFO",
                        help='logging modes: INFO, DEBUG or ERROR. '
                             'Default=INFO',
                        choices=['INFO', 'DEBUG', 'ERROR'])


if __name__ == "__main__":
    args = vars(arg_parser.parse_args())
    conf_file = args["config_file"]
    logfile = args["log_file"]
    fmt = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    logging.basicConfig(filename=logfile, filemode='a+',
                        level=getattr(logging, args['logging']),
                        format='%(levelname)-6s: %(asctime)s %(message)s',
                        datefmt='%Y-%m-%d %H:%M:%S')

    loop = asyncio.get_event_loop()
    loop.run_until_complete(server_runner(conf_file, args["jobs_timeout"]))
    loop.close()
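
# Minimal client sketch for manually exercising the service. The endpoint
# below is an assumption; use the bind-to/port values configured in
# report_conf.yaml. A request can be a list of instrument names, ['all'],
# or a full configuration dict:
#
#     import zmq
#
#     context = zmq.Context()
#     socket = context.socket(zmq.REQ)
#     socket.connect("tcp://localhost:5555")
#     socket.send_pyobj(['all'])
#     print(socket.recv_pyobj())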