[AGIPD][TESTs]test_agipdlib AGIPDCtrl and get_bias_voltage for AGIPD1M and AGIPD500K

Merged Karim Ahmed requested to merge test/test_agipdlib into master
All threads resolved!
2 files changed: +230 −182
"""Monitor calibration jobs in Slurm and send status updates"""
 
import argparse
 
import getpass
 
import json
 
import locale
 
import logging
 
import sys
 
import time
 
from datetime import datetime, timezone
 
from pathlib import Path
 
from subprocess import run, PIPE
 
 
from kafka import KafkaProducer
 
from kafka.errors import KafkaError
 
 
try:
 
from .config import webservice as config
 
from .messages import MDC, Errors, MigrationError, Success
 
from .webservice import init_job_db, init_md_client
 
except ImportError:
 
from config import webservice as config
 
from messages import MDC, Errors, MigrationError, Success
 
from webservice import init_job_db, init_md_client
 
 
log = logging.getLogger(__name__)
 
 
 
class NoOpProducer:
 
"""Fills in for Kafka producer object when setting that up fails"""
 
def send(self, topic, value):
 
pass
 
 
 
def init_kafka_producer(config):
    try:
        return KafkaProducer(
            bootstrap_servers=config['kafka']['brokers'],
            value_serializer=lambda d: json.dumps(d).encode('utf-8'),
            max_block_ms=2000,  # Don't get stuck trying to send Kafka messages
        )
    except KafkaError:
        log.warning("Problem initialising Kafka producer; "
                    "Kafka notifications will not be sent.", exc_info=True)
        return NoOpProducer()
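
For context, a minimal sketch of how this producer (or its no-op stand-in) gets used; the broker address and topic name below are invented placeholders, not values from the real deployment:

demo_config = {'kafka': {'brokers': ['broker1:9092'],   # hypothetical broker
                         'topic': 'offline-cal-demo'}}  # hypothetical topic
producer = init_kafka_producer(demo_config)
# The value_serializer set above JSON-encodes plain dicts, so messages can be
# passed as dictionaries; with NoOpProducer this call is a silent no-op.
producer.send(demo_config['kafka']['topic'],
              {'event': 'correction_complete', 'success': True})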
 
 
 
def slurm_status(filter_user=True):
    """Return the status of Slurm jobs by calling squeue.

    :param filter_user: set to True to list only jobs from the current user
    :return: a dictionary indexed by Slurm job ID, containing a tuple
             of (status, run time) as values.
    """
    cmd = ["squeue"]
    if filter_user:
        cmd += ["-u", getpass.getuser()]
    res = run(cmd, stdout=PIPE)
    if res.returncode == 0:
        rlines = res.stdout.decode().split("\n")
        statii = {}
        for r in rlines[1:]:  # skip the header line
            try:
                jobid, _, _, _, status, runtime, _, _ = r.split()
                jobid = jobid.strip()
                statii[jobid] = status, runtime
            except ValueError:  # not enough values to unpack in split
                pass
        return statii
    # Falls through to return None if squeue failed; the caller checks for this.
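
To make the parsing concrete, here is a self-contained sketch feeding the same unpacking logic a made-up squeue listing (the job IDs, user, and partition are invented):

sample = (
    "JOBID PARTITION    NAME  USER ST  TIME NODES NODELIST(REASON)\n"
    "12345     exfel correct  jdoe  R 10:32     1 max-exfl001\n"
    "12346     exfel    dark  jdoe PD  0:00     1 (Priority)\n"
)
statii = {}
for line in sample.split("\n")[1:]:
    try:
        jobid, _, _, _, status, runtime, _, _ = line.split()
        statii[jobid] = (status, runtime)
    except ValueError:  # the trailing blank line
        pass
print(statii)  # {'12345': ('R', '10:32'), '12346': ('PD', '0:00')}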
 
 
 
def slurm_job_status(jobid):
    """Return the status of a single Slurm job by calling sacct.

    :param jobid: Slurm job ID
    :return: job ID, elapsed time and state, or ("NA", "NA", "NA")
             if the job is unknown.
    """
    cmd = ["sacct", "-j", str(jobid), "--format=JobID,Elapsed,state"]

    res = run(cmd, stdout=PIPE)
    if res.returncode == 0:
        rlines = res.stdout.decode().split("\n")

        # rlines[0] and rlines[1] are the header and separator lines
        log.debug("Job {} state {}".format(jobid, rlines[2].split()))
        if len(rlines[2].split()) == 3:
            return rlines[2].replace("+", "").split()
    return "NA", "NA", "NA"
 
 
 
def update_job_db(config):
    """Update the job database and send out updates to MDC.

    :param config: configuration parsed from webservice YAML
    """
    log.info("Starting config db handling")
    conn = init_job_db(config)
    mdc = init_md_client(config)
    kafka_prod = init_kafka_producer(config)
    kafka_topic = config['kafka']['topic']
    time_interval = int(config['web-service']['job-update-interval'])

    while True:
        statii = slurm_status()
        # Check that Slurm is giving proper feedback
        if statii is None:
            time.sleep(time_interval)
            continue
        try:
            c = conn.cursor()
            c.execute("SELECT * FROM jobs WHERE status IN ('R', 'PD', 'CG') ")
            combined = {}
            log.debug("SLURM info {}".format(statii))

            for r in c.fetchall():
                rid, jobid, proposal, run, status, _time, det, action = r
                log.debug("DB info {}".format(r))

                cflg, cstatus, *_ = combined.setdefault((rid, action), (
                    [], [], proposal, run, det
                ))
                if jobid in statii:
                    slstatus, runtime = statii[jobid]
                    query = "UPDATE jobs SET status=?, time=? WHERE jobid LIKE ?"
                    c.execute(query, (slstatus, runtime, jobid))

                    cflg.append('R')
                    cstatus.append("{}-{}".format(slstatus, runtime))
                else:
                    _, sltime, slstatus = slurm_job_status(jobid)
                    query = "UPDATE jobs SET status=? WHERE jobid LIKE ?"
                    c.execute(query, (slstatus, jobid))

                    if slstatus == 'COMPLETED':
                        cflg.append("A")
                    else:
                        cflg.append("NA")
                    cstatus.append(slstatus)
            conn.commit()

            flg_order = {"R": 2, "A": 1, "NA": 0}
            dark_flags = {'NA': 'E', 'R': 'IP', 'A': 'F'}
            for rid, action in combined:
                if int(rid) == 0:  # this job was not submitted from MyMDC
                    continue
                flgs, statii, proposal, run, det = combined[rid, action]
                # Aggregate to the 'least done' flag: any running job ('R')
                # keeps the whole action in progress.
                flg = max(flgs, key=lambda i: flg_order[i])
                if flg != 'R':
                    log.info(
                        "Jobs finished - action: %s, run id: %s, status: %s",
                        action, rid, flg,
                    )
                    if action == 'CORRECT':
                        try:
                            kafka_prod.send(kafka_topic, {
                                'event': 'correction_complete',
                                'proposal': proposal,
                                'run': run,
                                'detector': det,
                                'success': (flg == 'A'),  # A for Available
                            })
                        except KafkaError:
                            log.warning("Error sending Kafka notification",
                                        exc_info=True)

                if all(s.startswith('PD-') for s in statii):
                    # Avoid swamping myMdC with updates for jobs still pending.
                    log.debug(
                        "No update for action %s, rid %s: jobs pending",
                        action, rid
                    )
                    continue

                msg = "\n".join(statii)
                msg_debug = f"Update MDC {rid}, {msg}"
                log.debug(msg_debug.replace('\n', ', '))

                if action == 'CORRECT':
                    data = {'flg_cal_data_status': flg,
                            'cal_pipeline_reply': msg}
                    if flg != 'R':
                        data['cal_last_end_at'] = datetime.now(tz=timezone.utc).isoformat()
                    response = mdc.update_run_api(rid, data)

                else:  # action == 'DARK' but it's dark_request
                    data = {'dark_run': {'flg_status': dark_flags[flg],
                                         'calcat_feedback': msg}}
                    response = mdc.update_dark_run_api(rid, data)

                if response.status_code != 200:
                    log.error("Failed to update MDC for action %s, rid %s",
                              action, rid)
                    log.error(Errors.MDC_RESPONSE.format(response))
        except Exception:
            log.error("Failure to update job DB", exc_info=True)

        time.sleep(time_interval)
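
The flag aggregation above deserves a worked example: with flg_order as defined, any running job keeps the whole action at 'R', and once everything has finished, a single completed job is enough to report 'A':

flg_order = {"R": 2, "A": 1, "NA": 0}
for flgs in (['A', 'R', 'NA'], ['A', 'A'], ['A', 'NA']):
    print(flgs, '->', max(flgs, key=lambda i: flg_order[i]))
# ['A', 'R', 'NA'] -> R   (a job is still running)
# ['A', 'A'] -> A         (all jobs completed)
# ['A', 'NA'] -> A        (one completed job outweighs a failed one)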
 
 
 
 
def main(argv=None):
    if argv is None:
        argv = sys.argv[1:]

    # Ensure files are opened as UTF-8 by default, regardless of environment.
    locale.setlocale(locale.LC_CTYPE, ('en_US', 'UTF-8'))

    parser = argparse.ArgumentParser(
        description='Monitor calibration jobs in Slurm and send status updates'
    )
    parser.add_argument('--config-file', type=str, default=None)
    parser.add_argument('--log-file', type=str, default='./monitor.log')
    parser.add_argument(
        '--log-level', type=str, default="INFO", choices=['INFO', 'DEBUG', 'ERROR']  # noqa
    )
    args = parser.parse_args(argv)

    if args.config_file is not None:
        config.configure(includes_for_dynaconf=[Path(args.config_file).absolute()])

    fmt = '%(asctime)s - %(name)s - %(levelname)s - [%(filename)s:%(lineno)d] %(message)s'  # noqa
    logging.basicConfig(
        filename=args.log_file,
        level=getattr(logging, args.log_level),
        format=fmt,
    )
    update_job_db(config)


if __name__ == "__main__":
    main()
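
For completeness, a hypothetical direct invocation; the module name job_monitor is an assumption here, since the diff does not show the file name:

# Hypothetical: run the monitor with debug logging, bypassing the CLI.
# The module/file name is assumed; it is not shown in this diff.
from job_monitor import main

main(['--log-file', '/tmp/monitor.log', '--log-level', 'DEBUG'])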