From 7cd410fae246b1f9cbb43e6b55b74e634be7d2c3 Mon Sep 17 00:00:00 2001
From: Egor Sobolev <egor.sobolev@xfel.eu>
Date: Fri, 2 Aug 2024 09:45:43 +0200
Subject: [PATCH] Refactoring of arbiter to add flexibility and to allow reuse
 of kernels

---
 setup.py                                      |   7 +-
 src/calng/FrameSelectionArbiter.py            | 567 ++++++++++++++----
 src/calng/arbiter_kernels/base_kernel.py      |  37 +-
 src/calng/arbiter_kernels/boolean_ops.py      |  44 +-
 .../arbiter_kernels/intensity_threshold.py    | 114 ++++
 src/calng/arbiter_kernels/litframes.py        |  22 +
 src/calng/arbiter_kernels/manual.py           | 109 ++++
 src/calng/arbiter_kernels/ppu_arbiter.py      |  11 +-
 src/calng/arbiter_kernels/random_frames.py    |  19 +-
 src/calng/arbiter_kernels/reduce_threshold.py |  51 +-
 src/calng/base_correction.py                  |   4 +-
 src/calng/expr.py                             | 126 ++++
 src/calng/scenes.py                           | 195 ++++--
 tests/common_setup.py                         |  44 ++
 tests/test_frameselection.py                  | 452 ++++++++++++++
 15 files changed, 1562 insertions(+), 240 deletions(-)
 create mode 100644 src/calng/arbiter_kernels/intensity_threshold.py
 create mode 100644 src/calng/arbiter_kernels/litframes.py
 create mode 100644 src/calng/arbiter_kernels/manual.py
 create mode 100644 src/calng/expr.py
 create mode 100644 tests/test_frameselection.py

