diff --git a/cal_tools/cal_tools/agipdlib.py b/cal_tools/cal_tools/agipdlib.py index 212e5c64eb6231f042baf157281fff6db76e220a..c2c738e9a61429c2097e107ef99bbc019fa29666 100644 --- a/cal_tools/cal_tools/agipdlib.py +++ b/cal_tools/cal_tools/agipdlib.py @@ -5,10 +5,16 @@ from typing import Any, Dict, Optional, Tuple import h5py import numpy as np import sharedmem -from cal_tools.agipdutils import * +from cal_tools.agipdutils import (assemble_constant_dict, + baseline_correct_via_noise, + baseline_correct_via_stripe, + correct_baseline_via_hist, + correct_baseline_via_hist_asic, + make_noisy_adc_mask, match_asic_borders, + melt_snowy_pixels) from cal_tools.enums import BadPixels, SnowResolution from cal_tools.tools import get_constant_from_db_and_time -from iCalibrationDB import Conditions, Constants, Detectors +from iCalibrationDB import Conditions, Constants from cal_tools.cython import agipdalgs as calgs @@ -80,7 +86,8 @@ def get_acq_rate(fast_paths: Tuple[str, str, int], def get_gain_setting(fname: str, h5path_ctrl: str) -> int: - """ + """Retrieve Gain setting. + If the data is available from the middlelayer FPGA_COMP device, then it is retrieved from there. If not, the setting is calculated off `setupr` and `patternTypeIndex` @@ -160,8 +167,8 @@ class AgipdCorrections: image/data section :param h5_index_path: path in HDF5 file which is prefixed to the index section - :param corr_bools: A dict with all of the correction booleans requested or - available + :param corr_bools: A dict with all of the correction booleans requested + or available The following example shows a typical use case: .. code-block:: python @@ -203,7 +210,7 @@ class AgipdCorrections: self.rng_pulses = max_pulses # avoid list(range(*[0]])) self.pulses_lst = list(range(*max_pulses)) \ - if not (len(max_pulses) == 1 and max_pulses[0] == 0) else max_pulses #noqa + if not (len(max_pulses) == 1 and max_pulses[0] == 0) else max_pulses # noqa self.max_cells = max_cells # Correction parameters @@ -498,11 +505,11 @@ class AgipdCorrections: # force into high or medium gain if requested if self.corr_bools.get('force_mg_if_below'): gain[(gain == 2) & ( - (data - offsetb[1]) < self.mg_hard_threshold)] = 1 + (data - offsetb[1]) < self.mg_hard_threshold)] = 1 if self.corr_bools.get('force_hg_if_below'): gain[(gain > 0) & ( - (data - offsetb[0]) < self.hg_hard_threshold)] = 0 + (data - offsetb[0]) < self.hg_hard_threshold)] = 0 # choose constants according to gain setting off = calgs.gain_choose(gain, offsetb) @@ -512,7 +519,7 @@ class AgipdCorrections: data -= off del off - def baseline_correction(self, i_proc:int, first:int, last:int): + def baseline_correction(self, i_proc: int, first: int, last: int): """ Perform image-wise base-line shift correction for data in shared memory via histogram or stripe @@ -536,14 +543,12 @@ class AgipdCorrections: # output is saved in sharedmem to pass for correct_agipd() # as this function takes about 3 seconds. self.shared_dict[i_proc]['msk'][first:last] = \ - calgs.gain_choose_int(gain, - self.mask[module_idx][:, cellid]) # noqa + calgs.gain_choose_int(gain, self.mask[module_idx][:, cellid]) if hasattr(self, "rel_gain"): # Get the correct rel_gain depending on cell-id self.shared_dict[i_proc]['rel_corr'][first:last] = \ - calgs.gain_choose(gain, - self.rel_gain[module_idx][:, cellid]) # noqa + calgs.gain_choose(gain, self.rel_gain[module_idx][:, cellid]) # do this image wise, as the shift is per image for i in range(data.shape[0]): @@ -647,9 +652,9 @@ class AgipdCorrections: # after calculating it while offset correcting. if self.corr_bools.get('melt_snow'): _ = melt_snowy_pixels(self.shared_dict[i_proc]['raw_data'][first:last], # noqa - data, gain, - self.shared_dict[i_proc]['t0_rgain'][first:last], # noqa - self.snow_resolution) + data, gain, + self.shared_dict[i_proc]['t0_rgain'][first:last], # noqa + self.snow_resolution) # Inner ASIC borders are matched to the same signal level if self.corr_bools.get("match_asics"): @@ -712,7 +717,7 @@ class AgipdCorrections: valid_indices = np.concatenate([np.arange(validf[i], validf[i]+validc[i]) for i in range(validf.size)], - axis=0) + axis=0) valid_indices = np.squeeze(valid_indices).astype(np.int32) elif index_v == 1: @@ -753,8 +758,8 @@ class AgipdCorrections: allpulses = data_dict['pulseId'][:n_img] # Initializing can_calibrate array - can_calibrate = self.choose_selected_pulses(allpulses, - can_calibrate=[True]*len(allpulses)) + can_calibrate = self.choose_selected_pulses( + allpulses, can_calibrate=[True]*len(allpulses)) # Only select data corresponding to selected pulses # and overwrite data in shared-memory leaving @@ -779,7 +784,7 @@ class AgipdCorrections: return n_img def validate_selected_pulses(self, allpulses: np.array - ) -> Tuple[int, int, int]: + ) -> Tuple[int, int, int]: """Validate the selected pulses given from the notebook Validate that the given range of pulses to correct @@ -816,7 +821,6 @@ class AgipdCorrections: def choose_selected_pulses(self, allpulses: np.array, can_calibrate: np.array) -> np.array: - """ Choose given selected pulse from pulseId array of raw data. The selected pulses range is validated then @@ -831,7 +835,7 @@ class AgipdCorrections: """ (first_pulse, last_pulse, - pulse_step) = self.validate_selected_pulses(allpulses) + pulse_step) = self.validate_selected_pulses(allpulses) # collect the pulses to be calibrated cal_pulses = allpulses[first_pulse: last_pulse: pulse_step] @@ -853,7 +857,8 @@ class AgipdCorrections: return can_calibrate def gen_valid_range(self, first_index: int, last_index: int, - max_cells: int, allcells: np.array, allpulses: np.array, + max_cells: int, allcells: np.array, + allpulses: np.array, valid_indices: Optional[np.array] = None, apply_sel_pulses: Optional[bool] = True ) -> np.array: @@ -890,8 +895,8 @@ class AgipdCorrections: return if apply_sel_pulses: - can_calibrate = self.choose_selected_pulses(allpulses, - can_calibrate=can_calibrate) + can_calibrate = self.choose_selected_pulses( + allpulses, can_calibrate=can_calibrate) if valid_indices is None: firange = np.arange(first_index, last_index) else: @@ -1075,7 +1080,7 @@ class AgipdCorrections: self.offset[module_idx][...] = cons_data["Offset"].transpose()[...] self.noise[module_idx][...] = cons_data["Noise"].transpose()[...] - self.thresholds[module_idx][...] = cons_data["ThresholdsDark"].transpose()[:3,...] # noqa + self.thresholds[module_idx][...] = cons_data["ThresholdsDark"].transpose()[:3, ...] # noqa if self.corr_bools.get("low_medium_gap"): t0 = self.thresholds[module_idx][0] @@ -1090,7 +1095,7 @@ class AgipdCorrections: :bpixels.shape[2], # noqa None] - if when["SlopesFF"]: # Checking if constant was retrieved + if when["SlopesFF"]: # Checking if constant was retrieved slopesFF = cons_data["SlopesFF"] # This could be used for backward compatibility @@ -1100,18 +1105,20 @@ class AgipdCorrections: # This is for backward compatability for old FF constants # (128, 512, mem_cells) if slopesFF.shape[-1] == 2: - xray_cor = np.squeeze(slopesFF[...,0]) + xray_cor = np.squeeze(slopesFF[..., 0]) xray_cor_med = np.nanmedian(xray_cor) - xray_cor[np.isnan(xray_cor)]= xray_cor_med - xray_cor[(xray_cor<0.8) | (xray_cor>1.2)] = xray_cor_med + xray_cor[np.isnan(xray_cor)] = xray_cor_med + xray_cor[(xray_cor < 0.8) | ( + xray_cor > 1.2)] = xray_cor_med xray_cor = np.dstack([xray_cor]*self.max_cells) else: # Memory cell resolved xray_cor correction xray_cor = slopesFF # (128, 512, mem_cells) if xray_cor.shape[-1] < self.max_cells: - # In case of having new constant with less memory cells, - # due to lack of enough FF data or during development. - # xray_cor should be expanded by last memory cell. + # When working with new constant with fewer memory + # cells, eg. lacking enough FF data or during + # development, xray_cor must be expand its last memory + # cell to maintain a consistent shape. xray_cor = np.dstack(xray_cor, np.dstack([xray_cor[..., -1]] * (self.max_cells - xray_cor.shape[-1]))) # noqa @@ -1151,11 +1158,11 @@ class AgipdCorrections: pc_med_l = slopesPC[..., :self.max_cells, 4] # calculate median for slopes - pc_high_med = np.nanmedian(pc_high_m, axis=(0,1)) - pc_med_med = np.nanmedian(pc_med_m, axis=(0,1)) + pc_high_med = np.nanmedian(pc_high_m, axis=(0, 1)) + pc_med_med = np.nanmedian(pc_med_m, axis=(0, 1)) # calculate median for intercepts: - pc_high_l_med = np.nanmedian(pc_high_l, axis=(0,1)) - pc_med_l_med = np.nanmedian(pc_med_l, axis=(0,1)) + pc_high_l_med = np.nanmedian(pc_high_l, axis=(0, 1)) + pc_med_l_med = np.nanmedian(pc_med_l, axis=(0, 1)) # sanitize PC data # (it should be done already on the level of constants)