Commit 7f8bf92e authored by Thomas Kluyver

Merge branch 'send-kafka' into 'master'

Send Kafka notifications when correction is complete

See merge request detectors/pycalibration!452
parents df685e3f 87b15daa
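For reference, the message published on the configured Kafka topic when a correction run finishes has the following shape. The field names come from the producer code in this merge request; the values shown here are purely illustrative:

    {
        'event': 'correction_complete',
        'proposal': '002271',   # illustrative proposal number
        'run': '125',           # illustrative run number
        'detector': 'AGIPD',    # illustrative detector name
        'success': True,        # flg == 'A', i.e. corrected data is available
    }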
@@ -100,6 +100,7 @@ setup(
         "jupyter-core==4.6.1",
         "jupyter_client==6.1.7",
         "jupyter_console==6.1.0",
+        "kafka-python==2.0.2",
         "karabo_data==0.7.0",
         "lxml==4.5.0",
         "metadata_client==3.0.8",
......
"""Print Kafka events sent by the webservice.
"""
import json
import os.path as osp
import yaml
from kafka import KafkaConsumer
conf_file = osp.join(osp.dirname(__file__), 'webservice.yaml')
with open(conf_file, "r") as f:
config = yaml.safe_load(f)
topic = config['kafka']['topic']
brokers = config['kafka']['brokers']
print(f"topic: {topic!r}")
print(f"brokers: {brokers!r}")
kc = KafkaConsumer(topic, bootstrap_servers=brokers)
print("Waiting for messages (Ctrl-C to quit)...")
try:
for record in kc:
msg = json.loads(record.value.decode())
print(msg)
except KeyboardInterrupt:
print("Exiting")
kc.close()
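Other services can subscribe to the same topic and react only to the new event type. Below is a minimal sketch using kafka-python, with the broker and topic names taken from the webservice.yaml snippet further down in this diff; the print call is a stand-in for whatever a real consumer would do:

    import json
    from kafka import KafkaConsumer

    consumer = KafkaConsumer('xfel-test-offline-cal',
                             bootstrap_servers=['it-kafka-broker01.desy.de'])
    for record in consumer:
        msg = json.loads(record.value.decode())
        if msg.get('event') == 'correction_complete' and msg.get('success'):
            # Stand-in for the consumer's real reaction to a finished correction
            print("Corrected data ready:", msg['proposal'], msg['run'], msg['detector'])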
 import argparse
+import os.path as osp
 import yaml
 from metadata_client.metadata_client import MetadataClient
@@ -6,8 +7,8 @@ from metadata_client.metadata_client import MetadataClient
 parser = argparse.ArgumentParser(
     description='Update run status at MDC for a given run id.')
 parser.add_argument('--conf-file', type=str, help='Path to webservice config',
-                    default='/home/xcal/calibration_webservice_deployed/webservice/webservice.yaml')  # noqa
-parser.add_argument('--flg', type=str, choices=["NA", "R", "A"],
+                    default=osp.join(osp.dirname(__file__), 'webservice.yaml'))
+parser.add_argument('--flg', type=str, choices=["NA", "R", "A"], required=True,
                     help='Status flag for MDC request: NA - not available, R - running, A - available.')  # noqa
 parser.add_argument('--rid', type=int, help='Run id from MDC')
 parser.add_argument('--msg', type=str, help='Message string to MDC',
@@ -32,6 +33,7 @@ client_conn = MetadataClient(client_id=mdconf['user-id'],
                              scope=mdconf['scope'],
                              base_api_url=mdconf['base-api-url'])
+print(f"Updating run {rid} to status {flg} at {mdconf['base-api-url']}")
 response = client_conn.update_run_api(rid, {'flg_cal_data_status': flg,
                                             'cal_pipeline_reply': msg})
......
@@ -22,6 +22,8 @@ import yaml
 import zmq
 import zmq.asyncio
 import zmq.auth.thread
+from kafka import KafkaProducer
+from kafka.errors import KafkaError
 from git import InvalidGitRepositoryError, Repo
 from metadata_client.metadata_client import MetadataClient
@@ -88,6 +90,25 @@ def init_config_repo(config):
     logging.info("Config repo is initialized")


+def init_kafka_producer(config):
+    try:
+        return KafkaProducer(
+            bootstrap_servers=config['kafka']['brokers'],
+            value_serializer=lambda d: json.dumps(d).encode('utf-8'),
+            max_block_ms=2000,  # Don't get stuck trying to send Kafka messages
+        )
+    except KafkaError:
+        logging.warning("Problem initialising Kafka producer; "
+                        "Kafka notifications will not be sent.", exc_info=True)
+        return NoOpProducer()
+
+
+class NoOpProducer:
+    """Fills in for Kafka producer object when setting that up fails"""
+    def send(self, topic, value):
+        pass
+
+
 async def upload_config(config, yaml, instrument, cycle, proposal) -> bytes:
     """ Upload a new configuration YAML
@@ -324,7 +345,10 @@ def update_job_db(config):
     logging.info("Starting config db handling")
     conn = init_job_db(config)
     mdc = init_md_client(config)
+    kafka_prod = init_kafka_producer(config)
+    kafka_topic = config['kafka']['topic']
+
     time_interval = int(config['web-service']['job-update-interval'])
     while True:
         statii = slurm_status()
         # Check that slurm is giving proper feedback
@@ -338,10 +362,12 @@
             logging.debug("SLURM info {}".format(statii))
             for r in c.fetchall():
-                rid, jobid, _proposal, _run, status, _time, _det, action = r
+                rid, jobid, proposal, run, status, _time, det, action = r
                 logging.debug("DB info {}".format(r))
-                cflg, cstatus = combined.get((rid, action), ([], []))
+                cflg, cstatus, *_ = combined.setdefault((rid, action), (
+                    [], [], proposal, run, det
+                ))
                 if jobid in statii:
                     slstatus, runtime = statii[jobid]
                     query = "UPDATE jobs SET status=?, time=? WHERE jobid LIKE ?"
@@ -359,7 +385,6 @@
                 else:
                     cflg.append("NA")
                     cstatus.append(slstatus)
-                combined[rid, action] = cflg, cstatus
             conn.commit()

             flg_order = {"R": 2, "A": 1, "NA": 0}
@@ -367,7 +392,7 @@
             for rid, action in combined:
                 if int(rid) == 0:  # this job was not submitted from MyMDC
                     continue
-                flgs, statii = combined[rid, action]
+                flgs, statii, proposal, run, det = combined[rid, action]
                 # sort by least done status
                 flg = max(flgs, key=lambda i: flg_order[i])
                 if flg != 'R':
@@ -375,6 +400,18 @@
                         "Jobs finished - action: %s, run id: %s, status: %s",
                         action, rid, flg,
                     )
+                    if action == 'CORRECT':
+                        try:
+                            kafka_prod.send(kafka_topic, {
+                                'event': 'correction_complete',
+                                'proposal': proposal,
+                                'run': run,
+                                'detector': det,
+                                'success': (flg == 'A'),  # A for Available
+                            })
+                        except KafkaError:
+                            logging.warning("Error sending Kafka notification",
+                                            exc_info=True)
                 msg = "\n".join(statii)
                 msg_debug = f"Update MDC {rid}, {msg}"
                 logging.debug(msg_debug.replace('\n', ', '))
......
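The update_job_db changes above switch from combined.get to dict.setdefault so that each (run id, action) entry carries the proposal, run and detector alongside the accumulated flags, which is exactly what the Kafka message needs at the end of the loop. A standalone sketch of the pattern, with made-up values:

    combined = {}
    rows = [  # (rid, jobid, proposal, run, status, time, det, action) - illustrative values
        (42, '1001', '002271', '125', 'CD', '0:01', 'AGIPD', 'CORRECT'),
        (42, '1002', '002271', '125', 'CD', '0:02', 'AGIPD', 'CORRECT'),
    ]
    for rid, jobid, proposal, run, status, _time, det, action in rows:
        # One shared entry per (rid, action); the lists are mutated in place,
        # while the metadata rides along in the same tuple.
        cflg, cstatus, *_ = combined.setdefault(
            (rid, action), ([], [], proposal, run, det)
        )
        cflg.append('A')
        cstatus.append('COMPLETED')

    flgs, statii, proposal, run, det = combined[42, 'CORRECT']
    assert flgs == ['A', 'A'] and det == 'AGIPD'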
@@ -22,6 +22,13 @@ metadata-client:
     scope: ''
     base-api-url: 'https://in.xfel.eu/metadata/api/'

+kafka:
+    brokers:
+        - it-kafka-broker01.desy.de
+        - it-kafka-broker02.desy.de
+        - it-kafka-broker03.desy.de
+    topic: xfel-test-offline-cal
+
 correct:
     in-folder: /gpfs/exfel/exp/{instrument}/{cycle}/p{proposal}/raw
     out-folder: /gpfs/exfel/d/proc/{instrument}/{cycle}/p{proposal}/{run}
......