diff --git a/src/calng/CalibrationManager.py b/src/calng/CalibrationManager.py index 9056987c4701be6d56ef9e8266f5a8aa1b66e880..09b6b74ce08e8825e8c3df3972ed0cb0222faea2 100644 --- a/src/calng/CalibrationManager.py +++ b/src/calng/CalibrationManager.py @@ -946,7 +946,6 @@ class CalibrationManager(DeviceClientBase, Device): Hash('fsSelect', True, 'fsSource', input_source_by_module[vname]) for vname in modules_by_group[group]] - config['pathToStack'] = 'image.data' if self.restoreMatcherSources: try: @@ -977,6 +976,8 @@ class CalibrationManager(DeviceClientBase, Device): config['input.connectedOutputChannels'] = [ f'{matcher_device_id}:output'] + config['shmemHandlePaths'] = ['image.data'] + if not await self._instantiate_device( server, class_ids['bridge'], bridge_device_id, config ): diff --git a/src/calng/ShmemToZMQ.py b/src/calng/ShmemToZMQ.py index c69fdd28c734600a9370f4780505462d4133857e..8740a4f840921e7ab47006b9f896a8468dbb5590 100644 --- a/src/calng/ShmemToZMQ.py +++ b/src/calng/ShmemToZMQ.py @@ -1,8 +1,9 @@ import threading from time import time -from karabo.bound import KARABO_CLASSINFO -from PipeToZeroMQ import PipeToZeroMQ, conversion +import shmem_utils +from karabo.bound import KARABO_CLASSINFO, VECTOR_STRING_ELEMENT +from PipeToZeroMQ import PipeToZeroMQ, conversion, device_schema import shmem_utils @@ -14,107 +15,99 @@ class ShmemToZMQ(PipeToZeroMQ.PipeToZeroMQ): @staticmethod def expectedParameters(expected): super(ShmemToZMQ, ShmemToZMQ).expectedParameters(expected) + ( + VECTOR_STRING_ELEMENT(expected) + .key("shmemHandlePaths") + .displayedName("Paths containing shmem handles") + .description( + "For each data package, each of these paths (if present) will be " + "assumed to contain shmem handles. These will be decoded and the path " + "will be replaced with the data referenced by the handle." + ) + .assignmentOptional() + .defaultValue(["image.data"]) + .commit() + ) def initialization(self): - self.KARABO_SLOT(self.activate) - self.KARABO_SLOT(self.deactivate) - self.KARABO_SLOT(self.reset) - self.KARABO_SLOT(self.requestScene) - # ON_DATA is a bit more convenient I think - self.KARABO_ON_DATA("input", self.on_data) - self._source_to_shmem_name = {} + super().initialization() + self._shmem_handle_paths = list(self.get("shmemHandlePaths")) self._source_to_shmem_ary = {} self._source_to_shmem_mem = {} self._buffer_lock = threading.Lock() - def on_data(self, data, metadata): - source = metadata["source"] - if source not in self.sources: - self.log.INFO(f"New direct source encountered: {source}") - schema = self.getFullSchema() - self.appendSchema(PipeToZeroMQ.timestamp_schema(schema, source)) - self.sources.add(source) - + def _get_shmem_buffer_data(self, source, shmem_handle): + # TODO: handle failure if this was not a shmem handle + name, dtype, shape, index = shmem_utils.parse_shmem_handle(shmem_handle) + with self._buffer_lock: + # may have to open shared memory buffer + if source not in self._source_to_shmem_ary: + self.log.INFO(f"Opening buffer {name} for source {source}") + try: + mem, ary = shmem_utils.open_shmem_from_handle(shmem_handle) + except OSError: + self.log.WARN(f"Failed to open buffer {name}") + return None + self._source_to_shmem_mem[source] = mem + self._source_to_shmem_ary[source] = ary + elif self._source_to_shmem_ary[source].shape != shape: + self.log.INFO(f"Updating buffer shape for {source} to {shape}") + self._source_to_shmem_ary[source] = self._source_to_shmem_mem[ + source + ].ndarray(shape=shape, dtype=dtype) + + # grab data from shared memory buffer + return self._source_to_shmem_ary[source][index] + + def onInput(self, input_channel): actual = self.getActualTimestamp() input_tic = time() self.info["inputUpdated"] += 1 - self._time_info(metadata, actual, self.info) + self.info["dataRecv"] = input_channel.size() + all_meta = input_channel.getMetaData() - new_meta = {} - new_data = {} + data = {} + meta = {} - # stacked data from ModuleStacker - for source, handle_string in zip(data.get("sources"), data.get("image.data")): + for idx in range(input_channel.size()): + # Read metadata + metadata = self._extract_metadata(all_meta, idx) + source = metadata["source"] if source not in self.sources: - self.log.INFO(f"New original source encountered: {source}") + schema = self.getFullSchema() + self.appendSchema(device_schema.timestamp_schema(schema, source)) self.sources.add(source) + self._time_info(metadata, actual, self.info) + + # Read data + hash_data = input_channel.read(idx) + # filters if self.allowed_sources and source not in self.allowed_sources: continue + forward, ignore = self._filter_properties(hash_data, source) - forward, ignore = self._filter_properties(data, source) - - new_meta[source] = conversion.meta_to_dict(metadata) - new_meta[source]["ignored_keys"] = ignore - new_meta[source]["source"] = source - - ( - shmem_name, - shmem_dtype, - shmem_shape, - shmem_index, - ) = shmem_utils.parse_shmem_handle(handle_string) - with self._buffer_lock: - # may have to (re)open shared memory buffer - if source not in self._source_to_shmem_ary: - self.log.INFO( - f"Opening shmem buffer for {source} from handle: " - f"{handle_string}" - ) - try: - mem, ary = shmem_utils.open_shmem_from_handle(handle_string) - except OSError: - self.log.WARN(f"Failed to open buffer {shmem_name}") - continue - - self._source_to_shmem_mem[source] = mem - self._source_to_shmem_ary[source] = ary - self._source_to_shmem_name[source] = shmem_name - elif self._source_to_shmem_ary[source].shape != shmem_shape: - self.log.INFO( - f"Updating buffer shape for {source} to {shmem_shape}" - ) - self._source_to_shmem_ary[source] = self._source_to_shmem_mem[ - source - ].ndarray(shape=shmem_shape, dtype=shmem_dtype) - - # grab data from shared memory buffer - buffer_data = self._source_to_shmem_ary[source][shmem_index] - - # conversion separates array stuf into separate dict, "arr" + meta[source] = conversion.meta_to_dict(metadata) + meta[source]["ignored_keys"] = ignore + # only this bit differs from PipeToZeroMQ.onInput dic, arr = conversion.hash_to_dict( - data, paths=forward, version=self.version + hash_data, paths=forward, version=self.version ) - - # and image.data was list of strings (in dic), to be replaced - del dic["image.data"] - arr["image.data"] = buffer_data - new_data[source] = (dic, arr) - # note: assumes version "2.2" of the PipeToZeroMQ packing format - - # additional unstacked sources - if data.has("unstacked"): - for source in data.get("unstacked").getKeys(): - forward, ignore = self._filter_properties(data, source) - new_data[source] = conversion.hash_to_dict( - data.get(f"unstacked.{source}"), paths=forward, version=self.version + for shmem_handle_path in self._shmem_handle_paths: + if shmem_handle_path not in dic: + self.log.INFO( + f"Hash from {source} did not have {shmem_handle_path}" + ) + continue + actual_data = self._get_shmem_buffer_data( + source, dic[shmem_handle_path] ) - new_meta[source] = conversion.meta_to_dict(metadata) - new_meta[source]["source"] = source - data.erase("unstacked") + del dic[shmem_handle_path] + arr[shmem_handle_path] = actual_data + data[source] = (dic, arr) # forward data to all connected ZMQ sockets - self._send(new_data, new_meta) + self._send(data, meta) output_tic = time() self.info["onInputTotal"] = 1000 * (output_tic - input_tic)