[Webservice] Restructure database to give more meaningful success/failure information

Merged Thomas Kluyver requested to merge webservice-refactor-db into master
1 file changed: +231 −107
@@ -3,6 +3,7 @@ import argparse
 import json
 import locale
 import logging
+import signal
 import time
 from datetime import datetime, timezone
 from pathlib import Path
@@ -84,116 +85,229 @@ def slurm_job_status(jobid):
     return "NA", "NA", "NA"


-def update_job_db(config):
-    """ Update the job database and send out updates to MDC
-
-    :param config: configuration parsed from webservice YAML
-    """
-    log.info("Starting jobs monitor")
-    conn = init_job_db(config)
-    mdc = init_md_client(config)
-    kafka_prod = init_kafka_producer(config)
-    kafka_topic = config['kafka']['topic']
-    time_interval = int(config['web-service']['job-update-interval'])
-
-    while True:
-        statii = slurm_status()
-        # Check that slurm is giving proper feedback
-        if statii is None:
-            time.sleep(time_interval)
-            continue
-        try:
-            c = conn.cursor()
-            c.execute("SELECT * FROM jobs WHERE status IN ('R', 'PD', 'CG') ")
-            combined = {}
-            log.debug(f"SLURM info {statii}")
-            for r in c.fetchall():
-                rid, jobid, proposal, run, status, _time, det, action = r
-                log.debug(f"DB info {r}")
-                cflg, cstatus, *_ = combined.setdefault((rid, action), (
-                    [], [], proposal, run, det
-                ))
-                if jobid in statii:
-                    slstatus, runtime = statii[jobid]
-                    query = "UPDATE jobs SET status=?, time=? WHERE jobid LIKE ?"
-                    c.execute(query, (slstatus, runtime, jobid))
-                    cflg.append('R')
-                    cstatus.append(f"{slstatus}-{runtime}")
-                else:
-                    _, sltime, slstatus = slurm_job_status(jobid)
-                    query = "UPDATE jobs SET status=? WHERE jobid LIKE ?"
-                    c.execute(query, (slstatus, jobid))
-                    if slstatus == 'COMPLETED':
-                        cflg.append("A")
-                    else:
-                        cflg.append("NA")
-                    cstatus.append(slstatus)
-            conn.commit()
-
-            flg_order = {"R": 2, "A": 1, "NA": 0}
-            dark_flags = {'NA': 'E', 'R': 'IP', 'A': 'F'}
-            for rid, action in combined:
-                if int(rid) == 0:  # this job was not submitted from MyMDC
-                    continue
-                flgs, statii, proposal, run, det = combined[rid, action]
-                # sort by least done status
-                flg = max(flgs, key=lambda i: flg_order[i])
-                if flg != 'R':
-                    log.info(
-                        "Jobs finished - action: %s, run id: %s, status: %s",
-                        action, rid, flg,
-                    )
-                    if action == 'CORRECT':
-                        try:
-                            kafka_prod.send(kafka_topic, {
-                                'event': 'correction_complete',
-                                'proposal': proposal,
-                                'run': run,
-                                'detector': det,
-                                'success': (flg == 'A'),  # A for Available
-                            })
-                        except KafkaError:
-                            log.warning("Error sending Kafka notification",
-                                        exc_info=True)
-
-                if all(s.startswith('PD-') for s in statii):
-                    # Avoid swamping myMdC with updates for jobs still pending.
-                    log.debug(
-                        "No update for action %s, rid %s: jobs pending",
-                        action, rid
-                    )
-                    continue
-
-                msg = "\n".join(statii)
-                msg_debug = f"Update MDC {rid}, {msg}"
-                log.debug(msg_debug.replace('\n', ', '))
-
-                if action == 'CORRECT':
-                    data = {'flg_cal_data_status': flg,
-                            'cal_pipeline_reply': msg}
-                    if flg != 'R':
-                        data['cal_last_end_at'] = datetime.now(tz=timezone.utc).isoformat()
-                    response = mdc.update_run_api(rid, data)
-                else:  # action == 'DARK' but it's dark_request
-                    data = {'dark_run': {'flg_status': dark_flags[flg],
-                                         'calcat_feedback': msg}}
-                    response = mdc.update_dark_run_api(rid, data)
-                if response.status_code != 200:
-                    log.error("Failed to update MDC for action %s, rid %s",
-                              action, rid)
-                    log.error(Errors.MDC_RESPONSE.format(response))
-        except Exception:
-            log.error("Failure to update job DB", exc_info=True)
-        time.sleep(time_interval)
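
The restructured database (built in init_job_db, which is outside this diff) tracks work at three levels instead of the old flat jobs table: a myMdC request, the per-detector executions it spawns, and the individual Slurm jobs. A rough sketch of the schema implied by the queries below; column types and constraints are assumptions, only the table and column names actually queried are taken from the code:

    import sqlite3

    conn = sqlite3.connect(':memory:')  # illustrative only
    conn.executescript("""
        -- one row per myMdC request (a run to correct, or a dark request)
        CREATE TABLE requests (req_id INTEGER PRIMARY KEY, mymdc_id, proposal, run, action);
        -- one row per detector processed for a request
        CREATE TABLE executions (exec_id INTEGER PRIMARY KEY, req_id, det_type, karabo_id, success);
        -- one row per Slurm job belonging to an execution
        CREATE TABLE slurm_jobs (job_id INTEGER PRIMARY KEY, exec_id, status, finished, elapsed);
    """)
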
+class JobsMonitor:
+    def __init__(self, config):
+        log.info("Starting jobs monitor")
+        self.job_db = init_job_db(config)
+        self.mdc = init_md_client(config)
+        self.kafka_prod = init_kafka_producer(config)
+        self.kafka_topic = config['kafka']['topic']
+        self.time_interval = int(config['web-service']['job-update-interval'])
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        self.job_db.close()
+        self.kafka_prod.close(timeout=5)
+
+    def run(self):
+        while True:
+            try:
+                self.do_updates()
+            except Exception:
+                log.error("Failure to update job DB", exc_info=True)
+            time.sleep(self.time_interval)
+
+    def do_updates(self):
+        ongoing_jobs_by_exn = self.get_updates_by_exec_id()
+
+        # For executions still running, regroup the jobs by request
+        # (by run, for correction requests):
+        reqs_still_going = {}
+        for exec_id, running_jobs_info in ongoing_jobs_by_exn.items():
+            if running_jobs_info:
+                req_id = self.job_db.execute(
+                    "SELECT req_id FROM executions WHERE exec_id = ?", (exec_id,)
+                ).fetchone()[0]
+                reqs_still_going.setdefault(req_id, []).extend(running_jobs_info)
+
+        # For executions that have finished, send out notifications, and
+        # check if the whole request (several executions) has finished.
+        reqs_finished = set()
+        for exec_id, running_jobs_info in ongoing_jobs_by_exn.items():
+            if not running_jobs_info:
+                req_id = self.process_execution_finished(exec_id)
+                if req_id not in reqs_still_going:
+                    reqs_finished.add(req_id)
+
+        # Now send updates for all requests which hadn't already finished
+        # by the last time this ran:
+        for req_id, running_jobs_info in reqs_still_going.items():
+            self.process_request_still_going(req_id, running_jobs_info)
+
+        for req_id in reqs_finished:
+            self.process_request_finished(req_id)
+
+    def get_updates_by_exec_id(self) -> dict:
+        """Get statuses of unfinished jobs, grouped by execution ID
+
+        E.g. {12345: ['R-5:41', 'PD-0:00', ...]}
+
+        Newly completed executions are present with an empty list.
+        """
+        c = self.job_db.cursor()
+        c.execute("SELECT job_id, exec_id FROM slurm_jobs WHERE finished = 0")
+
+        statii = slurm_status()
+        # Check that slurm is giving proper feedback
+        if statii is None:
+            return {}
+        log.debug(f"SLURM info {statii}")
+
+        ongoing_jobs_by_exn = {}
+        for r in c.fetchall():
+            log.debug("Job in DB before update: %s", tuple(r))
+            execn_ongoing_jobs = ongoing_jobs_by_exn.setdefault(r['exec_id'], [])
+
+            if str(r['job_id']) in statii:
+                # statii contains jobs which are still going (from squeue)
+                slstatus, runtime = statii[str(r['job_id'])]
+                finished = False
+                execn_ongoing_jobs.append(f"{slstatus}-{runtime}")
+            else:
+                # These jobs have finished (successfully or otherwise)
+                _, runtime, slstatus = slurm_job_status(r['job_id'])
+                finished = True
+
+            c.execute(
+                "UPDATE slurm_jobs SET finished=?, elapsed=?, status=? WHERE job_id = ?",
+                (finished, runtime, slstatus, r['job_id'])
+            )
+
+        self.job_db.commit()
+        return ongoing_jobs_by_exn
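
get_updates_by_exec_id reads rows by column name (r['exec_id'], r['job_id']), which with the standard sqlite3 module only works if a row factory is set. Presumably init_job_db does something along these lines (a guess, since that function is not part of this diff):

    import sqlite3

    conn = sqlite3.connect('webservice_jobs.sqlite')  # path is illustrative
    conn.row_factory = sqlite3.Row  # rows become indexable by column name
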
+    def process_request_still_going(self, req_id, running_jobs_info):
+        """Send myMdC updates for a request with jobs still running/pending"""
+        mymdc_id, action = self.job_db.execute(
+            "SELECT mymdc_id, action FROM requests WHERE req_id = ?",
+            (req_id,)
+        ).fetchone()
+
+        if all(s.startswith('PD-') for s in running_jobs_info):
+            # Avoid swamping myMdC with updates for jobs still pending.
+            log.debug("No update for %s request with mymdc id %s: jobs pending",
+                      action, mymdc_id)
+            return
+
+        msg = "\n".join(running_jobs_info)
+        log.debug("Update MDC for %s, %s: %s",
+                  action, mymdc_id, ', '.join(running_jobs_info))
+
+        if action == 'CORRECT':
+            self.mymdc_update_run(mymdc_id, msg)
+        else:  # action == 'DARK'
+            self.mymdc_update_dark(mymdc_id, msg)
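
The pending check operates on the 'STATUS-RUNTIME' strings assembled in get_updates_by_exec_id, so an update is skipped only while every job of the request is still queued:

    >>> all(s.startswith('PD-') for s in ['PD-0:00', 'PD-0:00'])
    True
    >>> all(s.startswith('PD-') for s in ['PD-0:00', 'R-5:41'])
    False
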
+    def process_execution_finished(self, exec_id):
+        """Send notification & record that one execution has finished"""
+        statuses = [r[0] for r in self.job_db.execute(
+            "SELECT status FROM slurm_jobs WHERE exec_id = ?", (exec_id,)
+        ).fetchall()]
+        success = set(statuses) == {'COMPLETED'}
+        r = self.job_db.execute(
+            "SELECT det_type, karabo_id, req_id, proposal, run, action "
+            "FROM executions JOIN requests USING (req_id) "
+            "WHERE exec_id = ?",
+            (exec_id,)
+        ).fetchone()
+        with self.job_db:
+            self.job_db.execute(
+                "UPDATE executions SET success = ? WHERE exec_id = ?",
+                (success, exec_id)
+            )
+        log.info("Execution finished: %s for (p%s, r%s, %s), success=%s",
+                 r['action'], r['proposal'], r['run'], r['karabo_id'], success)
+
+        if r['action'] == 'CORRECT':
+            try:
+                self.kafka_prod.send(self.kafka_topic, {
+                    'event': 'correction_complete',
+                    'proposal': r['proposal'],
+                    'run': str(r['run']),
+                    'detector': r['det_type'],
+                    'detector_identifier': r['karabo_id'],
+                    'success': success,
+                })
+            except KafkaError:
+                log.warning("Error sending Kafka notification",
+                            exc_info=True)
+
+        return r['req_id']
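
For context, a downstream service could consume these per-detector notifications with kafka-python, assuming the producer JSON-serialises the dict payloads (the topic name and broker address below are placeholders, not values from this repository):

    import json
    from kafka import KafkaConsumer

    consumer = KafkaConsumer(
        'correction-notifications',       # placeholder topic
        bootstrap_servers='broker:9092',  # placeholder address
        value_deserializer=lambda m: json.loads(m.decode('utf-8')),
    )
    for msg in consumer:
        if msg.value.get('event') == 'correction_complete':
            print(msg.value['detector_identifier'], msg.value['success'])
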
+    def process_request_finished(self, req_id):
+        """Send Kafka notifications and update myMdC that a request has finished."""
+        krb_id_successes = {r[0]: r[1] for r in self.job_db.execute(
+            "SELECT karabo_id, success FROM executions WHERE req_id = ?",
+            (req_id,)
+        ).fetchall()}
+        success = (set(krb_id_successes.values()) == {1})
+
+        r = self.job_db.execute(
+            "SELECT * FROM requests WHERE req_id = ?", (req_id,)
+        ).fetchone()
+        log.info(
+            "Jobs finished - action: %s, myMdC id: %s, success: %s",
+            r['action'], r['mymdc_id'], success,
+        )
+        if r['action'] == 'CORRECT':
+            try:
+                self.kafka_prod.send(self.kafka_topic, {
+                    'event': 'run_corrections_complete',
+                    'proposal': r['proposal'],
+                    'run': str(r['run']),
+                    'success': success,
+                })
+            except KafkaError:
+                log.warning("Error sending Kafka notification",
+                            exc_info=True)
+
+        if success:
+            msg = "Calibration jobs succeeded"
+        else:
+            # List success & failure by karabo_id
+            krb_ids_ok = {k for (k, v) in krb_id_successes.items() if v == 1}
+            ok = ', '.join(sorted(krb_ids_ok)) if krb_ids_ok else 'none'
+            krb_ids_failed = {k for (k, v) in krb_id_successes.items() if v == 0}
+            msg = f"Succeeded: {ok}; Failed: {', '.join(sorted(krb_ids_failed))}"
+
+        log.debug("Update MDC for %s, %s: %s", r['action'], r['mymdc_id'], msg)
+
+        if r['action'] == 'CORRECT':
+            status = 'A' if success else 'NA'  # Not-/Available
+            self.mymdc_update_run(r['mymdc_id'], msg, status)
+        else:  # r['action'] == 'DARK'
+            status = 'F' if success else 'E'  # Finished/Error
+            self.mymdc_update_dark(r['mymdc_id'], msg, status)
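
The request-level outcome is derived entirely from the success column of executions. With one failed detector (the karabo_ids here are made up), the logic above produces:

    >>> krb_id_successes = {'AGIPD1M1': 1, 'JNGFR02': 0}
    >>> set(krb_id_successes.values()) == {1}   # overall success?
    False
    >>> ok = ', '.join(sorted(k for k, v in krb_id_successes.items() if v == 1)) or 'none'
    >>> failed = ', '.join(sorted(k for k, v in krb_id_successes.items() if v == 0))
    >>> f"Succeeded: {ok}; Failed: {failed}"
    'Succeeded: AGIPD1M1; Failed: JNGFR02'
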
+    def mymdc_update_run(self, run_id, msg, status='R'):
+        """Update correction status in MyMdC"""
+        data = {'flg_cal_data_status': status,
+                'cal_pipeline_reply': msg}
+        if status != 'R':
+            data['cal_last_end_at'] = datetime.now(tz=timezone.utc).isoformat()
+        response = self.mdc.update_run_api(run_id, data)
+        if response.status_code != 200:
+            log.error("Failed to update MDC run id %s", run_id)
+            log.error(Errors.MDC_RESPONSE.format(response))
+
+    def mymdc_update_dark(self, dark_run_id, msg, status='IP'):
+        """Update dark run status in MyMdC"""
+        data = {'dark_run': {'flg_status': status,
+                             'calcat_feedback': msg}}
+        response = self.mdc.update_dark_run_api(dark_run_id, data)
+        if response.status_code != 200:
+            log.error("Failed to update MDC dark run id %s", dark_run_id)
+            log.error(Errors.MDC_RESPONSE.format(response))
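
The default status arguments of these two helpers encode the same myMdC flags the old code kept in dark_flags: 'R'/'A'/'NA' (running / available / not available) for correction runs, and 'IP'/'F'/'E' (in progress / finished / error) for dark requests. As a summary (the names below are just labels, not code from the diff):

    RUN_FLAGS = {'running': 'R', 'success': 'A', 'failure': 'NA'}   # flg_cal_data_status
    DARK_FLAGS = {'running': 'IP', 'success': 'F', 'failure': 'E'}  # dark_run flg_status
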
+
+def interrupted(signum, frame):
+    raise KeyboardInterrupt
+
+
 def main(argv=None):
     # Ensure files are opened as UTF-8 by default, regardless of environment.
@@ -218,7 +332,17 @@ def main(argv=None):
         level=getattr(logging, args.log_level),
         format=fmt
     )
-    update_job_db(config)
+    # DEBUG logs from kafka-python are very verbose, so we'll turn them off
+    logging.getLogger('kafka').setLevel(logging.INFO)
+
+    # Treat SIGTERM like SIGINT (Ctrl-C) & do a clean shutdown
+    signal.signal(signal.SIGTERM, interrupted)
+
+    with JobsMonitor(config) as jm:
+        try:
+            jm.run()
+        except KeyboardInterrupt:
+            logging.info("Shutting down on SIGINT/SIGTERM")


 if __name__ == "__main__":
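
A note on the shutdown path: process managers typically stop a service with SIGTERM, and re-raising it as KeyboardInterrupt lets the with JobsMonitor(...) block unwind normally, so __exit__ closes the job database and the Kafka producer (flushing pending messages). The pattern in isolation:

    import signal

    def interrupted(signum, frame):
        raise KeyboardInterrupt

    signal.signal(signal.SIGTERM, interrupted)
    # From here on, SIGTERM behaves like Ctrl-C: context managers and
    # finally blocks run instead of the process dying immediately.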