diff --git a/cal_tools/cal_tools/agipdlib.py b/cal_tools/cal_tools/agipdlib.py index 4f6200fa0e705c44bbef1bb169dba31b2ca33d5a..6eb9d6f60a7b68f24a76cf7380a626e3481270f5 100644 --- a/cal_tools/cal_tools/agipdlib.py +++ b/cal_tools/cal_tools/agipdlib.py @@ -164,10 +164,15 @@ def get_bias_voltage(fname: str, karabo_id_control: str, class AgipdCorrections: - def __init__(self, max_cells, max_pulses, - h5_data_path="INSTRUMENT/SPB_DET_AGIPD1M-1/DET/{}CH0:xtdf/", - h5_index_path="INDEX/SPB_DET_AGIPD1M-1/DET/{}CH0:xtdf/", - corr_bools: Optional[dict] = None): + def __init__( + self, + max_cells, + max_pulses, + h5_data_path="INSTRUMENT/SPB_DET_AGIPD1M-1/DET/{}CH0:xtdf/", + h5_index_path="INDEX/SPB_DET_AGIPD1M-1/DET/{}CH0:xtdf/", + corr_bools: Optional[dict] = None, + gain_mode: AgipdGainMode = AgipdGainMode.ADAPTIVE_GAIN, + ): """ Initialize an AgipdCorrections Class @@ -222,6 +227,7 @@ class AgipdCorrections: self.pulses_lst = list(range(*max_pulses)) \ if not (len(max_pulses) == 1 and max_pulses[0] == 0) else max_pulses # noqa self.max_cells = max_cells + self.gain_mode = gain_mode # Correction parameters self.baseline_corr_noise_threshold = -1000 @@ -490,35 +496,35 @@ class AgipdCorrections: gain = self.shared_dict[i_proc]['gain'][first:last] cellid = self.shared_dict[i_proc]['cellId'][first:last] - # first evaluate the gain into 0, 1, 2 --> high, medium, low - t0 = self.thresholds[module_idx][0] - t1 = self.thresholds[module_idx][1] + if self.gain_mode is AgipdGainMode.ADAPTIVE_GAIN: + # first evaluate the gain into 0, 1, 2 --> high, medium, low + t0 = self.thresholds[module_idx][0] + t1 = self.thresholds[module_idx][1] - # load raw_data and rgain to be used during gain_correction, - # if requested - if self.corr_bools.get('melt_snow'): - self.shared_dict[i_proc]['t0_rgain'][first:last] = \ - rawgain / t0[cellid, ...] - self.shared_dict[i_proc]['raw_data'][first:last] = np.copy(data) - - # Often most pixels are in high-gain, so it's more efficient to - # set the whole output block to zero than select the right pixels. - gain[:] = 0 - # exceeding first threshold means data is medium or low gain - gain[rawgain > t0[cellid, ...]] = 1 - # exceeding also second threshold means data is low gain - gain[rawgain > t1[cellid, ...]] = 2 + # load raw_data and rgain to be used during gain_correction if requested + if self.corr_bools.get("melt_snow"): + self.shared_dict[i_proc]["t0_rgain"][first:last] = rawgain / t0[cellid, ...] + self.shared_dict[i_proc]["raw_data"][first:last] = np.copy(data) + + # Often most pixels are in high-gain, so it's more efficient to + # set the whole output block to zero than select the right pixels. + gain[:] = 0 + # exceeding first threshold means data is medium or low gain + gain[rawgain > t0[cellid, ...]] = 1 + # exceeding also second threshold means data is low gain + gain[rawgain > t1[cellid, ...]] = 2 + else: + # the enum values map 1, 2, 3 to (fixed) gain modes + gain[:] = self.gain_mode - 1 offsetb = self.offset[module_idx][:, cellid] # force into high or medium gain if requested - if self.corr_bools.get('force_mg_if_below'): - gain[(gain == 2) & ( - (data - offsetb[1]) < self.mg_hard_threshold)] = 1 + if self.corr_bools.get("force_mg_if_below"): + gain[(gain == 2) & ((data - offsetb[1]) < self.mg_hard_threshold)] = 1 - if self.corr_bools.get('force_hg_if_below'): - gain[(gain > 0) & ( - (data - offsetb[0]) < self.hg_hard_threshold)] = 0 + if self.corr_bools.get("force_hg_if_below"): + gain[(gain > 0) & ((data - offsetb[0]) < self.hg_hard_threshold)] = 0 # choose constants according to gain setting off = calgs.gain_choose(gain, offsetb) @@ -1088,7 +1094,8 @@ class AgipdCorrections: self.offset[module_idx][...] = cons_data["Offset"].transpose()[...] self.noise[module_idx][...] = cons_data["Noise"].transpose()[...] - self.thresholds[module_idx][...] = cons_data["ThresholdsDark"].transpose()[:3, ...] # noqa + if self.gain_mode is AgipdGainMode.ADAPTIVE_GAIN: + self.thresholds[module_idx][...] = cons_data["ThresholdsDark"].transpose()[:3,...] # noqa if self.corr_bools.get("low_medium_gap"): t0 = self.thresholds[module_idx][0] @@ -1232,20 +1239,18 @@ class AgipdCorrections: return - def initialize_from_yaml(self, karabo_da: str, - const_yaml: Dict[str, Any], - module_idx: int - ) -> Dict[str, Any]: + def initialize_from_yaml( + self, karabo_da: str, const_yaml: Dict[str, Any], module_idx: int + ) -> Dict[str, Any]: """Initialize calibration constants from a yaml file - :param karabo-da: karabo data aggerator - :param const_yaml: (Dict) from the "retrieved-constants" part of a yaml - file in pre-notebook, which consists of metadata of either the constant + :param karabo_da: a karabo data aggregator + :param const_yaml: from the "retrieved-constants" part of a yaml + file from pre-notebook, which consists of metadata of either the constant file path or the empty constant shape, and the creation-time of the retrieved constants :param module_idx: Index of module. - :return when: Dictionary of retrieved constants with - their creation-time. + :return when: Dictionary of retrieved constants with their creation-time. """ # string of the device name. @@ -1351,23 +1356,17 @@ class AgipdCorrections: :param constant_shape: Shape of expected constants (gain, cells, x, y) """ for module_idx in modules: - self.offset[module_idx] = sharedmem.empty(constant_shape, - dtype='f4') - self.thresholds[module_idx] = sharedmem.empty(constant_shape, - dtype='f4') - self.noise[module_idx] = sharedmem.empty(constant_shape, - dtype='f4') - - self.md_additional_offset[module_idx] = sharedmem.empty( - constant_shape[1:], dtype='f4') - self.rel_gain[module_idx] = sharedmem.empty(constant_shape, - dtype='f4') - self.frac_high_med[module_idx] = sharedmem.empty(constant_shape[1], - dtype='f4') + self.offset[module_idx] = sharedmem.empty(constant_shape, dtype="f4") + if self.gain_mode is AgipdGainMode.ADAPTIVE_GAIN: + self.thresholds[module_idx] = sharedmem.empty(constant_shape, dtype="f4") + self.noise[module_idx] = sharedmem.empty(constant_shape, dtype="f4") + + self.md_additional_offset[module_idx] = sharedmem.empty(constant_shape[1:], dtype="f4") + self.rel_gain[module_idx] = sharedmem.empty(constant_shape, dtype="f4") + self.frac_high_med[module_idx] = sharedmem.empty(constant_shape[1], dtype="f4") - self.mask[module_idx] = sharedmem.empty(constant_shape, dtype='i4') - self.xray_cor[module_idx] = sharedmem.empty(constant_shape[1:], - dtype='f4') + self.mask[module_idx] = sharedmem.empty(constant_shape, dtype="i4") + self.xray_cor[module_idx] = sharedmem.empty(constant_shape[1:], dtype="f4") def allocate_images(self, shape, n_cores_files): """