diff --git a/DEPENDS b/DEPENDS index dbaeafee7d75ca7a9ba021ff5b5925d5ede31875..26a5109cef1ecbaca648556f4e7968a521c6cdc4 100644 --- a/DEPENDS +++ b/DEPENDS @@ -1,4 +1,4 @@ TrainMatcher, 2.4.7 calibrationClient, 11.3.0 -calibration/geometryDevices, 0.0.6 -calibration/calngUtils, 0.0.6 +calibration/geometryDevices, 0.0.7 +calibration/calngUtils, 0.0.7 diff --git a/src/calng/DetectorAssembler.py b/src/calng/DetectorAssembler.py index b4f6c06d94be9aa7b1ceea21673951818068ed77..0141624c05bafea9228dcfa6c3e07dac442a0184 100644 --- a/src/calng/DetectorAssembler.py +++ b/src/calng/DetectorAssembler.py @@ -4,8 +4,7 @@ from timeit import default_timer import numpy as np import xarray as xr -from calngUtils import device as device_utils, trackers -from geometryDevices import utils as geom_utils +from calngUtils import device as device_utils, geom_utils, trackers from karabo.bound import ( BOOL_ELEMENT, DOUBLE_ELEMENT, @@ -21,6 +20,7 @@ from karabo.bound import ( ImageData, MetricPrefix, Schema, + State, Timestamp, Trainstamp, Unit, @@ -192,7 +192,6 @@ class DetectorAssembler(TrainMatcher.TrainMatcher): self._processing_time_tracker = trackers.ExponentialMovingAverage( alpha=0.3 ) - self.registerSlot(self.slotReceiveGeometry) self._need_to_update_source_index_mapping = True def initialization(self): @@ -201,18 +200,24 @@ class DetectorAssembler(TrainMatcher.TrainMatcher): self._preview_friend = PreviewFriend(self, self._preview_outputs, "preview") self._image_data_path = self.get("imageDataPath") self._image_mask_path = self.get("imageMaskPath") - self._geometry = None # set up source to index mapping self._merge_source_to_index_from_regex() self._set_source_to_index_from_table() self.KARABO_SLOT(self.requestScene) - self.signalSlotable.connect( - self.get("geometryDevice"), - "signalNewGeometry", - "", # slot device ID (default: self) - "slotReceiveGeometry", + self._geometry = None + def _on_geometry(geometry): + if self._geometry is None: + # TODO: review with new warning system + if hasattr(self, "_state_before_geometry_missing_error"): + self.set("status", "Received geometry!") + self.updateState(self._state_before_geometry_missing_error) + del self._state_before_geometry_missing_error + self._geometry = geometry + self.log.INFO("Received new geometry") + geom_utils.subscribe_to_geometry_bound( + self, self.get("geometryDevice"), _on_geometry, warn_fun=self.log.WARN ) self.assembled_output = self.signalSlotable.getOutputChannel("assembledOutput") @@ -369,21 +374,25 @@ class DetectorAssembler(TrainMatcher.TrainMatcher): ) ) - def slotReceiveGeometry(self, device_id, serialized_geometry): - self.log.INFO(f"Received geometry from {device_id}") - try: - self._geometry = geom_utils.deserialize_geometry(serialized_geometry) - except Exception as e: - self.log.WARN(f"Failed to deserialize geometry; {e}") - return - # TODO: test with multiple memory cells (extra geom notion of extra dimensions) - # NOTE: could almost test n_modules against _source_to_index JF inconsistent - def on_matched_data(self, train_id, sources): ts_start = default_timer() + state = self.get("state") + if self._geometry is None: - self.log.WARN("Have not received a geometry yet, will not send anything") + if not hasattr(self, "_state_before_geometry_missing_error"): + self.log.WARN("Have not received a geometry yet, will not send anything") + self.set( + "status", + "Missing geometry, cannot assemble; " + f"check {self.get('geometryDevice')}" + ) + + if state is not State.ERROR: + self.updateState(State.ERROR) + # set or update + self._state_before_geometry_missing_error = state return + my_timestamp = Timestamp(Epochstamp(), Trainstamp(train_id)) my_device_id = self.getInstanceId() bridge_output_choice = BridgeOutputOptions( diff --git a/src/calng/FrameSelectionArbiter.py b/src/calng/FrameSelectionArbiter.py index 079252af21b271016b763c10121afda1f3586b98..637582d773152956b14fc322e7059ee631bb7e8e 100644 --- a/src/calng/FrameSelectionArbiter.py +++ b/src/calng/FrameSelectionArbiter.py @@ -2,7 +2,7 @@ import enum from importlib.metadata import entry_points import numpy as np -from geometryDevices import utils as geom_utils +from calngUtils import geom_utils from karabo.bound import ( BOOL_ELEMENT, KARABO_CLASSINFO, @@ -136,7 +136,6 @@ class BaseFrameSelectionArbiter(TrainMatcher.TrainMatcher): def __init__(self, config): super().__init__(config) - self.registerSlot(self.slotReceiveGeometry) def initialization(self): super().initialization() @@ -144,14 +143,7 @@ class BaseFrameSelectionArbiter(TrainMatcher.TrainMatcher): self, on_success={"state": "PASSIVE"} ) - self._geometry = None - if self.get("geometryDevice"): - self.signalSlotable.connect( - self.get("geometryDevice"), - "signalNewGeometry", - "", # slot device ID (default: self) - "slotReceiveGeometry", - ) + geom_utils.subscribe_to_geometry_bound(self, self.get("geometryDevice")) def _guess_number_of_frames(self, sources): # TODO: robust frame deduction @@ -160,12 +152,6 @@ class BaseFrameSelectionArbiter(TrainMatcher.TrainMatcher): return data.get("image.cellId").size self.log.ERROR("Unable to figure out number of frames") - def slotReceiveGeometry(self, device_id, serialized_geometry): - self.log.INFO(f"Received geometry from {device_id}") - try: - self._geometry = geom_utils.deserialize_geometry(serialized_geometry) - except Exception as e: - self.log.WARN(f"Failed to deserialize geometry; {e}") @KARABO_CLASSINFO("SimpleFrameSelectionArbiter", deviceVersion) class SimpleFrameSelectionArbiter(BaseFrameSelectionArbiter): diff --git a/src/calng/base_correction.py b/src/calng/base_correction.py index de7e991083a6e294a82f13ea38799475b2f10fbc..875b5c74d10fa1c2c01a8c49de7153286487b747 100644 --- a/src/calng/base_correction.py +++ b/src/calng/base_correction.py @@ -2,16 +2,15 @@ import concurrent.futures import functools import itertools import math -import pathlib import threading from importlib.metadata import entry_points from timeit import default_timer import dateutil.parser import numpy as np -from geometryDevices import utils as geom_utils from calngUtils import ( device as device_utils, + geom_utils, scene_utils, shmem_utils, timing, @@ -435,9 +434,6 @@ class BaseCorrection(PythonDevice): self._last_cell_table = None # used to check whether to update property self._last_pulse_table = None # ditto - though not all detectors have this - # register slots - self.registerSlot(self.slotReceiveGeometry) - def constant_override_fun(friend_fun, constant, preserve_fields): def aux(): self.flush_constants( @@ -527,14 +523,9 @@ class BaseCorrection(PythonDevice): f"preview:{spec.name}" for spec in self._preview_outputs ] - self.geometry = None - if self.get("geometryDevice"): - self.signalSlotable.connect( - self.get("geometryDevice"), - "signalNewGeometry", - "", # slot device ID (default: self) - "slotReceiveGeometry", - ) + geom_utils.subscribe_to_geometry_bound( + self, self.get("geometryDevice"), warn_fun=self.log_status_warn + ) self._buffered_status_update = Hash() self._processing_time_tracker = trackers.ExponentialMovingAverage( @@ -735,13 +726,6 @@ class BaseCorrection(PythonDevice): response["payload"] = payload self.reply(response) - def slotReceiveGeometry(self, device_id, serialized_geometry): - self.log.INFO(f"Received geometry from {device_id}") - try: - self.geometry = geom_utils.deserialize_geometry(serialized_geometry) - except Exception as e: - self.log.WARN(f"Failed to deserialize geometry; {e}") - def _get_data_from_hash(self, data_hash): """Will get image data, cell table, pulse table, and list of other arrays from the data hash. Assumes XTDF (image.data, image.cellId, image.pulseId, and no