Skip to content
Snippets Groups Projects
Commit 6f3026c8 authored by David Hammer's avatar David Hammer
Browse files

Clean up rate tracking, state resetting

parent f05a2e3c
No related branches found
No related tags found
2 merge requests!12Snapshot: field test deployed version as of end of run 202201,!3Base correction device, CalCat interaction, DSSC and AGIPD devices
...@@ -215,6 +215,7 @@ class AgipdCorrection(BaseCorrection): ...@@ -215,6 +215,7 @@ class AgipdCorrection(BaseCorrection):
return return
time_start = timeit.default_timer() time_start = timeit.default_timer()
self._last_processing_started = time_start
train_id = metadata.getAttribute("timestamp", "tid") train_id = metadata.getAttribute("timestamp", "tid")
cell_table = np.squeeze(data.get("image.cellId")) cell_table = np.squeeze(data.get("image.cellId"))
...@@ -238,15 +239,6 @@ class AgipdCorrection(BaseCorrection): ...@@ -238,15 +239,6 @@ class AgipdCorrection(BaseCorrection):
if not self._schema_cache["state"] is State.PROCESSING: if not self._schema_cache["state"] is State.PROCESSING:
self.updateState(State.PROCESSING) self.updateState(State.PROCESSING)
self.set("status", "Processing data") self.set("status", "Processing data")
if self._state_reset_timer is None:
self._state_reset_timer = utils.DelayableTimer(
timeout=self._schema_cache["processingStateTimeout"],
callback=self._reset_state_from_processing,
)
else:
self._state_reset_timer.set_timeout(
self._schema_cache["processingStateTimeout"]
)
correction_cell_num = self._schema_cache["constantParameters.memoryCells"] correction_cell_num = self._schema_cache["constantParameters.memoryCells"]
do_generate_preview = ( do_generate_preview = (
...@@ -310,11 +302,9 @@ class AgipdCorrection(BaseCorrection): ...@@ -310,11 +302,9 @@ class AgipdCorrection(BaseCorrection):
# update rate etc. # update rate etc.
self._buffered_status_update.set("trainId", train_id) self._buffered_status_update.set("trainId", train_id)
# self._rate_tracker.update() self._rate_tracker.update()
time_spent = timeit.default_timer() - time_start time_spent = timeit.default_timer() - time_start
self._buffered_status_update.set( self._processing_time_ema.update(time_spent)
"performance.lastProcessingDuration", time_spent * 1000
)
def loadMostRecentConstants(self): def loadMostRecentConstants(self):
self.flush_constants() self.flush_constants()
......
...@@ -113,15 +113,6 @@ class DsscCorrection(BaseCorrection): ...@@ -113,15 +113,6 @@ class DsscCorrection(BaseCorrection):
if not self._schema_cache["state"] is State.PROCESSING: if not self._schema_cache["state"] is State.PROCESSING:
self.updateState(State.PROCESSING) self.updateState(State.PROCESSING)
self.set("status", "Processing data") self.set("status", "Processing data")
if self._state_reset_timer is None:
self._state_reset_timer = utils.DelayableTimer(
timeout=self._schema_cache["processingStateTimeout"],
callback=self._reset_state_from_processing,
)
else:
self._state_reset_timer.set_timeout(
self._schema_cache["processingStateTimeout"]
)
correction_cell_num = self._schema_cache["dataFormat.constantMemoryCells"] correction_cell_num = self._schema_cache["dataFormat.constantMemoryCells"]
do_generate_preview = ( do_generate_preview = (
...@@ -185,9 +176,7 @@ class DsscCorrection(BaseCorrection): ...@@ -185,9 +176,7 @@ class DsscCorrection(BaseCorrection):
self._buffered_status_update.set("trainId", train_id) self._buffered_status_update.set("trainId", train_id)
self._rate_tracker.update() self._rate_tracker.update()
time_spent = timeit.default_timer() - time_start time_spent = timeit.default_timer() - time_start
self._buffered_status_update.set( self._processing_time_ema.update(time_spent)
"performance.lastProcessingDuration", time_spent * 1000
)
def _update_pulse_filter(self, filter_string): def _update_pulse_filter(self, filter_string):
"""Called whenever the pulse filter changes, typically followed by """Called whenever the pulse filter changes, typically followed by
......
import threading import threading
import timeit
import hashToSchema import hashToSchema
import numpy as np import numpy as np
...@@ -34,6 +35,9 @@ from . import scenes, shmem_utils, utils ...@@ -34,6 +35,9 @@ from . import scenes, shmem_utils, utils
from ._version import version as deviceVersion from ._version import version as deviceVersion
PROCESSING_STATE_TIMEOUT = 10
@KARABO_CLASSINFO("BaseCorrection", deviceVersion) @KARABO_CLASSINFO("BaseCorrection", deviceVersion)
class BaseCorrection(PythonDevice): class BaseCorrection(PythonDevice):
_correction_flag_class = None # subclass must override this with some enum class _correction_flag_class = None # subclass must override this with some enum class
...@@ -265,38 +269,8 @@ class BaseCorrection(PythonDevice): ...@@ -265,38 +269,8 @@ class BaseCorrection(PythonDevice):
.commit(), .commit(),
) )
# just measurements and counters to display
( (
NODE_ELEMENT(expected)
.key("performance")
.displayedName("Performance measures")
.commit(),
FLOAT_ELEMENT(expected)
.key("performance.rateUpdateInterval")
.displayedName("Rate update interval")
.description("Interval (seconds) between updates of processing rate.")
.assignmentOptional()
.defaultValue(1)
.reconfigurable()
.commit(),
FLOAT_ELEMENT(expected)
.key("performance.rateBufferSpan")
.displayedName("Rate measurement buffer span")
.description("Event buffer timespan (in seconds) for measuring rate")
.assignmentOptional()
.defaultValue(20)
.reconfigurable()
.commit(),
FLOAT_ELEMENT(expected)
.key("processingStateTimeout")
.description(
"Timeout after which the device goes from PROCESSING back to ACTIVE "
"if no new input is processed"
)
.assignmentOptional()
.defaultValue(10)
.reconfigurable()
.commit(),
# just measurements and counters to display
UINT64_ELEMENT(expected) UINT64_ELEMENT(expected)
.key("trainId") .key("trainId")
.displayedName("Train ID") .displayedName("Train ID")
...@@ -304,42 +278,36 @@ class BaseCorrection(PythonDevice): ...@@ -304,42 +278,36 @@ class BaseCorrection(PythonDevice):
.readOnly() .readOnly()
.initialValue(0) .initialValue(0)
.commit(), .commit(),
NODE_ELEMENT(expected)
.key("performance")
.displayedName("Performance measures")
.commit(),
FLOAT_ELEMENT(expected) FLOAT_ELEMENT(expected)
.key("performance.lastProcessingDuration") .key("performance.processingDuration")
.displayedName("Processing time") .displayedName("Processing time")
.description( .description(
"Amount of time spent in processing latest train. Time includes " "Exponential moving average over time spent processing individual "
"generating preview and sending data." "trains Time includes generating preview and sending data."
) )
.unit(Unit.SECOND) .unit(Unit.SECOND)
.metricPrefix(MetricPrefix.MILLI) .metricPrefix(MetricPrefix.MILLI)
.readOnly() .readOnly()
.initialValue(0) .initialValue(0)
.warnHigh(100)
.info("Processing not fast enough for full speed")
.needsAcknowledging(False)
.commit(), .commit(),
FLOAT_ELEMENT(expected) FLOAT_ELEMENT(expected)
.key("performance.rate") .key("performance.rate")
.displayedName("Rate") .displayedName("Rate")
.description( .description(
"Actual rate with which this device gets / processes / sends trains" "Actual rate with which this device gets, processes, and sends trains. "
"This is a simple moving average."
) )
.unit(Unit.HERTZ) .unit(Unit.HERTZ)
.readOnly() .readOnly()
.initialValue(0) .initialValue(0)
.commit(), .commit(),
FLOAT_ELEMENT(expected)
.key("performance.theoreticalRate")
.displayedName("Processing rate (hypothetical)")
.description(
"Rate with which this device could hypothetically process trains. "
"Based on lastProcessingDuration."
)
.unit(Unit.HERTZ)
.readOnly()
.initialValue(float("NaN"))
.warnLow(10)
.info("Processing not fast enough for full speed")
.needsAcknowledging(False)
.commit(),
) )
( (
...@@ -407,43 +375,26 @@ class BaseCorrection(PythonDevice): ...@@ -407,43 +375,26 @@ class BaseCorrection(PythonDevice):
self._shmem_buffer = None self._shmem_buffer = None
self._has_set_output_schema = False self._has_set_output_schema = False
self._has_updated_shapes = False self._has_updated_shapes = False
# TODO: replace self._processing_time_ema = utils.ExponentialMovingAverage(alpha=0.3)
#self._rate_tracker = calibrationBase.utils.UpdateRate( self._rate_tracker = utils.WindowRateTracker()
#interval=config.get("performance.rateBufferSpan")
#)
self._state_reset_timer = None
self._buffered_status_update = Hash( self._buffered_status_update = Hash(
"trainId", "trainId",
0, 0,
"performance.rate", "performance.rate",
0, 0,
"performance.theoreticalRate", "performance.processingDuration",
float("NaN"),
"performance.lastProcessingDuration",
0, 0,
) )
self._last_processing_started = 0 # not input handler should put timestamp
self._rate_update_timer = utils.RepeatingTimer( self._rate_update_timer = utils.RepeatingTimer(
interval=config.get("performance.rateUpdateInterval"), interval=0.5,
callback=self._update_actual_rate, callback=self._update_rate_and_state,
) )
self._buffer_lock = threading.Lock() self._buffer_lock = threading.Lock()
self.KARABO_SLOT(self.requestScene) self.KARABO_SLOT(self.requestScene)
def preReconfigure(self, config): def preReconfigure(self, config):
if config.has("performance.rateUpdateInterval"):
self._rate_update_timer.stop()
self._rate_update_timer = utils.RepeatingTimer(
interval=config.get("performance.rateUpdateInterval"),
callback=self._update_actual_rate,
)
if config.has("performance.rateBufferSpan"):
#self._rate_tracker = calibrationBase.utils.UpdateRate(
#interval=config.get("performance.rateBufferSpan")
#)
...
for path in config.getPaths(): for path in config.getPaths():
if path in self._schema_cache_slots: if path in self._schema_cache_slots:
self._schema_cache[path] = config.get(path) self._schema_cache[path] = config.get(path)
...@@ -620,28 +571,26 @@ class BaseCorrection(PythonDevice): ...@@ -620,28 +571,26 @@ class BaseCorrection(PythonDevice):
self._has_updated_shapes = True self._has_updated_shapes = True
def _reset_state_from_processing(self): def _update_rate_and_state(self):
# TODO: merge with rate updates (buffer status update checking)
if self.get("state") is State.PROCESSING:
self.updateState(State.ON)
self._state_reset_timer = None
def _update_actual_rate(self):
if not self.get("state") is State.PROCESSING: if not self.get("state") is State.PROCESSING:
self._rate_update_timer.delay() self._rate_update_timer.delay()
return return
#self._buffered_status_update.set("performance.rate", self._rate_tracker.rate())
last_processing = self._buffered_status_update.get( self._buffered_status_update.set("performance.rate", self._rate_tracker.get())
"performance.lastProcessingDuration" self._buffered_status_update.set(
"performance.processingDuration", self._processing_time_ema.get() * 1000
) )
if last_processing > 0: # trainId should be set on _buffered_status_update in input handler
theoretical_rate = 1000 / last_processing
self._buffered_status_update.set(
"performance.theoreticalRate", theoretical_rate
)
self.set(self._buffered_status_update) self.set(self._buffered_status_update)
self._rate_update_timer.delay() self._rate_update_timer.delay()
if (
timeit.default_timer() - self._last_processing_started
> PROCESSING_STATE_TIMEOUT
):
if self.get("state") is State.PROCESSING:
self.updateState(State.ON)
def handle_eos(self, channel): def handle_eos(self, channel):
self._has_set_output_schema = False self._has_set_output_schema = False
self.updateState(State.ON) self.updateState(State.ON)
......
import collections
import functools import functools
import inspect import inspect
import threading import threading
...@@ -181,3 +182,46 @@ class Throttler: ...@@ -181,3 +182,46 @@ class Throttler:
return True return True
else: else:
return False return False
class ExponentialMovingAverage:
def __init__(self, alpha, use_first_value=True):
self.alpha = alpha
self.initialised = not use_first_value
self.mean = 0
def update(self, value):
if self.initialised:
self.mean += self.alpha * (value - self.mean)
else:
self.mean = value
self.initialised = True
def get(self):
return self.mean
class WindowRateTracker:
def __init__(self, buffer_size=20, time_window=20, timer=timeit.default_timer):
self.time_window = time_window
self.buffer_size = buffer_size
self.deque = collections.deque(maxlen=self.buffer_size)
self.timer = timer
def update(self):
now = self.timer()
self.deque.append(now)
def get(self):
now = self.timer()
cutoff = now - self.time_window
try:
while self.deque[0] < cutoff:
self.deque.popleft()
except IndexError:
return 0
if len(self.deque) < self.buffer_size:
return len(self.deque) / self.time_window
else:
# if going faster than buffer size per time window, look at timestamps
return len(self.deque) / (self.deque[-1] - self.deque[0])
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment