Skip to content
Snippets Groups Projects
Commit 81e6aebf authored by David Hammer's avatar David Hammer
Browse files

Abstract receiving, caching of calng shmem handles

parent 0ec69b2b
No related branches found
No related tags found
2 merge requests!12Snapshot: field test deployed version as of end of run 202201,!3Base correction device, CalCat interaction, DSSC and AGIPD devices
......@@ -11,32 +11,7 @@ from ._version import version as deviceVersion
class ShmemToZMQ(PipeToZeroMQ.PipeToZeroMQ):
def initialization(self):
super().initialization()
self._source_to_shmem_ary = {}
self._source_to_shmem_mem = {}
self._buffer_lock = threading.Lock()
def _get_shmem_buffer_data(self, source, shmem_handle):
# TODO: handle failure if this was not a shmem handle
name, dtype, shape, index = shmem_utils.parse_shmem_handle(shmem_handle)
with self._buffer_lock:
# may have to open shared memory buffer
if source not in self._source_to_shmem_ary:
self.log.INFO(f"Opening buffer {name} for source {source}")
try:
mem, ary = shmem_utils.open_shmem_from_handle(shmem_handle)
except OSError:
self.log.WARN(f"Failed to open buffer {name}")
return None
self._source_to_shmem_mem[source] = mem
self._source_to_shmem_ary[source] = ary
elif self._source_to_shmem_ary[source].shape != shape:
self.log.INFO(f"Updating buffer shape for {source} to {shape}")
self._source_to_shmem_ary[source] = self._source_to_shmem_mem[
source
].ndarray(shape=shape, dtype=dtype)
# grab data from shared memory buffer
return self._source_to_shmem_ary[source][index]
self._shmem_handler = shmem_utils.ShmemCircularBufferReceiver()
def onInput(self, input_channel):
actual = self.getActualTimestamp()
......@@ -78,7 +53,12 @@ class ShmemToZMQ(PipeToZeroMQ.PipeToZeroMQ):
f"Hash from {source} did not have {shmem_handle_path}"
)
continue
actual_data = self._get_shmem_buffer_data(source, shmem_handle)
elif shmem_handle_path == "":
self.log.INFO(
f"Hash from {source} had empty {shmem_handle_path}"
)
continue
actual_data = self._shmem_handler.get(shmem_handle)
arr[shmem_handle_path] = actual_data
data[source] = (dic, arr)
......@@ -93,8 +73,3 @@ class ShmemToZMQ(PipeToZeroMQ.PipeToZeroMQ):
self._updateProperties(output_tic)
# block if device is in passive state
self.monitoring.wait()
def preDestruction(self):
for ary in self._source_to_shmem_ary.values():
del ary
super().preDestruction()
......@@ -23,6 +23,30 @@ def open_shmem_from_handle(handle_string):
return buffer_mem, array
class ShmemCircularBufferReceiver:
def __init__(self):
self._name_to_mem = {}
self._name_to_ary = {}
def get(self, handle_string):
name, dtype, shape, index = parse_shmem_handle(handle_string)
if name not in self._name_to_mem:
mem = posixshmem.SharedMemory(name=name, rw=False)
self._name_to_mem[name] = mem
ary = mem.ndarray(shape=shape, dtype=dtype)
self._name_to_ary[name] = ary
return ary[index]
ary = self._name_to_ary[name]
if ary.shape != shape or ary.dtype != dtype:
del ary
mem = self._name_to_mem[name]
ary = mem.ndarray(shape=shape, dtype=dtype)
self._name_to_ary[name] = ary
return ary[index]
class ShmemCircularBuffer:
"""Convenience wrapper around posixshmem-backed ndarray buffers
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment