diff --git a/setup.py b/setup.py index e12fac28af282b16625e6ad92c6bb7ddc1968697..2e1236355e348595f1d11d4b53ef3909ea6db652 100644 --- a/setup.py +++ b/setup.py @@ -100,6 +100,7 @@ setup( "jupyter-core==4.6.1", "jupyter_client==6.1.7", "jupyter_console==6.1.0", + "kafka-python==2.0.2", "karabo_data==0.7.0", "lxml==4.5.0", "metadata_client==3.0.8", diff --git a/webservice/webservice.py b/webservice/webservice.py index 43b0ecc6e6145361dca6d1d767d8135de1a8ec6d..3aa21df611baad79ec4f0091c1e26da0a85f742a 100644 --- a/webservice/webservice.py +++ b/webservice/webservice.py @@ -22,6 +22,7 @@ import yaml import zmq import zmq.asyncio import zmq.auth.thread +from kafka import KafkaProducer from git import InvalidGitRepositoryError, Repo from metadata_client.metadata_client import MetadataClient @@ -324,7 +325,10 @@ def update_job_db(config): logging.info("Starting config db handling") conn = init_job_db(config) mdc = init_md_client(config) + kafka_prod = KafkaProducer(bootstrap_servers=config['kafka']['brokers']) + kafka_topic = config['kafka']['topic'] time_interval = int(config['web-service']['job-update-interval']) + while True: statii = slurm_status() # Check that slurm is giving proper feedback @@ -338,10 +342,12 @@ def update_job_db(config): logging.debug("SLURM info {}".format(statii)) for r in c.fetchall(): - rid, jobid, _proposal, _run, status, _time, _det, action = r + rid, jobid, proposal, run, status, _time, det, action = r logging.debug("DB info {}".format(r)) - cflg, cstatus = combined.get((rid, action), ([], [])) + cflg, cstatus, *_ = combined.setdefault((rid, action), ( + [], [], proposal, run, det + )) if jobid in statii: slstatus, runtime = statii[jobid] query = "UPDATE jobs SET status=?, time=? WHERE jobid LIKE ?" @@ -359,7 +365,6 @@ def update_job_db(config): else: cflg.append("NA") cstatus.append(slstatus) - combined[rid, action] = cflg, cstatus conn.commit() flg_order = {"R": 2, "A": 1, "NA": 0} @@ -367,7 +372,7 @@ def update_job_db(config): for rid, action in combined: if int(rid) == 0: # this job was not submitted from MyMDC continue - flgs, statii = combined[rid, action] + flgs, statii, proposal, run, det = combined[rid, action] # sort by least done status flg = max(flgs, key=lambda i: flg_order[i]) if flg != 'R': @@ -375,6 +380,14 @@ def update_job_db(config): "Jobs finished - action: %s, run id: %s, status: %s", action, rid, flg, ) + if action == 'CORRECT': + kafka_prod.send(kafka_topic, json.dumps({ + 'event': 'correction_complete', + 'proposal': proposal, + 'run': run, + 'detector': det, + 'success': (flg == 'A'), # A for Available + })) msg = "\n".join(statii) msg_debug = f"Update MDC {rid}, {msg}" logging.debug(msg_debug.replace('\n', ', ')) diff --git a/webservice/webservice.yaml b/webservice/webservice.yaml index bce222c10206cd20db850fd38aefa5f32a8cb830..377315d99b6d7361850480913979a67190eadda7 100644 --- a/webservice/webservice.yaml +++ b/webservice/webservice.yaml @@ -22,6 +22,13 @@ metadata-client: scope: '' base-api-url: 'https://in.xfel.eu/metadata/api/' +kafka: + brokers: + - it-kafka-broker01.desy.de + - it-kafka-broker02.desy.de + - it-kafka-broker03.desy.de + topic: xfel-test-offline-cal + correct: in-folder: /gpfs/exfel/exp/{instrument}/{cycle}/p{proposal}/raw out-folder: /gpfs/exfel/d/proc/{instrument}/{cycle}/p{proposal}/{run}