From dd7e5433b607c1ec5050f7c194ee43cab7f21e53 Mon Sep 17 00:00:00 2001
From: Thomas Kluyver <thomas@kluyver.me.uk>
Date: Wed, 29 Jun 2022 10:20:07 +0100
Subject: [PATCH] Restructure code to launch jobs after sending ZMQ reply

---
 webservice/webservice.py | 116 ++++++++++++++++++++++++++-------------
 1 file changed, 77 insertions(+), 39 deletions(-)

diff --git a/webservice/webservice.py b/webservice/webservice.py
index a861aadbb..5d66216f9 100644
--- a/webservice/webservice.py
+++ b/webservice/webservice.py
@@ -937,56 +937,94 @@ class ActionsServer:
     async def handle_repeat(self, rid, instrument, cycle, proposal, runnr):
         request_time = datetime.now().strftime('%Y-%m-%dT%H:%M:%S')
 
-        with self.job_db:
-            cur = self.job_db.execute(
-                "INSERT INTO requests VALUES (NULL, ?, ?, ?, 'CORRECT', ?)",
-                (rid, proposal, int(runnr), request_time)
+        try:
+            with self.job_db:
+                cur = self.job_db.execute(
+                    "INSERT INTO requests VALUES (NULL, ?, ?, ?, 'CORRECT', ?)",
+                    (rid, proposal, int(runnr), request_time)
+                )
+                req_id = cur.lastrowid
+
+            reports_dir = Path(self.config['correct']['reports-folder'].format(
+                instrument=instrument, cycle=cycle, proposal=proposal, runs=runnr
+            ))
+
+            mddirs_by_krb_id = {}
+            for calmeta_pth in sorted(reports_dir.glob('*/calibration_metadata.yml')):
+                with calmeta_pth.open('r', encoding='utf-8') as f:
+                    calmeta = yaml.safe_load(f)
+
+                try:
+                    prior_request_time = calmeta["runtime-summary"]\
+                                            ["pipeline-steps"]["request-time"]
+                    karabo_id = calmeta["calibration-configurations"]["karabo-id"]
+                except KeyError:
+                    logging.warning("Did not find expected metadata in %s",
+                                    calmeta_pth, exc_info=True)
+                else:
+                    mddirs_by_krb_id.setdefault(karabo_id, []).append(
+                        (calmeta_pth.parent, prior_request_time)
+                    )
+
+            logging.info("Found %d corrections to re-run for p%s r%s: %s",
+                         len(mddirs_by_krb_id), proposal, runnr, list(mddirs_by_krb_id))
+
+        except Exception as e:
+            msg = Errors.JOB_LAUNCH_FAILED.format('correct', e)
+            logging.error(msg, exc_info=e)
+            asyncio.ensure_future(
+                update_mdc_status(self.mdc, 'correct', rid, msg)
             )
-            req_id = cur.lastrowid
+            return msg.encode()
 
-        reports_dir = Path(self.config['correct']['reports-folder'].format(
-            instrument=instrument, cycle=cycle, proposal=proposal, runs=runnr
-        ))
+        queued_msg = Success.QUEUED.format(proposal, [runnr])
+        logging.debug(queued_msg)
 
-        mddirs_by_krb_id = {}
-        for calmeta_pth in sorted(reports_dir.glob('*/calibration_metadata.yml')):
-            with calmeta_pth.open('r', encoding='utf-8') as f:
-                calmeta = yaml.safe_load(f)
+        async def _continue():
+            """Runs in the background after we reply to the ZMQ request"""
+            if len(mddirs_by_krb_id) == 0:
+                in_folder = self.config['correct']['in-folder'].format(
+                    instrument=instrument, cycle=cycle, proposal=proposal)
+                rpath = os.path.join(in_folder, f"r{int(runnr):04d}/")
+                msg = Errors.NOTHING_TO_DO.format(rpath)
+                logging.warning(msg)
+                await update_mdc_status(self.mdc, 'correct', rid, msg)
+                return
 
-            try:
-                request_time = calmeta["runtime-summary"]["pipeline-steps"]["request-time"]
-                karabo_id = calmeta["calibration-configurations"]["karabo-id"]
-            except KeyError:
-                logging.warning("Did not find expected metadata in %s",
-                                calmeta_pth, exc_info=True)
-            else:
-                mddirs_by_krb_id.setdefault(karabo_id, []).append(
-                    (calmeta_pth.parent, request_time)
-                )
+            await update_mdc_status(self.mdc, 'correct', rid, queued_msg)
 
-        logging.info("Found %d corrections to re-run for p%s r%s: %s",
-                     len(mddirs_by_krb_id), proposal, runnr, list(mddirs_by_krb_id))
+            ret = []
+            for karabo_id, mddirs in mddirs_by_krb_id.items():
+                # Select the latest metadata directory - with the highest request
+                # time - to re-run for each detector (karabo_id)
+                mddir, _ = max(mddirs, key=lambda p: p[1])
 
-        for karabo_id, mddirs in mddirs_by_krb_id.items():
-            # Select the latest metadata directory - with the highest request
-            # time - to re-run for each detector (karabo_id)
-            mddir, _ = max(mddirs, key=lambda p: p[1])
+                logging.info("Repeating correction for %s from %s", karabo_id, mddir)
 
-            logging.info("Repeating correction for %s from %s", karabo_id, mddir)
+                cmd = ['python', '-m', 'xfel_calibrate.repeat', mddir]
 
-            cmd = ['python', '-m', 'xfel_calibrate.repeat', mddir]
+                with self.job_db:
+                    cur = self.job_db.execute(
+                        "INSERT INTO executions VALUES (NULL, ?, ?, NULL, ?, NULL)",
+                        (req_id, shlex.join(cmd), karabo_id)
+                    )
+                    exec_id = cur.lastrowid
 
-            with self.job_db:
-                cur = self.job_db.execute(
-                    "INSERT INTO executions VALUES (NULL, ?, ?, NULL, ?, NULL)",
-                    (req_id, shlex.join(cmd), karabo_id)
-                )
-                exec_id = cur.lastrowid
+                ret.append(await run_action(
+                    self.job_db, cmd, self.mode,
+                    proposal, runnr, exec_id
+                ))
 
-            await run_action(
-                self.job_db, cmd, self.mode,
-                proposal, runnr, exec_id
+            await update_mdc_status(self.mdc, 'correct', rid, ", ".join(ret))
+            await get_event_loop().run_in_executor(
+                None, self.mdc.update_run_api,
+                rid, {'cal_last_begin_at': datetime.now(tz=timezone.utc).isoformat()}
             )
+            # END of part to run after sending reply
+
+        asyncio.ensure_future(_continue())
+
+        return queued_msg.encode()
 
     async def handle_dark_request(
             self, rid, _sase, instrument, cycle, proposal, karabo_id,
-- 
GitLab