diff --git a/src/calng/CalibrationManager.py b/src/calng/CalibrationManager.py index 60f00fae7c877810b082890712599cb24f7ea12d..bbad11f8ea3e8a01d9a420a9c47cac43bfe7a99f 100644 --- a/src/calng/CalibrationManager.py +++ b/src/calng/CalibrationManager.py @@ -21,12 +21,12 @@ from tornado.platform.asyncio import AsyncIOMainLoop, to_asyncio_future from karabo.middlelayer import ( KaraboError, Device, DeviceClientBase, Descriptor, Hash, Configurable, - Slot, Node, Type, + Slot, Node, Type, Schema, AccessMode, AccessLevel, Assignment, DaqPolicy, State, Unit, - UInt16, UInt32, Bool, Double, Schema, String, VectorString, VectorHash, + UInt16, UInt32, Bool, Float, Double, String, VectorString, VectorHash, background, call, callNoWait, setNoWait, sleep, instantiate, slot, coslot, - getDevice, getTopology, getConfiguration, getConfigurationFromPast, - getConfigurationFromName, get_property) + get_property, getTopology, getConfiguration, getConfigurationFromPast, + getConfigurationFromName) from karabo.middlelayer_api.proxy import ProxyFactory from ._version import version as deviceVersion @@ -38,25 +38,12 @@ Device states: - INIT: When the device is starting up - ACTIVE: When the device is ready to manage a calibration pipeline - CHANGING: When the device is actively changing the pipeline configuration + - UNKNOWN: When the pipeline should be restarted after reconfiguration - ERROR: Recoverable error, only allows server restart - - UNKNOWN: Unrecoverable error + - DISABLED: Unrecoverable error, requires device restart ''' -# Copied from karabo MDL source (location depending on version) -# Will be part of MDL's public API in 2.12 -def get_instance_parent(instance): - """Find the parent of the instance""" - parent = instance - while True: - try: - parent = next(iter(parent._parents)) - except StopIteration: - break - - return parent - - class ClassIdsNode(Configurable): correctionClass = String( displayedName='Correction class', @@ -65,8 +52,8 @@ class ClassIdsNode(Configurable): accessMode=AccessMode.INITONLY, assignment=Assignment.MANDATORY) - groupMatcherClass = String( - displayedName='Group matcher class', + matcherClass = String( + displayedName='Matcher class', description='Device class to use for matching the stream output of a ' 'module group.', defaultValue='ShmemTrainMatcher', @@ -92,9 +79,9 @@ class DeviceIdsNode(Configurable): accessMode=AccessMode.INITONLY, assignment=Assignment.MANDATORY) - groupMatcherSuffix = String( - displayedName='Group matcher suffix', - description='Suffix for group matching device IDs. The formatting ' + matcherSuffix = String( + displayedName='Matcher suffix', + description='Suffix for matching device IDs. The formatting ' 'placeholder \'group\' may be used.', defaultValue='MATCH_G{group}', accessMode=AccessMode.INITONLY, @@ -111,66 +98,88 @@ class DeviceIdsNode(Configurable): class ModuleRow(Configurable): virtualName = String( - displayedName='Virtual name') + displayedName='Virtual name', + defaultValue='', + accessMode=AccessMode.RECONFIGURABLE) group = UInt32( - displayedName='Group') + displayedName='Group', + defaultValue=0, + accessMode=AccessMode.RECONFIGURABLE) aggregator = String( - displayedName='Aggregator name') + displayedName='Aggregator name', + defaultValue='', + accessMode=AccessMode.RECONFIGURABLE) inputChannel = String( - displayedName='Input channel') + displayedName='Input channel', + defaultValue='', + accessMode=AccessMode.RECONFIGURABLE) inputSource = String( - displayedName='Input source') + displayedName='Input source', + defaultValue='', + accessMode=AccessMode.RECONFIGURABLE) class ModuleGroupRow(Configurable): group = UInt32( - displayedName='Group') + displayedName='Group', + defaultValue=0, + accessMode=AccessMode.RECONFIGURABLE) deviceServer = String( - displayedName='Device server') - - withMatcher = Bool( - displayedName='Include matcher?') - - startMatcher = Bool( - displayedName='Start matcher?') + displayedName='Device server', + defaultValue='', + accessMode=AccessMode.RECONFIGURABLE) withBridge = Bool( - displayedName='Include bridge?') + displayedName='Include bridge?', + defaultValue=False, + accessMode=AccessMode.RECONFIGURABLE) bridgePort = UInt16( displayedName='Bridge port', defaultValue=47000, minInc=1024, - maxInc=65353) + maxInc=65353, + accessMode=AccessMode.RECONFIGURABLE) bridgePattern = String( displayedName='Bridge pattern', options=['PUSH', 'REP', 'PUB'], - defaultValue='PUSH') + defaultValue='PUSH', + accessMode=AccessMode.RECONFIGURABLE) class PreviewLayerRow(Configurable): layer = String( - displayedName='Preview layer') + displayedName='Preview layer', + defaultValue='', + accessMode=AccessMode.RECONFIGURABLE) outputPipeline = String( - displayedName='Output pipeline') + displayedName='Output pipeline', + defaultValue='', + accessMode=AccessMode.RECONFIGURABLE) deviceServer = String( - displayedName='Device server') + displayedName='Device server', + defaultValue='', + accessMode=AccessMode.RECONFIGURABLE) class DeviceServerRow(Configurable): deviceServer = String( - displayedName='Device server') + displayedName='Device server', + defaultValue='', + accessMode=AccessMode.RECONFIGURABLE) webserverHost = String( - displayedName='Webserver host') + displayedName='Webserver host', + defaultValue='http://', + accessMode=AccessMode.RECONFIGURABLE) class WebserverApiNode(Configurable): @@ -216,13 +225,19 @@ class WebserverApiNode(Configurable): class RestoredConfigurationRow(Configurable): enabled = Bool( - displayedName='Enabled') + displayedName='Enabled', + defaultValue=False, + accessMode=AccessMode.RECONFIGURABLE) memberPattern = String( - displayedName='Member pattern') + displayedName='Member pattern', + defaultValue='', + accessMode=AccessMode.RECONFIGURABLE) keyPattern = String( - displayedName='Key pattern') + displayedName='Key pattern', + defaultValue='', + accessMode=AccessMode.RECONFIGURABLE) class ManagedKeysNode(Configurable): @@ -260,6 +275,9 @@ class CalibrationManager(DeviceClientBase, Device): @slot def requestScene(self, params): name = params.get('name', default='overview') + + payload = Hash('success', True) + if name == 'overview': # Assumes there are correction devices known to manager scene_data = scenes.manager_device_overview( @@ -271,17 +289,20 @@ class CalibrationManager(DeviceClientBase, Device): domain_device_ids=self._domain_device_ids, ) payload = Hash('success', True, 'name', name, 'data', scene_data) - elif name == "correction_performance_overview": + + elif name == 'correction_performance_overview': scene_data = scenes.correction_device_performance_dashboard( self._correction_device_ids, ) payload = Hash('success', True, 'name', name, 'data', scene_data) - elif name == "correction_constant_overview": + + elif name == 'correction_constant_overview': scene_data = scenes.correction_constant_dashboard( self._correction_device_ids, self._correction_device_schema, ) payload = Hash('success', True, 'name', name, 'data', scene_data) + elif name.startswith('browse_schema'): if ':' in name: prefix = name[len('browse_schema:'):] @@ -293,6 +314,7 @@ class CalibrationManager(DeviceClientBase, Device): prefix, ) payload = Hash('success', True, 'name', name, 'data', scene_data) + else: payload = Hash('success', False, 'name', name) @@ -321,7 +343,7 @@ class CalibrationManager(DeviceClientBase, Device): 'detector, the formatting placeholder ' '\'detector_identifier\' may be used. Leave empty to ' 'disable DAQ interaction.', - defaultValue='^{detector_identifier}/DET/\d*CH0$', + defaultValue=r'^{detector_identifier}/DET/\d*CH0$', accessMode=AccessMode.INITONLY, assignment=Assignment.MANDATORY) @@ -369,6 +391,8 @@ class CalibrationManager(DeviceClientBase, Device): @VectorHash( displayedName='Device servers', + description='WARNING: It is strongly recommended to perform a full ' + 'pipeline restart after changes to this property.', rows=DeviceServerRow, accessMode=AccessMode.RECONFIGURABLE, assignment=Assignment.MANDATORY) @@ -376,6 +400,9 @@ class CalibrationManager(DeviceClientBase, Device): self.deviceServers = value self._servers_changed = True + # Switch to UNKNOWN state to suggest operator to restart pipeline. + self.state = State.UNKNOWN + geometryDevice = String( displayedName='Geometry device', description='Device ID for a geometry device defining the detector ' @@ -383,16 +410,6 @@ class CalibrationManager(DeviceClientBase, Device): accessMode=AccessMode.INITONLY, assignment=Assignment.MANDATORY) - maxIdle = Double( - displayedName='Preview source timeout', - description='TrainMatcher (used for preview assemblers) has an option ' - '(maxIdle) to automatically ignore sources which have not ' - 'sent data in the last maxIdle seconds (if maxIdle > 0). ' - 'Useful in case a module fails. If missing source comes ' - 'back, it will be matched again.', - defaultValue=1, - accessMode=AccessMode.RECONFIGURABLE) - webserverApi = Node( WebserverApiNode, displayedName='Webserver API', @@ -413,8 +430,8 @@ class CalibrationManager(DeviceClientBase, Device): 'to selectively restore the most previous configuration.', defaultValue=[Hash( 'enabled', False, - 'memberPattern', 'MATCH_G\d', - 'keyPattern', '^(channels|fastSources|slowSources)')], + 'memberPattern', r'MATCH_G\d', + 'keyPattern', '^(sources|zmqConfig|sortSources|mode')], accessMode=AccessMode.RECONFIGURABLE) async def restoredConfigurations(self, new_configs): self.restoredConfigurations = new_configs @@ -431,14 +448,14 @@ class CalibrationManager(DeviceClientBase, Device): @Slot( displayedName='Restart servers', - allowedStates=[State.ACTIVE, State.ERROR]) + allowedStates=[State.UNKNOWN, State.ACTIVE, State.ERROR]) async def restartServers(self): self.state = State.CHANGING background(self._restart_servers()) @Slot( displayedName='Instantiate pipeline', - allowedStates=[State.ACTIVE]) + allowedStates=[State.UNKNOWN, State.ACTIVE]) async def startInstantiate(self): # Slot name is mandated by DeviceInstantiator interface. self.state = State.CHANGING @@ -464,13 +481,38 @@ class CalibrationManager(DeviceClientBase, Device): 'the manager, replacing any manual change.', allowedStates=[State.ACTIVE]) async def applyManagedValues(self): - background(self._apply_managed_values()) + background(self._apply_managed_values( + daq=True, assemblers=True, corrections=True)) managedKeys = Node( ManagedKeysNode, displayedName='Managed keys', description='Properties and slots managed on devices in the pipeline.') + # Hardcoded managed keys + # List of tuple (remote key, local key, descriptor) + _managed_daq_keys = [ + ( + 'DataDispatcher.trainStride', 'daqTrainStride', + UInt32(displayedName='DAQ train stride', + unitSymbol=Unit.COUNT, + defaultValue=5, + allowedStates=[State.ACTIVE], + minInc=1) + ) + ] + + _managed_assembler_keys = [ + ( + 'maxIdle', 'previewMaxIdle', + Float(displayedName='Preview max idle', + unitSymbol=Unit.SECOND, + defaultValue=0.5, + allowedStates=[State.ACTIVE], + minInc=0.0) + ), + ] + def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) @@ -478,9 +520,16 @@ class CalibrationManager(DeviceClientBase, Device): # for all devices it manages. self._device_id_root = self.deviceId[:self.deviceId.rfind('/')].value - # Concretized class ID for correction devices. - self._correction_class_id = self.classIds.correctionClass.format( - self.detectorType.value.lower().capitalize()) + # Concretized class IDs and device ID templates. + class_args = (self.detectorType.value.capitalize(),) + self._class_ids = {} + self._device_id_templates = {} + + for role in ['correction', 'matcher', 'assembler']: + self._class_ids[role] = getattr( + self.classIds, f'{role}Class').value.format(*class_args) + self._device_id_templates[role] = f'{self._device_id_root}/' + \ + getattr(self.deviceIds, f'{role}Suffix') # Concretized pattern for device IDs of data aggregators. if self.daqDevicesPattern: @@ -489,19 +538,31 @@ class CalibrationManager(DeviceClientBase, Device): detector_identifier=self.detectorIdentifier.value)).match else: # Always False. - self._is_daq_device = lambda x: False + self._is_daq_device = bool # Always returns False. # Set of data aggregators associated with the managed detector. self._daq_device_ids = set() # Set of devices in the same domain as the manager, i.e. having - # the same device ID root. + # the same device ID root. Not all of these devices are + # necessarily managed, there may be other persistent devices + # next to the manager started from projects, e.g. geometry + # devices or condition checkers. self._domain_device_ids = set() + # Set of managed devices, i.e. in the same domain as the manager + # and of a class ID used in the classIds node and running in a + # server in the deviceServers vector. + self._managed_device_ids = set() + # Set of correction devices for the managed detector, i.e. being - # in the same domain and having the specified class ID. + # managed and having the specified class ID. self._correction_device_ids = set() + # Set of assembler devices for the managed detector, i.e. being + # managed and having the specified class ID. + self._assembler_device_ids = set() + # Mapping of device ID pattern to key pattern to selectively # restore past configuration for. self._restore_config_patterns = dict() @@ -520,7 +581,8 @@ class CalibrationManager(DeviceClientBase, Device): await super().slotInstanceNew(instance_id, info) if info['type'] == 'device': - self._check_new_device(instance_id, info['classId']) + self._check_new_device(instance_id, info['serverId'], + info['classId']) self._update_managed_devices() @slot @@ -532,13 +594,12 @@ class CalibrationManager(DeviceClientBase, Device): if info['type'] == 'device': self._daq_device_ids.discard(instance_id) self._domain_device_ids.discard(instance_id) + self._managed_device_ids.discard(instance_id) self._correction_device_ids.discard(instance_id) + self._assembler_device_ids.discard(instance_id) self._update_managed_devices() async def _async_init(self): - # Populate the device ID sets with what's out there right now. - await self._check_topology() - # Set-up Tornado. if hasattr(AsyncIOMainLoop, 'initialized'): if not AsyncIOMainLoop.initialized(): @@ -552,28 +613,46 @@ class CalibrationManager(DeviceClientBase, Device): # Inject schema for configuration of managed devices. await self._inject_managed_keys() + # Populate the device ID sets with what's out there right now. + await self._check_topology() + if self.state == State.INIT: self._set_status('Calibration manager ready') self.state = State.ACTIVE - def _check_new_device(self, device_id, class_id): - if class_id == 'DataAggregator' and self._is_daq_device(device_id): + def _check_new_device(self, device_id, server_id, class_id): + if device_id == self.deviceId: + # Ignore manager itself, it typically sees its own instance + # going up first. + return + + elif class_id == 'DataAggregator' and self._is_daq_device(device_id): # This device is a data aggregator belonging to the detector # installation self._daq_device_ids.add(device_id) elif device_id.startswith(self._device_id_root): - # This device lives under the same device ID root as this - # manager instance, but don't add yourself! - if device_id == self.deviceId: + # In our domain. + self._domain_device_ids.add(device_id) + + if ( + server_id not in self._server_api_names or + class_id not in self._class_ids.values() + ): + # NOT a managed device. return - self._domain_device_ids.add(device_id) + # A managed device. + self._managed_device_ids.add(device_id) - if class_id == self._correction_class_id: + if class_id == self._class_ids['correction']: # This device is also a correction device. self._correction_device_ids.add(device_id) + elif class_id == self._class_ids['assembler']: + # This device is also an assembler + self._assembler_device_ids.add(device_id) + async def _check_topology(self): for i in range(10): try: @@ -593,14 +672,17 @@ class CalibrationManager(DeviceClientBase, Device): self.logger.debug(f'Topology arrived after {i+1} tries') for device_id in devices: - self._check_new_device(device_id, devices[device_id, 'classId']) + self._check_new_device(device_id, devices[device_id, 'serverId'], + devices[device_id, 'classId']) self._update_managed_devices(True) async def _discover_managed_devices(self): self._daq_device_ids.clear() self._domain_device_ids.clear() + self._managed_device_ids.clear() self._correction_device_ids.clear() + self._assembler_device_ids.clear() await self._check_topology() @@ -608,9 +690,7 @@ class CalibrationManager(DeviceClientBase, Device): async def _delayed_managed_devices_update(self): await sleep(1.0) # Throttle updates to at most once a second. - self.managedDevices = sorted( - self._domain_device_ids | self._daq_device_ids) - + self.managedDevices = sorted(self._managed_device_ids) self._managed_devices_updater = None # Clear task again. def _update_managed_devices(self, forced=False): @@ -618,8 +698,7 @@ class CalibrationManager(DeviceClientBase, Device): # Update already in progress, ignore. return - all_managed_devices = self._domain_device_ids | self._daq_device_ids - if forced or len(all_managed_devices) != len(self.managedDevices): + if forced or len(self._managed_device_ids) != len(self.managedDevices): # Trigger an update either if forced or the number of # devices changed. self._managed_devices_updater = background( @@ -650,25 +729,31 @@ class CalibrationManager(DeviceClientBase, Device): return {key: max(set(values), key=values.count) for key, values in key_values.items()} - @staticmethod - def _get_managed_daq_keys(): - # List of tuples [remote key, local key, descriptor] - return [ - ('DataDispatcher.trainStride', 'daqTrainStride', - UInt32(displayedName='DAQ train stride', - unitSymbol=Unit.COUNT, - defaultValue=5, - allowedStates=[State.ACTIVE], - minInc=1)) - ] + async def _add_managed_keys(self, managed_hash, managed_keys, owner, + device_ids): + """Add hardcoded managed keys.""" + + remote_keys = [] + + for remote_key, local_key, descr in managed_keys: + remote_keys.append(remote_key) + _, attrs = descr.toSchemaAndAttrs(None, None) + + managed_hash[local_key] = 0 + managed_hash[local_key, ...].update( + attrs, __owner=owner, __remote=remote_key) + + return await self._get_shared_keys(device_ids, remote_keys) \ + if device_ids else {} async def _inject_managed_keys(self): """Attempt to retrieve the correction device's schema and insert - part of it as managed keys. + part of it as managed keys, plus some hardcoded keys for other + devices types. """ correction_device_servers = [ - server for _, server, _, _, _, _, _ in self.moduleGroups.value] + server for _, server, _, _, _ in self.moduleGroups.value] up_corr_servers = await self._get_servers_in_state( 'up', servers=correction_device_servers) @@ -691,15 +776,16 @@ class CalibrationManager(DeviceClientBase, Device): # Obtain the device schema from a correction device server. managed_schema, _, _ = await call(corr_server, 'slotGetClassSchema', - self._correction_class_id) - # saving this for later + self._class_ids['correction']) + + # Save this for scene generation later. self._correction_device_schema = Schema() self._correction_device_schema.copy(managed_schema) - if managed_schema.name != self._correction_class_id: + if managed_schema.name != self._class_ids['correction']: self._set_fatal( - f'Correction class ID `{self._correction_class_id}` not known ' - f'or loadable by device server `{corr_server}`') + f'Correction class ID `{self._class_ids["correction"]}` not ' + f'known or loadable by device server `{corr_server}`') return # Collect the keys to be managed including the nodes leading up @@ -728,19 +814,14 @@ class CalibrationManager(DeviceClientBase, Device): # Add in managed DAQ keys, if enabled. if self.daqDevicesPattern: - remote_keys = [] + prev_values.update(await self._add_managed_keys( + managed_hash, self._managed_daq_keys, 'daq', + self._daq_device_ids)) - for remote_key, local_key, descr in self._get_managed_daq_keys(): - remote_keys.append(remote_key) - _, attrs = descr.toSchemaAndAttrs(None, None) - - managed_hash[local_key] = 0 - managed_hash[local_key, ...].update( - attrs, __owner='daq', __remote=remote_key) - - if self._daq_device_ids: - prev_values.update(await self._get_shared_keys( - self._daq_device_ids, remote_keys)) + # Add in managed assembler keys. + prev_values.update(await self._add_managed_keys( + managed_hash, self._managed_assembler_keys, 'assemblers', + self._assembler_device_ids)) if self.managedKeyConfiguration: config_name = self.managedKeyConfiguration.value @@ -853,7 +934,7 @@ class CalibrationManager(DeviceClientBase, Device): # Inject the newly prepared node for managed keys. self.__class__.managedKeys = managed_node await self.publishInjectedParameters() - self._managed_keys = managed_keys + self._managed_correction_keys = managed_keys self.logger.debug('Managed schema injected') @@ -874,9 +955,9 @@ class CalibrationManager(DeviceClientBase, Device): self._set_status(text, level=logging.ERROR) def _set_fatal(self, text): - """Set the device into unknown state and log an error message.""" + """Set the device into disabled state and log an error message.""" - self.state = State.UNKNOWN + self.state = State.DISABLED self._set_status(text, level=logging.CRITICAL) def _set_exception(self, text, e): @@ -1164,6 +1245,7 @@ class CalibrationManager(DeviceClientBase, Device): return False self.logger.debug(f'Instantation result for {device_id}: {msg}') + return True async def _instantiate_pipeline(self): @@ -1184,19 +1266,8 @@ class CalibrationManager(DeviceClientBase, Device): return self._set_error('Request unexpectedly failed while ' 'checking device server state', e) - # Class and device ID templates per role. - class_ids = {} - device_id_templates = {} - - class_args = (self.detectorType.value.lower().capitalize(),) - for role in ['correction', 'groupMatcher', 'assembler']: - class_ids[role] = getattr( - self.classIds, f'{role}Class').value.format(*class_args) - device_id_templates[role] = f'{self._device_id_root}/' + \ - getattr(self.deviceIds, f'{role}Suffix') - # Servers by group and layer. - server_by_group = {group: server for group, server, _, _, _, _, _ + server_by_group = {group: server for group, server, _, _, _, in self.moduleGroups.value} server_by_layer = {layer: server for layer, _, server in self.previewLayers.value} @@ -1219,14 +1290,15 @@ class CalibrationManager(DeviceClientBase, Device): vname, group, aggregator, input_channel, input_source = row modules_by_group[group].append(vname) - device_id = device_id_templates['correction'].format( + device_id = self._device_id_templates['correction'].format( virtualName=vname, index=index, group=group) correct_device_id_by_module[vname] = device_id if not aggregator: aggregator = f'{self.detectorType.upper()}{index:02d}' - daq_device_id = f'{self.detectorIdentifier}/DET/{index}CH0' + detector_id = self.detectorIdentifier.value + daq_device_id = f'{detector_id}/DET/{index}CH0' if not input_channel: input_channel = f'{daq_device_id}:output' @@ -1236,104 +1308,106 @@ class CalibrationManager(DeviceClientBase, Device): input_source_by_module[vname] = input_source - config = Hash() - - config['constantParameters.detectorName'] = self.detectorIdentifier.value - config['constantParameters.karaboDa'] = aggregator - config['dataInput.connectedOutputChannels'] = [input_channel] - config['fastSources'] = [input_source] + config = Hash( + 'constantParameters.detectorName', detector_id, + 'constantParameters.karaboDa', aggregator, + 'dataInput.connectedOutputChannels', [input_channel], + 'fastSources', [input_source] + ) # Add managed keys. - for key in self._managed_keys: + for key in self._managed_correction_keys: value = get_property(self, f'managedKeys.{key}') if not ismethod(value): - config[key] = value + config[key] = value.value awaitables.append(self._instantiate_device( server_by_group[group], - class_ids['correction'], + self._class_ids['correction'], device_id, - config)) - await gather(*awaitables) - awaitables.clear() + config + )) - # Instantiate group matchers which can also be bridges. + # Instantiate group matchers with optional bridge enabled. for row in self.moduleGroups.value: - group, server, with_matcher, start_matcher, with_bridge, \ - bridge_port, bridge_pattern = row - - # Group matcher, if applicable. - if with_matcher: - matcher_device_id = device_id_templates['groupMatcher'].format( - group=group) - - config = Hash() - config['sources'] = [ - Hash('select', - True, - 'source', - f'{input_source_by_module[vname]}' - f'@{correct_device_id_by_module[vname]}:dataOutput') - for vname in modules_by_group[group]] - - if with_bridge: - config['zmqConfig'] = [Hash( - 'pattern', bridge_pattern, - 'hwm', 1, - 'port', bridge_port)] - - if not await self._instantiate_device( - server, class_ids['groupMatcher'], - matcher_device_id, config - ): - return - elif start_matcher: - async def _activate_matcher(device_id): - with await getDevice(device_id) as device: - await sleep(3) - if device.state == State.PASSIVE: - await device.start() - - background(_activate_matcher(matcher_device_id)) + group, server, with_bridge, bridge_port, bridge_pattern = row + + sources = [Hash('select', True, + 'source', '{}@{}:dataOutput'.format( + input_source_by_module[vname], + correct_device_id_by_module[vname]) + ) + for vname in modules_by_group[group]] + config = Hash('sources', sources) + + if with_bridge: + config['zmqConfig'] = [ + Hash('pattern', bridge_pattern, 'hwm', 1, + 'port', bridge_port) + ] + + awaitables.append(self._instantiate_device( + server, + self._class_ids['matcher'], + self._device_id_templates['matcher'].format(group=group), + config + )) # Instantiate preview layer assemblers. for layer, output_pipeline, server in self.previewLayers.value: - assembler_device_id = device_id_templates['assembler'].format( - layer=layer) - - config = Hash() - config['sources'] = [ - Hash('select', True, - 'source', - f'{device_id}:{output_pipeline}') - for (_, device_id) - in correct_device_id_by_module.items()] - config['geometryDevice'] = self.geometryDevice.value - config['maxIdle'] = self.maxIdle.value - # TODO: enable live reconfiguration of maxIdle via manager + sources = [Hash('select', True, + 'source', f'{device_id}:{output_pipeline}') + for (_, device_id) + in correct_device_id_by_module.items()] + config = Hash('sources', sources, + 'geometryDevice', self.geometryDevice.value) + + for remote_key, local_key, _ in self._managed_assembler_keys: + value = get_property(self, f'managedKeys.{local_key}') + + if not ismethod(value): + config[remote_key] = value.value awaitables.append(self._instantiate_device( - server, class_ids['assembler'], assembler_device_id, config)) + server, + self._class_ids['assembler'], + self._device_id_templates['assembler'].format(layer=layer), + config + )) + # Perform instantation. await gather(*awaitables) - awaitables.clear() - callNoWait(self.geometryDevice.value, "sendGeometry") + + # Force managed DAQ settings. + await self._apply_managed_values(daq=True) + + # Ask the geometry device to re-send its geometry. + callNoWait(self.geometryDevice.value, 'sendGeometry') self._set_status('All devices instantiated') self.state = State.ACTIVE - async def _apply_managed_values(self): + async def _apply_managed_values(self, daq=False, assemblers=False, + corrections=False): """Apply all managed keys to local values.""" - for daq_key, local_key, _ in self._get_managed_daq_keys(): - await self._set_on_daq( - daq_key, get_property(self, f'managedKeys.{local_key}')) + if daq and self.daqDevicesPattern: + # Only apply if DAQ management is enabled. + for remote_key, local_key, _ in self._managed_daq_keys: + await self._set_on_daq( + remote_key, get_property(self, f'managedKeys.{local_key}')) - for key in self._managed_keys: - value = get_property(self, f'managedKeys.{key}') - if not ismethod(value): - await self._set_on_corrections(key, value) + if assemblers: + for remote_key, local_key, _ in self._managed_assembler_keys: + await self._set_on_assembler( + remote_key, get_property(self, f'managedKeys.{local_key}')) + + if corrections: + for key in self._managed_correction_keys: + value = get_property(self, f'managedKeys.{key}') + if not ismethod(value): + await self._set_on_corrections(key, value) def _call(self, device_ids, slot, *args): """Call the same slot on a list of devices.""" @@ -1353,8 +1427,8 @@ class CalibrationManager(DeviceClientBase, Device): """Set the same property on a list of devices.""" for device_id in device_ids: - setNoWait(device_id, key, value) - self.logger.debug(f'Set {device_id}.{key} to {value}') + setNoWait(device_id, key, value.value) + self.logger.debug(f'Set {device_id}.{key} to {value.value}') async def _set_on_daq(self, key, value): """Set a property on all DAQ devices.""" @@ -1369,3 +1443,10 @@ class CalibrationManager(DeviceClientBase, Device): if self._correction_device_ids: self._set(self._correction_device_ids, key, value) self.logger.info(f'Set <CORR>.{key} to {value}') + + async def _set_on_assemblers(self, key, value): + """Set a property on all preview assemblers.""" + + if self._assembler_device_ids: + self._set(self._assembler_device_ids, key, value) + self.logger.info(f'Set <ASSEMBLE>.{key} to {value}') diff --git a/src/calng/ShmemTrainMatcher.py b/src/calng/ShmemTrainMatcher.py index 4c1e3c7992586165f3d17c6db3d547b706f77507..9f92fc113976561b2f137df9f62100b4db55c1cc 100644 --- a/src/calng/ShmemTrainMatcher.py +++ b/src/calng/ShmemTrainMatcher.py @@ -196,6 +196,8 @@ class ShmemTrainMatcher(TrainMatcher.TrainMatcher): # it is set already by super by default, so only need to turn off self.output = None + self.start() # Auto-start this type of matcher. + def preReconfigure(self, conf): super().preReconfigure(conf) if conf.has("merge") or conf.has("sources"):