Skip to content
Snippets Groups Projects
Commit cb8c5d80 authored by Thomas Kluyver's avatar Thomas Kluyver
Browse files

Check for unfinished corrections before launching a new one

parent d3932bb4
No related branches found
No related tags found
1 merge request!836[Webservice] Check for unfinished corrections before launching a new one
...@@ -43,3 +43,4 @@ class Success: ...@@ -43,3 +43,4 @@ class Success:
REPROD_QUEUED = "SUCCESS: Queued proposal {}, run {} for reproducing previous offline calibration" REPROD_QUEUED = "SUCCESS: Queued proposal {}, run {} for reproducing previous offline calibration"
DONE_CORRECTION = "SUCCESS: Finished correction: proposal {}. run {}" DONE_CORRECTION = "SUCCESS: Finished correction: proposal {}. run {}"
DONE_CHAR = "SUCCESS: Finished dark characterization: proposal {}, run {}" DONE_CHAR = "SUCCESS: Finished dark characterization: proposal {}, run {}"
ALREADY_REQUESTED = "SUCCESS: Correction already queued/running for proposal {}, run {}"
...@@ -62,6 +62,7 @@ def init_job_db(config): ...@@ -62,6 +62,7 @@ def init_job_db(config):
action, action,
timestamp timestamp
); );
CREATE INDEX IF NOT EXISTS req_by_run ON requests(proposal, run, action);
CREATE TABLE IF NOT EXISTS executions( CREATE TABLE IF NOT EXISTS executions(
exec_id INTEGER PRIMARY KEY, exec_id INTEGER PRIMARY KEY,
req_id REFERENCES requests(req_id), req_id REFERENCES requests(req_id),
...@@ -962,6 +963,12 @@ class ActionsServer: ...@@ -962,6 +963,12 @@ class ActionsServer:
proposal = self._normalise_proposal_num(proposal) proposal = self._normalise_proposal_num(proposal)
pconf_full = self.load_proposal_config(cycle, proposal) pconf_full = self.load_proposal_config(cycle, proposal)
if self.check_unfinished_correction(proposal, int(runnr)):
# A correction is already running for this run
msg = Success.ALREADY_REQUESTED.format(proposal, runnr)
logging.debug(msg)
return msg
with time_db_transaction(self.job_db, 'Insert request'): with time_db_transaction(self.job_db, 'Insert request'):
cur = self.job_db.execute( # 2 cur = self.job_db.execute( # 2
"INSERT INTO requests VALUES (NULL, ?, ?, ?, 'CORRECT', ?)", "INSERT INTO requests VALUES (NULL, ?, ?, ?, 'CORRECT', ?)",
...@@ -1082,6 +1089,12 @@ class ActionsServer: ...@@ -1082,6 +1089,12 @@ class ActionsServer:
request_time = datetime.now() request_time = datetime.now()
try: try:
if self.check_unfinished_correction(proposal, int(runnr)):
# A correction is already running for this run
msg = Success.ALREADY_REQUESTED.format(proposal, runnr)
logging.debug(msg)
return msg
with time_db_transaction(self.job_db, 'Insert request'): with time_db_transaction(self.job_db, 'Insert request'):
cur = self.job_db.execute( cur = self.job_db.execute(
"INSERT INTO requests VALUES (NULL, ?, ?, ?, 'CORRECT', ?)", "INSERT INTO requests VALUES (NULL, ?, ?, ?, 'CORRECT', ?)",
...@@ -1368,6 +1381,16 @@ class ActionsServer: ...@@ -1368,6 +1381,16 @@ class ActionsServer:
# Helper methods for handlers --------------------------------------------- # Helper methods for handlers ---------------------------------------------
def check_unfinished_correction(self, proposal: str, runnr: int):
row = self.job_db.execute(
"SELECT job_id FROM slurm_jobs "
"INNER JOIN executions USING (exec_id) "
"INNER JOIN requests USING (req_id) "
"WHERE proposal = ? AND run = ? AND action = 'CORRECT' "
" AND slurm_jobs.finished = 0", (proposal, runnr)
)
return row is not None
@staticmethod @staticmethod
def _normalise_proposal_num(p: str) -> str: def _normalise_proposal_num(p: str) -> str:
return "{:06d}".format(int(p.strip('p'))) return "{:06d}".format(int(p.strip('p')))
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment