Skip to content
Snippets Groups Projects
Commit 224cd50e authored by Thomas Kluyver's avatar Thomas Kluyver
Browse files

Kafka needs value serialised to bytes, not str

parent cabeb428
No related branches found
No related tags found
1 merge request!452Send Kafka notifications when correction is complete
......@@ -325,7 +325,10 @@ def update_job_db(config):
logging.info("Starting config db handling")
conn = init_job_db(config)
mdc = init_md_client(config)
kafka_prod = KafkaProducer(bootstrap_servers=config['kafka']['brokers'])
kafka_prod = KafkaProducer(
bootstrap_servers=config['kafka']['brokers'],
value_serializer=lambda d: json.dumps(d).encode('utf-8'),
)
kafka_topic = config['kafka']['topic']
time_interval = int(config['web-service']['job-update-interval'])
......@@ -381,13 +384,13 @@ def update_job_db(config):
action, rid, flg,
)
if action == 'CORRECT':
kafka_prod.send(kafka_topic, json.dumps({
kafka_prod.send(kafka_topic, {
'event': 'correction_complete',
'proposal': proposal,
'run': run,
'detector': det,
'success': (flg == 'A'), # A for Available
}))
})
msg = "\n".join(statii)
msg_debug = f"Update MDC {rid}, {msg}"
logging.debug(msg_debug.replace('\n', ', '))
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment