From 94fa8e44481352c6878563253eecf1574d622150 Mon Sep 17 00:00:00 2001 From: Karim Ahmed <karim.ahmed@xfel.eu> Date: Mon, 8 Feb 2021 17:04:24 +0100 Subject: [PATCH] [AGIPD][CORRECT] Apply pulse index selection/validation after common-mode correction --- cal_tools/cal_tools/agipdlib.py | 219 ++++++++++++++---- .../AGIPD/AGIPD_Correct_and_Verify.ipynb | 43 ++-- 2 files changed, 191 insertions(+), 71 deletions(-) diff --git a/cal_tools/cal_tools/agipdlib.py b/cal_tools/cal_tools/agipdlib.py index e52ed7435..050bf902c 100644 --- a/cal_tools/cal_tools/agipdlib.py +++ b/cal_tools/cal_tools/agipdlib.py @@ -185,7 +185,7 @@ class AgipdCorrections: agipd_corr.allocate_images(data_shape, n_cores_files) i_proc=0 - agipd_corr.read_file(i_proc, file_name) + agipd_corr.read_file(i_proc, file_name, apply_sel_pulses) # Correct first 1000 images agipd_corr.correct_agipd(i_proc, first=0, last=1000) @@ -201,8 +201,6 @@ class AgipdCorrections: # avoid list(range(*[0]])) self.pulses_lst = list(range(*max_pulses)) \ if not (len(max_pulses) == 1 and max_pulses[0] == 0) else max_pulses #noqa - self.min_pulse = self.pulses_lst[0] - self.max_pulse = self.pulses_lst[-1] self.max_cells = max_cells # Correction parameters @@ -257,17 +255,17 @@ class AgipdCorrections: self.corr_bools.get('blc_hmatch'), self.corr_bools.get('blc_stripes')] - def read_file(self, i_proc, file_name): - """ - Read file with raw data to shared memory + def read_file(self, i_proc: int, file_name: str, + apply_sel_pulses: Optional[bool] = True + ) -> int: + """Read file with raw data to shared memory - :param file_name: Name of input file including path - :param i_proc: Index of shared memory array + :param file_name: Name of input file including path. + :param i_proc: Index of shared memory array. + :param apply_sel_pulses: apply selected pulses before + all corrections. :return: - - first_index - Index of the first image to be corrected - - last_index - Index of the last image to be corrected - - firange - List of images to be corrected - + - n_img: The number of images to correct. """ module_idx = int(file_name.split('/')[-1].split('-')[2][-2:]) agipd_base = self.h5_data_path.format(module_idx) @@ -281,9 +279,14 @@ class AgipdCorrections: (_, first_index, last_index, _, valid_indices) = self.get_valid_image_idx(idx_base, f) + allcells = np.squeeze(group['cellId']) + allpulses = np.squeeze(group['pulseId']) + firange = self.gen_valid_range(first_index, last_index, - self.max_cells, agipd_base, f, - valid_indices) + self.max_cells, allcells, + allpulses, valid_indices, + apply_sel_pulses) + n_img = firange.shape[0] data_dict['nImg'][0] = n_img if np.all(np.diff(firange) == 1): @@ -295,16 +298,12 @@ class AgipdCorrections: # Avoid very slow performance using fancing indexing, # if firange consists of non-contiguous indices. raw_data = group['data'][:][firange] - data_dict['data'][:n_img] = raw_data[:, 0] data_dict['rawgain'][:n_img] = raw_data[:, 1] - # All below arrays are one dimensional and they were not - # affected much by the non-contiguous fancy indexing. - data_dict['cellId'][:n_img] = np.squeeze(group['cellId'][firange]) - data_dict['pulseId'][:n_img] = np.squeeze( - group['pulseId'][firange]) + data_dict['cellId'][:n_img] = allcells[firange] + data_dict['pulseId'][:n_img] = allpulses[firange] data_dict['trainId'][:n_img] = np.squeeze( - group['trainId'][firange]) + group['trainId'][:][firange]) except Exception as e: print(f'Error during reading data from file {file_name}: {e}') print(f'Error traceback: {traceback.format_exc()}') @@ -374,6 +373,16 @@ class AgipdCorrections: """ Perform common-mode correction of data in shared memory + In a given code a complete file is loaded to the memory. + Asics common mode correction is calculated based on single image. + Cell common mode is calculated across trains and groups of 32 cells. + Both corrections are iterative and requires 4 iterations. + + Correction is performed in chunks of (e.g. 512 images). + A complete array of data from one file + (256 trains, 352 cells) will take + 256 * 352 * 128 * 512 * 4 // 1024**3 = 22 Gb in memory + :param i_proc: Index of shared memory array to process :param asic: Asic number to process """ @@ -395,6 +404,8 @@ class AgipdCorrections: # Loop over iterations for _ in range(n_itr): # Loop over rows of cells + # TODO: check what occurs in case of 64 memory cells + # as it will have less than 11 iterations first = 0 for cell_row in range(11): last = first + cell_ids[(cell_ids // 32) == cell_row].size @@ -408,6 +419,7 @@ class AgipdCorrections: dark_max) cell_cm = cell_cm_sum / cell_cm_count + # TODO: check files with less 256 trains cell_cm[cell_cm_count < fraction * 32 * 256] = 0 asic_data[...] -= cell_cm[None, None, :, :] @@ -665,7 +677,8 @@ class AgipdCorrections: # Copy the data across into the existing shared-memory array mask[...] = msk[...] - def get_valid_image_idx(self, idx_base, infile, index_v=2): + def get_valid_image_idx(self, idx_base: str, infile: str, + index_v: Optional[int] = 2): """ Return the indices of valid data """ if index_v == 2: @@ -675,16 +688,24 @@ class AgipdCorrections: raise IOError("File has no valid counts") valid = count != 0 idxtrains = np.squeeze(infile["/INDEX/trainId"]) + + # Train indices are of type=f32 + # Validate that train indices values fall + # between medianTrain +- 1e4 medianTrain = np.nanmedian(idxtrains) lowok = (idxtrains > medianTrain - 1e4) highok = (idxtrains < medianTrain + 1e4) valid &= lowok & highok + # Last index = last valid train + max. number of memory cells last_index = int(first[valid][-1] + count[valid][-1]) first_index = int(first[valid][0]) - # do actual validity filtering: validc, validf = count[valid], first[valid] + + # Creating an array of validated indices. + # If all indices were validated this array will be the same, + # as what is stored at /DET/image/trainId valid_indices = np.concatenate([np.arange(validf[i], validf[i]+validc[i]) for i in range(validf.size)], @@ -714,19 +735,59 @@ class AgipdCorrections: return (valid, first_index, last_index, idxtrains, valid_indices) - def gen_valid_range(self, first_index, last_index, max_cells, agipd_base, - infile, valid_indices=None): - """ Generate an index range to pass to `correctAGIPD`. + def apply_selected_pulses(self, i_proc: int) -> int: + """Select sharedmem data indices to correct based on selected + pulses indices. + + + :param i_proc: the index of sharedmem for a given file/module + :return n_img: number of images to correct """ + + data_dict = self.shared_dict[i_proc] + n_img = data_dict['nImg'][0] - allcells = np.squeeze(infile[agipd_base + "image/cellId"]) - allpulses = np.squeeze(infile[agipd_base + "image/pulseId"]) - if valid_indices is not None: - allcells = allcells[valid_indices] - allpulses = allpulses[valid_indices] - else: - allcells = allcells[first_index:last_index] - allpulses = allpulses[first_index:last_index] + allpulses = data_dict['pulseId'][:n_img] + + # Initializing can_calibrate array + can_calibrate = self.choose_selected_pulses(allpulses, + can_calibrate=[True]*len(allpulses)) + + # Only select data corresponding to selected pulses + # and overwrite data in shared-memory leaving + # the required indices to correct + data = data_dict['data'][:n_img][can_calibrate] + rawgain = data_dict['rawgain'][:n_img][can_calibrate] + cellId = data_dict['cellId'][:n_img][can_calibrate] + pulseId = data_dict['pulseId'][:n_img][can_calibrate] + trainId = data_dict['trainId'][:n_img][can_calibrate] + + # Overwrite selected number of images based on + # selected pulses to correct + n_img = np.count_nonzero(can_calibrate) + + data_dict['nImg'][0] = n_img + data_dict['data'][: n_img] = data + data_dict['rawgain'][:n_img] = rawgain + data_dict['cellId'][:n_img] = cellId + data_dict['pulseId'][:n_img] = pulseId + data_dict['trainId'][:n_img] = trainId + + return n_img + + def validate_selected_pulses(self, allpulses: np.array + ) -> Tuple[int, int, int]: + """Validate the selected pulses given from the notebook + + Validate that the given range of pulses to correct + are within the cells of raw data. + + :param allpulses: + :return : + - first_pulse: first pulse index + - last_pulse: last pulse index + - pulse_step: step range for the selected pulse indices + """ # Calculate the pulse step from the chosen max_pulse range if len(self.rng_pulses) == 3: @@ -734,38 +795,95 @@ class AgipdCorrections: else: pulse_step = 1 - # Make sure the range max doesn't have non-valid idx. - if self.pulses_lst[-1] + pulse_step > last_index-1: - last_pulse = last_index-1 + # Validate selected pulses range: + # 1) Make sure the range max doesn't have non-valid idx. + if self.pulses_lst[-1] + pulse_step > int(allpulses[-1]): + last_pulse = int(allpulses[-1]) + pulse_step else: - last_pulse = self.pulses_lst[-1] + pulse_step - # Check if 1st pulse idx was out of valid range. + last_pulse = int(self.pulses_lst[-1]) + pulse_step + + # 2) Check if 1st pulse index was out of valid range. # If that is the case only calibrate the last step pulse. if self.pulses_lst[0] >= last_pulse: first_pulse = last_pulse - pulse_step else: first_pulse = self.pulses_lst[0] - # collect the pulses to be calibrated - cal_pulses = allpulses[first_pulse:last_pulse:pulse_step] + return first_pulse, last_pulse, pulse_step - # These parameters required also for histogram pulse range. - self.min_pulse = np.min(cal_pulses) - self.max_pulse = np.max(cal_pulses) + def choose_selected_pulses(self, allpulses: np.array, + can_calibrate: np.array) -> np.array: - can_calibrate = (allcells < max_cells) + """ + Choose given selected pulse from pulseId array of + raw data. The selected pulses range is validated then + used to add a booleans in can_calibrate and guide the + later appliance. + + + :param allpulses: array of pulseIds from raw data + :param can_calibrate: array of booleans for indices to calibrate + :return can_calibrate: array of booleans to calibrate depending on + selected pulses + """ + + (first_pulse, last_pulse, + pulse_step) = self.validate_selected_pulses(allpulses) + + # collect the pulses to be calibrated + cal_pulses = allpulses[first_pulse: last_pulse: pulse_step] # Check if a specific pulses need to be calibrated # or with only simple pulse ranging. if pulse_step > 1 or \ allpulses[first_pulse] >= allpulses[last_pulse]: - can_calibrate &= np.isin(allpulses, cal_pulses) + can_calibrate = can_calibrate and np.isin(allpulses, cal_pulses) else: - can_calibrate &= (allpulses <= self.max_pulse) - can_calibrate &= (allpulses >= self.min_pulse) + can_calibrate = can_calibrate and (allpulses <= np.min(cal_pulses)) # noqa + can_calibrate = can_calibrate and (allpulses >= np.max(cal_pulses)) # noqa + + return can_calibrate + + def gen_valid_range(self, first_index: int, last_index: int, + max_cells: int, allcells: np.array, allpulses: np.array, + valid_indices: Optional[np.array] = None, + apply_sel_pulses: Optional[bool] = True + ) -> np.array: + """ Validate the arrays of image.cellId and image.pulseId + to check presence of data and to avoid empty trains. + + selected pulses range given from the AGIPD correction notebook + is taken into account if apply_sel_pulses is True + + :param first_index: first index of image data + :param last_index: last index of image data + :param max_cells: number of memory cells to correct + :param allcells: array of image.cellsIds of raw data + :param allpulses: array of image.pulseIds of raw data + :param valid_indices: validated indices of image.data + :param apply_sel_pulses: A flag for applying selected pulses + after validation for correction + :return firange: An array of validated image.data + indices to correct + # TODO: Ignore rows (32 pulse) of empty pulses even if + common-mode is selected + """ - if np.count_nonzero(can_calibrate) == 0: + if valid_indices is not None: + allcells = allcells[valid_indices] + allpulses = allpulses[valid_indices] + else: + allcells = allcells[first_index:last_index] + allpulses = allpulses[first_index:last_index] + + can_calibrate = (allcells < max_cells) + + if not np.any(can_calibrate): return + + if apply_sel_pulses: + can_calibrate = self.choose_selected_pulses(allpulses, + can_calibrate=can_calibrate) if valid_indices is None: firange = np.arange(first_index, last_index) else: @@ -779,7 +897,6 @@ class AgipdCorrections: """ Copy and sanitize data in `infile` that is not touched by `correctAGIPD` """ - # these are touched in the correct function, do not copy them here dont_copy = ["data", "cellId", "trainId", "pulseId", "status", "length"] @@ -1257,4 +1374,4 @@ class AgipdCorrections: self.shared_dict[i]['rel_corr'] = sharedmem.empty(shape, dtype='f4') self.shared_dict[i]['t0_rgain'] = sharedmem.empty(shape, - dtype='u2') + dtype='u2') \ No newline at end of file diff --git a/notebooks/AGIPD/AGIPD_Correct_and_Verify.ipynb b/notebooks/AGIPD/AGIPD_Correct_and_Verify.ipynb index ef66f4a21..ec243d179 100644 --- a/notebooks/AGIPD/AGIPD_Correct_and_Verify.ipynb +++ b/notebooks/AGIPD/AGIPD_Correct_and_Verify.ipynb @@ -517,21 +517,21 @@ " for file_name in file_batch:\n", " print(\" \", file_name)\n", " step_timer.start()\n", - " \n", - " img_counts = pool.starmap(agipd_corr.read_file, enumerate(file_batch))\n", - " step_timer.done_step('Loading data from files')\n", - " \n", - " # Evaluate zero-data-std mask\n", - " pool.starmap(agipd_corr.mask_zero_std, itertools.product(\n", - " range(len(file_batch)), np.array_split(np.arange(agipd_corr.max_cells), n_cores_correct)\n", - " ))\n", - " step_timer.done_step('Mask 0 std')\n", + " img_counts = pool.starmap(agipd_corr.read_file, zip(range(len(file_batch)), file_batch,\n", + " [not common_mode]*len(file_batch)))\n", + " step_timer.done_step(f'Loading data from files')\n", + "\n", + " if mask_zero_std:\n", + " # Evaluate zero-data-std mask\n", + " pool.starmap(agipd_corr.mask_zero_std, itertools.product(\n", + " range(len(file_batch)), np.array_split(np.arange(agipd_corr.max_cells), n_cores_correct)\n", + " ))\n", + " step_timer.done_step('Mask 0 std')\n", "\n", " # Perform offset image-wise correction\n", " pool.starmap(agipd_corr.offset_correction, imagewise_chunks(img_counts))\n", " step_timer.done_step(\"Offset correction\")\n", - " \n", - " \n", + "\n", " if blc_noise or blc_stripes or blc_hmatch:\n", " # Perform image-wise correction\n", " pool.starmap(agipd_corr.baseline_correction, imagewise_chunks(img_counts))\n", @@ -543,11 +543,14 @@ " range(len(file_batch)), range(16) # 16 ASICs per module\n", " ))\n", " step_timer.done_step(\"Common-mode correction\")\n", - " \n", + "\n", + " img_counts = pool.map(agipd_corr.apply_selected_pulses, range(len(file_batch)))\n", + " step_timer.done_step(\"Applying selected pulses after common mode correction\")\n", + "\n", " # Perform image-wise correction\n", " pool.starmap(agipd_corr.gain_correction, imagewise_chunks(img_counts))\n", - " step_timer.done_step(\"Image-wise correction\")\n", - " \n", + " step_timer.done_step(\"Gain corrections\")\n", + "\n", " # Save corrected data\n", " pool.starmap(agipd_corr.write_file, [\n", " (i_proc, file_name, os.path.join(out_folder, os.path.basename(file_name).replace(\"RAW\", \"CORR\")))\n", @@ -718,12 +721,12 @@ "include = '*S00000*' if sequences is None else f'*S{sequences[0]:05d}*'\n", "tid, corrected = get_trains_data(out_folder, 'image.data', include, karabo_id, modules=nmods)\n", "\n", - "_, gains = get_trains_data(out_folder, 'image.gain', include, tid, karabo_id, modules=nmods)\n", - "_, mask = get_trains_data(out_folder, 'image.mask', include, tid, karabo_id, modules=nmods)\n", - "_, blshift = get_trains_data(out_folder, 'image.blShift', include, tid, karabo_id, modules=nmods)\n", - "_, cellId = get_trains_data(out_folder, 'image.cellId', include, tid, karabo_id, modules=nmods)\n", - "_, pulseId = get_trains_data(out_folder, 'image.pulseId', include, tid, karabo_id, modules=nmods, fillvalue=0)\n", - "_, raw = get_trains_data(f'{in_folder}/r{run:04d}/', 'image.data', include, tid, karabo_id, modules=nmods)" + "_, gains = get_trains_data(out_folder, 'image.gain', include, karabo_id, tid, modules=nmods)\n", + "_, mask = get_trains_data(out_folder, 'image.mask', include, karabo_id, tid, modules=nmods)\n", + "_, blshift = get_trains_data(out_folder, 'image.blShift', include, karabo_id, tid, modules=nmods)\n", + "_, cellId = get_trains_data(out_folder, 'image.cellId', include, karabo_id, tid, modules=nmods)\n", + "_, pulseId = get_trains_data(out_folder, 'image.pulseId', include, karabo_id, tid, modules=nmods, fillvalue=0)\n", + "_, raw = get_trains_data(f'{in_folder}/r{run:04d}/', 'image.data', include, karabo_id, tid, modules=nmods)" ] }, { -- GitLab