From e685a56a789977cb629596cdfcff5203254c68cc Mon Sep 17 00:00:00 2001 From: David Hammer <dhammer@mailbox.org> Date: Thu, 10 Feb 2022 12:34:27 +0100 Subject: [PATCH] Switch to TrainMatcher 2.0.1-2.13.0 with built-in bridge --- DEPENDS | 3 +- src/calng/CalibrationManager.py | 69 ++++++++++----------------------- src/calng/DetectorAssembler.py | 57 +++++++++++++++------------ src/calng/base_correction.py | 2 +- 4 files changed, 55 insertions(+), 76 deletions(-) diff --git a/DEPENDS b/DEPENDS index adead8b9..591a0c1b 100644 --- a/DEPENDS +++ b/DEPENDS @@ -1,4 +1,3 @@ -TrainMatcher, 1.2.0-2.10.2 -PipeToZeroMQ, 3.2.6-2.11.0 +TrainMatcher, 2.0.1-2.13.0 calngDeps, 0.0.3-2.10.0 calibrationClient, 9.0.6 diff --git a/src/calng/CalibrationManager.py b/src/calng/CalibrationManager.py index 24ddeaa5..245cb973 100644 --- a/src/calng/CalibrationManager.py +++ b/src/calng/CalibrationManager.py @@ -160,9 +160,6 @@ class ModuleGroupRow(Configurable): withBridge = Bool( displayedName='Include bridge?') - startBridge = Bool( - displayedName='Start bridge?') - bridgePort = UInt16( displayedName='Bridge port', defaultValue=47000, @@ -694,7 +691,7 @@ class CalibrationManager(DeviceClientBase, Device): """ correction_device_servers = [ - server for _, server, _, _, _, _, _, _ in self.moduleGroups.value] + server for _, server, _, _, _, _, _ in self.moduleGroups.value] up_corr_servers = await self._get_servers_in_state( 'up', servers=correction_device_servers) @@ -1220,7 +1217,7 @@ class CalibrationManager(DeviceClientBase, Device): getattr(self.deviceIds, f'{role}Suffix') # Servers by group and layer. - server_by_group = {group: server for group, server, _, _, _, _, _, _ + server_by_group = {group: server for group, server, _, _, _, _, _ in self.moduleGroups.value} server_by_layer = {layer: server for layer, _, server in self.previewLayers.value} @@ -1279,10 +1276,10 @@ class CalibrationManager(DeviceClientBase, Device): ): return - # Instantiate group matchers and bridges. + # Instantiate group matchers which can also be bridges. for row in self.moduleGroups.value: group, server, with_matcher, start_matcher, with_bridge, \ - start_bridge, bridge_port, bridge_pattern = row + bridge_port, bridge_pattern = row # Group matcher, if applicable. if with_matcher: @@ -1290,14 +1287,20 @@ class CalibrationManager(DeviceClientBase, Device): group=group) config = Hash() - config['channels'] = [ - f'{correct_device_id_by_module[vname]}:dataOutput' - for vname in modules_by_group[group]] - config['fastSources'] = [ - Hash('fsSelect', True, - 'fsSource', input_source_by_module[vname]) + config['sources'] = [ + Hash('select', + True, + 'source', + f'{input_source_by_module[vname]}' + f'@{correct_device_id_by_module[vname]}:dataOutput') for vname in modules_by_group[group]] + if with_bridge: + config['zmqConfig'] = [Hash( + 'pattern', bridge_pattern, + 'hwm', 1, + 'port', bridge_port)] + if not await self._instantiate_device( server, class_ids['groupMatcher'], matcher_device_id, config @@ -1312,34 +1315,6 @@ class CalibrationManager(DeviceClientBase, Device): background(_activate_matcher(matcher_device_id)) - # Group bridge, if applicable. - if with_bridge: - bridge_device_id = device_id_templates['bridge'].format( - group=group) - - config = Hash() - config['outputsConfig'] = [Hash( - 'pattern', bridge_pattern, 'hwm', 1, 'port', bridge_port)] - - config['input.connectedOutputChannels'] = [ - f'{matcher_device_id}:output'] - - if not await self._instantiate_device( - server, class_ids['bridge'], bridge_device_id, config - ): - return - elif start_bridge: - # Delay the slot a bit since it will get lost during - # instantation. - - async def _activate_bridge(device_id): - with await getDevice(device_id) as device: - await sleep(3) - if device.state == State.PASSIVE: - await device.activate() - - background(_activate_bridge(bridge_device_id)) - # Instantiate preview layer assemblers. geometry_device_id = self.geometryDevice.value for layer, output_pipeline, server in self.previewLayers.value: @@ -1349,15 +1324,13 @@ class CalibrationManager(DeviceClientBase, Device): config = Hash() # TODO: put _image_data_path in corr dev schema, get from there config['pathToStack'] = self.imageDataPath.value - config['fastSources'] = [ - Hash('fsSelect', True, - 'fsSource', - f'{input_source_by_module[virtual_id]}') + config['sources'] = [ + Hash('select', True, + 'source', + f'{input_source_by_module[virtual_id]}' + f'@{device_id}:{output_pipeline}') for (virtual_id, device_id) in correct_device_id_by_module.items()] - config['channels'] = [ - f'{device_id}:{output_pipeline}' - for device_id in correct_device_id_by_module.values()] config['geometryInput.connectedOutputChannels'] = [ f'{geometry_device_id}:geometryOutput'] diff --git a/src/calng/DetectorAssembler.py b/src/calng/DetectorAssembler.py index 4659cc6a..e8651345 100644 --- a/src/calng/DetectorAssembler.py +++ b/src/calng/DetectorAssembler.py @@ -159,8 +159,8 @@ class DetectorAssembler(TrainMatcher.TrainMatcher): # TODO: match inside device, fill multiple independent buffers self._path_to_stack = self.get("pathToStack") - self.geometry = None - self.input_buffer = None + self._geometry = None + self._stack_input_buffer = None self.KARABO_ON_DATA("geometryInput", self.receive_geometry) self.KARABO_SLOT(self.requestScene) @@ -196,9 +196,9 @@ class DetectorAssembler(TrainMatcher.TrainMatcher): def receive_geometry(self, data, metadata): self.log.INFO("Received a new geometry") - self.geometry = pickle.loads(data.get("pickledGeometry")) + self._geometry = pickle.loads(data.get("pickledGeometry")) # TODO: allow multiple memory cells (extra geom notion of extra dimensions) - self.input_buffer = np.zeros(self.geometry.expected_data_shape) + self._stack_input_buffer = np.zeros(self._geometry.expected_data_shape) def ask_for_geometry(self): def runner(): @@ -206,7 +206,7 @@ class DetectorAssembler(TrainMatcher.TrainMatcher): max_tries = 10 for i in range(max_tries): time.sleep(np.random.random() * 10) - if self.geometry is None: + if self._geometry is None: geometry_device_list = list( self.get("geometryInput.connectedOutputChannels") ) @@ -229,45 +229,47 @@ class DetectorAssembler(TrainMatcher.TrainMatcher): self.signalSlotable.call(geometry_device, "pleaseSendYourGeometry") time.sleep(1) - if self.geometry is not None: + if self._geometry is not None: return self.log.INFO(f"Failed to get geometry in {max_tries} tries, need help") threading.Thread(target=runner, daemon=True).start() - def _send(self, train_id, sources): - # TODO: adapt to appropriate hook for new TrainMatcher (no _send) - if self.geometry is None: + def on_matched_data(self, train_id, sources): + if self._geometry is None: self.log.WARN("Have not received a geometry yet, will not send anything") return - timestamp = Timestamp(Epochstamp(), Trainstamp(train_id)) + my_timestamp = Timestamp(Epochstamp(), Trainstamp(train_id)) + my_source = self.getInstanceId() - module_indices_unfilled = set(range(self.input_buffer.shape[0])) - for source, (data, metadata) in sources.items(): + module_indices_unfilled = set(range(self._stack_input_buffer.shape[0])) + for source, (data, source_timestamp) in sources.items(): # TODO: handle failure to "parse" source, get data out module_index = self._source_to_index(source) - self.input_buffer[module_index] = np.squeeze(data.get(self._path_to_stack)) + self._stack_input_buffer[module_index] = np.squeeze( + data.get(self._path_to_stack) + ) module_indices_unfilled.discard(module_index) for module_index in module_indices_unfilled: - self.input_buffer[module_index].fill(0) + self._stack_input_buffer[module_index].fill(0) # TODO: configurable treatment of missing modules # TODO: reusable output buffer to save on allocation - assembled, _ = self.geometry.position_modules_fast(self.input_buffer) + assembled, _ = self._geometry.position_modules_fast(self._stack_input_buffer) # TODO: optionally include control data - out_hash = Hash( + output_hash = Hash( "image.data", assembled, "trainId", train_id, ) - # TODO: just get channel once, reuse (should not need to reinject) - channel = self.signalSlotable.getOutputChannel("output") - output_metadata = ChannelMetaData(self.getInstanceId(), timestamp) - channel.write(out_hash, output_metadata) - channel.update() + output_metadata = ChannelMetaData(my_source, my_timestamp) + self.output.write(output_hash, output_metadata) + self.output.update() + self.zmq_output.write(my_source, output_hash, my_timestamp) + self.zmq_output.update() if train_id % self.unsafe_get("preview.trainStride") == 0: downsampling_factor = self.unsafe_get("preview.downsamplingFactor") @@ -279,23 +281,28 @@ class DetectorAssembler(TrainMatcher.TrainMatcher): np, self.unsafe_get("preview.downsamplingFunction") ), ) - out_hash = Hash( + output_hash = Hash( "image.data", ImageData( # TODO: get around this being mirrored... assembled.astype(np.int32)[::-1, ::-1], Dims(*assembled.shape), Encoding.GRAY, + bitsPerPixel=32, ), "trainId", train_id, ) - channel = self.signalSlotable.getOutputChannel("preview.output") - channel.write(out_hash, output_metadata) - channel.update() + self.preview_output.write(output_hash, output_metadata) + self.preview_output.update() self.rate_out.update() + def on_new_data(self, channel, data, meta): + super().on_new_data(channel, data, meta) + ... + # TODO: start filling buffer early + @functools.lru_cache() def _source_to_index(self, source): # note: cache means warning only shows up once (also not performance-critical) diff --git a/src/calng/base_correction.py b/src/calng/base_correction.py index 48769362..103d823a 100644 --- a/src/calng/base_correction.py +++ b/src/calng/base_correction.py @@ -532,7 +532,7 @@ class BaseCorrection(PythonDevice): "be taken if DAQ train stride is >1." ) .assignmentOptional() - .defaultValue(6) + .defaultValue(1) .reconfigurable() .commit(), ) -- GitLab