From ead040bfe55e1011abffe81feacf29ae93f17ea6 Mon Sep 17 00:00:00 2001 From: Thomas Kluyver <thomas@kluyver.me.uk> Date: Fri, 17 Jun 2022 14:26:09 +0100 Subject: [PATCH] Initial skeleton for repeat action in webservice --- webservice/config/webservice.yaml | 1 + webservice/webservice.py | 38 +++++++++++++++++++++++++++++++ 2 files changed, 39 insertions(+) diff --git a/webservice/config/webservice.yaml b/webservice/config/webservice.yaml index c91b4b923..a992376fa 100644 --- a/webservice/config/webservice.yaml +++ b/webservice/config/webservice.yaml @@ -31,6 +31,7 @@ kafka: correct: in-folder: /gpfs/exfel/exp/{instrument}/{cycle}/p{proposal}/raw out-folder: /gpfs/exfel/d/proc/{instrument}/{cycle}/p{proposal}/{run} + reports-folder: /gpfs/exfel/exp/{instrument}/{cycle}/p{proposal}/usr/Reports/{runs}/ commissioning-penalty: 1250 job-penalty: 2 cmd : >- diff --git a/webservice/webservice.py b/webservice/webservice.py index 02f682daf..4dc675beb 100644 --- a/webservice/webservice.py +++ b/webservice/webservice.py @@ -786,6 +786,7 @@ class ActionsServer: accepted_actions = { 'correct', + 'repeat', 'dark_request', 'query-rid', 'upload-yaml', @@ -933,6 +934,43 @@ class ActionsServer: return queued_msg.encode() + async def handle_repeat(self, rid, instrument, cycle, proposal, runnr): + reports_dir = Path(self.config['correct']['reports-folder'].format( + instrument=instrument, cycle=cycle, proposal=proposal, runs=runnr + )) + + mddirs_by_krb_id = {} + for calmeta_pth in sorted(reports_dir.glob('*/calibration_metadata.yml')): + with calmeta_pth.open('r', encoding='utf-8') as f: + calmeta = yaml.safe_load(f) + + try: + request_time = calmeta["runtime-summary"]["pipeline-steps"]["request-time"] + karabo_id = calmeta["calibration-configurations"]["karabo-id"] + except KeyError: + logging.warning("Did not find expected metadata in %s", + calmeta_pth, exc_info=True) + else: + mddirs_by_krb_id.setdefault(karabo_id, []).append( + (calmeta_pth.parent, request_time) + ) + + logging.info("Found %d corrections to re-run for p%s r%s: %s", + len(mddirs_by_krb_id), proposal, runnr, list(mddirs_by_krb_id)) + + for karabo_id, mddirs in mddirs_by_krb_id.items(): + # Select the latest metadata directory - with the highest request + # time - to re-run for each detector (karabo_id) + mddir, _ = max(mddirs, key=lambda p: p[1]) + + logging.info("Repeating correction for %s from %s", karabo_id, mddir) + + cmd = ['python', '-m', 'xfel_calibrate.repeat', mddir] + await run_action( + self.job_db, cmd, self.mode, + proposal, runnr, rid + ) + async def handle_dark_request( self, rid, _sase, instrument, cycle, proposal, karabo_id, operation_mode, *extra -- GitLab