From 572b96de9d77493f411926670a6945c444dd0a0e Mon Sep 17 00:00:00 2001 From: Egor Sobolev <egor.sobolev@xfel.eu> Date: Tue, 16 Nov 2021 09:36:24 +0100 Subject: [PATCH] Add selection of lit frames using AgipdLitFrameFinder device --- .../AGIPD/AGIPD_Correct_and_Verify.ipynb | 49 ++- src/cal_tools/agipdlib.py | 293 +++++++++++------- 2 files changed, 220 insertions(+), 122 deletions(-) diff --git a/notebooks/AGIPD/AGIPD_Correct_and_Verify.ipynb b/notebooks/AGIPD/AGIPD_Correct_and_Verify.ipynb index 668f0c3cd..26ce0bcb1 100644 --- a/notebooks/AGIPD/AGIPD_Correct_and_Verify.ipynb +++ b/notebooks/AGIPD/AGIPD_Correct_and_Verify.ipynb @@ -17,22 +17,22 @@ "metadata": {}, "outputs": [], "source": [ - "in_folder = \"/gpfs/exfel/exp/HED/202031/p900174/raw\" # the folder to read data from, required\n", - "out_folder = \"/gpfs/exfel/data/scratch/ahmedk/test/hibef_agipd2\" # the folder to output to, required\n", + "in_folder = \"/gpfs/exfel/exp/SPB/202131/p900230/raw\" # the folder to read data from, required\n", + "out_folder = \"/gpfs/exfel/data/scratch/esobolev/pycal_litfrm/p900230\" # the folder to output to, required\n", "sequences = [-1] # sequences to correct, set to -1 for all, range allowed\n", "modules = [-1] # modules to correct, set to -1 for all, range allowed\n", "train_ids = [-1] # train IDs to correct, set to -1 for all, range allowed\n", - "run = 155 # runs to process, required\n", + "run = 275 # runs to process, required\n", "\n", - "karabo_id = \"HED_DET_AGIPD500K2G\" # karabo karabo_id\n", + "karabo_id = \"SPB_DET_AGIPD1M-1\" # karabo karabo_id\n", "karabo_da = ['-1'] # a list of data aggregators names, Default [-1] for selecting all data aggregators\n", "receiver_id = \"{}CH0\" # inset for receiver devices\n", "path_template = 'RAW-R{:04d}-{}-S{:05d}.h5' # the template to use to access data\n", "h5path = 'INSTRUMENT/{}/DET/{}:xtdf/' # path in the HDF5 file to images\n", "h5path_idx = 'INDEX/{}/DET/{}:xtdf/' # path in the HDF5 file to images\n", "h5path_ctrl = '/CONTROL/{}/MDL/FPGA_COMP' # path to control information\n", - "karabo_id_control = \"HED_EXP_AGIPD500K2G\" # karabo-id for control device\n", - "karabo_da_control = 'AGIPD500K2G00' # karabo DA for control infromation\n", + "karabo_id_control = \"SPB_IRU_AGIPD1M1\" # karabo-id for control device\n", + "karabo_da_control = 'AGIPD1MCTRL00' # karabo DA for control infromation\n", "\n", "slopes_ff_from_files = \"\" # Path to locally stored SlopesFF and BadPixelsFF constants\n", "\n", @@ -44,6 +44,9 @@ "use_ppu_device = '' # Device ID for a pulse picker device to only process picked trains, empty string to disable\n", "ppu_train_offset = 0 # When using the pulse picker, offset between the PPU's sequence start and actually picked train\n", "\n", + "use_litframe_device = '' # Device ID for a lit frame finder device to only process illuminated frames, empty string to disable\n", + "energy_threshold = None # Energy threshold is the low limit for the energy exposed by frames subject to processing\n", + "\n", "max_cells = 0 # number of memory cells used, set to 0 to automatically infer\n", "bias_voltage = 300 # Bias voltage\n", "acq_rate = 0. # the detector acquisition rate, use 0 to try to auto-determine\n", @@ -146,6 +149,8 @@ "from cal_tools import agipdalgs as calgs\n", "from cal_tools.agipdlib import (\n", " AgipdCorrections,\n", + " CellRange,\n", + " LitFrameSelection,\n", " get_acq_rate,\n", " get_gain_mode,\n", " get_integration_time,\n", @@ -383,6 +388,32 @@ "print(f\"Maximum memory cells to calibrate: {max_cells}\")" ] }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "if use_litframe_device:\n", + " # check run for the AgipdLitFrameFinder device\n", + " try: dc\n", + " except NameError: dc = RunDirectory(in_folder / f'r{run:04d}')\n", + "\n", + " if use_litframe_device + ':output' in dc.instrument_sources:\n", + " # Use selection provided by the AgipdLitFrameFinder (if the device is recorded)\n", + " cell_sel = LitFrameSelection(use_litframe_device, dc, train_ids, max_pulses, energy_threshold)\n", + " train_ids = cell_sel.train_ids\n", + " else:\n", + " # Use range selection (if the device is not recorded)\n", + " print(\"WARNING: LitFrameFinder device is not found.\")\n", + " cell_sel = CellRange(max_pulses, max_cells=max_cells)\n", + "else:\n", + " # Use range selection\n", + " cell_sel = CellRange(max_pulses, max_cells=max_cells)\n", + "\n", + "print(cell_sel.mesg())" + ] + }, { "cell_type": "code", "execution_count": null, @@ -464,7 +495,7 @@ "source": [ "agipd_corr = AgipdCorrections(\n", " max_cells,\n", - " max_pulses,\n", + " cell_sel,\n", " h5_data_path=h5path,\n", " h5_index_path=h5path_idx,\n", " corr_bools=corr_bools,\n", @@ -1216,9 +1247,9 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.6.7" + "version": "3.8.11" } }, "nbformat": 4, "nbformat_minor": 2 -} \ No newline at end of file +} diff --git a/src/cal_tools/agipdlib.py b/src/cal_tools/agipdlib.py index feb279622..286ae4dc4 100644 --- a/src/cal_tools/agipdlib.py +++ b/src/cal_tools/agipdlib.py @@ -191,7 +191,7 @@ class AgipdCorrections: def __init__( self, max_cells, - max_pulses, + cell_sel, h5_data_path="INSTRUMENT/SPB_DET_AGIPD1M-1/DET/{}CH0:xtdf/", h5_index_path="INDEX/SPB_DET_AGIPD1M-1/DET/{}CH0:xtdf/", corr_bools: Optional[dict] = None, @@ -204,8 +204,7 @@ class AgipdCorrections: :param max_cells: maximum number of memory cells to handle, e.g. if calibration constants only exist for a subset of cells - :param max_pulses: a range list of pulse indices used for - calibration and histogram. [start, end, step] + :param cell_sel: the CellSelection indicates cells selected for calibration :param h5_data_path: path in HDF5 file which is prefixed to the image/data section :param h5_index_path: path in HDF5 file which is prefixed to the @@ -218,7 +217,8 @@ class AgipdCorrections: The following example shows a typical use case: .. code-block:: python - agipd_corr = AgipdCorrections(max_cells, max_pulses, + cell_sel = CellRange(max_pulses, max_cells) + agipd_corr = AgipdCorrections(max_cells, cell_sel, h5_data_path=h5path, h5_index_path=h5path_idx, corr_bools=corr_bools) @@ -256,8 +256,7 @@ class AgipdCorrections: self.comp_threads = comp_threads self.train_ids = np.array(train_ids) if train_ids is not None else None - self.start, self.last, self.step = self._validate_selected_pulses( - max_pulses, max_cells) + self.cell_sel = cell_sel # Correction parameters self.baseline_corr_noise_threshold = -1000 @@ -331,14 +330,29 @@ class AgipdCorrections: try: f = h5py.File(file_name, "r") - (_, first_index, last_index, - _, valid_indices) = self.get_valid_image_idx(idx_base, f) + (valid, first_index, last_index, + train_ids, valid_indices) = self.get_valid_image_idx(idx_base, f) if len(valid_indices) == 0: # If there's not a single valid index, exit early. data_dict['nImg'][0] = 0 return 0 + # store valid trains in shared memory + valid_train_ids = train_ids[valid] + n_valid_trains = len(valid_train_ids) + data_dict["n_valid_trains"][0] = n_valid_trains + data_dict["valid_trains"][:n_valid_trains] = valid_train_ids + + # get cell selection for the images in this file + if self.cell_sel is None: + data_dict["cm_presel"][0] = False + self.cm_presel_train_ids = None + else: + cm = self.cell_sel.CM_NONE if apply_sel_pulses else self.cell_sel.CM_PRESEL + img_selected = self.cell_sel.select_trains(valid_train_ids, cm=cm) + data_dict["cm_presel"][0] = (cm == self.cell_sel.CM_PRESEL) + group = f[agipd_base]['image'] allcells = np.squeeze(group['cellId']) allpulses = np.squeeze(group['pulseId']) @@ -346,7 +360,7 @@ class AgipdCorrections: firange = self.gen_valid_range(first_index, last_index, self.max_cells, allcells, allpulses, valid_indices, - apply_sel_pulses) + img_selected) if firange is None: # gen_valid_range() returns None if there are no cells @@ -847,20 +861,22 @@ class AgipdCorrections: """Select sharedmem data indices to correct based on selected pulses indices. - :param i_proc: the index of sharedmem for a given file/module :return n_img: number of images to correct """ - data_dict = self.shared_dict[i_proc] n_img = data_dict['nImg'][0] - allpulses = data_dict['pulseId'][:n_img] + if not data_dict["cm_presel"][0]: + return n_img + + ntrains = data_dict["n_valid_trains"][0] + train_ids = data_dict["valid_trains"][:ntrains] - # Initializing can_calibrate array - can_calibrate = self.choose_selected_pulses( - allpulses, - can_calibrate=np.ones(shape=(len(allpulses),), dtype=np.bool)) + ## Initializing can_calibrate array + can_calibrate = self.cell_sel.select_trains(train_ids, cm=self.cell_sel.CM_FINSEL) + if np.all(can_calibrate): + return n_img # Only select data corresponding to selected pulses # and overwrite data in shared-memory leaving @@ -884,99 +900,11 @@ class AgipdCorrections: return n_img - def _validate_selected_pulses( - self, max_pulses: List[int], - max_cells: int, - ) -> List[int]: - """Validate the selected pulses given from the notebook - - Validate the selected range of pulses to correct raw data - of at least one image. - - 1) A pulse indices within one train can't be greater - than the operating memory cells. - 2) Validate the order of the given raneg of pulses. - 3) Raise value error if generate list of pulses is empty. - - :param max_pulses: a list of at most 3 elements defining the - range of pulses to calibrate. - :param max_cells: operating memory cells. - - :return adjusted_range: An adjusted range of pulse indices to correct. - """ - - # Validate selected pulses range: - # 1) A pulseId can't be greater than the operating memory cells. - pulses_range = [max_cells if p > max_cells else p for p in max_pulses] - - if pulses_range != max_pulses: - print( - "WARNING: \"max_pulses\" list has been modified from " - f"{max_pulses} to {pulses_range}. As the number of " - "operating memory cells are less than the selected " - "maximum pulse." - ) - - if len(pulses_range) == 1: - adjusted_range = (0, pulses_range[0], 1) - elif len(pulses_range) == 2: - adjusted_range = (pulses_range[0], pulses_range[1], 1) - elif len(pulses_range) == 3: - adjusted_range = tuple(pulses_range) - else: - raise ValueError( - "ERROR: Wrong length for the list of pulses indices range. " - "Please check the given range for \"max_pulses\":" - f"{max_pulses}. \"max_pulses\" needs to be a list of " - "3 elements, [start, last, step]") - - if adjusted_range[0] > adjusted_range[1]: - raise ValueError( - "ERROR: Pulse range start is greater than range end. " - "Please check the given range for \"max_pulses\":" - f"{max_pulses}. \"max_pulses\" needs to be a list of " - "3 elements, [start, last, step]") - - if not np.all([isinstance(p, int) for p in max_pulses]): - raise TypeError( - "ERROR: \"max_pulses\" elements needs to be integers:" - f" {max_pulses}.") - - print( - "A range of pulse indices is selected to correct:" - f" {pulses_range}") - - return adjusted_range - - def choose_selected_pulses(self, allpulses: np.array, - can_calibrate: np.array) -> np.array: - """ - Choose given selected pulse from pulseId array of - raw data. The selected pulses range is validated then - used to add a booleans in can_calibrate and guide the - later appliance. - - - :param allpulses: array of pulseIds from raw data - :param can_calibrate: array of booleans for indices to calibrate - :return can_calibrate: array of booleans to calibrate depending on - selected pulses - """ - - # Check interesection between array of booleans and - # array of pulses to calibrate. - can_calibrate &= ( - (allpulses >= allpulses[self.start]) & - (allpulses <= allpulses[self.last-1]) & - (((allpulses - allpulses[self.start]) % allpulses[self.step]) == 0) # noqa - ) - return can_calibrate - def gen_valid_range(self, first_index: int, last_index: int, max_cells: int, allcells: np.array, allpulses: np.array, valid_indices: Optional[np.array] = None, - apply_sel_pulses: Optional[bool] = True + img_selected: Optional[np.array] = None, ) -> np.array: """ Validate the arrays of image.cellId and image.pulseId to check presence of data and to avoid empty trains. @@ -990,12 +918,8 @@ class AgipdCorrections: :param allcells: array of image.cellsIds of raw data :param allpulses: array of image.pulseIds of raw data :param valid_indices: validated indices of image.data - :param apply_sel_pulses: A flag for applying selected pulses - after validation for correction :return firange: An array of validated image.data indices to correct - # TODO: Ignore rows (32 pulse) of empty pulses even if - common-mode is selected """ if valid_indices is not None: @@ -1010,9 +934,9 @@ class AgipdCorrections: if not np.any(can_calibrate): return - if apply_sel_pulses: - can_calibrate = self.choose_selected_pulses( - allpulses, can_calibrate=can_calibrate) + if img_selected is not None: + can_calibrate &= img_selected + if valid_indices is None: firange = np.arange(first_index, last_index) else: @@ -1466,7 +1390,6 @@ class AgipdCorrections: :param shape: Shape of expected data (nImg, x, y) :param n_cores_files: Number of files, handled in parallel """ - self.shared_dict = [] for i in range(n_cores_files): self.shared_dict.append({}) @@ -1485,3 +1408,147 @@ class AgipdCorrections: self.shared_dict[i]["raw_data"] = sharedmem.empty(shape, dtype="f4") # noqa self.shared_dict[i]["rel_corr"] = sharedmem.empty(shape, dtype="f4") # noqa self.shared_dict[i]["t0_rgain"] = sharedmem.empty(shape, dtype="u2") # noqa + # Valid trains + self.shared_dict[i]["cm_presel"] = sharedmem.empty(1, dtype="b") + self.shared_dict[i]["n_valid_trains"] = sharedmem.empty(1, dtype="i4") + self.shared_dict[i]["valid_trains"] = sharedmem.empty(1024, dtype="u8") + + +class CellSelection: + """Selection of detector memory cells (abstract class)""" + row_size = 32 + ncell_max = 352 + CM_NONE = 0 + CM_PRESEL = 1 + CM_FINSEL = 2 + + def select_trains(self, trains_sel, cm=0): + """Returns mask of cells selected for processing + + :param train_sel: list of a train ids selected for processing + :param cm: flag indicates the final selection or interim selection for cm correction + """ + raise NotImplemented + + def mesg(self): + """Return log message on initialization""" + raise NotImplemented + + @staticmethod + def _sel_for_cm(flag, flag_cm, cm): + if cm == CellSelection.CM_NONE: + return flag + elif cm == CellSelection.CM_PRESEL: + return flag_cm + elif cm == CellSelection.CM_FINSEL: + return flag[flag_cm] + else: + raise ValueError("param 'cm' takes only 0,1,2") + + +class CellRange(CellSelection): + """Selection of detector memory cells given as a range""" + + def __init__(self, crange, max_cells): + """Initialize range selection + + :param crange: range parameters of selected cells, list up to 3 elements + :param max_cells: number of exposed cells + """ + self.max_cells = max_cells + self.crange = crange + self.flag = np.zeros(self.max_cells, bool) + self.flag_cm = np.zeros(self.ncell_max, bool) + self.flag[slice(*crange)] = True + self.flag_cm[:self.max_cells] = self.flag + self.flag_cm = self.flag_cm.reshape(-1, self.row_size).any(1).repeat(self.row_size)[:self.max_cells] + + def mesg(self): + return ( + f"Use range selection with crange={self.crange}, max_cells={self.max_cells}\n" + f"Frames per train: {self.flag.sum()}" + ) + def select_trains(self, train_sel, cm=0): + return np.tile(self._sel_for_cm(self.flag, self.flag_cm, cm), len(train_sel)) + + +class LitFrameSelection(CellSelection): + """Selection of detector memery cells indicated as lit frames by the AgipdLitFrameFinder""" + + def __init__(self, dev, dc, train_ids, crange=None, energy_threshold=None): + """Initialize lit frame selection + + :param dev: AgipdLitFrameFinder device name + :param dc: EXtra-data DataCollection of a run + :param train_ids: the list of selected trains + :param crange: range parameters of selected cells, list up to 3 elements + """ + # read AgipdLitFrameFinder data + self.dev = dev + self.crange = crange + self.ethr = energy_threshold + intr_src = dev + ':output' + nfrm = dc[intr_src, 'data.nFrame'].ndarray() + litfrm_train_ids = dc[intr_src, 'data.trainId'].ndarray() + litfrm = dc[intr_src, 'data.nPulsePerFrame'].ndarray() > 0 + if energy_threshold is not None and 'data.energyPerFrame' in dc.keys_for_source(intr_src): + litfrm &= dc[intr_src, 'data.energyPerFrame'].ndarray() > energy_threshold + + # apply range selection + if crange is None: + cellsel = np.ones(self.ncell_max, bool) + else: + cellsel = np.zeros(self.ncell_max, bool) + cellsel[slice(*crange)] = True + + # update train selection, removing trains without litframe data + if train_ids is None: + train_ids = np.unique(litfrm_train_ids) + ntrain = len(train_ids) + else: + ntrain_orig = len(train_ids) + train_ids, _, litfrm_train_idx = np.intersect1d(train_ids, litfrm_train_ids, return_indices=True) + litfrm = litfrm[litfrm_train_idx] + nfrm = nfrm[litfrm_train_idx] + ntrain = len(train_ids) + if ntrain != ntrain_orig: + print(f"Lit frame data missed for {ntrain_orig - ntrain}. Skip them.") + + # initiate instance attributes + nimg = np.sum(nfrm) + self.train_ids = train_ids + self.count = nfrm + self.first = np.zeros(ntrain, int) + self.flag = np.zeros(nimg, bool) + self.flag_cm = np.zeros(nimg, bool) + self.mn_sel = nfrm.max() + self.mx_sel = 0 + # compress frame pattern + i0 = 0 + for i in range(ntrain): + n = nfrm[i] + iN = i0 + n + trnflg = litfrm[i] & cellsel + trnflg[n:] = False + self.first[i] = i0 + self.flag[i0:iN] = trnflg[:n] + self.flag_cm[i0:iN] = trnflg.reshape(-1, self.row_size).any(1).repeat(self.row_size)[:n] + nlit = trnflg[:n].sum() + self.mn_sel = min(self.mn_sel, nlit) + self.mx_sel = max(self.mx_sel, nlit) + + def mesg(self): + mn, mx = self.count.min(), self.count.max() + srng = f"{self.mn_sel}" if self.mn_sel == self.mx_sel else f"{self.mn_sel}~{self.mx_sel}" + return ( + f"Use lit frame selection from {self.dev}, crange={self.crange}\n" + f"Frames per train: {srng}" + ) + + def select_trains(self, train_sel, cm=0): + train_idx = np.flatnonzero(np.in1d(self.train_ids, train_sel)) + i0 = self.first[train_idx] + iN = i0 + self.count[train_idx] + idx = np.concatenate([np.arange(i0[i], iN[i], dtype=int) for i in range(train_idx.size)]) + + return self._sel_for_cm(self.flag[idx], self.flag_cm[idx], cm) -- GitLab