diff --git a/src/xfel_calibrate/repeat.py b/src/xfel_calibrate/repeat.py index 0e5ef9674f3e6a86b92d6ab49ba1acd8b40169d5..b9b189d5fc0fb55eefdd9823a5ca2c9573383b26 100644 --- a/src/xfel_calibrate/repeat.py +++ b/src/xfel_calibrate/repeat.py @@ -110,7 +110,7 @@ def main(argv=None): "This requires the pyenv command to be available." ) ap.add_argument("--out-folder", help="Directory to put output data") - ap.add_argument("--report-path", help="Location to save PDF report") + ap.add_argument("--report-to", help="Location to save PDF report") ap.add_argument("--slurm-partition", help="Submit jobs in this Slurm partition") ap.add_argument("--slurm-mem", type=int, help="Requested node RAM in GB") ap.add_argument('--no-cluster-job', action="store_true", @@ -140,10 +140,10 @@ def main(argv=None): update_notebooks_params(working_dir, params_to_set) - if args.report_path: + if args.report_to: - report_path = args.report_path + report_to = args.report_to else: # Default to saving report in output folder - report_path = str(Path(out_folder, f'xfel-calibrate-repeat-{run_uuid}')) - cal_metadata['report-path'] = report_path + report_to = str(Path(out_folder, f'xfel-calibrate-repeat-{run_uuid}')) + cal_metadata['report-path'] = f'{report_to}.pdf' cal_metadata.save() @@ -167,7 +167,7 @@ def main(argv=None): fmt_args = {'run_path': working_dir, 'out_path': out_folder, 'version': get_pycalib_version(), - 'report_to': report_path, + 'report_to': report_to, 'in_folder': parameters['in-folder'], 'request_time': start_time.strftime('%Y-%m-%dT%H:%M:%S'), 'submission_time': start_time.strftime('%Y-%m-%dT%H:%M:%S'), diff --git a/webservice/config/webservice.yaml b/webservice/config/webservice.yaml index c91b4b923f70df2982f996d0206d367c0a70c37e..a992376fa75035d0179c55a435be8e321e7a2ecb 100644 --- a/webservice/config/webservice.yaml +++ b/webservice/config/webservice.yaml @@ -31,6 +31,7 @@ kafka: correct: in-folder: /gpfs/exfel/exp/{instrument}/{cycle}/p{proposal}/raw out-folder: 
/gpfs/exfel/d/proc/{instrument}/{cycle}/p{proposal}/{run} + reports-folder: /gpfs/exfel/exp/{instrument}/{cycle}/p{proposal}/usr/Reports/{runs}/ commissioning-penalty: 1250 job-penalty: 2 cmd : >- diff --git a/webservice/request_repeat.py b/webservice/request_repeat.py new file mode 100644 index 0000000000000000000000000000000000000000..0851f99d5ca2b9164450318238fe65d431a56fb9 --- /dev/null +++ b/webservice/request_repeat.py @@ -0,0 +1,33 @@ +"""Send a request to repeat previous corrections. + +The repeat mechanism is meant for if corrected data has been deleted, +but this script can also be used for testing. +""" + +import argparse +from glob import glob + +import zmq + +parser = argparse.ArgumentParser(description='Request repeat correction.') +parser.add_argument('proposal', type=int, help='The proposal number') +parser.add_argument('run', type=int, help='The run number') +parser.add_argument('--mymdc-id', type=int, default=0, help="Run ID in myMdC") +parser.add_argument('--endpoint', default='tcp://max-exfl016:5555', + help="The ZMQ endpoint to connect to (max-exfl017 for testing)") + +args = parser.parse_args() + +prop_dir = glob('/gpfs/exfel/exp/*/*/p{:06d}'.format(args.proposal))[0] +instrument, cycle = prop_dir.split('/')[4:6] + +con = zmq.Context() +socket = con.socket(zmq.REQ) +socket.connect(args.endpoint) + +parm_list = ["recorrect", str(args.mymdc_id), instrument, cycle, + f'{args.proposal:06d}', str(args.run)] + +socket.send(repr(parm_list).encode()) +resp = socket.recv() +print(resp.decode()) diff --git a/webservice/webservice.py b/webservice/webservice.py index 0984ac4c7e7997c5154373e087aa6c0d9ed579cc..b79128fbe4a7bfa08acd079fb84b847624d756e2 100644 --- a/webservice/webservice.py +++ b/webservice/webservice.py @@ -14,6 +14,7 @@ import sys import urllib.parse from asyncio import get_event_loop, shield from datetime import datetime, timezone +from getpass import getuser from pathlib import Path from typing import Any, Dict, List, Optional, 
Tuple, Union @@ -786,6 +787,7 @@ class ActionsServer: accepted_actions = { 'correct', + 'recorrect', 'dark_request', 'query-rid', 'upload-yaml', @@ -933,6 +935,104 @@ class ActionsServer: return queued_msg.encode() + async def handle_recorrect(self, rid, instrument, cycle, proposal, runnr): + request_time = datetime.now() + + try: + with self.job_db: + cur = self.job_db.execute( + "INSERT INTO requests VALUES (NULL, ?, ?, ?, 'CORRECT', ?)", + (rid, proposal, int(runnr), request_time.strftime('%Y-%m-%dT%H:%M:%S')) + ) + req_id = cur.lastrowid + + reports_dir = Path(self.config['correct']['reports-folder'].format( + instrument=instrument, cycle=cycle, proposal=proposal, runs=f"r{runnr}" + )) + + mddirs_by_krb_id = {} + for calmeta_pth in sorted(reports_dir.glob('*/calibration_metadata.yml')): + with calmeta_pth.open('r', encoding='utf-8') as f: + calmeta = yaml.safe_load(f) + + try: + prior_request_time = calmeta["runtime-summary"]\ + ["pipeline-steps"]["request-time"] + karabo_id = calmeta["calibration-configurations"]["karabo-id"] + except KeyError: + logging.warning("Did not find expected metadata in %s", + calmeta_pth, exc_info=True) + else: + mddirs_by_krb_id.setdefault(karabo_id, []).append( + (calmeta_pth.parent, prior_request_time) + ) + + logging.info("Found %d corrections to re-run for p%s r%s: %s", + len(mddirs_by_krb_id), proposal, runnr, list(mddirs_by_krb_id)) + + except Exception as e: + msg = Errors.JOB_LAUNCH_FAILED.format('correct', e) + logging.error(msg, exc_info=e) + asyncio.ensure_future( + update_mdc_status(self.mdc, 'correct', rid, msg) + ) + return msg.encode() + + queued_msg = Success.QUEUED.format(proposal, [runnr]) + logging.debug(queued_msg) + + async def _continue(): + """Runs in the background after we reply to the ZMQ request""" + if len(mddirs_by_krb_id) == 0: + in_folder = self.config['correct']['in-folder'].format( + instrument=instrument, cycle=cycle, proposal=proposal) + rpath = os.path.join(in_folder, f"r{int(runnr):04d}/") + msg 
= Errors.NOTHING_TO_DO.format(rpath) + logging.warning(msg) + await update_mdc_status(self.mdc, 'correct', rid, msg) + return + + await update_mdc_status(self.mdc, 'correct', rid, queued_msg) + + ret = [] + for karabo_id, mddirs in mddirs_by_krb_id.items(): + # Select the latest metadata directory - with the highest request + # time - to re-run for each detector (karabo_id) + mddir, _ = max(mddirs, key=lambda p: p[1]) + + logging.info("Repeating correction for %s from %s", karabo_id, mddir) + + cmd = [ + 'python', '-m', 'xfel_calibrate.repeat', str(mddir), + '--env-cache', + f'/gpfs/exfel/data/scratch/{getuser()}/calib-repeat-envs', + '--report-to', + f'{reports_dir}/{karabo_id}_RECORRECT_{request_time:%y%m%d_%H%M%S}' + ] + + with self.job_db: + cur = self.job_db.execute( + "INSERT INTO executions VALUES (NULL, ?, ?, NULL, ?, NULL)", + (req_id, shlex.join(cmd), karabo_id) + ) + exec_id = cur.lastrowid + + ret.append(await run_action( + self.job_db, cmd, self.mode, + proposal, runnr, exec_id + )) + + await update_mdc_status(self.mdc, 'correct', rid, ", ".join(ret)) + await get_event_loop().run_in_executor( + None, self.mdc.update_run_api, + rid, {'cal_last_begin_at': datetime.now(tz=timezone.utc).isoformat()} + ) + # END of part to run after sending reply + + asyncio.ensure_future(_continue()) + + return queued_msg.encode() + async def handle_dark_request( self, rid, _sase, instrument, cycle, proposal, karabo_id, operation_mode, *extra