import argparse
import ast
import asyncio
import copy
import glob
import inspect
import json
import locale
import logging
import os
import shlex
import sqlite3
import sys
import urllib.parse
from asyncio import get_event_loop, shield
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple, Union

import requests
import yaml
import zmq
import zmq.asyncio
import zmq.auth.thread
from git import InvalidGitRepositoryError, Repo
from metadata_client.metadata_client import MetadataClient

try:
    from .config import webservice as config
    from .messages import MDC, Errors, MigrationError, Success
except ImportError:
    from config import webservice as config
    from messages import MDC, Errors, MigrationError, Success

# Seconds to wait when sending a passthrough request to Orca. The request is
# sent synchronously from async handlers, so without a timeout a peer that
# accepts the connection but never answers would block the event loop forever.
ORCA_PASSTHROUGH_TIMEOUT = 5


def init_job_db(config):
    """Initialize the sqlite job database.

    A new database file is created if no pre-existing one is present. Three
    tables (and their lookup indices) are created if they do not exist yet:

    - requests: one row per incoming request
      (req_id, mymdc_id, proposal, run, action, timestamp)
    - executions: one row per launched command, referencing requests
      (exec_id, req_id, command, det_type, karabo_id, success)
    - slurm_jobs: one row per submitted Slurm job, referencing executions
      (job_id, exec_id, status, finished, elapsed)

    :param config: the configuration parsed from the webservice YAML config
    :return: a sqlite3 connection instance to the database, with rows
        returned as sqlite3.Row
    """
    logging.info("Initializing database")
    conn = sqlite3.connect(config['web-service']['job-db'])
    conn.execute("PRAGMA foreign_keys = ON")

    conn.executescript("""
        CREATE TABLE IF NOT EXISTS requests(
            req_id INTEGER PRIMARY KEY,
            mymdc_id,
            proposal TEXT,
            run INTEGER,
            action,
            timestamp
        );
        CREATE TABLE IF NOT EXISTS executions(
            exec_id INTEGER PRIMARY KEY,
            req_id REFERENCES requests(req_id),
            command TEXT,
            det_type,
            karabo_id,
            success
        );
        CREATE INDEX IF NOT EXISTS exec_by_req ON executions(req_id);
        CREATE TABLE IF NOT EXISTS slurm_jobs(
            job_id INTEGER PRIMARY KEY,
            exec_id REFERENCES executions(exec_id),
            status,
            finished,
            elapsed
        );
        CREATE INDEX IF NOT EXISTS job_by_exec ON slurm_jobs(exec_id);
        CREATE INDEX IF NOT EXISTS job_by_finished ON slurm_jobs(finished);
    """)
    conn.row_factory = sqlite3.Row
    return conn


def init_md_client(config: Dict[str, Dict[str, str]]) -> MetadataClient:
    """Initialize an MDC client connection.

    :param config: the configuration parsed from the webservice YAML config
    :return: an MDC client connection
    """
    mdconf = config['metadata-client']
    client_conn = MetadataClient(client_id=mdconf['user-id'],
                                 client_secret=mdconf['user-secret'],
                                 user_email=mdconf['user-email'],
                                 token_url=mdconf['token-url'],
                                 refresh_url=mdconf['refresh-url'],
                                 auth_url=mdconf['auth-url'],
                                 scope=mdconf['scope'],
                                 base_api_url=mdconf['base-api-url'])
    return client_conn


def init_config_repo(config):
    """Make sure the configuration repo is present and up-to-date.

    The repository is cloned into `local-path` if that directory is not yet a
    git repository. Afterwards a pull is attempted; a failing pull is logged
    and otherwise ignored, so the service can still start from the local copy.

    :param config: the configuration defined in the `config-repo` section
    """
    # FIXME: keep this as func, but call this with asyncio.thread
    os.makedirs(config['local-path'], exist_ok=True)
    # check if it is a repo
    try:
        repo = Repo(config['local-path'])
    except InvalidGitRepositoryError:
        repo = Repo.clone_from(config['url'], config['local-path'])
    try:
        repo.remote().pull()
    except Exception:
        # Deliberately best-effort, but make the failure visible.
        logging.warning("Failed to pull the config repo", exc_info=True)
    logging.info("Config repo is initialized")


async def upload_config(config, yaml, instrument, cycle, proposal) -> bytes:
    """Upload a new configuration YAML.

    :param config: the configuration defined in the `config-repo` section
        of the webservice.yaml configuration.
    :param yaml: the YAML contents to update
    :param instrument: instrument for which the update is for
    :param cycle: the facility cycle the update is for
    :param proposal: the proposal the update is for
    :return: the success message, encoded as bytes

    The YAML contents will be placed into a file at
    {config.local-path}/{cycle}/{proposal}.yaml. If it exists it is
    overwritten, and then the new version is pushed to the configuration
    git repo.
    """
    # FIXME: keep this as func, but call this with asyncio.thread
    # NOTE(review): the git operations and file I/O below are blocking and
    # run directly on the event loop.
    repo = Repo(config['local-path'])
    # assure we are on most current version
    repo.remote().pull()
    prop_dir = os.path.join(repo.working_tree_dir, cycle)
    os.makedirs(prop_dir, exist_ok=True)
    fpath = "{}/{}.yaml".format(prop_dir, proposal)
    with open(fpath, "w") as f:
        f.write(yaml)
    repo.index.add([fpath])
    repo.index.commit(
        "Update to proposal p{}: {}".format(proposal,
                                            datetime.now().isoformat()))
    repo.remote().push()
    logging.info(Success.UPLOADED_CONFIG.format(cycle, proposal))
    return Success.UPLOADED_CONFIG.format(cycle, proposal).encode()


