diff --git a/setup.py b/setup.py
index e12fac28af282b16625e6ad92c6bb7ddc1968697..2e1236355e348595f1d11d4b53ef3909ea6db652 100644
--- a/setup.py
+++ b/setup.py
@@ -100,6 +100,7 @@ setup(
         "jupyter-core==4.6.1",
         "jupyter_client==6.1.7",
         "jupyter_console==6.1.0",
+        "kafka-python==2.0.2",
         "karabo_data==0.7.0",
         "lxml==4.5.0",
         "metadata_client==3.0.8",
diff --git a/webservice/listen_kafka.py b/webservice/listen_kafka.py
new file mode 100644
index 0000000000000000000000000000000000000000..b0dcbb42a72841d96f6e6135bdaca11f45a8a929
--- /dev/null
+++ b/webservice/listen_kafka.py
@@ -0,0 +1,28 @@
+"""Print Kafka events sent by the webservice.
+"""
+import json
+import os.path as osp
+
+import yaml
+from kafka import KafkaConsumer
+
+conf_file = osp.join(osp.dirname(__file__), 'webservice.yaml')
+
+with open(conf_file, "r") as f:
+    config = yaml.safe_load(f)
+
+topic = config['kafka']['topic']
+brokers = config['kafka']['brokers']
+print(f"topic: {topic!r}")
+print(f"brokers: {brokers!r}")
+
+kc = KafkaConsumer(topic, bootstrap_servers=brokers)
+
+print("Waiting for messages (Ctrl-C to quit)...")
+try:
+    for record in kc:
+        msg = json.loads(record.value.decode())
+        print(msg)
+except KeyboardInterrupt:
+    print("Exiting")
+    kc.close()
diff --git a/webservice/update_mdc.py b/webservice/update_mdc.py
index 52750549997d7b45e938678fa45e68088bfff8be..38231ee4c9074fab44228b01c421bf75e2358a30 100644
--- a/webservice/update_mdc.py
+++ b/webservice/update_mdc.py
@@ -1,4 +1,5 @@
 import argparse
+import os.path as osp
 
 import yaml
 from metadata_client.metadata_client import MetadataClient
@@ -6,8 +7,8 @@ from metadata_client.metadata_client import MetadataClient
 parser = argparse.ArgumentParser(
     description='Update run status at MDC for a given run id.')
 parser.add_argument('--conf-file', type=str, help='Path to webservice config',
-                    default='/home/xcal/calibration_webservice_deployed/webservice/webservice.yaml')  # noqa
-parser.add_argument('--flg', type=str, choices=["NA", "R", "A"],
+                    default=osp.join(osp.dirname(__file__), 'webservice.yaml'))
+parser.add_argument('--flg', type=str, choices=["NA", "R", "A"], required=True,
                     help='Status flag for MDC request: NA - not available, R - running, A - available.')  # noqa
 parser.add_argument('--rid', type=int, help='Run id from MDC')
 parser.add_argument('--msg', type=str, help='Message string to MDC',
@@ -32,6 +33,7 @@ client_conn = MetadataClient(client_id=mdconf['user-id'],
                              scope=mdconf['scope'],
                              base_api_url=mdconf['base-api-url'])
 
+print(f"Updating run {rid} to status {flg} at {mdconf['base-api-url']}")
 response = client_conn.update_run_api(rid,
                                       {'flg_cal_data_status': flg,
                                        'cal_pipeline_reply': msg})
diff --git a/webservice/webservice.py b/webservice/webservice.py
index e07b8a42890d1f884682bb61dae5865bea51e793..0e9dab69004e30121ee8304acd29567a71444ba3 100644
--- a/webservice/webservice.py
+++ b/webservice/webservice.py
@@ -22,6 +22,8 @@ import yaml
 import zmq
 import zmq.asyncio
 import zmq.auth.thread
+from kafka import KafkaProducer
+from kafka.errors import KafkaError
 from git import InvalidGitRepositoryError, Repo
 from metadata_client.metadata_client import MetadataClient
 
@@ -88,6 +90,25 @@ def init_config_repo(config):
     logging.info("Config repo is initialized")
 
 
+def init_kafka_producer(config):
+    try:
+        return KafkaProducer(
+            bootstrap_servers=config['kafka']['brokers'],
+            value_serializer=lambda d: json.dumps(d).encode('utf-8'),
+            max_block_ms=2000,  # Don't get stuck trying to send Kafka messages
+        )
+    except KafkaError:
+        logging.warning("Problem initialising Kafka producer; "
+                        "Kafka notifications will not be sent.", exc_info=True)
+        return NoOpProducer()
+
+
+class NoOpProducer:
+    """Fills in for Kafka producer object when setting that up fails"""
+    def send(self, topic, value):
+        pass
+
+
 async def upload_config(config, yaml, instrument, cycle, proposal) -> bytes:
     """ Upload a new configuration YAML
 
@@ -324,7 +345,10 @@ def update_job_db(config):
     logging.info("Starting config db handling")
     conn = init_job_db(config)
     mdc = init_md_client(config)
+    kafka_prod = init_kafka_producer(config)
+    kafka_topic = config['kafka']['topic']
     time_interval = int(config['web-service']['job-update-interval'])
+
     while True:
         statii = slurm_status()
         # Check that slurm is giving proper feedback
@@ -338,10 +362,12 @@
             logging.debug("SLURM info {}".format(statii))
 
             for r in c.fetchall():
-                rid, jobid, _proposal, _run, status, _time, _det, action = r
+                rid, jobid, proposal, run, status, _time, det, action = r
                 logging.debug("DB info {}".format(r))
 
-                cflg, cstatus = combined.get((rid, action), ([], []))
+                cflg, cstatus, *_ = combined.setdefault((rid, action), (
+                    [], [], proposal, run, det
+                ))
                 if jobid in statii:
                     slstatus, runtime = statii[jobid]
                     query = "UPDATE jobs SET status=?, time=? WHERE jobid LIKE ?"
@@ -359,7 +385,6 @@
                     else:
                         cflg.append("NA")
                     cstatus.append(slstatus)
-                combined[rid, action] = cflg, cstatus
             conn.commit()
 
             flg_order = {"R": 2, "A": 1, "NA": 0}
@@ -367,7 +392,7 @@
             for rid, action in combined:
                 if int(rid) == 0:  # this job was not submitted from MyMDC
                     continue
-                flgs, statii = combined[rid, action]
+                flgs, statii, proposal, run, det = combined[rid, action]
                 # sort by least done status
                 flg = max(flgs, key=lambda i: flg_order[i])
                 if flg != 'R':
@@ -375,6 +400,18 @@
                         "Jobs finished - action: %s, run id: %s, status: %s",
                         action, rid, flg,
                     )
+                    if action == 'CORRECT':
+                        try:
+                            kafka_prod.send(kafka_topic, {
+                                'event': 'correction_complete',
+                                'proposal': proposal,
+                                'run': run,
+                                'detector': det,
+                                'success': (flg == 'A'),  # A for Available
+                            })
+                        except KafkaError:
+                            logging.warning("Error sending Kafka notification",
+                                            exc_info=True)
                 msg = "\n".join(statii)
                 msg_debug = f"Update MDC {rid}, {msg}"
                 logging.debug(msg_debug.replace('\n', ', '))
diff --git a/webservice/webservice.yaml b/webservice/webservice.yaml
index bce222c10206cd20db850fd38aefa5f32a8cb830..377315d99b6d7361850480913979a67190eadda7 100644
--- a/webservice/webservice.yaml
+++ b/webservice/webservice.yaml
@@ -22,6 +22,13 @@ metadata-client:
     scope: ''
     base-api-url: 'https://in.xfel.eu/metadata/api/'
 
+kafka:
+    brokers:
+        - it-kafka-broker01.desy.de
+        - it-kafka-broker02.desy.de
+        - it-kafka-broker03.desy.de
+    topic: xfel-test-offline-cal
+
 correct:
     in-folder: /gpfs/exfel/exp/{instrument}/{cycle}/p{proposal}/raw
     out-folder: /gpfs/exfel/d/proc/{instrument}/{cycle}/p{proposal}/{run}
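
For reference only (not part of the diff above): a downstream service could subscribe to the 'correction_complete' events that update_job_db now publishes. The sketch below is illustrative, reusing the broker list and topic from webservice.yaml; the handle_event callback is a hypothetical placeholder, and the payload fields simply mirror those passed to kafka_prod.send above.

import json

from kafka import KafkaConsumer

# Broker list and topic as configured in webservice.yaml above.
BROKERS = [
    'it-kafka-broker01.desy.de',
    'it-kafka-broker02.desy.de',
    'it-kafka-broker03.desy.de',
]
TOPIC = 'xfel-test-offline-cal'


def handle_event(event):
    # Hypothetical handler: report a finished correction run.
    status = 'succeeded' if event['success'] else 'failed'
    print(f"p{event['proposal']} r{event['run']} ({event['detector']}): "
          f"correction {status}")


consumer = KafkaConsumer(
    TOPIC,
    bootstrap_servers=BROKERS,
    value_deserializer=lambda b: json.loads(b.decode('utf-8')),
)
for record in consumer:
    # Only 'correction_complete' events are sent by update_job_db so far,
    # but filter on the event name in case other event types are added later.
    if record.value.get('event') == 'correction_complete':
        handle_event(record.value)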