diff --git a/src/cal_tools/agipdlib.py b/src/cal_tools/agipdlib.py index 286ae4dc40bece6e24d8b0ebce7d1cf68f517e02..b06bfda5342c8a353f742b8d6f76585c60135cce 100644 --- a/src/cal_tools/agipdlib.py +++ b/src/cal_tools/agipdlib.py @@ -204,7 +204,8 @@ class AgipdCorrections: :param max_cells: maximum number of memory cells to handle, e.g. if calibration constants only exist for a subset of cells - :param cell_sel: the CellSelection indicates cells selected for calibration + :param cell_sel: the CellSelection indicates cells selected for + calibration :param h5_data_path: path in HDF5 file which is prefixed to the image/data section :param h5_index_path: path in HDF5 file which is prefixed to the @@ -343,14 +344,16 @@ class AgipdCorrections: n_valid_trains = len(valid_train_ids) data_dict["n_valid_trains"][0] = n_valid_trains data_dict["valid_trains"][:n_valid_trains] = valid_train_ids - + # get cell selection for the images in this file if self.cell_sel is None: data_dict["cm_presel"][0] = False self.cm_presel_train_ids = None else: - cm = self.cell_sel.CM_NONE if apply_sel_pulses else self.cell_sel.CM_PRESEL - img_selected = self.cell_sel.select_trains(valid_train_ids, cm=cm) + cm = (self.cell_sel.CM_NONE if apply_sel_pulses + else self.cell_sel.CM_PRESEL) + img_selected = self.cell_sel.select_trains( + valid_train_ids, cm=cm) data_dict["cm_presel"][0] = (cm == self.cell_sel.CM_PRESEL) group = f[agipd_base]['image'] @@ -873,8 +876,10 @@ class AgipdCorrections: ntrains = data_dict["n_valid_trains"][0] train_ids = data_dict["valid_trains"][:ntrains] - ## Initializing can_calibrate array - can_calibrate = self.cell_sel.select_trains(train_ids, cm=self.cell_sel.CM_FINSEL) + # Initializing can_calibrate array + can_calibrate = self.cell_sel.select_trains( + train_ids, cm=self.cell_sel.CM_FINSEL + ) if np.all(can_calibrate): return n_img @@ -1410,8 +1415,8 @@ class AgipdCorrections: self.shared_dict[i]["t0_rgain"] = sharedmem.empty(shape, dtype="u2") # noqa # Valid trains self.shared_dict[i]["cm_presel"] = sharedmem.empty(1, dtype="b") - self.shared_dict[i]["n_valid_trains"] = sharedmem.empty(1, dtype="i4") - self.shared_dict[i]["valid_trains"] = sharedmem.empty(1024, dtype="u8") + self.shared_dict[i]["n_valid_trains"] = sharedmem.empty(1, dtype="i4") # noqa + self.shared_dict[i]["valid_trains"] = sharedmem.empty(1024, dtype="u8") # noqa class CellSelection: @@ -1426,13 +1431,14 @@ class CellSelection: """Returns mask of cells selected for processing :param train_sel: list of a train ids selected for processing - :param cm: flag indicates the final selection or interim selection for cm correction + :param cm: flag indicates the final selection or interim selection + for common-mode correction """ - raise NotImplemented + raise NotImplementedError def mesg(self): """Return log message on initialization""" - raise NotImplemented + raise NotImplementedError @staticmethod def _sel_for_cm(flag, flag_cm, cm): @@ -1452,7 +1458,8 @@ class CellRange(CellSelection): def __init__(self, crange, max_cells): """Initialize range selection - :param crange: range parameters of selected cells, list up to 3 elements + :param crange: range parameters of selected cells, + list up to 3 elements :param max_cells: number of exposed cells """ self.max_cells = max_cells @@ -1461,19 +1468,25 @@ class CellRange(CellSelection): self.flag_cm = np.zeros(self.ncell_max, bool) self.flag[slice(*crange)] = True self.flag_cm[:self.max_cells] = self.flag - self.flag_cm = self.flag_cm.reshape(-1, self.row_size).any(1).repeat(self.row_size)[:self.max_cells] + self.flag_cm = (self.flag_cm.reshape(-1, self.row_size).any(1) + .repeat(self.row_size)[:self.max_cells]) def mesg(self): return ( - f"Use range selection with crange={self.crange}, max_cells={self.max_cells}\n" + f"Use range selection with crange={self.crange}, " + f"max_cells={self.max_cells}\n" f"Frames per train: {self.flag.sum()}" ) + def select_trains(self, train_sel, cm=0): - return np.tile(self._sel_for_cm(self.flag, self.flag_cm, cm), len(train_sel)) + return np.tile(self._sel_for_cm(self.flag, self.flag_cm, cm), + len(train_sel)) class LitFrameSelection(CellSelection): - """Selection of detector memery cells indicated as lit frames by the AgipdLitFrameFinder""" + """Selection of detector memery cells indicated as lit frames + by the AgipdLitFrameFinder + """ def __init__(self, dev, dc, train_ids, crange=None, energy_threshold=None): """Initialize lit frame selection @@ -1481,7 +1494,8 @@ class LitFrameSelection(CellSelection): :param dev: AgipdLitFrameFinder device name :param dc: EXtra-data DataCollection of a run :param train_ids: the list of selected trains - :param crange: range parameters of selected cells, list up to 3 elements + :param crange: range parameters of selected cells, + list up to 3 elements """ # read AgipdLitFrameFinder data self.dev = dev @@ -1491,8 +1505,11 @@ class LitFrameSelection(CellSelection): nfrm = dc[intr_src, 'data.nFrame'].ndarray() litfrm_train_ids = dc[intr_src, 'data.trainId'].ndarray() litfrm = dc[intr_src, 'data.nPulsePerFrame'].ndarray() > 0 - if energy_threshold is not None and 'data.energyPerFrame' in dc.keys_for_source(intr_src): - litfrm &= dc[intr_src, 'data.energyPerFrame'].ndarray() > energy_threshold + if (energy_threshold is not None and + 'data.energyPerFrame' in dc.keys_for_source(intr_src)): + + litfrm &= (dc[intr_src, 'data.energyPerFrame'].ndarray() + > energy_threshold) # apply range selection if crange is None: @@ -1507,12 +1524,15 @@ class LitFrameSelection(CellSelection): ntrain = len(train_ids) else: ntrain_orig = len(train_ids) - train_ids, _, litfrm_train_idx = np.intersect1d(train_ids, litfrm_train_ids, return_indices=True) + train_ids, _, litfrm_train_idx = np.intersect1d( + train_ids, litfrm_train_ids, return_indices=True + ) litfrm = litfrm[litfrm_train_idx] nfrm = nfrm[litfrm_train_idx] ntrain = len(train_ids) if ntrain != ntrain_orig: - print(f"Lit frame data missed for {ntrain_orig - ntrain}. Skip them.") + print(f"Lit frame data missed for {ntrain_orig - ntrain}. " + "Skip them.") # initiate instance attributes nimg = np.sum(nfrm) @@ -1532,14 +1552,15 @@ class LitFrameSelection(CellSelection): trnflg[n:] = False self.first[i] = i0 self.flag[i0:iN] = trnflg[:n] - self.flag_cm[i0:iN] = trnflg.reshape(-1, self.row_size).any(1).repeat(self.row_size)[:n] + self.flag_cm[i0:iN] = (trnflg.reshape(-1, self.row_size).any(1) + .repeat(self.row_size)[:n]) nlit = trnflg[:n].sum() self.mn_sel = min(self.mn_sel, nlit) self.mx_sel = max(self.mx_sel, nlit) def mesg(self): - mn, mx = self.count.min(), self.count.max() - srng = f"{self.mn_sel}" if self.mn_sel == self.mx_sel else f"{self.mn_sel}~{self.mx_sel}" + srng = (f"{self.mn_sel}" if self.mn_sel == self.mx_sel + else f"{self.mn_sel}~{self.mx_sel}") return ( f"Use lit frame selection from {self.dev}, crange={self.crange}\n" f"Frames per train: {srng}" @@ -1549,6 +1570,7 @@ class LitFrameSelection(CellSelection): train_idx = np.flatnonzero(np.in1d(self.train_ids, train_sel)) i0 = self.first[train_idx] iN = i0 + self.count[train_idx] - idx = np.concatenate([np.arange(i0[i], iN[i], dtype=int) for i in range(train_idx.size)]) - + idx = np.concatenate( + [np.arange(i0[i], iN[i], dtype=int) for i in range(train_idx.size)] + ) return self._sel_for_cm(self.flag[idx], self.flag_cm[idx], cm) diff --git a/tests/test_cal_tools.py b/tests/test_cal_tools.py index 026e7512beaf9d0a5a6e455e8cd757b862b0462b..102f613b7b9f3b9bdb032f5ebe5ab218a8d1e098 100644 --- a/tests/test_cal_tools.py +++ b/tests/test_cal_tools.py @@ -7,7 +7,7 @@ import pytest import zmq from iCalibrationDB import Conditions, ConstantMetaData, Constants -from cal_tools.agipdlib import AgipdCorrections +from cal_tools.agipdlib import AgipdCorrections, CellRange from cal_tools.plotting import show_processed_modules from cal_tools.tools import ( get_dir_creation_date, @@ -344,7 +344,7 @@ def test_initialize_from_db(): agipd_corr = AgipdCorrections( max_cells=MEM_CELLS, - max_pulses=[0, 500, 1]) + cell_sel=CellRange([0, 500, 1], MEM_CELLS)) agipd_corr.allocate_constants( modules=[0],