From 6f3026c87570141eacbc7878141a48734b13a791 Mon Sep 17 00:00:00 2001
From: David Hammer <dhammer@mailbox.org>
Date: Wed, 6 Oct 2021 13:41:51 +0200
Subject: [PATCH] Clean up rate tracking, state resetting

---
 src/calng/AgipdCorrection.py |  16 +----
 src/calng/DsscCorrection.py  |  13 +---
 src/calng/base_correction.py | 123 ++++++++++-------------------------
 src/calng/utils.py           |  44 +++++++++++++
 4 files changed, 84 insertions(+), 112 deletions(-)

diff --git a/src/calng/AgipdCorrection.py b/src/calng/AgipdCorrection.py
index d6f525b0..79ad4783 100644
--- a/src/calng/AgipdCorrection.py
+++ b/src/calng/AgipdCorrection.py
@@ -215,6 +215,7 @@ class AgipdCorrection(BaseCorrection):
             return
 
         time_start = timeit.default_timer()
+        self._last_processing_started = time_start
 
         train_id = metadata.getAttribute("timestamp", "tid")
         cell_table = np.squeeze(data.get("image.cellId"))
@@ -238,15 +239,6 @@ class AgipdCorrection(BaseCorrection):
         if not self._schema_cache["state"] is State.PROCESSING:
             self.updateState(State.PROCESSING)
             self.set("status", "Processing data")
-        if self._state_reset_timer is None:
-            self._state_reset_timer = utils.DelayableTimer(
-                timeout=self._schema_cache["processingStateTimeout"],
-                callback=self._reset_state_from_processing,
-            )
-        else:
-            self._state_reset_timer.set_timeout(
-                self._schema_cache["processingStateTimeout"]
-            )
 
         correction_cell_num = self._schema_cache["constantParameters.memoryCells"]
         do_generate_preview = (
@@ -310,11 +302,9 @@ class AgipdCorrection(BaseCorrection):
 
         # update rate etc.
         self._buffered_status_update.set("trainId", train_id)
-        # self._rate_tracker.update()
+        self._rate_tracker.update()
         time_spent = timeit.default_timer() - time_start
-        self._buffered_status_update.set(
-            "performance.lastProcessingDuration", time_spent * 1000
-        )
+        self._processing_time_ema.update(time_spent)
 
     def loadMostRecentConstants(self):
         self.flush_constants()
diff --git a/src/calng/DsscCorrection.py b/src/calng/DsscCorrection.py
index 93009796..b1a616ab 100644
--- a/src/calng/DsscCorrection.py
+++ b/src/calng/DsscCorrection.py
@@ -113,15 +113,6 @@ class DsscCorrection(BaseCorrection):
         if not self._schema_cache["state"] is State.PROCESSING:
             self.updateState(State.PROCESSING)
             self.set("status", "Processing data")
-        if self._state_reset_timer is None:
-            self._state_reset_timer = utils.DelayableTimer(
-                timeout=self._schema_cache["processingStateTimeout"],
-                callback=self._reset_state_from_processing,
-            )
-        else:
-            self._state_reset_timer.set_timeout(
-                self._schema_cache["processingStateTimeout"]
-            )
 
         correction_cell_num = self._schema_cache["dataFormat.constantMemoryCells"]
         do_generate_preview = (
@@ -185,9 +176,7 @@ class DsscCorrection(BaseCorrection):
         self._buffered_status_update.set("trainId", train_id)
         self._rate_tracker.update()
         time_spent = timeit.default_timer() - time_start
-        self._buffered_status_update.set(
-            "performance.lastProcessingDuration", time_spent * 1000
-        )
+        self._processing_time_ema.update(time_spent)
 
     def _update_pulse_filter(self, filter_string):
         """Called whenever the pulse filter changes, typically followed by
diff --git a/src/calng/base_correction.py b/src/calng/base_correction.py
index 4dfa8726..5ce2bdf8 100644
--- a/src/calng/base_correction.py
+++ b/src/calng/base_correction.py
@@ -1,4 +1,5 @@
 import threading
+import timeit
 
 import hashToSchema
 import numpy as np
@@ -34,6 +35,9 @@
 from . import scenes, shmem_utils, utils
 from ._version import version as deviceVersion
 
+PROCESSING_STATE_TIMEOUT = 10
+
+
 @KARABO_CLASSINFO("BaseCorrection", deviceVersion)
 class BaseCorrection(PythonDevice):
     _correction_flag_class = None  # subclass must override this with some enum class
@@ -265,38 +269,8 @@ class BaseCorrection(PythonDevice):
             .commit(),
         )
 
+        # just measurements and counters to display
         (
-            NODE_ELEMENT(expected)
-            .key("performance")
-            .displayedName("Performance measures")
-            .commit(),
-            FLOAT_ELEMENT(expected)
-            .key("performance.rateUpdateInterval")
-            .displayedName("Rate update interval")
-            .description("Interval (seconds) between updates of processing rate.")
-            .assignmentOptional()
-            .defaultValue(1)
-            .reconfigurable()
-            .commit(),
-            FLOAT_ELEMENT(expected)
-            .key("performance.rateBufferSpan")
-            .displayedName("Rate measurement buffer span")
-            .description("Event buffer timespan (in seconds) for measuring rate")
-            .assignmentOptional()
-            .defaultValue(20)
-            .reconfigurable()
-            .commit(),
-            FLOAT_ELEMENT(expected)
-            .key("processingStateTimeout")
-            .description(
-                "Timeout after which the device goes from PROCESSING back to ACTIVE "
-                "if no new input is processed"
-            )
-            .assignmentOptional()
-            .defaultValue(10)
-            .reconfigurable()
-            .commit(),
-            # just measurements and counters to display
             UINT64_ELEMENT(expected)
             .key("trainId")
             .displayedName("Train ID")
@@ -304,42 +278,36 @@ class BaseCorrection(PythonDevice):
             .readOnly()
             .initialValue(0)
             .commit(),
+            NODE_ELEMENT(expected)
+            .key("performance")
+            .displayedName("Performance measures")
+            .commit(),
             FLOAT_ELEMENT(expected)
-            .key("performance.lastProcessingDuration")
+            .key("performance.processingDuration")
             .displayedName("Processing time")
             .description(
-                "Amount of time spent in processing latest train. Time includes "
-                "generating preview and sending data."
+                "Exponential moving average over time spent processing individual "
+                "trains. Time includes generating preview and sending data."
            )
             .unit(Unit.SECOND)
             .metricPrefix(MetricPrefix.MILLI)
             .readOnly()
             .initialValue(0)
+            .warnHigh(100)
+            .info("Processing not fast enough for full speed")
+            .needsAcknowledging(False)
             .commit(),
             FLOAT_ELEMENT(expected)
             .key("performance.rate")
             .displayedName("Rate")
             .description(
-                "Actual rate with which this device gets / processes / sends trains"
+                "Actual rate with which this device gets, processes, and sends trains. "
+                "This is a simple moving average."
             )
             .unit(Unit.HERTZ)
             .readOnly()
             .initialValue(0)
             .commit(),
-            FLOAT_ELEMENT(expected)
-            .key("performance.theoreticalRate")
-            .displayedName("Processing rate (hypothetical)")
-            .description(
-                "Rate with which this device could hypothetically process trains. "
-                "Based on lastProcessingDuration."
-            )
-            .unit(Unit.HERTZ)
-            .readOnly()
-            .initialValue(float("NaN"))
-            .warnLow(10)
-            .info("Processing not fast enough for full speed")
-            .needsAcknowledging(False)
-            .commit(),
         )
 
         (
@@ -407,43 +375,26 @@ class BaseCorrection(PythonDevice):
         self._shmem_buffer = None
         self._has_set_output_schema = False
         self._has_updated_shapes = False
-        # TODO: replace
-        #self._rate_tracker = calibrationBase.utils.UpdateRate(
-        #interval=config.get("performance.rateBufferSpan")
-        #)
-        self._state_reset_timer = None
+        self._processing_time_ema = utils.ExponentialMovingAverage(alpha=0.3)
+        self._rate_tracker = utils.WindowRateTracker()
         self._buffered_status_update = Hash(
             "trainId",
             0,
             "performance.rate",
             0,
-            "performance.theoreticalRate",
-            float("NaN"),
-            "performance.lastProcessingDuration",
+            "performance.processingDuration",
             0,
         )
+        self._last_processing_started = 0  # note: input handler should put timestamp
         self._rate_update_timer = utils.RepeatingTimer(
-            interval=config.get("performance.rateUpdateInterval"),
-            callback=self._update_actual_rate,
+            interval=0.5,
+            callback=self._update_rate_and_state,
         )
         self._buffer_lock = threading.Lock()
 
         self.KARABO_SLOT(self.requestScene)
 
     def preReconfigure(self, config):
-        if config.has("performance.rateUpdateInterval"):
-            self._rate_update_timer.stop()
-            self._rate_update_timer = utils.RepeatingTimer(
-                interval=config.get("performance.rateUpdateInterval"),
-                callback=self._update_actual_rate,
-            )
-
-        if config.has("performance.rateBufferSpan"):
-            #self._rate_tracker = calibrationBase.utils.UpdateRate(
-            #interval=config.get("performance.rateBufferSpan")
-            #)
-            ...
-
         for path in config.getPaths():
             if path in self._schema_cache_slots:
                 self._schema_cache[path] = config.get(path)
@@ -620,28 +571,26 @@ class BaseCorrection(PythonDevice):
 
         self._has_updated_shapes = True
 
-    def _reset_state_from_processing(self):
-        # TODO: merge with rate updates (buffer status update checking)
-        if self.get("state") is State.PROCESSING:
-            self.updateState(State.ON)
-            self._state_reset_timer = None
-
-    def _update_actual_rate(self):
+    def _update_rate_and_state(self):
         if not self.get("state") is State.PROCESSING:
             self._rate_update_timer.delay()
             return
-        #self._buffered_status_update.set("performance.rate", self._rate_tracker.rate())
-        last_processing = self._buffered_status_update.get(
-            "performance.lastProcessingDuration"
+
+        self._buffered_status_update.set("performance.rate", self._rate_tracker.get())
+        self._buffered_status_update.set(
+            "performance.processingDuration", self._processing_time_ema.get() * 1000
         )
-        if last_processing > 0:
-            theoretical_rate = 1000 / last_processing
-            self._buffered_status_update.set(
-                "performance.theoreticalRate", theoretical_rate
-            )
+        # trainId should be set on _buffered_status_update in input handler
         self.set(self._buffered_status_update)
         self._rate_update_timer.delay()
 
+        if (
+            timeit.default_timer() - self._last_processing_started
+            > PROCESSING_STATE_TIMEOUT
+        ):
+            if self.get("state") is State.PROCESSING:
+                self.updateState(State.ON)
+
     def handle_eos(self, channel):
         self._has_set_output_schema = False
         self.updateState(State.ON)
diff --git a/src/calng/utils.py b/src/calng/utils.py
index d28139e7..202ed9e6 100644
--- a/src/calng/utils.py
+++ b/src/calng/utils.py
@@ -1,3 +1,4 @@
+import collections
 import functools
 import inspect
 import threading
@@ -181,3 +182,46 @@ class Throttler:
             return True
         else:
             return False
+
+
+class ExponentialMovingAverage:
+    def __init__(self, alpha, use_first_value=True):
+        self.alpha = alpha
+        self.initialised = not use_first_value
+        self.mean = 0
+
+    def update(self, value):
+        if self.initialised:
+            self.mean += self.alpha * (value - self.mean)
+        else:
+            self.mean = value
+            self.initialised = True
+
+    def get(self):
+        return self.mean
+
+
+class WindowRateTracker:
+    def __init__(self, buffer_size=20, time_window=20, timer=timeit.default_timer):
+        self.time_window = time_window
+        self.buffer_size = buffer_size
+        self.deque = collections.deque(maxlen=self.buffer_size)
+        self.timer = timer
+
+    def update(self):
+        now = self.timer()
+        self.deque.append(now)
+
+    def get(self):
+        now = self.timer()
+        cutoff = now - self.time_window
+        try:
+            while self.deque[0] < cutoff:
+                self.deque.popleft()
+        except IndexError:
+            return 0
+        if len(self.deque) < self.buffer_size:
+            return len(self.deque) / self.time_window
+        else:
+            # if going faster than buffer size per time window, look at timestamps
+            return len(self.deque) / (self.deque[-1] - self.deque[0])
-- 
GitLab
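
Note (not part of the patch): a minimal standalone sketch of how the two helpers added to utils.py are meant to be used together, mirroring base_correction.py: the input handler calls update() on both once per processed train, and the periodic poller (_update_rate_and_state) reads get() to publish the smoothed processing time and the windowed rate. The alpha value and the published keys come from the patch; the simulated train loop and sleep durations below are illustrative assumptions only.

    # Illustrative sketch -- assumes the calng package with this patch applied is importable.
    import time
    import timeit

    from calng.utils import ExponentialMovingAverage, WindowRateTracker

    processing_time_ema = ExponentialMovingAverage(alpha=0.3)  # same alpha as base_correction.py
    rate_tracker = WindowRateTracker(buffer_size=20, time_window=20)

    # Stand-in for the input handler: one update per "train", roughly 10 Hz, ~20 ms of work each.
    for _ in range(50):
        start = timeit.default_timer()
        time.sleep(0.02)          # placeholder for the actual correction work
        rate_tracker.update()     # record one processed-train event
        processing_time_ema.update(timeit.default_timer() - start)
        time.sleep(0.08)

    # Stand-in for the periodic poller: what the device would publish on its status update.
    print(f"performance.rate: {rate_tracker.get():.1f} Hz")
    print(f"performance.processingDuration: {processing_time_ema.get() * 1000:.1f} ms")

The split reflects the design in the patch: the exponential moving average smooths the per-train duration into a stable number, while the fixed-size timestamp window keeps the rate meaningful both when trains arrive slowly (count divided by the window) and when they arrive faster than the buffer spans (count divided by the observed timestamp spread).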