diff --git a/DEPENDS b/DEPENDS index ecfd72308abf45cf6b79eb94ac17c6e19a6306aa..441a9963324f6302b7bb46086087bf6af596711b 100644 --- a/DEPENDS +++ b/DEPENDS @@ -1,4 +1,4 @@ -TrainMatcher, 2.4.8 +TrainMatcher, 2.4.10 calibrationClient, 11.3.0 -calibration/geometryDevices, 0.0.8 -calibration/calngUtils, 0.0.8 +calibration/geometryDevices, 0.0.9 +calibration/calngUtils, 0.0.9 diff --git a/docs/concepts.md b/docs/concepts.md index f3172e54f8080f959119d55a8f8aaacc3934fcbd..2c169ead64227c3ce09d7185c6236ea71b4b9c09 100644 --- a/docs/concepts.md +++ b/docs/concepts.md @@ -67,12 +67,12 @@ The hash being sent from the `dataOutput` pipeline of a correction contains the {name}${dtype}${shape}${index} ``` -* `name` a unique identifier for the shared memory region, usually containing the correction device ID and pipeline name +* `name` a unique identifier for the shared memory region, usually containing the correction device ID and pipeline name * `dtype` the string representation of the numpy data type, i.e. recoverable via `np.dtype(dtype)` * `shape` a comma-separated list of axis lengths * `index` an integer pointing to the position along the first array axis acting as a ring buffer -e.g. `SQS_NQS_DSSC/CAL/CORRECT01_Q1M2:dataOutput$float16$50,512,128,60$5` +e.g. `SQS_NQS_DSSC/CAL/CORRECT01_Q1M2-dataOutput-image.data$float16$50,512,128,60$5` The shared memory is allocated via the [POSIX shared memory API](https://man7.org/linux/man-pages/man7/shm_overview.7.html) - as of Python 3.8, `multiprocessing.shared_memory` provides classes wrapping the POSIX API. For convenience, [`shmem_utils.py`](https://git.xfel.eu/calibration/calngutils/-/blob/master/src/calngUtils/shmem_utils.py) in `calngUtils` provides some wrappers for sending and receiving using our protocol. diff --git a/src/calng/FrameSelectionArbiter.py b/src/calng/FrameSelectionArbiter.py index a2945e8a261b95df0b483e8e0682a77292d7239a..74871dfdb9fc32cb84ecc7a219ae68e3e2316133 100644 --- a/src/calng/FrameSelectionArbiter.py +++ b/src/calng/FrameSelectionArbiter.py @@ -571,7 +571,7 @@ class AdvancedFrameSelectionArbiter(BaseFrameSelectionArbiter): ) continue - kernel_class = kernel_choice[selection["kernel"]] + kernel_class = kernel_choice[selection["kernel"]].load() kernel_prefix = f"frameSelection.selections.{selection_name}" ( diff --git a/src/calng/base_correction.py b/src/calng/base_correction.py index e62a59cffc5c1b9caeb91d5473c6dd9e8346adb9..aed5cba3b00fb750f87d6a2fe9f27c5bb6a9309b 100644 --- a/src/calng/base_correction.py +++ b/src/calng/base_correction.py @@ -454,6 +454,7 @@ class BaseCorrection(PythonDevice): self._last_train_id_processed = 0 # used to keep track (and as fallback) self._last_cell_table = None # used to check whether to update property self._last_pulse_table = None # ditto - though not all detectors have this + self._last_output_shape = None def constant_override_fun(friend_fun, constant, preserve_fields): def aux(): @@ -514,17 +515,10 @@ class BaseCorrection(PythonDevice): for constant in self._constant_enum_class }, ) - self.log.DEBUG("Opening shmem buffer") - self._shmem_buffer = shmem_utils.ShmemCircularBuffer( - self.get("outputShmemBufferSize") * 2**30, - # TODO: have it just allocate memory, then set_shape later per train - (1,), - np.float32, - self.getInstanceId() + ":dataOutput", - ) - if self._cuda_pin_buffers: - self.log.DEBUG("Trying to pin the shmem buffer memory") - self._shmem_buffer.cuda_pin() + if self.get("useShmemHandles"): + self._open_shmem_buffer() + else: + self._shmem_buffer = None self._shmem_receiver = shmem_utils.ShmemCircularBufferReceiver() # CalCat friend comes before kernel runner @@ -569,8 +563,10 @@ class BaseCorrection(PythonDevice): addon_class = self._available_addons[addon_class_name] addon = addon_class(self, addon_prefix, self._parameters[addon_prefix]) self._enabled_addons.append(addon) + + # note: if useShmemHandles differs from class default, also needs schema change if ( - (self.get("useShmemHandles") != self._use_shmem_handles) + (self.get("useShmemHandles") != self.__class__._use_shmem_handles) or self._enabled_addons ): schema_override = Schema() @@ -604,7 +600,8 @@ class BaseCorrection(PythonDevice): self.updateState(State.ON) def __del__(self): - del self.shmem_buffer + if self._shmem_buffer is not None: + del self._shmem_buffer super().__del__() def preReconfigure(self, config): @@ -887,21 +884,12 @@ class BaseCorrection(PythonDevice): ): warn("Input cell IDs out of range of constants") - # TODO: change ShmemCircularBuffer API so we don't have to poke like this - if ( - (output_shape := self.kernel_runner.expected_output_shape(num_frames)) - != self._shmem_buffer._buffer_ary.shape[1:] - or self.kernel_runner._output_dtype != self._shmem_buffer._buffer_ary.dtype - ): - self.set("dataFormat.outputDataShape", list(output_shape)) - self.log.DEBUG("Updating shmem buffer shape / dtype") - self._shmem_buffer.change_shape( - output_shape, self.kernel_runner._output_dtype - ) - buffer_handle, buffer_array = self._shmem_buffer.next_slot() - with self.warning_context( - "processingState", WarningLampType.CORRECTION_RUNNER - ): + buffer_handle, buffer_array = self._get_output_buffer( + self.kernel_runner.expected_output_shape(num_frames), + self.kernel_runner._output_dtype, + ) + + with self.warning_context("processingState", WarningLampType.CORRECTION_RUNNER): corrections, processed_buffer, previews = self.kernel_runner.correct( image_data, cell_table, *additional_data ) @@ -927,14 +915,18 @@ class BaseCorrection(PythonDevice): pulse_table, data_hash, ) - self.kernel_runner.reshape(processed_buffer, out=buffer_array) + # set in case we pass None (out=None means create new) + buffer_array = self.kernel_runner.reshape( + processed_buffer, out=buffer_array + ) for addon in self._enabled_addons: addon.post_reshape( timestamp.getTrainId(), buffer_array, cell_table, pulse_table, data_hash ) - if self.unsafe_get("useShmemHandles"): + if self._shmem_buffer is not None: + # note: useShmemHandles is not reconfigurable, buffer is there or None data_hash.set(self._image_data_path, buffer_handle) shmem_paths = data_hash.get("calngShmemPaths", default=[]) data_hash.set("calngShmemPaths", shmem_paths + [self._image_data_path]) @@ -954,7 +946,16 @@ class BaseCorrection(PythonDevice): def _please_send_me_cached_constants(self, constants, callback): for constant in constants: if constant in self.calcat_friend.cached_constants: - callback(constant, self.calcat_friend.cached_constants[constant]) + with self.warning_context( + f"foundConstants.{constant.name}.state" + ) as warn: + try: + self.log_status_info(f"Using cached {constant.name}") + callback( + constant, self.calcat_friend.cached_constants[constant] + ) + except Exception as e: + warn(f"Exception in reusing cached {constant.name}: {e}") def _check_train_id_and_time(self, timestamp): train_id = timestamp.getTrainId() @@ -1047,6 +1048,45 @@ class BaseCorrection(PythonDevice): else: self.set("dataFormat.pulseId", list(map(int, pulse_table))) + def _open_shmem_buffer(self): + self.log.DEBUG("Opening shmem buffer") + self._shmem_buffer = shmem_utils.ShmemCircularBuffer( + self.get("outputShmemBufferSize") * 2**30, + # TODO: have it just allocate memory, then set_shape later per train + (1,), + np.float32, + f"{self.getInstanceId()}-dataOutput-{self._image_data_path}", + ) + if self._cuda_pin_buffers: + self.log.DEBUG("Trying to pin the shmem buffer memory") + self._shmem_buffer.cuda_pin() + + def _get_output_buffer(self, shape, dtype): + if self._shmem_buffer is not None: + # TODO: change ShmemCircularBuffer API so we don't have to poke like this + if ( + shape != self._shmem_buffer._buffer_ary.shape[1:] + or self.kernel_runner._output_dtype + != self._shmem_buffer._buffer_ary.dtype + ): + self._last_output_shape = shape + self._buffered_status_update["dataFormat.outputDataShape"] = list( + shape + ) + self.log.DEBUG(f"Updating shmem output buffer {shape} of {dtype}") + self._shmem_buffer.change_shape( + shape, self.kernel_runner._output_dtype + ) + return self._shmem_buffer.next_slot() + else: + if shape != self._last_output_shape: + self._buffered_status_update["dataFormat.outputDataShape"] = list( + shape + ) + self._last_output_shape = shape + # None means maybe skip copy (numpy) or copy to pool memory (cupy) + return "", None + def handle_eos(self, channel): self.updateState(State.ON) self.signalEndOfStream("dataOutput") diff --git a/src/calng/correction_addons/autocorrelation.py b/src/calng/correction_addons/autocorrelation.py index d3fc9e8c0bcfd245d619f81fc86f69ca702fed0e..30bf61507eed2cbc79a873f0869c7610edbbcd00 100644 --- a/src/calng/correction_addons/autocorrelation.py +++ b/src/calng/correction_addons/autocorrelation.py @@ -224,13 +224,12 @@ class Autocorrelation(BaseCorrectionAddon): self._shape = shape if self._shmem_buffer is None: - shmem_buffer_name = self._device.getInstanceId() + ":dataOutput/autocorrelation" memory_budget = self._device.get("outputShmemBufferSize") * 2**30 self._shmem_buffer = shmem_utils.ShmemCircularBuffer( memory_budget, shape, np.float32, - shmem_buffer_name, + f"{self._device.getInstanceId()}-dataOutput-image.autocorr", ) if self._device._cuda_pin_buffers: self._shmem_buffer.cuda_pin()