diff --git a/webservice/messages.py b/webservice/messages.py index 3f3255daa3510fe9333024996156344c84dc3369..957ebfe33836474a27ebf82f7bbd56e17acf3caa 100644 --- a/webservice/messages.py +++ b/webservice/messages.py @@ -13,6 +13,7 @@ class Errors: MDC_RESPONSE = "FAILED: Response error from MDC: {}" NOT_CONFIGURED = "FAILED: instrument not configured, please contact det-support@xfel.eu" NOT_SUBMITTED = "FAILED: correction of {} failed during submision, please contact det-support@xfel.eu" + OTHER_ERROR = "FAILED: Error {}, please contact det-support@xfel.eu" class MDC: diff --git a/webservice/webservice.py b/webservice/webservice.py index de19d5a2545ce55ef540d4cb016e908e3c0fefd8..0d1696e53e29fb83971c47cf4784062198834d93 100644 --- a/webservice/webservice.py +++ b/webservice/webservice.py @@ -3,16 +3,16 @@ import asyncio import copy import getpass import glob +import inspect import json import logging import os import sqlite3 -import traceback import urllib.parse from asyncio import get_event_loop, shield from datetime import datetime from pathlib import Path -from typing import Any, Dict, List +from typing import Any, Dict, List, Optional import yaml import zmq @@ -95,10 +95,9 @@ def init_config_repo(config): logging.info("Config repo is initialized") -async def upload_config(socket, config, yaml, instrument, cycle, proposal): +async def upload_config(config, yaml, instrument, cycle, proposal) -> bytes: """ Upload a new configuration YAML - :param socket: ZMQ socket to send reply on :param config: the configuration defined in the `config-repo` section of the webservice.yaml configuration. :param yaml: the YAML contents to update @@ -128,7 +127,7 @@ async def upload_config(socket, config, yaml, instrument, cycle, proposal): datetime.now().isoformat())) repo.remote().push() logging.info(Success.UPLOADED_CONFIG.format(cycle, proposal)) - socket.send(Success.UPLOADED_CONFIG.format(cycle, proposal).encode()) + return Success.UPLOADED_CONFIG.format(cycle, proposal).encode() def merge(source: Dict, destination: Dict) -> Dict: @@ -156,8 +155,8 @@ def merge(source: Dict, destination: Dict) -> Dict: return destination -async def change_config(socket, config, updated_config, karabo_id, instrument, - cycle, proposal, apply=False): +def change_config(config, updated_config, karabo_id, instrument, + cycle, proposal, apply=False) -> bytes: """ Change the configuration of a proposal @@ -166,7 +165,6 @@ async def change_config(socket, config, updated_config, karabo_id, instrument, Changes are committed to git. - :param socket: ZMQ socket to send reply on :param config: repo config as given in YAML config file :param updated_config: a dictionary containing the updated config :param instrument: the instrument to change config for @@ -206,7 +204,7 @@ async def change_config(socket, config, updated_config, karabo_id, instrument, "Update to proposal YAML: {}".format(datetime.now().isoformat())) repo.remote().push() logging.info(Success.UPLOADED_CONFIG.format(cycle, proposal)) - socket.send(yaml.dump(new_conf, default_flow_style=False).encode()) + return yaml.dump(new_conf, default_flow_style=False).encode() async def run_proc_async(cmd: List[str]) -> (int, bytes): @@ -262,7 +260,7 @@ async def slurm_job_status(jobid): return "NA", "NA", "NA" -async def query_rid(conn, socket, rid): +def query_rid(conn, rid) -> bytes: c = conn.cursor() c.execute("SELECT * FROM jobs WHERE rid LIKE ?", rid) combined = {} @@ -293,7 +291,7 @@ async def query_rid(conn, socket, rid): msg += "\n".join(statii) if msg == "": msg = 'NA' - socket.send(msg.encode()) + return msg.encode() def parse_config(cmd: List[str], config: Dict[str, Any]) -> List[str]: @@ -350,7 +348,7 @@ async def update_job_db(config): rid, jobid, proposal, run, status, time, _, action = r logging.debug("DB info {}".format(r)) - cflg, cstatus = combined.get(rid, ([], [])) + cflg, cstatus = combined.get((rid, action), ([], [])) if jobid in statii: slstatus, runtime = statii[jobid] query = "UPDATE jobs SET status=?, time=? WHERE jobid LIKE ?" @@ -368,17 +366,22 @@ async def update_job_db(config): else: cflg.append("NA") cstatus.append(slstatus) - combined[rid] = cflg, cstatus + combined[rid, action] = cflg, cstatus conn.commit() flg_order = {"R": 2, "A": 1, "NA": 0} dark_flags = {'NA': 'E', 'R': 'IP', 'A': 'F'} - for rid, value in combined.items(): + for rid, action in combined: if int(rid) == 0: # this job was not submitted from MyMDC continue - flgs, statii = value + flgs, statii = combined[rid, action] # sort by least done status flg = max(flgs, key=lambda i: flg_order[i]) + if flg != 'R': + logging.info( + "Jobs finished - action: %s, run id: %s, status: %s", + action, rid, flg, + ) msg = "\n".join(statii) msg_debug = f"Update MDC {rid}, {msg}" logging.debug(msg_debug.replace('\n', ', ')) @@ -391,10 +394,11 @@ async def update_job_db(config): 'calcat_feedback': msg}} response = mdc.update_dark_run_api(rid, data) if response.status_code != 200: + logging.error("Failed to update MDC for action %s, rid %s", + action, rid) logging.error(Errors.MDC_RESPONSE.format(response)) - except Exception as e: - e = str(e) - logging.error(f"Failure to update job DB: {e}") + except Exception: + logging.error(f"Failure to update job DB", exc_info=True) await asyncio.sleep(time_interval) @@ -416,7 +420,7 @@ async def copy_untouched_files(file_list, out_folder, run): logging.info("Copying {} to {}".format(f, of)) -async def run_action(job_db, cmd, mode, proposal, run, rid): +async def run_action(job_db, cmd, mode, proposal, run, rid) -> str: """ Run action command (CORRECT OR DARK) :param job_db: jobs database @@ -425,7 +429,7 @@ async def run_action(job_db, cmd, mode, proposal, run, rid): but the command will be logged :param proposal: proposal the command was issued for :param run: run the command was issued for - :param: rid: run id in the MDC + :param rid: run id in the MDC Returns a formatted Success or Error message indicating outcome of the execution. @@ -474,7 +478,7 @@ async def run_action(job_db, cmd, mode, proposal, run, rid): return Success.START_CORRECTION_SIM.format(proposal, run) -async def wait_on_transfer(rpath, max_tries=300): +async def wait_on_transfer(rpath, max_tries=300) -> bool: """ Wait on data files to be transferred to Maxwell @@ -511,6 +515,34 @@ async def wait_on_transfer(rpath, max_tries=300): await asyncio.sleep(10) +async def wait_transfers( + wait_runs: List[str], in_folder: str, proposal: str +) -> bool: + """Wait for multiple runs to be transferred to Maxwell. + + :param wait_runs: Run numbers to wait for + :param in_folder: Proposal raw directory containing runs + :param proposal: Proposal number + :return: True if all runs transferred, false on timeout + """ + logging.debug("Waiting for: propsal %s, runs %s", proposal, wait_runs) + + # FIXME: this loop should be an asyncio.gather + for runnr in wait_runs: + rpath = "{}/r{:04d}/".format(in_folder, int(runnr)) + transfer_complete = await wait_on_transfer(rpath) + if not transfer_complete: + logging.error( + Errors.TRANSFER_EVAL_FAILED.format(proposal, runnr) + ) + return False + + logging.info( + "Transfer complete: proposal %s, runs %s", proposal, wait_runs + ) + return True + + def check_files(in_folder: str, runs: List[int], karabo_das: List[str]) -> bool: @@ -553,6 +585,7 @@ async def update_darks_paths(mdc: MetadataClient, rid: int, in_path: str, response = await shield(loop.run_in_executor(None, mdc.update_dark_run_api, rid, data)) if response.status_code != 200: + logging.error("Failed to update MDC dark report path for run id %s", rid) logging.error(Errors.MDC_RESPONSE.format(response)) @@ -584,441 +617,561 @@ async def update_mdc_status(mdc: MetadataClient, action: str, func = mdc.update_run_api data = {'flg_cal_data_status': flag, 'cal_pipeline_reply': message} - if action == 'dark_request': + elif action == 'dark_request': func = mdc.update_dark_run_api data = {'dark_run': {'flg_status': flag, 'calcat_feedback': message}} + else: + raise ValueError(f"Unexpected action: {action}") loop = get_event_loop() response = await shield(loop.run_in_executor(None, func, rid, data)) if response.status_code != 200: + logging.error("Failed to update MDC status for action %s, run id %s", + action, rid) logging.error(Errors.MDC_RESPONSE.format(response)) -async def server_runner(config, mode): - """ The main server loop - - The main server loop handles remote requests via a ZMQ interface. - - Requests are the form of ZMQ.REQuest and have the format - - command, *params - - where *parms is a string-encoded python list as defined by the - commands. The following commands are currently understood: - - - correct, with parmeters rid, sase, instrument, cycle, proposal, runnr - - where - - :param rid: is the runid within the MDC database - :param sase: is the sase beamline - :param instrument: is the instrument - :param cycle: is the facility cycle - :param proposal: is the proposal id - :param runnr: is the run number in integer form, e.g. without leading - "r" - - This will trigger a correction process to be launched for that run in - the given cycle and proposal. - - - dark_request, with parameters rid, sase, instrument, cycle, proposal, - did, operation_mode, pdu_names, karabo_das, runnr - - where - - :param rid: is the runid within the MDC database - :param sase: is the sase beamline - :param instrument: is the instrument - :param cycle: is the facility cycle - :param proposal: is the proposal id - :param did: is the detector karabo id - :param operation_mode: is the detector's operation mode, as defined in - CalCat - :param pdu_names: physical detector units for each modules - :param karabo_das: the Data Agreggators representing which detector - modules to calibrate - :param runnr: is the run number in integer form, i.e. without leading - "r" - - - upload-yaml, with parameters sase, instrument, cycle, proposal, yaml - - where - - :param sase: is the sase beamline - :param instrument: is the instrument - :param cycle: is the facility cycle - :param proposal: is the proposal id - :param yaml: is url-encoded (quotes and spaces) representation of - new YAML file - - This will create or replace the existing YAML configuration for the - proposal and cycle with the newly sent one, and then push it to the git - configuration repo. - - """ - - init_config_repo(config['config-repo']) - job_db = await init_job_db(config) - mdc = await init_md_client(config) - - context = zmq.asyncio.Context() - auth = zmq.auth.thread.ThreadAuthenticator(context) - if mode == "prod-auth": - auth.start() - auth.allow(config['web-service']['allowed-ips']) +class ActionsServer: + def __init__(self, config, mode, job_db, mdc): + self.config = config + self.mode = mode + self.job_db = job_db + self.mdc = mdc + + # Set up a ZMQ socket to listen for requests + self.zmq_ctx = zmq.asyncio.Context() + auth = zmq.auth.thread.ThreadAuthenticator(self.zmq_ctx) + if mode == "prod-auth": + auth.start() + auth.allow(config['web-service']['allowed-ips']) + + self.socket = self.zmq_ctx.socket(zmq.REP) + self.socket.zap_domain = b'global' + self.socket.bind("{}:{}".format(config['web-service']['bind-to'], + config['web-service']['port'])) + + # __init__ can't be async - this is a workaround + @classmethod + async def ainit(cls, config, mode): + init_config_repo(config['config-repo']) + job_db = await init_job_db(config) + mdc = await init_md_client(config) + return cls(config, mode, job_db, mdc) + + @classmethod + async def launch(cls, config, mode): + server = await cls.ainit(config, mode) + return await server.run() + + async def run(self): + """The main server loop + + The main server loop handles remote requests via a ZMQ interface. + + Requests are the form of ZMQ.REQuest and have the format + + command, *params + + where *parms is a string-encoded python list as defined by the + commands. + """ + while True: + req = await self.socket.recv_multipart() + logging.debug("Raw request data: %r", req) + try: + resp = await self.handle_one_req(req) + except Exception as e: + logging.error("Unexpected error handling request", exc_info=e) + resp = Errors.OTHER_ERROR.format(e).encode() - socket = context.socket(zmq.REP) - socket.zap_domain = b'global' - socket.bind("{}:{}".format(config['web-service']['bind-to'], - config['web-service']['port'])) + logging.debug("Sending response: %r", resp) + await self.socket.send(resp) - while True: - response = await socket.recv_multipart() - if isinstance(response, list) and len(response) == 1: + async def handle_one_req(self, req: List[bytes]) -> bytes: + if len(req) == 1: try: # protect against unparseable requests - response = eval(response[0]) + req = eval(req[0]) except SyntaxError as e: logging.error(str(e)) - socket.send(Errors.REQUEST_FAILED.encode()) - continue + return Errors.REQUEST_FAILED.encode() - if len(response) < 2: # catch parseable but malformed requests - logging.error(Errors.REQUEST_MALFORMED.format(response)) - socket.send(Errors.REQUEST_MALFORMED.format(response).encode()) - continue + if len(req) < 2: # catch parseable but malformed requests + logging.error(Errors.REQUEST_MALFORMED.format(req)) + return Errors.REQUEST_MALFORMED.format(req).encode() - # FIXME: action should be an enum - action, payload = response[0], response[1:] + action, *payload = req - if action not in ['correct', 'dark', 'dark_request', 'query-rid', - 'upload-yaml', 'update_conf']: + if action not in self.accepted_actions: logging.warning(Errors.UNKNOWN_ACTION.format(action)) - socket.send(Errors.UNKNOWN_ACTION.format(action).encode()) - continue + return Errors.UNKNOWN_ACTION.format(action).encode() - logging.debug('{}, {}'.format(action, payload)) + logging.info("Handling request for action %s", action) + logging.debug('Running action %s, payload %r', action, payload) - if action == "query-rid": - rid = payload[0] - await query_rid(job_db, socket, rid) - continue + handler = getattr(self, 'handle_' + action.replace('-', '_')) - async def do_action(action, payload): # FIXME: this needn't be nested - in_folder = None - run_mapping = {} - priority = None # TODO: Investigate argument - - if action == 'update_conf': - updated_config = None - try: - sase, karabo_id, instrument, cycle, proposal, config_yaml, apply = payload # noqa - updated_config = json.loads(config_yaml) - await change_config(socket, config['config-repo'], - updated_config, karabo_id, instrument, - cycle, proposal, - apply.upper() == "TRUE") - except Exception as e: - e = str(e) - err_msg = (f"Failure applying config for {proposal}:" - f" {e}: {updated_config}") - logging.error(err_msg) - logging.error(f"Unexpected error: {traceback.format_exc()}") # noqa - socket.send(yaml.dump(err_msg, - default_flow_style=False).encode()) - - if action in ['dark', 'correct', 'dark_request']: - request_time = datetime.now().strftime('%Y-%m-%dT%H:%M:%S') - try: - wait_runs: List[str] = [] - rid, sase, instrument, cycle, proposal, *payload = payload - - if action == 'correct': - runnr, priority = payload - runnr = runnr.strip('r') - wait_runs = [runnr] - - if action == 'dark': - karabo_ids, karabo_das, *runs = payload - - karabo_ids = karabo_ids.split(',') - karabo_das = karabo_das.split(',') - for i, run in enumerate(runs): - erun = eval(run) - if isinstance(erun, (list, tuple)): - typ, runnr = erun - if typ == "reservation": - continue - runnr = runnr.strip('r') - run_mapping[typ] = runnr - wait_runs.append(runnr) - else: - run_mapping['no_mapping_{}'.format(i)] = erun - wait_runs.append(erun) - - if action == 'dark_request': - karabo_id, operation_mode, *payload = payload - payload = eval(','.join(payload)) - pdus, karabo_das, wait_runs = payload - - karabo_das = [val.strip() for val in karabo_das] - wait_runs = [str(val) for val in wait_runs] - - proposal = proposal.strip('p') - proposal = "{:06d}".format(int(proposal)) - - logging.info(f'{action} of {proposal} run {wait_runs} at ' - f'{instrument} is requested. Checking files.') - - # Read calibration configuration from yaml - conf_file = Path(config['config-repo']['local-path'], - cycle, f'{proposal}.yaml') - if not conf_file.exists(): - conf_file = Path(config['config-repo']['local-path'], - "default.yaml") - - with open(conf_file, "r") as f: - pconf_full = yaml.load(f.read(), - Loader=yaml.FullLoader) - - # FIXME: remove once MyMDC sends `dark` action - action_ = 'dark' if action == 'dark_request' else action - data_conf = pconf_full['data-mapping'] - if instrument in pconf_full[action_]: - pconf = pconf_full[action_][instrument] - else: - socket.send(Errors.NOT_CONFIGURED.encode()) - logging.info(f'Instrument {instrument} is unknown') - return + # Verify that requests contains the right number of parameters + sig = inspect.signature(handler) + try: + sig.bind(*payload) + except TypeError: + logging.error( + "Wrong number of arguments for action %s", action, exc_info=True + ) + return Errors.REQUEST_MALFORMED.format(req).encode() + + res = handler(*payload) + if asyncio.iscoroutine(res): + res = await res + return res + + accepted_actions = { + 'correct', 'dark', 'dark_request', 'query-rid', 'upload-yaml', + 'update_conf', + } + + # Handler methods for each available action ------------------------------ + + async def handle_correct( + self, rid, _sase, instrument, cycle, proposal, runnr, priority + ): + """Launch detector correction + + :param rid: is the runid within the MDC database + :param _sase: is the sase beamline + :param instrument: is the instrument + :param cycle: is the facility cycle + :param proposal: is the proposal id + :param runnr: is the run number in integer form, e.g. without leading + "r" + + This will trigger a correction process to be launched for that run in + the given cycle and proposal. + """ + request_time = datetime.now().strftime('%Y-%m-%dT%H:%M:%S') + try: + runnr = runnr.strip('r') - in_folder = config[action_]['in-folder'].format( - instrument=instrument, cycle=cycle, proposal=proposal) + proposal = self._normalise_proposal_num(proposal) + pconf_full = self.load_proposal_config(cycle, proposal) - msg = Success.QUEUED.format(proposal, wait_runs) - socket.send(msg.encode()) - logging.debug(msg) + data_conf = pconf_full['data-mapping'] + if instrument in pconf_full['correct']: + pconf = pconf_full['correct'][instrument] + else: + logging.info(f'Instrument {instrument} is unknown') + return Errors.NOT_CONFIGURED.encode() - if action in ['correct', 'dark_request']: - await update_mdc_status(mdc, action, rid, msg) + in_folder = self.config['correct']['in-folder'].format( + instrument=instrument, cycle=cycle, proposal=proposal) + out_folder = self.config['correct']['out-folder'].format( + instrument=instrument, cycle=cycle, proposal=proposal, + run='r{:04d}'.format(int(runnr)) + ) - except Exception as e: - e = str(e) - msg = Errors.JOB_LAUNCH_FAILED.format(action, e) - logging.error(msg) - socket.send(msg.encode()) + except Exception as e: + msg = Errors.JOB_LAUNCH_FAILED.format('correct', e) + logging.error(msg, exc_info=e) + asyncio.ensure_future( + update_mdc_status(self.mdc, 'correct', rid, msg) + ) + return msg.encode() - if action in ['correct', 'dark_request']: - await update_mdc_status(mdc, action, rid, msg) - return + queued_msg = Success.QUEUED.format(proposal, [runnr]) + logging.debug(queued_msg) - # Check if all files for given runs are transferred - all_transfers = [] - - # FIXME: this loop should be an asyncio.gather - for runnr in wait_runs: - rpath = "{}/r{:04d}/".format(in_folder, int(runnr)) - transfer_complete = await wait_on_transfer(rpath) - all_transfers.append(transfer_complete) - if not transfer_complete: - logging.error( - Errors.TRANSFER_EVAL_FAILED.format(proposal, - runnr)) - if action in ['correct', 'dark_request']: - await update_mdc_status(mdc, action, rid, - MDC.MIGRATION_TIMEOUT) - - if not all(all_transfers): - logging.error(Errors.TRANSFER_EVAL_FAILED.format(proposal, ','.join(wait_runs))) # noqa + async def _continue(): + """Runs in the background after we reply to the 'correct' request""" + await update_mdc_status(self.mdc, 'correct', rid, queued_msg) + try: + transfer_complete = await wait_transfers( + [runnr], in_folder, proposal + ) + if not transfer_complete: + # Timed out + await update_mdc_status(self.mdc, 'correct', rid, + MDC.MIGRATION_TIMEOUT) return - logging.debug(f"Now doing: {action}") - ts = datetime.now().strftime('%y%m%d_%H%M%S') - if action == 'dark': + rpath = "{}/r{:04d}/".format(in_folder, int(runnr)) + + # Prepare configs for all detectors in run + fl = glob.glob(f"{rpath}/*.h5") + corr_file_list = set() + copy_file_list = set(fl) detectors = {} - out_folder = config[action]['out-folder'].format( - instrument=instrument, cycle=cycle, proposal=proposal, - runs="_".join(wait_runs)) + for karabo_id in pconf: + dconfig = data_conf[karabo_id] + # check for files according to mapping in raw run dir. + if any(y in x for x in fl + for y in dconfig['karabo-da']): + for karabo_da in dconfig['karabo-da']: + tfl = glob.glob(f"{rpath}/*{karabo_da}*.h5") + corr_file_list = corr_file_list.union(set(tfl)) + thisconf = copy.copy(dconfig) + if isinstance(pconf[karabo_id], dict): + thisconf.update(copy.copy(pconf[karabo_id])) + thisconf["in-folder"] = in_folder + thisconf["out-folder"] = out_folder + thisconf["karabo-id"] = karabo_id + thisconf["run"] = runnr + if priority: + thisconf["priority"] = str(priority) - # Run over all available detectors - if karabo_ids[0] == 'all': - karabo_ids = list(pconf.keys()) + detectors[karabo_id] = thisconf + copy_file_list = copy_file_list.difference(corr_file_list) + asyncio.ensure_future(copy_untouched_files(copy_file_list, + out_folder, + runnr)) + except Exception as corr_e: + logging.error(f"Error during correction", exc_info=corr_e) + await update_mdc_status(self.mdc, 'correct', rid, + Errors.REQUEST_FAILED) + return + + if len(detectors) == 0: + msg = Errors.NOTHING_TO_DO.format(rpath) + logging.warning(msg) + await update_mdc_status(self.mdc, 'correct', rid, msg) + return + + ret, _ = await self.launch_jobs( + [runnr], rid, detectors, 'correct', instrument, cycle, proposal, + request_time, + ) + await update_mdc_status(self.mdc, 'correct', rid, ret) + # END of part to run after sending reply + + asyncio.ensure_future(_continue()) + + return queued_msg.encode() + + async def handle_dark( + self, rid, _sase, instrument, cycle, proposal, karabo_ids, + karabo_das, *runs + ): + request_time = datetime.now().strftime('%Y-%m-%dT%H:%M:%S') + try: + run_mapping = {} + wait_runs = [] + + karabo_ids = karabo_ids.split(',') + karabo_das = karabo_das.split(',') + + for i, run in enumerate(runs): + erun = eval(run) + if isinstance(erun, (list, tuple)): + typ, runnr = erun + if typ == "reservation": + continue + runnr = runnr.strip('r') + run_mapping[typ] = runnr + wait_runs.append(runnr) + else: + run_mapping['no_mapping_{}'.format(i)] = erun + wait_runs.append(erun) - # Prepare configs for all requested detectors - for karabo_id in karabo_ids: + proposal = self._normalise_proposal_num(proposal) + pconf_full = self.load_proposal_config(cycle, proposal) - # use selected karabo_das - if karabo_das[0] == 'all': - karabo_da = data_conf[karabo_id]["karabo-da"] + data_conf = pconf_full['data-mapping'] + if instrument in pconf_full['dark']: + pconf = pconf_full['dark'][instrument] + else: + logging.info(f'Instrument {instrument} is unknown') + return Errors.NOT_CONFIGURED.encode() - # Check if any files for given karabo-das exists - if check_files(in_folder, wait_runs, karabo_da): - thisconf = copy.copy(data_conf[karabo_id]) + # Run over all available detectors + if karabo_ids[0] == 'all': + karabo_ids = list(pconf.keys()) - if (karabo_id in pconf and - isinstance(pconf[karabo_id], dict)): - thisconf.update(copy.copy(pconf[karabo_id])) + in_folder = self.config['dark']['in-folder'].format( + instrument=instrument, cycle=cycle, proposal=proposal) + out_folder = self.config['dark']['out-folder'].format( + instrument=instrument, cycle=cycle, proposal=proposal, + runs="_".join(wait_runs)) - thisconf["in-folder"] = in_folder - thisconf["out-folder"] = '/'.join((out_folder, - karabo_id.replace('-', '_'))) # noqa FIXME Make use of pathlib - thisconf["karabo-id"] = karabo_id - thisconf["karabo-da"] = karabo_da + except Exception as e: + msg = Errors.JOB_LAUNCH_FAILED.format('dark', e) + logging.error(msg, exc_info=e) + return msg.encode() + + async def _continue(): + """Runs in the background after we reply to the 'dark' request""" + transfer_complete = await wait_transfers( + wait_runs, in_folder, proposal + ) + if not transfer_complete: + return # Timed out + + detectors = {} + + # Prepare configs for all requested detectors + for karabo_id in karabo_ids: + + # use selected karabo_das + karabo_das_for_id = karabo_das + if karabo_das[0] == 'all': + karabo_das_for_id = data_conf[karabo_id]["karabo-da"] + + # Check if any files for given karabo-das exists + if check_files(in_folder, wait_runs, karabo_das_for_id): + thisconf = copy.copy(data_conf[karabo_id]) + + if (karabo_id in pconf and + isinstance(pconf[karabo_id], dict)): + thisconf.update(copy.copy(pconf[karabo_id])) + + thisconf["in-folder"] = in_folder + thisconf["out-folder"] = os.path.join( + out_folder, karabo_id.replace('-', '_') + ) + thisconf["karabo-id"] = karabo_id + thisconf["karabo-da"] = karabo_das_for_id + + run_config = [] + for typ, run in run_mapping.items(): + if "no_mapping" in typ: + run_config.append(run) + else: + thisconf[typ] = run + if len(run_config): + thisconf["runs"] = ",".join(run_config) + + detectors[karabo_id] = thisconf + else: + logging.warning( + "File list for %s in proposal %s runs %s is empty", + karabo_id, proposal, wait_runs + ) + if len(detectors) == 0: + logging.warning(Errors.NOTHING_TO_DO.format(wait_runs)) + return + + await self.launch_jobs( + wait_runs, 0, detectors, 'dark', instrument, cycle, proposal, + request_time, + ) + # END of part to run after sending reply + + asyncio.ensure_future(_continue()) + + msg = Success.QUEUED.format(proposal, wait_runs) + logging.debug(msg) + return msg.encode() + + async def handle_dark_request( + self, rid, _sase, instrument, cycle, proposal, karabo_id, + operation_mode, *extra + ): + """Launch dark run processing + + :param rid: is the runid within the MDC database + :param _sase: is the sase beamline + :param instrument: is the instrument + :param cycle: is the facility cycle + :param proposal: is the proposal id + :param karabo_id: is the detector karabo id + :param operation_mode: is the detector's operation mode, as defined in + CalCat + :param pdu_names: physical detector units for each modules + :param karabo_das: the Data Agreggators representing which detector + modules to calibrate + :param runnr: is the run number in integer form, i.e. without leading + "r" + """ + request_time = datetime.now().strftime('%Y-%m-%dT%H:%M:%S') + try: + pdus, karabo_das, wait_runs = eval(','.join(extra)) - run_config = [] - for typ, run in run_mapping.items(): - if "no_mapping" in typ: - run_config.append(run) - else: - thisconf[typ] = run - if len(run_config): - thisconf["runs"] = ",".join(run_config) + karabo_das = [val.strip() for val in karabo_das] + runs = [str(val) for val in wait_runs] - detectors[karabo_id] = thisconf - else: - logging.warning("File list for {} at {} is empty" - .format(karabo_id, - "{}/*.h5".format(rpath))) - - if len(detectors) == 0: - logging.warning(Errors.NOTHING_TO_DO.format(rpath)) - - if action == 'correct': - try: - runnr = wait_runs[0] - rpath = "{}/r{:04d}/".format(in_folder, int(runnr)) - - out_folder = config[action]['out-folder'].format( - instrument=instrument, cycle=cycle, proposal=proposal, - run='r{:04d}'.format(int(runnr))) - - # Prepare configs for all detectors in run - fl = glob.glob(f"{rpath}/*.h5") - corr_file_list = set() - copy_file_list = set(fl) - detectors = {} - for karabo_id in pconf: - dconfig = data_conf[karabo_id] - # check for files according to mapping in raw run dir. - if any(y in x for x in fl - for y in dconfig['karabo-da']): - for karabo_da in dconfig['karabo-da']: - tfl = glob.glob(f"{rpath}/*{karabo_da}*.h5") - corr_file_list = corr_file_list.union(set(tfl)) - thisconf = copy.copy(dconfig) - if isinstance(pconf[karabo_id], dict): - thisconf.update(copy.copy(pconf[karabo_id])) - thisconf["in-folder"] = in_folder - thisconf["out-folder"] = out_folder - thisconf["karabo-id"] = karabo_id - thisconf["run"] = runnr - if priority: - thisconf["priority"] = str(priority) - - detectors[karabo_id] = thisconf - copy_file_list = copy_file_list.difference(corr_file_list) - asyncio.ensure_future(copy_untouched_files(copy_file_list, - out_folder, - runnr)) - if len(detectors) == 0: - logging.warning(Errors.NOTHING_TO_DO.format(rpath)) - await update_mdc_status(mdc, action, rid, - MDC.NOTHING_TO_DO) - return - - except Exception as corr_e: - logging.error(f"Error during correction: {str(corr_e)}") - await update_mdc_status(mdc, action, rid, - Errors.REQUEST_FAILED) - - if action == 'dark_request': - runs = [str(r) for r in wait_runs] - - # Notebooks require one or three runs, depending on the - # detector type and operation mode. - triple = any(det in karabo_id for det in - ["LPD", "AGIPD", "JUNGFRAU", "JF", "JNGFR"]) - - if triple and len(runs) == 1: - runs_dict = {'run-high': runs[0], - 'run-med': '0', - 'run-low': '0'} - elif triple and len(runs) == 3: - runs_dict = {'run-high': runs[0], - 'run-med': runs[1], - 'run-low': runs[2]} - else: # single - runs_dict = {'run': runs[0]} - - out_folder = config['dark']['out-folder'].format( - instrument=instrument, cycle=cycle, proposal=proposal, - runs='_'.join(runs)) - out_folder = str(Path(out_folder, - karabo_id.replace('-', '_'))) - - # We assume that MyMDC does not allow dark request if the data - # is not migrated, thus skipping some validation here. - thisconf = copy.copy(data_conf[karabo_id]) - - if (karabo_id in pconf - and isinstance(pconf[karabo_id], dict)): - thisconf.update(copy.copy(pconf[karabo_id])) - - thisconf['in-folder'] = in_folder - thisconf['out-folder'] = out_folder - thisconf['karabo-id'] = karabo_id - thisconf['karabo-da'] = karabo_das - thisconf['operation-mode'] = operation_mode - - thisconf.update(runs_dict) - - detectors = {karabo_id: thisconf} - - if action in ['correct', 'dark', 'dark_request']: - # run xfel_calibrate - action_ = 'dark' if action == 'dark_request' else action - for karabo_id, dconfig in detectors.items(): - detector = dconfig['detector-type'] - del dconfig['detector-type'] - cmd = config[action_]['cmd'].format( - detector=detector, - sched_prio=str(config[action_]['sched-prio']), - action=action_, instrument=instrument, - cycle=cycle, proposal=proposal, - runs="_".join([f"r{r}" for r in wait_runs]), - time_stamp=ts, - det_instance=karabo_id, - request_time=request_time - ).split() - - cmd = parse_config(cmd, dconfig) - - rid = rid if action in ['correct', 'dark_request'] else 0 - ret = await run_action(job_db, cmd, mode, - proposal, runnr, rid) - - if action == 'correct': - await update_mdc_status(mdc, action, rid, ret) - if action == 'dark_request': - await update_mdc_status(mdc, action, rid, ret) - report_idx = cmd.index('--report-to') + 1 - report = cmd[report_idx] + '.pdf' - await update_darks_paths(mdc, rid, in_folder, - out_folder, report) - - # TODO: moving this block further up reduces the need of so - # many nested ifs. Move up and return directly - if action == 'upload-yaml': - sase, instrument, cycle, proposal, this_yaml = payload - this_yaml = urllib.parse.unquote_plus(this_yaml) - await upload_config(socket, config['config-repo'], this_yaml, - instrument, cycle, proposal) + proposal = self._normalise_proposal_num(proposal) - try: - asyncio.ensure_future( - do_action(copy.copy(action), copy.copy(payload))) - except Exception as e: # actions that fail are only error logged - logging.error(str(e)) + pconf_full = self.load_proposal_config(cycle, proposal) + + data_conf = pconf_full['data-mapping'] + if instrument in pconf_full['dark']: + pconf = pconf_full['dark'][instrument] + else: + logging.info(f'Instrument {instrument} is unknown') + return Errors.NOT_CONFIGURED.encode() + in_folder = self.config['dark']['in-folder'].format( + instrument=instrument, cycle=cycle, proposal=proposal) + out_folder = self.config['dark']['out-folder'].format( + instrument=instrument, cycle=cycle, proposal=proposal, + runs='_'.join(runs)) + out_folder = str(Path(out_folder, karabo_id.replace('-', '_'))) + + except Exception as e: + msg = Errors.JOB_LAUNCH_FAILED.format('dark_request', e) + logging.error(msg, exc_info=e) + asyncio.ensure_future( + update_mdc_status(self.mdc, 'dark_request', rid, msg) + ) + return msg.encode() + + queued_msg = Success.QUEUED.format(proposal, runs) + logging.debug(queued_msg) + + async def _continue(): + """Runs in the background after we reply to the 'dark_request' request""" + await update_mdc_status(self.mdc, 'dark_request', rid, queued_msg) + + transfer_complete = await wait_transfers( + runs, in_folder, proposal + ) + if not transfer_complete: + # Timed out + await update_mdc_status( + self.mdc, 'dark_request', rid, MDC.MIGRATION_TIMEOUT + ) + return + + # Notebooks require one or three runs, depending on the + # detector type and operation mode. + triple = any(det in karabo_id for det in + ["LPD", "AGIPD", "JUNGFRAU", "JF", "JNGFR"]) + + if triple and len(runs) == 1: + runs_dict = {'run-high': runs[0], + 'run-med': '0', + 'run-low': '0'} + elif triple and len(runs) == 3: + runs_dict = {'run-high': runs[0], + 'run-med': runs[1], + 'run-low': runs[2]} + else: # single + runs_dict = {'run': runs[0]} + + # We assume that MyMDC does not allow dark request if the data + # is not migrated, thus skipping some validation here. + thisconf = copy.copy(data_conf[karabo_id]) + + if (karabo_id in pconf + and isinstance(pconf[karabo_id], dict)): + thisconf.update(copy.copy(pconf[karabo_id])) + + thisconf['in-folder'] = in_folder + thisconf['out-folder'] = out_folder + thisconf['karabo-id'] = karabo_id + thisconf['karabo-da'] = karabo_das + thisconf['operation-mode'] = operation_mode + + thisconf.update(runs_dict) + + detectors = {karabo_id: thisconf} + + ret, report_path = await self.launch_jobs( + runs, rid, detectors, 'dark', instrument, cycle, proposal, + request_time + ) + await update_mdc_status(self.mdc, 'dark_request', rid, ret) + if report_path is None: + logging.warning("Failed to identify report path for dark_request") + else: + await update_darks_paths(self.mdc, rid, in_folder, + out_folder, report_path) + # END of part to run after sending reply + + asyncio.ensure_future(_continue()) + + return queued_msg.encode() + + def handle_query_rid(self, rid): + return query_rid(self.job_db, rid) + + def handle_upload_yaml(self, _sase, instrument, cycle, proposal, this_yaml): + """Reconfigure detector calibration for the specified proposal + + :param _sase: is the sase beamline + :param instrument: is the instrument + :param cycle: is the facility cycle + :param proposal: is the proposal id + :param yaml: is url-encoded (quotes and spaces) representation of + new YAML file + + This will create or replace the existing YAML configuration for the + proposal and cycle with the newly sent one, and then push it to the git + configuration repo. + """ + this_yaml = urllib.parse.unquote_plus(this_yaml) + return upload_config( + self.config['config-repo'], this_yaml, instrument, cycle, proposal + ) + + async def handle_update_conf( + self, sase, karabo_id, instrument, cycle, proposal, config_yaml, + apply + ): + updated_config = None + try: + updated_config = json.loads(config_yaml) + return change_config(self.config['config-repo'], + updated_config, karabo_id, instrument, + cycle, proposal, + apply.upper() == "TRUE") + except Exception as e: + err_msg = (f"Failure applying config for {proposal}:" + f" {e}: {updated_config}") + logging.error(err_msg, exc_info=e) + return yaml.dump(err_msg, default_flow_style=False).encode() + + # Helper methods for handlers --------------------------------------------- + + @staticmethod + def _normalise_proposal_num(p: str) -> str: + return "{:06d}".format(int(p.strip('p'))) + + def load_proposal_config(self, cycle, proposal) -> Dict: + # Read calibration configuration from yaml + conf_file = Path( + self.config['config-repo']['local-path'], cycle, f'{proposal}.yaml' + ) + if not conf_file.exists(): + conf_file = Path( + self.config['config-repo']['local-path'], "default.yaml" + ) + + logging.debug("Loading config for cycle %s, proposal %s from %s", + cycle, proposal, conf_file) + + with open(conf_file, "r") as f: + return yaml.load(f.read(), Loader=yaml.FullLoader) + + async def launch_jobs( + self, run_nrs, rid, detectors, action, instrument, cycle, proposal, + request_time + ) -> (str, Optional[str]): + # run xfel_calibrate + for karabo_id, dconfig in detectors.items(): + detector = dconfig['detector-type'] + del dconfig['detector-type'] + cmd = self.config[action]['cmd'].format( + detector=detector, + sched_prio=str(self.config[action]['sched-prio']), + action=action, instrument=instrument, + cycle=cycle, proposal=proposal, + runs="_".join([f"r{r}" for r in run_nrs]), + time_stamp=datetime.now().strftime('%y%m%d_%H%M%S'), + det_instance=karabo_id, + request_time=request_time + ).split() + + cmd = parse_config(cmd, dconfig) + + ret = await run_action(self.job_db, cmd, self.mode, + proposal, run_nrs[-1], rid) + + if '--report-to' in cmd[:-1]: + report_idx = cmd.index('--report-to') + 1 + report = cmd[report_idx] + '.pdf' + else: + report = None + return ret, report parser = argparse.ArgumentParser( description='Start the calibration webservice') @@ -1028,7 +1181,7 @@ parser.add_argument('--mode', type=str, default="sim", choices=['sim', 'prod']) parser.add_argument('--logging', type=str, default="INFO", choices=['INFO', 'DEBUG', 'ERROR']) -if __name__ == "__main__": +def main(): args = vars(parser.parse_args()) conf_file = args["config_file"] with open(conf_file, "r") as f: @@ -1041,5 +1194,9 @@ if __name__ == "__main__": format=fmt) loop = asyncio.get_event_loop() loop.create_task(update_job_db(config)) - loop.run_until_complete(server_runner(config, mode)) + loop.run_until_complete(ActionsServer.launch(config, mode)) loop.close() + + +if __name__ == "__main__": + main()