diff --git a/notebooks/AGIPD/AGIPD_Correct_and_Verify.ipynb b/notebooks/AGIPD/AGIPD_Correct_and_Verify.ipynb index cbb323604ffc01f73d5be95e2a3044562f530ef2..9770751f9a3bc86acfe33f00c52f71cf52d7369b 100644 --- a/notebooks/AGIPD/AGIPD_Correct_and_Verify.ipynb +++ b/notebooks/AGIPD/AGIPD_Correct_and_Verify.ipynb @@ -91,6 +91,7 @@ "use_litframe_finder = 'off' # Process only illuminated frames: 'off' - disable, 'device' - use online device data, 'offline' - use offline algorithm, 'auto' - choose online/offline source automatically (default)\n", "litframe_device_id = '' # Device ID for a lit frame finder device, empty string to auto detection\n", "energy_threshold = -1000 # The low limit for the energy (uJ) exposed by frames subject to processing. If -1000, selection by pulse energy is disabled\n", + "use_super_selection = 'cm' # Make a common selection for entire run: 'off' - disable, 'final' - enable for final selection, 'cm' - enable only for common mode correction\n", "\n", "use_xgm_device = '' # DoocsXGM device ID to obtain actual photon energy, operating condition else.\n", "\n", @@ -454,35 +455,17 @@ "source": [ "if use_litframe_finder != 'off':\n", " from extra_redu import make_litframe_finder, LitFrameFinderError\n", - " from extra_redu.litfrm.utils import litfrm_run_report\n", - "\n", + " \n", " if use_litframe_finder not in ['auto', 'offline', 'online']:\n", " raise ValueError(\"Unexpected value in 'use_litframe_finder'.\")\n", "\n", " inst = karabo_id_control[:3]\n", " litfrm = make_litframe_finder(inst, dc, litframe_device_id)\n", " try:\n", - " if use_litframe_finder == 'auto':\n", - " r = litfrm.read_or_process()\n", - " elif use_litframe_finder == 'offline':\n", - " r = litfrm.process()\n", - " elif use_litframe_finder == 'online':\n", - " r = litfrm.read()\n", - "\n", - " report = litfrm_run_report(r)\n", - " print(\"Lit-frame patterns:\")\n", - " print(\" # trains Np Nd Nf lit frames\")\n", - " for rec in report:\n", - " frmintf = ', '.join(\n", - " [':'.join([str(n) for n in slc]) for slc in rec['frames']]\n", - " )\n", - " trsintf = ':'.join([str(n) for n in rec['trains']])\n", - " print(\n", - " (\"{pattern_no:2d} {trsintf:25s} {npulse:4d} \"\n", - " \"{ndataframe:3d} {nframe:3d} [{frmintf}]\"\n", - " ).format(frmintf=frmintf, trsintf=trsintf, **rec)\n", - " )\n", - " cell_sel = LitFrameSelection(r, train_ids, max_pulses, energy_threshold)\n", + " get_data = {'auto': litfrm.read_or_process, 'offline': litfrm.process, 'online': litfrm.read}\n", + " r = get_data[use_litframe_finder]()\n", + " cell_sel = LitFrameSelection(r, train_ids, max_pulses, energy_threshold, use_super_selection)\n", + " cell_sel.print_report()\n", " except LitFrameFinderError as err:\n", " warn(f\"Cannot use AgipdLitFrameFinder due to:\\n{err}\")\n", " cell_sel = CellRange(max_pulses, max_cells=mem_cells)\n", @@ -732,12 +715,13 @@ " # In common mode corrected is enabled.\n", " # Cell selection is only activated after common mode correction.\n", " # Perform cross-file correction parallel over asics\n", + " image_files_idx = [i_proc for i_proc, n_img in enumerate(img_counts) if n_img > 0]\n", " pool.starmap(agipd_corr.cm_correction, itertools.product(\n", - " range(len(file_batch)), range(16) # 16 ASICs per module\n", + " image_files_idx, range(16) # 16 ASICs per module\n", " ))\n", " step_timer.done_step(\"Common-mode correction\")\n", "\n", - " img_counts = pool.map(agipd_corr.apply_selected_pulses, range(len(file_batch)))\n", + " img_counts = pool.map(agipd_corr.apply_selected_pulses, image_files_idx)\n", " step_timer.done_step(\"Applying selected cells after common mode correction\")\n", " \n", " # Perform image-wise correction\"\n", diff --git a/setup.py b/setup.py index e5561837146b2b89f3cf75bcb9d62dcf07840ed4..81573a90432076ddb9254a29c79bf1393b939f2e 100644 --- a/setup.py +++ b/setup.py @@ -109,7 +109,7 @@ install_requires = [ "tabulate==0.8.6", "traitlets==4.3.3", "xarray==2022.3.0", - "EXtra-redu==0.0.5", + "EXtra-redu==0.0.6", ] if "readthedocs.org" not in sys.executable: diff --git a/src/cal_tools/agipdlib.py b/src/cal_tools/agipdlib.py index 862864080e8b84a169bad003710c11b45b35f491..fe7e1079d9e4d278a6c1156411bead1397a9b0c5 100644 --- a/src/cal_tools/agipdlib.py +++ b/src/cal_tools/agipdlib.py @@ -289,19 +289,34 @@ class CellSelection: CM_PRESEL = 1 CM_FINSEL = 2 + def filter_trains(self, train_sel: np.ndarray): + """Filters out trains that will not be processed + + :param train_sel: list of a train ids selected for processing + :return: array of filtered trains + """ + raise NotImplementedError + def get_cells_on_trains( - self, trains_sel: List[int], cm: int = 0 + self, train_sel: np.ndarray, nfrm: np.ndarray, cm: int = 0 ) -> np.array: """Returns mask of cells selected for processing :param train_sel: list of a train ids selected for processing + :param nfrm: the number of frames expected for every train in + the list `train_sel` :param cm: flag indicates the final selection or interim selection for common-mode correction + + :return: boolean array with flags indicating images for processing """ raise NotImplementedError def msg(self): - """Return log message on initialization""" + """Returns log message on initialization + + :return: message + """ raise NotImplementedError @staticmethod @@ -479,28 +494,30 @@ class AgipdCorrections: valid_train_ids = self.get_valid_image_idx( im_dc[agipd_base, "image.trainId"]) + # filter out trains which will not be selected + valid_train_ids = self.cell_sel.filter_trains( + np.array(valid_train_ids)).tolist() + if not valid_train_ids: # If there's not a single valid train, exit early. print(f"WARNING: No valid trains for {im_dc.files} to process.") data_dict['nImg'][0] = 0 return 0 + # Exclude non_valid trains from the selected data collection. + im_dc = im_dc.select_trains(by_id(valid_train_ids)) + + # Just want to be sure that order is correct + valid_train_ids = im_dc.train_ids + # Get a count of images in each train + nimg_in_trains = im_dc[agipd_base, "image.trainId"].data_counts(False) + nimg_in_trains = nimg_in_trains.astype(int) + # store valid trains in shared memory - # valid_train_ids = train_ids[valid] n_valid_trains = len(valid_train_ids) data_dict["n_valid_trains"][0] = n_valid_trains data_dict["valid_trains"][:n_valid_trains] = valid_train_ids - - # get cell selection for the images in this file - cm = (self.cell_sel.CM_NONE if apply_sel_pulses - else self.cell_sel.CM_PRESEL) - - img_selected = self.cell_sel.get_cells_on_trains( - valid_train_ids, cm=cm) - data_dict["cm_presel"][0] = (cm == self.cell_sel.CM_PRESEL) - - # Exclude non_valid trains from the selected data collection. - im_dc = im_dc.select_trains(by_id(valid_train_ids)) + data_dict["nimg_in_trains"][:n_valid_trains] = nimg_in_trains if "AGIPD500K" in agipd_base: agipd_comp = components.AGIPD500K(im_dc) @@ -510,11 +527,23 @@ class AgipdCorrections: kw = { "unstack_pulses": False, } + + # get selection for the images in this file + cm = (self.cell_sel.CM_NONE if apply_sel_pulses + else self.cell_sel.CM_PRESEL) + + img_selected = self.cell_sel.get_cells_on_trains( + np.array(valid_train_ids), nimg_in_trains, cm=cm) + + frm_ix = np.flatnonzero(img_selected) + data_dict["cm_presel"][0] = (cm == self.cell_sel.CM_PRESEL) + n_img = len(frm_ix) + + # read raw data # [n_modules, n_imgs, 2, x, y] raw_data = agipd_comp.get_array("image.data", **kw)[0] - frm_ix = np.flatnonzero(img_selected) - n_img = frm_ix.size + # store in shmem only selected images data_dict['nImg'][0] = n_img data_dict['data'][:n_img] = raw_data[frm_ix, 0] data_dict['rawgain'][:n_img] = raw_data[frm_ix, 1] @@ -524,6 +553,7 @@ class AgipdCorrections: "image.pulseId", **kw)[0, frm_ix] data_dict['trainId'][:n_img] = agipd_comp.get_array( "image.trainId", **kw)[0, frm_ix] + return n_img def write_file(self, i_proc, file_name, ofile_name): @@ -968,15 +998,16 @@ class AgipdCorrections: data_dict = self.shared_dict[i_proc] n_img = data_dict['nImg'][0] - if not data_dict["cm_presel"][0]: + if not data_dict["cm_presel"][0] or n_img == 0: return n_img ntrains = data_dict["n_valid_trains"][0] train_ids = data_dict["valid_trains"][:ntrains] + nimg_in_trains = data_dict["nimg_in_trains"][:ntrains] # Initializing can_calibrate array can_calibrate = self.cell_sel.get_cells_on_trains( - train_ids, cm=self.cell_sel.CM_FINSEL + train_ids, nimg_in_trains, cm=self.cell_sel.CM_FINSEL ) if np.all(can_calibrate): return n_img @@ -988,7 +1019,8 @@ class AgipdCorrections: # Only select data corresponding to selected pulses # and overwrite data in shared-memory leaving # the required indices to correct - array_names = ["data", "rawgain", "cellId", "pulseId", "trainId", "gain"] + array_names = [ + "data", "rawgain", "cellId", "pulseId", "trainId", "gain"] # if AGIPD in fixed gain mode or melting snow was not requested # `t0_rgain` and `raw_data` will be empty shared_mem arrays @@ -1482,6 +1514,7 @@ class AgipdCorrections: self.shared_dict[i]["cm_presel"] = sharedmem.empty(1, dtype="b") self.shared_dict[i]["n_valid_trains"] = sharedmem.empty(1, dtype="i4") # noqa self.shared_dict[i]["valid_trains"] = sharedmem.empty(1024, dtype="u8") # noqa + self.shared_dict[i]["nimg_in_trains"] = sharedmem.empty(1024, dtype="i8") # noqa if self.corr_bools.get("round_photons"): self.shared_hist_preround = sharedmem.empty(len(self.hist_bins_preround) - 1, dtype="i8") @@ -1579,11 +1612,14 @@ class CellRange(CellSelection): ) def get_cells_on_trains( - self, train_sel: List[int], cm: int = 0 + self, train_sel: np.ndarray, nfrm: np.ndarray, cm: int = 0 ) -> np.array: return np.tile(self._sel_for_cm(self.flag, self.flag_cm, cm), len(train_sel)) + def filter_trains(self, train_sel: np.ndarray): + return train_sel + class LitFrameSelection(CellSelection): """Selection of detector memery cells indicated as lit frames @@ -1593,88 +1629,75 @@ class LitFrameSelection(CellSelection): litfrmdata: 'AgipdLitFrameFinderOffline', train_ids: List[int], crange: Optional[List[int]] = None, - energy_threshold: float = -1000): + energy_threshold: float = -1000, + use_super_selection: str = 'off'): """Initialize lit frame selection :param litfrmdata: AgipdLitFrameFinder output data :param train_ids: the list of selected trains :param crange: range parameters of selected cells, list up to 3 elements + :param energy_threshold: the minimum allowed value for + pulse energy + :param use_super_selection: the stage when super selection + should be applied: `off`, `cm` or `final` """ - # read AgipdLitFrameFinder data + from extra_redu import FrameSelection, SelType self.dev = litfrmdata.meta.litFrmDev self.crange = validate_selected_pulses(crange, self.ncell_max) self.energy_threshold = energy_threshold - - nfrm = litfrmdata.output.nFrame - litfrm_train_ids = litfrmdata.meta.trainId - litfrm = litfrmdata.output.nPulsePerFrame > 0 - if energy_threshold != -1000: - litfrm &= litfrmdata.output.energyPerFrame > energy_threshold - - # apply range selection - if crange is None: - cellsel = np.ones(self.ncell_max, bool) + self.use_super_selection = use_super_selection + + if use_super_selection == 'off': + self.cm_sel_type = SelType.ROW + self.final_sel_type = SelType.CELL + elif use_super_selection == 'cm': + self.cm_sel_type = SelType.SUPER_ROW + self.final_sel_type = SelType.CELL + elif use_super_selection == 'final': + self.cm_sel_type = SelType.SUPER_ROW + self.final_sel_type = SelType.SUPER_CELL else: - cellsel = np.zeros(self.ncell_max, bool) - cellsel[slice(*crange)] = True + raise ValueError("param 'use_super_selection' takes only " + "'off', 'cm' or 'final'") - # update train selection, removing trains without litframe data - if train_ids is None: - train_ids = np.unique(litfrm_train_ids) - ntrain = len(train_ids) - else: - ntrain_orig = len(train_ids) - train_ids, _, litfrm_train_idx = np.intersect1d( - train_ids, litfrm_train_ids, return_indices=True - ) - litfrm = litfrm[litfrm_train_idx] - nfrm = nfrm[litfrm_train_idx] - ntrain = len(train_ids) - if ntrain != ntrain_orig: - print(f"Lit frame data missed for {ntrain_orig - ntrain}. " - "Skip them.") - - # initiate instance attributes - nimg = np.sum(nfrm) - self.train_ids = train_ids - self.count = nfrm - self.first = np.zeros(ntrain, int) - self.flag = np.zeros(nimg, bool) - self.flag_cm = np.zeros(nimg, bool) - self.min_sel = nfrm.max() - self.max_sel = 0 - # compress frame pattern - i0 = 0 - for i in range(ntrain): - n = nfrm[i] - iN = i0 + n - trnflg = litfrm[i] & cellsel - trnflg[n:] = False - self.first[i] = i0 - self.flag[i0:iN] = trnflg[:n] - self.flag_cm[i0:iN] = (trnflg.reshape(-1, self.row_size).any(1) - .repeat(self.row_size)[:n]) - nlit = trnflg[:n].sum() - self.min_sel = min(self.min_sel, nlit) - self.max_sel = max(self.max_sel, nlit) - i0 = iN + self._sel = FrameSelection( + litfrmdata, guess_missed=True, crange=slice(*self.crange), + energy_threshold=energy_threshold, select_litframes=True + ) + + def print_report(self, max_lines=25): + rep = self._sel.report() + nrec = len(rep) + s = slice(max_lines - 1) if nrec > max_lines else slice(None) + print(" # trains " + " Ntrn Nmis Np Nd Nf lit frames") + for rec in rep[s]: + frmintf = ', '.join([':'.join([str(n) for n in slc]) + for slc in rec['litframe_slice']]) + t0, tN, st = (rec['train_range'] + (1,))[:3] + ntrain = max((int(tN) - int(t0)) // int(st), 1) + trsintf = ':'.join([str(n) for n in rec['train_range']]) + print(("{pattern_no:2d} {trsintf:25s} {ntrain:5d} " + "{nmissed_trains:4d} {npulse_exposed:4d} {ndataframe:3d} " + "{nframe_total:3d} [{frmintf}]" + ).format(frmintf=frmintf, ntrain=ntrain, + trsintf=trsintf, **rec)) + if nrec > max_lines: + print(f"... {nrec - max_lines + 1} more lines skipped") def msg(self): - srng = (f"{self.min_sel}" if self.min_sel == self.max_sel - else f"{self.min_sel}~{self.max_sel}") return ( f"Use lit frame selection from {self.dev}, crange={self.crange}\n" - f"Frames per train: {srng}" ) def get_cells_on_trains( - self, train_sel: List[int], cm: int = 0 + self, train_sel: np.ndarray, nfrm: np.ndarray, cm: int = 0 ) -> np.array: - train_idx = np.flatnonzero(np.in1d(self.train_ids, train_sel)) - i0 = self.first[train_idx] - iN = i0 + self.count[train_idx] - idx = np.concatenate( - [np.arange(i0[i], iN[i], dtype=int) for i in range(train_idx.size)] - ) - return self._sel_for_cm(self.flag[idx], self.flag_cm[idx], cm) + + cell_flags, cm_flags = self._sel.litframes_on_trains( + train_sel, nfrm, [self.final_sel_type, self.cm_sel_type]) + return self._sel_for_cm(cell_flags, cm_flags, cm) + + def filter_trains(self, train_sel: np.ndarray): + return self._sel.filter_trains(train_sel, drop_empty=True)