diff --git a/src/cal_tools/agipdlib.py b/src/cal_tools/agipdlib.py index 91eebf23d81f656fecfd91acc92dda1fc1de7475..712301aa90e6631d54fcb471ac8c963c75e1d3ee 100644 --- a/src/cal_tools/agipdlib.py +++ b/src/cal_tools/agipdlib.py @@ -3,7 +3,7 @@ import traceback import zlib from multiprocessing.pool import ThreadPool from pathlib import Path -from typing import Any, Dict, Optional, Tuple +from typing import Any, Dict, List, Optional, Tuple import h5py import numpy as np @@ -247,42 +247,13 @@ class AgipdCorrections: # Data description self.h5_data_path = h5_data_path self.h5_index_path = h5_index_path - # avoid list(range(*[0]])) - - self.rng_pulses = [202 if p > 202 else p for p in max_pulses] - - self.pulses_lst = ( - list(range(*self.rng_pulses)) - if self.rng_pulses != [0] - else self.rng_pulses - ) - if self.rng_pulses != max_pulses: - print( - "WARNING: \"max_pulses\" list has been modified from " - f"{max_pulses} to {self.rng_pulses}. As the number of " - "operating memory cells are less than the selected " - "maximum pulse." - ) - - try: - if len(self.pulses_lst) > 1: - print( - f"A range of {len(self.pulses_lst)} pulse indices " - f"is selected: from {self.pulses_lst[0]} to " - f"{self.pulses_lst[-1]+(self.pulses_lst[1]-self.pulses_lst[0])}" # noqa - f" with a step of {self.pulses_lst[1] - self.pulses_lst[0]}." # noqa - ) - else: - print( - "one pulse is selected: a pulse of idx" - f" {self.pulses_lst[0]}.") - except Exception as e: - raise ValueError(f"\"max_pulses\" input Error: {e}") self.max_cells = max_cells self.gain_mode = gain_mode self.comp_threads = comp_threads + self.pulses_lst = self._validate_selected_pulses(max_pulses, max_cells) + # Correction parameters self.baseline_corr_noise_threshold = -1000 self.snow_resolution = SnowResolution.INTERPOLATE @@ -883,37 +854,74 @@ class AgipdCorrections: return n_img - def validate_selected_pulses(self, allpulses: np.array - ) -> Tuple[int, int, int]: + def _validate_selected_pulses( + self, max_pulses: List[int], + max_cells: int, + ) -> List[int]: """Validate the selected pulses given from the notebook - Validate that the given range of pulses to correct - are within the cells of raw data. + Validate the selected range of pulses to correct raw data + of at least one image. + + 1) A pulseId can't be greater than the operating memory cells. + 2) Validate the order of the given raneg of pulses. + 3) Raise value error if generate list of pulses is empty. + + :param max_pulses: a list of at most 3 elements defining the + range of pulses to calibrate. + :param max_cells: operating memory cells. - :param allpulses: :return : - - first_pulse: first pulse index - - last_pulse: last pulse index - - pulse_step: step range for the selected pulse indices + - pulses_list: a list of pulses to calibrate. """ - # Calculate the pulse step from the chosen max_pulse range - pulse_step = self.rng_pulses[2] if len(self.rng_pulses) == 3 else 1 # Validate selected pulses range: - # 1) Make sure the range max doesn't have non-valid idx. - if self.pulses_lst[-1] + pulse_step > int(allpulses[-1]): - last_pulse = int(allpulses[-1]) + pulse_step - else: - last_pulse = int(self.pulses_lst[-1]) + pulse_step + # 1) A pulseId can't be greater than the operating memory cells. + pulses_range = [max_cells if p > max_cells else p for p in max_pulses] - # 2) Check if 1st pulse index was out of valid range. - # If that is the case only calibrate the last step pulse. - if self.pulses_lst[0] >= last_pulse: - first_pulse = last_pulse - pulse_step - else: - first_pulse = self.pulses_lst[0] + if pulses_range != max_pulses: + print( + "WARNING: \"max_pulses\" list has been modified from " + f"{max_pulses} to {pulses_range}. As the number of " + "operating memory cells are less than the selected " + "maximum pulse." + ) - return first_pulse, last_pulse, pulse_step + # Create list of pulses out of selected range. + try: + pulses_lst = ( + list(range(*pulses_range)) + if pulses_range != [0] + else pulses_range + ) + except TypeError as e: + print( + f"ERROR: Wrong type for \"max_pulses\": {max_pulses}." + f"\n{traceback.format_exc()}" + ) + + try: + if len(pulses_lst) > 1: + print( + f"A range of {len(pulses_lst)} pulse indices " + f"is selected: from {pulses_lst[0]} to " + f"{pulses_lst[-1] +(pulses_lst[1] - pulses_lst[0])}" + f" with a step of {pulses_lst[1] - pulses_lst[0]}." + ) + else: + print( + "One pulse is selected: a pulse of idx" + f" {pulses_lst[0]}." + ) + except ValueError as e: + print( + f"ERROR: A list of pulses: {pulses_lst}, is given to" + "calibrate. Please check the given value for " + f"\"max_pulses\": {max_pulses}.\n" + f"{traceback.format_exc()}." + ) + + return pulses_lst def choose_selected_pulses(self, allpulses: np.array, can_calibrate: np.array) -> np.array: @@ -930,25 +938,26 @@ class AgipdCorrections: selected pulses """ - (first_pulse, last_pulse, - pulse_step) = self.validate_selected_pulses(allpulses) - # collect the pulses to be calibrated - cal_pulses = allpulses[first_pulse: last_pulse: pulse_step] - - # Check if a specific pulses need to be calibrated - # or with only simple pulse ranging. - if pulse_step > 1 or \ - allpulses[first_pulse] >= allpulses[last_pulse]: - can_calibrate = np.logical_and(can_calibrate, - np.isin(allpulses, cal_pulses)) + cal_pulses = allpulses[self.pulses_lst] + + # Check if a non-contiguous pulses list will be calibrated. + if ( + len(self.pulses_lst) > 1 and + self.pulses_lst[1] - self.pulses_lst[0] > 1 + ): + can_calibrate = np.logical_and( + can_calibrate, + np.isin(allpulses, cal_pulses), + ) else: # Check interesection between array of booleans and # array of pulses to calibrate. - can_calibrate = np.logical_and(can_calibrate, - (allpulses <= np.max(cal_pulses)), - (allpulses >= np.min(cal_pulses)) - ) + can_calibrate = np.logical_and( + can_calibrate, + (allpulses <= np.max(cal_pulses)), + (allpulses >= np.min(cal_pulses)), + ) return can_calibrate