Skip to content
Snippets Groups Projects
Commit 45cb6810 authored by David Hammer's avatar David Hammer
Browse files

PickyBoi goes to another repository

parent 02aebda9
No related branches found
No related tags found
1 merge request!53Train picker arbiter kernel
......@@ -39,7 +39,6 @@ setup(
"DetectorAssembler = calng.DetectorAssembler:DetectorAssembler",
"Gotthard2Assembler = calng.Gotthard2Assembler:Gotthard2Assembler",
"LpdminiSplitter = calng.LpdminiSplitter:LpdminiSplitter",
"PickyBoi = calng.PickyBoi:PickyBoi",
],
"karabo.middlelayer_device": [
"CalibrationManager = calng.CalibrationManager:CalibrationManager",
......
import numpy as np
from karabo.bound import (
BOOL_ELEMENT,
DOUBLE_ELEMENT,
FLOAT_ELEMENT,
IMAGEDATA_ELEMENT,
INPUT_CHANNEL,
INT32_ELEMENT,
INT64_ELEMENT,
KARABO_CLASSINFO,
NDARRAY_ELEMENT,
NODE_ELEMENT,
OUTPUT_CHANNEL,
OVERWRITE_ELEMENT,
SLOT_ELEMENT,
STRING_ELEMENT,
UINT32_ELEMENT,
UINT64_ELEMENT,
VECTOR_BOOL_ELEMENT,
VECTOR_STRING_ELEMENT,
ChannelMetaData,
Epochstamp,
ImageData,
Hash,
PythonDevice,
Schema,
State,
Timestamp,
Trainstamp,
Types,
Unit,
)
from . import utils
from ._version import version as deviceVersion
@KARABO_CLASSINFO("PickyBoi", deviceVersion)
class PickyBoi(PythonDevice):
@staticmethod
def expectedParameters(expected):
(
OVERWRITE_ELEMENT(expected)
.key("state")
.setNewOptions(
State.INIT,
State.MONITORING, # waiting for picked train(s)
State.ACQUIRING, # currently getting picked trains
State.PASSIVE, # after getting picked trains with no new pick set
State.ERROR, # missed the train
)
.setNewDefaultValue(State.PASSIVE)
.commit(),
STRING_ELEMENT(expected)
.key("ppuFollowingState")
.setSpecialDisplayType("State")
.readOnly()
.initialValue("OFF")
.commit(),
INPUT_CHANNEL(expected)
.key("input")
.commit(),
UINT64_ELEMENT(expected)
.key("nextTrainToCatch")
.assignmentOptional()
.defaultValue(0)
.reconfigurable()
.commit(),
UINT64_ELEMENT(expected)
.key("numberOfTrainsToCatch")
.assignmentOptional()
.defaultValue(1)
.reconfigurable()
.commit(),
INT64_ELEMENT(expected)
.key("indexOfTrainToForward")
.description(
"If this value is negative, all numberOfTrainsToCatch starting from "
"nextTrainToCatch are forwarded. If non-negative, this is used as an "
"index in this range of trains instead i.e. only the train with ID "
"nextTrainToCatch + min(indexOfTrainToForward, numberOfTrainsToCatch-1)"
"will be forwarded. This is relevant for previewing in Karabo GUI."
)
.assignmentOptional()
.defaultValue(-1)
.commit(),
STRING_ELEMENT(expected)
.key("ppuDevice")
.assignmentOptional()
.defaultValue("")
.commit(),
INT64_ELEMENT(expected)
.key("ppuTrainOffset")
.description(
"Offset added to trainTrigger.sequenceStart gotten from watched PPU "
"device. Should be 0 unless there are timing issues."
)
.assignmentOptional()
.defaultValue(0)
.commit(),
DOUBLE_ELEMENT(expected)
.key("ratioOfRecentTrainsReceived")
.description(
"Of the latest trains (from last received train, going back "
"[some buffer range]), how many did we receive? This estimate is "
"updated when new trains come in, so is unreliable if nothing is "
"coming at all."
)
.unit(Unit.PERCENT)
.readOnly()
.initialValue(0)
.commit(),
UINT64_ELEMENT(expected)
.key("trainId")
.readOnly()
.initialValue(0)
.commit(),
SLOT_ELEMENT(expected)
.key("toggleFollowPpu")
.allowedStates(State.MONITORING, State.PASSIVE, State.ERROR)
.commit(),
SLOT_ELEMENT(expected)
.key("resetCapturedSchema")
.commit(),
SLOT_ELEMENT(expected)
.key("captureNextTrain")
.allowedStates(State.MONITORING, State.PASSIVE, State.ERROR)
.commit(),
OUTPUT_CHANNEL(expected)
.key("output")
.commit(),
)
def __init__(self, config):
super().__init__(config)
self._previous_tid = 0
self._trains_to_get = set() # will hold range of trains
self._old_target_tid = None # just used for warnings about missing trains
# manual override: forward starting from next train, whatever it is
self._just_capture_next = False
self.KARABO_SLOT(self.captureNextTrain)
# output schema set from first data received; can be reset
self._schema_is_set = False
self.KARABO_SLOT(self.resetCapturedSchema)
self._following_ppu = None # will hold name of PPU device when following
self.KARABO_SLOT(self.toggleFollowPpu)
self.registerInitialFunction(self._initialization)
def _initialization(self):
self.KARABO_ON_DATA("input", self.input_handler)
self._train_ratio_tracker = utils.TrainRatioTracker()
self._rate_update_timer = utils.RepeatingTimer(
interval=1,
callback=self._update_trackers,
)
# if ppuDevice is set, will try to follow immediately
if self.get("ppuDevice"):
self.toggleFollowPpu()
def resetCapturedSchema(self):
self._schema_is_set = False
def captureNextTrain(self):
self._just_capture_next = True
if not self.get("state") is State.MONITORING:
self.updateState(State.MONITORING)
def toggleFollowPpu(self):
client = self.remote()
if self._following_ppu is None:
ppu_device_id = self.get("ppuDevice")
try:
conf = client.get(ppu_device_id)
self.handlePpuDeviceConfiguration(ppu_device_id, conf)
except TimeoutError:
self.log.WARN(f"Timed out getting configuration of {ppu_device_id}")
self.set("ppuFollowingState", "ERROR")
else:
client.registerDeviceMonitor(
ppu_device_id, self.handlePpuDeviceConfiguration
)
self._following_ppu = ppu_device_id
self.set("ppuFollowingState", "ON")
else:
client.unregisterDeviceMonitor(self._following_ppu)
self.set("ppuFollowingState", "OFF")
self._following_ppu = None
def handlePpuDeviceConfiguration(self, device_id, conf):
if conf.has("trainTrigger.numberOfTrains"):
self.set("numberOfTrainsToCatch", conf["trainTrigger.numberOfTrains"])
if conf.has("trainTrigger.sequenceStart"):
self._old_target_tid = self.get("nextTrainToCatch")
self.set("nextTrainToCatch", conf["trainTrigger.sequenceStart"])
if (
conf.has("trainTrigger.numberOfTrains")
or conf.has("trainTrigger.sequenceStart")
):
self._update_target(offset=self.get("ppuTrainOffset"))
def input_handler(self, data, meta):
if not self._schema_is_set:
schema_update = Schema()
(
OUTPUT_CHANNEL(schema_update)
.key("output")
.dataSchema(hash_to_schema(data))
.commit(),
)
self.updateSchema(schema_update)
self._schema_is_set = True
# TODO: handle multiple (consecutive) trains picked
state = self.get("state")
current_tid = Timestamp.fromHashAttributes(
meta.getAttributes("timestamp")
).getTrainId()
self._train_ratio_tracker.update(current_tid)
# TODO: check against timeserver to handle wild future trains
if self._just_capture_next:
self.set("nextTrainToCatch", current_tid)
self._trains_to_get = set(
range(current_tid, current_tid + self.get("numberOfTrainsToCatch"))
)
self._just_capture_next = False
target_tid = self.get("nextTrainToCatch")
if current_tid in self._trains_to_get:
# capture
if state is not State.ACQUIRING:
self.updateState(State.ACQUIRING)
channel = self.signalSlotable.getOutputChannel("output")
channel.write(
data,
# TODO: forward source name or use own?
ChannelMetaData(
meta.get("source"), Timestamp(Epochstamp(), Trainstamp(current_tid))
),
copyAllData=False,
)
channel.update()
# TODO: consider buffering instead and doing something clever
self._trains_to_get.discard(current_tid)
if not self._trains_to_get:
self.log.INFO("Caught all target trains :D")
self.updateState(State.PASSIVE)
elif target_tid > current_tid:
# wait
if state is not State.MONITORING:
self.updateState(State.MONITORING)
else:
# past capture range
if self._trains_to_get:
# note: wouuld also get triggered by receiving the same train twice
self.log.ERROR(f"Missed some train(s): {self._trains_to_get}")
self.updateState(State.ERROR)
self._trains_to_get.clear()
elif state not in (State.PASSIVE, State.ERROR):
self.log.INFO(f"Weird state: {state}; admonish the developer!")
self.updateState(State.PASSIVE)
self._previous_tid = current_tid
def _update_target(self, offset=0):
# assumes nextTrainToCatch and numberOfTrainsToCatch etc. have been set
new_target_tid = self.get("nextTrainToCatch") + offset
if (index_to_forward := self.get("indexOfTrainToForward")) < 0:
self._trains_to_get = set(
range(
new_target_tid, new_target_tid + self.get("numberOfTrainsToCatch")
)
)
else:
self._trains_to_get = set(
(
new_target_tid + min(
self.get("numberOfTrainsToCatch")-1, index_to_forward
),
)
)
if self._previous_tid >= new_target_tid:
self.log.INFO(
f"Moved target train to {new_target_tid} even though last seen was "
f"{self._previous_tid} - will miss some trains!"
)
else:
if self._old_target_tid < new_target_tid:
self.log.INFO(
f"Moved target train from {self._old_target_tid} to "
f"{new_target_tid} (last received was {self._previous_tid}), "
f"effectively skipping {self._old_target_tid}"
)
self.updateState(State.MONITORING)
def _update_trackers(self):
if self.get("trainId") != self._previous_tid:
self.set(
Hash(
"ratioOfRecentTrainsReceived",
self._train_ratio_tracker.get(),
"trainId",
self._previous_tid
)
)
def preReconfigure(self, config):
super().preReconfigure(config)
if config.has("nextTrainToCatch"):
self._old_target_tid = self.get("nextTrainToCatch")
self._cached_update = config
def postReconfigure(self):
super().postReconfigure()
if not hasattr(self, "_cached_update"):
self.log.WARN("postReconfigure update caching trick failed")
return
if any(
self._cached_update.has(thing)
for thing in (
"nextTrainToCatch",
"numberOfTrainsToCatch",
"indexOfTrainToForward",
)
):
self._update_target()
del self._cached_update
def hash_to_schema(h, root=None, prefix=""):
_element_mapper = {
Types.BOOL: BOOL_ELEMENT,
Types.CHAR: INT32_ELEMENT,
Types.DOUBLE: DOUBLE_ELEMENT,
Types.FLOAT: FLOAT_ELEMENT,
Types.INT8: INT32_ELEMENT,
Types.INT16: INT32_ELEMENT,
Types.INT32: INT32_ELEMENT,
Types.INT64: INT64_ELEMENT,
Types.UINT8: UINT32_ELEMENT,
Types.UINT16: UINT32_ELEMENT,
Types.UINT32: UINT32_ELEMENT,
Types.UINT64: UINT64_ELEMENT,
Types.STRING: STRING_ELEMENT,
Types.VECTOR_BOOL: VECTOR_BOOL_ELEMENT,
Types.VECTOR_CHAR: NDARRAY_ELEMENT,
Types.VECTOR_DOUBLE: NDARRAY_ELEMENT,
Types.VECTOR_FLOAT: NDARRAY_ELEMENT,
Types.VECTOR_INT8: NDARRAY_ELEMENT,
Types.VECTOR_INT16: NDARRAY_ELEMENT,
Types.VECTOR_INT32: NDARRAY_ELEMENT,
Types.VECTOR_INT64: NDARRAY_ELEMENT,
Types.VECTOR_UINT8: NDARRAY_ELEMENT,
Types.VECTOR_UINT16: NDARRAY_ELEMENT,
Types.VECTOR_UINT32: NDARRAY_ELEMENT,
Types.VECTOR_UINT64: NDARRAY_ELEMENT,
Types.VECTOR_STRING: VECTOR_STRING_ELEMENT,
}
if root is None:
root = Schema()
def rec(data, schema, prefix):
for key in data.getKeys():
vtype = data.getType(key)
value = data[key]
new_path = f"{prefix}{key}"
if isinstance(value, np.ndarray):
(
NDARRAY_ELEMENT(schema)
.key(new_path)
.commit())
elif isinstance(value, ImageData):
(
IMAGEDATA_ELEMENT(schema)
.key(new_path)
.commit()
)
elif vtype == Types.HASH:
(
NODE_ELEMENT(schema)
.key(new_path)
.commit()
)
rec(value, schema, f"{new_path}.")
else:
(
_element_mapper[vtype](schema)
.key(new_path)
.readOnly()
.commit()
)
rec(h, root, prefix)
return root
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment