From cb8c5d80220c3f743d776c3973197252f79f86b7 Mon Sep 17 00:00:00 2001 From: Thomas Kluyver <thomas@kluyver.me.uk> Date: Wed, 19 Apr 2023 13:50:37 +0100 Subject: [PATCH] Check for unfinished corrections before launching a new one --- webservice/messages.py | 1 + webservice/webservice.py | 23 +++++++++++++++++++++++ 2 files changed, 24 insertions(+) diff --git a/webservice/messages.py b/webservice/messages.py index 4928545df..48b17b367 100644 --- a/webservice/messages.py +++ b/webservice/messages.py @@ -43,3 +43,4 @@ class Success: REPROD_QUEUED = "SUCCESS: Queued proposal {}, run {} for reproducing previous offline calibration" DONE_CORRECTION = "SUCCESS: Finished correction: proposal {}. run {}" DONE_CHAR = "SUCCESS: Finished dark characterization: proposal {}, run {}" + ALREADY_REQUESTED = "SUCCESS: Correction already queued/running for proposal {}, run {}" diff --git a/webservice/webservice.py b/webservice/webservice.py index 45ec4ecf9..44d673f72 100644 --- a/webservice/webservice.py +++ b/webservice/webservice.py @@ -62,6 +62,7 @@ def init_job_db(config): action, timestamp ); + CREATE INDEX IF NOT EXISTS req_by_run ON requests(proposal, run, action); CREATE TABLE IF NOT EXISTS executions( exec_id INTEGER PRIMARY KEY, req_id REFERENCES requests(req_id), @@ -962,6 +963,12 @@ class ActionsServer: proposal = self._normalise_proposal_num(proposal) pconf_full = self.load_proposal_config(cycle, proposal) + if self.check_unfinished_correction(proposal, int(runnr)): + # A correction is already running for this run + msg = Success.ALREADY_REQUESTED.format(proposal, runnr) + logging.debug(msg) + return msg + with time_db_transaction(self.job_db, 'Insert request'): cur = self.job_db.execute( # 2 "INSERT INTO requests VALUES (NULL, ?, ?, ?, 'CORRECT', ?)", @@ -1082,6 +1089,12 @@ class ActionsServer: request_time = datetime.now() try: + if self.check_unfinished_correction(proposal, int(runnr)): + # A correction is already running for this run + msg = Success.ALREADY_REQUESTED.format(proposal, runnr) + logging.debug(msg) + return msg + with time_db_transaction(self.job_db, 'Insert request'): cur = self.job_db.execute( "INSERT INTO requests VALUES (NULL, ?, ?, ?, 'CORRECT', ?)", @@ -1368,6 +1381,16 @@ class ActionsServer: # Helper methods for handlers --------------------------------------------- + def check_unfinished_correction(self, proposal: str, runnr: int): + row = self.job_db.execute( + "SELECT job_id FROM slurm_jobs " + "INNER JOIN executions USING (exec_id) " + "INNER JOIN requests USING (req_id) " + "WHERE proposal = ? AND run = ? AND action = 'CORRECT' " + " AND slurm_jobs.finished = 0", (proposal, runnr) + ) + return row is not None + @staticmethod def _normalise_proposal_num(p: str) -> str: return "{:06d}".format(int(p.strip('p'))) -- GitLab