diff --git a/webservice/config/webservice.yaml b/webservice/config/webservice.yaml index ce6228b6eb8acafac98cfab1d22ca89b719f8990..0e23f1ca8bb889d82b84e5b097d5a2efcb419bb1 100644 --- a/webservice/config/webservice.yaml +++ b/webservice/config/webservice.yaml @@ -23,11 +23,14 @@ metadata-client: base-api-url: "https://in.xfel.eu/metadata/api/" kafka: - brokers: - - it-kafka-broker01.desy.de - - it-kafka-broker02.desy.de - - it-kafka-broker03.desy.de - topic: xfel-test-offline-cal + producer-config: + bootstrap_servers: + - "exflwgs06.desy.de:9092" + security_protocol: "SASL_PLAINTEXT" + sasl_mechanism: "PLAIN" + sasl_plain_username: "cal" + # sasl_plain_password: # Configure per-user + topic: cal.offline-corrections correct: in-folder: /gpfs/exfel/exp/{instrument}/{cycle}/p{proposal}/raw diff --git a/webservice/job_monitor.py b/webservice/job_monitor.py index d30ed8230ea62c527db01351f4d5c50a14834ddf..f54de2dcb8d1a7abf32b23cc2436fbef2d508aa7 100644 --- a/webservice/job_monitor.py +++ b/webservice/job_monitor.py @@ -43,15 +43,19 @@ class NoOpProducer: def init_kafka_producer(config): try: - return KafkaProducer( - bootstrap_servers=config['kafka']['brokers'], + kp = KafkaProducer( value_serializer=lambda d: json.dumps(d).encode('utf-8'), max_block_ms=2000, # Don't get stuck trying to send Kafka messages + **config['kafka']['producer-config'].to_dict() ) except KafkaError: log.warning("Problem initialising Kafka producer; " "Kafka notifications will not be sent.", exc_info=True) return NoOpProducer() + else: + log.info("Connected to Kafka broker (%s) to send notifications", + kp.config['bootstrap_servers']) + return kp def slurm_status(filter_user=True):