From 81e6aebf79d9efa40e59dd54600c13e4daeb6e01 Mon Sep 17 00:00:00 2001
From: David Hammer <dhammer@mailbox.org>
Date: Mon, 1 Nov 2021 14:32:02 +0100
Subject: [PATCH] Abstract receiving, caching of calng shmem handles

---
 src/calng/ShmemToZMQ.py  | 39 +++++++--------------------------------
 src/calng/shmem_utils.py | 24 ++++++++++++++++++++++++
 2 files changed, 31 insertions(+), 32 deletions(-)

diff --git a/src/calng/ShmemToZMQ.py b/src/calng/ShmemToZMQ.py
index b31cbccd..1e0af8e1 100644
--- a/src/calng/ShmemToZMQ.py
+++ b/src/calng/ShmemToZMQ.py
@@ -11,32 +11,7 @@ from ._version import version as deviceVersion
 class ShmemToZMQ(PipeToZeroMQ.PipeToZeroMQ):
     def initialization(self):
         super().initialization()
-        self._source_to_shmem_ary = {}
-        self._source_to_shmem_mem = {}
-        self._buffer_lock = threading.Lock()
-
-    def _get_shmem_buffer_data(self, source, shmem_handle):
-        # TODO: handle failure if this was not a shmem handle
-        name, dtype, shape, index = shmem_utils.parse_shmem_handle(shmem_handle)
-        with self._buffer_lock:
-            # may have to open shared memory buffer
-            if source not in self._source_to_shmem_ary:
-                self.log.INFO(f"Opening buffer {name} for source {source}")
-                try:
-                    mem, ary = shmem_utils.open_shmem_from_handle(shmem_handle)
-                except OSError:
-                    self.log.WARN(f"Failed to open buffer {name}")
-                    return None
-                self._source_to_shmem_mem[source] = mem
-                self._source_to_shmem_ary[source] = ary
-            elif self._source_to_shmem_ary[source].shape != shape:
-                self.log.INFO(f"Updating buffer shape for {source} to {shape}")
-                self._source_to_shmem_ary[source] = self._source_to_shmem_mem[
-                    source
-                ].ndarray(shape=shape, dtype=dtype)
-
-            # grab data from shared memory buffer
-            return self._source_to_shmem_ary[source][index]
+        self._shmem_handler = shmem_utils.ShmemCircularBufferReceiver()
 
     def onInput(self, input_channel):
         actual = self.getActualTimestamp()
@@ -78,7 +53,12 @@ class ShmemToZMQ(PipeToZeroMQ.PipeToZeroMQ):
                         f"Hash from {source} did not have {shmem_handle_path}"
                     )
                     continue
-                actual_data = self._get_shmem_buffer_data(source, shmem_handle)
+                elif shmem_handle_path == "":
+                    self.log.INFO(
+                        f"Hash from {source} had empty {shmem_handle_path}"
+                    )
+                    continue
+                actual_data = self._shmem_handler.get(shmem_handle)
                 arr[shmem_handle_path] = actual_data
 
             data[source] = (dic, arr)
@@ -93,8 +73,3 @@ class ShmemToZMQ(PipeToZeroMQ.PipeToZeroMQ):
         self._updateProperties(output_tic)
         # block if device is in passive state
         self.monitoring.wait()
-
-    def preDestruction(self):
-        for ary in self._source_to_shmem_ary.values():
-            del ary
-        super().preDestruction()
diff --git a/src/calng/shmem_utils.py b/src/calng/shmem_utils.py
index d4cc860f..54f3bc9d 100644
--- a/src/calng/shmem_utils.py
+++ b/src/calng/shmem_utils.py
@@ -23,6 +23,30 @@ def open_shmem_from_handle(handle_string):
     return buffer_mem, array
 
 
+class ShmemCircularBufferReceiver:
+    def __init__(self):
+        self._name_to_mem = {}
+        self._name_to_ary = {}
+
+    def get(self, handle_string):
+        name, dtype, shape, index = parse_shmem_handle(handle_string)
+        if name not in self._name_to_mem:
+            mem = posixshmem.SharedMemory(name=name, rw=False)
+            self._name_to_mem[name] = mem
+            ary = mem.ndarray(shape=shape, dtype=dtype)
+            self._name_to_ary[name] = ary
+            return ary[index]
+
+        ary = self._name_to_ary[name]
+        if ary.shape != shape or ary.dtype != dtype:
+            del ary
+            mem = self._name_to_mem[name]
+            ary = mem.ndarray(shape=shape, dtype=dtype)
+            self._name_to_ary[name] = ary
+
+        return ary[index]
+
+
 class ShmemCircularBuffer:
     """Convenience wrapper around posixshmem-backed ndarray buffers
 
-- 
GitLab