From 60024124fd8c680c7b03b00a73bd55bcd2ec0a60 Mon Sep 17 00:00:00 2001
From: David Hammer <dhammer@mailbox.org>
Date: Fri, 14 Apr 2023 12:52:20 +0200
Subject: [PATCH] Add generic arbiter kernels: reduce-threshold and boolean

---
 setup.py                                      |  7 +-
 src/calng/arbiter_kernels/boolean_ops.py      | 51 ++++++++++
 .../arbiter_kernels/integrated_intensity.py   | 41 --------
 src/calng/arbiter_kernels/reduce_threshold.py | 95 +++++++++++++++++++
 src/calng/correction_addons/random_frames.py  |  3 +-
 5 files changed, 151 insertions(+), 46 deletions(-)
 create mode 100644 src/calng/arbiter_kernels/boolean_ops.py
 delete mode 100644 src/calng/arbiter_kernels/integrated_intensity.py
 create mode 100644 src/calng/arbiter_kernels/reduce_threshold.py

diff --git a/setup.py b/setup.py
index 6b876160..2dcf9a60 100644
--- a/setup.py
+++ b/setup.py
@@ -51,14 +51,13 @@ setup(
         ],
         "calng.correction_addon": [
             "IntegratedIntensity = calng.correction_addons.integrated_intensity:IntegratedIntensityAddon [agipd]",  # noqa
-            "RandomFrames = calng.correction_addons.random_frames:RandomFramesAddon",
             "Peakfinder9 = calng.correction_addons.peakfinder9:Peakfinder9Addon [agipd]",  # noqa
+            "RandomFrames = calng.correction_addons.random_frames:RandomFramesAddon",
         ],
         "calng.arbiter_kernel": [
-            "IntegratedIntensity = calng.arbiter_kernels.integrated_intensity:IntegratedIntensity",
-            #"ReduceAndThreshold = calng.arbiter_kernels.reduce_threshold:ReduceAndThresholdArbiter",  # noqa
+            "BooleanCombination = calng.arbiter_kernels.boolean_ops:BooleanCombination",  # noqa
             "RandomFrames = calng.arbiter_kernels.random_frames:RandomSampler",  # noqa
-            #"BooleanCombination = calng.arbiter_kernels.boolean_ops:BooleanCombinationArbiter",  # noqa
+            "ReduceAndThreshold = calng.arbiter_kernels.reduce_threshold:ReduceAndThreshold",  # noqa
         ],
     },
     package_data={"": ["kernels/*"]},
