diff --git a/src/calng/CalibrationManager.py b/src/calng/CalibrationManager.py index 7fc7cd61dceb10ad09ca8d952053a609f9818d4f..0e61873329082b90ecb3b74b04c14344abc62ca8 100644 --- a/src/calng/CalibrationManager.py +++ b/src/calng/CalibrationManager.py @@ -14,6 +14,7 @@ from traceback import format_exc from urllib.parse import urlparse import json import logging +import re from tornado.httpclient import AsyncHTTPClient, HTTPError from tornado.platform.asyncio import AsyncIOMainLoop, to_asyncio_future @@ -268,19 +269,7 @@ class InstantiationOptionsNode(Configurable): class ManagedKeysNode(Configurable): - # Keys managed on detector DAQ devices. - DAQ_KEYS = {'DataDispatcher.trainStride': 'daqTrainStride'} - - @UInt32( - displayedName='DAQ train stride', - unitSymbol=Unit.COUNT, - defaultValue=5, - allowedStates=[State.ACTIVE], - minInc=1) - async def daqTrainStride(self, value): - self.daqTrainStride = value - background(get_instance_parent(self)._set_on_daq( - 'DataDispatcher.trainStride', value)) + pass class ManagedKeysCloneFactory(ProxyFactory): @@ -317,6 +306,16 @@ class CalibrationManager(DeviceClientBase, Device): accessMode=AccessMode.INITONLY, assignment=Assignment.MANDATORY) + daqDevicesPattern = String( + displayedName='DAQ devices pattern', + description='Regexp pattern for the DAQ data aggregators of this ' + 'detector, the formatting placeholder ' + '\'detector_identifier\' may be used. Leave empty to ' + 'disable DAQ interaction.', + defaultValue='^{detector_identifier}/DET/\d*CH0$', + accessMode=AccessMode.INITONLY, + assignment=Assignment.MANDATORY) + classIds = Node( ClassIdsNode, displayedName='Class IDs', @@ -437,6 +436,15 @@ class CalibrationManager(DeviceClientBase, Device): self._correction_class_id = self.classIds.correctionClass.format( self.detectorType.value.lower().capitalize()) + # Concretized pattern for device IDs of data aggregators. + if self.daqDevicesPattern: + # Match function of compiled pattern. + self._is_daq_device = re.compile(self.daqDevicesPattern.format( + detector_identifier=self.detectorIdentifier.value)).match + else: + # Always False. + self._is_daq_device = lambda x: False + # Set of data aggregators associated with the managed detector. self._daq_device_ids = set() @@ -500,8 +508,7 @@ class CalibrationManager(DeviceClientBase, Device): self.state = State.ACTIVE def _check_new_device(self, device_id, class_id): - if class_id == 'DataAggregator' and \ - device_id.startswith(self.detectorIdentifier.value): + if class_id == 'DataAggregator' and self._is_daq_device(device_id): # This device is a data aggregator belonging to the detector # installation self._daq_device_ids.add(device_id) @@ -561,6 +568,18 @@ class CalibrationManager(DeviceClientBase, Device): return {key: max(set(values), key=values.count) for key, values in key_values.items()} + @staticmethod + def _get_managed_daq_keys(): + # List of tuples [remote key, local key, descriptor] + return [ + ('DataDispatcher.trainStride', 'daqTrainStride', + UInt32(displayedName='DAQ train stride', + unitSymbol=Unit.COUNT, + defaultValue=5, + allowedStates=[State.ACTIVE], + minInc=1)) + ] + async def _inject_managed_keys(self): """Attempt to retrieve the correction device's schema and insert part of it as managed keys. @@ -598,28 +617,45 @@ class CalibrationManager(DeviceClientBase, Device): f'or loadable by device server `{corr_server}`') return - # Collect the keys to be managed and build a nested hash - # expressing its hierarchy, leafs are set to None. + # Collect the keys to be managed including the nodes leading up + # to them. To do this a new hash is constructed with the managed + # key paths, filling in the node gaps in between. managed_keys = set(managed_schema.hash['managedKeys', 'defaultValue']) - managed_tree = Hash(*chain.from_iterable( - zip(managed_keys, repeat(None, len(managed_keys))))) - managed_paths = set(managed_tree.paths()) + managed_paths = set(Hash(*chain.from_iterable( + zip(managed_keys, repeat(None, len(managed_keys))))).paths()) # Reduce the correction schema to the managed paths. managed_hash = managed_schema.hash for path in managed_hash.paths(): if path not in managed_paths: + # Remove unmanaged path. del managed_hash[path] + else: + # Set owner and remote name (identical for corrections). + managed_hash[path, '__owner'] = 'corrections' + managed_hash[path, '__remote'] = path # Retrieve any previous values already on running devices in # order to update the defaultValue attribute in the schema just # before injection. - prev_vals = await self._get_shared_keys( + prev_values = await self._get_shared_keys( self._correction_device_ids, managed_keys) - if self._daq_device_ids: - prev_vals.update(await self._get_shared_keys( - self._daq_device_ids, ManagedKeysNode.DAQ_KEYS.keys())) + # Add in managed DAQ keys, if enabled. + if self.daqDevicesPattern: + remote_keys = [] + + for remote_key, local_key, descr in self._get_managed_daq_keys(): + remote_keys.append(remote_key) + _, attrs = descr.toSchemaAndAttrs(None, None) + + managed_hash[local_key] = 0 + managed_hash[local_key, ...].update( + attrs, __owner='daq', __remote=remote_key) + + if self._daq_device_ids: + prev_values.update(await self._get_shared_keys( + self._daq_device_ids, remote_keys)) # Retrieve the attributes on the current managed node. The # original implementation of toSchemaAndAttrs in the Node's @@ -644,20 +680,22 @@ class CalibrationManager(DeviceClientBase, Device): # Walk the managed tree to and sanitize all descriptors to our # specifications. def _sanitize_node(parent, tree, prefix=''): - for key, value in tree.items(): + for leaf_key, value in tree.items(): # Fetch the descriptor class, not its instance! - descr = getattr(parent.cls, key) + descr = getattr(parent.cls, leaf_key) - full_key = f'{prefix}.{key}' if prefix else key + local_key = f'{prefix}.{leaf_key}' if prefix else leaf_key + remote_key = managed_hash[local_key, '__remote'] if isinstance(descr, Node): - _sanitize_node(descr, value, full_key) + _sanitize_node(descr, value, local_key) elif isinstance(descr, Slot): - async def _managed_slot_called(parent, fk=full_key): - background(self._call_on_corrections(fk)) + async def _managed_slot_called(parent, + remote_key=remote_key): + background(self._call_on_corrections(remote_key)) - _managed_slot_called.__name__ = f'managed.{full_key}' + _managed_slot_called.__name__ = f'managed.{local_key}' descr.__call__(_managed_slot_called) # Managed slots can only be called in the ACTIVE @@ -671,13 +709,18 @@ class CalibrationManager(DeviceClientBase, Device): # Add a callback only if the original descriptor # is reconfigurable. - async def _managed_prop_changed(parent, v, k=key, - fk=full_key): - setattr(parent, k, v) + async def _managed_prop_changed( + parent, new_value, + leaf_key=leaf_key, + remote_key=remote_key, + setter=getattr(self, '_set_on_' + managed_hash[ + local_key, '__owner']) + ): + setattr(parent, leaf_key, new_value) if self.state != State.INIT: # Do not propagate updates during injection. - background(self._set_on_corrections(fk, v)) + background(setter(remote_key, new_value)) descr.__call__(_managed_prop_changed) @@ -691,14 +734,14 @@ class CalibrationManager(DeviceClientBase, Device): try: # If there's been a previous value before # injection, use it. - descr.defaultValue = prev_vals[full_key] + descr.defaultValue = prev_values[remote_key] except KeyError: pass else: self.logger.warn(f'Encountered unknown descriptor type ' f'{type(descr)}') - _sanitize_node(managed_node, managed_tree) + _sanitize_node(managed_node, managed_hash) # Inject the newly prepared node for managed keys. self.__class__.managed = managed_node