From b816be129feae79913f276b849c64eaee8e6ec80 Mon Sep 17 00:00:00 2001 From: Egor Sobolev <egor.sobolev@xfel.eu> Date: Wed, 27 Mar 2024 23:02:21 +0100 Subject: [PATCH] Add ShimadzuHVX2 conditions and detector abstraction --- src/cal_tools/shimadzu.py | 52 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 52 insertions(+) create mode 100644 src/cal_tools/shimadzu.py diff --git a/src/cal_tools/shimadzu.py b/src/cal_tools/shimadzu.py new file mode 100644 index 000000000..8366c95b5 --- /dev/null +++ b/src/cal_tools/shimadzu.py @@ -0,0 +1,52 @@ +from dataclasses import dataclass +from cal_tools.calcat_interface2 import ConditionsBase + + +@dataclass +class ShimadzuHPVX2Conditions(ConditionsBase): + frame_size: float + + calibration_types = { + "Offset": ["Frame Size"], + "DynamicFF": ["Frame Size"], + } + + +class ShimadzuHPVX2: + channel = "daqOutput" + image_key = "data.image.pixels" + + def __init__(self, source_name_pattern: str, channel=None, image_key=None): + self.source_name_pattern = source_name_pattern + if channel is not None: + self.channel = channel + if image_key is not None: + self.image_key = image_key + self.image_index_group = self.image_key.partition('.')[0] + self.instrument = source_name_pattern.split('_')[0] + + def conditions(self, dc: "DataCollection", module=None): # noqa: F821 + if module is None: + source_pattern = self.source_name_pattern.format( + f"*:{self.channel}") + det_dc = dc.select(source_pattern) + if not det_dc.instrument_sources: + raise ValueError("No detector sources are found") + + source_name = list(det_dc.instrument_sources)[0] + else: + source_name = self.instrument_source(module) + keydata = dc[source_name, self.image_key] + num_frames = keydata.shape[-3] + return ShimadzuHPVX2Conditions(frame_size=num_frames / 256) + + def instrument_source(self, module: int): + source_name = self.source_name_pattern.format(module) + return f"{source_name}:{self.channel}" + + def corrected_source(self, module: int): + source_name = self.source_name_pattern.format(module) + parts = source_name.split('/') + parts[1] = "CORR" + source_name = '/'.join(parts) + return f"{source_name}:output" -- GitLab