[Webservice] Restructure database to give more meaningful success/failure information

Merged: Thomas Kluyver requested to merge webservice-refactor-db into master
2 files changed: +207 -107
@@ -84,116 +84,214 @@ def slurm_job_status(jobid):
     return "NA", "NA", "NA"


-def update_job_db(config):
-    """ Update the job database and send out updates to MDC
-
-    :param config: configuration parsed from webservice YAML
-    """
-    log.info("Starting jobs monitor")
-    conn = init_job_db(config)
-    mdc = init_md_client(config)
-    kafka_prod = init_kafka_producer(config)
-    kafka_topic = config['kafka']['topic']
-    time_interval = int(config['web-service']['job-update-interval'])
-    while True:
-        statii = slurm_status()
-        # Check that slurm is giving proper feedback
-        if statii is None:
-            time.sleep(time_interval)
-            continue
-        log.debug(f"SLURM info {statii}")
-
-        try:
-            c = conn.cursor()
-            c.execute("SELECT * FROM jobs WHERE status IN ('R', 'PD', 'CG') ")
-            combined = {}
-            for r in c.fetchall():
-                rid, jobid, proposal, run, status, _time, det, action = r
-                log.debug(f"DB info {r}")
-
-                cflg, cstatus, *_ = combined.setdefault((rid, action), (
-                    [], [], proposal, run, det
-                ))
-                if jobid in statii:
-                    slstatus, runtime = statii[jobid]
-                    query = "UPDATE jobs SET status=?, time=? WHERE jobid LIKE ?"
-                    c.execute(query, (slstatus, runtime, jobid))
-
-                    cflg.append('R')
-                    cstatus.append(f"{slstatus}-{runtime}")
-                else:
-                    _, sltime, slstatus = slurm_job_status(jobid)
-                    query = "UPDATE jobs SET status=? WHERE jobid LIKE ?"
-                    c.execute(query, (slstatus, jobid))
-
-                    if slstatus == 'COMPLETED':
-                        cflg.append("A")
-                    else:
-                        cflg.append("NA")
-                    cstatus.append(slstatus)
-            conn.commit()
-
-            flg_order = {"R": 2, "A": 1, "NA": 0}
-            dark_flags = {'NA': 'E', 'R': 'IP', 'A': 'F'}
-            for rid, action in combined:
-                if int(rid) == 0:  # this job was not submitted from MyMDC
-                    continue
-                flgs, statii, proposal, run, det = combined[rid, action]
-                # sort by least done status
-                flg = max(flgs, key=lambda i: flg_order[i])
-                if flg != 'R':
-                    log.info(
-                        "Jobs finished - action: %s, run id: %s, status: %s",
-                        action, rid, flg,
-                    )
-                    if action == 'CORRECT':
-                        try:
-                            kafka_prod.send(kafka_topic, {
-                                'event': 'correction_complete',
-                                'proposal': proposal,
-                                'run': run,
-                                'detector': det,
-                                'success': (flg == 'A'),  # A for Available
-                            })
-                        except KafkaError:
-                            log.warning("Error sending Kafka notification",
-                                        exc_info=True)
-
-                if all(s.startswith('PD-') for s in statii):
-                    # Avoid swamping myMdC with updates for jobs still pending.
-                    log.debug(
-                        "No update for action %s, rid %s: jobs pending",
-                        action, rid
-                    )
-                    continue
-
-                msg = "\n".join(statii)
-                msg_debug = f"Update MDC {rid}, {msg}"
-                log.debug(msg_debug.replace('\n', ', '))
-
-                if action == 'CORRECT':
-                    data = {'flg_cal_data_status': flg,
-                            'cal_pipeline_reply': msg}
-                    if flg != 'R':
-                        data['cal_last_end_at'] = datetime.now(tz=timezone.utc).isoformat()
-                    response = mdc.update_run_api(rid, data)
-                else:  # action == 'DARK' but it's dark_request
-                    data = {'dark_run': {'flg_status': dark_flags[flg],
-                                         'calcat_feedback': msg}}
-                    response = mdc.update_dark_run_api(rid, data)
-
-                if response.status_code != 200:
-                    log.error("Failed to update MDC for action %s, rid %s",
-                              action, rid)
-                    log.error(Errors.MDC_RESPONSE.format(response))
-        except Exception:
-            log.error("Failure to update job DB", exc_info=True)
-
-        time.sleep(time_interval)
+class JobsMonitor:
+    def __init__(self, config):
+        log.info("Starting jobs monitor")
+        self.job_db = init_job_db(config)
+        self.mdc = init_md_client(config)
+        self.kafka_prod = init_kafka_producer(config)
+        self.kafka_topic = config['kafka']['topic']
+        self.time_interval = int(config['web-service']['job-update-interval'])
+
+    def run(self):
+        while True:
+            try:
+                self.do_updates()
+            except Exception:
+                log.error("Failure to update job DB", exc_info=True)
+            time.sleep(self.time_interval)
+
+    def do_updates(self):
+        ongoing_jobs_by_exn = self.get_updates_by_exec_id()
+        # ^ dict grouping statuses of unfinished jobs by execution ID:
+        #   {12345: ['R-5:41', 'PD-0:00', ...]}
+        # Newly completed executions are present with an empty list.
+
+        # For executions still running, regroup the jobs by request
+        # (by run, for correction requests):
+        reqs_still_going = {}
+        for exec_id, running_jobs_info in ongoing_jobs_by_exn.items():
+            if running_jobs_info:
+                req_id = self.job_db.execute(
+                    "SELECT req_id FROM executions WHERE exec_id = ?", (exec_id,)
+                ).fetchone()[0]
+                reqs_still_going.setdefault(req_id, []).extend(running_jobs_info)
+
+        # For executions that have finished, send out notifications, and
+        # check if the whole request (several executions) has finished.
+        reqs_finished = set()
+        for exec_id, running_jobs_info in ongoing_jobs_by_exn.items():
+            if not running_jobs_info:
+                req_id = self.process_execution_finished(exec_id)
+
+                if req_id not in reqs_still_going:
+                    reqs_finished.add(req_id)
+
+        # Now send updates for all requests which hadn't already finished
+        # by the last time this ran:
+        for req_id, running_jobs_info in reqs_still_going.items():
+            self.process_request_still_going(req_id, running_jobs_info)
+
+        for req_id in reqs_finished:
+            self.process_request_finished(req_id)
+
+    def get_updates_by_exec_id(self):
+        c = self.job_db.cursor()
+        c.execute("SELECT job_id, exec_id FROM slurm_jobs WHERE finished = 0")
+
+        statii = slurm_status()
+        # Check that slurm is giving proper feedback
+        if statii is None:
+            return {}
+        log.debug(f"SLURM info {statii}")
+
+        ongoing_jobs_by_exn = {}
+        for r in c.fetchall():
+            log.debug("Job in DB before update: %s", tuple(r))
+            execn_ongoing_jobs = ongoing_jobs_by_exn.setdefault(r['exec_id'], [])
+
+            if str(r['job_id']) in statii:
+                # statii contains jobs which are still going (from squeue)
+                slstatus, runtime = statii[str(r['job_id'])]
+                finished = False
+                execn_ongoing_jobs.append(f"{slstatus}-{runtime}")
+            else:
+                # These jobs have finished (successfully or otherwise)
+                _, runtime, slstatus = slurm_job_status(r['job_id'])
+                finished = True
+
+            c.execute(
+                "UPDATE slurm_jobs SET finished=?, elapsed=?, status=? WHERE job_id = ?",
+                (finished, runtime, slstatus, r['job_id'])
+            )
+
+        self.job_db.commit()
+        return ongoing_jobs_by_exn
+
+    def process_request_still_going(self, req_id, running_jobs_info):
+        """Send myMdC updates for a request with jobs still running/pending"""
+        mymdc_id, action = self.job_db.execute(
+            "SELECT mymdc_id, action FROM requests WHERE req_id = ?",
+            (req_id,)
+        ).fetchone()
+
+        if all(s.startswith('PD-') for s in running_jobs_info):
+            # Avoid swamping myMdC with updates for jobs still pending.
+            log.debug("No update for %s request with mymdc id %s: jobs pending",
+                      action, mymdc_id)
+            return
+
+        msg = "\n".join(running_jobs_info)
+        log.debug("Update MDC for %s, %s: %s",
+                  action, mymdc_id, ', '.join(running_jobs_info))
+
+        if action == 'CORRECT':
+            self.mymdc_update_run(mymdc_id, msg)
+        else:  # action == 'DARK'
+            self.mymdc_update_dark(mymdc_id, msg)
+
+    def process_execution_finished(self, exec_id):
+        """Send notification that one execution has finished"""
+        statuses = [r[0] for r in self.job_db.execute(
+            "SELECT status FROM slurm_jobs WHERE exec_id = ?", (exec_id,)
+        ).fetchall()]
+        success = set(statuses) == {'COMPLETED'}
+        r = self.job_db.execute(
+            "SELECT det_type, karabo_id, req_id, proposal, run, action "
+            "FROM executions JOIN requests USING (req_id) "
+            "WHERE exec_id = ?",
+            (exec_id,)
+        ).fetchone()
+        with self.job_db:
+            self.job_db.execute(
+                "UPDATE executions SET success = ? WHERE exec_id = ?",
+                (success, exec_id)
+            )
+        log.info("Execution finished: %s for (p%s, r%s, %s), success=%s",
+                 r['action'], r['proposal'], r['run'], r['karabo_id'], success)
+
+        if r['action'] == 'CORRECT':
+            try:
+                self.kafka_prod.send(self.kafka_topic, {
+                    'event': 'correction_complete',
+                    'proposal': r['proposal'],
+                    'run': r['run'],
+                    'detector': r['det_type'],
+                    'karabo_id': r['karabo_id'],
+                    'success': success,
+                })
+            except KafkaError:
+                log.warning("Error sending Kafka notification",
+                            exc_info=True)
+
+        return r['req_id']
+
+    def process_request_finished(self, req_id):
+        """Send notifications that a request has finished"""
+        krb_id_successes = {r[0]: r[1] for r in self.job_db.execute(
+            "SELECT karabo_id, success FROM executions WHERE req_id = ?",
+            (req_id,)
+        ).fetchall()}
+        success = (set(krb_id_successes.values()) == {1})
+
+        r = self.job_db.execute(
+            "SELECT * FROM requests WHERE req_id = ?", (req_id,)
+        ).fetchone()
+        log.info(
+            "Jobs finished - action: %s, myMdC id: %s, success: %s",
+            r['action'], r['mymdc_id'], success,
+        )
+        if r['action'] == 'CORRECT':
+            try:
+                self.kafka_prod.send(self.kafka_topic, {
+                    'event': 'run_corrections_complete',
+                    'proposal': r['proposal'],
+                    'run': r['run'],
+                    'success': success,
+                })
+            except KafkaError:
+                log.warning("Error sending Kafka notification",
+                            exc_info=True)
+
+        if success:
+            msg = "Calibration jobs succeeded"
+        else:
+            # List success & failure by karabo_id
+            krb_ids_ok = {k for (k, v) in krb_id_successes.items() if v == 1}
+            ok = ', '.join(sorted(krb_ids_ok)) if krb_ids_ok else 'none'
+            krb_ids_failed = {k for (k, v) in krb_id_successes.items() if v == 0}
+            msg = f"Succeeded: {ok}; Failed: {', '.join(sorted(krb_ids_failed))}"
+
+        log.debug("Update MDC for %s, %s: %s", r['action'], r['mymdc_id'], msg)
+
+        if r['action'] == 'CORRECT':
+            status = 'A' if success else 'NA'  # Not-/Available
+            self.mymdc_update_run(r['mymdc_id'], msg, status)
+        else:  # r['action'] == 'DARK'
+            status = 'F' if success else 'E'  # Finished/Error
+            self.mymdc_update_dark(r['mymdc_id'], msg, status)
+
+    def mymdc_update_run(self, run_id, msg, status='R'):
+        data = {'flg_cal_data_status': status,
+                'cal_pipeline_reply': msg}
+        if status != 'R':
+            data['cal_last_end_at'] = datetime.now(tz=timezone.utc).isoformat()
+        response = self.mdc.update_run_api(run_id, data)
+        if response.status_code != 200:
+            log.error("Failed to update MDC run id %s", run_id)
+            log.error(Errors.MDC_RESPONSE.format(response))
+
+    def mymdc_update_dark(self, dark_run_id, msg, status='IP'):
+        data = {'dark_run': {'flg_status': status,
+                             'calcat_feedback': msg}}
+        response = self.mdc.update_dark_run_api(dark_run_id, data)
+
+        if response.status_code != 200:
+            log.error("Failed to update MDC dark run id %s", dark_run_id)
+            log.error(Errors.MDC_RESPONSE.format(response))


 def main(argv=None):
     # Ensure files are opened as UTF-8 by default, regardless of environment.
@@ -218,7 +316,9 @@ def main(argv=None):
         level=getattr(logging, args.log_level),
         format=fmt
     )
-    update_job_db(config)
+    # DEBUG logs from kafka-python are very verbose, so we'll turn them off
+    logging.getLogger('kafka').setLevel(logging.INFO)
+    JobsMonitor(config).run()


 if __name__ == "__main__":
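
For context, the queries in this diff imply that the old single `jobs` table has been split into three: `requests` (one row per myMdC request), `executions` (one row per detector the request fans out to) and `slurm_jobs` (one row per Slurm job), which is what lets the service report success or failure per detector rather than one lumped status. Below is a minimal sketch of the implied schema, assuming SQLite throughout; the column names are taken from the queries above, but the exact DDL in `init_job_db`, the column types and any extra columns are assumptions.

```python
# Hypothetical reconstruction of the schema implied by this MR's queries.
# The real init_job_db may declare types, indexes or columns differently.
import sqlite3

SCHEMA = """
CREATE TABLE IF NOT EXISTS requests(
    req_id INTEGER PRIMARY KEY,
    mymdc_id INTEGER,   -- run id (CORRECT) or dark run id (DARK) in myMdC
    proposal TEXT,
    run INTEGER,
    action TEXT         -- 'CORRECT' or 'DARK'
);
CREATE TABLE IF NOT EXISTS executions(
    exec_id INTEGER PRIMARY KEY,
    req_id INTEGER REFERENCES requests(req_id),
    det_type TEXT,
    karabo_id TEXT,
    success INTEGER     -- NULL while running, then 0 or 1
);
CREATE TABLE IF NOT EXISTS slurm_jobs(
    job_id INTEGER PRIMARY KEY,
    exec_id INTEGER REFERENCES executions(exec_id),
    status TEXT,        -- Slurm state, e.g. 'PD', 'R', 'COMPLETED'
    finished INTEGER DEFAULT 0,
    elapsed TEXT
);
"""

def init_job_db_sketch(path='webservice_jobs.sqlite'):
    conn = sqlite3.connect(path)
    # JobsMonitor indexes rows by column name (r['job_id'], r['action'], ...),
    # so the connection presumably uses sqlite3.Row as its row factory.
    conn.row_factory = sqlite3.Row
    conn.executescript(SCHEMA)
    return conn
```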
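The Kafka notifications change shape as well: `correction_complete` is now emitted once per finished execution and carries `karabo_id`, while the new `run_corrections_complete` event fires once per request with a single overall success flag. Here is a sketch of a kafka-python consumer for the new messages; the topic name and broker address are placeholders, and in practice they should come from the same YAML config the webservice reads.

```python
import json
from kafka import KafkaConsumer  # kafka-python, as used by the webservice

# Placeholder broker and topic, standing in for config['kafka']['topic'].
consumer = KafkaConsumer(
    'calibration-events',
    bootstrap_servers=['broker:9092'],
    value_deserializer=lambda m: json.loads(m.decode('utf-8')),
)

for record in consumer:
    msg = record.value
    if msg.get('event') == 'correction_complete':
        # One message per detector (execution), now including karabo_id
        print(f"p{msg['proposal']} r{msg['run']} {msg['karabo_id']}: "
              f"success={msg['success']}")
    elif msg.get('event') == 'run_corrections_complete':
        # One message per run, sent once every detector has finished
        print(f"p{msg['proposal']} r{msg['run']} complete: "
              f"success={msg['success']}")
```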