diff --git a/src/calng/ShmemToZMQ.py b/src/calng/ShmemToZMQ.py index b31cbccde2d94a8d97e4bd1218945e8250283a18..1e0af8e14f5b3b71f1fa29db231e95370ac714a0 100644 --- a/src/calng/ShmemToZMQ.py +++ b/src/calng/ShmemToZMQ.py @@ -11,32 +11,7 @@ from ._version import version as deviceVersion class ShmemToZMQ(PipeToZeroMQ.PipeToZeroMQ): def initialization(self): super().initialization() - self._source_to_shmem_ary = {} - self._source_to_shmem_mem = {} - self._buffer_lock = threading.Lock() - - def _get_shmem_buffer_data(self, source, shmem_handle): - # TODO: handle failure if this was not a shmem handle - name, dtype, shape, index = shmem_utils.parse_shmem_handle(shmem_handle) - with self._buffer_lock: - # may have to open shared memory buffer - if source not in self._source_to_shmem_ary: - self.log.INFO(f"Opening buffer {name} for source {source}") - try: - mem, ary = shmem_utils.open_shmem_from_handle(shmem_handle) - except OSError: - self.log.WARN(f"Failed to open buffer {name}") - return None - self._source_to_shmem_mem[source] = mem - self._source_to_shmem_ary[source] = ary - elif self._source_to_shmem_ary[source].shape != shape: - self.log.INFO(f"Updating buffer shape for {source} to {shape}") - self._source_to_shmem_ary[source] = self._source_to_shmem_mem[ - source - ].ndarray(shape=shape, dtype=dtype) - - # grab data from shared memory buffer - return self._source_to_shmem_ary[source][index] + self._shmem_handler = shmem_utils.ShmemCircularBufferReceiver() def onInput(self, input_channel): actual = self.getActualTimestamp() @@ -78,7 +53,12 @@ class ShmemToZMQ(PipeToZeroMQ.PipeToZeroMQ): f"Hash from {source} did not have {shmem_handle_path}" ) continue - actual_data = self._get_shmem_buffer_data(source, shmem_handle) + elif shmem_handle_path == "": + self.log.INFO( + f"Hash from {source} had empty {shmem_handle_path}" + ) + continue + actual_data = self._shmem_handler.get(shmem_handle) arr[shmem_handle_path] = actual_data data[source] = (dic, arr) @@ -93,8 +73,3 @@ class ShmemToZMQ(PipeToZeroMQ.PipeToZeroMQ): self._updateProperties(output_tic) # block if device is in passive state self.monitoring.wait() - - def preDestruction(self): - for ary in self._source_to_shmem_ary.values(): - del ary - super().preDestruction() diff --git a/src/calng/shmem_utils.py b/src/calng/shmem_utils.py index d4cc860fdb2f253514b00170bc8262d8e1104652..54f3bc9df926dfbfc47d18e01ad880331810fece 100644 --- a/src/calng/shmem_utils.py +++ b/src/calng/shmem_utils.py @@ -23,6 +23,30 @@ def open_shmem_from_handle(handle_string): return buffer_mem, array +class ShmemCircularBufferReceiver: + def __init__(self): + self._name_to_mem = {} + self._name_to_ary = {} + + def get(self, handle_string): + name, dtype, shape, index = parse_shmem_handle(handle_string) + if name not in self._name_to_mem: + mem = posixshmem.SharedMemory(name=name, rw=False) + self._name_to_mem[name] = mem + ary = mem.ndarray(shape=shape, dtype=dtype) + self._name_to_ary[name] = ary + return ary[index] + + ary = self._name_to_ary[name] + if ary.shape != shape or ary.dtype != dtype: + del ary + mem = self._name_to_mem[name] + ary = mem.ndarray(shape=shape, dtype=dtype) + self._name_to_ary[name] = ary + + return ary[index] + + class ShmemCircularBuffer: """Convenience wrapper around posixshmem-backed ndarray buffers