Skip to content
Snippets Groups Projects
Commit 4178b8de authored by David Hammer's avatar David Hammer
Browse files

Merge branch 'feat/gh2-calcat-friendship' into 'master'

GH2: CalCat support and updates for next run

See merge request karaboDevices/calng!25
parents 127c99a9 1c172dca
No related branches found
No related tags found
1 merge request!25GH2: CalCat support and updates for next run
......@@ -2,6 +2,7 @@ import enum
import numpy as np
from karabo.bound import (
DOUBLE_ELEMENT,
FLOAT_ELEMENT,
KARABO_CLASSINFO,
OUTPUT_CHANNEL,
......@@ -19,10 +20,18 @@ from .._version import version as deviceVersion
_pretend_pulse_table = np.arange(2720, dtype=np.uint8)
class Gotthard2Constants(enum.Enum):
Lut = enum.auto()
Offset = enum.auto()
Gain = enum.auto()
class Constants(enum.Enum):
LUTGotthard2 = enum.auto()
OffsetGotthard2 = enum.auto()
RelativeGainGotthard2 = enum.auto()
BadPixelsDarkGotthard2 = enum.auto()
BadPixelsFFGotthard2 = enum.auto()
bp_constant_types = {
Constants.BadPixelsDarkGotthard2,
Constants.BadPixelsFFGotthard2,
}
class CorrectionFlags(enum.IntFlag):
......@@ -30,6 +39,7 @@ class CorrectionFlags(enum.IntFlag):
LUT = 1
OFFSET = 2
GAIN = 4
BPMASK = 8
class Gotthard2CpuRunner(base_kernel_runner.BaseKernelRunner):
......@@ -42,6 +52,7 @@ class Gotthard2CpuRunner(base_kernel_runner.BaseKernelRunner):
input_data_dtype=np.uint16,
output_data_dtype=np.float32,
bad_pixel_mask_value=np.nan,
bad_pixel_subset=None,
):
super().__init__(
pixels_x,
......@@ -64,16 +75,40 @@ class Gotthard2CpuRunner(base_kernel_runner.BaseKernelRunner):
self.lut = np.empty(self.lut_shape, dtype=np.uint16)
self.offset_map = np.empty(self.map_shape, dtype=np.float32)
self.rel_gain_map = np.empty(self.map_shape, dtype=np.float32)
self.bad_pixel_map = np.empty(self.map_shape, dtype=np.uint32)
self.bad_pixel_mask_value = bad_pixel_mask_value
self.flush_buffers()
self._bad_pixel_subset = bad_pixel_subset
self.input_data = None # will just point to data coming in
self.input_gain_stage = None # will just point to data coming in
self.processed_data = None # will just point to buffer we're given
@property
def input_data_views(self):
def preview_data_views(self):
return (self.input_data, self.processed_data)
def load_constant(self, constant_type, data):
if constant_type is Constants.LUTGotthard2:
self.lut[:] = np.transpose(data.astype(np.uint16, copy=False), (1, 2, 0))
elif constant_type is Constants.OffsetGotthard2:
self.offset_map[:] = np.transpose(data.astype(np.float32, copy=False))
elif constant_type is Constants.RelativeGainGotthard2:
self.rel_gain_map[:] = np.transpose(data.astype(np.float32, copy=False))
elif constant_type in bp_constant_types:
# TODO: add the regular bad pixel subset configuration
data = data.astype(np.uint32, copy=False)
if self._bad_pixel_subset is not None:
data = data & self._bad_pixel_subset
self.bad_pixel_map |= np.transpose(data)
else:
raise ValueError(f"What is this constant '{constant_type}'?")
def set_bad_pixel_subset(self, subset, apply_now=True):
self._bad_pixel_subset = subset
if apply_now:
self.bad_pixel_map &= self._bad_pixel_subset
def load_data(self, image_data, input_gain_stage):
"""Experiment: loading both in one function as they are tied"""
self.input_data = image_data.astype(np.uint16, copy=False)
......@@ -87,6 +122,7 @@ class Gotthard2CpuRunner(base_kernel_runner.BaseKernelRunner):
self.lut[:] = np.stack([np.stack([default_lut] * 2)] * self.pixels_x, axis=2)
self.offset_map.fill(0)
self.rel_gain_map.fill(1)
self.bad_pixel_map.fill(0)
def correct(self, flags, out=None):
if out is None:
......@@ -99,6 +135,8 @@ class Gotthard2CpuRunner(base_kernel_runner.BaseKernelRunner):
self.lut,
self.offset_map,
self.rel_gain_map,
self.bad_pixel_map,
self.bad_pixel_mask_value,
out,
)
self.processed_data = out
......@@ -106,16 +144,25 @@ class Gotthard2CpuRunner(base_kernel_runner.BaseKernelRunner):
class Gotthard2CalcatFriend(base_calcat.BaseCalcatFriend):
_constant_enum_class = Gotthard2Constants
_constant_enum_class = Constants
def __init__(self, device, *args, **kwargs):
super().__init__(device, *args, **kwargs)
self._constants_need_conditions = {} # TODO
self._constants_need_conditions = {c: self.condition for c in Constants}
def condition(self):
res = base_calcat.OperatingConditions()
res["Exposure time"] = self._get_param("exposureTime")
res["Exposure period"] = self._get_param("exposurePeriod")
res["Sensor Bias Voltage"] = self._get_param("biasVoltage")
res["Single photon"] = self._get_param("singlePhoton")
res["Acquisition rate"] = self._get_param("acquisitionRate")
return res
@staticmethod
def add_schema(schema, managed_keys):
super(Gotthard2CalcatFriend, Gotthard2CalcatFriend).add_schema(
schema, managed_keys, "gotthard-Type"
schema, managed_keys, "Gotthard2-Type"
)
# set some defaults for common parameters
......@@ -134,10 +181,45 @@ class Gotthard2CalcatFriend(base_calcat.BaseCalcatFriend):
.key("constantParameters.memoryCells")
.setNewDefaultValue(2)
.commit(),
OVERWRITE_ELEMENT(schema)
.key("constantParameters.biasVoltage")
.setNewDefaultValue(200)
.commit()
)
(
DOUBLE_ELEMENT(schema)
.key("constantParameters.exposureTime")
.assignmentOptional()
.defaultValue(0)
.reconfigurable()
.commit(),
DOUBLE_ELEMENT(schema)
.key("constantParameters.exposurePeriod")
.assignmentOptional()
.defaultValue(0)
.reconfigurable()
.commit(),
DOUBLE_ELEMENT(schema)
.key("constantParameters.singlePhoton")
.assignmentOptional()
.defaultValue(0)
.reconfigurable()
.commit(),
DOUBLE_ELEMENT(schema)
.key("constantParameters.acquisitionRate")
.assignmentOptional()
.defaultValue(1.1)
.reconfigurable()
.commit(),
)
base_calcat.add_status_schema_from_enum(
schema, Gotthard2Constants
schema, Constants
)
......@@ -148,22 +230,30 @@ class Gotthard2Correction(base_correction.BaseCorrection):
(
"lut",
CorrectionFlags.LUT,
{Gotthard2Constants.Lut},
{Constants.LUTGotthard2},
),
(
"offset",
CorrectionFlags.OFFSET,
{Gotthard2Constants.Offset},
{Constants.OffsetGotthard2},
),
(
"gain",
CorrectionFlags.GAIN,
{Gotthard2Constants.Gain},
{Constants.RelativeGainGotthard2},
),
(
"badPixels",
CorrectionFlags.BPMASK,
{
Constants.BadPixelsDarkGotthard2,
Constants.BadPixelsFFGotthard2,
}
),
)
_kernel_runner_class = Gotthard2CpuRunner
_calcat_friend_class = Gotthard2CalcatFriend
_constant_enum_class = Gotthard2Constants
_constant_enum_class = Constants
_managed_keys = base_correction.BaseCorrection._managed_keys.copy()
_image_data_path = "data.adc"
_cell_table_path = "data.memoryCell"
......@@ -231,6 +321,9 @@ class Gotthard2Correction(base_correction.BaseCorrection):
Gotthard2Correction._managed_keys,
Gotthard2Correction._correction_steps,
)
base_correction.add_bad_pixel_config_node(
expected, Gotthard2Correction._managed_keys
)
# mandatory: manager needs this in schema
(
......@@ -257,12 +350,50 @@ class Gotthard2Correction(base_correction.BaseCorrection):
@property
def _kernel_runner_init_args(self):
return {"bad_pixel_mask_value": self.bad_pixel_mask_value}
return {
"bad_pixel_mask_value": self._bad_pixel_mask_value,
"bad_pixel_subset": self._bad_pixel_subset,
}
@property
def bad_pixel_mask_value(self):
def _bad_pixel_mask_value(self):
return np.float32(self.unsafe_get("corrections.badPixels.maskingValue"))
_bad_pixel_subset = property(base_correction.get_bad_pixel_field_selection)
def postReconfigure(self):
super().postReconfigure()
update = self._prereconfigure_update_hash
if update.has("corrections.badPixels.subsetToUse"):
if any(
update.get(
f"corrections.badPixels.subsetToUse.{field.name}", default=False
)
for field in utils.BadPixelValues
):
self.log_status_info(
"Some fields reenabled, reloading cached bad pixel constants"
)
self.kernel_runner.set_bad_pixel_subset(
self._bad_pixel_subset, apply_now=False
)
with self.calcat_friend.cached_constants_lock:
self.kernel_runner.flush_buffers(bp_constant_types)
for constant_type in (
bp_constant_types & self.calcat_friend.cached_constants.keys()
):
self._load_constant_to_runner(
constant_type,
self.calcat_friend.cached_constants[constant_type],
)
else:
# just narrowing the subset - no reload, just AND
self.kernel_runner.set_bad_pixel_subset(
self._bad_pixel_subset, apply_now=True
)
if update.has("corrections.badPixels.maskingvalue"):
self.kernel_runner.bad_pixel_mask_value = self._bad_pixel_mask_value
def __init__(self, config):
super().__init__(config)
try:
......@@ -345,13 +476,4 @@ class Gotthard2Correction(base_correction.BaseCorrection):
)
def _load_constant_to_runner(self, constant, constant_data):
if constant is Gotthard2Constants.Lut:
self.kernel_runner.lut[:] = constant_data.astype(np.uint16, copy=False)
elif constant is Gotthard2Constants.Offset:
self.kernel_runner.offset_map[:] = constant_data.astype(
np.float32, copy=False
)
elif constant is Gotthard2Constants.Gain:
self.kernel_runner.rel_gain_map[:] = constant_data.astype(
np.float32, copy=False
)
self.kernel_runner.load_constant(constant, constant_data)
......@@ -7,6 +7,7 @@ cdef unsigned char NONE = 0
cdef unsigned char LUT = 1
cdef unsigned char OFFSET = 2
cdef unsigned char GAIN = 4
cdef unsigned char BPMASK = 8
def correct(
unsigned short[:, :] raw_data,
......@@ -15,6 +16,8 @@ def correct(
unsigned short[:, :, :] lut,
float[:, :, :] offset_map,
float[:, :, :] gain_map,
unsigned[:, :, :] badpixel_map,
float badpixel_fill_value,
float[:, :] output,
):
cdef unsigned frame, x
......@@ -30,15 +33,18 @@ def correct(
raw_value = raw_data[frame, x]
if (flags & LUT):
res = <float>lut[cell, raw_value, x]
if (flags & BPMASK) and badpixel_map[gain, cell, x] != 0:
res = badpixel_fill_value
else:
res = <float>raw_value
if (flags & LUT):
res = <float>lut[cell, raw_value, x]
else:
res = <float>raw_value
if (flags & OFFSET):
res -= offset_map[gain, cell, x]
if (flags & OFFSET):
res -= offset_map[gain, cell, x]
if (flags & GAIN):
res /= gain_map[gain, cell, x]
if (flags & GAIN):
res /= gain_map[gain, cell, x]
output[frame, x] = res
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment