Commit 85429ccd authored by David Hammer

Base correction device, CalCat interaction, DSSC and AGIPD devices

parent 048d7312
1 merge request: !3 Base correction device, CalCat interaction, DSSC and AGIPD devices
Showing changes with 6133 additions and 0 deletions
TrainMatcher, 1.2.0-2.10.2
PipeToZeroMQ, 3.2.6-2.11.0
calngDeps, 0.0.3-2.10.0
calibrationClient, 9.0.6
# calng
calng is a collection of Karabo devices to perform online processing of 2D X-ray detector data at runtime. It is the successor of the calPy package.
# CalCat secrets and deployment
Correction devices each run their own `calibration_client.CalibrationClient`, so they need to have credentials for CalCat.
They expect to be able to load these from a JSON file; by default, this is `$KARABO/var/data/calibration-client-secrets.json` (`var/data` is the CWD of Karabo devices).
The file should look something like:
```json
{
"base_url": "https://in.xfel.eu/test_calibration",
"client_id": "[sort of secret]",
"client_secret": "[actual secret]",
"user_email": "[eh, not that secret]",
"caldb_store_path": "/gpfs/exfel/d/cal/caldb_store"
}
```
For deployment, you'll want `/calibration` instead of `/test_calibration`, and the caldb store as seen from the ONC will be `/common/cal/caldb_store`.
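
As a quick orientation, the snippet below sketches where a device would look for this file by default; it only reads the JSON described above (it does not talk to CalCat), and the path is derived from the `$KARABO` convention mentioned here.

```python
import json
import os
import pathlib

# Minimal sketch: locate and read the secrets file described above.
# Assumes the $KARABO environment variable points at the Karabo installation.
secrets_path = (
    pathlib.Path(os.environ.get("KARABO", ".")) / "var" / "data"
    / "calibration-client-secrets.json"
)
if secrets_path.is_file():
    secrets = json.loads(secrets_path.read_text())
    print(secrets["base_url"], secrets["user_email"])
```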
setup.py 0 → 100644
#!/usr/bin/env python
from os.path import dirname, join, realpath
from setuptools import setup, find_packages
from karabo.packaging.versioning import device_scm_version

ROOT_FOLDER = dirname(realpath(__file__))
scm_version = device_scm_version(
    ROOT_FOLDER,
    join(ROOT_FOLDER, 'src', 'calng', '_version.py')
)

setup(name='calng',
      use_scm_version=scm_version,
      author='CAL team',
      author_email='da-support@xfel.eu',
      description='',
      long_description='',
      url='',
      package_dir={'': 'src'},
      packages=find_packages('src'),
      entry_points={
          'karabo.bound_device': [
              'AgipdCorrection = calng.AgipdCorrection:AgipdCorrection',
              'DsscCorrection = calng.DsscCorrection:DsscCorrection',
              'ModuleStacker = calng.ModuleStacker:ModuleStacker',
              'ShmemToZMQ = calng.ShmemToZMQ:ShmemToZMQ',
          ],
          'karabo.middlelayer_device': [
              'CalibrationManager = calng.CalibrationManager:CalibrationManager'
          ],
      },
      package_data={'': ['kernels/*']},
      requires=[],
      )
This diff is collapsed.
This diff is collapsed.
import enum
import cupy
import numpy as np
from karabo.bound import (
DOUBLE_ELEMENT,
KARABO_CLASSINFO,
OVERWRITE_ELEMENT,
VECTOR_STRING_ELEMENT,
State,
)
from . import base_gpu, calcat_utils, utils
from ._version import version as deviceVersion
from .base_correction import BaseCorrection, add_correction_step_schema
class CorrectionFlags(enum.IntFlag):
NONE = 0
OFFSET = 1
class DsscConstants(enum.Enum):
Offset = enum.auto()
class DsscGpuRunner(base_gpu.BaseGpuRunner):
_kernel_source_filename = "dssc_gpu.cu"
_corrected_axis_order = "cyx"
def __init__(
self,
pixels_x,
pixels_y,
memory_cells,
constant_memory_cells,
input_data_dtype=np.uint16,
output_data_dtype=np.float32,
):
self.input_shape = (memory_cells, pixels_y, pixels_x)
self.processed_shape = self.input_shape
super().__init__(
pixels_x,
pixels_y,
memory_cells,
constant_memory_cells,
input_data_dtype,
output_data_dtype,
)
self.map_shape = (self.constant_memory_cells, self.pixels_y, self.pixels_x)
self.offset_map_gpu = cupy.empty(self.map_shape, dtype=np.float32)
self._init_kernels()
self.update_block_size((1, 1, 64))
def _get_raw_for_preview(self):
return self.input_data_gpu
def _get_corrected_for_preview(self):
return self.processed_data_gpu
def load_offset_map(self, offset_map):
# can have an extra dimension for some reason
if len(offset_map.shape) == 4: # old format (see offsetcorrection_dssc.py)?
offset_map = offset_map[..., 0]
# shape (now): x, y, memory cell
offset_map = np.transpose(offset_map).astype(np.float32)
self.offset_map_gpu.set(offset_map)
def correct(self, flags):
self.correction_kernel(
self.full_grid,
self.full_block,
(
self.input_data_gpu,
self.cell_table_gpu,
np.uint8(flags),
self.offset_map_gpu,
self.processed_data_gpu,
),
)
def _init_kernels(self):
kernel_source = self._kernel_template.render(
{
"pixels_x": self.pixels_x,
"pixels_y": self.pixels_y,
"data_memory_cells": self.memory_cells,
"constant_memory_cells": self.constant_memory_cells,
"input_data_dtype": utils.np_dtype_to_c_type(self.input_data_dtype),
"output_data_dtype": utils.np_dtype_to_c_type(self.output_data_dtype),
"corr_enum": utils.enum_to_c_template(CorrectionFlags),
}
)
self.source_module = cupy.RawModule(code=kernel_source)
self.correction_kernel = self.source_module.get_function("correct")
class DsscCalcatFriend(calcat_utils.BaseCalcatFriend):
_constant_enum_class = DsscConstants
def __init__(self, device, *args, **kwargs):
super().__init__(device, *args, **kwargs)
self._constants_need_conditions = {
DsscConstants.Offset: self.dark_condition,
}
@staticmethod
def add_schema(
schema,
managed_keys,
param_prefix="constantParameters",
status_prefix="foundConstants",
):
super(DsscCalcatFriend, DsscCalcatFriend).add_schema(
schema, managed_keys, "DSSC-Type", param_prefix, status_prefix
)
(
OVERWRITE_ELEMENT(schema)
.key(f"{param_prefix}.memoryCells")
.setNewDefaultValue(400)
.commit(),
OVERWRITE_ELEMENT(schema)
.key(f"{param_prefix}.biasVoltage")
.setNewDefaultValue(100) # TODO: proper
.commit()
)
(
DOUBLE_ELEMENT(schema)
.key(f"{param_prefix}.pulseIdChecksum")
.assignmentOptional()
.defaultValue(2.8866323107820637e-36)
.commit(),
DOUBLE_ELEMENT(schema)
.key(f"{param_prefix}.acquisitionRate")
.assignmentOptional()
.defaultValue(4.5)
.commit(),
DOUBLE_ELEMENT(schema)
.key(f"{param_prefix}.encodedGain")
.assignmentOptional()
.defaultValue(67328)
.commit(),
)
calcat_utils.add_status_schema_from_enum(schema, status_prefix, DsscConstants)
def dark_condition(self):
res = calcat_utils.OperatingConditions()
res["Memory cells"] = self._get_param("memoryCells")
res["Sensor Bias Voltage"] = self._get_param("biasVoltage")
res["Pixels X"] = self._get_param("pixelsX")
res["Pixels Y"] = self._get_param("pixelsY")
# res["Pulse id checksum"] = self._get_param("pulseIdChecksum")
# res["Acquisition rate"] = self._get_param("acquisitionRate")
# res["Encoded gain"] = self._get_param("encodedGain")
return res
@KARABO_CLASSINFO("DsscCorrection", deviceVersion)
class DsscCorrection(BaseCorrection):
# subclass *must* set these attributes
_correction_flag_class = CorrectionFlags
_correction_field_names = (("offset", CorrectionFlags.OFFSET),)
_kernel_runner_class = DsscGpuRunner
_calcat_friend_class = DsscCalcatFriend
_constant_enum_class = DsscConstants
_managed_keys = BaseCorrection._managed_keys.copy()
@staticmethod
def expectedParameters(expected):
(
OVERWRITE_ELEMENT(expected)
.key("dataFormat.memoryCells")
.setNewDefaultValue(400)
.commit(),
OVERWRITE_ELEMENT(expected)
.key("preview.selectionMode")
.setNewDefaultValue("pulse")
.commit(),
)
DsscCalcatFriend.add_schema(expected, DsscCorrection._managed_keys)
add_correction_step_schema(
expected,
DsscCorrection._managed_keys,
DsscCorrection._correction_field_names,
)
(
VECTOR_STRING_ELEMENT(expected)
.key("managedKeys")
.assignmentOptional()
.defaultValue(list(DsscCorrection._managed_keys))
.commit()
)
@property
def input_data_shape(self):
return (
self.get("dataFormat.memoryCells"),
1,
self.get("dataFormat.pixelsY"),
self.get("dataFormat.pixelsX"),
)
def process_data(
self,
data_hash,
metadata,
source,
train_id,
image_data,
cell_table,
do_generate_preview,
):
pulse_table = np.squeeze(data_hash.get("image.pulseId"))
if self._frame_filter is not None:
try:
cell_table = cell_table[self._frame_filter]
pulse_table = pulse_table[self._frame_filter]
image_data = image_data[self._frame_filter]
except IndexError:
self.log_status_warn(
"Failed to apply frame filter, please check that it is valid!"
)
return
try:
self.kernel_runner.load_data(image_data)
except ValueError as e:
self.log_status_warn(f"Failed to load data: {e}")
return
except Exception as e:
self.log_status_warn(f"Unknown exception when loading data to GPU: {e}")
return
buffer_handle, buffer_array = self._shmem_buffer.next_slot()
self.kernel_runner.load_cell_table(cell_table)
self.kernel_runner.correct(self._correction_flag_enabled)
self.kernel_runner.reshape(
output_order=self.unsafe_get("dataFormat.outputAxisOrder"),
out=buffer_array,
)
if do_generate_preview:
if self._correction_flag_enabled != self._correction_flag_preview:
self.kernel_runner.correct(self._correction_flag_preview)
(
preview_slice_index,
preview_cell,
preview_pulse,
) = utils.pick_frame_index(
self.unsafe_get("preview.selectionMode"),
self.unsafe_get("preview.index"),
cell_table,
pulse_table,
warn_func=self.log_status_warn,
)
preview_raw, preview_corrected = self.kernel_runner.compute_previews(
preview_slice_index,
)
data_hash.set("image.data", buffer_handle)
data_hash.set("image.cellId", cell_table[:, np.newaxis])
data_hash.set("image.pulseId", pulse_table[:, np.newaxis])
data_hash.set("calngShmemPaths", ["image.data"])
self._write_output(data_hash, metadata)
if do_generate_preview:
self._write_combiner_previews(
(
("preview.outputRaw", preview_raw),
("preview.outputCorrected", preview_corrected),
),
train_id,
source,
)
def _load_constant_to_runner(self, constant, constant_data):
assert constant is DsscConstants.Offset
self.kernel_runner.load_offset_map(constant_data)
if not self.get("corrections.offset.available"):
self.set("corrections.offset.available", True)
self._update_correction_flags()
self.log_status_info(f"Done loading {constant.name} to GPU")
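
Read on its own, the runner above follows the call order documented in the `base_gpu.BaseGpuRunner` docstring further down in this commit. A minimal standalone sketch, assuming calng is installed on a CUDA-capable host with cupy available; the shapes are illustrative, not detector defaults:

```python
import numpy as np
from calng.DsscCorrection import CorrectionFlags, DsscGpuRunner

cells, pixels_y, pixels_x = 400, 128, 512  # illustrative shapes
runner = DsscGpuRunner(pixels_x, pixels_y, cells, constant_memory_cells=cells)

# constants come from CalCat in (x, y, cell) order; load_offset_map transposes them
runner.load_offset_map(np.zeros((pixels_x, pixels_y, cells), dtype=np.float32))

# per train: load data and cell table, correct, then transfer back to host
runner.load_data(np.zeros((cells, 1, pixels_y, pixels_x), dtype=np.uint16))
runner.load_cell_table(np.arange(cells, dtype=np.uint16))
runner.correct(CorrectionFlags.OFFSET)
corrected = runner.reshape(output_order="cyx")  # (cells, y, x) float32 on the host
```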
import numpy as np
from karabo.bound import (
FLOAT_ELEMENT,
KARABO_CLASSINFO,
NODE_ELEMENT,
STRING_ELEMENT,
ChannelMetaData,
Epochstamp,
Hash,
MetricPrefix,
Schema,
Timestamp,
Trainstamp,
Unit,
)
from TrainMatcher import TrainMatcher
from ._version import version as deviceVersion
@KARABO_CLASSINFO("ModuleStacker", deviceVersion)
class ModuleStacker(TrainMatcher.TrainMatcher):
"""This will be deprecated: now just equivalent to ModuleMatcher except this
stacks `image.data` instead of `data.adc`
"""
def __init__(self, conf):
super().__init__(conf)
self.info.merge(Hash("timeOfFlight", 0))
@staticmethod
def expectedParameters(expected):
(
FLOAT_ELEMENT(expected)
.key("timeOfFlight")
.displayedName("Time of flight")
.description(
"Time elapsed from DAQ sent data until train was matched and ready to "
"send from here. Measured for latest train matched. Maximum over all "
"sources included in said train."
)
.unit(Unit.SECOND)
.metricPrefix(MetricPrefix.MILLI)
.readOnly()
.commit(),
STRING_ELEMENT(expected)
.key("pathToStack")
.displayedName("Data path to stack")
.description(
"Typically, image.data will be used for full data going through the "
"pipeline. Set this when input is dataOutput from a correction device "
"and output goes to a bridge. For previews, data.adc is used as part "
"of the combiner format. Set to data.adc when input is a preview "
"output and output goes to a femDataAssembler."
)
.options("image.data,data.adc")
.assignmentOptional()
.defaultValue("image.data")
.commit(),
)
def initialization(self):
"""Apply configuration and automatically start.
Upon instantiation, the device will automatically start matching data
from the specified data sources.
"""
super().initialization()
# Disable the start and stop slots.
# It's not possible to pop items from the full schema. The slots are
# therefore converted to unused nodes.
desc = (
"Disable slots from the parent class. The acquisition start automatically "
"on instantiation. Check that the `fastSources` table is populated and "
"its booleans set to True the project configuration (not instantiated). "
"These nodes are not used."
)
schema = Schema()
(
NODE_ELEMENT(schema).key("start").description(desc).commit(),
NODE_ELEMENT(schema).key("stop").description(desc).commit(),
)
self.path_to_stack = self.get("pathToStack")
self.updateSchema(schema)
super().start()
def _send(self, tid, sources):
# Add control data
timestamp = Timestamp(Epochstamp(), Trainstamp(tid))
# Reuse arbitrary hash from existing ones (among the ones to stack)
try:
out_hash = next(
data
for (data, _) in iter(sources.values())
if data.has(self.path_to_stack)
)
except StopIteration:
out_hash = Hash()
# TODO: handle missing modules properly (track all sources)
stacked_data = []
stacked_sources = []
stacked_present = []
# TODO: should this be threaded?
time_of_flight = 0
for source, (data, metadata) in sources.items():
if not data.has(self.path_to_stack):
# may add sources not for stacking
# TODO: make stack or no part of source configuration
out_hash[f"unstacked.{source}"] = data
continue
old_ts = metadata.getTimestamp()
elapsed = timestamp.toTimestamp() - old_ts.toTimestamp()
time_of_flight = max(time_of_flight, elapsed)
image_data = data.get(self.path_to_stack)
stacked_data.append(image_data)
stacked_sources.append(source)
stacked_present.append(True)
for source, data in self.ctrlmon.get(tid):
out_hash[f"unstacked.{source}"] = data
if stacked_data:
if not isinstance(stacked_data[0], str):
# strings (like shmem handles) should stay list
stacked_data = np.stack(stacked_data, axis=0)
# TODO: merge with super().update_info (throttled updates)
self.info["timeOfFlight"] = time_of_flight * 1000
out_hash[self.path_to_stack] = stacked_data
out_hash["sources"] = stacked_sources
out_hash["modulesPresent"] = stacked_present
channel = self.signalSlotable.getOutputChannel("output")
channel.write(out_hash, ChannelMetaData(self.getInstanceId(), timestamp))
channel.update()
self.rate_out.update()
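
For orientation, the hash emitted by `_send` carries the stacked array under `pathToStack` plus bookkeeping keys; a toy sketch of that structure with made-up shapes and hypothetical source names:

```python
import numpy as np

# Four matched modules, each contributing an image.data array (shapes made up)
per_module = [np.zeros((10, 128, 512), dtype=np.float32) for _ in range(4)]
stacked = np.stack(per_module, axis=0)          # goes to out_hash["image.data"]
sources = [f"DET/CORR/{i}:dataOutput" for i in range(4)]  # hypothetical source names
modules_present = [True] * len(sources)         # goes to out_hash["modulesPresent"]
assert stacked.shape == (4, 10, 128, 512)
```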
from time import time
from karabo.bound import KARABO_CLASSINFO
from PipeToZeroMQ import PipeToZeroMQ, conversion, device_schema
from . import shmem_utils
from ._version import version as deviceVersion
@KARABO_CLASSINFO("ShmemToZMQ", deviceVersion)
class ShmemToZMQ(PipeToZeroMQ.PipeToZeroMQ):
def initialization(self):
super().initialization()
self._shmem_handler = shmem_utils.ShmemCircularBufferReceiver()
def onInput(self, input_channel):
actual = self.getActualTimestamp()
input_tic = time()
self.info["inputUpdated"] += 1
self.info["dataRecv"] = input_channel.size()
all_meta = input_channel.getMetaData()
data = {}
meta = {}
for idx in range(input_channel.size()):
# Read metadata
metadata = self._extract_metadata(all_meta, idx)
source = metadata["source"]
if source not in self.sources:
self.appendSchema(device_schema.timestamp_schema(source))
self.sources.add(source)
self._time_info(metadata, actual, self.info)
# Read data
hash_data = input_channel.read(idx)
# filters
if self.allowed_sources and source not in self.allowed_sources:
continue
forward, ignore = self._filter_properties(hash_data, source)
meta[source] = conversion.meta_to_dict(metadata)
meta[source]["ignored_keys"] = ignore
# only this bit differs from PipeToZeroMQ.onInput
dic, arr = conversion.hash_to_dict(
hash_data, paths=forward, version=self.version
)
for shmem_handle_path in dic.pop("calngShmemPaths", []):
shmem_handle = dic.pop(shmem_handle_path, None)
if shmem_handle is None:
self.log.INFO(
f"Hash from {source} did not have {shmem_handle_path}"
)
continue
elif shmem_handle == "":
self.log.INFO(
f"Hash from {source} had empty {shmem_handle_path}"
)
continue
actual_data = self._shmem_handler.get(shmem_handle)
arr[shmem_handle_path] = actual_data
data[source] = (dic, arr)
# forward data to all connected ZMQ sockets
self._send(data, meta)
output_tic = time()
self.info["onInputTotal"] = 1000 * (output_tic - input_tic)
# update properties on device
self._updateProperties(output_tic)
# block if device is in passive state
self.monitoring.wait()
This diff is collapsed.
import pathlib
import cupy
import jinja2
import numpy as np
from . import utils
class BaseGpuRunner:
"""Class to handle GPU buffers and execution of CUDA kernels on image data
All GPU buffers are kept within this class and it is intentionally very stateful.
This generally means that you will want to load data into it and then do something.
Typical usage in correct order:
1. instantiate
2. load constants
3. load_data
4. load_cell_table
5. correct
6a. reshape (only here does data transfer back to host)
6b. compute_preview (optional)
repeat from 2. or 3.
In case no constants are available / correction is not desired, one can skip loading
constants and pass CorrectionFlags.NONE to correct(...). Generally, the user must handle which
correction steps are appropriate given the constants loaded so far.
"""
# These must be set by subclass
_kernel_source_filename = None
_corrected_axis_order = None
def __init__(
self,
pixels_x,
pixels_y,
memory_cells,
constant_memory_cells,
input_data_dtype=np.uint16,
output_data_dtype=np.float32,
):
_src_dir = pathlib.Path(__file__).absolute().parent
# subclass must define _kernel_source_filename
with (_src_dir / "kernels" / self._kernel_source_filename).open("r") as fd:
self._kernel_template = jinja2.Template(fd.read())
self.pixels_x = pixels_x
self.pixels_y = pixels_y
self.memory_cells = memory_cells
if constant_memory_cells == 0:
# if not set, guess same as input; may save one recompilation
self.constant_memory_cells = memory_cells
else:
self.constant_memory_cells = constant_memory_cells
# preview will only be single memory cell
self.preview_shape = (self.pixels_x, self.pixels_y)
self.input_data_dtype = input_data_dtype
self.output_data_dtype = output_data_dtype
self._init_kernels()
# reuse buffers for input / output
self.cell_table_gpu = cupy.empty(self.memory_cells, dtype=np.uint16)
self.input_data_gpu = cupy.empty(self.input_shape, dtype=input_data_dtype)
self.processed_data_gpu = cupy.empty(
self.processed_shape, dtype=output_data_dtype
)
self.reshaped_data_gpu = None # currently not reusing buffer
# default preview layers: raw and corrected (subclass can extend)
self.preview_buffer_getters = [
self._get_raw_for_preview,
self._get_corrected_for_preview,
]
# to get data from respective buffers to cell, x, y shape for preview computation
def _get_raw_for_preview(self):
"""Should return view of self.input_data_gpu with shape (cell, x/y, x/y)"""
raise NotImplementedError()
def _get_corrected_for_preview(self):
"""Should return view of self.processed_data_gpu with shape (cell, x/y, x/y)"""
raise NotImplementedError()
def flush_buffers(self):
"""Optional reset GPU buffers (implement in subclasses which need this)"""
pass
def correct(self, flags):
"""Correct (already loaded) image data according to flags
Subclass must define this method. It should assume that image data, cell table,
and other data (including constants) has already been loaded. It should
probably run some GPU kernel and output should go into self.processed_data_gpu.
Keep in mind that user only gets output from compute_preview or reshape
(either of these should come after correct).
The submodules providing subclasses should have some IntFlag enums defining
which flags are available to pass along to the kernel. A zero flag should allow
the kernel to do no actual correction - but still copy the data between buffers
and cast it to desired output type.
"""
raise NotImplementedError()
def reshape(self, output_order, out=None):
"""Move axes to desired output order and copy to host memory
The out parameter is passed directly to the get function of GPU array: if
None, then a new ndarray (in host memory) is returned. If not None, then data
will be loaded into the provided array, which must match shape / dtype.
"""
# TODO: avoid copy
if output_order == self._corrected_axis_order:
self.reshaped_data_gpu = self.processed_data_gpu
else:
self.reshaped_data_gpu = cupy.transpose(
self.processed_data_gpu,
utils.transpose_order(self._corrected_axis_order, output_order),
)
return self.reshaped_data_gpu.get(out=out)
def load_data(self, raw_data):
self.input_data_gpu.set(np.squeeze(raw_data))
def load_cell_table(self, cell_table):
self.cell_table_gpu.set(cell_table)
def compute_previews(self, preview_index):
"""Generate single slice or reduction preview of raw and corrected data
Special values of preview_index are -1 for max, -2 for mean, -3 for sum, and
-4 for stdev (across cells).
Note that preview_index is taken from data without checking cell table.
Caller has to figure out which index along memory cell dimension they
actually want to preview in case it needs to be a specific pulse.
Will reuse data from corrected output buffer. Therefore, correct(...) must have
been called with the appropriate flags before compute_preview(...).
"""
if preview_index < -4:
raise ValueError(f"No statistic with code {preview_index} defined")
elif preview_index >= self.memory_cells:
raise ValueError(f"Memory cell index {preview_index} out of range")
# TODO: enum around reduction type
return tuple(
self._compute_a_preview(image_data=getter(), preview_index=preview_index)
for getter in self.preview_buffer_getters
)
def _compute_a_preview(self, image_data, preview_index):
"""image_data must have cells on first axis; X and Y order is not important
here for now (and can differ between AGIPD and DSSC)"""
if preview_index >= 0:
# TODO: reuse pinned buffers for this
return image_data[preview_index].astype(np.float32).get()
elif preview_index == -1:
# TODO: confirm that max is pixel and not integrated intensity
# separate from next case because dtype not applicable here
return cupy.nanmax(image_data, axis=0).astype(cupy.float32).get()
elif preview_index in (-2, -3, -4):
stat_fun = {
-2: cupy.nanmean,
-3: cupy.nansum,
-4: cupy.nanstd,
}[preview_index]
return stat_fun(image_data, axis=0, dtype=cupy.float32).get()
def update_block_size(self, full_block):
"""Set execution grid such that it covers processed_shape with full_blocks
Execution is scheduled with 3d "blocks" of CUDA threads. Tuning can affect
performance. Correction kernels are "monolithic" for simplicity (i.e. each
logical thread handles one entry in output data), so in each dimension we
parallelize over, grid * block must be >= length to cover all entries.
Note that individual kernels must themselves check whether they go out of
bounds; grid dimensions get rounded up in case ndarray size is not multiple of
block size.
"""
assert len(full_block) == 3
self.full_block = tuple(full_block)
self.full_grid = tuple(
utils.ceil_div(a_length, block_length)
for (a_length, block_length) in zip(self.processed_shape, full_block)
)
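
The grid/block relationship in `update_block_size` is easiest to see with numbers; a small self-contained check, where the shape is only an example and `math.ceil(a / b)` stands in for `utils.ceil_div`:

```python
import math

processed_shape = (400, 128, 512)   # (cells, y, x), example only
full_block = (1, 1, 64)
full_grid = tuple(math.ceil(a / b) for a, b in zip(processed_shape, full_block))
# grid * block covers every output element in each dimension
assert full_grid == (400, 128, 8)
assert all(g * b >= a for g, b, a in zip(full_grid, full_block, processed_shape))
```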
This diff is collapsed.
#include <cuda_fp16.h>
{{corr_enum}}
extern "C" {
/*
Perform corrections; see agipd_gpu.CorrectionFlags
Note that THRESHOLD and OFFSET should be applied for any later corrections to make sense
Will take cell_table into account when getting correction values
Will convert from input dtype to float for correction
Will convert to output dtype for output
*/
__global__ void correct(const {{input_data_dtype}}* data,
const unsigned short* cell_table,
const unsigned char corr_flags,
// default_gain can be 0, 1, or 2, and is relevant for fixed gain mode (no THRESHOLD)
const unsigned char default_gain,
const float* threshold_map,
const float* offset_map,
const float* rel_gain_pc_map,
const float* md_additional_offset,
const float* rel_gain_xray_map,
const float g_gain_value,
const unsigned int* bad_pixel_map,
const float bad_pixel_mask_value,
float* gain_map, // TODO: more compact yet plottable representation
{{output_data_dtype}}* output) {
const size_t X = {{pixels_x}};
const size_t Y = {{pixels_y}};
const size_t input_cells = {{data_memory_cells}};
const size_t map_cells = {{constant_memory_cells}};
const size_t cell = blockIdx.x * blockDim.x + threadIdx.x;
const size_t x = blockIdx.y * blockDim.y + threadIdx.y;
const size_t y = blockIdx.z * blockDim.z + threadIdx.z;
if (cell >= input_cells || y >= Y || x >= X) {
return;
}
// data shape: memory cell, data/raw_gain (dim size 2), x, y
const size_t data_stride_y = 1;
const size_t data_stride_x = Y * data_stride_y;
const size_t data_stride_raw_gain = X * data_stride_x;
const size_t data_stride_cell = 2 * data_stride_raw_gain;
const size_t data_index = cell * data_stride_cell +
0 * data_stride_raw_gain +
y * data_stride_y +
x * data_stride_x;
const size_t raw_gain_index = cell * data_stride_cell +
1 * data_stride_raw_gain +
y * data_stride_y +
x * data_stride_x;
float corrected = (float)data[data_index];
const float raw_gain_val = (float)data[raw_gain_index];
const size_t output_stride_y = 1;
const size_t output_stride_x = output_stride_y * Y;
const size_t output_stride_cell = output_stride_x * X;
const size_t output_index = cell * output_stride_cell + x * output_stride_x + y * output_stride_y;
// per-pixel only constant: cell, x, y
const size_t map_stride_y = 1;
const size_t map_stride_x = Y * map_stride_y;
const size_t map_stride_cell = X * map_stride_x;
// threshold constant shape: cell, x, y, threshold (dim size 2)
const size_t threshold_map_stride_threshold = 1;
const size_t threshold_map_stride_y = 2 * threshold_map_stride_threshold;
const size_t threshold_map_stride_x = Y * threshold_map_stride_y;
const size_t threshold_map_stride_cell = X * threshold_map_stride_x;
// gain mapped constant shape: cell, x, y, gain_level (dim size 3)
const size_t gm_map_stride_gain = 1;
const size_t gm_map_stride_y = 3 * gm_map_stride_gain;
const size_t gm_map_stride_x = Y * gm_map_stride_y;
const size_t gm_map_stride_cell = X * gm_map_stride_x;
// note: assuming all maps have same shape (in terms of cells / x / y)
const size_t map_cell = cell_table[cell];
if (map_cell < map_cells) {
unsigned char gain = default_gain;
if (corr_flags & THRESHOLD) {
const float threshold_0 = threshold_map[0 * threshold_map_stride_threshold +
map_cell * threshold_map_stride_cell +
y * threshold_map_stride_y +
x * threshold_map_stride_x];
const float threshold_1 = threshold_map[1 * threshold_map_stride_threshold +
map_cell * threshold_map_stride_cell +
y * threshold_map_stride_y +
x * threshold_map_stride_x];
// could consider making this const using ternaries / tiny function
if (raw_gain_val <= threshold_0) {
gain = 0;
} else if (raw_gain_val <= threshold_1) {
gain = 1;
} else {
gain = 2;
}
}
gain_map[output_index] = (float)gain;
const size_t map_index = map_cell * map_stride_cell +
y * map_stride_y +
x * map_stride_x;
const size_t gm_map_index = gain * gm_map_stride_gain +
map_cell * gm_map_stride_cell +
y * gm_map_stride_y +
x * gm_map_stride_x;
if ((corr_flags & BPMASK) && bad_pixel_map[gm_map_index]) {
corrected = bad_pixel_mask_value;
gain_map[output_index] = bad_pixel_mask_value;
} else {
if (corr_flags & OFFSET) {
corrected -= offset_map[gm_map_index];
// TODO: optionally reassign gain stage for this pixel based on new value
}
// TODO: baseline shift
if (corr_flags & REL_GAIN_PC) {
corrected *= rel_gain_pc_map[gm_map_index];
if (gain == 1) {
corrected += md_additional_offset[map_index];
}
}
if (corr_flags & GAIN_XRAY) {
corrected = (corrected / rel_gain_xray_map[map_index]) * g_gain_value;
}
}
{% if output_data_dtype == "half" %}
output[output_index] = __float2half(corrected);
{% else %}
output[output_index] = ({{output_data_dtype}})corrected;
{% endif %}
} else {
// TODO: decide what to do when we cannot threshold
{% if output_data_dtype == "half" %}
output[output_index] = __float2half(corrected);
{% else %}
output[output_index] = ({{output_data_dtype}})corrected;
{% endif %}
gain_map[output_index] = 255;
}
}
}
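
The gain-stage selection in the THRESHOLD branch above is compact enough to restate in plain Python as a reference; the threshold values below are made up:

```python
def pick_gain_stage(raw_gain_val, threshold_0, threshold_1):
    # mirrors the THRESHOLD branch of the AGIPD kernel for a single pixel/cell
    if raw_gain_val <= threshold_0:
        return 0
    elif raw_gain_val <= threshold_1:
        return 1
    return 2

assert pick_gain_stage(10.0, 100.0, 1000.0) == 0
assert pick_gain_stage(500.0, 100.0, 1000.0) == 1
assert pick_gain_stage(5000.0, 100.0, 1000.0) == 2
```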
#include <cuda_fp16.h>
{{corr_enum}}
extern "C" {
/*
Perform corrections: NONE or OFFSET
Take cell_table into account when getting correction values
Converting to float while correcting
Converting to output dtype at the end
Shape of input data: memory cell, 1, y, x
Shape of offset constant: x, y, memory cell (transposed to memory cell, y, x before upload)
*/
__global__ void correct(const {{input_data_dtype}}* data, // shape: memory cell, 1, y, x
const unsigned short* cell_table,
const unsigned char corr_flags,
const float* offset_map,
{{output_data_dtype}}* output) {
const size_t X = {{pixels_x}};
const size_t Y = {{pixels_y}};
const size_t memory_cells = {{data_memory_cells}};
const size_t map_memory_cells = {{constant_memory_cells}};
const size_t memory_cell = blockIdx.x * blockDim.x + threadIdx.x;
const size_t y = blockIdx.y * blockDim.y + threadIdx.y;
const size_t x = blockIdx.z * blockDim.z + threadIdx.z;
if (memory_cell >= memory_cells || y >= Y || x >= X) {
return;
}
// note: strides differ from numpy strides because unit here is sizeof(...), not byte
const size_t data_stride_x = 1;
const size_t data_stride_y = X * data_stride_x;
const size_t data_stride_cell = Y * data_stride_y;
const size_t data_index = memory_cell * data_stride_cell + y * data_stride_y + x * data_stride_x;
const float raw = (float)data[data_index];
const size_t map_stride_x = 1;
const size_t map_stride_y = X * map_stride_x;
const size_t map_stride_cell = Y * map_stride_y;
const size_t map_cell = cell_table[memory_cell];
if (map_cell < map_memory_cells) {
const size_t map_index = map_cell * map_stride_cell + y * map_stride_y + x * map_stride_x;
float corrected = raw;
if (corr_flags & OFFSET) {
corrected -= offset_map[map_index];
}
{% if output_data_dtype == "half" %}
output[data_index] = __float2half(corrected);
{% else %}
output[data_index] = ({{output_data_dtype}})corrected;
{% endif %}
} else {
{% if output_data_dtype == "half" %}
output[data_index] = __float2half(raw);
{% else %}
output[data_index] = ({{output_data_dtype}})raw;
{% endif %}
}
}
}
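
As a reading aid for the stride arithmetic, here is a hedged NumPy restatement of what this kernel computes for one module, with data already squeezed to (cells, y, x) and the offset map in its on-GPU (cells, y, x) layout:

```python
import numpy as np

def dssc_offset_reference(data, cell_table, offset_map, apply_offset=True):
    # data: (cells, y, x) raw values; offset_map: (map_cells, y, x)
    out = data.astype(np.float32)
    valid = cell_table < offset_map.shape[0]   # cells outside the map are passed through
    if apply_offset:
        out[valid] -= offset_map[cell_table[valid]]
    return out

# tiny self-check with made-up numbers
data = np.full((2, 1, 1), 10, dtype=np.uint16)
offsets = np.full((2, 1, 1), 3, dtype=np.float32)
result = dssc_offset_reference(data, np.array([0, 1], dtype=np.uint16), offsets)
assert result[0, 0, 0] == 7.0
```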
This diff is collapsed.
import numpy as np
import posixshmem
def parse_shmem_handle(handle_string):
buffer_name, dtype, shape, index = handle_string.split("$")
dtype = getattr(np, dtype)
shape = tuple(int(n) for n in shape.split(","))
index = int(index)
return buffer_name, dtype, shape, index
def open_shmem_from_handle(handle_string):
"""Conveniently open readonly SharedMemory with ndarray view from a handle."""
buffer_name, dtype, shape, _ = parse_shmem_handle(handle_string)
buffer_mem = posixshmem.SharedMemory(name=buffer_name, rw=False)
array = buffer_mem.ndarray(
shape=shape,
dtype=dtype,
)
return buffer_mem, array
class ShmemCircularBufferReceiver:
def __init__(self):
self._name_to_mem = {}
self._name_to_ary = {}
def get(self, handle_string):
name, dtype, shape, index = parse_shmem_handle(handle_string)
if name not in self._name_to_mem:
mem = posixshmem.SharedMemory(name=name, rw=False)
self._name_to_mem[name] = mem
ary = mem.ndarray(shape=shape, dtype=dtype)
self._name_to_ary[name] = ary
return ary[index]
ary = self._name_to_ary[name]
if ary.shape != shape or ary.dtype != dtype:
del ary
mem = self._name_to_mem[name]
ary = mem.ndarray(shape=shape, dtype=dtype)
self._name_to_ary[name] = ary
return ary[index]
class ShmemCircularBuffer:
"""Convenience wrapper around posixshmem-backed ndarray buffers
The underlying memory will be opened as an ndarray with shape (buffer_size, ) +
array_shape, where buffer_size is memory_budget // (dtype itemsize * number of array elements). Each call
to next_slot will return the next entry along the first dimension of this array
(both a handle for IPC usage and the ndarray view).
"""
def __init__(self, memory_budget, array_shape, dtype, shmem_name):
# for portable use: name has leading slash and no other slashes
self.shmem_name = "/" + shmem_name.lstrip("/").replace("/", "_")
self._shared_memory = posixshmem.SharedMemory(
name=self.shmem_name,
size=memory_budget,
rw=True,
)
self._buffer_ary = None
self._update_shape(array_shape, dtype)
self._cuda_pinned = False
# important for performance and pinning: touch memory to actually allocate
self._buffer_ary.fill(0)
def _update_shape(self, array_shape, dtype):
array_shape = tuple(array_shape)
array_bytes = np.dtype(dtype).itemsize * np.product(array_shape)
num_slots = self._shared_memory.size // array_bytes
if num_slots == 0:
raise ValueError("Array size exceeds size of allocated memory block")
full_shape = (num_slots,) + array_shape
if self._buffer_ary is not None:
del self._buffer_ary
self._buffer_ary = self._shared_memory.ndarray(
shape=full_shape,
dtype=dtype,
)
shape_str = ",".join(str(n) for n in full_shape)
self.shmem_handle_template = (
f"{self.shmem_name}${np.dtype(dtype)}${shape_str}${{index}}"
)
self._next_slot_index = 0
def change_shape(self, array_shape, dtype=None):
"""Set new array shape to buffer. Note that the existing SharedMemory object is
still used. Old data in there will be mangled and number of slots will depend
upon new array shape and original memory budget.
"""
if dtype is None:
dtype = self._buffer_ary.dtype
self._update_shape(array_shape, dtype)
def cuda_pin(self):
import cupy
self._memory_pointer = self._buffer_ary.ctypes.get_data()
cupy.cuda.runtime.hostRegister(
self._memory_pointer,
self._shared_memory.size,
0
)
# record that the buffer is pinned so __del__ unregisters it
self._cuda_pinned = True
def __del__(self):
if self._cuda_pinned:
import cupy
cupy.cuda.runtime.hostUnregister(self._memory_pointer)
del self._buffer_ary
del self._shared_memory
@property
def num_slots(self):
return self._buffer_ary.shape[0]
def next_slot(self):
current_index = self._next_slot_index
self._next_slot_index = (self._next_slot_index + 1) % self.num_slots
shmem_handle = self.shmem_handle_template.format(index=current_index)
data = self._buffer_ary[current_index]
return shmem_handle, data
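
To see the handle convention end to end, the sketch below creates a small buffer, takes a slot, and resolves the handle again with the receiver class; it assumes posixshmem is available and that the (made-up) segment name and shapes are acceptable on the host:

```python
import numpy as np
from calng.shmem_utils import (
    ShmemCircularBuffer,
    ShmemCircularBufferReceiver,
    parse_shmem_handle,
)

buf = ShmemCircularBuffer(
    memory_budget=64 * 2**20,          # 64 MiB, split into as many slots as fit
    array_shape=(16, 128, 512),
    dtype=np.float32,
    shmem_name="calng-example-buffer",  # made-up name
)
handle, slot = buf.next_slot()          # handle is a string for IPC, slot an ndarray view
name, dtype, shape, index = parse_shmem_handle(handle)
assert shape == (buf.num_slots,) + (16, 128, 512) and index == 0

# on the receiving side (e.g. ShmemToZMQ), the handle is turned back into the same slot
receiver = ShmemCircularBufferReceiver()
resolved = receiver.get(handle)
assert resolved.shape == (16, 128, 512)
```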
This diff is collapsed.
from calng import utils

calls = 0


@utils.threadsafe_cache
def will_raise_once(argument):
    global calls
    calls += 1
    if calls == 1:
        raise Exception("That's just what I do")
    return argument + 1


try:
    will_raise_once(0)
except Exception as ex:
    print("As expected, first call raised:", ex)
print("Now calling again:")
print(will_raise_once(0))
This diff is collapsed.
This diff is collapsed.