diff --git a/src/calng/ShmemTrainMatcher.py b/src/calng/ShmemTrainMatcher.py index 780be449ac2d0dcfb7f0b9b3067f072036f0f83c..4a08f9a4497fd2bcee8b1793bbeb6c0082c364dd 100644 --- a/src/calng/ShmemTrainMatcher.py +++ b/src/calng/ShmemTrainMatcher.py @@ -9,6 +9,7 @@ from karabo.bound import ( KARABO_CLASSINFO, STRING_ELEMENT, TABLE_ELEMENT, + ChannelMetaData, Hash, Schema, State, @@ -110,6 +111,15 @@ class ShmemTrainMatcher(TrainMatcher.TrainMatcher): .defaultValue(False) .reconfigurable() .commit(), + + BOOL_ELEMENT(expected) + .key("enableKaraboOutput") + .displayedName("Enable Karabo channel") + .allowedStates(State.PASSIVE) + .assignmentOptional() + .defaultValue(True) + .reconfigurable() + .commit(), ) def initialization(self): @@ -125,6 +135,10 @@ class ShmemTrainMatcher(TrainMatcher.TrainMatcher): else: self._thread_pool = None + if not self.get("enableKaraboOutput"): + # it is set already by super by default, so only need to turn off + self.output = None + def preReconfigure(self, conf): super().preReconfigure(conf) if conf.has("merge") or conf.has("sources"): @@ -135,6 +149,11 @@ class ShmemTrainMatcher(TrainMatcher.TrainMatcher): self._thread_pool = None if conf["useThreadPool"]: self._thread_pool = concurrent.futures.ThreadPoolExecutor() + if conf.has("enableKaraboOutput"): + if conf["enableKaraboOutput"]: + self.output = self._ss.getOutputChannel("output") + else: + self.output = None def _prepare_merge_groups(self, merge): source_group_patterns = [] @@ -288,7 +307,19 @@ class ShmemTrainMatcher(TrainMatcher.TrainMatcher): for source, (data, timestamp) in sources.items() ] ) - sources.update(new_sources_map) - super().on_matched_data(train_id, sources) + # karabo output + if self.output is not None: + for source, (data, timestamp) in sources.items(): + self.output.write(data, ChannelMetaData(source, timestamp)) + self.output.update() + + # karabo bridge output + for source, (data, timestamp) in sources.items(): + self.zmq_output.write(source, data, timestamp) + self.zmq_output.update() + + self.info["sent"] += 1 + self.info["trainId"] = train_id + self.rate_out.update()