From 224cd50e22e0d94cca87322f883462601a7f74ff Mon Sep 17 00:00:00 2001 From: Thomas Kluyver <thomas@kluyver.me.uk> Date: Wed, 17 Mar 2021 11:10:19 +0000 Subject: [PATCH] Kafka needs value serialised to bytes, not str --- webservice/webservice.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/webservice/webservice.py b/webservice/webservice.py index 3aa21df61..07a628446 100644 --- a/webservice/webservice.py +++ b/webservice/webservice.py @@ -325,7 +325,10 @@ def update_job_db(config): logging.info("Starting config db handling") conn = init_job_db(config) mdc = init_md_client(config) - kafka_prod = KafkaProducer(bootstrap_servers=config['kafka']['brokers']) + kafka_prod = KafkaProducer( + bootstrap_servers=config['kafka']['brokers'], + value_serializer=lambda d: json.dumps(d).encode('utf-8'), + ) kafka_topic = config['kafka']['topic'] time_interval = int(config['web-service']['job-update-interval']) @@ -381,13 +384,13 @@ def update_job_db(config): action, rid, flg, ) if action == 'CORRECT': - kafka_prod.send(kafka_topic, json.dumps({ + kafka_prod.send(kafka_topic, { 'event': 'correction_complete', 'proposal': proposal, 'run': run, 'detector': det, 'success': (flg == 'A'), # A for Available - })) + }) msg = "\n".join(statii) msg_debug = f"Update MDC {rid}, {msg}" logging.debug(msg_debug.replace('\n', ', ')) -- GitLab