Skip to content
GitLab
Explore
Sign in
Primary navigation
Search or go to…
Project
C
calng
Manage
Activity
Members
Labels
Plan
Issues
Issue boards
Milestones
Wiki
Code
Merge requests
Repository
Branches
Commits
Tags
Repository graph
Compare revisions
Snippets
Build
Pipelines
Jobs
Pipeline schedules
Artifacts
Deploy
Releases
Model registry
Operate
Environments
Monitor
Incidents
Analyze
Value stream analytics
Contributor analytics
CI/CD analytics
Repository analytics
Model experiments
Help
Help
Support
GitLab documentation
Compare GitLab plans
Community forum
Contribute to GitLab
Provide feedback
Keyboard shortcuts
?
Snippets
Groups
Projects
Show more breadcrumbs
calibration
calng
Commits
38e2086c
Commit
38e2086c
authored
3 years ago
by
David Hammer
Browse files
Options
Downloads
Plain Diff
Merge branch 'feat/dynamic-manager' into devel
parents
422e4ad5
042ed810
No related branches found
Branches containing commit
No related tags found
Tags containing commit
1 merge request
!12
Snapshot: field test deployed version as of end of run 202201
Changes
1
Hide whitespace changes
Inline
Side-by-side
Showing
1 changed file
src/calng/CalibrationManager.py
+232
-92
232 additions, 92 deletions
src/calng/CalibrationManager.py
with
232 additions
and
92 deletions
src/calng/CalibrationManager.py
+
232
−
92
View file @
38e2086c
...
@@ -14,6 +14,7 @@ from traceback import format_exc
...
@@ -14,6 +14,7 @@ from traceback import format_exc
from
urllib.parse
import
urlparse
from
urllib.parse
import
urlparse
import
json
import
json
import
logging
import
logging
import
re
from
tornado.httpclient
import
AsyncHTTPClient
,
HTTPError
from
tornado.httpclient
import
AsyncHTTPClient
,
HTTPError
from
tornado.platform.asyncio
import
AsyncIOMainLoop
,
to_asyncio_future
from
tornado.platform.asyncio
import
AsyncIOMainLoop
,
to_asyncio_future
...
@@ -26,7 +27,7 @@ from karabo.middlelayer import (
...
@@ -26,7 +27,7 @@ from karabo.middlelayer import (
UInt16
,
UInt32
,
Bool
,
Double
,
Schema
,
String
,
VectorString
,
VectorHash
,
UInt16
,
UInt32
,
Bool
,
Double
,
Schema
,
String
,
VectorString
,
VectorHash
,
background
,
call
,
callNoWait
,
setNoWait
,
sleep
,
instantiate
,
slot
,
coslot
,
background
,
call
,
callNoWait
,
setNoWait
,
sleep
,
instantiate
,
slot
,
coslot
,
getDevice
,
getTopology
,
getConfiguration
,
getConfigurationFromPast
,
getDevice
,
getTopology
,
getConfiguration
,
getConfigurationFromPast
,
get_property
)
getConfigurationFromName
,
get_property
)
from
karabo.middlelayer_api.proxy
import
ProxyFactory
from
karabo.middlelayer_api.proxy
import
ProxyFactory
from
karabo
import
version
as
karaboVersion
from
karabo
import
version
as
karaboVersion
...
@@ -151,10 +152,16 @@ class ModuleGroupRow(Configurable):
...
@@ -151,10 +152,16 @@ class ModuleGroupRow(Configurable):
displayedName
=
'
Device server
'
)
displayedName
=
'
Device server
'
)
withMatcher
=
Bool
(
withMatcher
=
Bool
(
displayedName
=
'
Matcher?
'
)
displayedName
=
'
Include matcher?
'
)
startMatcher
=
Bool
(
displayedName
=
'
Start matcher?
'
)
withBridge
=
Bool
(
withBridge
=
Bool
(
displayedName
=
'
Bridge?
'
)
displayedName
=
'
Include bridge?
'
)
startBridge
=
Bool
(
displayedName
=
'
Start bridge?
'
)
bridgePort
=
UInt16
(
bridgePort
=
UInt16
(
displayedName
=
'
Bridge port
'
,
displayedName
=
'
Bridge port
'
,
...
@@ -228,44 +235,19 @@ class WebserverApiNode(Configurable):
...
@@ -228,44 +235,19 @@ class WebserverApiNode(Configurable):
accessMode
=
AccessMode
.
RECONFIGURABLE
)
accessMode
=
AccessMode
.
RECONFIGURABLE
)
class
InstantiationOptionsNode
(
Configurable
):
class
RestoredConfigurationRow
(
Configurable
):
restoreMatcherSources
=
Bool
(
enabled
=
Bool
(
displayedName
=
'
Restore matcher sources
'
,
displayedName
=
'
Enabled
'
)
description
=
'
Attempt to retrieve and restore the last known
'
'
configuration for slow and fast sources of matcher
'
'
devices when the pipeline is instantiated.
'
,
defaultValue
=
False
,
accessMode
=
AccessMode
.
RECONFIGURABLE
)
autoActivateGroupBridges
=
Bool
(
memberPattern
=
String
(
displayedName
=
'
Activate bridges automatically
'
,
displayedName
=
'
Member pattern
'
)
description
=
'
Whether to activate all group bridges immediately after
'
'
instantation.
'
,
defaultValue
=
False
,
accessMode
=
AccessMode
.
RECONFIGURABLE
)
autoActivateGroupMatchers
=
Bool
(
keyPattern
=
String
(
displayedName
=
'
Activate group matchers automatically
'
,
displayedName
=
'
Key pattern
'
)
description
=
'
Whether to activate all group matchers immediately after
'
'
instantation.
'
,
defaultValue
=
False
,
accessMode
=
AccessMode
.
RECONFIGURABLE
)
class
ManagedKeysNode
(
Configurable
):
class
ManagedKeysNode
(
Configurable
):
# Keys managed on detector DAQ devices.
pass
DAQ_KEYS
=
{
'
DataDispatcher.trainStride
'
:
'
daqTrainStride
'
}
@UInt32
(
displayedName
=
'
DAQ train stride
'
,
unitSymbol
=
Unit
.
COUNT
,
defaultValue
=
5
,
allowedStates
=
[
State
.
ACTIVE
],
minInc
=
1
)
async
def
daqTrainStride
(
self
,
value
):
self
.
daqTrainStride
=
value
background
(
get_instance_parent
(
self
).
_set_on_daq
(
'
DataDispatcher.trainStride
'
,
value
))
class
ManagedKeysCloneFactory
(
ProxyFactory
):
class
ManagedKeysCloneFactory
(
ProxyFactory
):
...
@@ -356,6 +338,16 @@ class CalibrationManager(DeviceClientBase, Device):
...
@@ -356,6 +338,16 @@ class CalibrationManager(DeviceClientBase, Device):
accessMode
=
AccessMode
.
INITONLY
,
accessMode
=
AccessMode
.
INITONLY
,
assignment
=
Assignment
.
MANDATORY
)
assignment
=
Assignment
.
MANDATORY
)
daqDevicesPattern
=
String
(
displayedName
=
'
DAQ devices pattern
'
,
description
=
'
Regexp pattern for the DAQ data aggregators of this
'
'
detector, the formatting placeholder
'
'
\'
detector_identifier
\'
may be used. Leave empty to
'
'
disable DAQ interaction.
'
,
defaultValue
=
'
^{detector_identifier}/DET/\d*CH0$
'
,
accessMode
=
AccessMode
.
INITONLY
,
assignment
=
Assignment
.
MANDATORY
)
classIds
=
Node
(
classIds
=
Node
(
ClassIdsNode
,
ClassIdsNode
,
displayedName
=
'
Class IDs
'
,
displayedName
=
'
Class IDs
'
,
...
@@ -421,22 +413,35 @@ class CalibrationManager(DeviceClientBase, Device):
...
@@ -421,22 +413,35 @@ class CalibrationManager(DeviceClientBase, Device):
accessMode
=
AccessMode
.
INITONLY
,
accessMode
=
AccessMode
.
INITONLY
,
assignment
=
Assignment
.
MANDATORY
)
assignment
=
Assignment
.
MANDATORY
)
calcatUrl
=
String
(
displayedName
=
'
CalCat URL
'
,
description
=
'
[NYI] URL to CalCat API to use for constant retrieval,
'
'
set by local secrets file
'
,
accessMode
=
AccessMode
.
READONLY
)
webserverApi
=
Node
(
webserverApi
=
Node
(
WebserverApiNode
,
WebserverApiNode
,
displayedName
=
'
Webserver API
'
,
displayedName
=
'
Webserver API
'
,
description
=
'
Configurations for the webserver API to control device
'
description
=
'
Configurations for the webserver API to control device
'
'
servers.
'
)
'
servers.
'
)
instantiationOptions
=
Node
(
managedKeyConfiguration
=
String
(
InstantiationOptionsNode
,
displayedName
=
'
Managed key configuration
'
,
displayedName
=
'
Instantiation options
'
,
description
=
'
Name of manager device configuration to apply to
'
description
=
'
Optional flags controlling the pipeline instantiation.
'
)
'
managed keys on startup, leave empty to disable.
'
,
defaultValue
=
''
,
accessMode
=
AccessMode
.
INITONLY
)
@VectorHash
(
displayedName
=
'
Restored configurations
'
,
rows
=
RestoredConfigurationRow
,
description
=
'
Regexp patterns matched against instantiated devices
'
'
to selectively restore the most previous configuration.
'
,
defaultValue
=
[
Hash
(
'
enabled
'
,
False
,
'
memberPattern
'
,
'
MATCH_G\d
'
,
'
keyPattern
'
,
'
^(channels|fastSources|slowSources)
'
)],
accessMode
=
AccessMode
.
RECONFIGURABLE
)
async
def
restoredConfigurations
(
self
,
new_configs
):
self
.
restoredConfigurations
=
new_configs
self
.
_restore_config_patterns
=
{
re
.
compile
(
member_pattern
):
re
.
compile
(
key_pattern
)
for
enabled
,
member_pattern
,
key_pattern
in
new_configs
.
value
if
enabled
}
doNotCompressEvents
=
Bool
(
doNotCompressEvents
=
Bool
(
requiredAccessLevel
=
AccessLevel
.
GOD
,
requiredAccessLevel
=
AccessLevel
.
GOD
,
...
@@ -459,6 +464,20 @@ class CalibrationManager(DeviceClientBase, Device):
...
@@ -459,6 +464,20 @@ class CalibrationManager(DeviceClientBase, Device):
self
.
state
=
State
.
CHANGING
self
.
state
=
State
.
CHANGING
background
(
self
.
_instantiate_pipeline
())
background
(
self
.
_instantiate_pipeline
())
managedDevices
=
VectorString
(
displayedName
=
'
Managed devices
'
,
description
=
'
List of currently managed devices.
'
,
defaultValue
=
[],
accessMode
=
AccessMode
.
READONLY
)
@Slot
(
displayedName
=
'
Discover managed devices
'
,
description
=
''
,
allowedStates
=
[
State
.
ACTIVE
])
async
def
discoverManagedDevices
(
self
):
self
.
state
=
State
.
CHANGING
background
(
self
.
_discover_managed_devices
())
@Slot
(
@Slot
(
displayedName
=
'
Apply managed values
'
,
displayedName
=
'
Apply managed values
'
,
description
=
'
Set all managed keys to the values currently active on
'
description
=
'
Set all managed keys to the values currently active on
'
...
@@ -467,7 +486,7 @@ class CalibrationManager(DeviceClientBase, Device):
...
@@ -467,7 +486,7 @@ class CalibrationManager(DeviceClientBase, Device):
async
def
applyManagedValues
(
self
):
async
def
applyManagedValues
(
self
):
background
(
self
.
_apply_managed_values
())
background
(
self
.
_apply_managed_values
())
managed
=
Node
(
managed
Keys
=
Node
(
ManagedKeysNode
,
ManagedKeysNode
,
displayedName
=
'
Managed keys
'
,
displayedName
=
'
Managed keys
'
,
description
=
'
Properties and slots managed on devices in the pipeline.
'
)
description
=
'
Properties and slots managed on devices in the pipeline.
'
)
...
@@ -483,6 +502,15 @@ class CalibrationManager(DeviceClientBase, Device):
...
@@ -483,6 +502,15 @@ class CalibrationManager(DeviceClientBase, Device):
self
.
_correction_class_id
=
self
.
classIds
.
correctionClass
.
format
(
self
.
_correction_class_id
=
self
.
classIds
.
correctionClass
.
format
(
self
.
detectorType
.
value
.
lower
().
capitalize
())
self
.
detectorType
.
value
.
lower
().
capitalize
())
# Concretized pattern for device IDs of data aggregators.
if
self
.
daqDevicesPattern
:
# Match function of compiled pattern.
self
.
_is_daq_device
=
re
.
compile
(
self
.
daqDevicesPattern
.
format
(
detector_identifier
=
self
.
detectorIdentifier
.
value
)).
match
else
:
# Always False.
self
.
_is_daq_device
=
lambda
x
:
False
# Set of data aggregators associated with the managed detector.
# Set of data aggregators associated with the managed detector.
self
.
_daq_device_ids
=
set
()
self
.
_daq_device_ids
=
set
()
...
@@ -494,6 +522,13 @@ class CalibrationManager(DeviceClientBase, Device):
...
@@ -494,6 +522,13 @@ class CalibrationManager(DeviceClientBase, Device):
# in the same domain and having the specified class ID.
# in the same domain and having the specified class ID.
self
.
_correction_device_ids
=
set
()
self
.
_correction_device_ids
=
set
()
# Mapping of device ID pattern to key pattern to selectively
# restore past configuration for.
self
.
_restore_config_patterns
=
dict
()
# Task object to update managed devices list.
self
.
_managed_devices_updater
=
None
async
def
onInitialization
(
self
):
async
def
onInitialization
(
self
):
self
.
state
=
State
.
INIT
self
.
state
=
State
.
INIT
...
@@ -512,6 +547,7 @@ class CalibrationManager(DeviceClientBase, Device):
...
@@ -512,6 +547,7 @@ class CalibrationManager(DeviceClientBase, Device):
if
info
[
'
type
'
]
==
'
device
'
:
if
info
[
'
type
'
]
==
'
device
'
:
self
.
_check_new_device
(
instance_id
,
info
[
'
classId
'
])
self
.
_check_new_device
(
instance_id
,
info
[
'
classId
'
])
self
.
_update_managed_devices
()
@slot
@slot
def
slotInstanceGone
(
self
,
instance_id
,
info
):
def
slotInstanceGone
(
self
,
instance_id
,
info
):
...
@@ -523,6 +559,7 @@ class CalibrationManager(DeviceClientBase, Device):
...
@@ -523,6 +559,7 @@ class CalibrationManager(DeviceClientBase, Device):
self
.
_daq_device_ids
.
discard
(
instance_id
)
self
.
_daq_device_ids
.
discard
(
instance_id
)
self
.
_domain_device_ids
.
discard
(
instance_id
)
self
.
_domain_device_ids
.
discard
(
instance_id
)
self
.
_correction_device_ids
.
discard
(
instance_id
)
self
.
_correction_device_ids
.
discard
(
instance_id
)
self
.
_update_managed_devices
()
async
def
_async_init
(
self
):
async
def
_async_init
(
self
):
# Populate the device ID sets with what's out there right now.
# Populate the device ID sets with what's out there right now.
...
@@ -546,15 +583,17 @@ class CalibrationManager(DeviceClientBase, Device):
...
@@ -546,15 +583,17 @@ class CalibrationManager(DeviceClientBase, Device):
self
.
state
=
State
.
ACTIVE
self
.
state
=
State
.
ACTIVE
def
_check_new_device
(
self
,
device_id
,
class_id
):
def
_check_new_device
(
self
,
device_id
,
class_id
):
if
class_id
==
'
DataAggregator
'
and
\
if
class_id
==
'
DataAggregator
'
and
self
.
_is_daq_device
(
device_id
):
device_id
.
startswith
(
self
.
detectorIdentifier
.
value
):
# This device is a data aggregator belonging to the detector
# This device is a data aggregator belonging to the detector
# installation
# installation
self
.
_daq_device_ids
.
add
(
device_id
)
self
.
_daq_device_ids
.
add
(
device_id
)
elif
device_id
.
startswith
(
self
.
_device_id_root
):
elif
device_id
.
startswith
(
self
.
_device_id_root
):
# This device lives under the same device ID root as this
# This device lives under the same device ID root as this
# manager instance.
# manager instance, but don't add yourself!
if
device_id
==
self
.
deviceId
:
return
self
.
_domain_device_ids
.
add
(
device_id
)
self
.
_domain_device_ids
.
add
(
device_id
)
if
class_id
==
self
.
_correction_class_id
:
if
class_id
==
self
.
_correction_class_id
:
...
@@ -582,6 +621,36 @@ class CalibrationManager(DeviceClientBase, Device):
...
@@ -582,6 +621,36 @@ class CalibrationManager(DeviceClientBase, Device):
for
device_id
in
devices
:
for
device_id
in
devices
:
self
.
_check_new_device
(
device_id
,
devices
[
device_id
,
'
classId
'
])
self
.
_check_new_device
(
device_id
,
devices
[
device_id
,
'
classId
'
])
self
.
_update_managed_devices
(
True
)
async
def
_discover_managed_devices
(
self
):
self
.
_daq_device_ids
.
clear
()
self
.
_domain_device_ids
.
clear
()
self
.
_correction_device_ids
.
clear
()
await
self
.
_check_topology
()
self
.
state
=
State
.
ACTIVE
async
def
_delayed_managed_devices_update
(
self
):
await
sleep
(
1.0
)
# Throttle updates to at most once a second.
self
.
managedDevices
=
sorted
(
self
.
_domain_device_ids
|
self
.
_daq_device_ids
)
self
.
_managed_devices_updater
=
None
# Clear task again.
def
_update_managed_devices
(
self
,
forced
=
False
):
if
self
.
_managed_devices_updater
is
not
None
:
# Update already in progress, ignore.
return
all_managed_devices
=
self
.
_domain_device_ids
|
self
.
_daq_device_ids
if
forced
or
len
(
all_managed_devices
)
!=
len
(
self
.
managedDevices
):
# Trigger an update either if forced or the number of
# devices changed.
self
.
_managed_devices_updater
=
background
(
self
.
_delayed_managed_devices_update
())
async
def
_get_shared_keys
(
self
,
device_ids
,
keys
):
async
def
_get_shared_keys
(
self
,
device_ids
,
keys
):
"""
Find the most common property values on devices.
"""
"""
Find the most common property values on devices.
"""
...
@@ -607,13 +676,25 @@ class CalibrationManager(DeviceClientBase, Device):
...
@@ -607,13 +676,25 @@ class CalibrationManager(DeviceClientBase, Device):
return
{
key
:
max
(
set
(
values
),
key
=
values
.
count
)
for
key
,
values
return
{
key
:
max
(
set
(
values
),
key
=
values
.
count
)
for
key
,
values
in
key_values
.
items
()}
in
key_values
.
items
()}
@staticmethod
def
_get_managed_daq_keys
():
# List of tuples [remote key, local key, descriptor]
return
[
(
'
DataDispatcher.trainStride
'
,
'
daqTrainStride
'
,
UInt32
(
displayedName
=
'
DAQ train stride
'
,
unitSymbol
=
Unit
.
COUNT
,
defaultValue
=
5
,
allowedStates
=
[
State
.
ACTIVE
],
minInc
=
1
))
]
async
def
_inject_managed_keys
(
self
):
async
def
_inject_managed_keys
(
self
):
"""
Attempt to retrieve the correction device
'
s schema and insert
"""
Attempt to retrieve the correction device
'
s schema and insert
part of it as managed keys.
part of it as managed keys.
"""
"""
correction_device_servers
=
[
correction_device_servers
=
[
server
for
_
,
server
,
_
,
_
,
_
,
_
in
self
.
moduleGroups
.
value
]
server
for
_
,
server
,
_
,
_
,
_
,
_
,
_
,
_
in
self
.
moduleGroups
.
value
]
up_corr_servers
=
await
self
.
_get_servers_in_state
(
up_corr_servers
=
await
self
.
_get_servers_in_state
(
'
up
'
,
servers
=
correction_device_servers
)
'
up
'
,
servers
=
correction_device_servers
)
...
@@ -647,28 +728,67 @@ class CalibrationManager(DeviceClientBase, Device):
...
@@ -647,28 +728,67 @@ class CalibrationManager(DeviceClientBase, Device):
f
'
or loadable by device server `
{
corr_server
}
`
'
)
f
'
or loadable by device server `
{
corr_server
}
`
'
)
return
return
# Collect the keys to be managed and build a nested hash
# Collect the keys to be managed including the nodes leading up
# expressing its hierarchy, leafs are set to None.
# to them. To do this a new hash is constructed with the managed
# key paths, filling in the node gaps in between.
managed_keys
=
set
(
managed_schema
.
hash
[
'
managedKeys
'
,
'
defaultValue
'
])
managed_keys
=
set
(
managed_schema
.
hash
[
'
managedKeys
'
,
'
defaultValue
'
])
managed_tree
=
Hash
(
*
chain
.
from_iterable
(
managed_paths
=
set
(
Hash
(
*
chain
.
from_iterable
(
zip
(
managed_keys
,
repeat
(
None
,
len
(
managed_keys
)))))
zip
(
managed_keys
,
repeat
(
None
,
len
(
managed_keys
))))).
paths
())
managed_paths
=
set
(
managed_tree
.
paths
())
# Reduce the correction schema to the managed paths.
# Reduce the correction schema to the managed paths.
managed_hash
=
managed_schema
.
hash
managed_hash
=
managed_schema
.
hash
for
path
in
managed_hash
.
paths
():
for
path
in
managed_hash
.
paths
():
if
path
not
in
managed_paths
:
if
path
not
in
managed_paths
:
# Remove unmanaged path.
del
managed_hash
[
path
]
del
managed_hash
[
path
]
else
:
# Set owner and remote name (identical for corrections).
managed_hash
[
path
,
...].
update
(
__owner
=
'
corrections
'
,
__remote
=
path
)
# Retrieve any previous values already on running devices in
# Retrieve any previous values already on running devices in
# order to update the defaultValue attribute in the schema just
# order to update the defaultValue attribute in the schema just
# before injection.
# before injection.
prev_vals
=
await
self
.
_get_shared_keys
(
prev_val
ue
s
=
await
self
.
_get_shared_keys
(
self
.
_correction_device_ids
,
managed_keys
)
self
.
_correction_device_ids
,
managed_keys
)
if
self
.
_daq_device_ids
:
# Add in managed DAQ keys, if enabled.
prev_vals
.
update
(
await
self
.
_get_shared_keys
(
if
self
.
daqDevicesPattern
:
self
.
_daq_device_ids
,
ManagedKeysNode
.
DAQ_KEYS
.
keys
()))
remote_keys
=
[]
for
remote_key
,
local_key
,
descr
in
self
.
_get_managed_daq_keys
():
remote_keys
.
append
(
remote_key
)
_
,
attrs
=
descr
.
toSchemaAndAttrs
(
None
,
None
)
managed_hash
[
local_key
]
=
0
managed_hash
[
local_key
,
...].
update
(
attrs
,
__owner
=
'
daq
'
,
__remote
=
remote_key
)
if
self
.
_daq_device_ids
:
prev_values
.
update
(
await
self
.
_get_shared_keys
(
self
.
_daq_device_ids
,
remote_keys
))
if
self
.
managedKeyConfiguration
:
config_name
=
self
.
managedKeyConfiguration
.
value
# Try to obtain the specified manager configuration to apply
# its values of managed keys on top of the current values.
try
:
named_config
=
(
await
getConfigurationFromName
(
self
.
deviceId
,
config_name
))[
'
managedKeys
'
]
except
KaraboError
as
e
:
self
.
logger
.
warn
(
f
'
Failed receiving named manager
'
f
'
configuration `
{
config_name
}
`:
{
e
}
'
)
except
KeyError
:
self
.
logger
.
warn
(
f
'
Missing `managedKeys` on named manager
'
f
'
configuration `
{
config_name
}
`
'
)
else
:
# Pick out the keys managed now and check for hashable
# values (e.g. excluding slots).
prev_values
.
update
({
key
:
named_config
[
key
]
for
key
in
set
(
named_config
.
paths
())
&
managed_keys
if
isinstance
(
named_config
[
key
],
Hashable
)})
# Retrieve the attributes on the current managed node. The
# Retrieve the attributes on the current managed node. The
# original implementation of toSchemaAndAttrs in the Node's
# original implementation of toSchemaAndAttrs in the Node's
...
@@ -678,9 +798,9 @@ class CalibrationManager(DeviceClientBase, Device):
...
@@ -678,9 +798,9 @@ class CalibrationManager(DeviceClientBase, Device):
# The value are then obtained from the Node object again since
# The value are then obtained from the Node object again since
# enums are converted to their values by toSchemaAndAttrs, which
# enums are converted to their values by toSchemaAndAttrs, which
# in turn is not valid for property definition.
# in turn is not valid for property definition.
_
,
attrs
=
Descriptor
.
toSchemaAndAttrs
(
self
.
__class__
.
managed
,
_
,
attrs
=
Descriptor
.
toSchemaAndAttrs
(
self
.
__class__
.
managed
Keys
,
None
,
None
)
None
,
None
)
managed_node_attrs
=
{
key
:
getattr
(
self
.
__class__
.
managed
,
key
)
managed_node_attrs
=
{
key
:
getattr
(
self
.
__class__
.
managed
Keys
,
key
)
for
key
in
attrs
.
keys
()}
for
key
in
attrs
.
keys
()}
# Build a proxy from the managed schema, and create a new node
# Build a proxy from the managed schema, and create a new node
...
@@ -693,20 +813,22 @@ class CalibrationManager(DeviceClientBase, Device):
...
@@ -693,20 +813,22 @@ class CalibrationManager(DeviceClientBase, Device):
# Walk the managed tree to and sanitize all descriptors to our
# Walk the managed tree to and sanitize all descriptors to our
# specifications.
# specifications.
def
_sanitize_node
(
parent
,
tree
,
prefix
=
''
):
def
_sanitize_node
(
parent
,
tree
,
prefix
=
''
):
for
key
,
value
in
tree
.
items
():
for
leaf_
key
,
value
in
tree
.
items
():
# Fetch the descriptor class, not its instance!
# Fetch the descriptor class, not its instance!
descr
=
getattr
(
parent
.
cls
,
key
)
descr
=
getattr
(
parent
.
cls
,
leaf_
key
)
full_key
=
f
'
{
prefix
}
.
{
key
}
'
if
prefix
else
key
local_key
=
f
'
{
prefix
}
.
{
leaf_key
}
'
if
prefix
else
leaf_key
remote_key
=
managed_hash
[
local_key
,
'
__remote
'
]
if
isinstance
(
descr
,
Node
):
if
isinstance
(
descr
,
Node
):
_sanitize_node
(
descr
,
value
,
ful
l_key
)
_sanitize_node
(
descr
,
value
,
loca
l_key
)
elif
isinstance
(
descr
,
Slot
):
elif
isinstance
(
descr
,
Slot
):
async
def
_managed_slot_called
(
parent
,
fk
=
full_key
):
async
def
_managed_slot_called
(
parent
,
background
(
self
.
_call_on_corrections
(
fk
))
remote_key
=
remote_key
):
background
(
self
.
_call_on_corrections
(
remote_key
))
_managed_slot_called
.
__name__
=
f
'
managed
.
{
ful
l_key
}
'
_managed_slot_called
.
__name__
=
f
'
managed
Keys.
{
loca
l_key
}
'
descr
.
__call__
(
_managed_slot_called
)
descr
.
__call__
(
_managed_slot_called
)
# Managed slots can only be called in the ACTIVE
# Managed slots can only be called in the ACTIVE
...
@@ -720,13 +842,18 @@ class CalibrationManager(DeviceClientBase, Device):
...
@@ -720,13 +842,18 @@ class CalibrationManager(DeviceClientBase, Device):
# Add a callback only if the original descriptor
# Add a callback only if the original descriptor
# is reconfigurable.
# is reconfigurable.
async
def
_managed_prop_changed
(
parent
,
v
,
k
=
key
,
async
def
_managed_prop_changed
(
fk
=
full_key
):
parent
,
new_value
,
setattr
(
parent
,
k
,
v
)
leaf_key
=
leaf_key
,
remote_key
=
remote_key
,
setter
=
getattr
(
self
,
'
_set_on_
'
+
managed_hash
[
local_key
,
'
__owner
'
])
):
setattr
(
parent
,
leaf_key
,
new_value
)
if
self
.
state
!=
State
.
INIT
:
if
self
.
state
!=
State
.
INIT
:
# Do not propagate updates during injection.
# Do not propagate updates during injection.
background
(
se
lf
.
_set_on_corrections
(
fk
,
v
))
background
(
se
tter
(
remote_key
,
new_value
))
descr
.
__call__
(
_managed_prop_changed
)
descr
.
__call__
(
_managed_prop_changed
)
...
@@ -740,17 +867,17 @@ class CalibrationManager(DeviceClientBase, Device):
...
@@ -740,17 +867,17 @@ class CalibrationManager(DeviceClientBase, Device):
try
:
try
:
# If there's been a previous value before
# If there's been a previous value before
# injection, use it.
# injection, use it.
descr
.
defaultValue
=
prev_val
s
[
full
_key
]
descr
.
defaultValue
=
prev_val
ues
[
remote
_key
]
except
KeyError
:
except
KeyError
:
pass
pass
else
:
else
:
self
.
logger
.
warn
(
f
'
Encountered unknown descriptor type
'
self
.
logger
.
warn
(
f
'
Encountered unknown descriptor type
'
f
'
{
type
(
descr
)
}
'
)
f
'
{
type
(
descr
)
}
'
)
_sanitize_node
(
managed_node
,
managed_
tree
)
_sanitize_node
(
managed_node
,
managed_
hash
)
# Inject the newly prepared node for managed keys.
# Inject the newly prepared node for managed keys.
self
.
__class__
.
managed
=
managed_node
self
.
__class__
.
managed
Keys
=
managed_node
await
self
.
publishInjectedParameters
()
await
self
.
publishInjectedParameters
()
self
.
_managed_keys
=
managed_keys
self
.
_managed_keys
=
managed_keys
...
@@ -1025,6 +1152,30 @@ class CalibrationManager(DeviceClientBase, Device):
...
@@ -1025,6 +1152,30 @@ class CalibrationManager(DeviceClientBase, Device):
f
'
device
{
device_id
}
'
)
f
'
device
{
device_id
}
'
)
return
True
return
True
# Find any applicable key patterns to restore configuration for.
member_str
=
device_id
[
device_id
.
rfind
(
'
/
'
)
+
1
:]
key_patterns
=
[
key_pattern
for
member_pattern
,
key_pattern
in
self
.
_restore_config_patterns
.
items
()
if
member_pattern
.
match
(
member_str
)]
if
key_patterns
:
try
:
# Try to obtain most recent configuration.
old_config
=
await
getConfigurationFromPast
(
device_id
,
datetime
.
now
().
isoformat
())
except
KaraboError
as
e
:
self
.
logger
.
warn
(
f
'
Failed receiving previous configuration
'
f
'
for
{
device_id
}
:
{
e
}
'
)
else
:
# Match all keys against the found patterns.
keys
=
{
key
for
key
in
old_config
.
paths
()
if
any
((
key_pattern
.
match
(
key
)
for
key_pattern
in
key_patterns
))}
self
.
logger
.
debug
(
f
'
Keys restored on
{
device_id
}
:
'
+
'
,
'
.
join
(
keys
))
config
.
update
({
key
:
old_config
[
key
]
for
key
in
keys
})
try
:
try
:
msg
=
await
wait_for
(
instantiate
(
msg
=
await
wait_for
(
instantiate
(
server
,
class_id
,
device_id
,
config
),
5.0
)
server
,
class_id
,
device_id
,
config
),
5.0
)
...
@@ -1069,7 +1220,7 @@ class CalibrationManager(DeviceClientBase, Device):
...
@@ -1069,7 +1220,7 @@ class CalibrationManager(DeviceClientBase, Device):
getattr
(
self
.
deviceIds
,
f
'
{
role
}
Suffix
'
)
getattr
(
self
.
deviceIds
,
f
'
{
role
}
Suffix
'
)
# Servers by group and layer.
# Servers by group and layer.
server_by_group
=
{
group
:
server
for
group
,
server
,
_
,
_
,
_
,
_
server_by_group
=
{
group
:
server
for
group
,
server
,
_
,
_
,
_
,
_
,
_
,
_
in
self
.
moduleGroups
.
value
}
in
self
.
moduleGroups
.
value
}
server_by_layer
=
{
layer
:
server
for
layer
,
_
,
server
server_by_layer
=
{
layer
:
server
for
layer
,
_
,
server
in
self
.
previewLayers
.
value
}
in
self
.
previewLayers
.
value
}
...
@@ -1130,8 +1281,8 @@ class CalibrationManager(DeviceClientBase, Device):
...
@@ -1130,8 +1281,8 @@ class CalibrationManager(DeviceClientBase, Device):
# Instantiate group matchers and bridges.
# Instantiate group matchers and bridges.
for
row
in
self
.
moduleGroups
.
value
:
for
row
in
self
.
moduleGroups
.
value
:
group
,
server
,
with_matcher
,
with_bridge
,
bridge
_port
,
\
group
,
server
,
with_matcher
,
start_matcher
,
with_
bridge
,
\
bridge_pattern
=
row
start_bridge
,
bridge_port
,
bridge_pattern
=
row
# Group matcher, if applicable.
# Group matcher, if applicable.
if
with_matcher
:
if
with_matcher
:
...
@@ -1147,23 +1298,12 @@ class CalibrationManager(DeviceClientBase, Device):
...
@@ -1147,23 +1298,12 @@ class CalibrationManager(DeviceClientBase, Device):
'
fsSource
'
,
input_source_by_module
[
vname
])
'
fsSource
'
,
input_source_by_module
[
vname
])
for
vname
in
modules_by_group
[
group
]]
for
vname
in
modules_by_group
[
group
]]
if
self
.
instantiationOptions
.
restoreMatcherSources
:
try
:
old_config
=
await
getConfigurationFromPast
(
matcher_device_id
,
datetime
.
now
().
isoformat
())
except
KaraboError
:
pass
# Ignore configuration on error
else
:
config
[
'
channels
'
]
=
old_config
[
'
channels
'
]
config
[
'
slowSources
'
]
=
old_config
[
'
slowSources
'
]
config
[
'
fastSources
'
]
=
old_config
[
'
fastSources
'
]
if
not
await
self
.
_instantiate_device
(
if
not
await
self
.
_instantiate_device
(
server
,
class_ids
[
'
groupMatcher
'
],
server
,
class_ids
[
'
groupMatcher
'
],
matcher_device_id
,
config
matcher_device_id
,
config
):
):
return
return
elif
s
elf
.
instantiationOptions
.
autoActivateGroupM
atcher
s
:
elif
s
tart_m
atcher
:
async
def
_activate_matcher
(
device_id
):
async
def
_activate_matcher
(
device_id
):
with
await
getDevice
(
device_id
)
as
device
:
with
await
getDevice
(
device_id
)
as
device
:
await
sleep
(
3
)
await
sleep
(
3
)
...
@@ -1188,7 +1328,7 @@ class CalibrationManager(DeviceClientBase, Device):
...
@@ -1188,7 +1328,7 @@ class CalibrationManager(DeviceClientBase, Device):
server
,
class_ids
[
'
bridge
'
],
bridge_device_id
,
config
server
,
class_ids
[
'
bridge
'
],
bridge_device_id
,
config
):
):
return
return
elif
s
elf
.
instantiationOptions
.
autoActivateGroupB
ridge
s
:
elif
s
tart_b
ridge
:
# Delay the slot a bit since it will get lost during
# Delay the slot a bit since it will get lost during
# instantation.
# instantation.
...
...
This diff is collapsed.
Click to expand it.
David Hammer
@hammerd
mentioned in commit
3566342d
·
2 years ago
mentioned in commit
3566342d
mentioned in commit 3566342d031db1bb435b6430f100afdbaaa6c4bf
Toggle commit list
Preview
0%
Loading
Try again
or
attach a new file
.
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Save comment
Cancel
Please
register
or
sign in
to comment