Skip to content
Snippets Groups Projects

[Webservice] Restructure database to give more meaningful success/failure information

Merged Thomas Kluyver requested to merge webservice-refactor-db into master
+ 11
6
@@ -111,9 +111,6 @@ class JobsMonitor:
def do_updates(self):
ongoing_jobs_by_exn = self.get_updates_by_exec_id()
# ^ dict grouping statuses of unfinished jobs by execution ID:
# {12345: ['R-5:41', 'PD-0:00', ...]}
# Newly completed executions are present with an empty list.
# For executions still running, regroup the jobs by request
# (by run, for correction requests):
@@ -144,7 +141,13 @@ class JobsMonitor:
for req_id in reqs_finished:
self.process_request_finished(req_id)
def get_updates_by_exec_id(self):
def get_updates_by_exec_id(self) -> dict:
"""Get statuses of unfinished jobs, grouped by execution ID
E.g. {12345: ['R-5:41', 'PD-0:00', ...]}
Newly completed executions are present with an empty list.
"""
c = self.job_db.cursor()
c.execute("SELECT job_id, exec_id FROM slurm_jobs WHERE finished = 0")
@@ -202,7 +205,7 @@ class JobsMonitor:
self.mymdc_update_dark(mymdc_id, msg)
def process_execution_finished(self, exec_id):
"""Send notification that one execution has finished"""
"""Send notification & record that one execution has finished"""
statuses = [r[0] for r in self.job_db.execute(
"SELECT status FROM slurm_jobs WHERE exec_id = ?", (exec_id,)
).fetchall()]
@@ -238,7 +241,7 @@ class JobsMonitor:
return r['req_id']
def process_request_finished(self, req_id):
"""Send notifications that a request has finished"""
"""Send Kafka notifications and update myMDC that a request has finished."""
krb_id_successes = {r[0]: r[1] for r in self.job_db.execute(
"SELECT karabo_id, success FROM executions WHERE req_id = ?",
(req_id,)
@@ -283,6 +286,7 @@ class JobsMonitor:
self.mymdc_update_dark(r['mymdc_id'], msg, status)
def mymdc_update_run(self, run_id, msg, status='R'):
"""Update correction status in MyMdC"""
data = {'flg_cal_data_status': status,
'cal_pipeline_reply': msg}
if status != 'R':
@@ -293,6 +297,7 @@ class JobsMonitor:
log.error(Errors.MDC_RESPONSE.format(response))
def mymdc_update_dark(self, dark_run_id, msg, status='IP'):
"""Update dark run status in MyMdC"""
data = {'dark_run': {'flg_status': status,
'calcat_feedback': msg}}
response = self.mdc.update_dark_run_api(dark_run_id, data)
Loading