diff --git a/src/calng/DsscCorrection.py b/src/calng/DsscCorrection.py index c879da96ea2f0d0705163b2488f4f1336645263d..0e79d4caaadd7db83c6c11b43a7300a04b1ebbf4 100644 --- a/src/calng/DsscCorrection.py +++ b/src/calng/DsscCorrection.py @@ -380,6 +380,7 @@ class DsscCorrection(calibrationBase.CalibrationReceiverBaseDevice): self.input_data_dtype = getattr(np, config.get("dataFormat.inputImageDtype")) self.output_data_dtype = getattr(np, config.get("dataFormat.outputImageDtype")) + self._shmem_buffer_index = 0 self._offset_map = None self._update_pulse_filter(config.get("pulseFilter")) self._update_shapes( @@ -429,6 +430,7 @@ class DsscCorrection(calibrationBase.CalibrationReceiverBaseDevice): def __del__(self): self.gpu_context.detach() + del self.pipeline def preReconfigure(self, config): if config.has("pulseFilter"): @@ -793,7 +795,13 @@ class DsscCorrection(calibrationBase.CalibrationReceiverBaseDevice): self.set("dataFormat.inputDataShape", list(input_data_shape)) self.set("dataFormat.outputDataShape", list(output_data_shape)) - shmem_buffer_name = self.getInstanceId().replace("/", "_") + ":dataOutput" + self._shmem_buffer_index += 1 + + shmem_buffer_name = ( + self.getInstanceId().replace("/", "_") + + f":dataOutput:{self._shmem_buffer_index}" + ) + self.log.INFO(f"Opening new shmem buffer: {shmem_buffer_name}") with gpu_utils.GPUContextContext(self.gpu_context): self.pipeline = PyCudaPipeline( diff --git a/src/calng/ShmemToZMQ.py b/src/calng/ShmemToZMQ.py index 143e8435ea49d1d9a50b8eb747955ddab27fd55c..b65ceaee7157fc66193bb681435786273c0d3773 100644 --- a/src/calng/ShmemToZMQ.py +++ b/src/calng/ShmemToZMQ.py @@ -1,4 +1,5 @@ from time import time +import threading import shmem_utils from karabo.bound import KARABO_CLASSINFO @@ -14,16 +15,16 @@ class ShmemToZMQ(PipeToZeroMQ.PipeToZeroMQ): super(ShmemToZMQ, ShmemToZMQ).expectedParameters(expected) def initialization(self): - """Just overriding to avoid KARABO_ON_INPUT""" - self.KARABO_SLOT(self.activate) self.KARABO_SLOT(self.deactivate) self.KARABO_SLOT(self.reset) self.KARABO_SLOT(self.requestScene) # ON_DATA is a bit more convenient I think self.KARABO_ON_DATA("input", self.on_data) + self._source_to_shmem_name = {} self._source_to_shmem_ary = {} self._source_to_shmem_mem = {} + self._buffer_lock = threading.Lock() def on_data(self, data, metadata): source = metadata["source"] @@ -64,18 +65,34 @@ class ShmemToZMQ(PipeToZeroMQ.PipeToZeroMQ): new_meta[source] = conversion.meta_to_dict(metadata) new_meta[source]["ignored_keys"] = ignore - # may have to open shared memory buffer - if source not in self._source_to_shmem_ary: - self.log.INFO( - f"For source {source}, will open shmem handle {handle_string}" - ) - mem, ary = shmem_utils.open_shmem_from_handle(handle_string, rw=False) - self._source_to_shmem_mem[source] = mem - self._source_to_shmem_ary[source] = ary - - # grab data from shared memory buffer - _, _, _, index = shmem_utils.parse_shmem_handle(handle_string) - buffer_data = self._source_to_shmem_ary[source][index] + shmem_name, _, _, shmem_index = shmem_utils.parse_shmem_handle( + handle_string + ) + with self._buffer_lock: + # may have to open shared memory buffer + if ( + source not in self._source_to_shmem_name + or self._source_to_shmem_name[source] != shmem_name + ): + if source in self._source_to_shmem_ary: + self.log.INFO( + f"Releasing old buffer {shmem_name} for source {source}" + ) + del self._source_to_shmem_ary[source] + self.log.INFO(f"Opening buffer {shmem_name} for source {source}") + try: + mem, ary = shmem_utils.open_shmem_from_handle( + handle_string, rw=False + ) + except OSError: + self.log.WARN(f"Failed to open buffer {shmem_name}") + continue + self._source_to_shmem_mem[source] = mem + self._source_to_shmem_ary[source] = ary + self._source_to_shmem_name[source] = shmem_name + + # grab data from shared memory buffer + buffer_data = self._source_to_shmem_ary[source][shmem_index] # conversion separates array stuf into separate dict, "arr" dic, arr = conversion.hash_to_dict( @@ -98,3 +115,8 @@ class ShmemToZMQ(PipeToZeroMQ.PipeToZeroMQ): self._updateProperties(output_tic) # block if device is in passive state self.monitoring.wait() + + def preDestruction(self): + for ary in self._source_to_shmem_ary.values(): + del ary + super().preDestruction() diff --git a/src/calng/cuda_pipeline.py b/src/calng/cuda_pipeline.py index 82fb4da83929b79b101b2856ffb8607a7cd3b2b4..c13c2fa20b1e1b892c651fe446d6986ba0f6e317 100644 --- a/src/calng/cuda_pipeline.py +++ b/src/calng/cuda_pipeline.py @@ -80,6 +80,9 @@ class PyCudaPipeline: self.update_block_size(full_block=(1, 1, 64), preview_block=(1, 64, 1)) + def __del__(self): + del self.output_buffer_ary + def load_constants(self, offset_map_host): constant_memory_cells = offset_map_host.shape[-1] if constant_memory_cells != self.constant_memory_cells: