
Make webservice more systemd friendly

Closed: Thomas Kluyver requested to merge separate-jobs-watcher into master
2 files changed: +209 -167
import argparse
import getpass
import json
import locale
import logging
from subprocess import run, PIPE
import sys
import time
from typing import List, Optional

from kafka import KafkaProducer
from kafka.errors import KafkaError
import yaml

from messages import Errors
from webservice import init_job_db, init_md_client

def init_kafka_producer(config):
    try:
        return KafkaProducer(
            bootstrap_servers=config['kafka']['brokers'],
            value_serializer=lambda d: json.dumps(d).encode('utf-8'),
            max_block_ms=2000,  # Don't get stuck trying to send Kafka messages
        )
    except KafkaError:
        logging.warning("Problem initialising Kafka producer; "
                        "Kafka notifications will not be sent.", exc_info=True)
        return NoOpProducer()


class NoOpProducer:
    """Fills in for Kafka producer object when setting that up fails"""

    def send(self, topic, value):
        pass
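
# Usage sketch (hypothetical call): either return value of init_kafka_producer
# supports the same .send() signature, so callers need no special-casing when
# the broker is unreachable:
#
#     producer = init_kafka_producer(config)
#     producer.send(config['kafka']['topic'], {'event': 'correction_complete'})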


def slurm_status(filter_user=True):
    """Return the status of slurm jobs by calling squeue

    :param filter_user: set to True to list only jobs from the current user
    :return: a dictionary indexed by slurm jobid and containing a tuple
        of (status, run time) as values, or None if squeue failed
    """
    cmd = ["squeue"]
    if filter_user:
        cmd += ["-u", getpass.getuser()]
    res = run(cmd, stdout=PIPE)
    if res.returncode == 0:
        rlines = res.stdout.decode().split("\n")
        statii = {}
        for r in rlines[1:]:  # skip the header line
            try:
                jobid, _, _, _, status, runtime, _, _ = r.split()
                jobid = jobid.strip()
                statii[jobid] = status, runtime
            except ValueError:  # not enough values to unpack in split
                pass
        return statii
    return None  # squeue itself failed; callers treat None as "no information"
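
# Illustration (hypothetical values): a default squeue line has eight fields,
#
#     1234  exfel  correct_x  jdoe  R  10:32  1  node001
#
# which yields statii == {"1234": ("R", "10:32")}; the header and any
# malformed lines are skipped by the ValueError handler above.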


def slurm_job_status(jobid):
    """Return the status of a single slurm job by calling sacct

    :param jobid: Slurm job Id
    :return: a (job id, elapsed time, state) triple from sacct, or
        ("NA", "NA", "NA") if it could not be determined
    """
    cmd = ["sacct", "-j", str(jobid), "--format=JobID,Elapsed,state"]
    res = run(cmd, stdout=PIPE)
    if res.returncode == 0:
        rlines = res.stdout.decode().split("\n")
        logging.debug("Job {} state {}".format(jobid, rlines[2].split()))
        if len(rlines[2].split()) == 3:
            return rlines[2].replace("+", "").split()
    return "NA", "NA", "NA"


def update_job_db(config):
    """Update the job database and send out updates to MDC

    This runs indefinitely, polling Slurm and the job database at a fixed
    interval.

    :param config: configuration parsed from webservice YAML
    """
    logging.info("Starting config db handling")
    conn = init_job_db(config)
    mdc = init_md_client(config)
    kafka_prod = init_kafka_producer(config)
    kafka_topic = config['kafka']['topic']
    time_interval = int(config['web-service']['job-update-interval'])

    while True:
        statii = slurm_status()
        # Check that slurm is giving proper feedback
        if statii is None:
            time.sleep(time_interval)
            continue
        try:
            c = conn.cursor()
            c.execute("SELECT * FROM jobs WHERE status IN ('R', 'PD', 'CG') ")
            combined = {}
            logging.debug("SLURM info {}".format(statii))

            for r in c.fetchall():
                rid, jobid, proposal, run, status, _time, det, action = r
                logging.debug("DB info {}".format(r))

                cflg, cstatus, *_ = combined.setdefault((rid, action), (
                    [], [], proposal, run, det
                ))
                if jobid in statii:
                    slstatus, runtime = statii[jobid]
                    query = "UPDATE jobs SET status=?, time=? WHERE jobid LIKE ?"
                    c.execute(query, (slstatus, runtime, jobid))

                    cflg.append('R')
                    cstatus.append("{}-{}".format(slstatus, runtime))
                else:
                    _, sltime, slstatus = slurm_job_status(jobid)
                    query = "UPDATE jobs SET status=? WHERE jobid LIKE ?"
                    c.execute(query, (slstatus, jobid))

                    if slstatus == 'COMPLETED':
                        cflg.append("A")
                    else:
                        cflg.append("NA")
                    cstatus.append(slstatus)
            conn.commit()

            flg_order = {"R": 2, "A": 1, "NA": 0}
            dark_flags = {'NA': 'E', 'R': 'IP', 'A': 'F'}
            for rid, action in combined:
                if int(rid) == 0:  # this job was not submitted from MyMDC
                    continue
                flgs, statii, proposal, run, det = combined[rid, action]
                # report the flag of the least-done job (R > A > NA)
                flg = max(flgs, key=lambda i: flg_order[i])
                if flg != 'R':
                    logging.info(
                        "Jobs finished - action: %s, run id: %s, status: %s",
                        action, rid, flg,
                    )
                    if action == 'CORRECT':
                        try:
                            kafka_prod.send(kafka_topic, {
                                'event': 'correction_complete',
                                'proposal': proposal,
                                'run': run,
                                'detector': det,
                                'success': (flg == 'A'),  # A for Available
                            })
                        except KafkaError:
                            logging.warning("Error sending Kafka notification",
                                            exc_info=True)

                msg = "\n".join(statii)
                msg_debug = f"Update MDC {rid}, {msg}"
                logging.debug(msg_debug.replace('\n', ', '))

                if action == 'CORRECT':
                    response = mdc.update_run_api(rid,
                                                  {'flg_cal_data_status': flg,
                                                   'cal_pipeline_reply': msg})
                else:  # action == 'DARK' but it's dark_request
                    data = {'dark_run': {'flg_status': dark_flags[flg],
                                         'calcat_feedback': msg}}
                    response = mdc.update_dark_run_api(rid, data)

                if response.status_code != 200:
                    logging.error("Failed to update MDC for action %s, rid %s",
                                  action, rid)
                    logging.error(Errors.MDC_RESPONSE.format(response))
        except Exception:
            logging.error("Failure to update job DB", exc_info=True)

        time.sleep(time_interval)
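
# For reference (inferred from the queries above, not a schema dump): each row
# of the jobs table unpacks as
#
#     (rid, jobid, proposal, run, status, time, det, action)
#
# where rid is the MyMDC request id (0 for jobs not submitted via MyMDC) and
# action is 'CORRECT' or 'DARK'.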


def main(argv: Optional[List[str]] = None):
    if argv is None:
        argv = sys.argv[1:]

    # Ensure files are opened as UTF-8 by default, regardless of environment.
    locale.setlocale(locale.LC_CTYPE, ('en_US', 'UTF-8'))

    parser = argparse.ArgumentParser(
        description='Start the calibration webservice'
    )
    parser.add_argument('--config-file', type=str, default='./webservice.yaml')
    parser.add_argument(
        '--log-level', type=str, default="INFO", choices=['INFO', 'DEBUG', 'ERROR']
    )
    args = parser.parse_args(argv)

    with open(args.config_file, "r") as f:
        config = yaml.safe_load(f.read())

    fmt = '%(asctime)s - %(name)s - %(levelname)s - [%(filename)s:%(lineno)d] %(message)s'
    logging.basicConfig(
        level=getattr(logging, args.log_level),
        format=fmt
    )
    # Update job statuses from Slurm; this call blocks and loops forever
    update_job_db(config)


if __name__ == "__main__":
    main()
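
Since the point of this MR is to let systemd supervise the job watcher as its
own process, a unit file along these lines could run it. This is a sketch
only: the unit description, paths, interpreter and script filename are
placeholders, not something this MR ships.

    [Unit]
    Description=Calibration webservice job monitor
    After=network.target

    [Service]
    ExecStart=/usr/bin/python3 /opt/calibration/job_monitor.py --config-file /opt/calibration/webservice.yaml
    Restart=on-failure

    [Install]
    WantedBy=multi-user.target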