From 44e7fb2ffd50472871ce22e7e9b67d290bfd6931 Mon Sep 17 00:00:00 2001 From: David Hammer <dhammer@mailbox.org> Date: Tue, 7 Sep 2021 16:09:56 +0200 Subject: [PATCH] Send shmem handle paths as part of data --- src/calng/CalibrationManager.py | 2 -- src/calng/DsscCorrection.py | 1 + src/calng/ShmemToZMQ.py | 31 ++++++------------------------- 3 files changed, 7 insertions(+), 27 deletions(-) diff --git a/src/calng/CalibrationManager.py b/src/calng/CalibrationManager.py index 28496242..820c44a1 100644 --- a/src/calng/CalibrationManager.py +++ b/src/calng/CalibrationManager.py @@ -991,8 +991,6 @@ class CalibrationManager(DeviceClientBase, Device): config['input.connectedOutputChannels'] = [ f'{matcher_device_id}:output'] - config['shmemHandlePaths'] = ['image.data'] - if not await self._instantiate_device( server, class_ids['bridge'], bridge_device_id, config ): diff --git a/src/calng/DsscCorrection.py b/src/calng/DsscCorrection.py index 5aae2983..f86153cb 100644 --- a/src/calng/DsscCorrection.py +++ b/src/calng/DsscCorrection.py @@ -591,6 +591,7 @@ class DsscCorrection(calibrationBase.CalibrationReceiverBaseDevice): data.set("image.data", buffer_handle) data.set("image.cellId", cell_table[:, np.newaxis]) data.set("image.pulseId", pulse_table[:, np.newaxis]) + data.set("calngShmemPaths", ["image.data"]) self.write_output(data, metadata) if do_generate_preview: self.write_combiner_preview( diff --git a/src/calng/ShmemToZMQ.py b/src/calng/ShmemToZMQ.py index 16556880..1fcdaa4f 100644 --- a/src/calng/ShmemToZMQ.py +++ b/src/calng/ShmemToZMQ.py @@ -1,7 +1,7 @@ import threading from time import time -from karabo.bound import KARABO_CLASSINFO, VECTOR_STRING_ELEMENT +from karabo.bound import KARABO_CLASSINFO from PipeToZeroMQ import PipeToZeroMQ, conversion, device_schema from . import shmem_utils @@ -11,26 +11,8 @@ from ._version import version as deviceVersion @KARABO_CLASSINFO("ShmemToZMQ", deviceVersion) class ShmemToZMQ(PipeToZeroMQ.PipeToZeroMQ): - @staticmethod - def expectedParameters(expected): - super(ShmemToZMQ, ShmemToZMQ).expectedParameters(expected) - ( - VECTOR_STRING_ELEMENT(expected) - .key("shmemHandlePaths") - .displayedName("Paths containing shmem handles") - .description( - "For each data package, each of these paths (if present) will be " - "assumed to contain shmem handles. These will be decoded and the path " - "will be replaced with the data referenced by the handle." - ) - .assignmentOptional() - .defaultValue(["image.data"]) - .commit() - ) - def initialization(self): super().initialization() - self._shmem_handle_paths = list(self.get("shmemHandlePaths")) self._source_to_shmem_ary = {} self._source_to_shmem_mem = {} self._buffer_lock = threading.Lock() @@ -91,17 +73,16 @@ class ShmemToZMQ(PipeToZeroMQ.PipeToZeroMQ): dic, arr = conversion.hash_to_dict( hash_data, paths=forward, version=self.version ) - for shmem_handle_path in self._shmem_handle_paths: - if shmem_handle_path not in dic: + for shmem_handle_path in dic.pop("calngShmemPaths", []): + shmem_handle = dic.pop(shmem_handle_path, None) + if shmem_handle_path is None: self.log.INFO( f"Hash from {source} did not have {shmem_handle_path}" ) continue - actual_data = self._get_shmem_buffer_data( - source, dic[shmem_handle_path] - ) - del dic[shmem_handle_path] + actual_data = self._get_shmem_buffer_data(source, shmem_handle) arr[shmem_handle_path] = actual_data + data[source] = (dic, arr) # forward data to all connected ZMQ sockets -- GitLab