Skip to content
Snippets Groups Projects
Commit 7aa9181b authored by Thomas Kluyver's avatar Thomas Kluyver
Browse files

Merge branch 'fix/webservice-already-running' into 'master'

[Webservice] Check for unfinished corrections before launching a new one

See merge request !836
parents d3932bb4 87fa2921
No related branches found
No related tags found
1 merge request!836[Webservice] Check for unfinished corrections before launching a new one
......@@ -43,3 +43,4 @@ class Success:
REPROD_QUEUED = "SUCCESS: Queued proposal {}, run {} for reproducing previous offline calibration"
DONE_CORRECTION = "SUCCESS: Finished correction: proposal {}. run {}"
DONE_CHAR = "SUCCESS: Finished dark characterization: proposal {}, run {}"
ALREADY_REQUESTED = "SUCCESS: Correction already queued/running for proposal {}, run {}"
......@@ -62,6 +62,7 @@ def init_job_db(config):
action,
timestamp
);
CREATE INDEX IF NOT EXISTS req_by_run ON requests(proposal, run, action);
CREATE TABLE IF NOT EXISTS executions(
exec_id INTEGER PRIMARY KEY,
req_id REFERENCES requests(req_id),
......@@ -962,6 +963,12 @@ class ActionsServer:
proposal = self._normalise_proposal_num(proposal)
pconf_full = self.load_proposal_config(cycle, proposal)
if self.check_unfinished_correction(proposal, int(runnr)):
# A correction is already running for this run
msg = Success.ALREADY_REQUESTED.format(proposal, runnr)
logging.debug(msg)
return msg.encode()
with time_db_transaction(self.job_db, 'Insert request'):
cur = self.job_db.execute( # 2
"INSERT INTO requests VALUES (NULL, ?, ?, ?, 'CORRECT', ?)",
......@@ -1082,6 +1089,12 @@ class ActionsServer:
request_time = datetime.now()
try:
if self.check_unfinished_correction(proposal, int(runnr)):
# A correction is already running for this run
msg = Success.ALREADY_REQUESTED.format(proposal, runnr)
logging.debug(msg)
return msg.encode()
with time_db_transaction(self.job_db, 'Insert request'):
cur = self.job_db.execute(
"INSERT INTO requests VALUES (NULL, ?, ?, ?, 'CORRECT', ?)",
......@@ -1368,6 +1381,16 @@ class ActionsServer:
# Helper methods for handlers ---------------------------------------------
def check_unfinished_correction(self, proposal: str, runnr: int):
row = self.job_db.execute(
"SELECT job_id FROM slurm_jobs "
"INNER JOIN executions USING (exec_id) "
"INNER JOIN requests USING (req_id) "
"WHERE proposal = ? AND run = ? AND action = 'CORRECT' "
" AND slurm_jobs.finished = 0", (proposal, runnr)
).fetchone()
return row is not None
@staticmethod
def _normalise_proposal_num(p: str) -> str:
return "{:06d}".format(int(p.strip('p')))
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment