diff --git a/DEPENDS b/DEPENDS index dbaeafee7d75ca7a9ba021ff5b5925d5ede31875..26a5109cef1ecbaca648556f4e7968a521c6cdc4 100644 --- a/DEPENDS +++ b/DEPENDS @@ -1,4 +1,4 @@ TrainMatcher, 2.4.7 calibrationClient, 11.3.0 -calibration/geometryDevices, 0.0.6 -calibration/calngUtils, 0.0.6 +calibration/geometryDevices, 0.0.7 +calibration/calngUtils, 0.0.7 diff --git a/src/calng/CalibrationManager.py b/src/calng/CalibrationManager.py index 7c016419971b20f573c3eb158cf7ed05c72b2020..346602dfac80bcbc4f205ae6ef6c03251e17ee60 100644 --- a/src/calng/CalibrationManager.py +++ b/src/calng/CalibrationManager.py @@ -1058,22 +1058,6 @@ class CalibrationManager(DeviceClientBase, Device): if info['status'].startswith(state): return - async def _ensure_assemblers_active(self): - """Ensure assemblers are in ACTIVE state.""" - - # This coroutine may throw an AsyncTimeoutError to be handled - # by the calling site. - - with ExitStack() as stack: - assemblers = [stack.enter_context( - await wait_for(getDevice(device_id), timeout=3)) - for device_id in self._assembler_device_ids] - - await wait_for(gather(*[ - waitUntil(lambda: assembler.state == State.ACTIVE) - for assembler in assemblers]), - timeout=3) - async def _check_servers(self): """Validate device server configuration.""" @@ -1419,17 +1403,6 @@ class CalibrationManager(DeviceClientBase, Device): # Force managed DAQ settings. await self._apply_managed_values(daq=True) - # Try waiting until assemblers are active. - try: - await self._ensure_assemblers_active() - except AsyncTimeoutError: - self._set_status('One or more assemblers missing or not in ' - 'active state after timeout - geometry may ' - 'require manual update') - - # Ask the geometry device to re-send its geometry. - callNoWait(self.geometryDevice.value, 'sendGeometry') - self._set_status('All devices instantiated') self.state = State.ACTIVE diff --git a/src/calng/DetectorAssembler.py b/src/calng/DetectorAssembler.py index b4f6c06d94be9aa7b1ceea21673951818068ed77..84f7b7d5a8c21e5db1fd9ed8656c207f89e80819 100644 --- a/src/calng/DetectorAssembler.py +++ b/src/calng/DetectorAssembler.py @@ -4,8 +4,7 @@ from timeit import default_timer import numpy as np import xarray as xr -from calngUtils import device as device_utils, trackers -from geometryDevices import utils as geom_utils +from calngUtils import device as device_utils, geom_utils, trackers from karabo.bound import ( BOOL_ELEMENT, DOUBLE_ELEMENT, @@ -150,6 +149,19 @@ class DetectorAssembler(TrainMatcher.TrainMatcher): .assignmentMandatory() .commit(), + STRING_ELEMENT(expected) + .key("geometryState") + .displayedName("Geometry state") + .description( + "The assembler needs a detector geometry to function. This is gotten " + "from a geometry device (see the geometryDevice parameter). If this " + "state is ERROR, no assembly will take place - check the geometry " + "device!" + ) + .readOnly() + .initialValue("INIT") + .commit(), + STRING_ELEMENT(expected) .key("sourceIndexPattern") .displayedName("Source index regex") @@ -192,7 +204,6 @@ class DetectorAssembler(TrainMatcher.TrainMatcher): self._processing_time_tracker = trackers.ExponentialMovingAverage( alpha=0.3 ) - self.registerSlot(self.slotReceiveGeometry) self._need_to_update_source_index_mapping = True def initialization(self): @@ -201,19 +212,24 @@ class DetectorAssembler(TrainMatcher.TrainMatcher): self._preview_friend = PreviewFriend(self, self._preview_outputs, "preview") self._image_data_path = self.get("imageDataPath") self._image_mask_path = self.get("imageMaskPath") - self._geometry = None # set up source to index mapping self._merge_source_to_index_from_regex() self._set_source_to_index_from_table() self.KARABO_SLOT(self.requestScene) - self.signalSlotable.connect( - self.get("geometryDevice"), - "signalNewGeometry", - "", # slot device ID (default: self) - "slotReceiveGeometry", + self._geometry = None + def _on_geometry(geometry): + if self._geometry is None: + self.set("geometryState", "ACTIVE") + self._geometry = geometry + self.log.INFO("Received new geometry") + geom_utils.subscribe_to_geometry_bound( + self, self.get("geometryDevice"), _on_geometry ) + if self._geometry is None: + # initial get failed + self.set("geometryState", "ERROR") self.assembled_output = self.signalSlotable.getOutputChannel("assembledOutput") self.start() @@ -357,6 +373,7 @@ class DetectorAssembler(TrainMatcher.TrainMatcher): payload["data"] = scenes.detector_assembler_overview( device_id=self.getInstanceId(), schema=self.getFullSchema(), + geometry_device_id=self.get("geometryDevice"), ) self.reply( Hash( @@ -369,21 +386,13 @@ class DetectorAssembler(TrainMatcher.TrainMatcher): ) ) - def slotReceiveGeometry(self, device_id, serialized_geometry): - self.log.INFO(f"Received geometry from {device_id}") - try: - self._geometry = geom_utils.deserialize_geometry(serialized_geometry) - except Exception as e: - self.log.WARN(f"Failed to deserialize geometry; {e}") - return - # TODO: test with multiple memory cells (extra geom notion of extra dimensions) - # NOTE: could almost test n_modules against _source_to_index JF inconsistent - def on_matched_data(self, train_id, sources): ts_start = default_timer() + if self._geometry is None: - self.log.WARN("Have not received a geometry yet, will not send anything") + # should have already set up warning return + my_timestamp = Timestamp(Epochstamp(), Trainstamp(train_id)) my_device_id = self.getInstanceId() bridge_output_choice = BridgeOutputOptions( diff --git a/src/calng/FrameSelectionArbiter.py b/src/calng/FrameSelectionArbiter.py index 079252af21b271016b763c10121afda1f3586b98..5b7ffed65c2977550da245000827ec811279c8cb 100644 --- a/src/calng/FrameSelectionArbiter.py +++ b/src/calng/FrameSelectionArbiter.py @@ -2,7 +2,7 @@ import enum from importlib.metadata import entry_points import numpy as np -from geometryDevices import utils as geom_utils +from calngUtils import geom_utils from karabo.bound import ( BOOL_ELEMENT, KARABO_CLASSINFO, @@ -136,7 +136,6 @@ class BaseFrameSelectionArbiter(TrainMatcher.TrainMatcher): def __init__(self, config): super().__init__(config) - self.registerSlot(self.slotReceiveGeometry) def initialization(self): super().initialization() @@ -145,13 +144,6 @@ class BaseFrameSelectionArbiter(TrainMatcher.TrainMatcher): ) self._geometry = None - if self.get("geometryDevice"): - self.signalSlotable.connect( - self.get("geometryDevice"), - "signalNewGeometry", - "", # slot device ID (default: self) - "slotReceiveGeometry", - ) def _guess_number_of_frames(self, sources): # TODO: robust frame deduction @@ -160,12 +152,6 @@ class BaseFrameSelectionArbiter(TrainMatcher.TrainMatcher): return data.get("image.cellId").size self.log.ERROR("Unable to figure out number of frames") - def slotReceiveGeometry(self, device_id, serialized_geometry): - self.log.INFO(f"Received geometry from {device_id}") - try: - self._geometry = geom_utils.deserialize_geometry(serialized_geometry) - except Exception as e: - self.log.WARN(f"Failed to deserialize geometry; {e}") @KARABO_CLASSINFO("SimpleFrameSelectionArbiter", deviceVersion) class SimpleFrameSelectionArbiter(BaseFrameSelectionArbiter): @@ -195,6 +181,9 @@ class SimpleFrameSelectionArbiter(BaseFrameSelectionArbiter): self._initialize_kernel( self["frameSelection.kernelChoice"], self["frameSelection.kernelParameters"] ) + geom_utils.subscribe_to_geometry_bound( + self, self.get("geometryDevice"), on_geometry=self._on_geometry + ) if self["state"] != State.ERROR: self.start() # Auto-start this type of matcher. @@ -270,6 +259,9 @@ class SimpleFrameSelectionArbiter(BaseFrameSelectionArbiter): self.info["trainId"] = train_id self.rate_out.update() + def _on_geometry(self, geometry): + self.kernel.on_new_geometry(geometry) + @KARABO_CLASSINFO("AdvancedFrameSelectionArbiter", deviceVersion) class AdvancedFrameSelectionArbiter(BaseFrameSelectionArbiter): @@ -315,6 +307,9 @@ class AdvancedFrameSelectionArbiter(BaseFrameSelectionArbiter): self._validate_plan_and_update_schema(self["frameSelection.plan"]) self._initialize_kernels() self._configure_decision(self["frameSelection.decision"]) + geom_utils.subscribe_to_geometry_bound( + self, self.get("geometryDevice"), on_geometry=self._on_geometry + ) if self["state"] != State.ERROR: self.start() # Auto-start this type of matcher. @@ -523,3 +518,7 @@ class AdvancedFrameSelectionArbiter(BaseFrameSelectionArbiter): kernel_prefix = f"frameSelection.selections.{name}" if conf.has(kernel_prefix): kernel.reconfigure(conf[kernel_prefix]) + + def _on_geometry(self, geometry): + for kernel in self._selection_kernels.values(): + kernel.on_new_geometry(geometry) diff --git a/src/calng/arbiter_kernels/base_kernel.py b/src/calng/arbiter_kernels/base_kernel.py index 8ae88cab31ecff3ba8ad5a971645e72598946723..7ccd96bd57da73bce010fb7a9b63082a5b710e4d 100644 --- a/src/calng/arbiter_kernels/base_kernel.py +++ b/src/calng/arbiter_kernels/base_kernel.py @@ -13,10 +13,6 @@ class BaseArbiterKernel: self._name = name self.reconfigure(config) - @property - def geometry(self): - return self._device._geometry - @staticmethod def extend_device_schema(schema, prefix): """Should add configurability to the arbiter (matcher) the kernel will be @@ -38,6 +34,15 @@ class BaseArbiterKernel: def reconfigure(self, config): pass + @property + def geometry(self): + """Same as BaseCorrectionAddon.geometry""" + return self._device._geometry + + def on_new_geometry(self, geometry): + """Same as BaseCorrectionAddon.on_new_geometry""" + pass + class Assign(BaseArbiterKernel): def consider(self, tid, src, nframes, mask, out): diff --git a/src/calng/base_correction.py b/src/calng/base_correction.py index de7e991083a6e294a82f13ea38799475b2f10fbc..459ef83f37da4c27d289b9d0b71ff53f47252204 100644 --- a/src/calng/base_correction.py +++ b/src/calng/base_correction.py @@ -2,16 +2,15 @@ import concurrent.futures import functools import itertools import math -import pathlib import threading from importlib.metadata import entry_points from timeit import default_timer import dateutil.parser import numpy as np -from geometryDevices import utils as geom_utils from calngUtils import ( device as device_utils, + geom_utils, scene_utils, shmem_utils, timing, @@ -435,9 +434,6 @@ class BaseCorrection(PythonDevice): self._last_cell_table = None # used to check whether to update property self._last_pulse_table = None # ditto - though not all detectors have this - # register slots - self.registerSlot(self.slotReceiveGeometry) - def constant_override_fun(friend_fun, constant, preserve_fields): def aux(): self.flush_constants( @@ -527,15 +523,6 @@ class BaseCorrection(PythonDevice): f"preview:{spec.name}" for spec in self._preview_outputs ] - self.geometry = None - if self.get("geometryDevice"): - self.signalSlotable.connect( - self.get("geometryDevice"), - "signalNewGeometry", - "", # slot device ID (default: self) - "slotReceiveGeometry", - ) - self._buffered_status_update = Hash() self._processing_time_tracker = trackers.ExponentialMovingAverage( alpha=0.3 @@ -582,6 +569,18 @@ class BaseCorrection(PythonDevice): # update device schema in one fell swoop self.updateSchema(schema_override) + self._geometry = None + def _on_geometry(geometry): + self._geometry = geometry + for addon in self._enabled_addons: + addon.on_new_geometry(self._geometry) + geom_utils.subscribe_to_geometry_bound( + self, + self.get("geometryDevice"), + on_geometry=_on_geometry, + warn_fun=self.log_status_warn, + ) + self.updateState(State.ON) def __del__(self): @@ -735,13 +734,6 @@ class BaseCorrection(PythonDevice): response["payload"] = payload self.reply(response) - def slotReceiveGeometry(self, device_id, serialized_geometry): - self.log.INFO(f"Received geometry from {device_id}") - try: - self.geometry = geom_utils.deserialize_geometry(serialized_geometry) - except Exception as e: - self.log.WARN(f"Failed to deserialize geometry; {e}") - def _get_data_from_hash(self, data_hash): """Will get image data, cell table, pulse table, and list of other arrays from the data hash. Assumes XTDF (image.data, image.cellId, image.pulseId, and no diff --git a/src/calng/correction_addons/base_addon.py b/src/calng/correction_addons/base_addon.py index 8e6d8690b8e2c04ae49c13566cdba7b84cc02e95..b5578c1e5166948d5bbfbad8cf5075c28c477dd7 100644 --- a/src/calng/correction_addons/base_addon.py +++ b/src/calng/correction_addons/base_addon.py @@ -19,12 +19,6 @@ class BaseCorrectionAddon: """Will be given the node from extend_device_schema, no prefix needed here""" pass - @property - def geometry(self): - """Helper to get current geometry (host device should be monitoring a geometry - device)""" - return self._device._geometry - def post_correction( self, train_id, processed_data, cell_table, pulse_table, output_hash ): @@ -43,3 +37,16 @@ class BaseCorrectionAddon: for an example of caching and reinstanting complex kernels using many parameters when any change.""" pass + + @property + def geometry(self): + """Helper to get current geometry from host device. If addon will do expensive + precomputation only when geometry changes, use the on_new_geometry hook to + trigger this.""" + return self._device._geometry + + def on_new_geometry(self, geometry): + """Hook which is called whenever the host device receives a new geometry from + the geometry device. Useful to know when geometry changes in case you plan to + do expensive precomputation based on it (ex. azimuthal integration).""" + pass diff --git a/src/calng/scenes.py b/src/calng/scenes.py index 6a8894815db3f1411f6feb4dac669e1bacac4502..8db15b4919563054e0e01b948e6235a7bda89400 100644 --- a/src/calng/scenes.py +++ b/src/calng/scenes.py @@ -342,7 +342,7 @@ class CompactDeviceLinkList(VerticalLayout): @titled("Assembler status", width=8 * NARROW_INC) @boxed class AssemblerDeviceStatus(VerticalLayout): - def __init__(self, device_id): + def __init__(self, device_id, geometry_device_id): super().__init__(padding=0) name = DisplayLabelModel( keys=[f"{device_id}.deviceId"], @@ -396,11 +396,29 @@ class AssemblerDeviceStatus(VerticalLayout): width=14 * BASE_INC, height=BASE_INC, ), - DisplayLabelModel( - keys=[f"{device_id}.geometryDevice"], + # note: link based on current value when generating + DeviceSceneLinkModel( + text=geometry_device_id, + keys=[f"{geometry_device_id}.availableScenes"], + target="overview", + target_window=SceneTargetWindow.Dialog, width=14 * BASE_INC, height=BASE_INC, - ), # TODO: some day, get dynamic link to this friend + ), + HorizontalLayout( + LabelModel( + text="My geometry:", + width=7 * BASE_INC, + height=BASE_INC, + ), + DisplayStateColorModel( + show_string=True, + keys=[f"{device_id}.geometryState"], + width=7 * BASE_INC, + height=BASE_INC, + ), + padding=0, + ), ] ) @@ -1095,11 +1113,11 @@ def correction_constant_dashboard( @scene_generator -def detector_assembler_overview(device_id, schema): +def detector_assembler_overview(device_id, schema, geometry_device_id): schema_hash = schema_to_hash(schema) return VerticalLayout( HorizontalLayout( - AssemblerDeviceStatus(device_id), + AssemblerDeviceStatus(device_id, geometry_device_id), PreviewSettings(device_id, schema_hash, "preview.assembled"), ), PreviewDisplayArea(