diff --git a/webservice/messages.py b/webservice/messages.py index 4928545df757c3e8560e51eb714de4aca2ebdfb2..48b17b367cc4465c365fa129edf260c7d80ee1bb 100644 --- a/webservice/messages.py +++ b/webservice/messages.py @@ -43,3 +43,4 @@ class Success: REPROD_QUEUED = "SUCCESS: Queued proposal {}, run {} for reproducing previous offline calibration" DONE_CORRECTION = "SUCCESS: Finished correction: proposal {}. run {}" DONE_CHAR = "SUCCESS: Finished dark characterization: proposal {}, run {}" + ALREADY_REQUESTED = "SUCCESS: Correction already queued/running for proposal {}, run {}" diff --git a/webservice/webservice.py b/webservice/webservice.py index 45ec4ecf9d0bfc496244ad0ea0dda65068c9e503..2663b27f416e6db59aff20cb3c95cd9734528c00 100644 --- a/webservice/webservice.py +++ b/webservice/webservice.py @@ -62,6 +62,7 @@ def init_job_db(config): action, timestamp ); + CREATE INDEX IF NOT EXISTS req_by_run ON requests(proposal, run, action); CREATE TABLE IF NOT EXISTS executions( exec_id INTEGER PRIMARY KEY, req_id REFERENCES requests(req_id), @@ -962,6 +963,12 @@ class ActionsServer: proposal = self._normalise_proposal_num(proposal) pconf_full = self.load_proposal_config(cycle, proposal) + if self.check_unfinished_correction(proposal, int(runnr)): + # A correction is already running for this run + msg = Success.ALREADY_REQUESTED.format(proposal, runnr) + logging.debug(msg) + return msg.encode() + with time_db_transaction(self.job_db, 'Insert request'): cur = self.job_db.execute( # 2 "INSERT INTO requests VALUES (NULL, ?, ?, ?, 'CORRECT', ?)", @@ -1082,6 +1089,12 @@ class ActionsServer: request_time = datetime.now() try: + if self.check_unfinished_correction(proposal, int(runnr)): + # A correction is already running for this run + msg = Success.ALREADY_REQUESTED.format(proposal, runnr) + logging.debug(msg) + return msg.encode() + with time_db_transaction(self.job_db, 'Insert request'): cur = self.job_db.execute( "INSERT INTO requests VALUES (NULL, ?, ?, ?, 'CORRECT', ?)", @@ -1368,6 +1381,16 @@ class ActionsServer: # Helper methods for handlers --------------------------------------------- + def check_unfinished_correction(self, proposal: str, runnr: int): + row = self.job_db.execute( + "SELECT job_id FROM slurm_jobs " + "INNER JOIN executions USING (exec_id) " + "INNER JOIN requests USING (req_id) " + "WHERE proposal = ? AND run = ? AND action = 'CORRECT' " + " AND slurm_jobs.finished = 0", (proposal, runnr) + ).fetchone() + return row is not None + @staticmethod def _normalise_proposal_num(p: str) -> str: return "{:06d}".format(int(p.strip('p')))