diff --git a/src/calng/CalibrationManager.py b/src/calng/CalibrationManager.py index 20b91305f7fe6118799d2fecd317f995c5d50479..abde2474b9e0b40a31bef635e3cbe01699b1c7f6 100644 --- a/src/calng/CalibrationManager.py +++ b/src/calng/CalibrationManager.py @@ -14,6 +14,7 @@ from traceback import format_exc from urllib.parse import urlparse import json import logging +import re from tornado.httpclient import AsyncHTTPClient, HTTPError from tornado.platform.asyncio import AsyncIOMainLoop, to_asyncio_future @@ -26,7 +27,7 @@ from karabo.middlelayer import ( UInt16, UInt32, Bool, Double, Schema, String, VectorString, VectorHash, background, call, callNoWait, setNoWait, sleep, instantiate, slot, coslot, getDevice, getTopology, getConfiguration, getConfigurationFromPast, - get_property) + getConfigurationFromName, get_property) from karabo.middlelayer_api.proxy import ProxyFactory from karabo import version as karaboVersion @@ -151,10 +152,16 @@ class ModuleGroupRow(Configurable): displayedName='Device server') withMatcher = Bool( - displayedName='Matcher?') + displayedName='Include matcher?') + + startMatcher = Bool( + displayedName='Start matcher?') withBridge = Bool( - displayedName='Bridge?') + displayedName='Include bridge?') + + startBridge = Bool( + displayedName='Start bridge?') bridgePort = UInt16( displayedName='Bridge port', @@ -228,44 +235,19 @@ class WebserverApiNode(Configurable): accessMode=AccessMode.RECONFIGURABLE) -class InstantiationOptionsNode(Configurable): - restoreMatcherSources = Bool( - displayedName='Restore matcher sources', - description='Attempt to retrieve and restore the last known ' - 'configuration for slow and fast sources of matcher ' - 'devices when the pipeline is instantiated.', - defaultValue=False, - accessMode=AccessMode.RECONFIGURABLE) +class RestoredConfigurationRow(Configurable): + enabled = Bool( + displayedName='Enabled') - autoActivateGroupBridges = Bool( - displayedName='Activate bridges automatically', - description='Whether to activate all group bridges immediately after ' - 'instantation.', - defaultValue=False, - accessMode=AccessMode.RECONFIGURABLE) + memberPattern = String( + displayedName='Member pattern') - autoActivateGroupMatchers = Bool( - displayedName='Activate group matchers automatically', - description='Whether to activate all group matchers immediately after ' - 'instantation.', - defaultValue=False, - accessMode=AccessMode.RECONFIGURABLE) + keyPattern = String( + displayedName='Key pattern') class ManagedKeysNode(Configurable): - # Keys managed on detector DAQ devices. - DAQ_KEYS = {'DataDispatcher.trainStride': 'daqTrainStride'} - - @UInt32( - displayedName='DAQ train stride', - unitSymbol=Unit.COUNT, - defaultValue=5, - allowedStates=[State.ACTIVE], - minInc=1) - async def daqTrainStride(self, value): - self.daqTrainStride = value - background(get_instance_parent(self)._set_on_daq( - 'DataDispatcher.trainStride', value)) + pass class ManagedKeysCloneFactory(ProxyFactory): @@ -356,6 +338,16 @@ class CalibrationManager(DeviceClientBase, Device): accessMode=AccessMode.INITONLY, assignment=Assignment.MANDATORY) + daqDevicesPattern = String( + displayedName='DAQ devices pattern', + description='Regexp pattern for the DAQ data aggregators of this ' + 'detector, the formatting placeholder ' + '\'detector_identifier\' may be used. Leave empty to ' + 'disable DAQ interaction.', + defaultValue='^{detector_identifier}/DET/\d*CH0$', + accessMode=AccessMode.INITONLY, + assignment=Assignment.MANDATORY) + classIds = Node( ClassIdsNode, displayedName='Class IDs', @@ -421,22 +413,35 @@ class CalibrationManager(DeviceClientBase, Device): accessMode=AccessMode.INITONLY, assignment=Assignment.MANDATORY) - calcatUrl = String( - displayedName='CalCat URL', - description='[NYI] URL to CalCat API to use for constant retrieval, ' - 'set by local secrets file', - accessMode=AccessMode.READONLY) - webserverApi = Node( WebserverApiNode, displayedName='Webserver API', description='Configurations for the webserver API to control device ' 'servers.') - instantiationOptions = Node( - InstantiationOptionsNode, - displayedName='Instantiation options', - description='Optional flags controlling the pipeline instantiation.') + managedKeyConfiguration = String( + displayedName='Managed key configuration', + description='Name of manager device configuration to apply to ' + 'managed keys on startup, leave empty to disable.', + defaultValue='', + accessMode=AccessMode.INITONLY) + + @VectorHash( + displayedName='Restored configurations', + rows=RestoredConfigurationRow, + description='Regexp patterns matched against instantiated devices ' + 'to selectively restore the most previous configuration.', + defaultValue=[Hash( + 'enabled', False, + 'memberPattern', 'MATCH_G\d', + 'keyPattern', '^(channels|fastSources|slowSources)')], + accessMode=AccessMode.RECONFIGURABLE) + async def restoredConfigurations(self, new_configs): + self.restoredConfigurations = new_configs + self._restore_config_patterns = { + re.compile(member_pattern): re.compile(key_pattern) + for enabled, member_pattern, key_pattern in new_configs.value + if enabled} doNotCompressEvents = Bool( requiredAccessLevel=AccessLevel.GOD, @@ -459,6 +464,20 @@ class CalibrationManager(DeviceClientBase, Device): self.state = State.CHANGING background(self._instantiate_pipeline()) + managedDevices = VectorString( + displayedName='Managed devices', + description='List of currently managed devices.', + defaultValue=[], + accessMode=AccessMode.READONLY) + + @Slot( + displayedName='Discover managed devices', + description='', + allowedStates=[State.ACTIVE]) + async def discoverManagedDevices(self): + self.state = State.CHANGING + background(self._discover_managed_devices()) + @Slot( displayedName='Apply managed values', description='Set all managed keys to the values currently active on ' @@ -467,7 +486,7 @@ class CalibrationManager(DeviceClientBase, Device): async def applyManagedValues(self): background(self._apply_managed_values()) - managed = Node( + managedKeys = Node( ManagedKeysNode, displayedName='Managed keys', description='Properties and slots managed on devices in the pipeline.') @@ -483,6 +502,15 @@ class CalibrationManager(DeviceClientBase, Device): self._correction_class_id = self.classIds.correctionClass.format( self.detectorType.value.lower().capitalize()) + # Concretized pattern for device IDs of data aggregators. + if self.daqDevicesPattern: + # Match function of compiled pattern. + self._is_daq_device = re.compile(self.daqDevicesPattern.format( + detector_identifier=self.detectorIdentifier.value)).match + else: + # Always False. + self._is_daq_device = lambda x: False + # Set of data aggregators associated with the managed detector. self._daq_device_ids = set() @@ -494,6 +522,13 @@ class CalibrationManager(DeviceClientBase, Device): # in the same domain and having the specified class ID. self._correction_device_ids = set() + # Mapping of device ID pattern to key pattern to selectively + # restore past configuration for. + self._restore_config_patterns = dict() + + # Task object to update managed devices list. + self._managed_devices_updater = None + async def onInitialization(self): self.state = State.INIT @@ -512,6 +547,7 @@ class CalibrationManager(DeviceClientBase, Device): if info['type'] == 'device': self._check_new_device(instance_id, info['classId']) + self._update_managed_devices() @slot def slotInstanceGone(self, instance_id, info): @@ -523,6 +559,7 @@ class CalibrationManager(DeviceClientBase, Device): self._daq_device_ids.discard(instance_id) self._domain_device_ids.discard(instance_id) self._correction_device_ids.discard(instance_id) + self._update_managed_devices() async def _async_init(self): # Populate the device ID sets with what's out there right now. @@ -546,15 +583,17 @@ class CalibrationManager(DeviceClientBase, Device): self.state = State.ACTIVE def _check_new_device(self, device_id, class_id): - if class_id == 'DataAggregator' and \ - device_id.startswith(self.detectorIdentifier.value): + if class_id == 'DataAggregator' and self._is_daq_device(device_id): # This device is a data aggregator belonging to the detector # installation self._daq_device_ids.add(device_id) elif device_id.startswith(self._device_id_root): # This device lives under the same device ID root as this - # manager instance. + # manager instance, but don't add yourself! + if device_id == self.deviceId: + return + self._domain_device_ids.add(device_id) if class_id == self._correction_class_id: @@ -582,6 +621,36 @@ class CalibrationManager(DeviceClientBase, Device): for device_id in devices: self._check_new_device(device_id, devices[device_id, 'classId']) + self._update_managed_devices(True) + + async def _discover_managed_devices(self): + self._daq_device_ids.clear() + self._domain_device_ids.clear() + self._correction_device_ids.clear() + + await self._check_topology() + + self.state = State.ACTIVE + + async def _delayed_managed_devices_update(self): + await sleep(1.0) # Throttle updates to at most once a second. + self.managedDevices = sorted( + self._domain_device_ids | self._daq_device_ids) + + self._managed_devices_updater = None # Clear task again. + + def _update_managed_devices(self, forced=False): + if self._managed_devices_updater is not None: + # Update already in progress, ignore. + return + + all_managed_devices = self._domain_device_ids | self._daq_device_ids + if forced or len(all_managed_devices) != len(self.managedDevices): + # Trigger an update either if forced or the number of + # devices changed. + self._managed_devices_updater = background( + self._delayed_managed_devices_update()) + async def _get_shared_keys(self, device_ids, keys): """Find the most common property values on devices.""" @@ -607,13 +676,25 @@ class CalibrationManager(DeviceClientBase, Device): return {key: max(set(values), key=values.count) for key, values in key_values.items()} + @staticmethod + def _get_managed_daq_keys(): + # List of tuples [remote key, local key, descriptor] + return [ + ('DataDispatcher.trainStride', 'daqTrainStride', + UInt32(displayedName='DAQ train stride', + unitSymbol=Unit.COUNT, + defaultValue=5, + allowedStates=[State.ACTIVE], + minInc=1)) + ] + async def _inject_managed_keys(self): """Attempt to retrieve the correction device's schema and insert part of it as managed keys. """ correction_device_servers = [ - server for _, server, _, _, _, _ in self.moduleGroups.value] + server for _, server, _, _, _, _, _, _ in self.moduleGroups.value] up_corr_servers = await self._get_servers_in_state( 'up', servers=correction_device_servers) @@ -647,28 +728,67 @@ class CalibrationManager(DeviceClientBase, Device): f'or loadable by device server `{corr_server}`') return - # Collect the keys to be managed and build a nested hash - # expressing its hierarchy, leafs are set to None. + # Collect the keys to be managed including the nodes leading up + # to them. To do this a new hash is constructed with the managed + # key paths, filling in the node gaps in between. managed_keys = set(managed_schema.hash['managedKeys', 'defaultValue']) - managed_tree = Hash(*chain.from_iterable( - zip(managed_keys, repeat(None, len(managed_keys))))) - managed_paths = set(managed_tree.paths()) + managed_paths = set(Hash(*chain.from_iterable( + zip(managed_keys, repeat(None, len(managed_keys))))).paths()) # Reduce the correction schema to the managed paths. managed_hash = managed_schema.hash for path in managed_hash.paths(): if path not in managed_paths: + # Remove unmanaged path. del managed_hash[path] + else: + # Set owner and remote name (identical for corrections). + managed_hash[path, ...].update( + __owner='corrections', __remote=path) # Retrieve any previous values already on running devices in # order to update the defaultValue attribute in the schema just # before injection. - prev_vals = await self._get_shared_keys( + prev_values = await self._get_shared_keys( self._correction_device_ids, managed_keys) - if self._daq_device_ids: - prev_vals.update(await self._get_shared_keys( - self._daq_device_ids, ManagedKeysNode.DAQ_KEYS.keys())) + # Add in managed DAQ keys, if enabled. + if self.daqDevicesPattern: + remote_keys = [] + + for remote_key, local_key, descr in self._get_managed_daq_keys(): + remote_keys.append(remote_key) + _, attrs = descr.toSchemaAndAttrs(None, None) + + managed_hash[local_key] = 0 + managed_hash[local_key, ...].update( + attrs, __owner='daq', __remote=remote_key) + + if self._daq_device_ids: + prev_values.update(await self._get_shared_keys( + self._daq_device_ids, remote_keys)) + + if self.managedKeyConfiguration: + config_name = self.managedKeyConfiguration.value + + # Try to obtain the specified manager configuration to apply + # its values of managed keys on top of the current values. + try: + named_config = (await getConfigurationFromName( + self.deviceId, config_name))['managedKeys'] + except KaraboError as e: + self.logger.warn(f'Failed receiving named manager ' + f'configuration `{config_name}`: {e}') + except KeyError: + self.logger.warn(f'Missing `managedKeys` on named manager ' + f'configuration `{config_name}`') + else: + # Pick out the keys managed now and check for hashable + # values (e.g. excluding slots). + prev_values.update({ + key: named_config[key] + for key in set(named_config.paths()) & managed_keys + if isinstance(named_config[key], Hashable)}) # Retrieve the attributes on the current managed node. The # original implementation of toSchemaAndAttrs in the Node's @@ -678,9 +798,9 @@ class CalibrationManager(DeviceClientBase, Device): # The value are then obtained from the Node object again since # enums are converted to their values by toSchemaAndAttrs, which # in turn is not valid for property definition. - _, attrs = Descriptor.toSchemaAndAttrs(self.__class__.managed, + _, attrs = Descriptor.toSchemaAndAttrs(self.__class__.managedKeys, None, None) - managed_node_attrs = {key: getattr(self.__class__.managed, key) + managed_node_attrs = {key: getattr(self.__class__.managedKeys, key) for key in attrs.keys()} # Build a proxy from the managed schema, and create a new node @@ -693,20 +813,22 @@ class CalibrationManager(DeviceClientBase, Device): # Walk the managed tree to and sanitize all descriptors to our # specifications. def _sanitize_node(parent, tree, prefix=''): - for key, value in tree.items(): + for leaf_key, value in tree.items(): # Fetch the descriptor class, not its instance! - descr = getattr(parent.cls, key) + descr = getattr(parent.cls, leaf_key) - full_key = f'{prefix}.{key}' if prefix else key + local_key = f'{prefix}.{leaf_key}' if prefix else leaf_key + remote_key = managed_hash[local_key, '__remote'] if isinstance(descr, Node): - _sanitize_node(descr, value, full_key) + _sanitize_node(descr, value, local_key) elif isinstance(descr, Slot): - async def _managed_slot_called(parent, fk=full_key): - background(self._call_on_corrections(fk)) + async def _managed_slot_called(parent, + remote_key=remote_key): + background(self._call_on_corrections(remote_key)) - _managed_slot_called.__name__ = f'managed.{full_key}' + _managed_slot_called.__name__ = f'managedKeys.{local_key}' descr.__call__(_managed_slot_called) # Managed slots can only be called in the ACTIVE @@ -720,13 +842,18 @@ class CalibrationManager(DeviceClientBase, Device): # Add a callback only if the original descriptor # is reconfigurable. - async def _managed_prop_changed(parent, v, k=key, - fk=full_key): - setattr(parent, k, v) + async def _managed_prop_changed( + parent, new_value, + leaf_key=leaf_key, + remote_key=remote_key, + setter=getattr(self, '_set_on_' + managed_hash[ + local_key, '__owner']) + ): + setattr(parent, leaf_key, new_value) if self.state != State.INIT: # Do not propagate updates during injection. - background(self._set_on_corrections(fk, v)) + background(setter(remote_key, new_value)) descr.__call__(_managed_prop_changed) @@ -740,17 +867,17 @@ class CalibrationManager(DeviceClientBase, Device): try: # If there's been a previous value before # injection, use it. - descr.defaultValue = prev_vals[full_key] + descr.defaultValue = prev_values[remote_key] except KeyError: pass else: self.logger.warn(f'Encountered unknown descriptor type ' f'{type(descr)}') - _sanitize_node(managed_node, managed_tree) + _sanitize_node(managed_node, managed_hash) # Inject the newly prepared node for managed keys. - self.__class__.managed = managed_node + self.__class__.managedKeys = managed_node await self.publishInjectedParameters() self._managed_keys = managed_keys @@ -1025,6 +1152,30 @@ class CalibrationManager(DeviceClientBase, Device): f'device {device_id}') return True + # Find any applicable key patterns to restore configuration for. + member_str = device_id[device_id.rfind('/')+1:] + key_patterns = [key_pattern for member_pattern, key_pattern + in self._restore_config_patterns.items() + if member_pattern.match(member_str)] + + if key_patterns: + try: + # Try to obtain most recent configuration. + old_config = await getConfigurationFromPast( + device_id, datetime.now().isoformat()) + except KaraboError as e: + self.logger.warn(f'Failed receiving previous configuration ' + f'for {device_id}: {e}') + else: + # Match all keys against the found patterns. + keys = {key for key in old_config.paths() + if any((key_pattern.match(key) + for key_pattern in key_patterns))} + self.logger.debug(f'Keys restored on {device_id}: ' + + ', '.join(keys)) + + config.update({key: old_config[key] for key in keys}) + try: msg = await wait_for(instantiate( server, class_id, device_id, config), 5.0) @@ -1069,7 +1220,7 @@ class CalibrationManager(DeviceClientBase, Device): getattr(self.deviceIds, f'{role}Suffix') # Servers by group and layer. - server_by_group = {group: server for group, server, _, _, _, _ + server_by_group = {group: server for group, server, _, _, _, _, _, _ in self.moduleGroups.value} server_by_layer = {layer: server for layer, _, server in self.previewLayers.value} @@ -1130,8 +1281,8 @@ class CalibrationManager(DeviceClientBase, Device): # Instantiate group matchers and bridges. for row in self.moduleGroups.value: - group, server, with_matcher, with_bridge, bridge_port, \ - bridge_pattern = row + group, server, with_matcher, start_matcher, with_bridge, \ + start_bridge, bridge_port, bridge_pattern = row # Group matcher, if applicable. if with_matcher: @@ -1147,23 +1298,12 @@ class CalibrationManager(DeviceClientBase, Device): 'fsSource', input_source_by_module[vname]) for vname in modules_by_group[group]] - if self.instantiationOptions.restoreMatcherSources: - try: - old_config = await getConfigurationFromPast( - matcher_device_id, datetime.now().isoformat()) - except KaraboError: - pass # Ignore configuration on error - else: - config['channels'] = old_config['channels'] - config['slowSources'] = old_config['slowSources'] - config['fastSources'] = old_config['fastSources'] - if not await self._instantiate_device( server, class_ids['groupMatcher'], matcher_device_id, config ): return - elif self.instantiationOptions.autoActivateGroupMatchers: + elif start_matcher: async def _activate_matcher(device_id): with await getDevice(device_id) as device: await sleep(3) @@ -1188,7 +1328,7 @@ class CalibrationManager(DeviceClientBase, Device): server, class_ids['bridge'], bridge_device_id, config ): return - elif self.instantiationOptions.autoActivateGroupBridges: + elif start_bridge: # Delay the slot a bit since it will get lost during # instantation.