From 38f4cd3dfbc758ddb525ccf83bef427ced09871a Mon Sep 17 00:00:00 2001 From: David Hammer <dhammer@mailbox.org> Date: Mon, 31 Jan 2022 12:06:58 +0100 Subject: [PATCH] Add measurement of recent trains received --- src/calng/base_correction.py | 35 ++++++++++++++++++---- src/calng/utils.py | 58 ++++++++++++++++++++++++++++++++++++ 2 files changed, 88 insertions(+), 5 deletions(-) diff --git a/src/calng/base_correction.py b/src/calng/base_correction.py index ffc37d17..7d93990a 100644 --- a/src/calng/base_correction.py +++ b/src/calng/base_correction.py @@ -590,6 +590,19 @@ class BaseCorrection(PythonDevice): .readOnly() .initialValue(0) .commit(), + + DOUBLE_ELEMENT(expected) + .key("performance.ratioOfRecentTrainsReceived") + .description( + "Of the latest trains (from last received train, going back " + "[some buffer range]), how many did we receive? This estimate is " + "updated when new trains come in, so is unreliable if nothing is " + "coming at all." + ) + .unit(Unit.PERCENT) + .readOnly() + .initialValue(0) + .commit(), ) # this node will be filled out by subclass @@ -657,6 +670,8 @@ class BaseCorrection(PythonDevice): 0, "performance.processingTime", 0, + "performance.ratioOfRecentTrainsReceived", + 0, ) self._processing_time_ema = utils.ExponentialMovingAverage(alpha=0.3) self._rate_tracker = utils.WindowRateTracker() @@ -664,6 +679,9 @@ class BaseCorrection(PythonDevice): interval=1, callback=self._update_rate_and_state, ) + self._train_ratio_tracker = utils.TrainRatioTracker( + warn_callback=self.log_status_warn + ) self.KARABO_ON_INPUT("dataInput", self.input_handler) self.KARABO_ON_EOS("dataInput", self.handle_eos) @@ -956,6 +974,7 @@ class BaseCorrection(PythonDevice): return train_id = metadata.getAttribute("timestamp", "tid") + self._train_ratio_tracker.update(train_id) cell_table = data_hash.get(self._cell_table_path) if ( (isinstance(cell_table, np.ndarray) and cell_table.size == 0) @@ -1012,13 +1031,19 @@ class BaseCorrection(PythonDevice): 
self._rate_tracker.update() def _update_rate_and_state(self): - self._buffered_status_update.set("performance.rate", self._rate_tracker.get()) - self._buffered_status_update.set( - "performance.processingTime", self._processing_time_ema.get() * 1000 - ) - # trainId in _buffered_status_update should be updated in input handler if self.get("state") is State.PROCESSING: + self._buffered_status_update.set( + "performance.rate", self._rate_tracker.get() + ) + self._buffered_status_update.set( + "performance.processingTime", self._processing_time_ema.get() * 1000 + ) + self._buffered_status_update.set( + "performance.ratioOfRecentTrainsReceived", + self._train_ratio_tracker.get(), + ) + # trainId in _buffered_status_update should be updated in input handler self.set(self._buffered_status_update) if ( default_timer() - self._last_processing_started diff --git a/src/calng/utils.py b/src/calng/utils.py index 0006ab94..940e0aa3 100644 --- a/src/calng/utils.py +++ b/src/calng/utils.py @@ -284,3 +284,61 @@ class Stopwatch: return f"{self.elapsed():.3f} s" else: return f"{self.name}: {self.elapsed():.3f} s" + + +class TrainRatioTracker: + """Measure what percentage of recent train IDs (from contiguous set) were seen + + The tracker will maintain a queue of buffer_size train IDs going back at most + buffer_size from latest train ID (depending on calls to get). Call update(train_id) + when you see a new train and call get() to get the ratio of recent trains seen. + + In case warn_callback is given, update can issue a warning in case invalid train + IDs are received. The tracker assumes trains are strictly increasing and that they + are supposed to be contiguous - hence the ability to infer when some are missing. 
+ """ + + def __init__(self, buffer_size=50, warn_callback=None): + self._train_id_queue = collections.deque(maxlen=buffer_size) + self._train_id_range = buffer_size + self._warn_callback = warn_callback + + def get(self, current_train=None): + """Get the ratio of recent trains until current_train or latest updated. + + If current_train is not provided, then the range considered in computing the + ratio will be from latest train_id in update (going back buffer_size trains). + + If you happen to know a more current train ID that has not been given in a + call to update (maybe you are receiving invalid trains and don't count them), + you can give this as current_train, yielding a lower ratio. Note that this will + trim the queue, so a subsequent call with lower or no current_train will return + an incorrectly low ratio. + """ + if len(self._train_id_queue) == 0: + return 0 + + if current_train is None: + current_train = self._train_id_queue[-1] + + cutoff = current_train - self._train_id_range + 1 + try: + while self._train_id_queue[0] < cutoff: + self._train_id_queue.popleft() + except IndexError: + return 0 + + # TODO: avoid estimator ramp-up (don't initially divide by full range) + return len(self._train_id_queue) * 100 / self._train_id_range + + def update(self, train_id): + if ( + len(self._train_id_queue) > 0 + and self._train_id_queue[-1] >= train_id + and self._warn_callback is not None + ): + self._warn_callback( + f"New train ID {train_id} not greater than last thing in queue, " + f"{self._train_id_queue[-1]}, just thought you should know..." + ) + self._train_id_queue.append(train_id) -- GitLab