Skip to content
Snippets Groups Projects
Commit 45556239 authored by David Hammer's avatar David Hammer
Browse files

Allow multiple consecutive trains

parent 025899b3
No related branches found
No related tags found
1 merge request!53Train picker arbiter kernel
...@@ -18,12 +18,14 @@ from karabo.bound import ( ...@@ -18,12 +18,14 @@ from karabo.bound import (
UINT64_ELEMENT, UINT64_ELEMENT,
VECTOR_BOOL_ELEMENT, VECTOR_BOOL_ELEMENT,
VECTOR_STRING_ELEMENT, VECTOR_STRING_ELEMENT,
Hash, ChannelMetaData,
Epochstamp,
ImageData, ImageData,
PythonDevice, PythonDevice,
Schema, Schema,
State, State,
Timestamp, Timestamp,
Trainstamp,
Types, Types,
) )
...@@ -37,9 +39,22 @@ class PickyBoi(PythonDevice): ...@@ -37,9 +39,22 @@ class PickyBoi(PythonDevice):
( (
OVERWRITE_ELEMENT(expected) OVERWRITE_ELEMENT(expected)
.key("state") .key("state")
.setNewOptions(
State.INIT,
State.MONITORING, # waiting for picked train(s)
State.ACQUIRING, # currently getting picked trains
State.PASSIVE, # after getting picked trains with no new pick set
State.ERROR, # missed the train
)
.setNewDefaultValue(State.INIT) .setNewDefaultValue(State.INIT)
.commit(), .commit(),
STRING_ELEMENT(expected)
.key("ppuFollowingState")
.readOnly()
.initialValue("OFF")
.commit(),
INPUT_CHANNEL(expected) INPUT_CHANNEL(expected)
.key("input") .key("input")
.commit(), .commit(),
...@@ -52,7 +67,7 @@ class PickyBoi(PythonDevice): ...@@ -52,7 +67,7 @@ class PickyBoi(PythonDevice):
.commit(), .commit(),
UINT64_ELEMENT(expected) UINT64_ELEMENT(expected)
.key("numTrainToCatch") .key("numberOfTrainsToCatch")
.assignmentOptional() .assignmentOptional()
.defaultValue(1) .defaultValue(1)
.reconfigurable() .reconfigurable()
...@@ -65,7 +80,8 @@ class PickyBoi(PythonDevice): ...@@ -65,7 +80,8 @@ class PickyBoi(PythonDevice):
.commit(), .commit(),
SLOT_ELEMENT(expected) SLOT_ELEMENT(expected)
.key("watchPpu") .key("toggleFollowPpu")
.allowedStates([State.MONITORING, State.PASSIVE, State.ERROR])
.commit(), .commit(),
SLOT_ELEMENT(expected) SLOT_ELEMENT(expected)
...@@ -74,43 +90,70 @@ class PickyBoi(PythonDevice): ...@@ -74,43 +90,70 @@ class PickyBoi(PythonDevice):
SLOT_ELEMENT(expected) SLOT_ELEMENT(expected)
.key("captureNextTrain") .key("captureNextTrain")
.allowedStates([State.MONITORING, State.PASSIVE, State.ERROR])
.commit(), .commit(),
) )
def __init__(self, config): def __init__(self, config):
super().__init__(config) super().__init__(config)
self.registerInitialFunction(self._initialization)
self._schema_is_set = False
self._previous_tid = 0 self._previous_tid = 0
self._trains_to_get = set() # will hold range of trains
self._old_target_tid = None # just used for warnings about missing trains
# manual override: forward starting from next train, whatever it is
self._just_capture_next = False self._just_capture_next = False
self.KARABO_SLOT(self.resetCapturedSchema)
self.KARABO_SLOT(self.captureNextTrain) self.KARABO_SLOT(self.captureNextTrain)
# output schema set from first data received; can be reset
self._schema_is_set = False
self.KARABO_SLOT(self.resetCapturedSchema)
self._following_ppu = None # will hold name of PPU device when following
self.KARABO_SLOT(self.toggleFollowPpu)
self.registerInitialFunction(self._initialization)
def _initialization(self):
self.KARABO_ON_DATA("input", self.input_handler)
# if ppuDevice is set, will try to follow immediately
if self.get("ppuDevice"):
self.toggleFollowPpu()
def resetCapturedSchema(self): def resetCapturedSchema(self):
self._schema_is_set = False self._schema_is_set = False
def captureNextTrain(self): def captureNextTrain(self):
self._just_capture_next = True self._just_capture_next = True
if not self.get("state") is State.MONITORING:
self.updateState(State.MONITORING)
def watchPpu(self): def toggleFollowPpu(self):
ppu_device_id = self.get("ppuDevice")
client = self.remote() client = self.remote()
conf = client.getConfiguration(ppu_device_id) if self._following_ppu is None:
self.handlePpuDeviceConfiguration(conf) ppu_device_id = self.get("ppuDevice")
client.registerDeviceMonitor(ppu_device_id, self.handlePpuDeviceConfiguration) conf = client.getConfiguration(ppu_device_id)
self.handlePpuDeviceConfiguration(conf)
client.registerDeviceMonitor(
ppu_device_id, self.handlePpuDeviceConfiguration
)
self._following_ppu = ppu_device_id
self.set("ppuFollowingState", "ON")
else:
client.unregisterDeviceMonitor(self._following_ppu)
self.set("ppuFollowingState", "OFF")
self._following_ppu = None
def handlePpuDeviceConfiguration(self, conf): def handlePpuDeviceConfiguration(self, conf):
... if conf.has("numberOfTrains"):
# TODO self.set("numberOfTrainsToCatch", conf["numberOfTrains"])
self._set_new_target_tid(new_target_tid) if conf.has("sequenceStart"):
self.set("nextTrainToCatch", new_target_tid) self._old_target_tid = self.get("nextTrainToCatch")
self.set("nextTrainToCatch", conf["sequenceStart"])
def _initialization(self): if conf.has("numberOfTrains") or conf.has("sequenceStart"):
self.KARABO_ON_DATA("input", self.input_handler) self._update_target()
def input_handler(self, data, meta): def input_handler(self, data, meta):
if not self._schema_is_set: if not self._schema_is_set:
self.updateState(State.ROTATING)
schema_update = Schema() schema_update = Schema()
( (
OUTPUT_CHANNEL(schema_update) OUTPUT_CHANNEL(schema_update)
...@@ -126,51 +169,87 @@ class PickyBoi(PythonDevice): ...@@ -126,51 +169,87 @@ class PickyBoi(PythonDevice):
current_tid = Timestamp.fromHashAttributes( current_tid = Timestamp.fromHashAttributes(
meta.getAttributes("timestamp") meta.getAttributes("timestamp")
).getTrainId() ).getTrainId()
# TODO: check against timeserver to handle wild future trains
if self._just_capture_next: if self._just_capture_next:
self.set("nextTrainToCatch", current_tid) self.set("nextTrainToCatch", current_tid)
self._traint_to_get = set(
range(current_tid, current_tid + self.get("numberOfTrainsToCatch"))
)
self._just_capture_next = False self._just_capture_next = False
target_tid = self.get("nextTrainToCatch") target_tid = self.get("nextTrainToCatch")
if target_tid > current_tid: if current_tid in self._trains_to_get:
# capture
if state is not State.ACQUIRING:
self.updateState(State.ACQUIRING)
channel = self.signalSlotable.getOutputChannel("output")
channel.write(
data,
# TODO: forward source name or use own?
ChannelMetaData(
meta.get("source"), Timestamp(Epochstamp(), Trainstamp(current_tid))
),
copyAllData=False,
)
channel.update()
# TODO: consider buffering instead and doing something clever
self._trains_to_get.discard(current_tid)
if not self._trains_to_get:
self.log.INFO("Caught all target trains :D")
self.updateState(State.PASSIVE)
elif target_tid > current_tid:
# wait
if state is not State.MONITORING: if state is not State.MONITORING:
self.updateState(State.MONITORING) self.updateState(State.MONITORING)
elif target_tid < current_tid:
if self._previous_tid < target_tid and state is not State.ERROR:
self.log.ERROR(f"Missed target train of {target_tid} :(")
self.updateState(State.ERROR)
else:
if state is not State.PASSIVE:
self.updateState(State.PASSIVE)
else: else:
self.updateState(State.FILLING) # past capture range
self.log.INFO(f"Got target train {target} now :D") if self._trains_to_get:
# TODO: copy train ID on metadata # note: wouuld also get triggered by receiving the same train twice
self.writeChannel("output", data) self.log.ERROR(f"Missed some train(s): {self._trains_to_get}")
self.updateState(State.DISENGAGED) self.updateState(State.ERROR)
self._previous_tid = current self._trains_to_get.clear()
elif state not in (State.PASSIVE, State.ERROR):
def _set_new_target_tid(self, new_target_tid): self.log.INFO(f"Weird state: {state}; admonish the developer!")
# assumes nextTrainToCatch gets set *after* this function self.updateState(State.PASSIVE)
current_target_tid = self.get("nextTrainToCatch") self._previous_tid = current_tid
def _update_target(self, new_target_tid):
# assumes nextTrainToCatch and numberOfTrainsToCatch have been set
new_target_tid = self.get("nextTrainToCatch")
self._trains_to_get = set(
range(new_target_tid, new_target_tid + self.get("numberOfTrainsToCatch"))
)
if self._previous_tid >= new_target_tid: if self._previous_tid >= new_target_tid:
self.log.INFO( self.log.INFO(
f"Moved target train to {new_target_tid} even though last seen was " f"Moved target train to {new_target_tid} even though last seen was "
f"{self._previous_tid} - will not be able to retroactively catch this." f"{self._previous_tid} - will miss some trains!"
) )
self.updateState(State.ERROR)
else: else:
if current_target_tid < new_target_tid: if self._old_target_tid < new_target_tid:
self.log.INFO( self.log.INFO(
f"Moved target train from {current_target_tid} to {new_target_tid}" f"Moved target train from {self._old_target_tid} to "
f"even though last seen was {self._previous_tid} " f"{new_target_tid} (last received was {self._previous_tid}), "
f"effectively skipping {current_target_tid}" f"effectively skipping {self._old_target_tid}"
) )
self.updateState(State.MONITORING) self.updateState(State.MONITORING)
def preReconfigure(self, config): def preReconfigure(self, config):
super().preReconfigure(config) super().preReconfigure(config)
if config.has("nextTrainToCatch"): if config.has("nextTrainToCatch"):
self._set_new_target_tid(config["nextTrainToCatch"]) self._old_target_tid = self.get("nextTrainToCatch")
self._cached_update = config
def postReconfigure(self):
super().postReconfigure()
if not hasattr(self, "_cached_update"):
self.log.WARN("postReconfigure update caching trick failed")
return
if (
self._cached_update.has("nextTrainToCatch")
or self._cached_update.has("numberOfTrainsToCatch")
):
self._update_target()
def hash_to_schema(h, root=None, prefix=""): def hash_to_schema(h, root=None, prefix=""):
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment