diff --git a/src/calng/AgipdCorrection.py b/src/calng/AgipdCorrection.py index 79ad47833a242f40f99435a0dd44332a1eb78f8d..276c223a6702e95a25e92aa06736c8c7bc0417ae 100644 --- a/src/calng/AgipdCorrection.py +++ b/src/calng/AgipdCorrection.py @@ -4,13 +4,11 @@ import timeit import numpy as np from karabo.bound import ( BOOL_ELEMENT, - NODE_ELEMENT, FLOAT_ELEMENT, KARABO_CLASSINFO, + NODE_ELEMENT, STRING_ELEMENT, - UINT32_ELEMENT, - Hash, - Schema, + ImageData, ) from karabo.common.states import State diff --git a/src/calng/base_correction.py b/src/calng/base_correction.py index 5ce2bdf8459d050e0b8429bec62316a5d5df0888..3f12c65744e04559907c719e85a5e99a9fdb9777 100644 --- a/src/calng/base_correction.py +++ b/src/calng/base_correction.py @@ -567,13 +567,13 @@ class BaseCorrection(PythonDevice): # TODO: put this under lock so dictionary doesn't change shape underneath us for constant_name, constant_data in self._cached_constants.items(): + self.log.INFO(f"Reload constant {constant_name}") self._load_constant_to_gpu(constant_name, constant_data) self._has_updated_shapes = True def _update_rate_and_state(self): if not self.get("state") is State.PROCESSING: - self._rate_update_timer.delay() return self._buffered_status_update.set("performance.rate", self._rate_tracker.get()) @@ -582,7 +582,6 @@ class BaseCorrection(PythonDevice): ) # trainId should be set on _buffered_status_update in input handler self.set(self._buffered_status_update) - self._rate_update_timer.delay() if ( timeit.default_timer() - self._last_processing_started diff --git a/src/calng/gpu_utils.py b/src/calng/gpu_utils.py deleted file mode 100644 index c945aee1a5bf8b64fa2621de49cf19511cabd41b..0000000000000000000000000000000000000000 --- a/src/calng/gpu_utils.py +++ /dev/null @@ -1,59 +0,0 @@ -import re - -import pycuda.driver -import pycuda.gpuarray - -_gpuptr_re = re.compile( - r"GPUPTR:(?P<gpu_pointer>\w+)" r"DEVID:(?P<device_id>.+)" r"SHAPE:(?P<shape>.+)" -) - - -def get_shape_from_ipc_handle(handle_string): - match = _gpuptr_re.match(handle_string) - return tuple(int(num) for num in match.group("shape").split(",")) - - -class IPCGPUArray: - """Context manager providing a GPUArray opened from string encoding IPC handle - - Arguments: - handle_string: String encoding a "GPU pointer" (IPC address) plus some more - stuff. This is "parsed" using _gpuptr_re. - dtype: self-explanatory (but make sure it is correct) - aray shape is parsed from the handle_string - """ - - def __init__(self, handle_string, dtype, gpu_pointer_re=None): - match = _gpuptr_re.match(handle_string) - assert match is not None - - self.dtype = dtype - self.handle_address = bytearray.fromhex(match.group("gpu_pointer")) - self.shape = tuple(int(num) for num in match.group("shape").split(",")) - # assuming contiguous C-order strides probably - # TODO: smarter - - self.open_handle = None - self.gpu_array = None - - def __enter__(self): - self.open_handle = pycuda.driver.IPCMemoryHandle(self.handle_address) - self.gpu_array = pycuda.gpuarray.GPUArray( - self.shape, dtype=self.dtype, gpudata=self.open_handle - ) - return self.gpu_array - - def __exit__(self, t, v, tb): - self.open_handle.close() - - -class GPUContextContext: - def __init__(self, gpu_context): - self.gpu_context = gpu_context - - def __enter__(self): - self.gpu_context.push() - return self.gpu_context - - def __exit__(self, t, v, tb): - self.gpu_context.pop() diff --git a/src/calng/utils.py b/src/calng/utils.py index 202ed9e6d689d1965ca9505f7167e18e7b866ba7..7c4c78e5f47e1b338f9da54afe172632f72064dc 100644 --- a/src/calng/utils.py +++ b/src/calng/utils.py @@ -86,43 +86,8 @@ def shape_after_transpose(input_shape, transpose_pattern, squeeze=True): return tuple(np.array(input_shape)[list(transpose_pattern)].tolist()) -class DelayableTimer: - """Start a timer which can be extended - - Useful for reverting to state after inactivity, for instance. - - timer defaults to timeit.default_timer - it should be a timer returning - globally increasing number of seconds. - """ - - def __init__(self, timeout, callback, timer=timeit.default_timer): - self.timer = timer - self.stop_time = self.timer() + timeout - - def runner(): - now = self.timer() - while now < self.stop_time: - diff = self.stop_time - now - time.sleep(diff) - now = self.timer() - callback() - - self.thread = threading.Thread(target=runner) - self.thread.start() - - def set_timeout(self, timeout): - """Delay stop time to now + timeout - - If now + timeout is sooner than already set timeout, this does nothing""" - self.stop_time = self.timer() + timeout - - def add_timeout(self, timeout): - """Simply add timeout to current stop time""" - self.stop_time += timeout - - class RepeatingTimer: - """Similar to DelayableTimer, but will keep running with pre-set intervals""" + """A timer which will call callback every interval seconds""" def __init__(self, interval, callback, timer=timeit.default_timer, start_now=True): self.timer = timer @@ -132,24 +97,21 @@ class RepeatingTimer: if start_now: self.start() - def delay(self): - self.stop_time = self.timer() + self.interval - def start(self): self.stopped = False - self.stop_time = self.timer() + self.interval + self.wakeup_time = self.timer() + self.interval def runner(): while not self.stopped: now = self.timer() - while now < self.stop_time: - diff = self.stop_time - now + while now < self.wakeup_time: + diff = self.wakeup_time - now time.sleep(diff) if self.stopped: return now = self.timer() self.callback() - self.stop_time = self.timer() + self.interval + self.wakeup_time = self.timer() + self.interval self.thread = threading.Thread(target=runner) self.thread.start() @@ -158,32 +120,6 @@ class RepeatingTimer: self.stopped = True -class Throttler: - """Similar to DelayableTimer, but will keep running with pre-set intervals""" - - def __init__(self, interval, timer=timeit.default_timer): - self.timer = timer - self.interval = interval - self.latest_call = None - - def ready(self): - if self.latest_call is None: - return True - else: - return self.latest_call + self.interval <= self.timer() - - def update(self): - self.latest_call = self.timer() - - def ready_update(self): - time = self.timer() - if self.latest_call is None or self.latest_call + self.interval <= time: - self.latest_call = time - return True - else: - return False - - class ExponentialMovingAverage: def __init__(self, alpha, use_first_value=True): self.alpha = alpha