diff --git a/src/calng/arbiter_kernels/boolean_ops.py b/src/calng/arbiter_kernels/boolean_ops.py
new file mode 100644
index 00000000..11a80536
--- /dev/null
+++ b/src/calng/arbiter_kernels/boolean_ops.py
@@ -0,0 +1,51 @@
+from karabo.bound import (
+    NODE_ELEMENT,
+    STRING_ELEMENT,
+)
+import numpy as np
+from .base_kernel import BaseArbiterKernel
+
+
+class BooleanCombination(BaseArbiterKernel):
+    _node_name = "booleanCombination"
+
+    def __init__(self, config):
+        self._key = config.get("key")
+        self._operator = getattr(np, config.get("operator"))
+
+    @staticmethod
+    def extend_device_schema(schema, prefix):
+        (
+            NODE_ELEMENT(schema)
+            .key(prefix)
+            .description(
+                "This kernel allows combining individual masks from many sources. It "
+                "is intended for cases where, for example, each correction device via "
+                "some addon provides its own frame mask. This arbiter kernel would "
+                "then be able to AND (keep only frames making it through all masks) or "
+                "OR these individual masks."
+            )
+            .commit(),
+
+            STRING_ELEMENT(schema)
+            .key(f"{prefix}.key")
+            .assignmentOptional()
+            .defaultValue("data.dataFramePattern")
+            .reconfigurable()
+            .commit(),
+
+            STRING_ELEMENT(schema)
+            .key(f"{prefix}.operator")
+            .assignmentOptional()
+            .defaultValue("all")
+            .options("all,any")
+            .reconfigurable()
+            .commit(),
+        )
+
+    def consider(self, train_id, sources, num_frames):
+        # pretty sure this is special case of reduce and threshold in some algebra
+        return self._operator(
+            [data[self._key] for (data, _) in sources.values() if data.has(self._key)],
+            axis=0,
+        ).astype(np.uint8, copy=False)
diff --git a/src/calng/arbiter_kernels/integrated_intensity.py b/src/calng/arbiter_kernels/integrated_intensity.py
deleted file mode 100644
index 0deb1c63..00000000
--- a/src/calng/arbiter_kernels/integrated_intensity.py
+++ /dev/null
@@ -1,41 +0,0 @@
-from karabo.bound import (
-    DOUBLE_ELEMENT,
-    NODE_ELEMENT,
-)
-import numpy as np
-from .base_kernel import BaseArbiterKernel
-
-
-class IntegratedIntensity(BaseArbiterKernel):
-    _node_name = "integratedIntensity"
-
-    def __init__(self, config):
-        self._threshold = config.get("threshold")
-
-    @staticmethod
-    def extend_device_schema(schema, prefix):
-        (
-            NODE_ELEMENT(schema)
-            .key(prefix)
-            .commit(),
-
-            DOUBLE_ELEMENT(schema)
-            .key(f"{prefix}.threshold")
-            .assignmentOptional()
-            .defaultValue(1e8)
-            .reconfigurable()
-            .commit(),
-        )
-
-    def consider(self, train_id, sources, num_frames):
-        return (
-            np.sum(
-                [
-                    data["image.integratedIntensity"]
-                    for (data, _) in sources.values()
-                    if data.has("image.integratedIntensity")
-                ],
-                axis=0,
-            )
-            > self._threshold
-        ).astype(np.uint8, copy=False)
diff --git a/src/calng/arbiter_kernels/reduce_threshold.py b/src/calng/arbiter_kernels/reduce_threshold.py
new file mode 100644
index 00000000..dac4abc4
--- /dev/null
+++ b/src/calng/arbiter_kernels/reduce_threshold.py
@@ -0,0 +1,95 @@
+import operator
+
+from karabo.bound import (
+    DOUBLE_ELEMENT,
+    NODE_ELEMENT,
+    STRING_ELEMENT,
+)
+import numpy as np
+from .base_kernel import BaseArbiterKernel
+
+
+class ReduceAndThreshold(BaseArbiterKernel):
+    _node_name = "reduceAndThreshold"
+
+    def __init__(self, config):
+        self._key = config.get("key")
+        self._threshold = config.get("threshold")
+        self._reduction = getattr(np, config.get("reduction"))
+        self._comparator = getattr(operator, config.get("comparator"))
+
+    @staticmethod
+    def extend_device_schema(schema, prefix):
+        (
+            NODE_ELEMENT(schema)
+            .key(prefix)
+            .description(
+                "This kernel allows custom frame masks based on comparing some "
+                "reduction along modules over each frame to some threshold. "
+                "Integrated intensity will be used as a running example in the "
+                "descriptions of the properties under this node; these settings (used "
+                "with the example integrated intensity addon) would yield a frame "
+                "mask of frames with total (across modules) integrated intensity of "
+                "at least [threshold] ADUs."
+            )
+            .commit(),
+
+            STRING_ELEMENT(schema)
+            .key(f"{prefix}.key")
+            .description(
+                "The key in the input hashes which holds the data values for reduction "
+                "and comparison. Typically some summary computed per frame by an "
+                "addon; for example, 'image.integratedIntensity'"
+            )
+            .assignmentOptional()
+            .defaultValue("")
+            .reconfigurable()
+            .commit(),
+
+            DOUBLE_ELEMENT(schema)
+            .key(f"{prefix}.threshold")
+            .assignmentOptional()
+            .defaultValue(0)
+            .reconfigurable()
+            .commit(),
+
+            STRING_ELEMENT(schema)
+            .key(f"{prefix}.reduction")
+            .description(
+                "The reduction to apply across modules before comparing with the "
+                "threshold. For example 'sum'."
+            )
+            .assignmentOptional()
+            .defaultValue("sum")
+            .options("sum,mean,max,min,std")
+            .reconfigurable()
+            .commit(),
+
+            STRING_ELEMENT(schema)
+            .key(f"{prefix}.comparator")
+            .description(
+                "The comparator to use for comparing the reduced per-frame values to "
+                "the threshold. For the integrated intensity example, 'ge'. The "
+                "reduced values will be on the left and the threshold will be on the "
+                "right, so the example would be compute: "
+                "mask = sum(integrated intensity) ≥ threshold."
+            )
+            .assignmentOptional()
+            .defaultValue("ge")
+            .options("lt,le,gt,ge,eq,ne")
+            .reconfigurable()
+            .commit(),
+        )
+
+    def consider(self, train_id, sources, num_frames):
+        return self._comparator(
+            self._reduction(
+                [
+                    data[self._key]
+                    for (data, _) in sources.values()
+                    if data.has(self._key)
+                ],
+                axis=0,
+            ),
+            self._threshold,
+        ).astype(np.uint8, copy=False)
diff --git a/src/calng/correction_addons/random_frames.py b/src/calng/correction_addons/random_frames.py
index 42ab7c1a..e51221e1 100644
--- a/src/calng/correction_addons/random_frames.py
+++ b/src/calng/correction_addons/random_frames.py
@@ -33,7 +33,8 @@ class RandomFramesAddon(BaseCorrectionAddon):
         )
 
     def __init__(self, config):
-        self._probability = config["probability"] / 100
+        # TODO: figure out why no / 100 here...
+        self._probability = config["probability"]
 
     def post_correction(self, processed_data, cell_table, pulse_table, output_hash):
         output_hash["data.dataFramePattern"] = (
-- 
GitLab