Skip to content
Snippets Groups Projects

[AGIPD] Split AGIPDCtrl function for clear code and adding tests later.

Merged Karim Ahmed requested to merge refactor/split_AGIPDCtrl_functions into master
+ 93
56
import os
import posixpath
import zlib
from datetime import datetime
from multiprocessing.pool import ThreadPool
from typing import Any, Dict, List, Optional, Tuple
@@ -26,6 +27,7 @@ from cal_tools.enums import AgipdGainMode, BadPixels, SnowResolution
from cal_tools.h5_copy_except import h5_copy_except_paths
from cal_tools.tools import get_constant_from_db_and_time
class AgipdCtrl:
def __init__(
self,
@@ -34,22 +36,26 @@ class AgipdCtrl:
ctrl_src: str,
raise_error: bool = True,
):
"""
Initialize AgipdCondition class to read all required AGIPD parameters.
""" Initialize AgipdCondition class to read
all required AGIPD parameters.
:param run_dc: Run data collection with expected sources
to read needed parameters.
:param image_src: H5 source for image data.
:param ctrl_src: H5 source for control (slow) data.
:param raise_error: Boolean to raise errors for missing
sources and keys.
"""
self.run_dc = run_dc
self.image_src = image_src
self.ctrl_src = ctrl_src
self.raise_error = raise_error
def get_num_cells(self) -> Optional[int]:
"""
:return mem_cells: Number of memory cells.
return None, if no data available.
"""Read number of memory cells from fast data.
:return mem_cells: Number of memory cells
return None, if no data available.
"""
cells = np.squeeze(
self.run_dc[
@@ -62,38 +68,26 @@ class AgipdCtrl:
dists = [abs(o - maxcell) for o in options]
return options[np.argmin(dists)]
def get_acq_rate(self) -> Optional[float]:
"""Get the acquisition rate from said detector module.
If the data is available from the middlelayer FPGA_COMP device,
then it is retrieved from there.
If not, the rate is calculated from two different pulses time.
The first entry is deliberately not used, as the detector just began
operating, and it might have skipped a train.
:return acq_rate: the acquisition rate.
return None, if not available.
"""
def _get_acq_rate_ctrl(self) -> Optional[float]:
"""Get acquisition (repetition) rate from CONTROL source."""
# Attempt to look for acquisition rate in slow data
rep_rate_src = (
self.ctrl_src, "bunchStructure.repetitionRate.value")
if (
rep_rate_src[0] in self.run_dc.all_sources and
rep_rate_src[1] in self.run_dc.keys_for_source(rep_rate_src[0])
):
# The acquisition rate value is stored in a 1D array of type
# float.
# It is desired to loose precision here because the usage is
# about bucketing the rate for managing meta-data.
return round(float(self.run_dc[rep_rate_src].as_single_value()), 1)
def _get_acq_rate_instr(self) -> Optional[float]:
"""Get acquisition (repetition rate) from INSTRUMENT source."""
train_pulses = np.squeeze(
self.run_dc[
self.image_src, "image.pulseId"
].drop_empty_trains().train_from_index(0)[1]
self.image_src,
"image.pulseId"].drop_empty_trains().train_from_index(0)[1]
)
# Compute acquisition rate from fast data
@@ -101,15 +95,35 @@ class AgipdCtrl:
options = {8: 0.5, 4: 1.1, 2: 2.2, 1: 4.5}
return options.get(diff, None)
def get_gain_setting(
self,
creation_time: "datetime.datetime",
) -> Optional[int]:
"""Retrieve Gain setting.
def get_acq_rate(self) -> Optional[float]:
"""Read the acquisition rate for the selected detector module.
The value is read from CONTROL source e.g.`CONTROL/../MDL/FPGA_COMP`,
If key is not available, the rate is calculated from
two consecutive pulses of the same trainId.
:return acq_rate: the acquisition rate.
return None, if not available.
"""
acq_rate = self._get_acq_rate_ctrl()
if acq_rate is not None:
return acq_rate
# For AGIPD500K this function would produce wrong value (always 4.5)
# before W10/2022.
# TODO: Confirm leaving this as it is.
return self._get_acq_rate_instr()
If the data is available from the middlelayer FPGA_COMP device,
then it is retrieved from there.
If not, the setting is calculated off `setupr` and `patternTypeIndex`
def _get_gain_setting_ctrl(self) -> Optional[int]:
"""Read gain_settings from CONTROL source and gain.value key."""
return int(self.run_dc[self.ctrl_src, "gain.value"].as_single_value())
def _get_gain_setting_ctrl_old(self) -> Optional[int]:
"""Read gain_settings from setupr and patterTypeIndex
for old RAW data.
If `gain.value` isn't available in MDL source,
the setting is calculated from `setupr` and `patternTypeIndex`
gain-setting 1: setupr@dark=8, setupr@slopespc=40
gain-setting 0: setupr@dark=0, setupr@slopespc=32
@@ -118,18 +132,9 @@ class AgipdCtrl:
patternTypeIndex 2: Medium-gain
patternTypeIndex 3: Low-gain
patternTypeIndex 4: SlopesPC
:return: gain setting.
return 0, if not available.
Returns:
int: gain_setting value.
"""
# TODO: remove after fixing get_possible_conditions
if creation_time and creation_time.replace(tzinfo=None) < parser.parse('2020-01-31'):
print("Set gain-setting to None for runs taken before 2020-01-31")
return
if "gain.value" in self.run_dc.keys_for_source(self.ctrl_src):
return self.run_dc[self.ctrl_src, "gain.value"].as_single_value()
setupr = self.run_dc[self.ctrl_src, "setupr.value"].as_single_value()
pattern_type_idx = self.run_dc[
self.ctrl_src, "patternTypeIndex.value"].as_single_value()
@@ -141,29 +146,58 @@ class AgipdCtrl:
setupr == 40 and pattern_type_idx == 4):
return 1
else:
# TODO: Confirm that this can be removed.
if self.raise_error:
raise ValueError(
"Could not derive gain setting from"
" setupr and patternTypeIndex"
)
return
def get_gain_setting(
self,
creation_time: Optional[datetime] = None,
) -> Optional[int]:
"""Read Gain setting from CONTROL sources.
if key `gain.value` is not available, calculate gain_setting from
setupr and patterTypeIndex. If it failed raise ValueError.
:param creation_time: datetime object for the data creation time.
:return: gain setting.
return 0, if not available.
"""
# TODO: remove after fixing get_possible_conditions
if (
creation_time and
creation_time.replace(tzinfo=None) < parser.parse('2020-01-31')
):
print("Set gain-setting to None for runs taken before 2020-01-31")
return
if "gain.value" in self.run_dc.keys_for_source(self.ctrl_src):
return self._get_gain_setting_ctrl()
gain_setting = self._get_gain_setting_ctrl_old()
if gain_setting is not None:
return gain_setting
else:
# TODO: confirm that this can be removed.
print(
"WARNING: gain_setting is not available "
"ERROR: gain_setting is not available "
f"at source {self.ctrl_src}.\nSet gain_setting to 0.")
# TODO: why return 0 and not None?
return 0
def get_gain_mode(self) -> AgipdGainMode:
"""Returns the gain mode (adaptive or fixed) from slow data"""
def get_gain_mode(self) -> int:
"""Returns the gain mode (adaptive or fixed) from slow data."""
if (
self.ctrl_src in self.run_dc.all_sources and
"gainModeIndex.value" in self.run_dc.keys_for_source(
self.ctrl_src)
):
return AgipdGainMode(int(
self.run_dc.get_run_value(
self.ctrl_src, "gainModeIndex.value")))
return AgipdGainMode(int(self.run_dc[
self.ctrl_src, "gainModeIndex.value"].as_single_value()))
return AgipdGainMode.ADAPTIVE_GAIN
@@ -193,8 +227,11 @@ class AgipdCtrl:
voltage_src[0] in self.run_dc.all_sources and
voltage_src[1] in self.run_dc.keys_for_source(voltage_src[0])
):
return self.run_dc[voltage_src].as_single_value(atol=1, reduce_by='max')
# Use RUN source for reading the bias voltage value.
# As HED_DET_AGIPD500K2G has a hardware issue that leads
# to storing arbitrary voltage values in the CONTROL source
# array. e.g. /gpfs/exfel/exp/HED/202230/p900248/raw
return int(self.run_dc.get_run_value(*voltage_src))
else:
print(
"WARNING: Unable to read bias_voltage from"
@@ -216,8 +253,8 @@ class AgipdCtrl:
'integrationTime.value' in self.run_dc.keys_for_source(
self.ctrl_src)
):
return int(self.run_dc.get_run_value(
self.ctrl_src, 'integrationTime.value'))
return int(self.run_dc[
self.ctrl_src, 'integrationTime.value'].as_single_value())
return 12
@@ -1257,7 +1294,7 @@ class AgipdCorrections:
def initialize_from_db(self, karabo_id: str, karabo_da: str,
cal_db_interface: str,
creation_time: 'datetime.datetime',
creation_time: datetime,
memory_cells: float, bias_voltage: int,
photon_energy: float, gain_setting: float,
acquisition_rate: float, integration_time: int,
Loading