Skip to content
Snippets Groups Projects
Commit b6e1885a authored by David Hammer's avatar David Hammer
Browse files

Clean up and remove unused utils

parent 33323b67
No related branches found
No related tags found
2 merge requests!12Snapshot: field test deployed version as of end of run 202201,!3Base correction device, CalCat interaction, DSSC and AGIPD devices
...@@ -4,13 +4,11 @@ import timeit ...@@ -4,13 +4,11 @@ import timeit
import numpy as np import numpy as np
from karabo.bound import ( from karabo.bound import (
BOOL_ELEMENT, BOOL_ELEMENT,
NODE_ELEMENT,
FLOAT_ELEMENT, FLOAT_ELEMENT,
KARABO_CLASSINFO, KARABO_CLASSINFO,
NODE_ELEMENT,
STRING_ELEMENT, STRING_ELEMENT,
UINT32_ELEMENT, ImageData,
Hash,
Schema,
) )
from karabo.common.states import State from karabo.common.states import State
......
...@@ -567,13 +567,13 @@ class BaseCorrection(PythonDevice): ...@@ -567,13 +567,13 @@ class BaseCorrection(PythonDevice):
# TODO: put this under lock so dictionary doesn't change shape underneath us # TODO: put this under lock so dictionary doesn't change shape underneath us
for constant_name, constant_data in self._cached_constants.items(): for constant_name, constant_data in self._cached_constants.items():
self.log.INFO(f"Reload constant {constant_name}")
self._load_constant_to_gpu(constant_name, constant_data) self._load_constant_to_gpu(constant_name, constant_data)
self._has_updated_shapes = True self._has_updated_shapes = True
def _update_rate_and_state(self): def _update_rate_and_state(self):
if not self.get("state") is State.PROCESSING: if not self.get("state") is State.PROCESSING:
self._rate_update_timer.delay()
return return
self._buffered_status_update.set("performance.rate", self._rate_tracker.get()) self._buffered_status_update.set("performance.rate", self._rate_tracker.get())
...@@ -582,7 +582,6 @@ class BaseCorrection(PythonDevice): ...@@ -582,7 +582,6 @@ class BaseCorrection(PythonDevice):
) )
# trainId should be set on _buffered_status_update in input handler # trainId should be set on _buffered_status_update in input handler
self.set(self._buffered_status_update) self.set(self._buffered_status_update)
self._rate_update_timer.delay()
if ( if (
timeit.default_timer() - self._last_processing_started timeit.default_timer() - self._last_processing_started
......
import re
import pycuda.driver
import pycuda.gpuarray
_gpuptr_re = re.compile(
r"GPUPTR:(?P<gpu_pointer>\w+)" r"DEVID:(?P<device_id>.+)" r"SHAPE:(?P<shape>.+)"
)
def get_shape_from_ipc_handle(handle_string):
match = _gpuptr_re.match(handle_string)
return tuple(int(num) for num in match.group("shape").split(","))
class IPCGPUArray:
"""Context manager providing a GPUArray opened from string encoding IPC handle
Arguments:
handle_string: String encoding a "GPU pointer" (IPC address) plus some more
stuff. This is "parsed" using _gpuptr_re.
dtype: self-explanatory (but make sure it is correct)
aray shape is parsed from the handle_string
"""
def __init__(self, handle_string, dtype, gpu_pointer_re=None):
match = _gpuptr_re.match(handle_string)
assert match is not None
self.dtype = dtype
self.handle_address = bytearray.fromhex(match.group("gpu_pointer"))
self.shape = tuple(int(num) for num in match.group("shape").split(","))
# assuming contiguous C-order strides probably
# TODO: smarter
self.open_handle = None
self.gpu_array = None
def __enter__(self):
self.open_handle = pycuda.driver.IPCMemoryHandle(self.handle_address)
self.gpu_array = pycuda.gpuarray.GPUArray(
self.shape, dtype=self.dtype, gpudata=self.open_handle
)
return self.gpu_array
def __exit__(self, t, v, tb):
self.open_handle.close()
class GPUContextContext:
def __init__(self, gpu_context):
self.gpu_context = gpu_context
def __enter__(self):
self.gpu_context.push()
return self.gpu_context
def __exit__(self, t, v, tb):
self.gpu_context.pop()
...@@ -86,43 +86,8 @@ def shape_after_transpose(input_shape, transpose_pattern, squeeze=True): ...@@ -86,43 +86,8 @@ def shape_after_transpose(input_shape, transpose_pattern, squeeze=True):
return tuple(np.array(input_shape)[list(transpose_pattern)].tolist()) return tuple(np.array(input_shape)[list(transpose_pattern)].tolist())
class DelayableTimer:
"""Start a timer which can be extended
Useful for reverting to state after inactivity, for instance.
timer defaults to timeit.default_timer - it should be a timer returning
globally increasing number of seconds.
"""
def __init__(self, timeout, callback, timer=timeit.default_timer):
self.timer = timer
self.stop_time = self.timer() + timeout
def runner():
now = self.timer()
while now < self.stop_time:
diff = self.stop_time - now
time.sleep(diff)
now = self.timer()
callback()
self.thread = threading.Thread(target=runner)
self.thread.start()
def set_timeout(self, timeout):
"""Delay stop time to now + timeout
If now + timeout is sooner than already set timeout, this does nothing"""
self.stop_time = self.timer() + timeout
def add_timeout(self, timeout):
"""Simply add timeout to current stop time"""
self.stop_time += timeout
class RepeatingTimer: class RepeatingTimer:
"""Similar to DelayableTimer, but will keep running with pre-set intervals""" """A timer which will call callback every interval seconds"""
def __init__(self, interval, callback, timer=timeit.default_timer, start_now=True): def __init__(self, interval, callback, timer=timeit.default_timer, start_now=True):
self.timer = timer self.timer = timer
...@@ -132,24 +97,21 @@ class RepeatingTimer: ...@@ -132,24 +97,21 @@ class RepeatingTimer:
if start_now: if start_now:
self.start() self.start()
def delay(self):
self.stop_time = self.timer() + self.interval
def start(self): def start(self):
self.stopped = False self.stopped = False
self.stop_time = self.timer() + self.interval self.wakeup_time = self.timer() + self.interval
def runner(): def runner():
while not self.stopped: while not self.stopped:
now = self.timer() now = self.timer()
while now < self.stop_time: while now < self.wakeup_time:
diff = self.stop_time - now diff = self.wakeup_time - now
time.sleep(diff) time.sleep(diff)
if self.stopped: if self.stopped:
return return
now = self.timer() now = self.timer()
self.callback() self.callback()
self.stop_time = self.timer() + self.interval self.wakeup_time = self.timer() + self.interval
self.thread = threading.Thread(target=runner) self.thread = threading.Thread(target=runner)
self.thread.start() self.thread.start()
...@@ -158,32 +120,6 @@ class RepeatingTimer: ...@@ -158,32 +120,6 @@ class RepeatingTimer:
self.stopped = True self.stopped = True
class Throttler:
"""Similar to DelayableTimer, but will keep running with pre-set intervals"""
def __init__(self, interval, timer=timeit.default_timer):
self.timer = timer
self.interval = interval
self.latest_call = None
def ready(self):
if self.latest_call is None:
return True
else:
return self.latest_call + self.interval <= self.timer()
def update(self):
self.latest_call = self.timer()
def ready_update(self):
time = self.timer()
if self.latest_call is None or self.latest_call + self.interval <= time:
self.latest_call = time
return True
else:
return False
class ExponentialMovingAverage: class ExponentialMovingAverage:
def __init__(self, alpha, use_first_value=True): def __init__(self, alpha, use_first_value=True):
self.alpha = alpha self.alpha = alpha
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment