diff --git a/src/calng/base_correction.py b/src/calng/base_correction.py index b19962230c9f0333b341096d669da0d0a8fd8b41..03f31b0f9ce6f6b33a377cae74c4d2e477e01566 100644 --- a/src/calng/base_correction.py +++ b/src/calng/base_correction.py @@ -3,6 +3,7 @@ import contextlib import enum import functools import gc +import math import pathlib import threading from timeit import default_timer @@ -519,6 +520,19 @@ class BaseCorrection(PythonDevice): .initialValue(0) .commit(), + DOUBLE_ELEMENT(expected) + .key("performance.inputDelay") + .displayedName("Input delay") + .description( + "Measured delay on the input channel. Measured by difference between " + "'now' and timestamp in input metadata." + ) + .unit(Unit.SECOND) + .metricPrefix(MetricPrefix.MILLI) + .readOnly() + .initialValue(0) + .commit(), + DOUBLE_ELEMENT(expected) .key("performance.ratioOfRecentTrainsReceived") .description( @@ -711,8 +725,9 @@ class BaseCorrection(PythonDevice): "performance.ratioOfRecentTrainsReceived", 0, ) - self._processing_time_ema = utils.ExponentialMovingAverage(alpha=0.3) + self._processing_time_tracker = utils.ExponentialMovingAverage(alpha=0.3) self._rate_tracker = utils.WindowRateTracker() + self._input_delay_tracker = utils.ExponentialMovingAverage(alpha=0.3) self._rate_update_timer = utils.RepeatingTimer( interval=1, callback=self._update_rate_and_state, @@ -1101,13 +1116,21 @@ class BaseCorrection(PythonDevice): self.updateState(State.PROCESSING) self.log_status_info("Processing data") - train_id = metadata.getAttribute("timestamp", "tid") + timestamp = Timestamp.fromHashAttributes(metadata.getAttributes("timestamp")) + train_id = timestamp.getTrainId() # check time server connection with self.warning_context( "deviceInternalsState", WarningLampType.TIMESERVER_CONNECTION ) as warn: - my_train_id = self.getActualTimestamp().getTrainId() + my_timestamp = self.getActualTimestamp() + my_train_id = my_timestamp.getTrainId() + self._input_delay_tracker.update( + (my_timestamp.toTimestamp() - timestamp.toTimestamp()) * 1000 + ) + self._buffered_status_update.set( + "performance.inputDelay", self._input_delay_tracker.get() + ) if my_train_id == 0: my_train_id = self._last_train_id_processed + 1 warn( @@ -1131,7 +1154,14 @@ class BaseCorrection(PythonDevice): self._train_ratio_tracker.update(train_id) self._buffered_status_update.set( "performance.ratioOfRecentTrainsReceived", - self._train_ratio_tracker.get(), + self._train_ratio_tracker.get( + current_train=my_train_id, + expected_delay=math.ceil( + self.unsafe_get("performance.inputDelay") / 100 + ), + ) + if my_train_id != 0 + else self._train_ratio_tracker.get(), ) except utils.NonMonotonicTrainIdWarning as ex: warn( @@ -1175,11 +1205,11 @@ class BaseCorrection(PythonDevice): ) self._last_train_id_processed = train_id self._buffered_status_update.set("trainId", train_id) - self._processing_time_ema.update( + self._processing_time_tracker.update( default_timer() - self._last_processing_started ) self._buffered_status_update.set( - "performance.processingTime", self._processing_time_ema.get() * 1000 + "performance.processingTime", self._processing_time_tracker.get() * 1000 ) self._rate_tracker.update() @@ -1200,6 +1230,7 @@ class BaseCorrection(PythonDevice): self.log_status_info( f"No new train in {PROCESSING_STATE_TIMEOUT} s, switching state." ) + self._train_ratio_tracker.reset() def handle_eos(self, channel): self.updateState(State.ON) diff --git a/src/calng/utils.py b/src/calng/utils.py index 88baaa6fdca33e8316bcd67f3290d524db43cfb5..17be2a9cc8510727704477c0ed1d758d5eaa1036 100644 --- a/src/calng/utils.py +++ b/src/calng/utils.py @@ -411,36 +411,33 @@ class TrainRatioTracker: def __init__(self, buffer_size=50): self._train_id_queue = collections.deque(maxlen=buffer_size) - # should train ID range be explicitly configurable? - self._train_id_range = buffer_size * 10 - def get(self, current_train=None): - """Get the ratio of recent trains until current_train or latest updated. + def get(self, current_train=None, expected_delay=None): + """Get the ratio of recent trains based on buffer contents - If current_train is not provided, then the range considered in computing the - ratio will be from latest train_id in update (going back buffer_size trains). + If current_train is provided, the train ID span used to compute the ratio is + from oldest tid in buffer to current_train. Otherwise, latest update tid is + used as stand-in for current_train. - If you happen to know a more current train ID that has not been given in a - call to update (maybe you are receiving invalid trains and don't count them), - you can give this as current_train, yielding a lower ratio. Note that this will - trim the queue, so a subsequent call with lower or no current_train will return - an incorrectly low ratio. + If expected_delay is provided in addition to current_train, will take max of + latest update tid and current train minus expected delay. """ - if len(self._train_id_queue) == 0: - return 0 - - if current_train is None: - current_train = self._train_id_queue[-1] - - cutoff = current_train - self._train_id_range + 1 try: - while self._train_id_queue[0] < cutoff: - self._train_id_queue.popleft() + if current_train is None: + current_train = self._train_id_queue[-1] + elif expected_delay is not None: + current_train = max( + current_train - expected_delay, self._train_id_queue[-1] + ) + oldest_train = self._train_id_queue[0] except IndexError: return 0 - # TODO: avoid estimator ramp-up (don't initially divide by full range) - return len(self._train_id_queue) * 100 / self._train_id_range + return ( + len(self._train_id_queue) + * 100 + / (current_train - oldest_train + 1) + ) def reset(self): self._train_id_queue.clear()