From 6f46477eefbecca9640ed8d1297b7d8124ffdf72 Mon Sep 17 00:00:00 2001
From: Cyril Danilevski <cyril.danilevski@xfel.eu>
Date: Mon, 8 Feb 2021 16:04:27 +0100
Subject: [PATCH] Add MyMDC dark request

---
 .../Characterize_AGIPD_Gain_Darks_NBC.ipynb   |   4 +-
 .../DSSC/Characterize_DSSC_Darks_NBC.ipynb    |   3 +-
 ...s_NewDAQ_FastCCD_NBC_New_Common_Mode.ipynb |   3 +-
 ...rk_analysis_all_gains_burst_mode_NBC.ipynb |   3 +-
 notebooks/LPD/LPDChar_Darks_NBC.ipynb         |   3 +-
 .../Characterize_Darks_ePix100_NBC.ipynb      |   3 +-
 .../Characterize_Darks_ePix10K_NBC.ipynb      |   3 +-
 .../pnCCD/Characterize_pnCCD_Dark_NBC.ipynb   |   3 +-
 requirements.txt                              |   2 +-
 tests/test_webservice.py                      |  25 ++
 webservice/manual_launch.py                   |   7 +-
 webservice/messages.py                        |   4 +-
 webservice/webservice.py                      | 419 ++++++++++++------
 13 files changed, 340 insertions(+), 142 deletions(-)
 create mode 100644 tests/test_webservice.py

diff --git a/notebooks/AGIPD/Characterize_AGIPD_Gain_Darks_NBC.ipynb b/notebooks/AGIPD/Characterize_AGIPD_Gain_Darks_NBC.ipynb
index b6cca3ac4..90643887c 100644
--- a/notebooks/AGIPD/Characterize_AGIPD_Gain_Darks_NBC.ipynb
+++ b/notebooks/AGIPD/Characterize_AGIPD_Gain_Darks_NBC.ipynb
@@ -32,6 +32,7 @@
     "run_high = 84 # run number in which high gain data was recorded, required\n",
     "run_med = 87 # run number in which medium gain data was recorded, required\n",
     "run_low = 88 # run number in which low gain data was recorded, required\n",
+    "operation_mode = 'ADAPTIVE_GAIN' # Detector operation mode, optional\n",
     "\n",
     "karabo_id = \"HED_DET_AGIPD500K2G\" # karabo karabo_id\n",
     "karabo_da = ['-1'] # a list of data aggregators names, Default [-1] for selecting all data aggregators\n",
@@ -159,7 +160,8 @@
     "\n",
     "print(f\"Detector in use is {karabo_id}\")\n",
     "print(f\"Instrument {instrument}\")\n",
-    "print(f\"Detector instance {dinstance}\")"
+    "print(f\"Detector instance {dinstance}\")\n",
+    "print(f\"Operation mode is {operation_mode}\")"
    ]
   },
   {
diff --git a/notebooks/DSSC/Characterize_DSSC_Darks_NBC.ipynb b/notebooks/DSSC/Characterize_DSSC_Darks_NBC.ipynb
index e4ec8fd71..43d7d2437 100644
--- a/notebooks/DSSC/Characterize_DSSC_Darks_NBC.ipynb
+++ b/notebooks/DSSC/Characterize_DSSC_Darks_NBC.ipynb
@@ -59,7 +59,8 @@
     "\n",
     "instrument = \"SCS\" # the instrument\n",
     "high_res_badpix_3d = False # set this to True if you need high-resolution 3d bad pixel plots. Runtime: ~ 1h\n",
-    "slow_data_aggregators = [1,2,3,4] #quadrant/aggregator\n"
+    "slow_data_aggregators = [1,2,3,4] # quadrant/aggregator\n",
+    "operation_mode = '' # Detector operation mode, optional\n"
    ]
   },
   {
diff --git a/notebooks/FastCCD/Characterize_Darks_NewDAQ_FastCCD_NBC_New_Common_Mode.ipynb b/notebooks/FastCCD/Characterize_Darks_NewDAQ_FastCCD_NBC_New_Common_Mode.ipynb
index e091b4bbe..9ed3d1515 100644
--- a/notebooks/FastCCD/Characterize_Darks_NewDAQ_FastCCD_NBC_New_Common_Mode.ipynb
+++ b/notebooks/FastCCD/Characterize_Darks_NewDAQ_FastCCD_NBC_New_Common_Mode.ipynb
@@ -60,7 +60,8 @@
     "ADU_to_electron_upper_mg = 23.4 # for upper hemisphere and medium gain\n",
     "ADU_to_electron_lower_mg = 23.4 # for lower hemisphere and medium gain\n",
     "ADU_to_electron_upper_lg = 49.3 # for upper hemisphere and low gain\n",
-    "ADU_to_electron_lower_lg = 47.3 # for lower hemisphere and low gain"
+    "ADU_to_electron_lower_lg = 47.3 # for lower hemisphere and low gain\n",
+    "operation_mode = '' # Detector operation mode, optional"
    ]
   },
   {
diff --git a/notebooks/Jungfrau/Jungfrau_dark_analysis_all_gains_burst_mode_NBC.ipynb b/notebooks/Jungfrau/Jungfrau_dark_analysis_all_gains_burst_mode_NBC.ipynb
index afddd3b29..08ee2d799 100644
--- a/notebooks/Jungfrau/Jungfrau_dark_analysis_all_gains_burst_mode_NBC.ipynb
+++ b/notebooks/Jungfrau/Jungfrau_dark_analysis_all_gains_burst_mode_NBC.ipynb
@@ -52,7 +52,8 @@
     "memoryCells = 16 # number of memory cells\n",
     "db_module = ['Jungfrau_M275', \"Jungfrau_M035\", 'Jungfrau_M273','Jungfrau_M203','Jungfrau_M221','Jungfrau_M267'] # ID of module in calibration database\n",
     "manual_slow_data = False # if true, use manually entered bias_voltage and integration_time values\n",
-    "time_limits = 0.025 # to find calibration constants later on, the integration time is allowed to vary by 0.5 us\n"
+    "time_limits = 0.025 # to find calibration constants later on, the integration time is allowed to vary by 0.5 us\n",
+    "operation_mode = '' # Detector operation mode, optional"
    ]
   },
   {
diff --git a/notebooks/LPD/LPDChar_Darks_NBC.ipynb b/notebooks/LPD/LPDChar_Darks_NBC.ipynb
index 7cd3961d7..5b947b068 100644
--- a/notebooks/LPD/LPDChar_Darks_NBC.ipynb
+++ b/notebooks/LPD/LPDChar_Darks_NBC.ipynb
@@ -56,7 +56,8 @@
     "instrument = \"FXE\" # instrument name\n",
     "ntrains = 100 # number of trains to use\n",
     "high_res_badpix_3d = False # plot bad-pixel summary in high resolution\n",
-    "test_for_normality = False # permorm normality test"
+    "test_for_normality = False # perform normality test\n",
+    "operation_mode = '' # Detector operation mode, optional"
    ]
   },
   {
diff --git a/notebooks/ePix100/Characterize_Darks_ePix100_NBC.ipynb b/notebooks/ePix100/Characterize_Darks_ePix100_NBC.ipynb
index ec7c96f0f..d1a9fa05e 100644
--- a/notebooks/ePix100/Characterize_Darks_ePix100_NBC.ipynb
+++ b/notebooks/ePix100/Characterize_Darks_ePix100_NBC.ipynb
@@ -44,7 +44,8 @@
     "db_module = 'ePix100_M17' # detector karabo_id\n",
     "bias_voltage = 200 # bias voltage\n",
     "in_vacuum = False # detector operated in vacuum\n",
-    "fix_temperature = 290. # fix temperature to this value"
+    "fix_temperature = 290. # fix temperature to this value\n",
+    "operation_mode = '' # Detector operation mode, optional\n"
    ]
   },
   {
diff --git a/notebooks/ePix10K/Characterize_Darks_ePix10K_NBC.ipynb b/notebooks/ePix10K/Characterize_Darks_ePix10K_NBC.ipynb
index 31fecbcc7..4aeecc749 100644
--- a/notebooks/ePix10K/Characterize_Darks_ePix10K_NBC.ipynb
+++ b/notebooks/ePix10K/Characterize_Darks_ePix10K_NBC.ipynb
@@ -44,7 +44,8 @@
     "db_module = 'ePix10K_M43' # detector karabo_id\n",
     "bias_voltage = 200 # bias voltage\n",
     "in_vacuum = False # detector operated in vacuum\n",
-    "fix_temperature = 290. # fix temperature to this value"
+    "fix_temperature = 290. # fix temperature to this value\n",
+    "operation_mode = '' # Detector operation mode, optional\n"
    ]
   },
   {
diff --git a/notebooks/pnCCD/Characterize_pnCCD_Dark_NBC.ipynb b/notebooks/pnCCD/Characterize_pnCCD_Dark_NBC.ipynb
index 57a98508c..189a1b2b0 100644
--- a/notebooks/pnCCD/Characterize_pnCCD_Dark_NBC.ipynb
+++ b/notebooks/pnCCD/Characterize_pnCCD_Dark_NBC.ipynb
@@ -69,7 +69,8 @@
     "temp_limits = 5 # temperature limits in which calibration parameters are considered equal\n",
     "\n",
     "run_parallel = True # for parallel computation\n",
-    "cpuCores = 40 # specifies the number of running cpu cores"
+    "cpuCores = 40 # specifies the number of running cpu cores\n",
+    "operation_mode = '' # Detector operation mode, optional\n"
    ]
   },
   {
diff --git a/requirements.txt b/requirements.txt
index 48e32acd9..ebfcbd4ad 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -21,7 +21,7 @@ jupyter_console == 6.1.0
 jupyter-core == 4.6.1
 karabo_data == 0.7.0
 lxml == 4.5.0
-metadata_client == 3.0.5
+metadata_client == 3.0.8
 nbclient == 0.5.1
 nbconvert == 5.6.1
 nbformat == 5.0.7
diff --git a/tests/test_webservice.py b/tests/test_webservice.py
new file mode 100644
index 000000000..1497d9e20
--- /dev/null
+++ b/tests/test_webservice.py
@@ -0,0 +1,25 @@
+import sys
+from pathlib import Path
+
+import pytest
+
+sys.path.insert(0, Path(__file__).parent / 'webservice')
+from webservice.webservice import check_files  # noqa
+
+
+def test_check_files():
+    in_folder = '/gpfs/exfel/exp/CALLAB/202031/p900113/raw'
+    runs = [9985, 9984]
+    karabo_das = ['AGIPD06', 'AGIPD07']
+
+    assert check_files(in_folder, runs, karabo_das)
+
+    karabo_das = ['LPD06', 'LPD07']
+    assert not check_files(in_folder, runs, karabo_das)
+
+    runs = [1, 2]
+    assert not check_files(in_folder, runs, karabo_das)
+
+    with pytest.raises(PermissionError):
+        in_folder = '/gpfs/maxwell/home/achilles'  # arbitrarily chosen
+        check_files(in_folder, runs, karabo_das)
diff --git a/webservice/manual_launch.py b/webservice/manual_launch.py
index b467d89f0..2006d8429 100644
--- a/webservice/manual_launch.py
+++ b/webservice/manual_launch.py
@@ -5,7 +5,7 @@ socket = con.socket(zmq.REQ)
 con = socket.connect("tcp://max-exfl017:5555")
 
 action = 'dark_request'
-dark_run_id = '182'
+dark_run_id = '258'
 sase = 'sase1'
 instrument = 'CALLAB'
 cycle = '202031'
@@ -19,7 +19,10 @@ run_numbers = '[9985,]'
 data = [action, dark_run_id, sase, instrument, cycle, proposal,
         detector_id, operation_mode, *pdu_physical_names, *pdu_karabo_das,
         run_numbers]
 
+stuff = [action, dark_run_id, sase, instrument, cycle, proposal, 'SPB_DET_AGIPD1M-1', 'ADAPTIVE_GAIN', '["AGIPD00 (Q1M1)"', '"AGIPD01 (Q1M2)"', '"AGIPD02 (Q1M3)"', '"AGIPD03 (Q1M4)"', '"AGIPD04 (Q2M1)"', '"AGIPD05 (Q2M2)"', '"AGIPD06 (Q2M3)"', '"AGIPD07 (Q2M4)"', '"AGIPD08 (Q3M1)"', '"AGIPD09 (Q3M2)"', '"AGIPD10 (Q3M3)"', '"AGIPD11 (Q3M4)"', '"AGIPD12 (Q4M1)"', '"AGIPD13 (Q4M2)"', '"AGIPD14 (Q4M3)"', '"AGIPD15 (Q4M4)"]', '["AGIPD00"', ' "AGIPD01"', ' "AGIPD02"', ' "AGIPD03"', ' "AGIPD04"', ' "AGIPD05"', ' "AGIPD06"', ' "AGIPD07"', ' "AGIPD08"', ' "AGIPD09"', ' "AGIPD10"', ' "AGIPD11"', ' "AGIPD12"', ' "AGIPD13"', ' "AGIPD14"', ' "AGIPD15"]', '[9992', ' 9991', ' 9990]']
'["AGIPD00"', ' "AGIPD01"', ' "AGIPD02"', ' "AGIPD03"', ' "AGIPD04"', ' "AGIPD05"', ' "AGIPD06"', ' "AGIPD07"', ' "AGIPD08"', ' "AGIPD09"', ' "AGIPD10"', ' "AGIPD11"', ' "AGIPD12"', ' "AGIPD13"', ' "AGIPD14"', ' "AGIPD15"]', '[9992', ' 9991', ' 9990]'] -socket.send(str(data).encode()) +socket.send(str(stuff).encode()) resp = socket.recv_multipart()[0] print(resp.decode()) + + diff --git a/webservice/messages.py b/webservice/messages.py index 0cfe59646..3f3255daa 100644 --- a/webservice/messages.py +++ b/webservice/messages.py @@ -26,4 +26,6 @@ class Success: START_CHAR = "SUCCESS: Started dark characterization: proposal {}, run {}" START_CORRECTION_SIM = "SUCCESS: Started simulated correction: proposal {}, run {}" START_CHAR_SIM = "SUCCESS: Started dark characterization: proposal {}, run {}" - QUEUED = "Queued proposal {}, run {} for offline calibration" + QUEUED = "SUCCESS: Queued proposal {}, run {} for offline calibration" + DONE_CORRECTION = "SUCCESS: Finished correction: proposal {}. run {}" + DONE_CHAR = "SUCCESS: Finished dark characterization: proposal {}, run {}" diff --git a/webservice/webservice.py b/webservice/webservice.py index 0249a7e44..1e73f38a3 100644 --- a/webservice/webservice.py +++ b/webservice/webservice.py @@ -7,19 +7,26 @@ import json import logging import os import sqlite3 -import subprocess +import subprocess # FIXME: use asyncio.create_subprocess_* import traceback import urllib.parse +from asyncio import get_event_loop, shield from datetime import datetime +from pathlib import Path +from typing import List import yaml import zmq import zmq.asyncio import zmq.auth.thread from git import InvalidGitRepositoryError, Repo -from messages import MDC, Errors, Success from metadata_client.metadata_client import MetadataClient +try: + from .messages import MDC, Errors, Success +except ImportError: + from messages import MDC, Errors, Success + async def init_job_db(config): """ Initialize the sqlite job database @@ -32,12 +39,14 @@ async def init_job_db(config): :param config: the configuration parsed from the webservice YAML config :return: a sqlite3 connection instance to the database """ + # FIXME: sqlite3 is synchronous, it should be replaced with + # https://pypi.org/project/databases/ logging.info("Initializing database") conn = sqlite3.connect(config['web-service']['job-db']) c = conn.cursor() try: c.execute("SELECT * FROM jobs") - except: + except Exception: # TODO: is it sqlite3.OperationalError? logging.info("Creating initial job database") c.execute("CREATE TABLE jobs(rid, jobid, proposal, run, status, time, det, act)") # noqa return conn @@ -49,6 +58,9 @@ async def init_md_client(config): :param config: the configuration parsed from the webservice YAML config :return: an MDC client connection """ + # FIXME: this blocks the even loop, should use asyncio.Task + # FIXME: calls to this coro should be shielded + # TODO: could the client be a global? 
     mdconf = config['metadata-client']
     client_conn = MetadataClient(client_id=mdconf['user-id'],
                                  client_secret=mdconf['user-secret'],
@@ -66,6 +78,7 @@ def init_config_repo(config):
 
     :param config: the configuration defined in the `config-repo` section
     """
+    # FIXME: keep this as func, but call this with asyncio.thread
    os.makedirs(config['local-path'], exist_ok=True)
    # check if it is a repo
    try:
@@ -74,7 +87,7 @@ def init_config_repo(config):
        repo = Repo.clone_from(config['url'], config['local-path'])
    try:
        repo.remote().pull()
-    except:
+    except Exception:
        pass
    logging.info("Config repo is initialized")
 
@@ -97,6 +110,7 @@ async def upload_config(socket, config, yaml, instrument, cycle, proposal):
    If it exists it is overwritten and then the new version is pushed to
    the configuration git repo.
    """
+    # FIXME: keep this as func, but call this with asyncio.thread
    repo = Repo(config['local-path'])
    # assure we are on most current version
    repo.remote().pull()
@@ -108,7 +122,7 @@ async def upload_config(socket, config, yaml, instrument, cycle, proposal):
    repo.index.add([fpath])
    repo.index.commit(
        "Update to proposal p{}: {}".format(proposal,
-                                           datetime.now().isoformat()))
+                                            datetime.now().isoformat()))
    repo.remote().push()
    logging.info(Success.UPLOADED_CONFIG.format(cycle, proposal))
    socket.send(Success.UPLOADED_CONFIG.format(cycle, proposal).encode())
@@ -140,8 +154,8 @@ def merge(source, destination):
    return destination
 
 
-async def change_config(socket, config, updated_config, karabo_id, instrument, cycle,
-                        proposal, apply=False):
+async def change_config(socket, config, updated_config, karabo_id, instrument,
+                        cycle, proposal, apply=False):
    """
    Change the configuration of a proposal
 
@@ -171,7 +185,7 @@ async def change_config(socket, config, updated_config, karabo_id, instrument, c
        defconf = yaml.load(f.read(), Loader=yaml.FullLoader)
        subconf = {}
        for action, instruments in defconf.items():
-            subconf[action]= {}
+            subconf[action] = {}
            if action != "data-mapping":
                subconf[action][instrument] = instruments[instrument]
            else:
@@ -203,7 +217,7 @@ async def slurm_status(filter_user=True):
    cmd = ["squeue"]
    if filter_user:
        cmd += ["-u", getpass.getuser()]
-    ret = subprocess.run(cmd, stdout=subprocess.PIPE)
+    ret = subprocess.run(cmd, stdout=subprocess.PIPE)  # FIXME: asyncio
    if ret.returncode == 0:
        rlines = ret.stdout.decode().split("\n")
        statii = {}
@@ -212,7 +226,7 @@ async def slurm_status(filter_user=True):
                jobid, _, _, _, status, runtime, _, _ = r.split()
                jobid = jobid.strip()
                statii[jobid] = status, runtime
-            except:
+            except ValueError:  # not enough values to unpack in split
                pass
        return statii
    return None
@@ -226,7 +240,7 @@ async def slurm_job_status(jobid):
 
    cmd = ["sacct", "-j", str(jobid), "--format=JobID,Elapsed,state"]
 
-    ret = subprocess.run(cmd, stdout=subprocess.PIPE)
+    ret = subprocess.run(cmd, stdout=subprocess.PIPE)  # FIXME: asyncio
 
    if ret.returncode == 0:
        rlines = ret.stdout.decode().split("\n")
@@ -270,7 +284,7 @@ async def query_rid(conn, socket, rid):
    socket.send(msg.encode())
 
 
-async def parse_config(cmd, config):
+def parse_config(cmd, config):
    for key, value in config.items():
        if isinstance(value, list):
            cmd += ["--{}".format(key)]
@@ -289,6 +303,8 @@ async def update_job_db(config):
    """ Update the job database and send out updates to MDC
 
+    This coro runs as a background task
+
    :param config: configuration parsed from webservice YAML
    """
    logging.info("Starting config db handling")
@@ -308,7 +324,7 @@ async def update_job_db(config):
             logging.debug("SLURM info {}".format(statii))
 
             for r in c.fetchall():
-                rid, jobid, proposal, run, status, time, _, _ = r
+                rid, jobid, proposal, run, status, time, _, action = r
                 logging.debug("DB info {}".format(r))
 
                 cflg, cstatus = combined.get(rid, ([], []))
@@ -335,16 +351,24 @@ async def update_job_db(config):
             conn.commit()
 
             flg_order = {"R": 2, "A": 1, "NA": 0}
+            dark_flags = {'NA': 'E', 'R': 'IP', 'A': 'F'}
             for rid, value in combined.items():
-                if int(rid) == 0:
+                if int(rid) == 0:  # this job was not submitted from MyMDC
                     continue
                 flgs, statii = value
+                # sort by least done status
                 flg = max(flgs, key=lambda i: flg_order[i])
                 msg = "\n".join(statii)
                 msg_debug = f"Update MDC {rid}, {msg}"
                 logging.debug(msg_debug.replace('\n', ', '))
-                response = mdc.update_run_api(rid, {'flg_cal_data_status': flg,
-                                                    'cal_pipeline_reply': msg})
+                if action == 'CORRECT':
+                    response = mdc.update_run_api(rid,
+                                                  {'flg_cal_data_status': flg,
+                                                   'cal_pipeline_reply': msg})
+                else:  # action == 'DARK' but it's dark_request
+                    data = {'dark_run': {'flg_status': dark_flags[flg],
+                                         'calcat_feedback': msg}}
+                    response = mdc.update_dark_run_api(rid, data)
                 if response.status_code != 200:
                     logging.error(Errors.MDC_RESPONSE.format(response))
     except Exception as e:
@@ -385,16 +409,17 @@ async def run_action(job_db, cmd, mode, proposal, run, rid):
     Returns a formatted Success or Error message indicating outcome of the
     execution.
     """
+    # FIXME: this coro has too many returns that can be simplified
     if mode == "prod":
         logging.info(" ".join(cmd))
-        ret = subprocess.run(cmd, stdout=subprocess.PIPE)
+        ret = subprocess.run(cmd, stdout=subprocess.PIPE)  # FIXME: asyncio
         if ret.returncode == 0:
             if "DARK" in cmd:
                 logging.info(Success.START_CHAR.format(proposal, run))
             else:
                 logging.info(Success.START_CORRECTION.format(proposal, run))
             # enter jobs in job db
-            c = job_db.cursor()
+            c = job_db.cursor()  # FIXME: asyncio
             rstr = ret.stdout.decode()
 
             query = "INSERT INTO jobs VALUES ('{rid}', '{jobid}', '{proposal}', '{run}', 'PD', '{now}', '{det}', '{act}')"  # noqa
@@ -406,7 +431,7 @@ async def run_action(job_db, cmd, mode, proposal, run, rid):
                                          now=datetime.now().isoformat(),
                                          det=cmd[3], act=cmd[4]))
             job_db.commit()
-            logging.debug((" ".join(cmd)).replace(',', '').replace("'", "") )
+            logging.debug((" ".join(cmd)).replace(',', '').replace("'", ""))
             if "DARK" in cmd:
                 return Success.START_CHAR.format(proposal, run)
             else:
@@ -436,13 +461,16 @@ async def wait_on_transfer(rpath, max_tries=300):
     :param max_tries: Maximum number of checks if files are transferred
     :return: True if files are transferred
     """
-    # dcache files are assumed migrated
+    # TODO: Make use of MyMDC to request whether the run has been copied.
+    # It is not sufficient to know that the files are on disk; we must also
+    # check that the copy is finished (i.e. that the files are complete).
     if 'pnfs' in os.path.realpath(rpath):
         return True
     rstr = None
     ret = None
     tries = 0
 
+    # FIXME: if not kafka, then do event-driven, no sleep
     # wait until folder gets created
     while not os.path.exists(rpath):
         if tries > max_tries:
@@ -450,9 +478,11 @@ async def wait_on_transfer(rpath, max_tries=300):
         tries += 1
         await asyncio.sleep(10)
 
+    # FIXME: if not kafka, then do event-driven, no sleep
     # wait until files are migrated
     while rstr is None or 'status="online"' in rstr or 'status="Online"' in rstr or ret.returncode != 0:  # noqa
         await asyncio.sleep(10)
+        # FIXME: make use of asyncio.subprocess.run
         ret = subprocess.run(["getfattr", "-n", "user.status", rpath],
                              stdout=subprocess.PIPE)
         rstr = ret.stdout.decode()
@@ -462,9 +492,10 @@
     return ret.returncode == 0
 
 
-async def check_files(in_folder, runs, karabo_das):
-    """
-    Check if files for given karabo-das exists for given runs
+def check_files(in_folder: str,
+                runs: List[int],
+                karabo_das: List[str]) -> bool:
+    """Check if files for given karabo-das exist for given runs
 
     :param in_folder: Input folder
     :param runs: List of runs
@@ -473,13 +504,78 @@
     """
     files_exists = True
     for runnr in runs:
-        rpath = "{}/r{:04d}/".format(in_folder, int(runnr))
-        fl = glob.glob(f"{rpath}/*.h5")
-        if not any(y in x for x in fl for y in karabo_das):
+        rpath = Path(in_folder, 'r{:04d}'.format(int(runnr)))
+        fl = rpath.glob('*.h5')
+        if not any(da in match.name for match in fl for da in karabo_das):
             files_exists = False
     return files_exists
 
 
+async def update_darks_paths(mdc: MetadataClient, rid: int, in_path: str,
+                             out_path: str, report_path: str):
+    """Update data paths in MyMDC to provide Globus access
+
+    DarkRequests have the capability to provide a link to the report via
+    Globus. For this, the absolute path to the report needs to be provided.
+
+    Additionally, the input path to the raw data, as well as the output path
+    to the processed data, can be specified.
+
+    :param mdc: an authenticated MyMDC client
+    :param rid: the DarkRequest id to update
+    :param in_path: the absolute path to the raw data being calibrated
+    :param out_path: the absolute path to the resulting processed data
+    :param report_path: the absolute path to the PDF report
+    """
+    data = {'dark_run': {'output_path': out_path,
+                         'input_path': in_path,
+                         'globus_url': report_path}}
+    loop = get_event_loop()
+    response = await shield(loop.run_in_executor(None, mdc.update_dark_run_api,
+                                                 rid, data))
+    if response.status_code != 200:
+        logging.error(Errors.MDC_RESPONSE.format(response))
+
+
+async def update_mdc_status(mdc: MetadataClient, action: str,
+                            rid: int, message: str):
+    """Update MyMDC statuses
+
+    The message content and destination depend on the Action type.
+    A message to MyMDC expects a status flag, which differs between
+    Correction and DarkRequest.
+
+    Correction can have NA (Not Available), R(unning), A(vailable)
+    DarkRequest can have F(inished), E(rror), R(equested),
+    IP (In Progress), T(imeout)
+    The flag is extracted from the message.
+    Further information is available at:
+    https://git.xfel.eu/gitlab/detectors/pycalibration/wikis/MyMDC-Communication
+    """
+    if message.split(':')[0] in ('FAILED', 'WARN'):  # Errors
+        flag = 'NA' if action == 'correct' else 'E'
+    elif message.split(':')[0] == 'SUCCESS':  # Success
+        flag = 'R' if action == 'correct' else 'IP'
+        if 'Uploaded' in message or 'Finished' in message:
+            flag = 'A' if action == 'correct' else 'F'
+    else:  # MDC Timeout
+        flag = 'NA' if action == 'correct' else 'T'
+
+    if action == 'correct':
+        func = mdc.update_run_api
+        data = {'flg_cal_data_status': flag, 'cal_pipeline_reply': message}
+
+    if action == 'dark_request':
+        func = mdc.update_dark_run_api
+        data = {'dark_run': {'flg_status': flag, 'calcat_feedback': message}}
+
+    loop = get_event_loop()
+    response = await shield(loop.run_in_executor(None, func, rid, data))
+
+    if response.status_code != 200:
+        logging.error(Errors.MDC_RESPONSE.format(response))
+
+
 async def server_runner(config, mode):
     """ The main server loop
@@ -487,12 +583,12 @@ async def server_runner(config, mode):
 
     Requests are the form of ZMQ.REQuest and have the format
 
-        command, *parms
+        command, *params
 
     where *parms is a string-encoded python list as defined by the
     commands. The following commands are currently understood:
 
-    - correct, with parmeters sase, instrument, cycle, proposal, runnr
+    - correct, with parameters rid, sase, instrument, cycle, proposal, runnr
 
       where
 
      :param rid: is the runid within the MDC database
      :param sase: is the sase beamline
      :param instrument: is the instrument
      :param cycle: is the facility cycle
      :param proposal: is the proposal id
      :param runnr: is the run number in integer form, i.e. without leading
       "r"
 
@@ -507,6 +603,25 @@ async def server_runner(config, mode):
      This will trigger a correction process to be launched for that run in
      the given cycle and proposal.
 
+    - dark_request, with parameters rid, sase, instrument, cycle, proposal,
+      did, operation_mode, pdu_names, karabo_das, runnr
+
+      where
+
+      :param rid: is the runid within the MDC database
+      :param sase: is the sase beamline
+      :param instrument: is the instrument
+      :param cycle: is the facility cycle
+      :param proposal: is the proposal id
+      :param did: is the detector karabo id
+      :param operation_mode: is the detector's operation mode, as defined in
+        CalCat
+      :param pdu_names: physical detector units for each module
+      :param karabo_das: the Data Aggregators representing which detector
+        modules to calibrate
+      :param runnr: is the run number in integer form, i.e. without leading
without leading + "r" + - upload-yaml, with parameters sase, instrument, cycle, proposal, yaml where @@ -544,7 +659,7 @@ async def server_runner(config, mode): if isinstance(response, list) and len(response) == 1: try: # protect against unparseable requests response = eval(response[0]) - except Exception as e: + except SyntaxError as e: logging.error(str(e)) socket.send(Errors.REQUEST_FAILED.encode()) continue @@ -554,10 +669,11 @@ async def server_runner(config, mode): socket.send(Errors.REQUEST_MALFORMED.format(response).encode()) continue + # FIXME: action should be an enum action, payload = response[0], response[1:] - if action not in ["correct", 'dark', 'query-rid', - 'upload-yaml', 'update_conf']: # only handle known actions + if action not in ['correct', 'dark', 'dark_request', 'query-rid', + 'upload-yaml', 'update_conf']: logging.warning(Errors.UNKNOWN_ACTION.format(action)) socket.send(Errors.UNKNOWN_ACTION.format(action).encode()) continue @@ -569,105 +685,117 @@ async def server_runner(config, mode): await query_rid(job_db, socket, rid) continue - async def do_action(action, payload): + async def do_action(action, payload): # FIXME: this needn't be nested in_folder = None run_mapping = {} - priority = None + priority = None # TODO: Investigate argument - if action in ['update_conf']: + if action == 'update_conf': updated_config = None try: sase, karabo_id, instrument, cycle, proposal, config_yaml, apply = payload # noqa updated_config = json.loads(config_yaml) await change_config(socket, config['config-repo'], - updated_config, karabo_id, instrument, cycle, - proposal, apply.upper()=="TRUE") + updated_config, karabo_id, instrument, + cycle, proposal, + apply.upper() == "TRUE") except Exception as e: e = str(e) - err_msg = f"Failure applying config for {proposal}:" + \ - f" {e}: {updated_config}" + err_msg = (f"Failure applying config for {proposal}:" + f" {e}: {updated_config}") logging.error(err_msg) - logging.error(f"Unexpected error: {traceback.format_exc()}") - socket.send(yaml.dump(err_msg, default_flow_style=False).encode()) + logging.error(f"Unexpected error: {traceback.format_exc()}") # noqa + socket.send(yaml.dump(err_msg, + default_flow_style=False).encode()) - if action in ['dark', 'correct']: + if action in ['dark', 'correct', 'dark_request']: request_time = datetime.now().strftime('%Y-%m-%dT%H:%M:%S') try: - wait_runs = [] + wait_runs: List[str] = [] + rid, sase, instrument, cycle, proposal, *payload = payload + if action == 'correct': - (rid, sase, instrument, cycle, proposal, runnr, - priority) = payload - runnr = runnr.replace("r", "") + runnr, priority = payload + runnr = runnr.strip('r') wait_runs = [runnr] - msg = "Correction of run {} at {}({}) " \ - "is requested. Checking files..." - logging.info(msg.format(runnr, instrument, proposal)) + if action == 'dark': - (rid, sase, instrument, cycle, proposal, karabo_ids, - karabo_das) = payload[:7] - msg = "Dark characterization for {} at {} " \ - "is requested. Checking files..." 
-                        logging.info(msg.format(karabo_ids, instrument))
+                        karabo_ids, karabo_das, *runs = payload
+
                        karabo_ids = karabo_ids.split(',')
                        karabo_das = karabo_das.split(',')
-                        runs = payload[7:]  # can be many
                        for i, run in enumerate(runs):
                            erun = eval(run)
                            if isinstance(erun, (list, tuple)):
                                typ, runnr = erun
                                if typ == "reservation":
                                    continue
-                                runnr = runnr.replace("r", "")
+                                runnr = runnr.strip('r')
                                run_mapping[typ] = runnr
                                wait_runs.append(runnr)
                            else:
                                run_mapping['no_mapping_{}'.format(i)] = erun
                                wait_runs.append(erun)
-                    proposal = proposal.replace("p", "")
+
+                    if action == 'dark_request':
+                        karabo_id, operation_mode, *payload = payload
+                        payload = eval(','.join(payload))
+                        pdus, karabo_das, wait_runs = payload
+
+                        karabo_das = [val.strip() for val in karabo_das]
+                        wait_runs = [str(val) for val in wait_runs]
+
+                    proposal = proposal.strip('p')
                    proposal = "{:06d}".format(int(proposal))
+                    logging.info(f'{action} of {proposal} run {wait_runs} at '
+                                 f'{instrument} is requested. Checking files.')
 
                    # Read calibration configuration from yaml
-                    conf_file = "{}/{}/{}.yaml".format(
-                        config['config-repo']['local-path'], cycle, proposal)
-                    if not os.path.exists(conf_file):
-                        conf_file = "{}/default.yaml".format(
-                            config['config-repo']['local-path'])
+                    conf_file = Path(config['config-repo']['local-path'],
+                                     cycle, f'{proposal}.yaml')
+                    if not conf_file.exists():
+                        conf_file = Path(config['config-repo']['local-path'],
+                                         "default.yaml")
 
                    with open(conf_file, "r") as f:
                        pconf_full = yaml.load(f.read(), Loader=yaml.FullLoader)
 
-                    data_conf = pconf_full['data-mapping']
-                    if instrument in pconf_full[action]:
-                        pconf = pconf_full[action][instrument]
-                    else:
-                        socket.send(Errors.NOT_CONFIGURED.encode())
-                        logging.info(f'Instrument {instrument} is unknown')
-                        return
-
-                    in_folder = config[action]['in-folder'].format(
+                    # FIXME: remove once MyMDC sends `dark` action
+                    action_ = 'dark' if action == 'dark_request' else action
+                    data_conf = pconf_full['data-mapping']
+                    if instrument in pconf_full[action_]:
+                        pconf = pconf_full[action_][instrument]
+                    else:
+                        socket.send(Errors.NOT_CONFIGURED.encode())
+                        logging.info(f'Instrument {instrument} is unknown')
+                        return
+
+                    in_folder = config[action_]['in-folder'].format(
                        instrument=instrument, cycle=cycle, proposal=proposal)
 
-                    msg = "Queued proposal {}, run {} for offline calibration, priority: {}".format(
-                        proposal, ", ".join(wait_runs), priority)
+                    msg = Success.QUEUED.format(proposal, wait_runs)
                    socket.send(msg.encode())
                    logging.debug(msg)
 
+                    if action in ['correct', 'dark_request']:
+                        await update_mdc_status(mdc, action, rid, msg)
+
                except Exception as e:
                    e = str(e)
-                    msg = f"Failure to initiate {action}: {e}"
+                    msg = Errors.JOB_LAUNCH_FAILED.format(action, e)
                    logging.error(msg)
                    socket.send(msg.encode())
-                    if action == 'correct':
-                        response = mdc.update_run_api(rid, {
-                            'flg_cal_data_status': 'NA',
-                            'cal_pipeline_reply': Errors.REQUEST_FAILED})
-                        if response.status_code != 200:
-                            logging.error(Errors.MDC_RESPONSE.format(response))
+                    if action in ['correct', 'dark_request']:
+                        await update_mdc_status(mdc, action, rid, msg)
                    return
 
                # Check if all files for given runs are transferred
                all_transfers = []
+
+                # FIXME: this loop should be an asyncio.gather
                for runnr in wait_runs:
                    rpath = "{}/r{:04d}/".format(in_folder, int(runnr))
                    transfer_complete = await wait_on_transfer(rpath)
@@ -676,17 +804,12 @@ async def server_runner(config, mode):
                        logging.error(
                            Errors.TRANSFER_EVAL_FAILED.format(proposal, runnr))
-                        if action == 'correct':
-                            response = mdc.update_run_api(rid, {
-                                'flg_cal_data_status': 'NA',
-                                'cal_pipeline_reply': MDC.MIGRATION_TIMEOUT})
-                            if response.status_code != 200:
-                                logging.error(Errors.MDC_RESPONSE.format(response))
+                        if action in ['correct', 'dark_request']:
+                            await update_mdc_status(mdc, action, rid,
+                                                    MDC.MIGRATION_TIMEOUT)
 
                if not all(all_transfers):
-                    logging.error(Errors.TRANSFER_EVAL_FAILED.format(proposal,
-                                                                     ",".join(
-                                                                         wait_runs)))
+                    logging.error(Errors.TRANSFER_EVAL_FAILED.format(proposal, ','.join(wait_runs)))  # noqa
                    return
 
                logging.debug(f"Now doing: {action}")
@@ -709,7 +832,7 @@ async def server_runner(config, mode):
                        karabo_da = data_conf[karabo_id]["karabo-da"]
 
                        # Check if any files for given karabo-das exists
-                        if await check_files(in_folder, wait_runs, karabo_da):
+                        if check_files(in_folder, wait_runs, karabo_da):
                            thisconf = copy.copy(data_conf[karabo_id])
 
                            if (karabo_id in pconf and
@@ -718,7 +841,7 @@ async def server_runner(config, mode):
 
                            thisconf["in-folder"] = in_folder
                            thisconf["out-folder"] = '/'.join((out_folder,
-                                                               karabo_id.replace('-', '_')))
+                                                               karabo_id.replace('-', '_')))  # noqa FIXME Make use of pathlib
 
                            thisconf["karabo-id"] = karabo_id
                            thisconf["karabo-da"] = karabo_da
@@ -734,8 +857,8 @@ async def server_runner(config, mode):
                            detectors[karabo_id] = thisconf
                        else:
                            logging.warning("File list for {} at {} is empty"
-                                            .format(karabo_id, "{}/*.h5"
-                                                    .format(rpath)))
+                                            .format(karabo_id,
+                                                    "{}/*.h5".format(rpath)))
 
                    if len(detectors) == 0:
                        logging.warning(Errors.NOTHING_TO_DO.format(rpath))
@@ -756,8 +879,9 @@ async def server_runner(config, mode):
                    detectors = {}
                    for karabo_id in pconf:
                        dconfig = data_conf[karabo_id]
-                        # check if we find files according to mapping in raw run folder
-                        if any(y in x for x in fl for y in dconfig['karabo-da']):
+                        # check for files according to mapping in raw run dir.
+                        if any(y in x for x in fl
+                               for y in dconfig['karabo-da']):
                            for karabo_da in dconfig['karabo-da']:
                                tfl = glob.glob(f"{rpath}/*{karabo_da}*.h5")
                                corr_file_list = corr_file_list.union(set(tfl))
@@ -774,34 +898,72 @@ async def server_runner(config, mode):
                            detectors[karabo_id] = thisconf
                    copy_file_list = copy_file_list.difference(corr_file_list)
                    asyncio.ensure_future(copy_untouched_files(copy_file_list,
-                                                               out_folder, runnr))
+                                                               out_folder,
+                                                               runnr))
                if len(detectors) == 0:
                    logging.warning(Errors.NOTHING_TO_DO.format(rpath))
-                    msg = MDC.NOTHING_TO_DO
-                    response = mdc.update_run_api(rid,
-                                                  {'flg_cal_data_status': 'NA',
-                                                   'cal_pipeline_reply': msg})
-                    if response.status_code != 200:
-                        logging.error(Errors.MDC_RESPONSE.format(response))
+                    await update_mdc_status(mdc, action, rid,
+                                            MDC.NOTHING_TO_DO)
                    return
+
            except Exception as corr_e:
                logging.error(f"Error during correction: {str(corr_e)}")
-                response = mdc.update_run_api(rid, {
-                    'flg_cal_data_status': 'NA',
-                    'cal_pipeline_reply': Errors.REQUEST_FAILED})
-                if response.status_code != 200:
-                    logging.error(Errors.MDC_RESPONSE.format(response))
+                await update_mdc_status(mdc, action, rid,
+                                        Errors.REQUEST_FAILED)
+
+            if action == 'dark_request':
+                runs = [str(r) for r in wait_runs]
+
+                # Notebooks require one or three runs, depending on the
+                # detector type and operation mode.
+                triple = any(det in karabo_id for det in
+                             ["LPD", "AGIPD", "JUNGFRAU", "JF", "JNGFR"])
+
+                if triple and len(runs) == 1:
+                    runs_dict = {'run-high': runs[0],
+                                 'run-med': '0',
+                                 'run-low': '0'}
+                elif triple and len(runs) == 3:
+                    runs_dict = {'run-high': runs[0],
+                                 'run-med': runs[1],
+                                 'run-low': runs[2]}
+                else:  # single
+                    runs_dict = {'run': runs[0]}
+
+                out_folder = config['dark']['out-folder'].format(
+                    instrument=instrument, cycle=cycle, proposal=proposal,
+                    runs='_'.join(runs))
+                out_folder = str(Path(out_folder,
+                                      karabo_id.replace('-', '_')))
+
+                # We assume that MyMDC does not allow dark request if the data
+                # is not migrated, thus skipping some validation here.
+                thisconf = copy.copy(data_conf[karabo_id])
+
+                if (karabo_id in pconf
+                        and isinstance(pconf[karabo_id], dict)):
+                    thisconf.update(copy.copy(pconf[karabo_id]))
+
+                thisconf['in-folder'] = in_folder
+                thisconf['out-folder'] = out_folder
+                thisconf['karabo-id'] = karabo_id
+                thisconf['karabo-da'] = ','.join(karabo_das)
+                thisconf['operation-mode'] = operation_mode
+
+                thisconf.update(runs_dict)
 
-            if action in ['dark', 'correct']:
+                detectors = {karabo_id: thisconf}
+
+            if action in ['correct', 'dark', 'dark_request']:
                # run xfel_calibrate
-                msg = []
+                action_ = 'dark' if action == 'dark_request' else action
                for karabo_id, dconfig in detectors.items():
                    detector = dconfig['detector-type']
                    del dconfig['detector-type']
-                    cmd = config[action]['cmd'].format(
+                    cmd = config[action_]['cmd'].format(
                        detector=detector,
-                        sched_prio=str(config[action]['sched-prio']),
-                        action=action, instrument=instrument,
+                        sched_prio=str(config[action_]['sched-prio']),
+                        action=action_, instrument=instrument,
                        cycle=cycle, proposal=proposal,
                        runs="_".join([f"r{r}" for r in wait_runs]),
                        time_stamp=ts,
@@ -809,19 +971,23 @@ async def server_runner(config, mode):
                        request_time=request_time
                    ).split()
 
-                    cmd = await parse_config(cmd, dconfig)
-                    ret = await run_action(job_db, cmd, mode, proposal, runnr,
-                                           rid if action == 'correct' else 0)
-                    if ret != '':
-                        msg.append(karabo_id)
-                if msg and action == 'correct':
-                    msg = Errors.NOT_SUBMITTED.format(msg)
-                    mdc_res = mdc.update_run_api(rid,
-                                                 {'flg_cal_data_status': 'NA',
-                                                  'cal_pipeline_reply': msg})
-                    if mdc_res.status_code != 200:
-                        logging.error(Errors.MDC_RESPONSE.format(mdc_res))
+                    cmd = parse_config(cmd, dconfig)
+
+                    rid = rid if action in ['correct', 'dark_request'] else 0
+                    ret = await run_action(job_db, cmd, mode,
+                                           proposal, runnr, rid)
+
+                    if action == 'correct':
+                        await update_mdc_status(mdc, action, rid, ret)
+
+                    if action == 'dark_request':
+                        await update_mdc_status(mdc, action, rid, ret)
+                        report_idx = cmd.index('--report-to') + 1
+                        report = cmd[report_idx] + '.pdf'
+                        await update_darks_paths(mdc, rid, in_folder,
+                                                 out_folder, report)
+
+            # TODO: moving this block further up reduces the need for so
+            # many nested ifs. Move up and return directly
            if action == 'upload-yaml':
                sase, instrument, cycle, proposal, this_yaml = payload
                this_yaml = urllib.parse.unquote_plus(this_yaml)
@@ -833,13 +999,6 @@ async def server_runner(config, mode):
                do_action(copy.copy(action), copy.copy(payload)))
        except Exception as e:  # actions that fail are only error logged
            logging.error(str(e))
-            # TODO: add "dark" after enabling the action from myMDC
-            if action == 'correct':
-                response = mdc.update_run_api(rid, {
-                    'flg_cal_data_status': 'NA',
-                    'cal_pipeline_reply': Errors.REQUEST_FAILED})
-                if response.status_code != 200:
-                    logging.error(Errors.MDC_RESPONSE.format(response))
 
 
 parser = argparse.ArgumentParser(
-- 
GitLab
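For reference, a minimal client for the new dark_request action could look like the sketch below. It is modelled on webservice/manual_launch.py from this patch: the endpoint, the action name, the field order, and the str-encoded-list wire format are taken from the diff above, while the concrete proposal, detector, and run values are illustrative only.

    import zmq

    # Connect to the calibration webservice endpoint used in manual_launch.py
    con = zmq.Context()
    socket = con.socket(zmq.REQ)
    socket.connect("tcp://max-exfl017:5555")

    # Field order follows the dark_request docstring added to webservice.py:
    # action, rid, sase, instrument, cycle, proposal, karabo_id,
    # operation_mode, pdu_names, karabo_das, run numbers.
    request = [
        'dark_request',        # action
        '258',                 # DarkRequest id within MyMDC
        'sase1',               # SASE beamline
        'CALLAB',              # instrument
        '202031',              # facility cycle
        '900113',              # proposal number
        'SPB_DET_AGIPD1M-1',   # detector karabo_id
        'ADAPTIVE_GAIN',       # operation mode, as defined in CalCat
        '["AGIPD00 (Q1M1)"]',  # physical detector units
        '["AGIPD00"]',         # data aggregators (karabo_das)
        '[9992, 9991, 9990]',  # runs: high, medium, low gain for AGIPD
    ]

    # The webservice eval()s the str-encoded list it receives, so the
    # message is simply the request list serialised with str().
    socket.send(str(request).encode())
    print(socket.recv_multipart()[0].decode())

Note how the server side joins everything after operation_mode with commas and eval()s it back into the pdu, karabo_da, and run lists; this is why the last three entries are themselves string-encoded lists.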