Commit 3e1522bc authored by David Hammer

Initial steps towards abstracting base classes, adding AGIPD version

parent 7bb78f18
Part of merge requests !12 (Snapshot: field test deployed version as of end of run 202201) and !3 (Base correction device, CalCat interaction, DSSC and AGIPD devices)
@@ -24,6 +24,7 @@ setup(name='calng',
       packages=find_packages('src'),
       entry_points={
           'karabo.bound_device': [
+              'AgipdCorrection = calng.AgipdCorrection:AgipdCorrection',
               'DsscCorrection = calng.DsscCorrection:DsscCorrection',
               'ModuleStacker = calng.ModuleStacker:ModuleStacker',
               'ShmemToZMQ = calng.ShmemToZMQ:ShmemToZMQ',
...
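The new entry point registers the AGIPD device class in the 'karabo.bound_device' group. A minimal sketch of resolving it by hand (assuming Python 3.10+ importlib.metadata and a standard setuptools install; Karabo's own device loader may differ):

    from importlib.metadata import entry_points

    # find the entry point registered above and load the device class
    for ep in entry_points(group="karabo.bound_device"):
        if ep.name == "AgipdCorrection":
            device_class = ep.load()  # -> calng.AgipdCorrection.AgipdCorrection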
import timeit
import calibrationBase
import numpy as np
from karabo.bound import KARABO_CLASSINFO
from karabo.common.states import State
from . import shmem_utils, utils
from ._version import version as deviceVersion
from .base_correction import BaseCorrection
from .agipd_gpu import AgipdGpuRunner
@KARABO_CLASSINFO("AgipdCorrection", deviceVersion)
class AgipdCorrection(BaseCorrection):
@staticmethod
def expectedParameters(expected):
AgipdCorrection.addConstant(
"ThresholdsDark",
"Dark",
expected,
optional=False,
mandatoryForIteration=True,
)
AgipdCorrection.addConstant(
"Offset", "Dark", expected, optional=False, mandatoryForIteration=True
)
AgipdCorrection.addConstant(
"SlopesPC", "Dark", expected, optional=True, mandatoryForIteration=True
)
AgipdCorrection.addConstant(
"SlopesFF",
"Illuminated",
expected,
optional=True,
mandatoryForIteration=True,
)
AgipdCorrection.addConstant(
"BadPixelsDark", "Dark", expected, optional=True, mandatoryForIteration=True
)
AgipdCorrection.addConstant(
"BadPixelsPC", "Dark", expected, optional=True, mandatoryForIteration=True
)
AgipdCorrection.addConstant(
"BadPixelsFF",
"Illuminated",
expected,
optional=True,
mandatoryForIteration=True,
)
super(AgipdCorrection, AgipdCorrection).expectedParameters(expected)
def __init__(self, config):
super().__init__(config)
output_axis_order = config.get("dataFormat.outputAxisOrder")
if output_axis_order == "pixels-fast":
self._output_transpose = (0, 2, 1)
elif output_axis_order == "memorycells-fast":
self._output_transpose = (2, 1, 0)
else:
self._output_transpose = None
self._offset_map = None
self._update_pulse_filter(config.get("pulseFilter"))
self._update_shapes(
config.get("dataFormat.pixelsX"),
config.get("dataFormat.pixelsY"),
config.get("dataFormat.memoryCells"),
self.pulse_filter,
self._output_transpose,
)
self.updateState(State.ON)
def process_input(self, data, metadata):
"""Registered for dataInput, handles all processing and sending"""
if not self.get("doAnything"):
if self.get("state") is State.PROCESSING:
self.updateState(State.ACTIVE)
return
source = metadata.get("source")
if source not in self.sources:
self.log.INFO(f"Ignoring unknown source {source}")
return
# TODO: what are these empty things for?
if not data.has("image"):
self.log.INFO("Ignoring hash without image node")
return
time_start = timeit.default_timer()
train_id = metadata.getAttribute("timestamp", "tid")
cell_table = np.squeeze(data.get("image.cellId"))
assert isinstance(cell_table, np.ndarray), "image.cellId should be ndarray"
if len(cell_table.shape) == 0:
msg = "cellId had 0 dimensions. DAQ may not be sending data."
self.set("status", msg)
self.log.WARN(msg)
return
# original shape: memory_cell, data/raw_gain, x, y
# TODO: consider making paths configurable
image_data = data.get("image.data")
self.log.INFO(f"Image data had shape: {image_data.shape}")
return
if image_data.shape[0] != self.get("dataFormat.memoryCells"):
self.set(
"status", f"Updating input shapes based on received {image_data.shape}"
)
# TODO: truncate if > 800
self.set("dataFormat.memoryCells", image_data.shape[0])
with self._buffer_lock:
self._update_pulse_filter(self.get("pulseFilter"))
self._update_shapes(
self.get("dataFormat.pixelsX"),
self.get("dataFormat.pixelsY"),
self.get("dataFormat.memoryCells"),
self.pulse_filter,
self._output_transpose,
)
# TODO: check shape (DAQ fake data and RunToPipe don't agree)
# TODO: consider just updating shapes based on whatever comes in
correction_cell_num = self.get("dataFormat.memoryCellsCorrection")
        do_generate_preview = (
            train_id % self.get("preview.trainIdModulo") == 0
            and self.get("preview.enable")
        )
can_apply_correction = correction_cell_num > 0
do_apply_correction = self.get("applyCorrection")
if not self.get("state") is State.PROCESSING:
self.updateState(State.PROCESSING)
self.set("status", "Processing data")
if self._state_reset_timer is None:
self._state_reset_timer = utils.DelayableTimer(
timeout=self.get("processingStateTimeout"),
callback=self._reset_state_from_processing,
)
else:
self._state_reset_timer.set_timeout(self.get("processingStateTimeout"))
with self._buffer_lock:
cell_table = cell_table[self.pulse_filter]
pulse_table = np.squeeze(data.get("image.pulseId"))[self.pulse_filter]
cell_table_max = np.max(cell_table)
if do_apply_correction:
if not can_apply_correction:
msg = "No constant loaded, correction will not be applied."
self.log.WARN(msg)
self.set("status", msg)
do_apply_correction = False
elif cell_table_max >= correction_cell_num:
msg = (
f"Max cell ID ({cell_table_max}) exceeds range for loaded "
f"constant (has {correction_cell_num} cells). Some frames "
"will not be corrected."
)
self.log.WARN(msg)
self.set("status", msg)
self.gpu_runner.load_data(image_data)
buffer_handle, buffer_array = self._shmem_buffer.next_slot()
if do_apply_correction:
self.gpu_runner.load_cell_table(cell_table)
self.gpu_runner.correct()
else:
self.gpu_runner.only_cast()
self.gpu_runner.reshape(out=buffer_array)
if do_generate_preview:
preview_slice_index = self.get("preview.pulse")
if preview_slice_index >= 0:
# look at pulse_table to find which index this pulse ID is in
pulse_id_found = np.where(pulse_table == preview_slice_index)[0]
if len(pulse_id_found) == 0:
pulse_found_instead = pulse_table[0]
msg = (
f"Pulse {preview_slice_index} not found in "
f"image.pulseId, arbitrary pulse "
f"{pulse_found_instead} will be shown."
)
preview_slice_index = 0
self.log.WARN(msg)
self.set("status", msg)
else:
preview_slice_index = pulse_id_found[0]
if not do_apply_correction:
if can_apply_correction:
# in this case, cell table has not been loaded, but needs to be now
self.gpu_runner.load_cell_table(cell_table)
else:
# in this case, there will be no corrected preview
self.log.WARN(
"Corrected preview will not actually be corrected."
)
preview_raw, preview_corrected = self.gpu_runner.compute_preview(
preview_slice_index,
have_corrected=do_apply_correction,
can_correct=can_apply_correction,
)
data.set("image.data", buffer_handle)
data.set("image.cellId", cell_table[:, np.newaxis])
data.set("image.pulseId", pulse_table[:, np.newaxis])
data.set("calngShmemPaths", ["image.data"])
self._write_output(data, metadata)
if do_generate_preview:
self._write_combiner_preview(
preview_raw, preview_corrected, train_id, source
)
# update rate etc.
self._buffered_status_update.set("trainId", train_id)
self._rate_tracker.update()
time_spent = timeit.default_timer() - time_start
self._buffered_status_update.set(
"performance.lastProcessingDuration", time_spent * 1000
)
if self.get("performance.rateUpdateOnEachInput"):
self._update_actual_rate()
def constantLoaded(self):
"""Hook from CalibrationReceiverBaseDevice called after each getConstant
Here, used to load the received constants (or correction maps derived
    from them) onto GPU.
TODO: call after receiving *all* constants instead of calling once per
new constant (will cause some overhead for bigger devices)
"""
self.log.WARN("Not ready to handle constants yet")
...
def _update_pulse_filter(self, filter_string):
"""Called whenever the pulse filter changes, typically followed by
_update_shapes"""
if filter_string.strip() == "":
new_filter = np.arange(self.get("dataFormat.memoryCells"), dtype=np.uint16)
else:
new_filter = np.array(eval(filter_string), dtype=np.uint16)
assert np.max(new_filter) < self.get("dataFormat.memoryCells")
self.pulse_filter = new_filter
def _update_shapes(
self, pixels_x, pixels_y, memory_cells, pulse_filter, output_transpose
):
"""(Re)initialize (GPU) buffers according to expected data shapes"""
input_data_shape = (memory_cells, 1, pixels_y, pixels_x)
# reflect the axis reordering in the expected output shape
output_data_shape = utils.shape_after_transpose(
input_data_shape, output_transpose
)
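        # e.g. for an AGIPD module (512 x 128 pixels, 352 cells) with
        # output_transpose (0, 2, 1), and assuming shape_after_transpose drops
        # the singleton axis: (352, 1, 128, 512) -> (352, 512, 128)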
self.set("dataFormat.inputDataShape", list(input_data_shape))
self.set("dataFormat.outputDataShape", list(output_data_shape))
if self._shmem_buffer is None:
shmem_buffer_name = self.getInstanceId() + ":dataOutput"
memory_budget = self.get("outputShmemBufferSize") * 2 ** 30
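            # e.g. a 10 GiB budget with float32 output of shape (352, 512, 128)
            # (roughly 88 MiB per train) would presumably leave room for ~116
            # slots in the circular buffer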
self.log.INFO(f"Opening new shmem buffer: {shmem_buffer_name}")
self._shmem_buffer = shmem_utils.ShmemCircularBuffer(
memory_budget,
output_data_shape,
self.output_data_dtype,
shmem_buffer_name,
)
else:
self._shmem_buffer.change_shape(output_data_shape)
self.gpu_runner = AgipdGpuRunner(
pixels_x,
pixels_y,
memory_cells,
output_transpose=output_transpose,
input_data_dtype=self.input_data_dtype,
output_data_dtype=self.output_data_dtype,
)
self._update_maps_on_gpu()
def _update_maps_on_gpu(self):
"""Updates the correction maps stored on GPU based on constants known
This only does something useful if constants have been retrieved from
CalCat. Should be called automatically upon retrieval and after
changing the data shape.
"""
self.set("status", "Updating constants on GPU using known constants")
self.updateState(State.CHANGING)
if self._offset_map is not None:
self.gpu_runner.load_constants(self._offset_map)
msg = "Done transferring known constant(s) to GPU"
self.log.INFO(msg)
self.set("status", msg)
self.updateState(State.ON)
import enum
import cupy
import numpy as np
from . import base_gpu, utils
class CorrectionFlags(enum.IntFlag):
THRESHOLD = 1
OFFSET = 2
BLSHIFT = 4
REL_GAIN_PC = 8
REL_GAIN_XRAY = 16
BPMASK = 32
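# flags are meant to be combined bitwise and mirror the CORR_* constants in
# agipd_gpu_kernels.cpp; hypothetical usage:
#     runner.correct(CorrectionFlags.THRESHOLD | CorrectionFlags.OFFSET)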
class AgipdGpuRunner(base_gpu.BaseGpuRunner):
_kernel_source_filename = "agipd_gpu_kernels.cpp"
def __init__(
self,
pixels_x,
pixels_y,
memory_cells,
output_transpose=(1, 2, 0), # default: memorycells-fast
constant_memory_cells=None,
input_data_dtype=np.uint16,
output_data_dtype=np.float32,
):
self.input_shape = (memory_cells, 2, pixels_x, pixels_y)
self.processed_shape = (memory_cells, pixels_x, pixels_y)
super().__init__(
pixels_x,
pixels_y,
memory_cells,
output_transpose,
constant_memory_cells,
input_data_dtype,
output_data_dtype,
)
self.gain_map_gpu = cupy.empty(self.processed_shape, dtype=np.uint8)
self.map_shape = (self.pixels_x, self.pixels_y, self.constant_memory_cells)
self.gm_map_shape = self.map_shape + (3,) # for gain-mapped constants
self.threshold_map_shape = self.map_shape + (2,)
# constants
self.gain_thresholds_gpu = cupy.empty(
self.threshold_map_shape, dtype=np.float32
)
self.offset_map_gpu = cupy.zeros(self.gm_map_shape, dtype=np.float32)
self.rel_gain_pc_map_gpu = cupy.ones(self.gm_map_shape, dtype=np.float32)
# optional extra offset for medium gain
self.md_additional_offset_gpu = cupy.zeros(1, dtype=np.float32)
self.rel_gain_xray_map_gpu = cupy.ones(self.map_shape, dtype=np.float32)
self.badpixel_map_gpu = cupy.zeros(self.map_shape, dtype=np.uint32)
self.update_block_size((1, 1, 64))
def load_thresholds(self, threshold_map):
# shape: y, x, memory cell, threshold 0 / threshold 1 / 3 gain values
# TODO: do we need the gain values for anything?
        to_set = np.transpose(threshold_map[..., :2], (1, 0, 2, 3)).astype(np.float32)
        self.gain_thresholds_gpu.set(to_set)
def load_offset_map(self, offset_map):
# shape: y, x, memory cell, gain stage
self.offset_map_gpu.set(
np.transpose(offset_map, (1, 0, 2, 3)).astype(np.float32)
)
def load_rel_gain_pc_map(self, slopes_pc_map):
# pc has funny shape (11, 352, 128, 512) from file
# this is (fi, memory cell, y, x)
pc_high_m = slopes_pc_map[0]
pc_high_I = slopes_pc_map[1]
pc_med_m = slopes_pc_map[3]
pc_med_I = slopes_pc_map[4]
frac_high_med = pc_high_m / pc_med_m
# TODO: verify formula
md_additional_offset = pc_high_I - pc_med_I * frac_high_med
        # transpose from (memory cell, y, x) to match map_shape (x, y, cell)
        frac_high_med_gpu = cupy.asarray(
            np.transpose(frac_high_med, (2, 1, 0)).astype(np.float32)
        )
        self.rel_gain_pc_map_gpu[..., 0] = 1  # rel xray gain can come after
        self.rel_gain_pc_map_gpu[..., 1] = frac_high_med_gpu
        self.rel_gain_pc_map_gpu[..., 2] = frac_high_med_gpu * 4.48
        # TODO: enable overriding this based on user input
        # note: md_additional_offset is computed per pixel above, while the GPU
        # buffer (and the kernel parameter) currently hold a single value
        self.md_additional_offset_gpu.set(md_additional_offset)
def load_rel_gain_ff_map(self, slopes_ff_map):
# constant shape: y, x, memory cell
if slopes_ff_map.shape[2] == 2:
# old format, is per pixel only
raise NotImplementedError("Old slopes FF map format")
self.rel_gain_xray_map_gpu.set(
np.transpose(slopes_ff_map, (1, 0, 2)).astype(np.float32)
)
    def correct(self, flags):
        """Apply corrections to data (must load constants, data, and cell table first)
        Applies the corrections selected by flags to the loaded input data and
        casts the result to the desired output dtype (into processed_data_gpu).
        The cell table allows out of order or non-contiguous memory cells in
        the input data; data and cell table are expected to already be on the
        GPU, loaded via load_data and load_cell_table.
        Use reshape afterwards to transfer the result back to host memory.
        """
self.correction_kernel(
self.full_grid,
self.full_block,
(
self.input_data_gpu,
self.cell_table_gpu,
np.uint8(flags),
self.gain_thresholds_gpu,
self.offset_map_gpu,
self.rel_gain_pc_map_gpu,
self.md_additional_offset_gpu,
self.rel_gain_xray_map_gpu,
self.badpixel_map_gpu,
self.gain_map_gpu,
self.processed_data_gpu,
),
)
def _init_kernels(self):
kernel_source = self._kernel_template.render(
{
"pixels_x": self.pixels_x,
"pixels_y": self.pixels_y,
"data_memory_cells": self.memory_cells,
"constant_memory_cells": self.constant_memory_cells,
"input_data_dtype": utils.np_dtype_to_c_type(self.input_data_dtype),
"output_data_dtype": utils.np_dtype_to_c_type(self.output_data_dtype),
}
)
self.source_module = cupy.RawModule(code=kernel_source)
self.correction_kernel = self.source_module.get_function("correct")
self.casting_kernel = self.source_module.get_function("only_cast")
#include <cuda_fp16.h>
#include <math_constants.h>
const unsigned char CORR_THRESHOLD = 1;
const unsigned char CORR_OFFSET = 2;
const unsigned char CORR_BLSHIFT = 4;
const unsigned char CORR_REL_GAIN_PC = 8;
const unsigned char CORR_REL_GAIN_XRAY = 16;
const unsigned char CORR_BPMASK = 32;
extern "C" {
/*
Perform correction: offset
Take cell_table into account when getting correction values
Converting to float for doing the correction
Converting to output dtype at the end
*/
__global__ void correct(const {{input_data_dtype}}* data,
const unsigned short* cell_table,
const unsigned char corr_flags,
const float* threshold_map,
const float* offset_map,
const float* rel_gain_pc_map,
const float md_additional_offset,
const float* rel_gain_xray_map,
const unsigned int* bad_pixel_map,
unsigned char* gain_map,
{{output_data_dtype}}* output) {
const size_t X = {{pixels_x}};
const size_t Y = {{pixels_y}};
const size_t input_cells = {{data_memory_cells}};
const size_t map_cells = {{constant_memory_cells}};
const size_t cell = blockIdx.x * blockDim.x + threadIdx.x;
const size_t x = blockIdx.y * blockDim.y + threadIdx.y;
const size_t y = blockIdx.z * blockDim.z + threadIdx.z;
if (cell >= input_cells || y >= Y || x >= X) {
return;
}
// data shape: memory cell, data/raw_gain (dim size 2), x, y
const size_t data_stride_y = 1;
const size_t data_stride_x = Y * data_stride_y;
const size_t data_stride_raw_gain = X * data_stride_x;
const size_t data_stride_cell = 2 * data_stride_raw_gain;
const size_t data_index = cell * data_stride_cell +
0 * data_stride_raw_gain +
y * data_stride_y +
x * data_stride_x;
const size_t raw_gain_index = cell * data_stride_cell +
1 * data_stride_raw_gain +
y * data_stride_y +
x * data_stride_x;
float corrected = (float)data[data_index];
const float raw_gain_val = (float)data[raw_gain_index];
const size_t output_stride_y = 1;
const size_t output_stride_x = output_stride_y * Y;
const size_t output_stride_cell = output_stride_x * X;
const size_t output_index = cell * output_stride_cell + x * output_stride_x + y * output_stride_y;
// threshold constant shape: x, y, cell, threshold (dim size 2)
const size_t threshold_map_stride_threshold = 1;
const size_t threshold_map_stride_cell = 2 * threshold_map_stride_threshold;
const size_t threshold_map_stride_y = map_cells * threshold_map_stride_cell;
const size_t threshold_map_stride_x = Y * threshold_map_stride_y;
// gain mapped constant shape: x, y, memory cell, gain_level (dim size 3)
const size_t gm_map_stride_gain = 1;
const size_t gm_map_stride_cell = 3 * gm_map_stride_gain;
const size_t gm_map_stride_y = map_cells * gm_map_stride_cell;
const size_t gm_map_stride_x = Y * gm_map_stride_y;
// TODO: handle multiple maps, multiple strides, multiple limits
const size_t map_cell = cell_table[cell];
if (map_cell < map_cells) {
unsigned char gain = 0;
if (corr_flags & CORR_THRESHOLD) {
const float threshold_0 = threshold_map[0 * threshold_map_stride_threshold +
map_cell * threshold_map_stride_cell +
y * threshold_map_stride_y +
x * threshold_map_stride_x];
const float threshold_1 = threshold_map[1 * threshold_map_stride_threshold +
map_cell * threshold_map_stride_cell +
y * threshold_map_stride_y +
x * threshold_map_stride_x];
// could consider making this const using ternaries / tiny function
if (raw_gain_val <= threshold_0) {
gain = 0;
} else if (raw_gain_val <= threshold_1) {
gain = 1;
} else {
gain = 2;
}
}
const size_t gm_map_index = gain * gm_map_stride_gain +
map_cell * gm_map_stride_cell +
y * gm_map_stride_y +
x * gm_map_stride_x;
if ((corr_flags & CORR_BPMASK) && bad_pixel_map[gm_map_index]) {
corrected = CUDART_NAN_F;
} else {
if (corr_flags & CORR_OFFSET) {
corrected -= offset_map[gm_map_index];
}
// TODO: baseline shift
if (corr_flags & CORR_REL_GAIN_PC) {
corrected *= rel_gain_pc_map[gm_map_index];
if (gain == 1) {
corrected += md_additional_offset;
}
}
if (corr_flags & CORR_REL_GAIN_XRAY) {
// TODO
//corrected *= rel_gain_xray_map[map_index];
}
}
gain_map[output_index] = gain;
{% if output_data_dtype == "half" %}
output[output_index] = __float2half(corrected);
{% else %}
output[output_index] = ({{output_data_dtype}})corrected;
{% endif %}
    } else {
        // TODO: decide what to do when we cannot threshold
        // (use output_index here: data_index includes the raw gain dimension
        // and would run past the end of the gain / output buffers)
        gain_map[output_index] = 255;
        {% if output_data_dtype == "half" %}
        output[output_index] = __float2half(corrected);
        {% else %}
        output[output_index] = ({{output_data_dtype}})corrected;
        {% endif %}
    }
}
/*
Same as correct, except without applying any correction
(just casts data to the output dtype; the raw gain dimension is skipped)
*/
__global__ void only_cast(const {{input_data_dtype}}* data,
                          unsigned char* gain_map,
                          {{output_data_dtype}}* output) {
    // note: BaseGpuRunner.only_cast currently launches this kernel with only
    // (data, output) as arguments, so the AGIPD runner presumably needs to
    // override it to also pass gain_map - the body below is a minimal sketch
    const size_t X = {{pixels_x}};
    const size_t Y = {{pixels_y}};
    const size_t input_cells = {{data_memory_cells}};
    const size_t cell = blockIdx.x * blockDim.x + threadIdx.x;
    const size_t x = blockIdx.y * blockDim.y + threadIdx.y;
    const size_t y = blockIdx.z * blockDim.z + threadIdx.z;
    if (cell >= input_cells || y >= Y || x >= X) {
        return;
    }
    // same layouts as in correct above
    const size_t data_index = cell * (2 * X * Y) + x * Y + y;
    const size_t output_index = cell * (X * Y) + x * Y + y;
    const float value = (float)data[data_index];
    gain_map[output_index] = 0;
    {% if output_data_dtype == "half" %}
    output[output_index] = __float2half(value);
    {% else %}
    output[output_index] = ({{output_data_dtype}})value;
    {% endif %}
}
}
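The kernel source above is a Jinja2 template; dtype and shape parameters are substituted at runtime by _init_kernels. A minimal sketch of the mechanism (the dtype strings come from utils.np_dtype_to_c_type; the values here are illustrative):

    import jinja2

    template = jinja2.Template(
        "__global__ void correct(const {{input_data_dtype}}* data, "
        "{{output_data_dtype}}* output);"
    )
    # with uint16 input and float16 output this would render to:
    # __global__ void correct(const unsigned short* data, half* output);
    print(template.render(input_data_dtype="unsigned short", output_data_dtype="half"))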
import threading
import calibrationBase
import hashToSchema
import numpy as np
from karabo.bound import (
BOOL_ELEMENT,
FLOAT_ELEMENT,
INPUT_CHANNEL,
INT32_ELEMENT,
NDARRAY_ELEMENT,
NODE_ELEMENT,
OUTPUT_CHANNEL,
STRING_ELEMENT,
UINT32_ELEMENT,
UINT64_ELEMENT,
VECTOR_STRING_ELEMENT,
VECTOR_UINT32_ELEMENT,
ChannelMetaData,
Epochstamp,
Hash,
MetricPrefix,
Schema,
Timestamp,
Trainstamp,
Unit,
)
from karabo.common.states import State
from . import shmem_utils, utils
class BaseCorrection(calibrationBase.CalibrationReceiverBaseDevice):
_dict_cache_slots = {
"applyCorrection",
"doAnything",
"dataFormat.memoryCells",
"dataFormat.memoryCellsCorrection",
"dataFormat.pixelsX",
"dataFormat.pixelsY",
"preview.enable",
"preview.pulse",
"preview.trainIdModulo",
"processingStateTimeout",
"performance.rateUpdateOnEachInput",
"state",
}
@staticmethod
def expectedParameters(expected):
(
BOOL_ELEMENT(expected)
.key("doAnything")
.displayedName("Enable input processing")
.description(
"Toggle handling of input (at all). If False, the input handler of "
"this device will be skipped. Useful to decrease logspam if device is "
"misconfigured."
)
.assignmentOptional()
.defaultValue(True)
.reconfigurable()
.commit(),
BOOL_ELEMENT(expected)
.key("applyCorrection")
.displayedName("Enable correction(s)")
.description(
"Toggle whether not correction(s) are applied to image data. If "
"false, this device still reshapes data to output shape, applies the "
"pulse filter, and casts to output dtype. Useful if constants are "
"missing / bad, or if data is sent to application doing its own "
"correction."
)
.assignmentOptional()
.defaultValue(True)
.reconfigurable()
.commit(),
INPUT_CHANNEL(expected).key("dataInput").commit(),
# note: output schema not set, will be updated to match data later
OUTPUT_CHANNEL(expected).key("dataOutput").commit(),
VECTOR_STRING_ELEMENT(expected)
.key("fastSources")
.displayedName("Fast data sources")
.description(
"Sources to get data from. Only incoming hashes from these sources "
"will be processed."
)
.assignmentMandatory()
.commit(),
STRING_ELEMENT(expected)
.key("pulseFilter")
.displayedName("[disabled] Pulse filter")
.description(
"Filter pulses: will be evaluated as array of indices to keep from "
"data. Can be anything which can be turned into numpy uint16 array. "
"Numpy is available as np. Take care not to include duplicates. If "
"empty, will not filter at all."
)
.readOnly()
.initialValue("")
.commit(),
UINT32_ELEMENT(expected)
.key("outputShmemBufferSize")
.displayedName("Output buffer size limit (GB)")
.description(
"Corrected trains are written to shared memory locations. These are "
"pre-allocated and re-used. This parameter determines how big (number "
"of GB) the circular buffer will be."
)
.assignmentOptional()
.defaultValue(10)
.commit(),
)
(
NODE_ELEMENT(expected)
.key("dataFormat")
.displayedName("Data format (in/out)")
.commit(),
STRING_ELEMENT(expected)
.key("dataFormat.inputImageDtype")
.displayedName("Input image data dtype")
.description("The (numpy) dtype to expect for incoming image data.")
.options("uint16,float32")
.assignmentOptional()
.defaultValue("uint16")
.commit(),
STRING_ELEMENT(expected)
.key("dataFormat.outputImageDtype")
.displayedName("Output image data dtype")
.description(
"The (numpy) dtype to use for outgoing image data. Input is "
"cast to float32, corrections are applied, and only then will "
"the result be cast back to outputImageDtype (all on GPU)."
)
.options("float16,float32,uint16")
.assignmentOptional()
.defaultValue("float32")
.commit(),
# important: shape of data as going into correction
UINT32_ELEMENT(expected)
.key("dataFormat.pixelsX")
.displayedName("Pixels x")
.description("Number of pixels of image data along X axis")
.assignmentMandatory()
.commit(),
UINT32_ELEMENT(expected)
.key("dataFormat.pixelsY")
.displayedName("Pixels y")
.description("Number of pixels of image data along Y axis")
.assignmentMandatory()
.commit(),
UINT32_ELEMENT(expected)
.key("dataFormat.memoryCells")
.displayedName("Memory cells")
.description("Full number of memory cells in incoming data")
.assignmentMandatory()
.commit(),
STRING_ELEMENT(expected)
.key("dataFormat.outputAxisOrder")
.displayedName("Output axis order")
.description(
"Axes of main data output can be reordered after correction. Choose "
"between 'pixels-fast' (memory_cell, x, y), 'memorycells-fast' "
"(x, y, memory_cell), and 'no-reshape' (memory_cell, y, x)"
)
.options("pixels-fast,memorycells-fast,no-reshape")
.assignmentOptional()
.defaultValue("pixels-fast")
.commit(),
UINT32_ELEMENT(expected)
.key("dataFormat.memoryCellsCorrection")
.displayedName("(Debug) Memory cells in correction map")
.description(
"Full number of memory cells in currently loaded correction map. "
"May exceed memory cell number in input if veto is on. "
"This value just displayed for debugging."
)
.readOnly()
.initialValue(0)
.commit(),
VECTOR_UINT32_ELEMENT(expected)
.key("dataFormat.inputDataShape")
.displayedName("Input data shape")
.description(
"Image data shape in incoming data (from reader / DAQ). This value is "
"computed from pixelsX, pixelsY, and memoryCells - this field just "
"shows you what is currently expected."
)
.readOnly()
.initialValue([])
.commit(),
VECTOR_UINT32_ELEMENT(expected)
.key("dataFormat.outputDataShape")
.displayedName("Output data shape")
.description(
"Image data shape for data output from this device. This value is "
"computed from pixelsX, pixelsY, and the size of the pulse filter - "
"this field just shows what is currently expected."
)
.readOnly()
.initialValue([])
.commit(),
)
preview_schema = Schema()
(
NODE_ELEMENT(expected).key("preview").displayedName("Preview").commit(),
NODE_ELEMENT(preview_schema).key("data").commit(),
NDARRAY_ELEMENT(preview_schema).key("data.adc").dtype("FLOAT").commit(),
OUTPUT_CHANNEL(expected)
.key("preview.outputRaw")
.dataSchema(preview_schema)
.commit(),
OUTPUT_CHANNEL(expected)
.key("preview.outputCorrected")
.dataSchema(preview_schema)
.commit(),
BOOL_ELEMENT(expected)
.key("preview.enable")
.displayedName("Enable preview data generation")
.assignmentOptional()
.defaultValue(True)
.reconfigurable()
.commit(),
INT32_ELEMENT(expected)
.key("preview.pulse")
.displayedName("Pulse (or stat) for preview")
.description(
"If this value is ≥ 0, the corresponding index from data will be "
"sliced for the preview. If this value is ≤ 0, preview will be one of "
"the following stats:"
"-1: max, "
"-2: mean, "
"-3: sum, "
"-4: stdev. "
"Max means selecting the pulse with the maximum integrated value. The "
"others are computed across all filtered pulses in the train."
)
.assignmentOptional()
.defaultValue(0)
.reconfigurable()
.commit(),
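            # e.g. preview.pulse = 10 selects the frame whose pulse ID is 10
            # (falling back to the first frame if absent), while -2 would show
            # the per-pixel mean over all filtered frames in the train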
UINT32_ELEMENT(expected)
.key("preview.trainIdModulo")
.displayedName("Train modulo for throttling")
.description(
"Preview will only be generated for trains whose ID modulo this "
"number is zero. Higher values means fewer preview updates. Should be "
"adjusted based on input rate. Keep in mind that the GUI has limited "
"refresh rate anyway and that network is precious."
)
.assignmentOptional()
.defaultValue(6)
.reconfigurable()
.commit(),
)
(
NODE_ELEMENT(expected)
.key("performance")
.displayedName("Performance measures")
.commit(),
FLOAT_ELEMENT(expected)
.key("performance.rateUpdateInterval")
.displayedName("Rate update interval")
.description(
"Maximum interval (seconds) between updates of the rate. Mostly "
"relevant if not rateUpdateOnEachInput or if input is slow."
)
.assignmentOptional()
.defaultValue(1)
.reconfigurable()
.commit(),
FLOAT_ELEMENT(expected)
.key("performance.rateBufferSpan")
.displayedName("Rate measurement buffer span")
.description("Event buffer timespan (in seconds) for measuring rate")
.assignmentOptional()
.defaultValue(20)
.reconfigurable()
.commit(),
BOOL_ELEMENT(expected)
.key("performance.rateUpdateOnEachInput")
.displayedName("Update rate on each input")
.description(
"Whether or not to update the device rate for each input (otherwise "
"only based on rateUpdateInterval). Note that processed trains are "
"always registered - this just impacts when the rate is computed "
"based on this."
)
.assignmentOptional()
.defaultValue(False)
.reconfigurable()
.commit(),
FLOAT_ELEMENT(expected)
.key("processingStateTimeout")
.description(
"Timeout after which the device goes from PROCESSING back to ACTIVE "
"if no new input is processed"
)
.assignmentOptional()
.defaultValue(10)
.reconfigurable()
.commit(),
# just measurements and counters to display
UINT64_ELEMENT(expected)
.key("trainId")
.displayedName("Train ID")
.description("ID of latest train processed by this device.")
.readOnly()
.initialValue(0)
.commit(),
FLOAT_ELEMENT(expected)
.key("performance.lastProcessingDuration")
.displayedName("Processing time")
.description(
"Amount of time spent in processing latest train. Time includes "
"generating preview and sending data."
)
.unit(Unit.SECOND)
.metricPrefix(MetricPrefix.MILLI)
.readOnly()
.initialValue(0)
.commit(),
FLOAT_ELEMENT(expected)
.key("performance.rate")
.displayedName("Rate")
.description(
"Actual rate with which this device gets / processes / sends trains"
)
.unit(Unit.HERTZ)
.readOnly()
.initialValue(0)
.commit(),
FLOAT_ELEMENT(expected)
.key("performance.theoreticalRate")
.displayedName("Processing rate (hypothetical)")
.description(
"Rate with which this device could hypothetically process trains. "
"Based on lastProcessingDuration."
)
.unit(Unit.HERTZ)
.readOnly()
.initialValue(float("NaN"))
.warnLow(10)
.info("Processing not fast enough for full speed")
.needsAcknowledging(False)
.commit(),
)
def __init__(self, config):
self._dict_cache = {k: config.get(k) for k in self._dict_cache_slots}
super().__init__(config)
self.KARABO_ON_DATA("dataInput", self.process_input)
self.KARABO_ON_EOS("dataInput", self.handle_eos)
self.sources = set(config.get("fastSources"))
self.input_data_dtype = np.dtype(config.get("dataFormat.inputImageDtype"))
self.output_data_dtype = np.dtype(config.get("dataFormat.outputImageDtype"))
self._shmem_buffer = None
self._has_set_output_schema = False
self._rate_tracker = calibrationBase.utils.UpdateRate(
interval=config.get("performance.rateBufferSpan")
)
self._state_reset_timer = None
self._buffered_status_update = Hash(
"trainId",
0,
"performance.rate",
0,
"performance.theoreticalRate",
float("NaN"),
"performance.lastProcessingDuration",
0,
)
self._rate_update_timer = utils.RepeatingTimer(
interval=config.get("performance.rateUpdateInterval"),
callback=self._update_actual_rate,
)
self._buffer_lock = threading.Lock()
def preReconfigure(self, config):
if config.has("performance.rateUpdateInterval"):
self._rate_update_timer.stop()
self._rate_update_timer = utils.RepeatingTimer(
interval=config.get("performance.rateUpdateInterval"),
callback=self._update_actual_rate,
)
if config.has("performance.rateBufferSpan"):
self._rate_tracker = calibrationBase.utils.UpdateRate(
interval=config.get("performance.rateBufferSpan")
)
for path in config.getPaths():
if path in self._dict_cache_slots:
self._dict_cache[path] = config.get(path)
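    # get/set below serve keys listed in _dict_cache_slots from a plain dict,
    # avoiding Karabo property access overhead on the hot path (process_input)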
def get(self, key):
if key in self._dict_cache_slots:
return self._dict_cache.get(key)
else:
return super().get(key)
def set(self, *args):
if len(args) == 2:
key, value = args
if key in self._dict_cache_slots:
self._dict_cache[key] = value
super().set(*args)
def _write_output(self, data, old_metadata):
metadata = ChannelMetaData(
old_metadata.get("source"),
Timestamp.fromHashAttributes(old_metadata.getAttributes("timestamp")),
)
if "image.passport" not in data:
data["image.passport"] = []
data["image.passport"].append(self.getInstanceId())
if not self._has_set_output_schema:
self.updateState(State.CHANGING)
self._update_output_schema(data)
self.updateState(State.PROCESSING)
channel = self.signalSlotable.getOutputChannel("dataOutput")
channel.write(data, metadata, False)
channel.update()
def _write_combiner_preview(self, data_raw, data_corrected, train_id, source):
# TODO: take into account updated pulse table after pulse filter
preview_hash = Hash()
preview_hash.set("image.passport", [self.getInstanceId()])
preview_hash.set("image.trainId", train_id)
preview_hash.set("image.pulseId", self.get("preview.pulse"))
# note: have to construct because setting .tid after init is broken
timestamp = Timestamp(Epochstamp(), Trainstamp(train_id))
metadata = ChannelMetaData(source, timestamp)
for channel_name, data in (
("preview.outputRaw", data_raw),
("preview.outputCorrected", data_corrected),
):
preview_hash.set("data.adc", data[..., np.newaxis])
channel = self.signalSlotable.getOutputChannel(channel_name)
channel.write(preview_hash, metadata, False)
channel.update()
def _update_output_schema(self, data):
"""Updates the schema of dataOutput based on parameter data (a Hash)
This should only be called once: when handling output for the first
time, we update the schema to match the modified data we'd send.
"""
self.log.INFO("Updating output schema")
my_schema_update = Schema()
data_schema = hashToSchema.HashToSchema(data).schema
(
OUTPUT_CHANNEL(my_schema_update)
.key("dataOutput")
.dataSchema(data_schema)
.commit()
)
self.updateSchema(my_schema_update)
self._has_set_output_schema = True
def _reset_state_from_processing(self):
if self.get("state") is State.PROCESSING:
self.updateState(State.ON)
self._state_reset_timer = None
def _update_actual_rate(self):
if not self.get("state") is State.PROCESSING:
self._rate_update_timer.delay()
return
self._buffered_status_update.set("performance.rate", self._rate_tracker.rate())
last_processing = self._buffered_status_update.get(
"performance.lastProcessingDuration"
)
if last_processing > 0:
theoretical_rate = 1000 / last_processing
self._buffered_status_update.set(
"performance.theoreticalRate", theoretical_rate
)
self.set(self._buffered_status_update)
self._rate_update_timer.delay()
def handle_eos(self, channel):
self._has_set_output_schema = False
self.updateState(State.ON)
self.signalEndOfStream("dataOutput")
def getConstant(self, name):
"""Hacky override of getConstant to actually return None on failure
Full function is from CalibrationReceiverBaseDevice
"""
const = super().getConstant(name)
if const is not None and len(const.shape) == 1:
self.log.WARN(
f"Constant {name} should probably be None, but is array"
f" of size {const.size}, shape {const.shape}"
)
const = None
return const
import pathlib
import cupy
import cupyx
import jinja2
import numpy as np
from . import utils
class BaseGpuRunner:
"""Class to handle instantiation and execution of CUDA kernels on trains
All GPU buffers are kept within this class. This generally means that you will
want to load data into it and then do something. Typical usage in correct order:
1. instantiate
2. load_constants
3. load_data
4. load_cell_table
5. correct
6a. reshape (only here does data transfer back to host)
6b. compute_preview (optional)
repeat from 2. or 3.
    In case no constants are available / correction is not desired, can skip 2 and 4
and use only_cast in step 5 instead of correct (taking care to call
compute_preview with parameters set accordingly).
"""
def __init__(
self,
pixels_x,
pixels_y,
memory_cells,
output_transpose=(2, 1, 0), # default: memorycells-fast
constant_memory_cells=None,
input_data_dtype=np.uint16,
output_data_dtype=np.float32,
):
_src_dir = pathlib.Path(__file__).absolute().parent
# subclass must define _kernel_source_filename
with (_src_dir / self._kernel_source_filename).open("r") as fd:
self._kernel_template = jinja2.Template(fd.read())
self.pixels_x = pixels_x
self.pixels_y = pixels_y
self.memory_cells = memory_cells
self.output_transpose = output_transpose
if constant_memory_cells is None:
self.constant_memory_cells = memory_cells
else:
self.constant_memory_cells = constant_memory_cells
self.output_shape = utils.shape_after_transpose(
self.processed_shape, self.output_transpose
)
# preview will only be single memory cell
self.preview_shape = (self.pixels_x, self.pixels_y)
self.input_data_dtype = input_data_dtype
self.output_data_dtype = output_data_dtype
self._init_kernels()
# reuse output arrays
self.cell_table_gpu = cupy.empty(self.memory_cells, dtype=np.uint16)
self.input_data_gpu = cupy.empty(self.input_shape, dtype=input_data_dtype)
        self.processed_data_gpu = cupy.empty(self.processed_shape, dtype=output_data_dtype)
self.reshaped_data_gpu = cupy.empty(self.output_shape, dtype=output_data_dtype)
self.preview_raw = cupyx.empty_pinned(self.preview_shape, dtype=np.float32)
self.preview_corrected = cupyx.empty_pinned(
self.preview_shape, dtype=np.float32
)
def only_cast(self):
"""Like correct without the correction
This currently means just casting to output dtype.
"""
self.casting_kernel(
self.full_grid,
self.full_block,
(
self.input_data_gpu,
self.processed_data_gpu,
),
)
def reshape(self, out=None):
"""Move axes to desired output order
The out parameter is passed directly to the get function of GPU array: if
None, then a new ndarray (in host memory) is returned. If not None, then data
will be loaded into the provided array, which must match shape / dtype.
"""
# TODO: avoid copy
if self.output_transpose is None:
self.reshaped_data_gpu = cupy.ascontiguousarray(
cupy.squeeze(self.processed_data_gpu)
)
else:
self.reshaped_data_gpu = cupy.ascontiguousarray(
cupy.transpose(
cupy.squeeze(self.processed_data_gpu), self.output_transpose
)
)
return self.reshaped_data_gpu.get(out=out)
    def load_data(self, raw_data):
        self.input_data_gpu.set(np.squeeze(raw_data))
def load_cell_table(self, cell_table):
self.cell_table_gpu.set(cell_table)
def compute_preview(self, preview_index, have_corrected=True, can_correct=True):
"""Generate single slice or reduction preview of raw and corrected data
Special values of preview_index are -1 for max, -2 for mean, -3 for
sum, and -4 for stdev (across cells).
Note that preview_index is taken from data without checking cell table.
Caller has to figure out which index along memory cell dimension they
actually want to preview.
Can reuse data from corrected output buffer with have_corrected parameter.
Note that preview requires relevant data to be loaded (raw data for raw
preview, correction map and cell table in addition for corrected preview).
"""
if preview_index < -4:
raise ValueError(f"No statistic with code {preview_index} defined")
elif preview_index >= self.memory_cells:
raise ValueError(f"Memory cell index {preview_index} out of range")
if (not have_corrected) and can_correct:
self.correct()
# if not have_corrected and not can_correct, assume only_cast already done
# TODO: enum around reduction type
for (image_data, output_buffer) in (
(self.input_data_gpu, self.preview_raw),
(self.processed_data_gpu, self.preview_corrected),
):
if preview_index >= 0:
# TODO: change axis order when moving reshape to after correction
image_data[preview_index].astype(np.float32).transpose().get(
out=output_buffer
)
elif preview_index == -1:
# TODO: select argmax independently for raw and corrected?
# TODO: send frame sums somewhere to compute global max frame
max_index = cupy.argmax(
cupy.sum(image_data, axis=(1, 2), dtype=cupy.float32)
)
image_data[max_index].astype(np.float32).transpose().get(
out=output_buffer
)
elif preview_index in (-2, -3, -4):
stat_fun = {-2: cupy.mean, -3: cupy.sum, -4: cupy.std}[preview_index]
stat_fun(image_data, axis=0, dtype=cupy.float32).transpose().get(
out=output_buffer
)
return self.preview_raw, self.preview_corrected
def update_block_size(self, full_block, target_shape=None):
"""Compute grid such that thread block grid covers target shape
Execution is scheduled with 3d "blocks" of CUDA threads, tuning can affect
performance. Correction kernels are "monolithic" for simplicity (i.e. each
logical thread handles one entry in output data), so in each dimension we
parallelize, grid * block >= length.
Note that individual kernels must themselves check whether they go out of
bounds; grid dimensions get rounded up in case ndarray size is not multiple of
block size.
"""
if target_shape is None:
target_shape = self.processed_shape
assert len(full_block) == 3
self.full_block = tuple(full_block)
self.full_grid = tuple(
utils.ceil_div(a_length, block_length)
for (a_length, block_length) in zip(target_shape, full_block)
)
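The call sequence from the class docstring above can be exercised end to end. A minimal sketch using the DsscGpuRunner subclass from the diff below (shapes follow DSSC module geometry; the concrete values are illustrative, and a CUDA-capable GPU is assumed):

    import numpy as np
    from calng import dssc_gpu

    # 1. instantiate
    runner = dssc_gpu.DsscGpuRunner(pixels_x=512, pixels_y=128, memory_cells=400)
    # 2.-4. load constants, data, and cell table
    runner.load_constants(np.zeros((512, 128, 400), dtype=np.float32))
    runner.load_data(np.zeros((400, 128, 512), dtype=np.uint16))
    runner.load_cell_table(np.arange(400, dtype=np.uint16))
    # 5. correct and 6a. reshape (transfers the result back to host memory)
    runner.correct()
    corrected = runner.reshape()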
-import pathlib
 import cupy
-import cupyx
-import jinja2
 import numpy as np
-from . import utils
+from . import base_gpu, utils

-class DsscGpuRunner:
-    """Class to handle instantiation and execution of CUDA kernels on trains
-    All GPU buffers are kept within this class. This generally means that you will
-    want to load data into it and then do something. Typical usage in correct order:
-    1. instantiate
-    2. load_constants
-    3. load_data
-    4. load_cell_table
-    5. correct
-    6a. reshape (only here does data transfer back to host)
-    6b. compute_preview (optional)
-    repeat from 2. or 3.
-    In case no constants are available / correction is not desired, can skip 3 and 4
-    and use only_cast in step 5 instead of correct (taking care to call
-    compute_preview with parameters set accordingly).
-    """
-    _src_dir = pathlib.Path(__file__).absolute().parent
-    with (_src_dir / "gpu-dssc-correct.cpp").open("r") as fd:
-        _kernel_template = jinja2.Template(fd.read())
+class DsscGpuRunner(base_gpu.BaseGpuRunner):
+    _kernel_source_filename = "dssc_gpu_kernels.cpp"

     def __init__(
         self,
@@ -43,39 +17,24 @@ class DsscGpuRunner:
         input_data_dtype=np.uint16,
         output_data_dtype=np.float32,
     ):
-        self.pixels_x = pixels_x
-        self.pixels_y = pixels_y
-        self.memory_cells = memory_cells
-        self.output_transpose = output_transpose
-        if constant_memory_cells is None:
-            self.constant_memory_cells = memory_cells
-        else:
-            self.constant_memory_cells = constant_memory_cells
-        self.input_shape = (self.memory_cells, self.pixels_y, self.pixels_x)
-        self.output_shape = utils.shape_after_transpose(
-            self.input_shape, self.output_transpose
-        )
+        self.input_shape = (memory_cells, pixels_y, pixels_x)
+        self.processed_shape = self.input_shape
+        super().__init__(
+            pixels_x,
+            pixels_y,
+            memory_cells,
+            output_transpose,
+            constant_memory_cells,
+            input_data_dtype,
+            output_data_dtype,
+        )
         self.map_shape = (self.pixels_x, self.pixels_y, self.constant_memory_cells)
-        # preview will only be single memory cell
-        self.preview_shape = (self.pixels_x, self.pixels_y)
-        self.input_data_dtype = input_data_dtype
-        self.output_data_dtype = output_data_dtype
-        self._init_kernels()
-        self.offset_map_gpu = cupy.empty(self.map_shape, dtype=np.float32)
-        # reuse output arrays
-        self.cell_table_gpu = cupy.empty(self.memory_cells, dtype=np.uint16)
-        self.input_data_gpu = cupy.empty(self.input_shape, dtype=input_data_dtype)
-        self.processed_data_gpu = cupy.empty(self.input_shape, dtype=output_data_dtype)
-        self.reshaped_data_gpu = cupy.empty(self.output_shape, dtype=output_data_dtype)
-        self.preview_raw = cupyx.empty_pinned(self.preview_shape, dtype=np.float32)
-        self.preview_corrected = cupyx.empty_pinned(
-            self.preview_shape, dtype=np.float32
-        )
-        self.output_buffer_next_index = 0
+        self.offset_map_gpu = cupy.empty(self.map_shape, dtype=np.float32)
         self.update_block_size((1, 1, 64))

     def load_constants(self, offset_map):
@@ -87,29 +46,6 @@ class DsscGpuRunner:
         self._init_kernels()
         self.offset_map_gpu.set(offset_map)

-    def load_data(self, raw_data):
-        self.input_data_gpu.set(np.squeeze(raw_data))
-
-    def load_cell_table(self, cell_table):
-        self.cell_table_gpu.set(cell_table)
-
-    def update_block_size(self, full_block):
-        """Execution is scheduled with 3d "blocks" of CUDA threads, tuning can
-        affect performance
-        Grid size is automatically computed based on block size. Note that
-        individual kernels must themselves check whether they go out of bounds;
-        grid dimensions get rounded up in case ndarray size is not multiple of
-        block size.
-        """
-        assert len(full_block) == 3
-        self.full_block = tuple(full_block)
-        self.full_grid = tuple(
-            utils.ceil_div(a_length, block_length)
-            for (a_length, block_length) in zip(self.input_shape, full_block)
-        )
-
     def correct(self):
         """Apply corrections to data (must load constant, data, and cell_table first)
@@ -133,90 +69,6 @@ class DsscGpuRunner:
             ),
         )

-    def only_cast(self):
-        """Like correct without the correction
-        This currently means just casting to output dtype.
-        """
-        self.casting_kernel(
-            self.full_grid,
-            self.full_block,
-            (
-                self.input_data_gpu,
-                self.processed_data_gpu,
-            ),
-        )
-
-    def reshape(self, out=None):
-        """Move axes to desired output order
-        The out parameter is passed directly to the get function of GPU array: if
-        None, then a new ndarray (in host memory) is returned. If not None, then data
-        will be loaded into the provided array, which must match shape / dtype.
-        """
-        # TODO: avoid copy
-        if self.output_transpose is None:
-            self.reshaped_data_gpu = cupy.ascontiguousarray(
-                cupy.squeeze(self.processed_data_gpu)
-            )
-        else:
-            self.reshaped_data_gpu = cupy.ascontiguousarray(
-                cupy.transpose(
-                    cupy.squeeze(self.processed_data_gpu), self.output_transpose
-                )
-            )
-        return self.reshaped_data_gpu.get(out=out)
-
-    def compute_preview(self, preview_index, have_corrected=True, can_correct=True):
-        """Generate single slice or reduction preview of raw and corrected data
-        Special values of preview_index are -1 for max, -2 for mean, -3 for
-        sum, and -4 for stdev (across cells).
-        Note that preview_index is taken from data without checking cell table.
-        Caller has to figure out which index along memory cell dimension they
-        actually want to preview.
-        Can reuse data from corrected output buffer with have_corrected parameter.
-        Note that preview requires relevant data to be loaded (raw data for raw
-        preview, correction map and cell table in addition for corrected preview).
-        """
-        if preview_index < -4:
-            raise ValueError(f"No statistic with code {preview_index} defined")
-        elif preview_index >= self.memory_cells:
-            raise ValueError(f"Memory cell index {preview_index} out of range")
-        if (not have_corrected) and can_correct:
-            self.correct()
-        # if not have_corrected and not can_correct, assume only_cast already done
-        # TODO: enum around reduction type
-        for (image_data, output_buffer) in (
-            (self.input_data_gpu, self.preview_raw),
-            (self.processed_data_gpu, self.preview_corrected),
-        ):
-            if preview_index >= 0:
-                # TODO: change axis order when moving reshape to after correction
-                image_data[preview_index].astype(np.float32).transpose().get(
-                    out=output_buffer
-                )
-            elif preview_index == -1:
-                # TODO: select argmax independently for raw and corrected?
-                # TODO: send frame sums somewhere to compute global max frame
-                max_index = cupy.argmax(
-                    cupy.sum(image_data, axis=(1, 2), dtype=cupy.float32)
-                )
-                image_data[max_index].astype(np.float32).transpose().get(
-                    out=output_buffer
-                )
-            elif preview_index in (-2, -3, -4):
-                stat_fun = {-2: cupy.mean, -3: cupy.sum, -4: cupy.std}[preview_index]
-                stat_fun(image_data, axis=0, dtype=cupy.float32).transpose().get(
-                    out=output_buffer
-                )
-        return self.preview_raw, self.preview_corrected
-
     def _init_kernels(self):
         kernel_source = self._kernel_template.render(
             {
...
File moved: gpu-dssc-correct.cpp → dssc_gpu_kernels.cpp
import numpy as np
import pytest
from calng import agipd_gpu
input_dtype = np.uint16
output_dtype = np.float16
corr_dtype = np.float32
pixels_x = 512
pixels_y = 128
memory_cells = 352
offset_map = (
np.random.random(size=(pixels_x, pixels_y, memory_cells)).astype(corr_dtype) * 20
)
cell_table = np.arange(memory_cells, dtype=np.uint16)
np.random.shuffle(cell_table)
raw_data = np.random.randint(
low=0, high=2000, size=(memory_cells, 2, pixels_x, pixels_y), dtype=input_dtype
)
thresholds = np.random.random(size=(pixels_y, pixels_x, memory_cells)) * 1000
thresholds = np.stack((thresholds, thresholds*2), axis=3).astype(np.float32)
gm_const_zeros = np.zeros((pixels_x, pixels_y, memory_cells, 3), dtype=np.float32)
gm_const_ones = np.ones((pixels_x, pixels_y, memory_cells, 3), dtype=np.float32)
kernel_runner = agipd_gpu.AgipdGpuRunner(
pixels_x,
pixels_y,
memory_cells,
constant_memory_cells=memory_cells,
input_data_dtype=input_dtype,
output_data_dtype=output_dtype,
)
def thresholding_cpu(data, cell_table, thresholds):
# get to memory_cell, x, y
raw_gain = data[:, 1, ...].astype(np.float32)
# get to threshold, memory_cell, x, y
thresholds = np.transpose(thresholds, (3, 2, 1, 0))[:, cell_table]
res = np.zeros((memory_cells, pixels_x, pixels_y), dtype=np.uint8)
res[raw_gain > thresholds[0]] = 1
res[raw_gain > thresholds[1]] = 2
return res
def test_only_thresholding():
kernel_runner.load_cell_table(cell_table)
kernel_runner.load_data(raw_data)
kernel_runner.load_thresholds(thresholds)
kernel_runner.correct(agipd_gpu.CorrectionFlags.THRESHOLD)
gpu_res = kernel_runner.gain_map_gpu.get()
cpu_res = thresholding_cpu(raw_data, cell_table, thresholds)
assert np.allclose(gpu_res, cpu_res)