diff --git a/src/calng/CalibrationManager.py b/src/calng/CalibrationManager.py index 284962422027a1c13d0d4ce10dc048884006720d..820c44a1a0ea952067c581cb496ba75ed7bfa4a6 100644 --- a/src/calng/CalibrationManager.py +++ b/src/calng/CalibrationManager.py @@ -991,8 +991,6 @@ class CalibrationManager(DeviceClientBase, Device): config['input.connectedOutputChannels'] = [ f'{matcher_device_id}:output'] - config['shmemHandlePaths'] = ['image.data'] - if not await self._instantiate_device( server, class_ids['bridge'], bridge_device_id, config ): diff --git a/src/calng/DsscCorrection.py b/src/calng/DsscCorrection.py index 5aae298373aa4d6cbbe337e3e16454ea71d3a216..f86153cbf96fcf7ee8691853e14893cc78e8c0de 100644 --- a/src/calng/DsscCorrection.py +++ b/src/calng/DsscCorrection.py @@ -591,6 +591,7 @@ class DsscCorrection(calibrationBase.CalibrationReceiverBaseDevice): data.set("image.data", buffer_handle) data.set("image.cellId", cell_table[:, np.newaxis]) data.set("image.pulseId", pulse_table[:, np.newaxis]) + data.set("calngShmemPaths", ["image.data"]) self.write_output(data, metadata) if do_generate_preview: self.write_combiner_preview( diff --git a/src/calng/ShmemToZMQ.py b/src/calng/ShmemToZMQ.py index 16556880ba151fc053f66edbe5020a51eb24814d..1fcdaa4f09ea414c77ba37a0c7553db768dfbed6 100644 --- a/src/calng/ShmemToZMQ.py +++ b/src/calng/ShmemToZMQ.py @@ -1,7 +1,7 @@ import threading from time import time -from karabo.bound import KARABO_CLASSINFO, VECTOR_STRING_ELEMENT +from karabo.bound import KARABO_CLASSINFO from PipeToZeroMQ import PipeToZeroMQ, conversion, device_schema from . import shmem_utils @@ -11,26 +11,8 @@ from ._version import version as deviceVersion @KARABO_CLASSINFO("ShmemToZMQ", deviceVersion) class ShmemToZMQ(PipeToZeroMQ.PipeToZeroMQ): - @staticmethod - def expectedParameters(expected): - super(ShmemToZMQ, ShmemToZMQ).expectedParameters(expected) - ( - VECTOR_STRING_ELEMENT(expected) - .key("shmemHandlePaths") - .displayedName("Paths containing shmem handles") - .description( - "For each data package, each of these paths (if present) will be " - "assumed to contain shmem handles. These will be decoded and the path " - "will be replaced with the data referenced by the handle." - ) - .assignmentOptional() - .defaultValue(["image.data"]) - .commit() - ) - def initialization(self): super().initialization() - self._shmem_handle_paths = list(self.get("shmemHandlePaths")) self._source_to_shmem_ary = {} self._source_to_shmem_mem = {} self._buffer_lock = threading.Lock() @@ -91,17 +73,16 @@ class ShmemToZMQ(PipeToZeroMQ.PipeToZeroMQ): dic, arr = conversion.hash_to_dict( hash_data, paths=forward, version=self.version ) - for shmem_handle_path in self._shmem_handle_paths: - if shmem_handle_path not in dic: + for shmem_handle_path in dic.pop("calngShmemPaths", []): + shmem_handle = dic.pop(shmem_handle_path, None) + if shmem_handle_path is None: self.log.INFO( f"Hash from {source} did not have {shmem_handle_path}" ) continue - actual_data = self._get_shmem_buffer_data( - source, dic[shmem_handle_path] - ) - del dic[shmem_handle_path] + actual_data = self._get_shmem_buffer_data(source, shmem_handle) arr[shmem_handle_path] = actual_data + data[source] = (dic, arr) # forward data to all connected ZMQ sockets