diff --git a/docs/extensions.md b/docs/extensions.md index f26d17d2ea26b2cb2fd03d0c42ab03130ebb5bb8..36d635ef21ae952b01b6e6172d8d896036982881 100644 --- a/docs/extensions.md +++ b/docs/extensions.md @@ -69,8 +69,7 @@ This class for now containts stubs for the methods a kernel will need to provide - The `sources` argument comes from `on_matched_data` (the arbiter is a `TrainMatcher`). It is a dictionary mapping source names to tuples of data (the data hash received from the source) and timestamp. -For now, each arbiter kernel class must have a class parameter `_node_name` which will be used to add the node within the arbiter to hold the kernel configuration (the path will be `frameSelection.kernels.{kernel_class._node_name}`). -In the future, node name may instead be automatically set based on the class name. +The configuration keys added by `extend_device_schema` will go in the arbiter schema under `frameSelection.kernels.[node name]` where the node name is automatically based on the class name. Note that this node name is automatically part of the `prefix` passed to `extend_device_schema`. ## Existing arbiter kernels diff --git a/setup.py b/setup.py index af12b09747b04afa42913c79f67395cac42f69cd..8adfda45bb9c261a46661af0e227195e8e8ee9ab 100644 --- a/setup.py +++ b/setup.py @@ -30,9 +30,9 @@ setup( "Epix100Correction = calng.corrections.Epix100Correction:Epix100Correction", # noqa "Gotthard2Correction = calng.corrections.Gotthard2Correction:Gotthard2Correction", # noqa "JungfrauCorrection = calng.corrections.JungfrauCorrection:JungfrauCorrection", # noqa - "PnccdCorrection = calng.corrections.PnccdCorrection:PnccdCorrection", "LpdCorrection = calng.corrections.LpdCorrection:LpdCorrection", "LpdminiCorrection = calng.corrections.LpdminiCorrection:LpdminiCorrection", + "PnccdCorrection = calng.corrections.PnccdCorrection:PnccdCorrection", "ShmemToZMQ = calng.ShmemToZMQ:ShmemToZMQ", "ShmemTrainMatcher = calng.ShmemTrainMatcher:ShmemTrainMatcher", "FrameSelectionArbiter = calng.FrameSelectionArbiter:FrameSelectionArbiter", # noqa @@ -56,9 +56,10 @@ setup( "RoiTool = calng.RoiTool:RoiTool", ], "calng.correction_addon": [ - "IntegratedIntensity = calng.correction_addons.integrated_intensity:IntegratedIntensityAddon [agipd]", # noqa - "Peakfinder9 = calng.correction_addons.peakfinder9:Peakfinder9Addon [agipd]", # noqa - "RandomFrames = calng.correction_addons.random_frames:RandomFramesAddon", + "IntegratedIntensity = calng.correction_addons.integrated_intensity:IntegratedIntensity", # noqa + "LitPixelCounter = calng.correction_addons.litpixel_counter:LitPixelCounter [agipd]", # noqa + "Peakfinder9 = calng.correction_addons.peakfinder9:Peakfinder9", # noqa + "RandomFrames = calng.correction_addons.random_frames:RandomFrames", # noqa ], "calng.arbiter_kernel": [ "BooleanCombination = calng.arbiter_kernels.boolean_ops:BooleanCombination", # noqa diff --git a/src/calng/FrameSelectionArbiter.py b/src/calng/FrameSelectionArbiter.py index f19cdbbbc2e2ed47c5096464bdf632754285637e..3bfcbf4dd62a2157567f53fd91cbc38725a3de52 100644 --- a/src/calng/FrameSelectionArbiter.py +++ b/src/calng/FrameSelectionArbiter.py @@ -17,7 +17,7 @@ from karabo.bound import ( from TrainMatcher import TrainMatcher from ._version import version as deviceVersion -from . import utils +from . import utils, geom_utils my_schema = Schema() ( @@ -47,6 +47,17 @@ class FrameSelectionArbiter(TrainMatcher.TrainMatcher): @staticmethod def expectedParameters(expected): ( + STRING_ELEMENT(expected) + .key("geometryDevice") + .displayedName("Geometry device") + .description( + "The the device which will provide geometry. The device is expected to " + "have current geometry as a string (for now using custom non-robust " + "serialization scheme) element named serializedGeometry." + ) + .assignmentMandatory() + .commit(), + NODE_ELEMENT(expected) .key("frameSelection") .displayedName("Frame selection") @@ -72,15 +83,30 @@ class FrameSelectionArbiter(TrainMatcher.TrainMatcher): ) for kernel_class in kernel_choice.values(): kernel_class.extend_device_schema( - expected, f"frameSelection.kernels.{kernel_class._node_name}" + expected, f"frameSelection.kernels.{kernel_class.__name__}" ) + def __init__(self, config): + super().__init__(config) + self.registerSlot(self.slotReceiveGeometry) + def initialization(self): super().initialization() self._kernel_class = kernel_choice[self.get("frameSelection.kernelChoice")] self._kernel = self._kernel_class( - self._parameters[f"frameSelection.kernels.{self._kernel_class._node_name}"] + self._parameters[f"frameSelection.kernels.{self._kernel_class.__name__}"] ) + self._kernel._device = self + + self._geometry = None + if self.get("geometryDevice"): + self.signalSlotable.connect( + self.get("geometryDevice"), + "signalNewGeometry", + "", # slot device ID (default: self) + "slotReceiveGeometry", + ) + self.start() # Auto-start this type of matcher. def on_matched_data(self, train_id, sources): # TODO: robust frame deduction @@ -91,9 +117,16 @@ class FrameSelectionArbiter(TrainMatcher.TrainMatcher): break decision = self._kernel.consider(train_id, sources, num_frames) - result = Hash() - # TODO: avoid recasting - result["data.dataFramePattern"] = list(map(bool, decision)) + if isinstance(decision, Hash): + assert decision.has("data.dataFramePattern") + decision["data.dataFramePattern"] = list( + map(bool, decision["data.dataFramePattern"]) + ) + result = decision + else: + result = Hash() + # TODO: avoid recasting + result["data.dataFramePattern"] = list(map(bool, decision)) self.output.write( result, ChannelMetaData( @@ -111,7 +144,7 @@ class FrameSelectionArbiter(TrainMatcher.TrainMatcher): self.log.INFO("Switching frame selection kernel") self._kernel_class = kernel_choice[conf.get("frameSelection.kernelChoice")] self._reinstantiate_arbiter_kernel = True - elif conf.has(f"frameSelection.kernels.{self._kernel_class._node_name}"): + elif conf.has(f"frameSelection.kernels.{self._kernel_class.__name__}"): self.log.INFO("Reconfiguring frame selection kernel") # TODO: update instead of rebuild for complex kernels # (decide based on reconfigurability?) @@ -122,7 +155,14 @@ class FrameSelectionArbiter(TrainMatcher.TrainMatcher): if self._reinstantiate_arbiter_kernel: self._kernel = self._kernel_class( self._parameters[ - f"frameSelection.kernels.{self._kernel_class._node_name}" + f"frameSelection.kernels.{self._kernel_class.__name__}" ] ) self._reinstantiate_arbiter_kernel = False + + def slotReceiveGeometry(self, device_id, serialized_geometry): + self.log.INFO(f"Received geometry from {device_id}") + try: + self._geometry = geom_utils.deserialize_geometry(serialized_geometry) + except Exception as e: + self.log.WARN(f"Failed to deserialize geometry; {e}") diff --git a/src/calng/arbiter_kernels/base_kernel.py b/src/calng/arbiter_kernels/base_kernel.py index 0a3bfb0fb2f3ff4c5f007d56095110a49e7e25a5..3467d8643441a7254c05abda83210e3f642f2f8d 100644 --- a/src/calng/arbiter_kernels/base_kernel.py +++ b/src/calng/arbiter_kernels/base_kernel.py @@ -2,20 +2,23 @@ import numpy as np class BaseArbiterKernel: - _node_name = None # just for schema purposes, set in subclass - def __init__(self, config): """No need for prefix - hosting device should pass us the relevant subnode""" - pass + self._device = None + + @property + def geometry(self): + return self._device._geometry @staticmethod def extend_device_schema(schema, prefix): """Should add configurability to the arbiter (matcher) the kernel will be running in. Prefix will include node name, so will typically be - "frameSelection.kernels.{class._node_name}". The parent node will not have been + "frameSelection.kernels.{class.__name__}". The parent node will not have been created already (unlike for correction addons).""" pass def consider(self, train_id, sources, num_frames): - """Must return uint8 mask array of size equal to num_frames""" - return np.ones(num_frames, dtype=np.uint8) + """Must return tuple of uint8 mask array of size equal to num_frames + and base output hash or None.""" + return np.ones(num_frames, dtype=np.uint8), None diff --git a/src/calng/arbiter_kernels/boolean_ops.py b/src/calng/arbiter_kernels/boolean_ops.py index 11a805360aaefb7f31062e19b3776d36a966974e..dd081edb810f2a7a92dae057817f59965e2bba99 100644 --- a/src/calng/arbiter_kernels/boolean_ops.py +++ b/src/calng/arbiter_kernels/boolean_ops.py @@ -7,8 +7,6 @@ from .base_kernel import BaseArbiterKernel class BooleanCombination(BaseArbiterKernel): - _node_name = "booleanCombination" - def __init__(self, config): self._key = config.get("key") self._operator = getattr(np, config.get("operator")) diff --git a/src/calng/arbiter_kernels/random_frames.py b/src/calng/arbiter_kernels/random_frames.py index ef7093a87ace6d7ef87d3bdbaf4185188dd945c6..7f622dbe6255ffa0dc503a7152cc2d72e5919e4a 100644 --- a/src/calng/arbiter_kernels/random_frames.py +++ b/src/calng/arbiter_kernels/random_frames.py @@ -8,8 +8,6 @@ from .base_kernel import BaseArbiterKernel class RandomSampler(BaseArbiterKernel): - _node_name = "randomSampler" - def __init__(self, config): print("Getting config") print(config) diff --git a/src/calng/arbiter_kernels/reduce_threshold.py b/src/calng/arbiter_kernels/reduce_threshold.py index dac4abc4c90e1db5100e4e6db86c7749d3ace35b..1eaa72bc170e1fd091b5eb18fd2f50e6a63eb45f 100644 --- a/src/calng/arbiter_kernels/reduce_threshold.py +++ b/src/calng/arbiter_kernels/reduce_threshold.py @@ -10,8 +10,6 @@ from .base_kernel import BaseArbiterKernel class ReduceAndThreshold(BaseArbiterKernel): - _node_name = "reduceAndThreshold" - def __init__(self, config): self._key = config.get("key") self._threshold = config.get("threshold") diff --git a/src/calng/base_correction.py b/src/calng/base_correction.py index c2dfa5d1fe24348f5ba319b8b0a4fdf82908a7f2..b5bc81c43b90989e78032a5530209943d79b5c11 100644 --- a/src/calng/base_correction.py +++ b/src/calng/base_correction.py @@ -37,7 +37,7 @@ from karabo.bound import ( ) from karabo.common.api import KARABO_SCHEMA_DISPLAY_TYPE_SCENES as DT_SCENES -from . import geom_utils, scenes, preview_utils, shmem_utils, schemas, utils +from . import geom_utils, preview_utils, scenes, schemas, shmem_utils, utils from ._version import version as deviceVersion PROCESSING_STATE_TIMEOUT = 10 @@ -621,6 +621,7 @@ class BaseCorrection(PythonDevice): self._last_pulse_table = None # ditto - though not all detectors have this # register slots + self.registerSlot(self.slotReceiveGeometry) def constant_override_fun(friend_fun, constant, preserve_fields): def aux(): self.flush_constants( @@ -763,9 +764,9 @@ class BaseCorrection(PythonDevice): self.KARABO_ON_EOS("dataInput", self.handle_eos) self._enabled_addons = [ - addon_class(self._parameters[f"addons.{addon_class._node_name}"]) + addon_class(self._parameters[f"addons.{addon_class.__name__}"]) for addon_class in self._available_addons - if self.get(f"addons.{addon_class._node_name}.enable") + if self.get(f"addons.{addon_class.__name__}.enable") ] for addon in self._enabled_addons: addon._device = self @@ -855,7 +856,7 @@ class BaseCorrection(PythonDevice): if update.has("addons"): # note: can avoid iterating, but it's not that costly for addon in self._enabled_addons: - full_path = f"addons.{addon._node_name}" + full_path = f"addons.{addon.__class__.__name__}" if update.has(full_path): addon.reconfigure(update[full_path]) @@ -1457,17 +1458,18 @@ def add_addon_nodes(schema, device_class, prefix="addons"): for addon_class in device_class._available_addons: ( NODE_ELEMENT(schema) - .key(f"{prefix}.{addon_class._node_name}") + .key(f"{prefix}.{addon_class.__name__}") .commit(), BOOL_ELEMENT(schema) - .key(f"{prefix}.{addon_class._node_name}.enable") + .key(f"{prefix}.{addon_class.__name__}.enable") + .tags("managed") .assignmentOptional() .defaultValue(False) .commit(), ) addon_class.extend_device_schema( - schema, f"{prefix}.{addon_class._node_name}" + schema, f"{prefix}.{addon_class.__name__}" ) diff --git a/src/calng/correction_addons/base_addon.py b/src/calng/correction_addons/base_addon.py index ab134120f11bbe71d6518f6481aa733acfc2e2a7..95b5192f77d3c63e3c404150a170bdc820530016 100644 --- a/src/calng/correction_addons/base_addon.py +++ b/src/calng/correction_addons/base_addon.py @@ -1,5 +1,4 @@ class BaseCorrectionAddon: - _node_name = None # subclass must set (usually name of addon minus "Addon" suffix) _device = None # will be set to host device *after* init @staticmethod diff --git a/src/calng/correction_addons/integrated_intensity.py b/src/calng/correction_addons/integrated_intensity.py index 231f0722a1e1718cd0a2bdd71cd88881e983c5ab..165ffa5cda87da19bcc51deba1c764270d99f6a9 100644 --- a/src/calng/correction_addons/integrated_intensity.py +++ b/src/calng/correction_addons/integrated_intensity.py @@ -10,9 +10,7 @@ def maybe_get(a): return a -class IntegratedIntensityAddon(BaseCorrectionAddon): - _node_name = "integratedIntensity" - +class IntegratedIntensity(BaseCorrectionAddon): def __init__(self, config): global cupy import cupy diff --git a/src/calng/correction_addons/litpixel_counter.py b/src/calng/correction_addons/litpixel_counter.py new file mode 100644 index 0000000000000000000000000000000000000000..48468c46bf5032d9cfd00906c61e17e3b5fd5839 --- /dev/null +++ b/src/calng/correction_addons/litpixel_counter.py @@ -0,0 +1,57 @@ +import numpy as np + +from karabo.bound import NODE_ELEMENT, NDARRAY_ELEMENT, DOUBLE_ELEMENT + +from .base_addon import BaseCorrectionAddon + + +class LitPixelCounter(BaseCorrectionAddon): + def __init__(self, config): + global cupy + import cupy + + self._threshold = config["threshold"] + + def reconfigure(self, changed_config): + if changed_config.has("threshold"): + self._threshold = changed_config["threshold"] + + @staticmethod + def extend_output_schema(schema): + ( + NODE_ELEMENT(schema) + .key("litpixels") + .commit(), + + NDARRAY_ELEMENT(schema) + .key("litpixels.count") + .dtype('UINT32') + .commit(), + + NDARRAY_ELEMENT(schema) + .key("litpixels.unmasked") + .dtype('FLOAT') + .commit() + ) + + @staticmethod + def extend_device_schema(schema, prefix): + ( + DOUBLE_ELEMENT(schema) + .key(f"{prefix}.threshold") + .tags("managed") + .assignmentOptional() + .defaultValue(6.0) + .reconfigurable() + .commit() + ) + + def post_correction(self, processed_data, cell_table, pulse_table, output_hash): + n_cells, n_x, n_y = processed_data.shape + per_asic_data = processed_data.reshape(n_cells, 64, n_x // 64, 64, n_y // 64) + + lit_pixels = np.sum(per_asic_data > self._threshold, axis=(1, 3)) + unmasked_pixels = np.isfinite(per_asic_data).sum(axis=(1, 3)) + + output_hash["litpixels.count"] = lit_pixels.get() + output_hash["litpixels.unmasked"] = unmasked_pixels.get() diff --git a/src/calng/correction_addons/peakfinder9.py b/src/calng/correction_addons/peakfinder9.py index 8285bb97ea9312ba8111213202059733d3867083..ddbac713f39901d2541068225f38f438d7b0fa19 100644 --- a/src/calng/correction_addons/peakfinder9.py +++ b/src/calng/correction_addons/peakfinder9.py @@ -11,9 +11,7 @@ from .base_addon import BaseCorrectionAddon from .. import utils -class Peakfinder9Addon(BaseCorrectionAddon): - _node_name = "peakfinder9" - +class Peakfinder9(BaseCorrectionAddon): @staticmethod def extend_device_schema(schema, prefix): ( diff --git a/src/calng/correction_addons/random_frames.py b/src/calng/correction_addons/random_frames.py index 64903842220b09e9ab84f2f2abed4e47dcf1c9ba..8d6fb71ec5df102d50198b349ede828d873e9ee7 100644 --- a/src/calng/correction_addons/random_frames.py +++ b/src/calng/correction_addons/random_frames.py @@ -3,9 +3,7 @@ from karabo.bound import DOUBLE_ELEMENT, NDARRAY_ELEMENT, NODE_ELEMENT, Unit from .base_addon import BaseCorrectionAddon -class RandomFramesAddon(BaseCorrectionAddon): - _node_name = "randomFrames" - +class RandomFrames(BaseCorrectionAddon): @staticmethod def extend_device_schema(schema, prefix): ( diff --git a/src/calng/corrections/AgipdCorrection.py b/src/calng/corrections/AgipdCorrection.py index 041f9f4598081dc2311ff5c37950cac8fafe0dbb..2561b2d91b03ee7c7deae2bb8dbc4e81507378f8 100644 --- a/src/calng/corrections/AgipdCorrection.py +++ b/src/calng/corrections/AgipdCorrection.py @@ -757,7 +757,6 @@ class AgipdCorrection(base_correction.BaseCorrection): except Exception as e: self.log_status_warn(f"Unknown exception when loading data to GPU: {e}") - buffer_handle, buffer_array = self._shmem_buffer.next_slot() # first prepare previews (not affected by addons) self.kernel_runner.correct(self._correction_flag_preview) with self.warning_context( @@ -784,6 +783,7 @@ class AgipdCorrection(base_correction.BaseCorrection): # TODO: start writing out previews asynchronously in the background # then prepare full corrected data + buffer_handle, buffer_array = self._shmem_buffer.next_slot() if self._correction_flag_enabled != self._correction_flag_preview: self.kernel_runner.correct(self._correction_flag_enabled) for addon in self._enabled_addons: diff --git a/src/calng/corrections/DsscCorrection.py b/src/calng/corrections/DsscCorrection.py index c6de4a4cf4318d72d69bd2e5adfd79c488e9c8ab..568e90a5fdf54d212233e69d4dd68b7d1c0a5392 100644 --- a/src/calng/corrections/DsscCorrection.py +++ b/src/calng/corrections/DsscCorrection.py @@ -219,6 +219,10 @@ class DsscCorrection(base_correction.BaseCorrection): ) DsscCalcatFriend.add_schema(expected) base_correction.add_addon_nodes(expected, DsscCorrection) + base_correction.add_correction_step_schema( + expected, + DsscCorrection._correction_steps, + ) base_correction.add_preview_outputs(expected, DsscCorrection._preview_outputs) ( # support both CPU and GPU kernels @@ -236,10 +240,6 @@ class DsscCorrection(base_correction.BaseCorrection): .reconfigurable() .commit(), ) - base_correction.add_correction_step_schema( - expected, - DsscCorrection._correction_steps, - ) @property def input_data_shape(self): diff --git a/src/calng/corrections/Epix100Correction.py b/src/calng/corrections/Epix100Correction.py index 47c50cb867a824f69790971153eed0aca5d711b0..cf9e85c134b7ab730ac846d77ad99ec565f1a770 100644 --- a/src/calng/corrections/Epix100Correction.py +++ b/src/calng/corrections/Epix100Correction.py @@ -251,7 +251,7 @@ class Epix100CpuRunner(base_kernel_runner.BaseKernelRunner): if flags & CorrectionFlags.COMMONMODE: # per rectangular block that looks like something is going on - cm_mask=( + cm_mask = ( (self._q_bad_pixel_map[q] != 0) | (output > self._q_noise_map[q] * cm_noise_sigma) ).astype(np.uint8, copy=False)