diff --git a/setup.py b/setup.py index 088f2c52115acb61ff5d527953804a5a93d09a4e..2f3602f6644feb2eb13f11b27db89ba4f4b30247 100644 --- a/setup.py +++ b/setup.py @@ -35,7 +35,8 @@ setup( "PnccdCorrection = calng.corrections.PnccdCorrection:PnccdCorrection", "ShmemToZMQ = calng.ShmemToZMQ:ShmemToZMQ", "ShmemTrainMatcher = calng.ShmemTrainMatcher:ShmemTrainMatcher", - "FrameSelectionArbiter = calng.FrameSelectionArbiter:FrameSelectionArbiter", # noqa + "SimpleFrameSelectionArbiter = calng.FrameSelectionArbiter:SimpleFrameSelectionArbiter", # noqa + "AdvancedFrameSelectionArbiter = calng.FrameSelectionArbiter:AdvancedFrameSelectionArbiter", # noqa "DetectorAssembler = calng.DetectorAssembler:DetectorAssembler", "Gotthard2Assembler = calng.Gotthard2Assembler:Gotthard2Assembler", "LpdminiSplitter = calng.LpdminiSplitter:LpdminiSplitter", @@ -56,7 +57,11 @@ setup( "SaturationMonitor = calng.correction_addons.saturation_monitor:SaturationMonitor",# noqa ], "calng.arbiter_kernel": [ + "Assign = calng.arbiter_kernels.base_kernel:Assign", "BooleanCombination = calng.arbiter_kernels.boolean_ops:BooleanCombination", # noqa + "IntensityThreshold = calng.arbiter_kernels.intensity_threshold:IntensityThreshold", # noqa + "LitFrames = calng.arbiter_kernels.litframes:LitFrames", + "ManualFilter = calng.arbiter_kernels.manual:ManualFilter", "Ppu = calng.arbiter_kernels.ppu_arbiter:PpuKernel", "RandomFrames = calng.arbiter_kernels.random_frames:RandomSampler", # noqa "ReduceAndThreshold = calng.arbiter_kernels.reduce_threshold:ReduceAndThreshold", # noqa diff --git a/src/calng/FrameSelectionArbiter.py b/src/calng/FrameSelectionArbiter.py index 21b8ddf10e7586980fac98c8c78179dbb33f3a26..079252af21b271016b763c10121afda1f3586b98 100644 --- a/src/calng/FrameSelectionArbiter.py +++ b/src/calng/FrameSelectionArbiter.py @@ -1,50 +1,121 @@ +import enum from importlib.metadata import entry_points +import numpy as np from geometryDevices import utils as geom_utils from karabo.bound import ( + BOOL_ELEMENT, KARABO_CLASSINFO, NODE_ELEMENT, OUTPUT_CHANNEL, + OVERWRITE_ELEMENT, STRING_ELEMENT, + TABLE_ELEMENT, VECTOR_BOOL_ELEMENT, - ChannelMetaData, Epochstamp, Hash, Schema, + State, Timestamp, Trainstamp, ) from TrainMatcher import TrainMatcher -from . import utils +from . import scenes from ._version import version as deviceVersion from .arbiter_kernels.base_kernel import KernelWarning +from .expr import BinaryExpression +from .utils import WarningContextSystem -my_schema = Schema() -( - NODE_ELEMENT(my_schema) - .key("data") - .commit(), - - VECTOR_BOOL_ELEMENT(my_schema) - .key("data.dataFramePattern") - .assignmentOptional() - .defaultValue([]) - .commit(), -) + +class DeviceWarning(enum.Enum): + PLAN = "plan" + DECISION = "decision" + + +def output_schema(): + res = Schema() + ( + NODE_ELEMENT(res) + .key("data") + .commit(), + + VECTOR_BOOL_ELEMENT(res) + .key("data.dataFramePattern") + .assignmentOptional() + .defaultValue([]) + .commit(), + ) + return res kernels = [ kernel_class.load() - for kernel_class in entry_points().get("calng.arbiter_kernel", []) + for kernel_class in entry_points().select().get("calng.arbiter_kernel", []) ] -kernel_choice = { - kernel_class.__name__: kernel_class for kernel_class in kernels -} +kernel_choice = {kernel_class.__name__: kernel_class for kernel_class in kernels} + + +def selection_plan_schema(kernels): + schema = Schema() + ( + BOOL_ELEMENT(schema) + .key("enable") + .displayedName("Enable") + .assignmentOptional() + .defaultValue(False) + .reconfigurable() + .commit(), + + STRING_ELEMENT(schema) + .key("name") + .displayedName("Name") + .description( + "The name assigned to the result of running this kernel. This name can be " + "used in (later) preselection expressions and in the final decision " + "expression. Each row must have a unique name and names must be valid " + "identifiers. Each selection step gets its configuration from a schema " + "node corresponding to its name." + ) + .assignmentOptional() + .defaultValue("name") + .reconfigurable() + .commit(), + + STRING_ELEMENT(schema) + .key("kernel") + .displayedName("Kernel") + .description( + "Choose which kernel to run in this selection step. The special kernel " + "'Assign' does nothing, but by using the preselection expression, it " + "effectively assigns the result of the preselection to the given name, " + "allowing you to name combined results from other kernels." + ) + .options(",".join(kernel_choice)) + .assignmentOptional() + .defaultValue("Assign") + .reconfigurable() + .commit(), + + STRING_ELEMENT(schema) + .key("preselection") + .displayedName("Preselection") + .description( + "If not empty, this is evaluated as a logical expression (see " + "frameSelection.decision) involving previous selection steps. The " + "resulting mask is then passed to the kernel running in this selection " + "step. It is up to the kernel how the mask is used. The 'Assign' kernel " + "will just return the mask it is given." + ) + .assignmentOptional() + .defaultValue("") + .reconfigurable() + .commit(), + ) + return schema -@KARABO_CLASSINFO("FrameSelectionArbiter", deviceVersion) -class FrameSelectionArbiter(TrainMatcher.TrainMatcher): +class BaseFrameSelectionArbiter(TrainMatcher.TrainMatcher): @staticmethod def expectedParameters(expected): ( @@ -52,9 +123,7 @@ class FrameSelectionArbiter(TrainMatcher.TrainMatcher): .key("geometryDevice") .displayedName("Geometry device") .description( - "The the device which will provide geometry. The device is expected to " - "have current geometry as a string (for now using custom non-robust " - "serialization scheme) element named serializedGeometry." + "The the device which will provide geometry." ) .assignmentMandatory() .commit(), @@ -63,134 +132,394 @@ class FrameSelectionArbiter(TrainMatcher.TrainMatcher): .key("frameSelection") .displayedName("Frame selection") .commit(), + ) + + def __init__(self, config): + super().__init__(config) + self.registerSlot(self.slotReceiveGeometry) + def initialization(self): + super().initialization() + self.warning_context = WarningContextSystem( + self, on_success={"state": "PASSIVE"} + ) + + self._geometry = None + if self.get("geometryDevice"): + self.signalSlotable.connect( + self.get("geometryDevice"), + "signalNewGeometry", + "", # slot device ID (default: self) + "slotReceiveGeometry", + ) + + def _guess_number_of_frames(self, sources): + # TODO: robust frame deduction + for (data, _) in sources.values(): + if data.has("image.cellId"): + return data.get("image.cellId").size + self.log.ERROR("Unable to figure out number of frames") + + def slotReceiveGeometry(self, device_id, serialized_geometry): + self.log.INFO(f"Received geometry from {device_id}") + try: + self._geometry = geom_utils.deserialize_geometry(serialized_geometry) + except Exception as e: + self.log.WARN(f"Failed to deserialize geometry; {e}") + +@KARABO_CLASSINFO("SimpleFrameSelectionArbiter", deviceVersion) +class SimpleFrameSelectionArbiter(BaseFrameSelectionArbiter): + @staticmethod + def expectedParameters(expected): + ( STRING_ELEMENT(expected) .key("frameSelection.kernelChoice") - .displayedName("Kernel to use") - .assignmentMandatory() - .options(",".join(kernel_choice.keys())) + .options(",".join(kernel_choice)) + .assignmentOptional() + .defaultValue("ManualFilter") .reconfigurable() .commit(), STRING_ELEMENT(expected) .key("frameSelection.kernelState") .setSpecialDisplayType("State") - .displayedName("Kernel state") .readOnly() .initialValue("OFF") .commit(), + ) - NODE_ELEMENT(expected) - .key("frameSelection.kernels") - .displayedName("Kernels") + def initialization(self): + super().initialization() + + self._update_schema(self["frameSelection.kernelChoice"]) + self._initialize_kernel( + self["frameSelection.kernelChoice"], self["frameSelection.kernelParameters"] + ) + + if self["state"] != State.ERROR: + self.start() # Auto-start this type of matcher. + + def _update_schema(self, kernel_name): + schema_update = Schema() + ( + NODE_ELEMENT(schema_update) + .key("frameSelection") .commit(), - OUTPUT_CHANNEL(expected) + NODE_ELEMENT(schema_update) + .key("frameSelection.kernelParameters") + .commit(), + ) + kernel_class = kernel_choice[kernel_name] + kernel_class.extend_device_schema( + schema_update, "frameSelection.kernelParameters" + ) + output_schema_update = output_schema() + kernel_class.extend_output_schema(output_schema_update, kernel_class.__name__) + ( + OUTPUT_CHANNEL(schema_update) .key("output") - .dataSchema(my_schema) + .dataSchema(output_schema_update) + .commit(), + ) + self.updateSchema(schema_update) + + def _initialize_kernel(self, class_name, conf): + kernel_class = kernel_choice[class_name] + self.kernel = kernel_class(self, kernel_class.__name__, conf) + + def preReconfigure(self, conf): + super().preReconfigure(conf) + + if conf.has("frameSelection.kernelChoice"): + name = conf["frameSelection.kernelChoice"] + self._update_schema(name) + self._initialize_kernel(name, self["frameSelection.kernelParameters"]) + if conf.has("frameSelection.kernelParameters"): + self.kernel.reconfigure(conf["frameSelection.kernelParameters"]) + + def on_matched_data(self, train_id, sources): + num_frames = self._guess_number_of_frames(sources) + + # run kernels + out_hash = Hash() + with self.warning_context( + "frameSelection.kernelState", KernelWarning.PROCESSING + ) as warn: + try: + decision = self.kernel.consider( + train_id, + sources, + num_frames, + np.ones(num_frames, dtype=bool), + out_hash, + ) + except Exception as ex: + warn(f"Kernel failed: {ex}") + decision = np.ones(num_frames, dtype=bool) + + out_hash["data.dataFramePattern"] = list(map(bool, decision)) + + # don't use TM's stored self.output as we have re-injected + self.writeChannel( + "output", + out_hash, + timestamp=Timestamp(Epochstamp(), Trainstamp(train_id)), + ) + self.info["sent"] += 1 + self.info["trainId"] = train_id + self.rate_out.update() + + +@KARABO_CLASSINFO("AdvancedFrameSelectionArbiter", deviceVersion) +class AdvancedFrameSelectionArbiter(BaseFrameSelectionArbiter): + @staticmethod + def expectedParameters(expected): + ( + OVERWRITE_ELEMENT(expected) + .key("availableScenes") + .setNewDefaultValue(["overview", "trainMatcherScene"]) + .commit(), + + TABLE_ELEMENT(expected).key("frameSelection.plan") + .displayedName("Selection Plan") + .description("") + .setColumns(selection_plan_schema(kernel_choice.keys())) + .assignmentOptional() + .defaultValue([]) + .reconfigurable() + .commit(), + + STRING_ELEMENT(expected) + .key("frameSelection.decision") + .displayedName("Decision") + .assignmentMandatory() + .reconfigurable() + .commit(), + + NODE_ELEMENT(expected) + .key("frameSelection.selections") + .displayedName("Selections") .commit(), ) - for kernel_class in kernel_choice.values(): - kernel_class.extend_device_schema( - expected, f"frameSelection.kernels.{kernel_class.__name__}" - ) def __init__(self, config): super().__init__(config) - self.registerSlot(self.slotReceiveGeometry) + self._selection_steps = {} # name -> (kernel class, preselection expression) + self._selection_kernels = {} # name -> kernel instance + self._decision_expr = BinaryExpression(None) def initialization(self): super().initialization() - self.warning_context = utils.WarningContextSystem( - self, - on_success={"frameSelection.kernelState": "ON"}, - ) - with self.warning_context("frameSelection.kernelState", KernelWarning.INIT): - self._kernel_class = kernel_choice[self.get("frameSelection.kernelChoice")] - self._kernel = self._kernel_class( - self._parameters[ - f"frameSelection.kernels.{self._kernel_class.__name__}" - ] - ) - self._kernel._device = self - self._geometry = None - if self.get("geometryDevice"): - self.signalSlotable.connect( - self.get("geometryDevice"), - "signalNewGeometry", - "", # slot device ID (default: self) - "slotReceiveGeometry", + self._validate_plan_and_update_schema(self["frameSelection.plan"]) + self._initialize_kernels() + self._configure_decision(self["frameSelection.decision"]) + + if self["state"] != State.ERROR: + self.start() # Auto-start this type of matcher. + + def start(self): + super().start() + self.warning_context.on_success["state"] = "ACTIVE" + + def stop(self): + super().stop() + self.warning_context.on_success["state"] = "PASSIVE" + + def requestScene(self, params): + scene_name = params.get("name", default="") + if scene_name == "trainMatcherScene": + params["name"] = "scene" + return super().requestScene(params) + if scene_name != "overview": + self.log.INFO( + f"Unexpected scene name {scene_name} requested, " + "will just return overview" ) - self.start() # Auto-start this type of matcher. + payload = Hash("name", scene_name, "success", True) + payload["data"] = scenes.complex_arbiter_overview( + device_id=self.getInstanceId(), + schema=self.getFullSchema(), + ) + self.reply( + Hash( + "type", + "deviceScene", + "origin", + self.getInstanceId(), + "payload", + payload, + ) + ) def on_matched_data(self, train_id, sources): - # TODO: robust frame deduction - num_frames = None - for (data, _) in sources.values(): - if not data.has("image.cellId"): - continue - num_frames = data.get("image.cellId").size - break - else: - # TODO: also use warning context - self.log.WARN("Unable to figure out number of frames") + num_frames = self._guess_number_of_frames(sources) - with self.warning_context( - "frameSelection.kernelState", KernelWarning.PROCESSING - ): - decision = self._kernel.consider(train_id, sources, num_frames) - if isinstance(decision, Hash): - assert decision.has("data.dataFramePattern") - decision["data.dataFramePattern"] = list( - map(bool, decision["data.dataFramePattern"]) - ) - result = decision - else: - result = Hash() - # TODO: avoid recasting - result["data.dataFramePattern"] = list(map(bool, decision)) - self.output.write( - result, - ChannelMetaData( - f"{self.getInstanceId()}:output", - Timestamp(Epochstamp(), Trainstamp(train_id)), - ), - ) - self.output.update() + # run kernels + out_hash = Hash() + selections = self._process_selection_steps(sources, train_id, num_frames, out_hash) + + # final selection decision + decision = self._decision_expr.eval(selections, np.ones(num_frames, bool)) + out_hash["data.dataFramePattern"] = list(map(bool, decision)) + + # don't use TM's stored self.output as we have re-injected + self.writeChannel( + "output", + out_hash, + timestamp=Timestamp(Epochstamp(), Trainstamp(train_id)), + ) + self.info["sent"] += 1 + self.info["trainId"] = train_id self.rate_out.update() - def start(self): - self.set("frameSelection.kernelState", "OFF") - self.warning_context.reset() - super().start() + def _process_selection_steps(self, sources, train_id, num_frames, out_hash): + res = {} + for selection_name, kernel in self._selection_kernels.items(): + prefix = f"frameSelection.selections.{selection_name}" + with self.warning_context( + f"{prefix}.state", + KernelWarning.PRESELECTION, + ) as warn: + default_mask = np.ones(num_frames, dtype=bool) + _, selection_expr = self._selection_steps[selection_name] + try: + mask = selection_expr.eval( + res, default_mask + ) + except Exception as ex: + warn(f"Evaluating preselection for {prefix} failed: {ex}") + mask = default_mask + with self.warning_context( + f"{prefix}.state", + KernelWarning.PROCESSING, + ) as warn: + try: + res[selection_name] = kernel.consider( + train_id, sources, num_frames, mask, out_hash + ) + except Exception as ex: + warn(f"Kernel for {prefix} failed: {ex}") + res[selection_name] = np.ones(num_frames, dtype=bool) + return res + + def _configure_decision(self, decision_string): + with self.warning_context("state", DeviceWarning.DECISION) as warn: + try: + self._decision_expr = BinaryExpression.parse( + decision_string, self._selection_steps.keys() + ) + except Exception as ex: + # TODO: maybe re-raise to reject reconfiguration + warn(f"Failure to parse decision string '{decision_string}': {ex}") + + def _validate_plan_and_update_schema(self, plan): + new_selection_steps = {} + + # TODO: don't update this many levels, just updaet frameSelection.selections + schema_update = Schema() + ( + NODE_ELEMENT(schema_update) + .key("frameSelection") + .displayedName("Frame selection") + .commit(), + + NODE_ELEMENT(schema_update) + .key("frameSelection.selections") + .displayedName("Selections") + .commit(), + ) + output_schema_update = output_schema() + with self.warning_context("state", DeviceWarning.PLAN) as warn: + for selection in plan: + if not selection["enable"]: + continue + + selection_name = selection["name"].strip() + if not selection_name.isidentifier(): + warn(f"Invalid selection name '{selection_name}'") + continue + elif selection_name in new_selection_steps: + warn(f"Duplicate selection name '{selection_name}'") + continue + + try: + preselection = BinaryExpression.parse( + selection["preselection"], + new_selection_steps, # just needs to check names + ) + except Exception as ex: + warn( + "Failure to parse (or undefined variable used in) " + f"preselection for {selection_name}: {ex}" + ) + continue + + kernel_class = kernel_choice[selection["kernel"]] + + kernel_prefix = f"frameSelection.selections.{selection_name}" + ( + NODE_ELEMENT(schema_update) + .key(kernel_prefix) + .commit(), + + STRING_ELEMENT(schema_update) + .key(f"{kernel_prefix}.kernel") + .readOnly() + .initialValue(kernel_class.__name__) + .commit(), + + STRING_ELEMENT(schema_update) + .key(f"{kernel_prefix}.state") + .setSpecialDisplayType("State") + .readOnly() + .initialValue("OFF") + .commit(), + ) + kernel_class.extend_device_schema( + schema_update, kernel_prefix + ) + kernel_class.extend_output_schema(output_schema_update, selection_name) + new_selection_steps[selection_name] = (kernel_class, preselection) + + ( + OUTPUT_CHANNEL(schema_update) + .key("output") + .dataSchema(output_schema_update) + .commit(), + ) + self._selection_steps = new_selection_steps + self.updateSchema(schema_update) + + def _initialize_kernels(self): + # instantiate kernels for selections + self._selection_kernels.clear() + for selection_name, ( + kernel_class, + preselection, + ) in self._selection_steps.items(): + prefix = f"frameSelection.selections.{selection_name}" + with self.warning_context(f"{prefix}.state", KernelWarning.INIT) as warn: + try: + self._selection_kernels[selection_name] = kernel_class( + self, selection_name, self.get(prefix) + ) + except Exception as ex: + warn(f"Failed to instantiate kernel for {selection_name}: {ex}") def preReconfigure(self, conf): super().preReconfigure(conf) - self._reinstantiate_arbiter_kernel = False - if conf.has("frameSelection.kernelChoice"): - self.log.INFO("Switching frame selection kernel") - self._kernel_class = kernel_choice[conf.get("frameSelection.kernelChoice")] - self._reinstantiate_arbiter_kernel = True - elif conf.has(f"frameSelection.kernels.{self._kernel_class.__name__}"): - self.log.INFO("Reconfiguring frame selection kernel") - # TODO: update instead of rebuild for complex kernels - # (decide based on reconfigurability?) - self._reinstantiate_arbiter_kernel = True - - def postReconfigure(self): - super().postReconfigure() - if self._reinstantiate_arbiter_kernel: - self._kernel = self._kernel_class( - self._parameters[ - f"frameSelection.kernels.{self._kernel_class.__name__}" - ] - ) - self._reinstantiate_arbiter_kernel = False - def slotReceiveGeometry(self, device_id, serialized_geometry): - self.log.INFO(f"Received geometry from {device_id}") - try: - self._geometry = geom_utils.deserialize_geometry(serialized_geometry) - except Exception as e: - self.log.WARN(f"Failed to deserialize geometry; {e}") + if conf.has("frameSelection.plan"): + self._validate_plan_and_update_schema(conf["frameSelection.plan"]) + self._initialize_kernels() + + if conf.has("frameSelection.decision"): + self._configure_decision(conf["frameSelection.decision"]) + + if conf.has("frameSelection.selections"): + for name, kernel in self._selection_kernels.items(): + kernel_prefix = f"frameSelection.selections.{name}" + if conf.has(kernel_prefix): + kernel.reconfigure(conf[kernel_prefix]) diff --git a/src/calng/arbiter_kernels/base_kernel.py b/src/calng/arbiter_kernels/base_kernel.py index 6d36a956b3be697d3edc6b3a4b65618958e81248..8ae88cab31ecff3ba8ad5a971645e72598946723 100644 --- a/src/calng/arbiter_kernels/base_kernel.py +++ b/src/calng/arbiter_kernels/base_kernel.py @@ -1,37 +1,44 @@ import enum -import numpy as np - class KernelWarning(enum.Enum): INIT = "init" - MISSINGSOURCE = "missingsource" - MISSINGKEY = "missingkey" PROCESSING = "processing" + PRESELECTION = "preselection" class BaseArbiterKernel: - def __init__(self, config): - """No need for prefix - hosting device should pass us the relevant subnode""" - self._config = config - self._device = None + def __init__(self, device, name, config): + self._device = device + self._name = name + self.reconfigure(config) @property def geometry(self): return self._device._geometry - def warning_context(self, warn_type): - return self._device.warning_context("frameSelection.kernelState", warn_type) - @staticmethod def extend_device_schema(schema, prefix): """Should add configurability to the arbiter (matcher) the kernel will be running in. Prefix will include node name, so will typically be - "frameSelection.kernels.{class.__name__}". The parent node will not have been - created already (unlike for correction addons).""" + "frameSelection.kernels.{class.__name__}". The parent node will have been + created already (unlike for correction addons), feel free to overwrite the + description.""" + pass + + @staticmethod + def extend_output_schema(schema, name): pass - def consider(self, train_id, sources, num_frames): + def consider(self, train_id, sources, num_frames, mask, out_hash): """Must return tuple of uint8 mask array of size equal to num_frames and base output hash or None.""" - return np.ones(num_frames, dtype=np.uint8), None + raise NotImplementedError() + + def reconfigure(self, config): + pass + + +class Assign(BaseArbiterKernel): + def consider(self, tid, src, nframes, mask, out): + return mask diff --git a/src/calng/arbiter_kernels/boolean_ops.py b/src/calng/arbiter_kernels/boolean_ops.py index 135516f6b25cc2fbfdaccb4233d58da425f81197..acd1caa7a921b73d23c60a67cf69d142f9f89e40 100644 --- a/src/calng/arbiter_kernels/boolean_ops.py +++ b/src/calng/arbiter_kernels/boolean_ops.py @@ -1,23 +1,18 @@ from karabo.bound import ( - NODE_ELEMENT, + OVERWRITE_ELEMENT, STRING_ELEMENT, ) import numpy as np -from .base_kernel import BaseArbiterKernel, KernelWarning +from .base_kernel import BaseArbiterKernel class BooleanCombination(BaseArbiterKernel): - def __init__(self, config): - super().__init__(config) - self._key = config.get("key") - self._operator = getattr(np, config.get("operator")) - @staticmethod def extend_device_schema(schema, prefix): ( - NODE_ELEMENT(schema) + OVERWRITE_ELEMENT(schema) .key(prefix) - .description( + .setNewDescription( "This kernel allows combining individual masks from many sources. It " "is intended for cases where, for example, each correction device via " "some addon provides its own frame mask. This arbiter kernel would " @@ -42,17 +37,22 @@ class BooleanCombination(BaseArbiterKernel): .commit(), ) - def consider(self, train_id, sources, num_frames): + def consider(self, train_id, sources, num_frames, mask, out_hash): # pretty sure this is special case of reduce and threshold in some algebra - with self.warning_context(KernelWarning.MISSINGKEY) as warn: - sources_with_key = [ - data[self._key] for (data, _) in sources.values() if data.has(self._key) - ] - if not sources_with_key: - warn(f"No sources had '{self._key}'") - else: - return self._operator( - sources_with_key, - axis=0, - ).astype(np.uint8, copy=False) - return np.ones(num_frames, dtype=np.uint8) + res = np.zeros(num_frames, dtype=bool) + mask = mask.astype(bool, copy=False) + sources_with_key = [ + data[self._key][mask] + for (data, _) in sources.values() + if data.has(self._key) + ] + if not sources_with_key: + raise KeyError(f"No sources had '{self._key}'") + res[mask] = self._operator(sources_with_key, axis=0) + return res + + def reconfigure(self, config): + if config.has("key"): + self._key = config.get("key") + if config.has("operator"): + self._operator = getattr(np, config.get("operator")) diff --git a/src/calng/arbiter_kernels/intensity_threshold.py b/src/calng/arbiter_kernels/intensity_threshold.py new file mode 100644 index 0000000000000000000000000000000000000000..d68670ad2401ff773fa0af3ba6f2e3ecaa213584 --- /dev/null +++ b/src/calng/arbiter_kernels/intensity_threshold.py @@ -0,0 +1,114 @@ +from karabo.bound import ( + DOUBLE_ELEMENT, + INT32_ELEMENT, + OVERWRITE_ELEMENT, + VECTOR_UINT32_ELEMENT, + VECTOR_DOUBLE_ELEMENT, +) +from collections import deque +import numpy as np +from .base_kernel import BaseArbiterKernel + + +def module_number(source): + token = source.split("/")[-1] + return int(token[:-8]) if token.endswith("CH0:xtdf") else None + + +class IntensityThreshold(BaseArbiterKernel): + def __init__(self, name, config, host_device): + super().__init__(name, config, host_device) + self._key = "integratedIntensity.mean" + self._modules = set(config.get("modules")) + self._snr = config.get("snr") + self._min_frames = config.get("minFrames") + self._history = deque([], self._min_frames) + + @staticmethod + def extend_device_schema(schema, prefix): + ( + OVERWRITE_ELEMENT(schema) + .key(prefix) + .setNewDescription( + "This kernel selects the frames by comparing the averaged" + "intensity to the threshold." + ) + .commit(), + + VECTOR_UINT32_ELEMENT(schema) + .key(f"{prefix}.modules") + .assignmentOptional() + .defaultValue([3, 4, 8, 15]) + .reconfigurable() + .commit(), + + DOUBLE_ELEMENT(schema) + .key(f"{prefix}.snr") + .assignmentOptional() + .defaultValue(3.5) + .reconfigurable() + .commit(), + + INT32_ELEMENT(schema) + .key(f"{prefix}.minFrames") + .assignmentOptional() + .defaultValue(100) + .reconfigurable() + .commit(), + ) + + @staticmethod + def extend_output_schema(schema, name): + ( + DOUBLE_ELEMENT(schema) + .key(f"{name}.threshold") + .assignmentOptional() + .defaultValue(np.nan) + .commit(), + + VECTOR_DOUBLE_ELEMENT(schema) + .key(f"{name}.intensity") + .assignmentOptional() + .defaultValue([]) + .commit() + ) + + def reconfigure(self, config): + if config.has("modules"): + self._modules = set(config.get("modules")) + if config.has("snr"): + self._snr = config.get("snr") + if config.has("minFrames"): + self._min_frames = config.get("minFrames") + self._history = deque(self._history, self._min_frames) + + def consider(self, train_id, sources, num_frames, mask, out_hash): + if mask.sum() == 0: + out_hash.set(f"{self._name}.threshold", 0) + out_hash.set(f"{self._name}.intensity", []) + return mask + + intensity = np.mean( + [ + data[self._key][mask] + for source, (data, _) in sources.items() + if data.has(self._key) and (module_number(source) in self._modules) + ], + axis=0, + ) + self._history.extend(intensity[-self._min_frames:]) + + if num_frames < self._min_frames: + q1, mu, q3 = np.percentile(intensity, [25, 50, 75]) + else: + q1, mu, q3 = np.percentile(self._history, [25, 50, 75]) + + sig = (q3 - q1) / 1.34896 + threshold = mu + self._snr * sig + + hits = intensity > threshold + + out_hash.set(f"{self._name}.threshold", threshold) + out_hash.set(f"{self._name}.intensity", intensity) + + return mask & hits diff --git a/src/calng/arbiter_kernels/litframes.py b/src/calng/arbiter_kernels/litframes.py new file mode 100644 index 0000000000000000000000000000000000000000..ce50b732dddadd2fd3985abdf90c39426f8cbee0 --- /dev/null +++ b/src/calng/arbiter_kernels/litframes.py @@ -0,0 +1,22 @@ +import numpy as np +from .base_kernel import BaseArbiterKernel + + +class LitFrames(BaseArbiterKernel): + @staticmethod + def extend_device_schema(schema, prefix): + pass + + def consider(self, train_id, sources, num_frames, mask, out_hash): + for source, (data, _) in sources.items(): + if not data.has("data.nPulsePerFrame"): + continue + + lff_data = np.array(data["data.nPulsePerFrame"]) + if len(lff_data) != num_frames: + raise ValueError( + f"{source}'s nPulsesPerFrame had weird size {len(lff_data)}; " + f"I think there should be {num_frames} frames!" + ) + return lff_data > 0 + raise KeyError("No source had 'data.nPulsesPerFrame'") diff --git a/src/calng/arbiter_kernels/manual.py b/src/calng/arbiter_kernels/manual.py new file mode 100644 index 0000000000000000000000000000000000000000..59759c1f24f7d2cc3eddaea86b37124123e7622d --- /dev/null +++ b/src/calng/arbiter_kernels/manual.py @@ -0,0 +1,109 @@ +import enum +import numpy as np +from karabo.bound import STRING_ELEMENT + +from .base_kernel import BaseArbiterKernel + + +class FramefilterSpecType(enum.Enum): + RANGE = "range" + MASK = "mask" + INDICES = "indices" + + +class ManualFilter(BaseArbiterKernel): + @staticmethod + def extend_device_schema(schema, prefix): + ( + STRING_ELEMENT(schema) + .key(f"{prefix}.resize") + .description( + "If number of frames doesn't match the length of the specified " + "filter, what to do? If 'Error', the kernel will just fail (resulting " + "in all true for the arbiter). If True or False, will expand manual " + "filter with the corresponding value (shrinking if manual filter is " + "too big)." + ) + .options("True,False,Error") + .assignmentOptional() + .defaultValue("False") + .reconfigurable() + .commit(), + + STRING_ELEMENT(schema) + .key(f"{prefix}.spec") + .description( + "String specifying the manual mask to apply. See 'type' for details." + ) + .assignmentOptional() + .defaultValue("") + .reconfigurable() + .commit(), + + STRING_ELEMENT(schema) + .key(f"{prefix}.type") + .description( + "How to specify the manual filter. 'range' means parse and pass the " + "spec to np.arange (so should be between 1 and 3 comma-separated " + "integers specifying [start], stop, [step]). 'mask' means " + "comma-separated list of 0 and 1 (zero means don't include, 1 means " + "include). 'indices' means comma-separated list of indices (these will " + "be included, anything else will not be)." + ) + .options(",".join(ffst.value for ffst in FramefilterSpecType)) + .assignmentOptional() + .defaultValue("mask") + .reconfigurable() + .commit(), + ) + + def reconfigure(self, config): + # TODO: handle rollback if illegal config + if config.has("type"): + self._filter_type = FramefilterSpecType(config["type"]) + if config.has("spec"): + self._filter_spec = config["spec"] + if config.has("spec") or config.has("type"): + self._filter = _parse_frame_filter(self._filter_type, self._filter_spec) + + if config.has("resize"): + if config["resize"] == "Error": + + def aux(num_frames): + if num_frames != self._filter.size: + raise ValueError("Number of frames doesn't match my filter") + return self._filter + + else: + value = config["resize"] == "True" # note: can't just use bool(...) + + def aux(num_frames): + if num_frames < self._filter.size: + return self._filter[:num_frames] + return np.pad( + self._filter, + (0, num_frames - self._filter.size), + constant_values=value, + ) + + self._resize = aux + + def consider(self, train_id, sources, num_frames, mask, out_hash): + return mask & self._resize(num_frames) + + +def _parse_frame_filter(filter_type, filter_spec): + if filter_type in (FramefilterSpecType.RANGE, FramefilterSpecType.INDICES): + if filter_type is FramefilterSpecType.RANGE: + # allow exceptions to bubble up + numbers = tuple(int(part) for part in filter_spec.split(",")) + indices = np.arange(*numbers, dtype=np.uint16) + else: + indices = np.fromstring(filter_spec, dtype=np.uint16, sep=",") + mask = np.zeros(np.max(indices)+1, dtype=bool) + mask[indices] = True + elif filter_type is FramefilterSpecType.MASK: + mask = np.fromstring(filter_spec, dtype=bool, sep=",") + else: + raise TypeError(f"Unknown frame filter type {filter_type}") + return mask diff --git a/src/calng/arbiter_kernels/ppu_arbiter.py b/src/calng/arbiter_kernels/ppu_arbiter.py index 04fa919d2bf90e64756da5c7bf06618eeb784b30..bbc293ac15b4d49f6fa265a2e3fd99ce27d38ef8 100644 --- a/src/calng/arbiter_kernels/ppu_arbiter.py +++ b/src/calng/arbiter_kernels/ppu_arbiter.py @@ -1,7 +1,6 @@ import numpy as np from karabo.bound import ( - NODE_ELEMENT, STRING_ELEMENT, ) @@ -9,8 +8,6 @@ from .base_kernel import BaseArbiterKernel, KernelWarning class PpuKernel(BaseArbiterKernel): - _node_name = "ppu" - def __init__(self, config): super().__init__(config) self._ppu_device_id = config.get("ppuDevice") @@ -20,10 +17,6 @@ class PpuKernel(BaseArbiterKernel): @staticmethod def extend_device_schema(schema, prefix): ( - NODE_ELEMENT(schema) - .key(prefix) - .commit(), - STRING_ELEMENT(schema) .key(f"{prefix}.ppuDevice") .description( @@ -38,7 +31,7 @@ class PpuKernel(BaseArbiterKernel): .commit(), ) - def consider(self, train_id, sources, num_frames): + def consider(self, train_id, sources, num_frames, mask, out_hash): with self.warning_context(KernelWarning.MISSINGSOURCE): ppu_data = next( data @@ -50,6 +43,6 @@ class PpuKernel(BaseArbiterKernel): self._num_trains = ppu_data["trainTrigger.numberOfTrains.value"] if train_id in range(self._target, self._target + self._num_trains): - return np.ones(num_frames, dtype=np.uint8) + return mask else: return np.zeros(num_frames, dtype=np.uint8) diff --git a/src/calng/arbiter_kernels/random_frames.py b/src/calng/arbiter_kernels/random_frames.py index 3e33861ec3a2e6c9a54ecd5f0f3a6939c58f5caf..7ad6c32693a33c27985e0a5cb144a5586f269a1d 100644 --- a/src/calng/arbiter_kernels/random_frames.py +++ b/src/calng/arbiter_kernels/random_frames.py @@ -1,6 +1,5 @@ from karabo.bound import ( DOUBLE_ELEMENT, - NODE_ELEMENT, Unit, ) import numpy as np @@ -8,17 +7,9 @@ from .base_kernel import BaseArbiterKernel class RandomSampler(BaseArbiterKernel): - def __init__(self, config): - super().__init__(config) - self._threshold = (100 - config.get("probability")) / 100 - @staticmethod def extend_device_schema(schema, prefix): ( - NODE_ELEMENT(schema) - .key(prefix) - .commit(), - DOUBLE_ELEMENT(schema) .key(f"{prefix}.probability") .unit(Unit.PERCENT) @@ -28,7 +19,9 @@ class RandomSampler(BaseArbiterKernel): .commit(), ) - def consider(self, train_id, sources, num_frames): - return (np.random.random(num_frames) > self._threshold).astype( - np.uint8, copy=False - ) + def consider(self, train_id, sources, num_frames, mask, out_hash): + return mask & (np.random.random(num_frames) > self._threshold) + + def reconfigure(self, config): + if config.has("probability"): + self._threshold = (100 - config.get("probability")) / 100 diff --git a/src/calng/arbiter_kernels/reduce_threshold.py b/src/calng/arbiter_kernels/reduce_threshold.py index 8c18e18be01a02686684e544c95efa493d4b476b..556f55b97e17631700f6791024a193ad1270f522 100644 --- a/src/calng/arbiter_kernels/reduce_threshold.py +++ b/src/calng/arbiter_kernels/reduce_threshold.py @@ -2,27 +2,20 @@ import operator from karabo.bound import ( DOUBLE_ELEMENT, - NODE_ELEMENT, + OVERWRITE_ELEMENT, STRING_ELEMENT, ) import numpy as np -from .base_kernel import BaseArbiterKernel, KernelWarning +from .base_kernel import BaseArbiterKernel class ReduceAndThreshold(BaseArbiterKernel): - def __init__(self, config): - super().__init__(config) - self._key = config.get("key") - self._threshold = config.get("threshold") - self._reduction = getattr(np, config.get("reduction")) - self._comparator = getattr(operator, config.get("comparator")) - @staticmethod def extend_device_schema(schema, prefix): ( - NODE_ELEMENT(schema) + OVERWRITE_ELEMENT(schema) .key(prefix) - .description( + .setNewDescription( "This kernel allows custom frame masks based on comparing some " "reduction along modules over each frame to some threshold. " "Integrated intensity will be used as a running example in the " @@ -80,15 +73,27 @@ class ReduceAndThreshold(BaseArbiterKernel): .commit(), ) - def consider(self, train_id, sources, num_frames): - with self.warning_context(KernelWarning.MISSINGKEY) as warn: - sources_with_key = [ - data[self._key] for (data, _) in sources.values() if data.has(self._key) - ] - if not sources_with_key: - warn(f"No sources had '{self._key}'") - else: - return self._comparator( - self._reduction(sources_with_key, axis=0), self._threshold - ).astype(np.uint8, copy=False) - return np.ones(num_frames, dtype=np.uint8) + def reconfigure(self, config): + if config.has("key"): + self._key = config.get("key") + if config.has("threshold"): + self._threshold = config.get("threshold") + if config.has("reduction"): + self._reduction = getattr(np, config.get("reduction")) + if config.has("comparator"): + self._comparator = getattr(operator, config.get("comparator")) + + def consider(self, train_id, sources, num_frames, mask, out_hash): + res = np.zeros(num_frames, dtype=bool) + mask = mask.astype(bool, copy=False) + sources_with_key = [ + data[self._key][mask] + for (data, _) in sources.values() + if data.has(self._key) + ] + if not sources_with_key: + raise KeyError(f"No sources had '{self._key}'") + res[mask] = self._comparator( + self._reduction(sources_with_key, axis=0), self._threshold + ) + return res diff --git a/src/calng/base_correction.py b/src/calng/base_correction.py index caa4ff6d35352f2160a6f533269339f14cdd876b..d505dbd50dfe7aa001b32b7500f90dc7997a1d6e 100644 --- a/src/calng/base_correction.py +++ b/src/calng/base_correction.py @@ -1,5 +1,4 @@ import concurrent.futures -import contextlib import functools import itertools import math @@ -18,7 +17,6 @@ from calngUtils import ( timing, trackers, ) -from calngUtils.misc import ChainHash from karabo.bound import ( BOOL_ELEMENT, DOUBLE_ELEMENT, @@ -1039,7 +1037,7 @@ def add_addon_nodes(schema, device_class, prefix="addons"): det_name = device_class.__name__[:-len("Correction")].lower() device_class._available_addons = [ addon.load() - for addon in entry_points().get("calng.correction_addon", []) + for addon in entry_points().select().get("calng.correction_addon", []) if not addon.extras or det_name in (extra[0] for extra in addon.extras) ] diff --git a/src/calng/expr.py b/src/calng/expr.py new file mode 100644 index 0000000000000000000000000000000000000000..cf2182af7c54b3f7a18aafef6d8444ab669adaa8 --- /dev/null +++ b/src/calng/expr.py @@ -0,0 +1,126 @@ +import numpy as np +import tokenize +from token import ( + AMPER, + CIRCUMFLEX, + DEDENT, + INDENT, + ISEOF, + LPAR, + NAME, + NEWLINE, + NL, + RPAR, + TILDE, + VBAR, +) +from collections import deque + + +class BaseExpression: + + def __init__(self, expr): + self.expr = expr + + @classmethod + def parse(cls, string, variables={}): + if not string.strip(): + return cls(None) + + stack = deque() + result = [] + token_iter = tokenize.generate_tokens(iter([string]).__next__) + try: + token = next(token_iter) + except tokenize.TokenError: + raise SyntaxError("invalid syntax") + + while not ISEOF(token.type): + tok_type = token.exact_type + + if tok_type == LPAR: + stack.append(token) + elif tok_type in cls.UNARY: + stack.append(token) + elif tok_type == RPAR: + try: + head = stack.pop() + while head.exact_type != LPAR: + result.append(head) + head = stack.pop() + except IndexError: + raise SyntaxError(f"unmatched ')', pos: {token.start[1]}") + elif tok_type in cls.BINARY: + prio = cls.PRIORITY[tok_type] | cls.UNARY.keys() + while stack and stack[-1].exact_type in prio: + result.append(stack.pop()) + stack.append(token) + elif tok_type in cls.BLANK: + pass + elif tok_type == NAME: + if token.string not in variables: + raise NameError( + f"name '{token.string}' is not defined, " + f"pos: {token.start[1]}" + ) + result.append(token) + else: + raise SyntaxError(f"invalid syntax, pos: {token.start[1]}") + + try: + token = next(token_iter) + except tokenize.TokenError: + raise SyntaxError(f"invalid syntax, pos: {token.end[1] + 1}") + + while stack: + head = stack.pop() + if head.exact_type == LPAR: + raise SyntaxError("unexpected EOF while parsing") + result.append(head) + + for token in result: + tok_type = token.exact_type + if tok_type in cls.BINARY: + stack.pop() + elif tok_type == NAME: + stack.append(token) + + if len(stack) != 1: + raise SyntaxError(f"invalid syntax, pos: {stack[-1].end[1] + 1}") + + return cls(result) + + def eval(self, variables, default): + if self.expr is None: + return default + stack = deque() + for token in self.expr: + tok_type = token.exact_type + if tok_type in self.UNARY: + op = self.UNARY[tok_type] + stack.append(op(stack.pop())) + elif tok_type in self.BINARY: + op = self.BINARY[tok_type] + b = stack.pop() + a = stack.pop() + stack.append(op(a, b)) + else: + stack.append(variables[token.string]) + + return stack.pop() + + +class BinaryExpression(BaseExpression): + BLANK = { + NEWLINE, + NL, + INDENT, + DEDENT, + } + UNARY = {TILDE: np.logical_not} + BINARY = {AMPER: np.logical_and, CIRCUMFLEX: np.logical_xor, VBAR: np.logical_or} + PRIORITY = { + AMPER: {AMPER}, + CIRCUMFLEX: {AMPER, CIRCUMFLEX}, + VBAR: {AMPER, CIRCUMFLEX, VBAR}, + } diff --git a/src/calng/scenes.py b/src/calng/scenes.py index 3eb66b0fdc9ba8e5ce507023ddaa1c7caededafe..884ae1c8f700f088be936c6d15e456013e3dbf79 100644 --- a/src/calng/scenes.py +++ b/src/calng/scenes.py @@ -9,6 +9,7 @@ from calngUtils.scene_utils import ( Space, VerticalLayout, Vline, + attrs_to_dict, boxed, recursive_editable, scene_generator, @@ -30,6 +31,7 @@ from karabo.common.scenemodel.api import ( TableElementModel, TrendGraphModel, UnknownWidgetDataModel, + VectorGraphModel, VectorXYGraphModel, WebLinkModel, ) @@ -403,6 +405,72 @@ class AssemblerDeviceStatus(VerticalLayout): ) +@titled("Device status", width=8 * NARROW_INC) +@boxed +class MatcherSubclassStatus(VerticalLayout): + def __init__(self, device_id): + super().__init__(padding=0) + name = DisplayLabelModel( + keys=[f"{device_id}.deviceId"], + width=14 * BASE_INC, + height=BASE_INC, + ) + state = DisplayStateColorModel( + show_string=True, + keys=[f"{device_id}.state"], + width=7 * BASE_INC, + height=BASE_INC, + ) + train_id = DisplayLabelModel( + keys=[f"{device_id}.trainId"], + width=7 * BASE_INC, + height=BASE_INC, + ) + rate = DisplayRoundedFloat( + keys=[f"{device_id}.outProcessing"], + width=7 * BASE_INC, + height=BASE_INC, + ) + matchratio = DisplayRoundedFloat( + keys=[f"{device_id}.matchingRatio"], + width=7 * BASE_INC, + height=BASE_INC, + ) + self.children.extend( + [ + name, + HorizontalLayout( + state, + train_id, + padding=0, + ), + HorizontalLayout( + rate, + matchratio, + padding=0, + ), + DeviceSceneLinkModel( + text="I'm actually a TrainMatcher", + keys=[f"{device_id}.availableScenes"], + target="trainMatcherScene", + target_window=SceneTargetWindow.Dialog, + width=14 * BASE_INC, + height=BASE_INC, + ), + LabelModel( + text="My geometry device:", + width=14 * BASE_INC, + height=BASE_INC, + ), + DisplayLabelModel( + keys=[f"{device_id}.geometryDevice"], + width=14 * BASE_INC, + height=BASE_INC, + ), # TODO: some day, get dynamic link to this friend + ] + ) + + @titled("Stats") @boxed class StatsBox(HorizontalLayout): @@ -571,54 +639,54 @@ class PreviewDisplayArea(VerticalLayout): device_id, schema_hash, channel_name, + image_key_name="image.data", + mask_key_name="image.mask", data_width=None, mask_width=None, data_height=None, mask_height=None, ): super().__init__() - try: - class_id = schema_hash.getAttribute( - f"{channel_name}.schema.image.data", "classId" + attrs = attrs_to_dict( + schema_hash.getAttributes(f"{channel_name}.schema.{image_key_name}") + ) + print(attrs) + if attrs.get("classId") == "ImageData": + model = DetectorGraphModel + extra = dict(colormap="viridis") + elif attrs.get("classId") == "NDArray": + model = NDArrayGraphModel + extra = dict() + elif attrs.get("valueType", "").startswith("VECTOR"): + model = VectorGraphModel + extra = dict() + else: + warning = ( + "Not sure what to do with " + f"{device_id}:{channel_name}.schema.{image_key_name}" ) - except RuntimeError: - warning = f"{channel_name} in schema of {device_id} missing classId" print(warning) + print(attrs) self.children.append( LabelModel(text=warning, width=10 * BASE_INC, height=BASE_INC) ) return - if class_id == "ImageData": - self.children.extend( - [ - DetectorGraphModel( - keys=[f"{device_id}.{channel_name}.schema.image.data"], - colormap="viridis", - width=60 * BASE_INC if data_width is None else data_width, - height=18 * BASE_INC if data_height is None else data_height, - ), - DetectorGraphModel( - keys=[f"{device_id}.{channel_name}.schema.image.mask"], - colormap="viridis", - width=60 * BASE_INC if mask_width is None else mask_width, - height=18 * BASE_INC if mask_height is None else mask_height, - ), - ] + self.children.append( + model( + keys=[f"{device_id}.{channel_name}.schema.{image_key_name}"], + width=60 * BASE_INC if data_width is None else data_width, + height=18 * BASE_INC if data_height is None else data_height, + **extra, ) - else: - self.children.extend( - [ - NDArrayGraphModel( - keys=[f"{device_id}.{channel_name}.schema.image.data"], - width=60 * BASE_INC if data_width is None else data_width, - height=18 * BASE_INC if data_height is None else data_height, - ), - NDArrayGraphModel( - keys=[f"{device_id}.{channel_name}.schema.image.mask"], - width=60 * BASE_INC if mask_width is None else mask_width, - height=12 * BASE_INC if mask_height is None else mask_height, - ), - ] + ) + if mask_key_name is not None: + self.children.append( + model( + keys=[f"{device_id}.{channel_name}.schema.{mask_key_name}"], + width=60 * BASE_INC if mask_width is None else mask_width, + height=12 * BASE_INC if mask_height is None else mask_height, + **extra, + ) ) @@ -1106,3 +1174,60 @@ def condition_checker_overview(device_id, schema): height=20 * BASE_INC, ), ) + + +@scene_generator +def complex_arbiter_overview(device_id, schema): + schema_hash = schema_to_hash(schema) + return VerticalLayout( + HorizontalLayout( + VerticalLayout( + MatcherSubclassStatus(device_id), + HorizontalLayout( + DisplayCommandModel( + keys=[f"{device_id}.start"], + width=5 * BASE_INC, + height=BASE_INC, + ), + DisplayCommandModel( + keys=[f"{device_id}.stop"], + width=5 * BASE_INC, + height=BASE_INC, + ), + ), + EditableRow(device_id, schema_hash, "frameSelection.decision"), + ), + VerticalLayout( + TableElementModel( + keys=[f"{device_id}.frameSelection.plan"], + klass="EditableTableElement", + width=20 * BASE_INC, + height=10 * BASE_INC, + ), + DisplayTextLogModel( + keys=[f"{device_id}.status"], + width=20 * BASE_INC, + height=8 * BASE_INC, + ), + ), + ), + HorizontalLayout( + children=[ + recursive_editable( + device_id, + schema_hash, + f"frameSelection.selections.{selection}", + ) + for selection in schema_hash["frameSelection.selections"].getKeys() + ] + ), + PreviewDisplayArea( + device_id, + schema_hash, + "output", + image_key_name="data.dataFramePattern", + mask_key_name=None, + data_width=40 * BASE_INC, + data_height=14 * BASE_INC, + ), + ) diff --git a/tests/common_setup.py b/tests/common_setup.py index fafe2cb7078a4a20b6ef4beb7d17b41d0c05d1b9..c3d20b6d9bbc450462095441e63e2fbf6ceefa3d 100644 --- a/tests/common_setup.py +++ b/tests/common_setup.py @@ -1,5 +1,6 @@ import contextlib import pathlib +import queue import threading import h5py @@ -98,3 +99,46 @@ def get_constant_from_file(path_to_file, file_name, data_set_name): File", "File Name", and "Data Set Name" fields displayed for a given CCV.""" with h5py.File(caldb_store / path_to_file / file_name, "r") as fd: return np.array(fd[f"{data_set_name}/data"]) + + +def kwhash(**kwargs): + res = Hash() + for k, v in kwargs.items(): + res[k] = v + return res + + +class OverrideBase: + overrides = tuple() + + @classmethod + @contextlib.contextmanager + def override_context(cls, other_cls): + cache = {} + for meth_name in cls.overrides: + cache[meth_name] = getattr(other_cls, meth_name) + setattr(other_cls, meth_name, getattr(cls, meth_name)) + try: + yield + finally: + for meth_name in cls.overrides: + setattr(other_cls, meth_name, cache[meth_name]) + + +class OverridePythonDevice(OverrideBase): + overrides = ("writeChannel",) + + def writeChannel(self, output_name, out_hash, timestamp=None): + self._last_written_channel = output_name + self._last_written_data = out_hash + + +def dict_to_hash(d): + res = Hash() + for k, v in d.items(): + if isinstance(v, dict): + v = dict_to_hash(v) + elif isinstance(v, list): + v = list(map(dict_to_hash, v)) + res[k] = v + return res diff --git a/tests/test_frameselection.py b/tests/test_frameselection.py new file mode 100644 index 0000000000000000000000000000000000000000..2399de2ad54bd5b65df32a1d79184dc84b4eb01b --- /dev/null +++ b/tests/test_frameselection.py @@ -0,0 +1,452 @@ +import pytest + +import numpy as np +from common_setup import OverridePythonDevice, dict_to_hash, kwhash +from karabo.bound import Configurator, PythonDevice + +from calng.FrameSelectionArbiter import ( + SimpleFrameSelectionArbiter, + AdvancedFrameSelectionArbiter, +) + + +@pytest.fixture +def advanced_arbiter(): + configurator = Configurator(PythonDevice) + configurator.registerClass(AdvancedFrameSelectionArbiter) + + with OverridePythonDevice.override_context(PythonDevice): + arbara = configurator.create( + AdvancedFrameSelectionArbiter, + kwhash( + geometryDevice="geometry", + sources=[], + frameSelection=kwhash( + decision="", + ), + ), + ) + arbara.initialization() + yield arbara + + +@pytest.fixture +def simple_arbiter(): + configurator = Configurator(PythonDevice) + configurator.registerClass(SimpleFrameSelectionArbiter) + + with OverridePythonDevice.override_context(PythonDevice): + arbara = configurator.create( + SimpleFrameSelectionArbiter, + kwhash( + geometryDevice="geometry", + sources=[], + ), + ) + arbara.initialization() + yield arbara + + +def test_simple(simple_arbiter): + # don't care about matching, just use one source + # only contains "cell table" to suggest number of frames + sources_dict = { + "arbitrarySource": ( + kwhash( + image=kwhash(cellId=np.zeros(2)), + ), + None, + ), + } + simple_arbiter.slotReconfigure( + dict_to_hash({"frameSelection.kernelChoice": "ManualFilter"}) + ) + simple_arbiter.slotReconfigure( + dict_to_hash( + {"frameSelection.kernelParameters": {"spec": "0,1", "type": "mask"}} + ) + ) + simple_arbiter.on_matched_data(1, sources_dict) + assert simple_arbiter._last_written_data["data.dataFramePattern"] == [False, True] + + +def test_manual(simple_arbiter): + # don't care about matching, just use one source + # only contains "cell table" to suggest number of frames + sources_dict = { + "arbitrarySource": ( + kwhash( + image=kwhash(cellId=np.zeros(100)), + ), + None, + ), + } + simple_arbiter.slotReconfigure( + dict_to_hash({"frameSelection.kernelChoice": "ManualFilter"}) + ) + + include = [10, 20, 30] + simple_arbiter.slotReconfigure( + dict_to_hash( + { + "frameSelection.kernelParameters": { + "resize": "False", + "spec": ",".join(str(i) for i in include), + "type": "indices", + } + } + ) + ) + simple_arbiter.on_matched_data(1, sources_dict) + mask = simple_arbiter._last_written_data["data.dataFramePattern"] + assert all(mask[i] for i in include) + assert not any(mask[i] for i in range(len(mask)) if i not in include) + + include = list(range(1, 51, 3)) + simple_arbiter.slotReconfigure( + dict_to_hash( + { + "frameSelection.kernelParameters": { + "resize": "False", + "spec": "1,51,3", + "type": "range", + } + } + ) + ) + simple_arbiter.on_matched_data(2, sources_dict) + mask = simple_arbiter._last_written_data["data.dataFramePattern"] + assert all(mask[i] for i in include) + assert not any(mask[i] for i in range(len(mask)) if i not in include) + + +def test_decision(advanced_arbiter): + sources_dict = { + "arbitrarySource": ( + kwhash( + image=kwhash(cellId=np.zeros(2)), + ), + None, + ), + } + + advanced_arbiter.slotReconfigure( + kwhash( + frameSelection=kwhash( + decision="a", + plan=[ + kwhash( + enable=True, name="a", kernel="ManualFilter", preselection="" + ), + kwhash( + enable=True, name="b", kernel="ManualFilter", preselection="" + ), + ], + ) + ) + ) + advanced_arbiter.slotReconfigure( + dict_to_hash({"frameSelection.selections.a": {"type": "mask", "spec": "0,1"}}), + ) + + advanced_arbiter.on_matched_data(1, sources_dict) + assert advanced_arbiter._last_written_data["data.dataFramePattern"] == [False, True] + + advanced_arbiter.slotReconfigure( + dict_to_hash( + { + "frameSelection": { + "decision": "b", + "selections.b": {"type": "mask", "spec": "1,0"}, + } + } + ) + ) + + advanced_arbiter.on_matched_data(2, sources_dict) + assert advanced_arbiter._last_written_data["data.dataFramePattern"] == [True, False] + + advanced_arbiter.slotReconfigure( + kwhash( + frameSelection=kwhash( + decision="a | b", + ) + ) + ) + + advanced_arbiter.on_matched_data(3, sources_dict) + assert advanced_arbiter._last_written_data["data.dataFramePattern"] == [True, True] + + advanced_arbiter.slotReconfigure( + kwhash( + frameSelection=kwhash( + decision="a & b", + ) + ) + ) + + advanced_arbiter.on_matched_data(4, sources_dict) + assert advanced_arbiter._last_written_data["data.dataFramePattern"] == [ + False, + False, + ] + + +def test_preselection(advanced_arbiter): + sources_dict = { + "arbitrarySource": ( + dict_to_hash({"image.cellId": np.zeros(2)}), + None, + ), + } + advanced_arbiter.slotReconfigure( + kwhash( + frameSelection=kwhash( + plan=[ + kwhash( + enable=True, name="a", kernel="ManualFilter", preselection="" + ), + kwhash( + enable=True, name="b", kernel="ManualFilter", preselection="a" + ), + ], + decision="b", + ), + ), + ) + # split in two calls as previous reconfigure should update schema + advanced_arbiter.slotReconfigure( + kwhash( + frameSelection=kwhash( + decision="b", + selections=kwhash( + a=kwhash(spec="1,0"), + b=kwhash(spec="1,1"), + ), + ) + ) + ) + + advanced_arbiter.on_matched_data(5, sources_dict) + assert advanced_arbiter._last_written_data["data.dataFramePattern"] == [True, False] + + +def test_reduce_threshold(advanced_arbiter): + sources_dict = { + "sourceA": ( + kwhash( + image=kwhash(cellId=np.zeros(3)), + data=np.array([1, 1, -1], dtype=np.float32), + data2=np.array([0, 0, -1], dtype=np.float32), + ), + None, + ), + "sourceB": ( + kwhash( + data=np.array([0, 1, -1], dtype=np.float32), + data2=np.array([1, 0, -1], dtype=np.float32), + ), + None, + ), + } + advanced_arbiter.slotReconfigure( + dict_to_hash( + { + "frameSelection": { + "plan": [ + { # just to test that preselection mask is applied + "enable": True, + "name": "a", + "kernel": "ManualFilter", + "preselection": "", + }, + { + "enable": True, + "name": "b", + "kernel": "ReduceAndThreshold", + "preselection": "a", + }, + ], + "decision": "b", + } + } + ) + ) + + advanced_arbiter.slotReconfigure( + dict_to_hash( + { + "frameSelection.selections.a.spec": "1,1,0", + "frameSelection.selections.b": { + "key": "data", + "threshold": 2, + "reduction": "sum", + "comparator": "lt", + }, + } + ) + ) + advanced_arbiter.on_matched_data(1, sources_dict) + assert advanced_arbiter._last_written_data["data.dataFramePattern"] == [ + True, + False, + False, + ] + + advanced_arbiter.slotReconfigure( + dict_to_hash({"frameSelection.selections.b.key": "data2"}) + ) + advanced_arbiter.on_matched_data(2, sources_dict) + assert advanced_arbiter._last_written_data["data.dataFramePattern"] == [ + True, + True, + False, + ] + + advanced_arbiter.slotReconfigure( + dict_to_hash({"frameSelection.selections.b.threshold": 1}) + ) + advanced_arbiter.on_matched_data(3, sources_dict) + assert advanced_arbiter._last_written_data["data.dataFramePattern"] == [ + False, + True, + False, + ] + + +def test_boolean_ops(advanced_arbiter): + sources_dict = { + "sourceA": ( + kwhash( + image=kwhash(cellId=np.zeros(3)), + data=np.array([1, 1, 1], dtype=bool), + data2=np.array([0, 0, 1], dtype=bool), + ), + None, + ), + "sourceB": ( + kwhash( + data=np.array([0, 1, 1], dtype=bool), + data2=np.array([1, 0, 1], dtype=bool), + ), + None, + ), + } + advanced_arbiter.slotReconfigure( + dict_to_hash( + { + "frameSelection": { + "plan": [ + { # just to test that preselection mask is applied + "enable": True, + "name": "a", + "kernel": "ManualFilter", + "preselection": "", + }, + { + "enable": True, + "name": "b", + "kernel": "BooleanCombination", + "preselection": "a", + }, + ], + "decision": "b", + } + } + ) + ) + + advanced_arbiter.slotReconfigure( + dict_to_hash( + { + "frameSelection.selections.a.spec": "1,1,0", + "frameSelection.selections.b": { + "key": "data", + "operator": "all", + }, + } + ) + ) + advanced_arbiter.on_matched_data(1, sources_dict) + assert advanced_arbiter._last_written_data["data.dataFramePattern"] == [ + False, + True, + False, + ] + + advanced_arbiter.slotReconfigure( + dict_to_hash({"frameSelection.selections.b.key": "data2"}) + ) + advanced_arbiter.on_matched_data(2, sources_dict) + assert advanced_arbiter._last_written_data["data.dataFramePattern"] == [ + False, + False, + False, + ] + + advanced_arbiter.slotReconfigure( + dict_to_hash({"frameSelection.selections.b.operator": "any"}) + ) + advanced_arbiter.on_matched_data(3, sources_dict) + assert advanced_arbiter._last_written_data["data.dataFramePattern"] == [ + True, + False, + False, + ] + + +def test_random_frames(advanced_arbiter): + np.random.seed(0) + num_frames = 1000 + num_blocked = 100 + sources_dict = { + "arbitrarySource": ( + kwhash( + image=kwhash(cellId=np.zeros(num_frames)), + ), + None, + ), + } + advanced_arbiter.slotReconfigure( + dict_to_hash( + { + "frameSelection": { + "plan": [ + { # just to test that preselection mask is applied + "enable": True, + "name": "a", + "kernel": "ManualFilter", + "preselection": "", + }, + { + "enable": True, + "name": "b", + "kernel": "RandomSampler", + "preselection": "a", + }, + ], + "decision": "b", + } + } + ) + ) + + for prob in (10, 50, 90): + advanced_arbiter.slotReconfigure( + dict_to_hash( + { + "frameSelection.selections.a": { + "spec": ",".join("0" * num_blocked), + "resize": "True", + }, + "frameSelection.selections.b": { + "probability": prob, + }, + } + ) + ) + advanced_arbiter.on_matched_data(1, sources_dict) + res = advanced_arbiter._last_written_data["data.dataFramePattern"] + assert not any(res[:num_blocked]) + expected = (num_frames - num_blocked) * (prob / 100) + assert expected * 0.75 <= np.count_nonzero(res) <= expected * 1.25