def merge(source: Dict, destination: Dict) -> Dict:
    """Deep merge two dictionaries.

    :param source: source dictionary to merge into destination
    :param destination: destination dictionary which is being merged in
    :return: the updated destination dictionary (modified in place)

    Taken from:
    https://stackoverflow.com/questions/20656135/python-deep-merge-dictionary-data
    """
    for key, value in source.items():
        if isinstance(value, dict):
            # check if destination (e.g. karabo-id) has none value
            if key in destination.keys() and destination[key] is None:
                destination[key] = {}
            # get node or create one
            node = destination.setdefault(key, {})
            merge(value, node)
        else:
            destination[key] = value

    return destination


def change_config(config, updated_config, instrument,
                  cycle, proposal, apply=False) -> bytes:
    """Change the configuration of a proposal.

    If no proposal-specific configuration exists yet, one is first created
    from the default configuration. That file holds the instrument's sections
    of `dark` and `correct`, plus the `data-mapping` entries for the
    instrument's detectors. Changes are committed to git only when `apply`
    is true.

    :param config: repo config as given in YAML config file
    :param updated_config: a dictionary containing the updated config
    :param instrument: the instrument to change the config for
    :param cycle: the cycle to change config for
    :param proposal: the proposal to change config for
    :param apply: set to True to actually commit a change, otherwise a
        dry-run is performed
    :return: The updated config to the requesting zmq socket
    """
    repo = Repo(config['local-path'])
    repo.remote().pull()

    # A cycle directory is created, if doesn't exists.
    cyc_dir = Path(repo.working_tree_dir, cycle)
    cyc_dir.mkdir(exist_ok=True)
    fpath = Path(cyc_dir, f"{int(proposal):06d}.yaml")

    # In case cycle/proposal.yaml doesn't exist,
    # new file is created based on default.yaml
    if not fpath.exists():
        with open(f"{repo.working_tree_dir}/default.yaml", "r") as f:
            defconf = yaml.safe_load(f)
            subconf = {}

        # Proposal.yaml should have dark, correct and data-mapping keys
        # with all detector dictionaries of the changed instrument.
        for action in ["dark", "correct"]:
            subconf[action] = {}
            subconf[action][instrument] = defconf[action][instrument]

        # Copy data-mapping for all detectors of an instrument.
        subconf["data-mapping"] = {}
        for k_id in defconf["dark"][instrument].keys():
            logging.debug("karabo-id %s", k_id)
            subconf["data-mapping"][k_id] = defconf["data-mapping"][k_id]
        with open(fpath, "w") as wf:
            wf.write(yaml.safe_dump(subconf, default_flow_style=False))

    new_conf = None
    with open(fpath, "r") as rf:
        existing_conf = yaml.safe_load(rf)
        new_conf = merge(updated_config, existing_conf)

    if apply:
        # Apply updated configuration to the proposal.yaml
        # and push it to the calibration_configurations remote repository.
        with open(fpath, "w") as wf:
            wf.write(yaml.safe_dump(new_conf, default_flow_style=False))
        repo.index.add([str(fpath)])
        repo.index.commit(
            f"Update to proposal YAML: {datetime.now().isoformat()}")
        repo.remote().push()
        logging.info(Success.UPLOADED_CONFIG.format(cycle, proposal))
    return yaml.safe_dump(new_conf, default_flow_style=False).encode()


async def run_proc_async(cmd: List[str]) -> Tuple[Optional[int], bytes, bytes]:
    """Run a subprocess to completion using asyncio, capturing its output.

    Returns the numeric exit code, stdout and stderr (bytes).
    """
    proc = await asyncio.create_subprocess_exec(
        *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
    )
    stdout, stderr = await proc.communicate()
    return proc.returncode, stdout, stderr


def query_rid(conn, rid) -> bytes:
    """Return the statuses of all Slurm jobs belonging to MyMDC id `rid`.

    :param conn: the job database connection
    :param rid: the MyMDC request id
    :return: newline-separated statuses, or b'NA' when no jobs are known
    """
    c = conn.execute(
        "SELECT job_id, status FROM slurm_jobs "
        "INNER JOIN executions USING (exec_id) "
        "INNER JOIN requests USING (req_id) "
        "WHERE mymdc_id = ?", (rid,)
    )
    statuses = []
    for job_id, status in c.fetchall():
        logging.debug("Job %s has status %s", job_id, status)
        statuses.append(status)

    if statuses:
        return "\n".join(statuses).encode()
    return b'NA'


def parse_config(cmd: List[str], config: Dict[str, Any]) -> List[str]:
    """Convert a dictionary to a list of arguments, appended to `cmd`.

    Values that are not strings will be cast. Lists will be converted to
    several strings following their `--key` flag. Booleans will be converted
    to a `--key` flag (only when true), where `key` is the dictionary key.
    The literal strings '""' and "''" are passed as an empty argument.

    :param cmd: the command list to extend (modified in place)
    :param config: the options to convert
    :return: `cmd`, for convenience
    :raise ValueError: if a key or string value contains a space
    """
    for key, value in config.items():
        if ' ' in key or (isinstance(value, str) and ' ' in value):
            raise ValueError('Spaces are not allowed', key, value)

        if isinstance(value, list):
            cmd.append(f"--{key}")
            cmd += [str(v) for v in value]
        elif isinstance(value, bool):
            if value:
                cmd += ["--{}".format(key)]
        else:
            if value in ['""', "''"]:
                value = ""
            cmd += ["--{}".format(key), str(value)]

    return cmd


async def run_action(job_db, cmd, mode, proposal, run, exec_id) -> str:
    """Run action command (CORRECT or DARK).

    :param job_db: jobs database
    :param cmd: to run, should be a in list for as expected by subprocess.run
    :param mode: "prod" or "sim", in the latter case nothing will be executed
        but the command will be logged
    :param proposal: proposal the command was issued for
    :param run: run the command was issued for
    :param exec_id: Execution ID in the local jobs database

    Returns a formatted Success or Error message indicating outcome of the
    execution.
    """
    # NOTE(review): any mode other than "prod" (including "prod-auth", which
    # ActionsServer recognises) takes the simulation branch.
    if mode == "prod":
        logging.info(" ".join(cmd))
        retcode, stdout, stderr = await run_proc_async(cmd)
        if retcode != 0:
            logging.error(Errors.JOB_LAUNCH_FAILED.format(cmd, retcode))
            logging.error("Failed process stdout:\n%s\nFailed process stderr:\n%s",
                          stdout.decode(errors='replace'),
                          stderr.decode(errors='replace'))
            return Errors.JOB_LAUNCH_FAILED.format(cmd, retcode)

        if "DARK" in cmd:
            message = Success.START_CHAR.format(proposal, run)
        else:
            message = Success.START_CORRECTION.format(proposal, run)

        # Save submitted jobs to persistent database.
        c = job_db.cursor()  # FIXME: asyncio
        rstr = stdout.decode()

        marker = "Submitted the following SLURM jobs:"
        for r in rstr.split("\n"):
            if marker in r:
                # Take everything after the marker rather than splitting on
                # every ':', so a prefix containing colons can't break parsing
                # after the jobs have already been submitted.
                jobids = r.split(marker, 1)[1]
                jobs = [
                    (int(jobid.strip()), exec_id)
                    for jobid in jobids.split(',')
                    if jobid.strip()
                ]
                c.executemany(
                    "INSERT INTO slurm_jobs VALUES (?, ?, 'PD', 0, 0)",
                    jobs
                )
        job_db.commit()

    else:  # mode == "sim"
        if "DARK" in cmd:
            message = Success.START_CHAR_SIM.format(proposal, run)
        else:
            message = Success.START_CORRECTION_SIM.format(proposal, run)

    logging.info(message)
    logging.info((" ".join(cmd)).replace(',', '').replace("'", ""))
    return message


async def wait_on_transfer(
    mdc,
    run: int,
    proposal: str,
    run_dir: Path,
    max_tries: int = 300,
    sleep_completion: int = 10,
) -> Optional[List[str]]:
    """Query MyMDC to get run migration status.

    This coro queries MyMDC to get the storage spaces where the data is
    available from (known as repositories.) If the data is available on
    offline or dCache storage, this coro will return. Else, it will loop
    until available. If the data is not available after all retries, a
    MigrationError will be raised. A ValueError can also be raised if a
    repository is not supported in this webservice.

    :param mdc: an authenticated MyMDC client
    :param run: the run id
    :param proposal: the proposal id
    :param run_dir: the run's raw data directory; its resolved location
        decides which repository the data must appear in
    :param max_tries: maximum number of tries to check for migration completed
    :param sleep_completion: sleep time in seconds between checks
    :return repos: a list of repositories in which the data is available, or
        None if `run_dir` resolves to an unrecognised location
    :raise MigrationError: when the data is not on offline or dCache within
        the maximum max_tries * sleep_completion time
    :raise ValueError: if the reply from MyMDC is not 200 ok or no data was
        returned.
    """
    loop = get_event_loop()
    for iteration in range(max_tries):
        response = await loop.run_in_executor(
            None, mdc.get_runs_by_proposal_number_api, proposal, run
        )

        if response.status_code != 200:
            raise ValueError(
                "FAILED: MyMDC replied with "
                f"{response.status_code}: {response.reason}"
            )

        if not response.json():
            raise ValueError(
                "FAILED: MyMDC replied empty response for "
                f"proposal {proposal}, run {run}"
            )

        run_details, = response.json()['runs']
        repositories = list(run_details['repositories'].keys())
        # List of locations where the data is stored, values are:
        # {"XFEL_GPFS_ONLINE_SASE_X", "XFEL_GPFS_OFFLINE_RAW_CC", "DESY_DCACHE_RAW_CC"}
        #
        # where `X` is the SASE number
        logging.debug(f"{run_dir=}, {run_dir.resolve()=}, {repositories=}")

        # Resolving is done here in case the symlink target changed during
        # the loop
        run_dir = run_dir.resolve()
        required_repository = None
        if run_dir.parts[1:4] == ("pnfs", "xfel.eu", "exfel"):
            required_repository = "DESY_DCACHE_RAW"
        elif run_dir.parts[1:4] == ("gpfs", "exfel", "d"):
            required_repository = "XFEL_GPFS_OFFLINE"
        else:
            logging.warning(
                f"Proposal {proposal} run {run} resolved path is not relative "
                f"to `/pnfs/xfel.eu/exfel` or `/gpfs/exfel/d`: {run_dir=}"
            )
            return

        logging.debug(f"{required_repository=}")

        if any(required_repository in r for r in repositories):
            return repositories

        logging.info(
            f"Proposal {proposal} run {run} not migrated yet. "
            f"Will try again ({iteration}/{max_tries})"
        )

        await asyncio.sleep(sleep_completion)

    raise MigrationError.timeout(
        run, proposal, f"{max_tries*sleep_completion}s"
    )


async def wait_transfers(
    mdc: MetadataClient,
    runs: List[str],
    proposal: str,
    cycle: int,
    instrument: str,
) -> bool:
    """Wait for multiple runs to be transferred to Maxwell.

    The run directories are derived from the module-level webservice
    `config['correct']['in-folder']` template.

    :param mdc: an authenticated MyMDC client
    :param runs: Run numbers to wait for
    :param proposal: Proposal number
    :param cycle: facility cycle, used to build the input folder path
    :param instrument: instrument, used to build the input folder path
    :return: True if all runs transferred, false on timeout
    """
    logging.debug(f"Waiting for: proposal {proposal}, runs {runs}")

    in_folder = Path(
        config['correct']['in-folder'].format(
            instrument=instrument, cycle=cycle, proposal=proposal
        )
    )

    coros = []
    for run in runs:
        run_dir = (in_folder / f"r{int(run):04d}")
        coros.append(wait_on_transfer(mdc, run, proposal, run_dir))

    try:
        await asyncio.gather(*coros)
    except (MigrationError, ValueError):
        logging.error(
            Errors.TRANSFER_EVAL_FAILED.format(proposal, runs), exc_info=True
        )
        return False

    logging.info("Transfer complete: proposal %s, runs %s", proposal, runs)
    return True


def check_files(in_folder: str,
                runs: List[int],
                karabo_das: List[str]) -> bool:
    """Check if files for given karabo-das exists for given runs.

    :param in_folder: Input folder
    :param runs: List of runs
    :param karabo_das: List of karabo-das
    :return: True if, for every run, at least one `*.h5` file name contains
        one of the karabo-das
    """
    files_exists = True
    for runnr in runs:
        rpath = Path(in_folder, 'r{:04d}'.format(int(runnr)))
        fl = rpath.glob('*.h5')
        if not any(da in match.name for match in fl for da in karabo_das):
            files_exists = False
    return files_exists


async def get_slurm_partition(mdc: MetadataClient,
                              action: str,
                              proposal_number: Union[int, str]) -> str:
    """Check MyMDC for the proposal status and select the appropriate slurm
    partition.

    The partition is either upex-high (for darks) or upex-middle (for
    corrections) if the proposal is 'R'eady or 'A'ctive.
    In other cases, including a failed MyMDC query, the jobs default to the
    exfel partition.

    :param mdc: an authenticated MyMDC client
    :param action: the type of action ('correct' or 'dark')
    :param proposal_number: the proposal number
    :return: 'exfel' on closed proposals,
             'upex-high' if dark request on active proposals,
             'upex-middle' if correct request on active proposals.
    """
    # See
    # https://git.xfel.eu/ITDM/metadata_catalog/-/blob/develop/app/models/proposal.rb
    # for possible proposals states.

    loop = get_event_loop()
    response = await shield(loop.run_in_executor(None,
                                                 mdc.get_proposal_by_number_api,
                                                 proposal_number))

    if response.status_code != 200:
        logging.error(f'Failed to check MDC for proposal "{proposal_number}" '
                      'status. ASSUMING CLOSED')
        logging.error(Errors.MDC_RESPONSE.format(response))
        # Don't try to parse an error body; it may not be a JSON object.
        return 'exfel'

    partition = 'exfel'
    status = response.json().get('flg_beamtime_status', 'whoopsie')

    if status in ['R', 'A']:
        partition = 'upex-high' if action == 'dark' else 'upex-middle'

    logging.debug(f"Using {partition} for {proposal_number} because {status}")

    return partition


async def get_slurm_nice(partition: str, instrument: str,
                         cycle: Union[int, str], job_penalty: int = 2,
                         commissioning_penalty: int = 1250) -> int:
    """Compute priority adjustment based on cycle and number of running
    jobs.

    The nice value is computed with
        base_penalty + job_penalty * num_jobs**2

    base_penalty is 0 for user proposals and commissioning_penalty for
    commissioning proposals. The number of jobs is computed by calling
    `squeue` and counting based on job name.

    The default penalty values give commissioning proposals a penalty of 25
    running jobs.

    :param partition: Partition to run jobs in.
    :param instrument: Instrument to run jobs for.
    :param cycle: Cycle of proposal to run jobs for.
    :param job_penalty: Scaling factor per job, 2 by default.
    :param commissioning_penalty: Base penalty for commissioning,
        1250 by default.
    :return: Nice value to be passed to sbatch --nice; 0 on the exfel
        partition or if `squeue` fails
    """
    if partition == 'exfel':
        return 0  # Don't apply degressive priority on exfel.

    # List all names for jobs running in the specified partition.
    returncode, job_names, stderr = await run_proc_async(
        ['squeue', '-h', '-o', '%.20j', '-p', partition, '--me'])

    if returncode != 0:
        logging.error(f'Non-zero return code {returncode} from '
                      f'`squeue` upon counting number of jobs')
        logging.warning(f"{stderr=}")
        return 0  # Fallback if something went wrong.

    # Base value depending on proposal type using cycle, assuming that
    # user proposals follow the pattern xxxx0y, while different kinds of
    # commissioning proposals use xxxx2y or xxxx3y.
    base_nice = 0 if str(cycle)[4] == '0' else commissioning_penalty

    # Count number of jobs
    num_jobs = sum((1 for job in job_names.decode('ascii').split('\n')
                    if f'correct_{instrument}' in job))

    return base_nice + num_jobs**2 * job_penalty


async def update_darks_paths(mdc: MetadataClient, rid: int, in_path: str,
                             out_path: str, report_path: str):
    """Update data paths in MyMDC to provide Globus access.

    DarkRequests have the capability to provide a link to the report via
    Globus. For this, the absolute path to the report needs to be provided.

    Additionally, the input path to the raw data, as well as the output path
    to the processed data can be specified.

    :param mdc: an authenticated MyMDC client
    :param rid: the DarkRequest id to update
    :param in_path: the absolute path to the raw data being calibrated
    :param out_path: the absolute path to the resulting processed data
    :param report_path: the absolute path to the PDF report
    """
    data = {'dark_run': {'output_path': out_path,
                         'input_path': in_path,
                         'report_url': report_path}}
    loop = get_event_loop()
    response = await shield(loop.run_in_executor(None,
                                                 mdc.update_dark_run_api,
                                                 rid, data))
    if response.status_code != 200:
        logging.error("Failed to update MDC dark report path for run id %s", rid)
        logging.error(Errors.MDC_RESPONSE.format(response))


async def update_mdc_status(mdc: MetadataClient, action: str,
                            rid: int, message: str):
    """Update MyMDC statuses.

    The message content and destination depend on the Action type.
    A message to MyMDC expects a status flag, which differ between
    Correction and DarkRequest.

    Correction can have NA (Not Available), R(unning), A(vailable)
    DarkRequest can have F(inished), E(rror), R(equested),
    IP (In Progress), T(imeout)
    The flag is extracted from the text before the first ':' of the message.

    Further informations are available at:
    https://git.xfel.eu/gitlab/detectors/pycalibration/wikis/MyMDC-Communication

    :raise ValueError: if `action` is neither 'correct' nor 'dark_request'
    """
    if message.split(':')[0] in ('FAILED', 'WARN'):  # Errors
        flag = 'NA' if action == 'correct' else 'E'
    elif message.split(':')[0] == 'SUCCESS':  # Success
        flag = 'R' if action == 'correct' else 'IP'
        if 'Uploaded' in message or 'Finished' in message:
            flag = 'A' if action == 'correct' else 'F'
    else:  # MDC Timeout
        flag = 'NA' if action == 'correct' else 'T'

    if action == 'correct':
        func = mdc.update_run_api
        data = {'flg_cal_data_status': flag, 'cal_pipeline_reply': message}

    elif action == 'dark_request':
        func = mdc.update_dark_run_api
        data = {'dark_run': {'flg_status': flag, 'calcat_feedback': message}}
    else:
        raise ValueError(f"Unexpected action: {action}")

    loop = get_event_loop()
    response = await shield(loop.run_in_executor(None, func, rid, data))

    if response.status_code != 200:
        logging.error("Failed to update MDC status for action %s, run id %s",
                      action, rid)
        logging.error(Errors.MDC_RESPONSE.format(response))


def _orca_passthrough(
    proposal_number = None,
    runs = None,
    action = None,
    route = "execute",
    **kwargs,
):
    """Pass through requests received by the webservice to Orca for use
    during the transition to the new webservice.

    Due to network restrictions on Maxwell, this sends post requests to
    localhost on port 42751 (a random port number), so either Orca should be
    running locally or a ssh tunnel should be set up with
    `ssh -L 42751:localhost:42751 TARGET-SERVER -N &`

    Extra keyword arguments are sent as additional query parameters. This is
    best-effort: every failure is logged and swallowed. The request is bounded
    by ORCA_PASSTHROUGH_TIMEOUT seconds.
    """
    try:
        base_url = "http://localhost"
        port = "42751"
        args = []
        if action:
            args.append(f"action={action}")
        if runs:
            args.append(f"runs={','.join(str(r) for r in runs)}")
        args.extend([f"{k}={v}" for k, v in kwargs.items()])
        url = f"{base_url}:{port}/{route}/{proposal_number}?{'&'.join(filter(None, args))}"
    except Exception:
        logging.warning("error building orca passthrough request", exc_info=True)
        return None

    try:
        requests.post(url, timeout=ORCA_PASSTHROUGH_TIMEOUT)
    except requests.ConnectionError:
        logging.warning("Could not connect to orca to send passthrough request")
    except requests.Timeout:
        logging.warning("Timed out sending passthrough request to orca")
    except Exception:
        logging.error(f"orca post request error for url {url}", exc_info=True)