diff --git a/setup.py b/setup.py
index 088f2c52..2f3602f6 100644
--- a/setup.py
+++ b/setup.py
@@ -35,7 +35,8 @@ setup(
             "PnccdCorrection = calng.corrections.PnccdCorrection:PnccdCorrection",
             "ShmemToZMQ = calng.ShmemToZMQ:ShmemToZMQ",
             "ShmemTrainMatcher = calng.ShmemTrainMatcher:ShmemTrainMatcher",
-            "FrameSelectionArbiter = calng.FrameSelectionArbiter:FrameSelectionArbiter",  # noqa
+            "SimpleFrameSelectionArbiter = calng.FrameSelectionArbiter:SimpleFrameSelectionArbiter",  # noqa
+            "AdvancedFrameSelectionArbiter = calng.FrameSelectionArbiter:AdvancedFrameSelectionArbiter",  # noqa
             "DetectorAssembler = calng.DetectorAssembler:DetectorAssembler",
             "Gotthard2Assembler = calng.Gotthard2Assembler:Gotthard2Assembler",
             "LpdminiSplitter = calng.LpdminiSplitter:LpdminiSplitter",
@@ -56,7 +57,11 @@ setup(
             "SaturationMonitor = calng.correction_addons.saturation_monitor:SaturationMonitor",# noqa
         ],
         "calng.arbiter_kernel": [
+            "Assign = calng.arbiter_kernels.base_kernel:Assign",
             "BooleanCombination = calng.arbiter_kernels.boolean_ops:BooleanCombination",  # noqa
+            "IntensityThreshold = calng.arbiter_kernels.intensity_threshold:IntensityThreshold",  # noqa
+            "LitFrames = calng.arbiter_kernels.litframes:LitFrames",
+            "ManualFilter = calng.arbiter_kernels.manual:ManualFilter",
             "Ppu = calng.arbiter_kernels.ppu_arbiter:PpuKernel",
             "RandomFrames = calng.arbiter_kernels.random_frames:RandomSampler",  # noqa
             "ReduceAndThreshold = calng.arbiter_kernels.reduce_threshold:ReduceAndThreshold",  # noqa
diff --git a/src/calng/FrameSelectionArbiter.py b/src/calng/FrameSelectionArbiter.py
index 21b8ddf1..079252af 100644
--- a/src/calng/FrameSelectionArbiter.py
+++ b/src/calng/FrameSelectionArbiter.py
@@ -1,50 +1,121 @@
+import enum
 from importlib.metadata import entry_points
 
+import numpy as np
 from geometryDevices import utils as geom_utils
 from karabo.bound import (
+    BOOL_ELEMENT,
     KARABO_CLASSINFO,
     NODE_ELEMENT,
     OUTPUT_CHANNEL,
+    OVERWRITE_ELEMENT,
     STRING_ELEMENT,
+    TABLE_ELEMENT,
     VECTOR_BOOL_ELEMENT,
-    ChannelMetaData,
     Epochstamp,
     Hash,
     Schema,
+    State,
     Timestamp,
     Trainstamp,
 )
 from TrainMatcher import TrainMatcher
 
-from . import utils
+from . import scenes
 from ._version import version as deviceVersion
 from .arbiter_kernels.base_kernel import KernelWarning
+from .expr import BinaryExpression
+from .utils import WarningContextSystem
 
-my_schema = Schema()
-(
-    NODE_ELEMENT(my_schema)
-    .key("data")
-    .commit(),
-
-    VECTOR_BOOL_ELEMENT(my_schema)
-    .key("data.dataFramePattern")
-    .assignmentOptional()
-    .defaultValue([])
-    .commit(),
-)
+
+class DeviceWarning(enum.Enum):
+    PLAN = "plan"
+    DECISION = "decision"
+
+
+def output_schema():
+    res = Schema()
+    (
+        NODE_ELEMENT(res)
+        .key("data")
+        .commit(),
+
+        VECTOR_BOOL_ELEMENT(res)
+        .key("data.dataFramePattern")
+        .assignmentOptional()
+        .defaultValue([])
+        .commit(),
+    )
+    return res
 
 
 kernels = [
     kernel_class.load()
-    for kernel_class in entry_points().get("calng.arbiter_kernel", [])
+    for kernel_class in entry_points().select().get("calng.arbiter_kernel", [])
 ]
-kernel_choice = {
-    kernel_class.__name__: kernel_class for kernel_class in kernels
-}
+kernel_choice = {kernel_class.__name__: kernel_class for kernel_class in kernels}
+
+
+def selection_plan_schema(kernels):
+    schema = Schema()
+    (
+        BOOL_ELEMENT(schema)
+        .key("enable")
+        .displayedName("Enable")
+        .assignmentOptional()
+        .defaultValue(False)
+        .reconfigurable()
+        .commit(),
+
+        STRING_ELEMENT(schema)
+        .key("name")
+        .displayedName("Name")
+        .description(
+            "The name assigned to the result of running this kernel. This name can be "
+            "used in (later) preselection expressions and in the final decision "
+            "expression. Each row must have a unique name and names must be valid "
+            "identifiers. Each selection step gets its configuration from a schema "
+            "node corresponding to its name."
+        )
+        .assignmentOptional()
+        .defaultValue("name")
+        .reconfigurable()
+        .commit(),
+
+        STRING_ELEMENT(schema)
+        .key("kernel")
+        .displayedName("Kernel")
+        .description(
+            "Choose which kernel to run in this selection step. The special kernel "
+            "'Assign' does nothing, but by using the preselection expression, it "
+            "effectively assigns the result of the preselection to the given name, "
+            "allowing you to name combined results from other kernels."
+        )
+        .options(",".join(kernel_choice))
+        .assignmentOptional()
+        .defaultValue("Assign")
+        .reconfigurable()
+        .commit(),
+
+        STRING_ELEMENT(schema)
+        .key("preselection")
+        .displayedName("Preselection")
+        .description(
+            "If not empty, this is evaluated as a logical expression (see "
+            "frameSelection.decision) involving previous selection steps. The "
+            "resulting mask is then passed to the kernel running in this selection "
+            "step. It is up to the kernel how the mask is used. The 'Assign' kernel "
+            "will just return the mask it is given."
+        )
+        .assignmentOptional()
+        .defaultValue("")
+        .reconfigurable()
+        .commit(),
+    )
+    return schema
 
 
-@KARABO_CLASSINFO("FrameSelectionArbiter", deviceVersion)
-class FrameSelectionArbiter(TrainMatcher.TrainMatcher):
+class BaseFrameSelectionArbiter(TrainMatcher.TrainMatcher):
     @staticmethod
     def expectedParameters(expected):
         (
@@ -52,9 +123,7 @@ class FrameSelectionArbiter(TrainMatcher.TrainMatcher):
             .key("geometryDevice")
             .displayedName("Geometry device")
             .description(
-                "The the device which will provide geometry. The device is expected to "
-                "have current geometry as a string (for now using custom non-robust "
-                "serialization scheme) element named serializedGeometry."
+                "The the device which will provide geometry."
             )
             .assignmentMandatory()
             .commit(),
@@ -63,134 +132,394 @@ class FrameSelectionArbiter(TrainMatcher.TrainMatcher):
             .key("frameSelection")
             .displayedName("Frame selection")
             .commit(),
+        )
+
+    def __init__(self, config):
+        super().__init__(config)
+        self.registerSlot(self.slotReceiveGeometry)
 
+    def initialization(self):
+        super().initialization()
+        self.warning_context = WarningContextSystem(
+            self, on_success={"state": "PASSIVE"}
+        )
+
+        self._geometry = None
+        if self.get("geometryDevice"):
+            self.signalSlotable.connect(
+                self.get("geometryDevice"),
+                "signalNewGeometry",
+                "",  # slot device ID (default: self)
+                "slotReceiveGeometry",
+            )
+
+    def _guess_number_of_frames(self, sources):
+        # TODO: robust frame deduction
+        for (data, _) in sources.values():
+            if data.has("image.cellId"):
+                return data.get("image.cellId").size
+        self.log.ERROR("Unable to figure out number of frames")
+
+    def slotReceiveGeometry(self, device_id, serialized_geometry):
+        self.log.INFO(f"Received geometry from {device_id}")
+        try:
+            self._geometry = geom_utils.deserialize_geometry(serialized_geometry)
+        except Exception as e:
+            self.log.WARN(f"Failed to deserialize geometry; {e}")
+
+@KARABO_CLASSINFO("SimpleFrameSelectionArbiter", deviceVersion)
+class SimpleFrameSelectionArbiter(BaseFrameSelectionArbiter):
+    @staticmethod
+    def expectedParameters(expected):
+        (
             STRING_ELEMENT(expected)
             .key("frameSelection.kernelChoice")
-            .displayedName("Kernel to use")
-            .assignmentMandatory()
-            .options(",".join(kernel_choice.keys()))
+            .options(",".join(kernel_choice))
+            .assignmentOptional()
+            .defaultValue("ManualFilter")
             .reconfigurable()
             .commit(),
 
             STRING_ELEMENT(expected)
             .key("frameSelection.kernelState")
             .setSpecialDisplayType("State")
-            .displayedName("Kernel state")
             .readOnly()
             .initialValue("OFF")
             .commit(),
+        )
 
-            NODE_ELEMENT(expected)
-            .key("frameSelection.kernels")
-            .displayedName("Kernels")
+    def initialization(self):
+        super().initialization()
+
+        self._update_schema(self["frameSelection.kernelChoice"])
+        self._initialize_kernel(
+            self["frameSelection.kernelChoice"], self["frameSelection.kernelParameters"]
+        )
+
+        if self["state"] != State.ERROR:
+            self.start()  # Auto-start this type of matcher.
+
+    def _update_schema(self, kernel_name):
+        schema_update = Schema()
+        (
+            NODE_ELEMENT(schema_update)
+            .key("frameSelection")
             .commit(),
 
-            OUTPUT_CHANNEL(expected)
+            NODE_ELEMENT(schema_update)
+            .key("frameSelection.kernelParameters")
+            .commit(),
+        )
+        kernel_class = kernel_choice[kernel_name]
+        kernel_class.extend_device_schema(
+            schema_update, "frameSelection.kernelParameters"
+        )
+        output_schema_update = output_schema()
+        kernel_class.extend_output_schema(output_schema_update, kernel_class.__name__)
+        (
+            OUTPUT_CHANNEL(schema_update)
             .key("output")
-            .dataSchema(my_schema)
+            .dataSchema(output_schema_update)
+            .commit(),
+        )
+        self.updateSchema(schema_update)
+
+    def _initialize_kernel(self, class_name, conf):
+        kernel_class = kernel_choice[class_name]
+        self.kernel = kernel_class(self, kernel_class.__name__, conf)
+
+    def preReconfigure(self, conf):
+        super().preReconfigure(conf)
+
+        if conf.has("frameSelection.kernelChoice"):
+            name = conf["frameSelection.kernelChoice"]
+            self._update_schema(name)
+            self._initialize_kernel(name, self["frameSelection.kernelParameters"])
+        if conf.has("frameSelection.kernelParameters"):
+            self.kernel.reconfigure(conf["frameSelection.kernelParameters"])
+
+    def on_matched_data(self, train_id, sources):
+        num_frames = self._guess_number_of_frames(sources)
+
+        # run kernels
+        out_hash = Hash()
+        with self.warning_context(
+            "frameSelection.kernelState", KernelWarning.PROCESSING
+        ) as warn:
+            try:
+                decision = self.kernel.consider(
+                    train_id,
+                    sources,
+                    num_frames,
+                    np.ones(num_frames, dtype=bool),
+                    out_hash,
+                )
+            except Exception as ex:
+                warn(f"Kernel failed: {ex}")
+                decision = np.ones(num_frames, dtype=bool)
+
+        out_hash["data.dataFramePattern"] = list(map(bool, decision))
+
+        # don't use TM's stored self.output as we have re-injected
+        self.writeChannel(
+            "output",
+            out_hash,
+            timestamp=Timestamp(Epochstamp(), Trainstamp(train_id)),
+        )
+        self.info["sent"] += 1
+        self.info["trainId"] = train_id
+        self.rate_out.update()
+
+
+@KARABO_CLASSINFO("AdvancedFrameSelectionArbiter", deviceVersion)
+class AdvancedFrameSelectionArbiter(BaseFrameSelectionArbiter):
+    @staticmethod
+    def expectedParameters(expected):
+        (
+            OVERWRITE_ELEMENT(expected)
+            .key("availableScenes")
+            .setNewDefaultValue(["overview", "trainMatcherScene"])
+            .commit(),
+
+            TABLE_ELEMENT(expected).key("frameSelection.plan")
+            .displayedName("Selection Plan")
+            .description("")
+            .setColumns(selection_plan_schema(kernel_choice.keys()))
+            .assignmentOptional()
+            .defaultValue([])
+            .reconfigurable()
+            .commit(),
+
+            STRING_ELEMENT(expected)
+            .key("frameSelection.decision")
+            .displayedName("Decision")
+            .assignmentMandatory()
+            .reconfigurable()
+            .commit(),
+
+            NODE_ELEMENT(expected)
+            .key("frameSelection.selections")
+            .displayedName("Selections")
             .commit(),
         )
-        for kernel_class in kernel_choice.values():
-            kernel_class.extend_device_schema(
-                expected, f"frameSelection.kernels.{kernel_class.__name__}"
-            )
 
     def __init__(self, config):
         super().__init__(config)
-        self.registerSlot(self.slotReceiveGeometry)
+        self._selection_steps = {}  # name -> (kernel class, preselection expression)
+        self._selection_kernels = {}  # name -> kernel instance
+        self._decision_expr = BinaryExpression(None)
 
     def initialization(self):
         super().initialization()
-        self.warning_context = utils.WarningContextSystem(
-            self,
-            on_success={"frameSelection.kernelState": "ON"},
-        )
-        with self.warning_context("frameSelection.kernelState", KernelWarning.INIT):
-            self._kernel_class = kernel_choice[self.get("frameSelection.kernelChoice")]
-            self._kernel = self._kernel_class(
-                self._parameters[
-                    f"frameSelection.kernels.{self._kernel_class.__name__}"
-                ]
-            )
-            self._kernel._device = self
 
-        self._geometry = None
-        if self.get("geometryDevice"):
-            self.signalSlotable.connect(
-                self.get("geometryDevice"),
-                "signalNewGeometry",
-                "",  # slot device ID (default: self)
-                "slotReceiveGeometry",
+        self._validate_plan_and_update_schema(self["frameSelection.plan"])
+        self._initialize_kernels()
+        self._configure_decision(self["frameSelection.decision"])
+
+        if self["state"] != State.ERROR:
+            self.start()  # Auto-start this type of matcher.
+
+    def start(self):
+        super().start()
+        self.warning_context.on_success["state"] = "ACTIVE"
+
+    def stop(self):
+        super().stop()
+        self.warning_context.on_success["state"] = "PASSIVE"
+
+    def requestScene(self, params):
+        scene_name = params.get("name", default="")
+        if scene_name == "trainMatcherScene":
+            params["name"] = "scene"
+            return super().requestScene(params)
+        if scene_name != "overview":
+            self.log.INFO(
+                f"Unexpected scene name {scene_name} requested, "
+                "will just return overview"
             )
-        self.start()  # Auto-start this type of matcher.
+        payload = Hash("name", scene_name, "success", True)
+        payload["data"] = scenes.complex_arbiter_overview(
+            device_id=self.getInstanceId(),
+            schema=self.getFullSchema(),
+        )
+        self.reply(
+            Hash(
+                "type",
+                "deviceScene",
+                "origin",
+                self.getInstanceId(),
+                "payload",
+                payload,
+            )
+        )
 
     def on_matched_data(self, train_id, sources):
-        # TODO: robust frame deduction
-        num_frames = None
-        for (data, _) in sources.values():
-            if not data.has("image.cellId"):
-                continue
-            num_frames = data.get("image.cellId").size
-            break
-        else:
-            # TODO: also use warning context
-            self.log.WARN("Unable to figure out number of frames")
+        num_frames = self._guess_number_of_frames(sources)
 
-        with self.warning_context(
-            "frameSelection.kernelState", KernelWarning.PROCESSING
-        ):
-            decision = self._kernel.consider(train_id, sources, num_frames)
-        if isinstance(decision, Hash):
-            assert decision.has("data.dataFramePattern")
-            decision["data.dataFramePattern"] = list(
-                map(bool, decision["data.dataFramePattern"])
-            )
-            result = decision
-        else:
-            result = Hash()
-            # TODO: avoid recasting
-            result["data.dataFramePattern"] = list(map(bool, decision))
-        self.output.write(
-            result,
-            ChannelMetaData(
-                f"{self.getInstanceId()}:output",
-                Timestamp(Epochstamp(), Trainstamp(train_id)),
-            ),
-        )
-        self.output.update()
+        # run kernels
+        out_hash = Hash()
+        selections = self._process_selection_steps(sources, train_id, num_frames, out_hash)
+
+        # final selection decision
+        decision = self._decision_expr.eval(selections, np.ones(num_frames, bool))
+        out_hash["data.dataFramePattern"] = list(map(bool, decision))
+
+        # don't use TM's stored self.output as we have re-injected
+        self.writeChannel(
+            "output",
+            out_hash,
+            timestamp=Timestamp(Epochstamp(), Trainstamp(train_id)),
+        )
+        self.info["sent"] += 1
+        self.info["trainId"] = train_id
         self.rate_out.update()
 
-    def start(self):
-        self.set("frameSelection.kernelState", "OFF")
-        self.warning_context.reset()
-        super().start()
+    def _process_selection_steps(self, sources, train_id, num_frames, out_hash):
+        res = {}
+        for selection_name, kernel in self._selection_kernels.items():
+            prefix = f"frameSelection.selections.{selection_name}"
+            with self.warning_context(
+                f"{prefix}.state",
+                KernelWarning.PRESELECTION,
+            ) as warn:
+                default_mask = np.ones(num_frames, dtype=bool)
+                _, selection_expr = self._selection_steps[selection_name]
+                try:
+                    mask = selection_expr.eval(
+                        res, default_mask
+                    )
+                except Exception as ex:
+                    warn(f"Evaluating preselection for {prefix} failed: {ex}")
+                    mask = default_mask
+            with self.warning_context(
+                f"{prefix}.state",
+                KernelWarning.PROCESSING,
+            ) as warn:
+                try:
+                    res[selection_name] = kernel.consider(
+                        train_id, sources, num_frames, mask, out_hash
+                    )
+                except Exception as ex:
+                    warn(f"Kernel for {prefix} failed: {ex}")
+                    res[selection_name] = np.ones(num_frames, dtype=bool)
+        return res
+
+    def _configure_decision(self, decision_string):
+        with self.warning_context("state", DeviceWarning.DECISION) as warn:
+            try:
+                self._decision_expr = BinaryExpression.parse(
+                    decision_string, self._selection_steps.keys()
+                )
+            except Exception as ex:
+                # TODO: maybe re-raise to reject reconfiguration
+                warn(f"Failure to parse decision string '{decision_string}': {ex}")
+
+    def _validate_plan_and_update_schema(self, plan):
+        new_selection_steps = {}
+
+        # TODO: don't update this many levels, just update frameSelection.selections
+        schema_update = Schema()
+        (
+            NODE_ELEMENT(schema_update)
+            .key("frameSelection")
+            .displayedName("Frame selection")
+            .commit(),
+
+            NODE_ELEMENT(schema_update)
+            .key("frameSelection.selections")
+            .displayedName("Selections")
+            .commit(),
+        )
+        output_schema_update = output_schema()
+        with self.warning_context("state", DeviceWarning.PLAN) as warn:
+            for selection in plan:
+                if not selection["enable"]:
+                    continue
+
+                selection_name = selection["name"].strip()
+                if not selection_name.isidentifier():
+                    warn(f"Invalid selection name '{selection_name}'")
+                    continue
+                elif selection_name in new_selection_steps:
+                    warn(f"Duplicate selection name '{selection_name}'")
+                    continue
+
+                try:
+                    preselection = BinaryExpression.parse(
+                        selection["preselection"],
+                        new_selection_steps,  # just needs to check names
+                    )
+                except Exception as ex:
+                    warn(
+                        "Failure to parse (or undefined variable used in) "
+                        f"preselection for {selection_name}: {ex}"
+                    )
+                    continue
+
+                kernel_class = kernel_choice[selection["kernel"]]
+
+                kernel_prefix = f"frameSelection.selections.{selection_name}"
+                (
+                    NODE_ELEMENT(schema_update)
+                    .key(kernel_prefix)
+                    .commit(),
+
+                    STRING_ELEMENT(schema_update)
+                    .key(f"{kernel_prefix}.kernel")
+                    .readOnly()
+                    .initialValue(kernel_class.__name__)
+                    .commit(),
+
+                    STRING_ELEMENT(schema_update)
+                    .key(f"{kernel_prefix}.state")
+                    .setSpecialDisplayType("State")
+                    .readOnly()
+                    .initialValue("OFF")
+                    .commit(),
+                )
+                kernel_class.extend_device_schema(
+                    schema_update, kernel_prefix
+                )
+                kernel_class.extend_output_schema(output_schema_update, selection_name)
+                new_selection_steps[selection_name] = (kernel_class, preselection)
+
+        (
+            OUTPUT_CHANNEL(schema_update)
+            .key("output")
+            .dataSchema(output_schema_update)
+            .commit(),
+        )
+        self._selection_steps = new_selection_steps
+        self.updateSchema(schema_update)
+
+    def _initialize_kernels(self):
+        # instantiate kernels for selections
+        self._selection_kernels.clear()
+        for selection_name, (
+            kernel_class,
+            preselection,
+        ) in self._selection_steps.items():
+            prefix = f"frameSelection.selections.{selection_name}"
+            with self.warning_context(f"{prefix}.state", KernelWarning.INIT) as warn:
+                try:
+                    self._selection_kernels[selection_name] = kernel_class(
+                        self, selection_name, self.get(prefix)
+                    )
+                except Exception as ex:
+                    warn(f"Failed to instantiate kernel for {selection_name}: {ex}")
 
     def preReconfigure(self, conf):
         super().preReconfigure(conf)
-        self._reinstantiate_arbiter_kernel = False
-        if conf.has("frameSelection.kernelChoice"):
-            self.log.INFO("Switching frame selection kernel")
-            self._kernel_class = kernel_choice[conf.get("frameSelection.kernelChoice")]
-            self._reinstantiate_arbiter_kernel = True
-        elif conf.has(f"frameSelection.kernels.{self._kernel_class.__name__}"):
-            self.log.INFO("Reconfiguring frame selection kernel")
-            # TODO: update instead of rebuild for complex kernels
-            # (decide based on reconfigurability?)
-            self._reinstantiate_arbiter_kernel = True
-
-    def postReconfigure(self):
-        super().postReconfigure()
-        if self._reinstantiate_arbiter_kernel:
-            self._kernel = self._kernel_class(
-                self._parameters[
-                    f"frameSelection.kernels.{self._kernel_class.__name__}"
-                ]
-            )
-            self._reinstantiate_arbiter_kernel = False
 
-    def slotReceiveGeometry(self, device_id, serialized_geometry):
-        self.log.INFO(f"Received geometry from {device_id}")
-        try:
-            self._geometry = geom_utils.deserialize_geometry(serialized_geometry)
-        except Exception as e:
-            self.log.WARN(f"Failed to deserialize geometry; {e}")
+        if conf.has("frameSelection.plan"):
+            self._validate_plan_and_update_schema(conf["frameSelection.plan"])
+            self._initialize_kernels()
+
+        if conf.has("frameSelection.decision"):
+            self._configure_decision(conf["frameSelection.decision"])
+
+        if conf.has("frameSelection.selections"):
+            for name, kernel in self._selection_kernels.items():
+                kernel_prefix = f"frameSelection.selections.{name}"
+                if conf.has(kernel_prefix):
+                    kernel.reconfigure(conf[kernel_prefix])
diff --git a/src/calng/arbiter_kernels/base_kernel.py b/src/calng/arbiter_kernels/base_kernel.py
index 6d36a956..8ae88cab 100644
--- a/src/calng/arbiter_kernels/base_kernel.py
+++ b/src/calng/arbiter_kernels/base_kernel.py
@@ -1,37 +1,44 @@
 import enum
 
-import numpy as np
-
 
 class KernelWarning(enum.Enum):
     INIT = "init"
-    MISSINGSOURCE = "missingsource"
-    MISSINGKEY = "missingkey"
     PROCESSING = "processing"
+    PRESELECTION = "preselection"
 
 
 class BaseArbiterKernel:
-    def __init__(self, config):
-        """No need for prefix - hosting device should pass us the relevant subnode"""
-        self._config = config
-        self._device = None
+    def __init__(self, device, name, config):
+        self._device = device
+        self._name = name
+        self.reconfigure(config)
 
     @property
     def geometry(self):
         return self._device._geometry
 
-    def warning_context(self, warn_type):
-        return self._device.warning_context("frameSelection.kernelState", warn_type)
-
     @staticmethod
     def extend_device_schema(schema, prefix):
         """Should add configurability to the arbiter (matcher) the kernel will be
         running in.  Prefix will include node name, so will typically be
-        "frameSelection.kernels.{class.__name__}". The parent node will not have been
-        created already (unlike for correction addons)."""
+        "frameSelection.kernels.{class.__name__}". The parent node will have been
+        created already (unlike for correction addons), feel free to overwrite the
+        description."""
+        pass
+
+    @staticmethod
+    def extend_output_schema(schema, name):
         pass
 
-    def consider(self, train_id, sources, num_frames):
+    def consider(self, train_id, sources, num_frames, mask, out_hash):
         """Must return tuple of uint8 mask array of size equal to num_frames
         and base output hash or None."""
-        return np.ones(num_frames, dtype=np.uint8), None
+        raise NotImplementedError()
+
+    def reconfigure(self, config):
+        pass
+
+
+class Assign(BaseArbiterKernel):
+    def consider(self, tid, src, nframes, mask, out):
+        return mask
diff --git a/src/calng/arbiter_kernels/boolean_ops.py b/src/calng/arbiter_kernels/boolean_ops.py
index 135516f6..acd1caa7 100644
--- a/src/calng/arbiter_kernels/boolean_ops.py
+++ b/src/calng/arbiter_kernels/boolean_ops.py
@@ -1,23 +1,18 @@
 from karabo.bound import (
-    NODE_ELEMENT,
+    OVERWRITE_ELEMENT,
     STRING_ELEMENT,
 )
 import numpy as np
-from .base_kernel import BaseArbiterKernel, KernelWarning
+from .base_kernel import BaseArbiterKernel
 
 
 class BooleanCombination(BaseArbiterKernel):
-    def __init__(self, config):
-        super().__init__(config)
-        self._key = config.get("key")
-        self._operator = getattr(np, config.get("operator"))
-
     @staticmethod
     def extend_device_schema(schema, prefix):
         (
-            NODE_ELEMENT(schema)
+            OVERWRITE_ELEMENT(schema)
             .key(prefix)
-            .description(
+            .setNewDescription(
                 "This kernel allows combining individual masks from many sources. It "
                 "is intended for cases where, for example, each correction device via "
                 "some addon provides its own frame mask. This arbiter kernel would "
@@ -42,17 +37,22 @@ class BooleanCombination(BaseArbiterKernel):
             .commit(),
         )
 
-    def consider(self, train_id, sources, num_frames):
+    def consider(self, train_id, sources, num_frames, mask, out_hash):
         # pretty sure this is special case of reduce and threshold in some algebra
-        with self.warning_context(KernelWarning.MISSINGKEY) as warn:
-            sources_with_key = [
-                data[self._key] for (data, _) in sources.values() if data.has(self._key)
-            ]
-            if not sources_with_key:
-                warn(f"No sources had '{self._key}'")
-            else:
-                return self._operator(
-                    sources_with_key,
-                    axis=0,
-                ).astype(np.uint8, copy=False)
-        return np.ones(num_frames, dtype=np.uint8)
+        res = np.zeros(num_frames, dtype=bool)
+        mask = mask.astype(bool, copy=False)
+        sources_with_key = [
+            data[self._key][mask]
+            for (data, _) in sources.values()
+            if data.has(self._key)
+        ]
+        if not sources_with_key:
+            raise KeyError(f"No sources had '{self._key}'")
+        res[mask] = self._operator(sources_with_key, axis=0)
+        return res
+
+    def reconfigure(self, config):
+        if config.has("key"):
+            self._key = config.get("key")
+        if config.has("operator"):
+            self._operator = getattr(np, config.get("operator"))
diff --git a/src/calng/arbiter_kernels/intensity_threshold.py b/src/calng/arbiter_kernels/intensity_threshold.py
new file mode 100644
index 00000000..d68670ad
--- /dev/null
+++ b/src/calng/arbiter_kernels/intensity_threshold.py
@@ -0,0 +1,129 @@
+from karabo.bound import (
+    DOUBLE_ELEMENT,
+    INT32_ELEMENT,
+    OVERWRITE_ELEMENT,
+    VECTOR_UINT32_ELEMENT,
+    VECTOR_DOUBLE_ELEMENT,
+)
+from collections import deque
+import numpy as np
+from .base_kernel import BaseArbiterKernel
+
+
+def module_number(source):
+    """Return the module number encoded in an XTDF source name, else None."""
+    token = source.split("/")[-1]
+    return int(token[:-8]) if token.endswith("CH0:xtdf") else None
+
+
+class IntensityThreshold(BaseArbiterKernel):
+    """Selects frames whose module-averaged intensity exceeds a robust
+    outlier threshold (median + snr * sigma, with sigma estimated from
+    the interquartile range of recently seen frames)."""
+
+    def __init__(self, name, config, host_device):
+        super().__init__(name, config, host_device)
+        self._key = "integratedIntensity.mean"
+        self._modules = set(config.get("modules"))
+        self._snr = config.get("snr")
+        self._min_frames = config.get("minFrames")
+        # rolling history of per-frame intensities across trains; used to
+        # stabilize the percentile estimates
+        self._history = deque([], self._min_frames)
+
+    @staticmethod
+    def extend_device_schema(schema, prefix):
+        (
+            OVERWRITE_ELEMENT(schema)
+            .key(prefix)
+            .setNewDescription(
+                "This kernel selects the frames by comparing the averaged "
+                "intensity to the threshold."
+            )
+            .commit(),
+
+            VECTOR_UINT32_ELEMENT(schema)
+            .key(f"{prefix}.modules")
+            .assignmentOptional()
+            .defaultValue([3, 4, 8, 15])
+            .reconfigurable()
+            .commit(),
+
+            DOUBLE_ELEMENT(schema)
+            .key(f"{prefix}.snr")
+            .assignmentOptional()
+            .defaultValue(3.5)
+            .reconfigurable()
+            .commit(),
+
+            INT32_ELEMENT(schema)
+            .key(f"{prefix}.minFrames")
+            .assignmentOptional()
+            .defaultValue(100)
+            .reconfigurable()
+            .commit(),
+        )
+
+    @staticmethod
+    def extend_output_schema(schema, name):
+        (
+            DOUBLE_ELEMENT(schema)
+            .key(f"{name}.threshold")
+            .assignmentOptional()
+            .defaultValue(np.nan)
+            .commit(),
+
+            VECTOR_DOUBLE_ELEMENT(schema)
+            .key(f"{name}.intensity")
+            .assignmentOptional()
+            .defaultValue([])
+            .commit()
+        )
+
+    def reconfigure(self, config):
+        if config.has("modules"):
+            self._modules = set(config.get("modules"))
+        if config.has("snr"):
+            self._snr = config.get("snr")
+        if config.has("minFrames"):
+            self._min_frames = config.get("minFrames")
+            # keep as much existing history as the new length allows
+            self._history = deque(self._history, self._min_frames)
+
+    def consider(self, train_id, sources, num_frames, mask, out_hash):
+        mask = mask.astype(bool, copy=False)
+        if not mask.any():
+            out_hash.set(f"{self._name}.threshold", 0)
+            out_hash.set(f"{self._name}.intensity", [])
+            return mask
+
+        per_module = [
+            data[self._key][mask]
+            for source, (data, _) in sources.items()
+            if data.has(self._key) and (module_number(source) in self._modules)
+        ]
+        if not per_module:
+            raise KeyError(f"No selected module had '{self._key}'")
+        intensity = np.mean(per_module, axis=0)
+        self._history.extend(intensity[-self._min_frames:])
+
+        if num_frames < self._min_frames:
+            # NOTE(review): short trains use only current-train statistics
+            # here; confirm this branch is not inverted w.r.t. the history
+            q1, mu, q3 = np.percentile(intensity, [25, 50, 75])
+        else:
+            q1, mu, q3 = np.percentile(self._history, [25, 50, 75])
+
+        sig = (q3 - q1) / 1.34896  # IQR -> sigma for a normal distribution
+        threshold = mu + self._snr * sig
+
+        hits = intensity > threshold
+
+        out_hash.set(f"{self._name}.threshold", threshold)
+        out_hash.set(f"{self._name}.intensity", intensity)
+
+        # 'hits' covers only the masked frames; scatter it back to full
+        # length ('mask & hits' would fail on shape mismatch here)
+        res = np.zeros(num_frames, dtype=bool)
+        res[mask] = hits
+        return res
diff --git a/src/calng/arbiter_kernels/litframes.py b/src/calng/arbiter_kernels/litframes.py
new file mode 100644
index 00000000..ce50b732
--- /dev/null
+++ b/src/calng/arbiter_kernels/litframes.py
@@ -0,0 +1,27 @@
+import numpy as np
+from .base_kernel import BaseArbiterKernel
+
+
+class LitFrames(BaseArbiterKernel):
+    """Selects frames which the lit-frame finder reports as containing
+    at least one pulse ('data.nPulsePerFrame' > 0)."""
+
+    @staticmethod
+    def extend_device_schema(schema, prefix):
+        # no kernel-specific parameters
+        pass
+
+    def consider(self, train_id, sources, num_frames, mask, out_hash):
+        for source, (data, _) in sources.items():
+            if not data.has("data.nPulsePerFrame"):
+                continue
+
+            lff_data = np.array(data["data.nPulsePerFrame"])
+            if len(lff_data) != num_frames:
+                raise ValueError(
+                    f"{source}'s nPulsePerFrame had weird size {len(lff_data)}; "
+                    f"I think there should be {num_frames} frames!"
+                )
+            # combine with the incoming mask, like the other kernels do
+            return mask & (lff_data > 0)
+        raise KeyError("No source had 'data.nPulsePerFrame'")
diff --git a/src/calng/arbiter_kernels/manual.py b/src/calng/arbiter_kernels/manual.py
new file mode 100644
index 00000000..59759c1f
--- /dev/null
+++ b/src/calng/arbiter_kernels/manual.py
@@ -0,0 +1,125 @@
+import enum
+import numpy as np
+from karabo.bound import STRING_ELEMENT
+
+from .base_kernel import BaseArbiterKernel
+
+
+class FramefilterSpecType(enum.Enum):
+    """How the manual filter 'spec' string is interpreted."""
+    RANGE = "range"
+    MASK = "mask"
+    INDICES = "indices"
+
+
+class ManualFilter(BaseArbiterKernel):
+    """Applies a statically configured frame mask to the incoming mask."""
+
+    @staticmethod
+    def extend_device_schema(schema, prefix):
+        (
+            STRING_ELEMENT(schema)
+            .key(f"{prefix}.resize")
+            .description(
+                "If number of frames doesn't match the length of the specified "
+                "filter, what to do? If 'Error', the kernel will just fail (resulting "
+                "in all true for the arbiter). If True or False, will expand manual "
+                "filter with the corresponding value (shrinking if manual filter is "
+                "too big)."
+            )
+            .options("True,False,Error")
+            .assignmentOptional()
+            .defaultValue("False")
+            .reconfigurable()
+            .commit(),
+
+            STRING_ELEMENT(schema)
+            .key(f"{prefix}.spec")
+            .description(
+                "String specifying the manual mask to apply. See 'type' for details."
+            )
+            .assignmentOptional()
+            .defaultValue("")
+            .reconfigurable()
+            .commit(),
+
+            STRING_ELEMENT(schema)
+            .key(f"{prefix}.type")
+            .description(
+                "How to specify the manual filter. 'range' means parse and pass the "
+                "spec to np.arange (so should be between 1 and 3 comma-separated "
+                "integers specifying [start], stop, [step]). 'mask' means "
+                "comma-separated list of 0 and 1 (zero means don't include, 1 means "
+                "include). 'indices' means comma-separated list of indices (these will "
+                "be included, anything else will not be)."
+            )
+            .options(",".join(ffst.value for ffst in FramefilterSpecType))
+            .assignmentOptional()
+            .defaultValue("mask")
+            .reconfigurable()
+            .commit(),
+        )
+
+    def reconfigure(self, config):
+        # TODO: handle rollback if illegal config
+        if config.has("type"):
+            self._filter_type = FramefilterSpecType(config["type"])
+        if config.has("spec"):
+            self._filter_spec = config["spec"]
+        if config.has("spec") or config.has("type"):
+            self._filter = _parse_frame_filter(self._filter_type, self._filter_spec)
+
+        if config.has("resize"):
+            if config["resize"] == "Error":
+
+                def aux(num_frames):
+                    # strict: frame count must match the filter exactly
+                    if num_frames != self._filter.size:
+                        raise ValueError("Number of frames doesn't match my filter")
+                    return self._filter
+
+            else:
+                value = config["resize"] == "True"  # note: can't just use bool(...)
+
+                def aux(num_frames):
+                    # truncate, or pad with 'value', to reach num_frames
+                    if num_frames < self._filter.size:
+                        return self._filter[:num_frames]
+                    return np.pad(
+                        self._filter,
+                        (0, num_frames - self._filter.size),
+                        constant_values=value,
+                    )
+
+            self._resize = aux
+
+    def consider(self, train_id, sources, num_frames, mask, out_hash):
+        return mask & self._resize(num_frames)
+
+
+def _parse_frame_filter(filter_type, filter_spec):
+    """Parse 'filter_spec' into a boolean mask according to 'filter_type'."""
+    if filter_type in (FramefilterSpecType.RANGE, FramefilterSpecType.INDICES):
+        if filter_type is FramefilterSpecType.RANGE:
+            # allow exceptions to bubble up
+            numbers = tuple(int(part) for part in filter_spec.split(","))
+            indices = np.arange(*numbers, dtype=np.uint16)
+        else:
+            # np.fromstring(text, sep=...) is deprecated; parse explicitly
+            indices = np.array(
+                [int(part) for part in filter_spec.split(",") if part.strip()],
+                dtype=np.uint16,
+            )
+        if indices.size == 0:
+            return np.zeros(0, dtype=bool)
+        mask = np.zeros(np.max(indices) + 1, dtype=bool)
+        mask[indices] = True
+    elif filter_type is FramefilterSpecType.MASK:
+        # np.fromstring cannot parse booleans from text; go via integers
+        mask = np.array(
+            [int(part) for part in filter_spec.split(",") if part.strip()],
+            dtype=np.uint8,
+        ).astype(bool)
+    else:
+        raise TypeError(f"Unknown frame filter type {filter_type}")
+    return mask
diff --git a/src/calng/arbiter_kernels/ppu_arbiter.py b/src/calng/arbiter_kernels/ppu_arbiter.py
index 04fa919d..bbc293ac 100644
--- a/src/calng/arbiter_kernels/ppu_arbiter.py
+++ b/src/calng/arbiter_kernels/ppu_arbiter.py
@@ -1,7 +1,6 @@
 import numpy as np
 
 from karabo.bound import (
-    NODE_ELEMENT,
     STRING_ELEMENT,
 )
 
@@ -9,8 +8,6 @@ from .base_kernel import BaseArbiterKernel, KernelWarning
 
 
 class PpuKernel(BaseArbiterKernel):
-    _node_name = "ppu"
-
     def __init__(self, config):
         super().__init__(config)
         self._ppu_device_id = config.get("ppuDevice")
@@ -20,10 +17,6 @@ class PpuKernel(BaseArbiterKernel):
     @staticmethod
     def extend_device_schema(schema, prefix):
         (
-            NODE_ELEMENT(schema)
-            .key(prefix)
-            .commit(),
-
             STRING_ELEMENT(schema)
             .key(f"{prefix}.ppuDevice")
             .description(
@@ -38,7 +31,7 @@ class PpuKernel(BaseArbiterKernel):
             .commit(),
         )
 
-    def consider(self, train_id, sources, num_frames):
+    def consider(self, train_id, sources, num_frames, mask, out_hash):
         with self.warning_context(KernelWarning.MISSINGSOURCE):
             ppu_data = next(
                 data
@@ -50,6 +43,6 @@ class PpuKernel(BaseArbiterKernel):
                 self._num_trains = ppu_data["trainTrigger.numberOfTrains.value"]
 
         if train_id in range(self._target, self._target + self._num_trains):
-            return np.ones(num_frames, dtype=np.uint8)
+            return mask
         else:
             return np.zeros(num_frames, dtype=np.uint8)
diff --git a/src/calng/arbiter_kernels/random_frames.py b/src/calng/arbiter_kernels/random_frames.py
index 3e33861e..7ad6c326 100644
--- a/src/calng/arbiter_kernels/random_frames.py
+++ b/src/calng/arbiter_kernels/random_frames.py
@@ -1,6 +1,5 @@
 from karabo.bound import (
     DOUBLE_ELEMENT,
-    NODE_ELEMENT,
     Unit,
 )
 import numpy as np
@@ -8,17 +7,9 @@ from .base_kernel import BaseArbiterKernel
 
 
 class RandomSampler(BaseArbiterKernel):
-    def __init__(self, config):
-        super().__init__(config)
-        self._threshold = (100 - config.get("probability")) / 100
-
     @staticmethod
     def extend_device_schema(schema, prefix):
         (
-            NODE_ELEMENT(schema)
-            .key(prefix)
-            .commit(),
-
             DOUBLE_ELEMENT(schema)
             .key(f"{prefix}.probability")
             .unit(Unit.PERCENT)
@@ -28,7 +19,9 @@ class RandomSampler(BaseArbiterKernel):
             .commit(),
         )
 
-    def consider(self, train_id, sources, num_frames):
-        return (np.random.random(num_frames) > self._threshold).astype(
-            np.uint8, copy=False
-        )
+    def consider(self, train_id, sources, num_frames, mask, out_hash):
+        return mask & (np.random.random(num_frames) > self._threshold)
+
+    def reconfigure(self, config):
+        if config.has("probability"):
+            self._threshold = (100 - config.get("probability")) / 100
diff --git a/src/calng/arbiter_kernels/reduce_threshold.py b/src/calng/arbiter_kernels/reduce_threshold.py
index 8c18e18b..556f55b9 100644
--- a/src/calng/arbiter_kernels/reduce_threshold.py
+++ b/src/calng/arbiter_kernels/reduce_threshold.py
@@ -2,27 +2,20 @@ import operator
 
 from karabo.bound import (
     DOUBLE_ELEMENT,
-    NODE_ELEMENT,
+    OVERWRITE_ELEMENT,
     STRING_ELEMENT,
 )
 import numpy as np
-from .base_kernel import BaseArbiterKernel, KernelWarning
+from .base_kernel import BaseArbiterKernel
 
 
 class ReduceAndThreshold(BaseArbiterKernel):
-    def __init__(self, config):
-        super().__init__(config)
-        self._key = config.get("key")
-        self._threshold = config.get("threshold")
-        self._reduction = getattr(np, config.get("reduction"))
-        self._comparator = getattr(operator, config.get("comparator"))
-
     @staticmethod
     def extend_device_schema(schema, prefix):
         (
-            NODE_ELEMENT(schema)
+            OVERWRITE_ELEMENT(schema)
             .key(prefix)
-            .description(
+            .setNewDescription(
                 "This kernel allows custom frame masks based on comparing some "
                 "reduction along modules over each frame to some threshold. "
                 "Integrated intensity will be used as a running example in the "
@@ -80,15 +73,27 @@ class ReduceAndThreshold(BaseArbiterKernel):
             .commit(),
         )
 
-    def consider(self, train_id, sources, num_frames):
-        with self.warning_context(KernelWarning.MISSINGKEY) as warn:
-            sources_with_key = [
-                data[self._key] for (data, _) in sources.values() if data.has(self._key)
-            ]
-            if not sources_with_key:
-                warn(f"No sources had '{self._key}'")
-            else:
-                return self._comparator(
-                    self._reduction(sources_with_key, axis=0), self._threshold
-                ).astype(np.uint8, copy=False)
-        return np.ones(num_frames, dtype=np.uint8)
+    def reconfigure(self, config):
+        if config.has("key"):
+            self._key = config.get("key")
+        if config.has("threshold"):
+            self._threshold = config.get("threshold")
+        if config.has("reduction"):
+            self._reduction = getattr(np, config.get("reduction"))
+        if config.has("comparator"):
+            self._comparator = getattr(operator, config.get("comparator"))
+
+    def consider(self, train_id, sources, num_frames, mask, out_hash):
+        res = np.zeros(num_frames, dtype=bool)
+        mask = mask.astype(bool, copy=False)
+        sources_with_key = [
+            data[self._key][mask]
+            for (data, _) in sources.values()
+            if data.has(self._key)
+        ]
+        if not sources_with_key:
+            raise KeyError(f"No sources had '{self._key}'")
+        res[mask] = self._comparator(
+            self._reduction(sources_with_key, axis=0), self._threshold
+        )
+        return res
diff --git a/src/calng/base_correction.py b/src/calng/base_correction.py
index caa4ff6d..d505dbd5 100644
--- a/src/calng/base_correction.py
+++ b/src/calng/base_correction.py
@@ -1,5 +1,4 @@
 import concurrent.futures
-import contextlib
 import functools
 import itertools
 import math
@@ -18,7 +17,6 @@ from calngUtils import (
     timing,
     trackers,
 )
-from calngUtils.misc import ChainHash
 from karabo.bound import (
     BOOL_ELEMENT,
     DOUBLE_ELEMENT,
@@ -1039,7 +1037,7 @@ def add_addon_nodes(schema, device_class, prefix="addons"):
     det_name = device_class.__name__[:-len("Correction")].lower()
     device_class._available_addons = [
         addon.load()
-        for addon in entry_points().get("calng.correction_addon", [])
+        for addon in entry_points(group="calng.correction_addon")
         if not addon.extras
         or det_name in (extra[0] for extra in addon.extras)
     ]
diff --git a/src/calng/expr.py b/src/calng/expr.py
new file mode 100644
index 00000000..cf2182af
--- /dev/null
+++ b/src/calng/expr.py
@@ -0,0 +1,149 @@
+import numpy as np
+import tokenize
+from token import (
+    AMPER,
+    CIRCUMFLEX,
+    DEDENT,
+    INDENT,
+    ISEOF,
+    LPAR,
+    NAME,
+    NEWLINE,
+    NL,
+    RPAR,
+    TILDE,
+    VBAR,
+)
+from collections import deque
+
+
+class BaseExpression:
+    """Parses and evaluates small infix expressions over named variables.
+
+    Subclasses provide the token tables: UNARY and BINARY map token types
+    to operations, PRIORITY maps each binary operator to the set of
+    operators binding at least as tightly, and BLANK lists tokens to
+    ignore. parse() is a shunting-yard variant producing a postfix token
+    list which eval() then executes with a value stack.
+    """
+
+    def __init__(self, expr):
+        self.expr = expr
+
+    @classmethod
+    def parse(cls, string, variables=()):
+        # empty input parses to the "no expression" sentinel
+        if not string.strip():
+            return cls(None)
+
+        stack = deque()
+        result = []
+        token_iter = tokenize.generate_tokens(iter([string]).__next__)
+        try:
+            token = next(token_iter)
+        except tokenize.TokenError:
+            raise SyntaxError("invalid syntax")
+
+        while not ISEOF(token.type):
+            tok_type = token.exact_type
+
+            if tok_type == LPAR:
+                stack.append(token)
+            elif tok_type in cls.UNARY:
+                stack.append(token)
+            elif tok_type == RPAR:
+                try:
+                    head = stack.pop()
+                    while head.exact_type != LPAR:
+                        result.append(head)
+                        head = stack.pop()
+                except IndexError:
+                    raise SyntaxError(f"unmatched ')', pos: {token.start[1]}")
+            elif tok_type in cls.BINARY:
+                # pop pending operators of equal or higher priority first
+                prio = cls.PRIORITY[tok_type] | cls.UNARY.keys()
+                while stack and stack[-1].exact_type in prio:
+                    result.append(stack.pop())
+                stack.append(token)
+            elif tok_type in cls.BLANK:
+                pass
+            elif tok_type == NAME:
+                if token.string not in variables:
+                    raise NameError(
+                        f"name '{token.string}' is not defined, "
+                        f"pos: {token.start[1]}"
+                    )
+                result.append(token)
+            else:
+                raise SyntaxError(f"invalid syntax, pos: {token.start[1]}")
+
+            try:
+                token = next(token_iter)
+            except tokenize.TokenError:
+                raise SyntaxError(f"invalid syntax, pos: {token.end[1] + 1}")
+
+        while stack:
+            head = stack.pop()
+            if head.exact_type == LPAR:
+                raise SyntaxError("unexpected EOF while parsing")
+            result.append(head)
+
+        # arity check: each binary consumes one net operand; a well-formed
+        # postfix expression leaves exactly one operand behind
+        for token in result:
+            tok_type = token.exact_type
+            if tok_type in cls.BINARY:
+                if not stack:
+                    # e.g. "& a": operator without a left operand
+                    raise SyntaxError(
+                        f"invalid syntax, pos: {token.start[1] + 1}"
+                    )
+                stack.pop()
+            elif tok_type == NAME:
+                stack.append(token)
+
+        if len(stack) != 1:
+            # e.g. "a b" (too many operands) or "a &" (missing operand)
+            pos = stack[-1].end[1] + 1 if stack else len(string)
+            raise SyntaxError(f"invalid syntax, pos: {pos}")
+
+        return cls(result)
+
+    def eval(self, variables, default):
+        """Evaluate the postfix expression against the 'variables' mapping;
+        return 'default' for the empty expression."""
+        if self.expr is None:
+            return default
+        stack = deque()
+        for token in self.expr:
+            tok_type = token.exact_type
+            if tok_type in self.UNARY:
+                op = self.UNARY[tok_type]
+                stack.append(op(stack.pop()))
+            elif tok_type in self.BINARY:
+                op = self.BINARY[tok_type]
+                b = stack.pop()
+                a = stack.pop()
+                stack.append(op(a, b))
+            else:
+                stack.append(variables[token.string])
+
+        return stack.pop()
+
+
+class BinaryExpression(BaseExpression):
+    """Boolean mask algebra: ~ (not), & (and), ^ (xor), | (or)."""
+
+    BLANK = {
+        NEWLINE,
+        NL,
+        INDENT,
+        DEDENT,
+    }
+    UNARY = {TILDE: np.logical_not}
+    BINARY = {AMPER: np.logical_and, CIRCUMFLEX: np.logical_xor, VBAR: np.logical_or}
+    PRIORITY = {
+        AMPER: {AMPER},
+        CIRCUMFLEX: {AMPER, CIRCUMFLEX},
+        VBAR: {AMPER, CIRCUMFLEX, VBAR},
+    }
diff --git a/src/calng/scenes.py b/src/calng/scenes.py
index 3eb66b0f..884ae1c8 100644
--- a/src/calng/scenes.py
+++ b/src/calng/scenes.py
@@ -9,6 +9,7 @@ from calngUtils.scene_utils import (
     Space,
     VerticalLayout,
     Vline,
+    attrs_to_dict,
     boxed,
     recursive_editable,
     scene_generator,
@@ -30,6 +31,7 @@ from karabo.common.scenemodel.api import (
     TableElementModel,
     TrendGraphModel,
     UnknownWidgetDataModel,
+    VectorGraphModel,
     VectorXYGraphModel,
     WebLinkModel,
 )
@@ -403,6 +405,72 @@ class AssemblerDeviceStatus(VerticalLayout):
         )
 
 
+@titled("Device status", width=8 * NARROW_INC)
+@boxed
+class MatcherSubclassStatus(VerticalLayout):
+    def __init__(self, device_id):
+        super().__init__(padding=0)
+        name = DisplayLabelModel(
+            keys=[f"{device_id}.deviceId"],
+            width=14 * BASE_INC,
+            height=BASE_INC,
+        )
+        state = DisplayStateColorModel(
+            show_string=True,
+            keys=[f"{device_id}.state"],
+            width=7 * BASE_INC,
+            height=BASE_INC,
+        )
+        train_id = DisplayLabelModel(
+            keys=[f"{device_id}.trainId"],
+            width=7 * BASE_INC,
+            height=BASE_INC,
+        )
+        rate = DisplayRoundedFloat(
+            keys=[f"{device_id}.outProcessing"],
+            width=7 * BASE_INC,
+            height=BASE_INC,
+        )
+        matchratio = DisplayRoundedFloat(
+            keys=[f"{device_id}.matchingRatio"],
+            width=7 * BASE_INC,
+            height=BASE_INC,
+        )
+        self.children.extend(
+            [
+                name,
+                HorizontalLayout(
+                    state,
+                    train_id,
+                    padding=0,
+                ),
+                HorizontalLayout(
+                    rate,
+                    matchratio,
+                    padding=0,
+                ),
+                DeviceSceneLinkModel(
+                    text="I'm actually a TrainMatcher",
+                    keys=[f"{device_id}.availableScenes"],
+                    target="trainMatcherScene",
+                    target_window=SceneTargetWindow.Dialog,
+                    width=14 * BASE_INC,
+                    height=BASE_INC,
+                ),
+                LabelModel(
+                    text="My geometry device:",
+                    width=14 * BASE_INC,
+                    height=BASE_INC,
+                ),
+                DisplayLabelModel(
+                    keys=[f"{device_id}.geometryDevice"],
+                    width=14 * BASE_INC,
+                    height=BASE_INC,
+                ),  # TODO: some day, get dynamic link to this friend
+            ]
+        )
+
+
 @titled("Stats")
 @boxed
 class StatsBox(HorizontalLayout):
@@ -571,54 +639,54 @@ class PreviewDisplayArea(VerticalLayout):
         device_id,
         schema_hash,
         channel_name,
+        image_key_name="image.data",
+        mask_key_name="image.mask",
         data_width=None,
         mask_width=None,
         data_height=None,
         mask_height=None,
     ):
         super().__init__()
-        try:
-            class_id = schema_hash.getAttribute(
-                f"{channel_name}.schema.image.data", "classId"
+        attrs = attrs_to_dict(
+            schema_hash.getAttributes(f"{channel_name}.schema.{image_key_name}")
+        )
+        # NOTE(review): removed leftover unconditional debug print of attrs
+        if attrs.get("classId") == "ImageData":
+            model = DetectorGraphModel
+            extra = dict(colormap="viridis")
+        elif attrs.get("classId") == "NDArray":
+            model = NDArrayGraphModel
+            extra = dict()
+        elif attrs.get("valueType", "").startswith("VECTOR"):
+            model = VectorGraphModel
+            extra = dict()
+        else:
+            warning = (
+                "Not sure what to do with "
+                f"{device_id}:{channel_name}.schema.{image_key_name}"
             )
-        except RuntimeError:
-            warning = f"{channel_name} in schema of {device_id} missing classId"
             print(warning)
+            print(attrs)
             self.children.append(
                 LabelModel(text=warning, width=10 * BASE_INC, height=BASE_INC)
             )
             return
-        if class_id == "ImageData":
-            self.children.extend(
-                [
-                    DetectorGraphModel(
-                        keys=[f"{device_id}.{channel_name}.schema.image.data"],
-                        colormap="viridis",
-                        width=60 * BASE_INC if data_width is None else data_width,
-                        height=18 * BASE_INC if data_height is None else data_height,
-                    ),
-                    DetectorGraphModel(
-                        keys=[f"{device_id}.{channel_name}.schema.image.mask"],
-                        colormap="viridis",
-                        width=60 * BASE_INC if mask_width is None else mask_width,
-                        height=18 * BASE_INC if mask_height is None else mask_height,
-                    ),
-                ]
+        self.children.append(
+            model(
+                keys=[f"{device_id}.{channel_name}.schema.{image_key_name}"],
+                width=60 * BASE_INC if data_width is None else data_width,
+                height=18 * BASE_INC if data_height is None else data_height,
+                **extra,
             )
-        else:
-            self.children.extend(
-                [
-                    NDArrayGraphModel(
-                        keys=[f"{device_id}.{channel_name}.schema.image.data"],
-                        width=60 * BASE_INC if data_width is None else data_width,
-                        height=18 * BASE_INC if data_height is None else data_height,
-                    ),
-                    NDArrayGraphModel(
-                        keys=[f"{device_id}.{channel_name}.schema.image.mask"],
-                        width=60 * BASE_INC if mask_width is None else mask_width,
-                        height=12 * BASE_INC if mask_height is None else mask_height,
-                    ),
-                ]
+        )
+        if mask_key_name is not None:
+            self.children.append(
+                model(
+                    keys=[f"{device_id}.{channel_name}.schema.{mask_key_name}"],
+                    width=60 * BASE_INC if mask_width is None else mask_width,
+                    height=12 * BASE_INC if mask_height is None else mask_height,
+                    **extra,
+                )
             )
 
 
@@ -1106,3 +1174,60 @@ def condition_checker_overview(device_id, schema):
             height=20 * BASE_INC,
         ),
     )
+
+
+@scene_generator
+def complex_arbiter_overview(device_id, schema):
+    schema_hash = schema_to_hash(schema)
+    return VerticalLayout(
+        HorizontalLayout(
+            VerticalLayout(
+                MatcherSubclassStatus(device_id),
+                HorizontalLayout(
+                    DisplayCommandModel(
+                        keys=[f"{device_id}.start"],
+                        width=5 * BASE_INC,
+                        height=BASE_INC,
+                    ),
+                    DisplayCommandModel(
+                        keys=[f"{device_id}.stop"],
+                        width=5 * BASE_INC,
+                        height=BASE_INC,
+                    ),
+                ),
+                EditableRow(device_id, schema_hash, "frameSelection.decision"),
+            ),
+            VerticalLayout(
+                TableElementModel(
+                    keys=[f"{device_id}.frameSelection.plan"],
+                    klass="EditableTableElement",
+                    width=20 * BASE_INC,
+                    height=10 * BASE_INC,
+                ),
+                DisplayTextLogModel(
+                    keys=[f"{device_id}.status"],
+                    width=20 * BASE_INC,
+                    height=8 * BASE_INC,
+                ),
+            ),
+        ),
+        HorizontalLayout(
+            children=[
+                recursive_editable(
+                    device_id,
+                    schema_hash,
+                    f"frameSelection.selections.{selection}",
+                )
+                for selection in schema_hash["frameSelection.selections"].getKeys()
+            ]
+        ),
+        PreviewDisplayArea(
+            device_id,
+            schema_hash,
+            "output",
+            image_key_name="data.dataFramePattern",
+            mask_key_name=None,
+            data_width=40 * BASE_INC,
+            data_height=14 * BASE_INC,
+        ),
+    )
diff --git a/tests/common_setup.py b/tests/common_setup.py
index fafe2cb7..c3d20b6d 100644
--- a/tests/common_setup.py
+++ b/tests/common_setup.py
@@ -1,5 +1,6 @@
 import contextlib
 import pathlib
+import queue
 import threading
 
 import h5py
@@ -98,3 +99,46 @@ def get_constant_from_file(path_to_file, file_name, data_set_name):
     File", "File Name", and "Data Set Name" fields displayed for a given CCV."""
     with h5py.File(caldb_store / path_to_file / file_name, "r") as fd:
         return np.array(fd[f"{data_set_name}/data"])
+
+
+def kwhash(**kwargs):
+    res = Hash()
+    for k, v in kwargs.items():
+        res[k] = v
+    return res
+
+
+class OverrideBase:
+    overrides = tuple()
+
+    @classmethod
+    @contextlib.contextmanager
+    def override_context(cls, other_cls):
+        cache = {}
+        for meth_name in cls.overrides:
+            cache[meth_name] = getattr(other_cls, meth_name)
+            setattr(other_cls, meth_name, getattr(cls, meth_name))
+        try:
+            yield
+        finally:
+            for meth_name in cls.overrides:
+                setattr(other_cls, meth_name, cache[meth_name])
+
+
+class OverridePythonDevice(OverrideBase):
+    overrides = ("writeChannel",)
+
+    def writeChannel(self, output_name, out_hash, timestamp=None):
+        self._last_written_channel = output_name
+        self._last_written_data = out_hash
+
+
+def dict_to_hash(d):
+    res = Hash()
+    for k, v in d.items():
+        if isinstance(v, dict):
+            v = dict_to_hash(v)
+        elif isinstance(v, list):
+            v = [dict_to_hash(x) if isinstance(x, dict) else x for x in v]
+        res[k] = v
+    return res
diff --git a/tests/test_frameselection.py b/tests/test_frameselection.py
new file mode 100644
index 00000000..2399de2a
--- /dev/null
+++ b/tests/test_frameselection.py
@@ -0,0 +1,452 @@
+import pytest
+
+import numpy as np
+from common_setup import OverridePythonDevice, dict_to_hash, kwhash
+from karabo.bound import Configurator, PythonDevice
+
+from calng.FrameSelectionArbiter import (
+    SimpleFrameSelectionArbiter,
+    AdvancedFrameSelectionArbiter,
+)
+
+
+@pytest.fixture
+def advanced_arbiter():
+    configurator = Configurator(PythonDevice)
+    configurator.registerClass(AdvancedFrameSelectionArbiter)
+
+    with OverridePythonDevice.override_context(PythonDevice):
+        arbara = configurator.create(
+            AdvancedFrameSelectionArbiter,
+            kwhash(
+                geometryDevice="geometry",
+                sources=[],
+                frameSelection=kwhash(
+                    decision="",
+                ),
+            ),
+        )
+        arbara.initialization()
+        yield arbara
+
+
+@pytest.fixture
+def simple_arbiter():
+    configurator = Configurator(PythonDevice)
+    configurator.registerClass(SimpleFrameSelectionArbiter)
+
+    with OverridePythonDevice.override_context(PythonDevice):
+        arbara = configurator.create(
+            SimpleFrameSelectionArbiter,
+            kwhash(
+                geometryDevice="geometry",
+                sources=[],
+            ),
+        )
+        arbara.initialization()
+        yield arbara
+
+
+def test_simple(simple_arbiter):
+    # don't care about matching, just use one source
+    # only contains "cell table" to suggest number of frames
+    sources_dict = {
+        "arbitrarySource": (
+            kwhash(
+                image=kwhash(cellId=np.zeros(2)),
+            ),
+            None,
+        ),
+    }
+    simple_arbiter.slotReconfigure(
+        dict_to_hash({"frameSelection.kernelChoice": "ManualFilter"})
+    )
+    simple_arbiter.slotReconfigure(
+        dict_to_hash(
+            {"frameSelection.kernelParameters": {"spec": "0,1", "type": "mask"}}
+        )
+    )
+    simple_arbiter.on_matched_data(1, sources_dict)
+    assert simple_arbiter._last_written_data["data.dataFramePattern"] == [False, True]
+
+
+def test_manual(simple_arbiter):
+    # don't care about matching, just use one source
+    # only contains "cell table" to suggest number of frames
+    sources_dict = {
+        "arbitrarySource": (
+            kwhash(
+                image=kwhash(cellId=np.zeros(100)),
+            ),
+            None,
+        ),
+    }
+    simple_arbiter.slotReconfigure(
+        dict_to_hash({"frameSelection.kernelChoice": "ManualFilter"})
+    )
+
+    include = [10, 20, 30]
+    simple_arbiter.slotReconfigure(
+        dict_to_hash(
+            {
+                "frameSelection.kernelParameters": {
+                    "resize": "False",
+                    "spec": ",".join(str(i) for i in include),
+                    "type": "indices",
+                }
+            }
+        )
+    )
+    simple_arbiter.on_matched_data(1, sources_dict)
+    mask = simple_arbiter._last_written_data["data.dataFramePattern"]
+    assert all(mask[i] for i in include)
+    assert not any(mask[i] for i in range(len(mask)) if i not in include)
+
+    include = list(range(1, 51, 3))
+    simple_arbiter.slotReconfigure(
+        dict_to_hash(
+            {
+                "frameSelection.kernelParameters": {
+                    "resize": "False",
+                    "spec": "1,51,3",
+                    "type": "range",
+                }
+            }
+        )
+    )
+    simple_arbiter.on_matched_data(2, sources_dict)
+    mask = simple_arbiter._last_written_data["data.dataFramePattern"]
+    assert all(mask[i] for i in include)
+    assert not any(mask[i] for i in range(len(mask)) if i not in include)
+
+
+def test_decision(advanced_arbiter):
+    sources_dict = {
+        "arbitrarySource": (
+            kwhash(
+                image=kwhash(cellId=np.zeros(2)),
+            ),
+            None,
+        ),
+    }
+
+    advanced_arbiter.slotReconfigure(
+        kwhash(
+            frameSelection=kwhash(
+                decision="a",
+                plan=[
+                    kwhash(
+                        enable=True, name="a", kernel="ManualFilter", preselection=""
+                    ),
+                    kwhash(
+                        enable=True, name="b", kernel="ManualFilter", preselection=""
+                    ),
+                ],
+            )
+        )
+    )
+    advanced_arbiter.slotReconfigure(
+        dict_to_hash({"frameSelection.selections.a": {"type": "mask", "spec": "0,1"}}),
+    )
+
+    advanced_arbiter.on_matched_data(1, sources_dict)
+    assert advanced_arbiter._last_written_data["data.dataFramePattern"] == [False, True]
+
+    advanced_arbiter.slotReconfigure(
+        dict_to_hash(
+            {
+                "frameSelection": {
+                    "decision": "b",
+                    "selections.b": {"type": "mask", "spec": "1,0"},
+                }
+            }
+        )
+    )
+
+    advanced_arbiter.on_matched_data(2, sources_dict)
+    assert advanced_arbiter._last_written_data["data.dataFramePattern"] == [True, False]
+
+    advanced_arbiter.slotReconfigure(
+        kwhash(
+            frameSelection=kwhash(
+                decision="a | b",
+            )
+        )
+    )
+
+    advanced_arbiter.on_matched_data(3, sources_dict)
+    assert advanced_arbiter._last_written_data["data.dataFramePattern"] == [True, True]
+
+    advanced_arbiter.slotReconfigure(
+        kwhash(
+            frameSelection=kwhash(
+                decision="a & b",
+            )
+        )
+    )
+
+    advanced_arbiter.on_matched_data(4, sources_dict)
+    assert advanced_arbiter._last_written_data["data.dataFramePattern"] == [
+        False,
+        False,
+    ]
+
+
+def test_preselection(advanced_arbiter):
+    sources_dict = {
+        "arbitrarySource": (
+            dict_to_hash({"image.cellId": np.zeros(2)}),
+            None,
+        ),
+    }
+    advanced_arbiter.slotReconfigure(
+        kwhash(
+            frameSelection=kwhash(
+                plan=[
+                    kwhash(
+                        enable=True, name="a", kernel="ManualFilter", preselection=""
+                    ),
+                    kwhash(
+                        enable=True, name="b", kernel="ManualFilter", preselection="a"
+                    ),
+                ],
+                decision="b",
+            ),
+        ),
+    )
+    # split into two calls, since the previous reconfigure should update the schema
+    advanced_arbiter.slotReconfigure(
+        kwhash(
+            frameSelection=kwhash(
+                decision="b",
+                selections=kwhash(
+                    a=kwhash(spec="1,0"),
+                    b=kwhash(spec="1,1"),
+                ),
+            )
+        )
+    )
+
+    advanced_arbiter.on_matched_data(5, sources_dict)
+    assert advanced_arbiter._last_written_data["data.dataFramePattern"] == [True, False]
+
+
+def test_reduce_threshold(advanced_arbiter):
+    sources_dict = {
+        "sourceA": (
+            kwhash(
+                image=kwhash(cellId=np.zeros(3)),
+                data=np.array([1, 1, -1], dtype=np.float32),
+                data2=np.array([0, 0, -1], dtype=np.float32),
+            ),
+            None,
+        ),
+        "sourceB": (
+            kwhash(
+                data=np.array([0, 1, -1], dtype=np.float32),
+                data2=np.array([1, 0, -1], dtype=np.float32),
+            ),
+            None,
+        ),
+    }
+    advanced_arbiter.slotReconfigure(
+        dict_to_hash(
+            {
+                "frameSelection": {
+                    "plan": [
+                        {  # just to test that preselection mask is applied
+                            "enable": True,
+                            "name": "a",
+                            "kernel": "ManualFilter",
+                            "preselection": "",
+                        },
+                        {
+                            "enable": True,
+                            "name": "b",
+                            "kernel": "ReduceAndThreshold",
+                            "preselection": "a",
+                        },
+                    ],
+                    "decision": "b",
+                }
+            }
+        )
+    )
+
+    advanced_arbiter.slotReconfigure(
+        dict_to_hash(
+            {
+                "frameSelection.selections.a.spec": "1,1,0",
+                "frameSelection.selections.b": {
+                    "key": "data",
+                    "threshold": 2,
+                    "reduction": "sum",
+                    "comparator": "lt",
+                },
+            }
+        )
+    )
+    advanced_arbiter.on_matched_data(1, sources_dict)
+    assert advanced_arbiter._last_written_data["data.dataFramePattern"] == [
+        True,
+        False,
+        False,
+    ]
+
+    advanced_arbiter.slotReconfigure(
+        dict_to_hash({"frameSelection.selections.b.key": "data2"})
+    )
+    advanced_arbiter.on_matched_data(2, sources_dict)
+    assert advanced_arbiter._last_written_data["data.dataFramePattern"] == [
+        True,
+        True,
+        False,
+    ]
+
+    advanced_arbiter.slotReconfigure(
+        dict_to_hash({"frameSelection.selections.b.threshold": 1})
+    )
+    advanced_arbiter.on_matched_data(3, sources_dict)
+    assert advanced_arbiter._last_written_data["data.dataFramePattern"] == [
+        False,
+        True,
+        False,
+    ]
+
+
+def test_boolean_ops(advanced_arbiter):
+    sources_dict = {
+        "sourceA": (
+            kwhash(
+                image=kwhash(cellId=np.zeros(3)),
+                data=np.array([1, 1, 1], dtype=bool),
+                data2=np.array([0, 0, 1], dtype=bool),
+            ),
+            None,
+        ),
+        "sourceB": (
+            kwhash(
+                data=np.array([0, 1, 1], dtype=bool),
+                data2=np.array([1, 0, 1], dtype=bool),
+            ),
+            None,
+        ),
+    }
+    advanced_arbiter.slotReconfigure(
+        dict_to_hash(
+            {
+                "frameSelection": {
+                    "plan": [
+                        {  # just to test that preselection mask is applied
+                            "enable": True,
+                            "name": "a",
+                            "kernel": "ManualFilter",
+                            "preselection": "",
+                        },
+                        {
+                            "enable": True,
+                            "name": "b",
+                            "kernel": "BooleanCombination",
+                            "preselection": "a",
+                        },
+                    ],
+                    "decision": "b",
+                }
+            }
+        )
+    )
+
+    advanced_arbiter.slotReconfigure(
+        dict_to_hash(
+            {
+                "frameSelection.selections.a.spec": "1,1,0",
+                "frameSelection.selections.b": {
+                    "key": "data",
+                    "operator": "all",
+                },
+            }
+        )
+    )
+    advanced_arbiter.on_matched_data(1, sources_dict)
+    assert advanced_arbiter._last_written_data["data.dataFramePattern"] == [
+        False,
+        True,
+        False,
+    ]
+
+    advanced_arbiter.slotReconfigure(
+        dict_to_hash({"frameSelection.selections.b.key": "data2"})
+    )
+    advanced_arbiter.on_matched_data(2, sources_dict)
+    assert advanced_arbiter._last_written_data["data.dataFramePattern"] == [
+        False,
+        False,
+        False,
+    ]
+
+    advanced_arbiter.slotReconfigure(
+        dict_to_hash({"frameSelection.selections.b.operator": "any"})
+    )
+    advanced_arbiter.on_matched_data(3, sources_dict)
+    assert advanced_arbiter._last_written_data["data.dataFramePattern"] == [
+        True,
+        False,
+        False,
+    ]
+
+
+def test_random_frames(advanced_arbiter):
+    np.random.seed(0)
+    num_frames = 1000
+    num_blocked = 100
+    sources_dict = {
+        "arbitrarySource": (
+            kwhash(
+                image=kwhash(cellId=np.zeros(num_frames)),
+            ),
+            None,
+        ),
+    }
+    advanced_arbiter.slotReconfigure(
+        dict_to_hash(
+            {
+                "frameSelection": {
+                    "plan": [
+                        {  # just to test that preselection mask is applied
+                            "enable": True,
+                            "name": "a",
+                            "kernel": "ManualFilter",
+                            "preselection": "",
+                        },
+                        {
+                            "enable": True,
+                            "name": "b",
+                            "kernel": "RandomSampler",
+                            "preselection": "a",
+                        },
+                    ],
+                    "decision": "b",
+                }
+            }
+        )
+    )
+
+    for prob in (10, 50, 90):
+        advanced_arbiter.slotReconfigure(
+            dict_to_hash(
+                {
+                    "frameSelection.selections.a": {
+                        "spec": ",".join("0" * num_blocked),
+                        "resize": "True",
+                    },
+                    "frameSelection.selections.b": {
+                        "probability": prob,
+                    },
+                }
+            )
+        )
+        advanced_arbiter.on_matched_data(1, sources_dict)
+        res = advanced_arbiter._last_written_data["data.dataFramePattern"]
+        assert not any(res[:num_blocked])
+        expected = (num_frames - num_blocked) * (prob / 100)
+        assert expected * 0.75 <= np.count_nonzero(res) <= expected * 1.25
-- 
GitLab