class ActionsServer:
    """ZMQ REP server dispatching MyMDC requests to `handle_*` methods."""

    def __init__(self, config, mode):
        self.config = config
        self.mode = mode

        # Strong references to fire-and-forget tasks: asyncio only keeps weak
        # references, so an unreferenced task may vanish before it finishes.
        self._background_tasks = set()

        init_config_repo(config['config-repo'])
        self.job_db = init_job_db(config)
        self.mdc = init_md_client(config)

        # Set up a ZMQ socket to listen for requests
        self.zmq_ctx = zmq.asyncio.Context()
        auth = zmq.auth.thread.ThreadAuthenticator(self.zmq_ctx)
        if mode == "prod-auth":
            auth.start()
            auth.allow(config['web-service']['allowed-ips'])

        self.socket = self.zmq_ctx.socket(zmq.REP)
        self.socket.zap_domain = b'global'
        self.socket.bind("{}:{}".format(config['web-service']['bind-to'],
                                        config['web-service']['port']))

    def _spawn_background(self, coro):
        """Schedule `coro` as a task, keeping it referenced until it is done."""
        task = asyncio.ensure_future(coro)
        self._background_tasks.add(task)
        task.add_done_callback(self._background_task_done)
        return task

    def _background_task_done(self, task):
        """Drop the reference to a finished task and log its exception."""
        self._background_tasks.discard(task)
        if task.cancelled():
            return
        exc = task.exception()
        if exc is not None:
            logging.error("Unexpected error in background task", exc_info=exc)

    async def run(self):
        """The main server loop.

        The main server loop handles remote requests via a ZMQ interface.

        Requests are serialised as the repr of a Python list/tuple, with the
        first element identifying the action, and subsequent elements
        arguments to be passed to that action. The number & meaning of
        arguments depends on the action.
        """
        while True:
            req = await self.socket.recv()
            logging.debug("Raw request data: %r", req)
            try:
                resp = await self.handle_one_req(req)
            except Exception as e:
                logging.error("Unexpected error handling request", exc_info=e)
                resp = Errors.OTHER_ERROR.format(e).encode()

            logging.debug("Sending response: %r", resp)
            await self.socket.send(resp)

    async def handle_one_req(self, raw_req: bytes) -> bytes:
        """Handle one request, and return the reply to be sent."""
        try:  # protect against unparseable requests
            req = ast.literal_eval(raw_req.decode('utf-8'))
        except (SyntaxError, ValueError) as e:
            logging.error(str(e))
            return Errors.REQUEST_FAILED.encode()

        # catch parseable but malformed requests (must be a list/tuple with
        # an action and at least one argument)
        if not isinstance(req, (list, tuple)) or len(req) < 2:
            logging.error(Errors.REQUEST_MALFORMED.format(req))
            return Errors.REQUEST_MALFORMED.format(req).encode()

        action, *payload = req

        if not isinstance(action, str) or action not in self.accepted_actions:
            logging.warning(Errors.UNKNOWN_ACTION.format(action))
            return Errors.UNKNOWN_ACTION.format(action).encode()

        logging.info("Handling request for action %s", action)
        logging.debug('Running action %s, payload %r', action, payload)

        handler = getattr(self, 'handle_' + action.replace('-', '_'))

        # Verify that requests contains the right number of parameters
        sig = inspect.signature(handler)
        try:
            sig.bind(*payload)
        except TypeError:
            logging.error(
                "Wrong number of arguments for action %s", action, exc_info=True
            )
            return Errors.REQUEST_MALFORMED.format(req).encode()

        res = handler(*payload)
        if asyncio.iscoroutine(res):
            res = await res
        return res

    accepted_actions = {
        'correct',
        'dark_request',
        'query-rid',
        'upload-yaml',
        'update_conf',
        'check_connection',
    }

    # Handler methods for each available action ------------------------------

    async def handle_check_connection(self, *args):
        """Inform the caller that the service is up and running.

        MyMDC will send the same information as provided to dark or correct
        requests, but nothing is done with this information.
        """
        return b'success'

    async def handle_correct(
        self, rid, _sase, instrument, cycle, proposal, runnr, priority
    ):
        """Launch detector correction.

        :param rid: is the runid within the MDC database
        :param _sase: is the sase beamline
        :param instrument: is the instrument
        :param cycle: is the facility cycle
        :param proposal: is the proposal id
        :param runnr: the run number in integer form, i.e. without leading "r"
        :param priority: unused, retained for compatibility

        This will trigger a correction process to be launched for that run in
        the given cycle and proposal. The reply is sent once the request is
        queued; waiting for the data transfer and launching the jobs happen
        in a background task.
        """
        request_time = datetime.now().strftime('%Y-%m-%dT%H:%M:%S')
        try:
            runnr = runnr.strip('r')

            proposal = self._normalise_proposal_num(proposal)
            pconf_full = self.load_proposal_config(cycle, proposal)

            with self.job_db:
                cur = self.job_db.execute(
                    "INSERT INTO requests VALUES (NULL, ?, ?, ?, 'CORRECT', ?)",
                    (rid, proposal, int(runnr), request_time)
                )
                req_id = cur.lastrowid

            _orca_passthrough(
                proposal_number=proposal,
                runs=[runnr],
                route="execute",
            )

            data_conf = pconf_full['data-mapping']
            if instrument in pconf_full['correct']:
                pconf = pconf_full['correct'][instrument]
            else:
                logging.info(f'Instrument {instrument} is unknown')
                return Errors.NOT_CONFIGURED.encode()

            in_folder = self.config['correct']['in-folder'].format(
                instrument=instrument, cycle=cycle, proposal=proposal)
            out_folder = self.config['correct']['out-folder'].format(
                instrument=instrument, cycle=cycle, proposal=proposal,
                run='r{:04d}'.format(int(runnr))
            )

        except Exception as e:
            msg = Errors.JOB_LAUNCH_FAILED.format('correct', e)
            logging.error(msg, exc_info=e)
            self._spawn_background(
                update_mdc_status(self.mdc, 'correct', rid, msg)
            )
            return msg.encode()

        queued_msg = Success.QUEUED.format(proposal, [runnr])
        logging.debug(queued_msg)

        async def _continue():
            """Runs in the background after we reply to the 'correct' request"""
            await update_mdc_status(self.mdc, 'correct', rid, queued_msg)
            try:
                transfer_complete = await wait_transfers(
                    self.mdc, [runnr], proposal, cycle, instrument
                )
                if not transfer_complete:
                    # Timed out
                    await update_mdc_status(self.mdc, 'correct', rid,
                                            MDC.MIGRATION_TIMEOUT)
                    return

                rpath = "{}/r{:04d}/".format(in_folder, int(runnr))

                # Prepare configs for all detectors in run
                fl = glob.glob(f"{rpath}/*.h5")
                detectors = {}
                for karabo_id in pconf:
                    dconfig = data_conf[karabo_id]
                    # check for files according to mapping in raw run dir.
                    if any(y in x for x in fl
                           for y in dconfig['karabo-da']):
                        thisconf = copy.copy(dconfig)
                        if isinstance(pconf[karabo_id], dict):
                            thisconf.update(copy.copy(pconf[karabo_id]))
                        thisconf["in-folder"] = in_folder
                        thisconf["out-folder"] = out_folder
                        thisconf["karabo-id"] = karabo_id
                        thisconf["run"] = runnr
                        detectors[karabo_id] = thisconf
            except Exception as corr_e:
                logging.error("Error during correction", exc_info=corr_e)
                await update_mdc_status(self.mdc, 'correct', rid,
                                        Errors.REQUEST_FAILED)
                return

            for karabo_id in list(detectors.keys()):
                # Check for any detectors for which corrections are
                # disabled manually.
                if detectors[karabo_id].pop('disable-correct', False):
                    logging.warning(f'Skipping disabled detector {karabo_id}')
                    del detectors[karabo_id]

            if len(detectors) == 0:
                msg = Errors.NOTHING_TO_DO.format(rpath)
                logging.warning(msg)
                await update_mdc_status(self.mdc, 'correct', rid, msg)
                return

            ret, _ = await self.launch_jobs(
                [runnr], req_id, detectors, 'correct', instrument, cycle,
                proposal, request_time,
            )
            await update_mdc_status(self.mdc, 'correct', rid, ret)
            loop = get_event_loop()
            await loop.run_in_executor(
                None, self.mdc.update_run_api,
                rid, {'cal_last_begin_at': datetime.now(tz=timezone.utc).isoformat()}
            )
        # END of part to run after sending reply

        self._spawn_background(_continue())

        return queued_msg.encode()

    async def handle_dark_request(
        self, rid, _sase, instrument, cycle, proposal, karabo_id,
        operation_mode, *extra
    ):
        """Launch dark run processing.

        :param rid: is the runid within the MDC database
        :param _sase: is the sase beamline
        :param instrument: is the instrument
        :param cycle: is the facility cycle
        :param proposal: is the proposal id
        :param karabo_id: is the detector karabo id
        :param operation_mode: is the detector's operation mode, as defined in
            CalCat
        :param extra: comma-joined and parsed as a literal triple of
            (pdu_names, karabo_das, runs):
            pdu_names - physical detector units for each modules;
            karabo_das - the Data Aggregators representing which detector
            modules to calibrate;
            runs - the run numbers in integer form, i.e. without leading "r"
        """
        request_time = datetime.now().strftime('%Y-%m-%dT%H:%M:%S')
        try:
            pdus, karabo_das, wait_runs = ast.literal_eval(','.join(extra))

            karabo_das = [val.strip() for val in karabo_das]
            runs = [str(val) for val in wait_runs]  # FIX: could this be int instead of str?

            proposal = self._normalise_proposal_num(proposal)

            _orca_passthrough(
                proposal_number=proposal,
                runs=runs,
                action="dark",
                route="execute",
                karabo_id=karabo_id,
            )

            with self.job_db:
                cur = self.job_db.execute(
                    "INSERT INTO requests VALUES (NULL, ?, ?, ?, 'DARK', ?)",
                    (rid, proposal, int(wait_runs[-1]), request_time)
                )
                req_id = cur.lastrowid

            pconf_full = self.load_proposal_config(cycle, proposal)

            data_conf = pconf_full['data-mapping']
            if instrument in pconf_full['dark']:
                pconf = pconf_full['dark'][instrument]
            else:
                logging.info(f'Instrument {instrument} is unknown')
                return Errors.NOT_CONFIGURED.encode()

            in_folder = self.config['dark']['in-folder'].format(
                instrument=instrument, cycle=cycle, proposal=proposal)
            out_folder = self.config['dark']['out-folder'].format(
                instrument=instrument, cycle=cycle, proposal=proposal,
                runs='_'.join(runs))
            out_folder = str(Path(out_folder, karabo_id.replace('-', '_')))

        except Exception as e:
            msg = Errors.JOB_LAUNCH_FAILED.format('dark_request', e)
            logging.error(msg, exc_info=e)
            self._spawn_background(
                update_mdc_status(self.mdc, 'dark_request', rid, msg)
            )
            return msg.encode()

        queued_msg = Success.QUEUED.format(proposal, runs)
        logging.debug(queued_msg)

        async def _continue():
            """Runs in the background after we reply to the 'dark_request' request"""
            await update_mdc_status(self.mdc, 'dark_request', rid, queued_msg)

            transfer_complete = await wait_transfers(
                self.mdc, runs, proposal, cycle, instrument
            )
            if not transfer_complete:
                # Timed out
                await update_mdc_status(
                    self.mdc, 'dark_request', rid, MDC.MIGRATION_TIMEOUT
                )
                return

            try:
                # Notebooks require one or three runs, depending on the
                # detector type and operation mode.
                triple = any(det in karabo_id for det in
                             ["LPD", "AGIPD", "JUNGFRAU", "JF", "JNGFR"])

                # This fails silently if the hardcoded strings above are
                # ever changed (triple = False) but the underlying notebook
                # still expects run-high/run-med/run-low.
                if triple and len(runs) == 1:
                    runs_dict = {'run-high': runs[0],
                                 'run-med': '0',
                                 'run-low': '0'}
                elif triple and len(runs) == 3:
                    runs_dict = {'run-high': runs[0],
                                 'run-med': runs[1],
                                 'run-low': runs[2]}
                else:  # single
                    runs_dict = {'run': runs[0]}

                # We assume that MyMDC does not allow dark request if the data
                # is not migrated, thus skipping some validation here.
                thisconf = copy.copy(data_conf[karabo_id])

                # Pop internal key to avoid propagation to xfel-calibrate.
                thisconf.pop('disable-correct', None)

                if (karabo_id in pconf
                        and isinstance(pconf[karabo_id], dict)):
                    thisconf.update(copy.copy(pconf[karabo_id]))

                thisconf['in-folder'] = in_folder
                thisconf['out-folder'] = out_folder
                thisconf['karabo-id'] = karabo_id
                thisconf['karabo-da'] = karabo_das
                thisconf['operation-mode'] = operation_mode

                thisconf.update(runs_dict)

                detectors = {karabo_id: thisconf}
            except Exception as dark_e:
                # Mirror the correction path: report the failure to MyMDC
                # instead of letting the background task die silently.
                logging.error("Error preparing dark request", exc_info=dark_e)
                await update_mdc_status(self.mdc, 'dark_request', rid,
                                        Errors.REQUEST_FAILED)
                return

            ret, report_path = await self.launch_jobs(
                runs, req_id, detectors, 'dark', instrument, cycle, proposal,
                request_time
            )
            await update_mdc_status(self.mdc, 'dark_request', rid, ret)
            if len(report_path) == 0:
                logging.warning("Failed to identify report path for dark_request")
            else:
                if len(report_path) > 1:
                    logging.warning(
                        "More than one report path is returned. "
                        "Updating myMDC with the first report path only."
                    )
                await update_darks_paths(
                    self.mdc, rid, in_folder,
                    out_folder, report_path[0]
                )
        # END of part to run after sending reply

        self._spawn_background(_continue())

        return queued_msg.encode()

    def handle_query_rid(self, rid):
        """Return the Slurm job statuses for MyMDC request id `rid`."""
        return query_rid(self.job_db, rid)

    def handle_upload_yaml(self, _sase, instrument, cycle, proposal, this_yaml):
        """Reconfigure detector calibration for the specified proposal.

        :param _sase: is the sase beamline
        :param instrument: is the instrument
        :param cycle: is the facility cycle
        :param proposal: is the proposal id
        :param this_yaml: is url-encoded (quotes and spaces) representation of
            new YAML file

        This will create or replace the existing YAML configuration for the
        proposal and cycle with the newly sent one, and then push it to the git
        configuration repo.
        """
        this_yaml = urllib.parse.unquote_plus(this_yaml)
        return upload_config(
            self.config['config-repo'], this_yaml, instrument, cycle, proposal
        )

    async def handle_update_conf(
        self, sase, karabo_id, instrument, cycle, proposal, config_yaml,
        apply
    ):
        """Merge a JSON config update into the proposal's YAML config.

        `apply` (case-insensitive "TRUE") commits and pushes the change;
        otherwise it is a dry run. The merged config, or an error message,
        is returned as YAML bytes.
        """
        updated_config = None
        try:
            updated_config = json.loads(config_yaml)
            return change_config(
                self.config['config-repo'],
                updated_config, instrument,
                cycle, proposal,
                apply.upper() == "TRUE",
            )
        except Exception as e:
            err_msg = (f"Failure applying config for {proposal}:"
                       f" {e}: {updated_config}")
            logging.error(err_msg, exc_info=e)
            return yaml.dump(err_msg, default_flow_style=False).encode()

    # Helper methods for handlers ---------------------------------------------

    @staticmethod
    def _normalise_proposal_num(p: str) -> str:
        """Strip 'p' characters from both ends and zero-pad to 6 digits."""
        return "{:06d}".format(int(p.strip('p')))

    def load_proposal_config(self, cycle, proposal) -> Dict:
        """Read the calibration config for a proposal, or default.yaml.

        safe_load is used because proposal YAML files can be written by
        remote `upload-yaml` requests; the configs only hold plain YAML
        types (change_config reads and writes them with safe_load/safe_dump).
        """
        # Read calibration configuration from yaml
        conf_file = Path(
            self.config['config-repo']['local-path'], cycle, f'{proposal}.yaml'
        )
        if not conf_file.exists():
            conf_file = Path(
                self.config['config-repo']['local-path'], "default.yaml"
            )

        logging.debug("Loading config for cycle %s, proposal %s from %s",
                      cycle, proposal, conf_file)

        with open(conf_file, "r") as f:
            return yaml.safe_load(f)

    async def launch_jobs(
        self, run_nrs, req_id, detectors, action, instrument, cycle, proposal,
        request_time
    ) -> Tuple[str, List[str]]:
        """Build and run one xfel-calibrate command per detector.

        Each command is recorded in the executions table before it is run.
        Note that 'detector-type' is removed from each detector config dict.

        :return: the comma-joined outcome messages, and the list of report
            PDF paths found via `--report-to` in the commands
        """
        report = []
        ret = []

        partition = await get_slurm_partition(self.mdc, action, proposal)
        nice = await get_slurm_nice(
            partition, instrument, cycle,
            commissioning_penalty=self.config[action]['commissioning-penalty'],
            job_penalty=self.config[action]['job-penalty'])

        # run xfel_calibrate
        for karabo_id, dconfig in detectors.items():
            detector = dconfig['detector-type']
            del dconfig['detector-type']
            cmd = self.config[action]['cmd'].format(
                detector=detector,
                sched_prio=nice,
                partition=partition,
                action=action, instrument=instrument,
                cycle=cycle, proposal=proposal,
                runs="_".join([f"r{r}" for r in run_nrs]),
                time_stamp=datetime.now().strftime('%y%m%d_%H%M%S'),
                det_instance=karabo_id,
                request_time=request_time
            ).split()
            cmd = parse_config(cmd, dconfig)

            with self.job_db:
                cur = self.job_db.execute(
                    "INSERT INTO executions VALUES (NULL, ?, ?, ?, ?, NULL)",
                    (req_id, shlex.join(cmd), detector, karabo_id)
                )
                exec_id = cur.lastrowid

            # TODO: Add detector info in returned run action status.
            ret.append(await run_action(
                self.job_db, cmd, self.mode,
                proposal, run_nrs[-1], exec_id
            ))
            if '--report-to' in cmd[:-1]:
                report_idx = cmd.index('--report-to') + 1
                report.append(cmd[report_idx] + '.pdf')
        # return string without a tailing comma.
        return ", ".join(ret), report


def main(argv: Optional[List[str]] = None):
    """Parse arguments, configure logging and run the ZMQ server."""
    if argv is None:
        argv = sys.argv[1:]

    # Ensure files are opened as UTF-8 by default, regardless of environment.
    locale.setlocale(locale.LC_CTYPE, ('en_US', 'UTF-8'))

    parser = argparse.ArgumentParser(
        description='Start the calibration webservice'
    )
    parser.add_argument('--config-file', type=str, default=None)
    parser.add_argument('--mode', type=str, default="sim", choices=['sim', 'prod'])  # noqa
    parser.add_argument('--log-file', type=str, default='./web.log')
    parser.add_argument(
        '--log-level', type=str, default="INFO", choices=['INFO', 'DEBUG', 'ERROR']  # noqa
    )

    args = parser.parse_args(argv)
    config_file = args.config_file
    log_file = args.log_file
    log_level = args.log_level
    mode = args.mode

    if config_file is not None:
        config.configure(includes_for_dynaconf=[Path(config_file).absolute()])

    fmt = '%(asctime)s - %(name)s - %(levelname)s - [%(filename)s:%(lineno)d] %(message)s'  # noqa
    logging.basicConfig(
        filename=log_file,
        level=getattr(logging, log_level),
        format=fmt
    )

    # Launch the ZMQ server to handle requests for calibration
    server = ActionsServer(config, mode)
    loop = asyncio.get_event_loop()
    loop.run_until_complete(server.run())
    loop.close()


if __name__ == "__main__":
    main()