import traceback import zlib from multiprocessing.pool import ThreadPool from pathlib import Path from typing import Any, Dict, Optional, Tuple import h5py import numpy as np import sharedmem from cal_tools.agipdutils import ( assemble_constant_dict, baseline_correct_via_noise, baseline_correct_via_stripe, correct_baseline_via_hist, correct_baseline_via_hist_asic, make_noisy_adc_mask, match_asic_borders, melt_snowy_pixels, ) from cal_tools.enums import AgipdGainMode, BadPixels, SnowResolution from cal_tools.tools import get_constant_from_db_and_time from iCalibrationDB import Conditions, Constants from cal_tools import agipdalgs as calgs def get_num_cells(fname, loc, module): with h5py.File(fname, "r") as f: cells = f[f"INSTRUMENT/{loc}/DET/{module}CH0:xtdf/image/cellId"][()] if cells.shape[0] == 0: return None maxcell = np.max(cells) options = [4, 32, 64, 76, 128, 176, 202, 250, 352] dists = [abs(o - maxcell) for o in options] return options[np.argmin(dists)] def get_acq_rate(fast_paths: Tuple[str, str, int], slow_paths: Optional[Tuple[str, str]] = ('', '') ) -> Optional[float]: """Get the acquisition rate from said detector module. If the data is available from the middlelayer FPGA_COMP device, then it is retrieved from there. If not, the rate is calculated from two different pulses time. The first entry is deliberatly not used, as the detector just began operating, and it might have skipped a train. :param slow_paths: in which file and h5 path to look for slow data. The first string is the filename with complete path, the second string is the key `karabo_id_control` :param fast_paths: in which module file and h5 path to look for pulses. The first string is the filename with complete path, the second string is the module device name `karabo_id`, the third parameter is the module number, used to navigate through the h5 file structure. :return acq_rate: the acquisition rate. If not found in either files, return None. """ # Attempt to look for acquisition rate in slow data slow_data_file, karabo_id_control = slow_paths slow_data_file = Path(slow_data_file) if slow_data_file.is_file(): slow_data_path = f'CONTROL/{karabo_id_control}/MDL/FPGA_COMP/bunchStructure/repetitionRate/value' # noqa with h5py.File(slow_data_file, "r") as fin: if slow_data_path in fin: # The acquisition rate value is stored in a 1D array of type # float. Use the 3rd value, arbitrarily chosen. # It is desired to loose precision here because the usage is # about bucketing the rate for managing meta-data. return round(float(fin[slow_data_path][3]), 1) # Compute acquisition rate from fast data fast_data_file, karabo_id, module = fast_paths fast_data_file = Path(fast_data_file) if fast_data_file.is_file(): fast_data_path = f'INSTRUMENT/{karabo_id}/DET/{module}CH0:xtdf/image/pulseId' # noqa with h5py.File(fast_data_file, "r") as fin: if fast_data_path in fin: # pulses is of shape (NNNN, 1), of type uint8. # Squeeze out the data, and subtract the 3rd entry from the 2nd # to get a rate. pulses = np.squeeze(fin[fast_data_path][1:3]) diff = pulses[1] - pulses[0] options = {8: 0.5, 4: 1.1, 2: 2.2, 1: 4.5} return options.get(diff, None) def get_gain_setting(fname: str, h5path_ctrl: str) -> int: """Retrieve Gain setting. If the data is available from the middlelayer FPGA_COMP device, then it is retrieved from there. If not, the setting is calculated off `setupr` and `patternTypeIndex` gain-setting 1: setupr@dark=8, setupr@slopespc=40 gain-setting 0: setupr@dark=0, setupr@slopespc=32 patternTypeIndex 1: High-gain patternTypeIndex 2: Medium-gain patternTypeIndex 3: Low-gain patternTypeIndex 4: SlopesPC :param fname: path to file with control information :param h5path_ctrl: path to control information inside the file :return: gain setting """ gain_path = f'{h5path_ctrl}/gain/value' with h5py.File(fname, "r") as fin: if gain_path in fin: return fin[gain_path][0] # Get the index at which the train is not zero. train_id = fin["INDEX/trainId"][()] idx = np.nonzero(train_id)[0][0] setupr = fin[f'{h5path_ctrl}/setupr/value'][idx] pattern_type_idx = fin[f'{h5path_ctrl}/patternTypeIndex/value'][idx] if (setupr == 0 and pattern_type_idx < 4) or ( setupr == 32 and pattern_type_idx == 4): return 0 elif (setupr == 8 and pattern_type_idx < 4) or ( setupr == 40 and pattern_type_idx == 4): return 1 else: raise ValueError('Could not derive gain setting from setupr and patternTypeIndex') # noqa def get_gain_mode(fname: str, h5path_ctrl: str) -> AgipdGainMode: """Returns the gain mode (adaptive or fixed) from slow data""" h5path_run = h5path_ctrl.replace("CONTROL/", "RUN/", 1) h5path_gainmode = f'{h5path_run}/gainModeIndex/value' with h5py.File(fname, "r") as fd: if h5path_gainmode in fd: return AgipdGainMode(fd[h5path_gainmode][0]) return AgipdGainMode.ADAPTIVE_GAIN def get_bias_voltage(fname: str, karabo_id_control: str, module: Optional[int] = 0) -> int: """Read the voltage information from the FPGA device of module 0. Different modules may operate at different voltages. In practice, they all operate at the same voltage. As such, it is okay to read a single module's value. This value is read from slow data. If the file cannot be accessed, an OSError will be raised. If the hdf5 path cannot be accessed, None will be returned. :param fname: path to slow data file with control information :param karabo_id: The detector Karabo id, for creating the hdf5 path :param module: defaults to module 0 :return: voltage, a uint16 """ voltage_path = f'/CONTROL/{karabo_id_control}/FPGA/M_{module}/highVoltage/actual/value' # noqa with h5py.File(fname, "r") as fin: if voltage_path in fin: return fin[voltage_path][0] class AgipdCorrections: def __init__( self, max_cells, max_pulses, h5_data_path="INSTRUMENT/SPB_DET_AGIPD1M-1/DET/{}CH0:xtdf/", h5_index_path="INDEX/SPB_DET_AGIPD1M-1/DET/{}CH0:xtdf/", corr_bools: Optional[dict] = None, gain_mode: AgipdGainMode = AgipdGainMode.ADAPTIVE_GAIN, comp_threads=1, ): """ Initialize an AgipdCorrections Class :param max_cells: maximum number of memory cells to handle, e.g. if calibration constants only exist for a subset of cells :param max_pulses: a range list of pulse indices used for calibration and histogram. [start, end, step] :param h5_data_path: path in HDF5 file which is prefixed to the image/data section :param h5_index_path: path in HDF5 file which is prefixed to the index section :param corr_bools: A dict with all of the correction booleans requested or available :param comp_threads: Number of threads to use for compressing gain/mask The following example shows a typical use case: .. code-block:: python agipd_corr = AgipdCorrections(max_cells, max_pulses, h5_data_path=h5path, h5_index_path=h5path_idx, corr_bools=corr_bools) agipd_corr.allocate_constants(modules, (3, mem_cells_db, 512, 128)) metadata = cal_tools.tools.CalibrationMetadata(out_folder) const_yaml = metadata["retrieved-constants"] for mod in modules: agipd_corr.initialize_from_yaml(karabo_da, const_yaml, mod) data_shape = (n_images_max, 512, 128) agipd_corr.allocate_images(data_shape, n_cores_files) i_proc=0 agipd_corr.read_file(i_proc, file_name, apply_sel_pulses) # Correct first 1000 images agipd_corr.correct_agipd(i_proc, first=0, last=1000) agipd_corr.write_file(i_proc, file_name, ofile_name) """ if corr_bools is None: corr_bools = {} # Data description self.h5_data_path = h5_data_path self.h5_index_path = h5_index_path self.rng_pulses = max_pulses # avoid list(range(*[0]])) self.pulses_lst = list(range(*max_pulses)) \ if not (len(max_pulses) == 1 and max_pulses[0] == 0) else max_pulses # noqa self.max_cells = max_cells self.gain_mode = gain_mode self.comp_threads = comp_threads # Correction parameters self.baseline_corr_noise_threshold = -1000 self.snow_resolution = SnowResolution.INTERPOLATE self.cm_dark_fraction = 0.66 self.cm_dark_min = -25. self.cm_dark_max = 25. self.cm_n_itr = 4 self.mg_hard_threshold = 100 self.hg_hard_threshold = 100 self.noisy_adc_threshold = 0.25 self.ff_gain = 1 # Shared variables for data and constants self.shared_dict = [] self.offset = {} self.noise = {} self.thresholds = {} self.frac_high_med = {} self.md_additional_offset = {} self.rel_gain = {} self.mask = {} self.xray_cor = {} # check if given corr_bools are correct tot_corr_bools = ['only_offset', 'adjust_mg_baseline', 'rel_gain', 'xray_corr', 'blc_noise', 'blc_hmatch', 'blc_stripes', 'blc_set_min', 'match_asics', 'corr_asic_diag', 'zero_nans', 'zero_orange', 'force_hg_if_below', 'force_mg_if_below', 'mask_noisy_adc', 'melt_snow', 'common_mode', 'mask_zero_std', 'low_medium_gap'] if set(corr_bools).issubset(tot_corr_bools): self.corr_bools = corr_bools else: bools = list(set(corr_bools) - set(tot_corr_bools)) raise Exception('Correction Booleans: ' f'{bools} are not available!') # Flags allowing for pulse capacitor constant retrieval. self.pc_bools = [self.corr_bools.get("rel_gain"), self.corr_bools.get("adjust_mg_baseline"), self.corr_bools.get('blc_noise'), self.corr_bools.get('blc_hmatch'), self.corr_bools.get('blc_stripes'), self.corr_bools.get('melt_snow')] self.blc_bools = [self.corr_bools.get('blc_noise'), self.corr_bools.get('blc_hmatch'), self.corr_bools.get('blc_stripes')] def read_file(self, i_proc: int, file_name: str, apply_sel_pulses: Optional[bool] = True ) -> int: """Read file with raw data to shared memory :param file_name: Name of input file including path. :param i_proc: Index of shared memory array. :param apply_sel_pulses: apply selected pulses before all corrections. :return: - n_img: The number of images to correct. """ module_idx = int(file_name.split('/')[-1].split('-')[2][-2:]) agipd_base = self.h5_data_path.format(module_idx) idx_base = self.h5_index_path.format(module_idx) data_dict = self.shared_dict[i_proc] data_dict['moduleIdx'][0] = module_idx try: f = h5py.File(file_name, "r") group = f[agipd_base]["image"] (_, first_index, last_index, _, valid_indices) = self.get_valid_image_idx(idx_base, f) allcells = np.squeeze(group['cellId']) allpulses = np.squeeze(group['pulseId']) firange = self.gen_valid_range(first_index, last_index, self.max_cells, allcells, allpulses, valid_indices, apply_sel_pulses) n_img = firange.shape[0] data_dict['nImg'][0] = n_img if np.all(np.diff(firange) == 1): # if firange consists of contiguous indices # convert firange from fancy indexing to slicing firange = slice(firange[0], firange[-1]+1) raw_data = group['data'][firange] else: # Avoid very slow performance using fancing indexing, # if firange consists of non-contiguous indices. raw_data = group['data'][:][firange] data_dict['data'][:n_img] = raw_data[:, 0] data_dict['rawgain'][:n_img] = raw_data[:, 1] data_dict['cellId'][:n_img] = allcells[firange] data_dict['pulseId'][:n_img] = allpulses[firange] data_dict['trainId'][:n_img] = np.squeeze(group['trainId'][:][firange]) # noqa except Exception as e: print(f'Error during reading data from file {file_name}: {e}') print(f'Error traceback: {traceback.format_exc()}') n_img = 0 data_dict['nImg'][0] = 0 return n_img def write_file(self, i_proc, file_name, ofile_name): """ Create output file and write corrected data to it :param file_name: Name of input file including path :param ofile_name: Name of output file including path :param i_proc: Index of shared memory array """ module_idx = int(file_name.split('/')[-1].split('-')[2][-2:]) agipd_base = self.h5_data_path.format(module_idx) idx_base = self.h5_index_path.format(module_idx) data_path = f'{agipd_base}/image' data_dict = self.shared_dict[i_proc] image_fields = [ 'trainId', 'pulseId', 'cellId', 'data', 'gain', 'mask', 'blShift', ] compress_fields = ['gain', 'mask'] n_img = data_dict['nImg'][0] if n_img == 0: return trains = data_dict['trainId'][:n_img] with h5py.File(ofile_name, "w") as outfile: # Copy any other data from the input file. # This includes indexes, so it's important that the corrected data # we write is aligned with the raw data. with h5py.File(file_name, "r") as infile: self.copy_and_sanitize_non_cal_data( infile, outfile, agipd_base, idx_base, trains ) # All corrected data goes in a /INSTRUMENT/.../image group image_grp = outfile[data_path] # Set up all the datasets before filling them. This puts the # metadata about the datasets together at the start of the file, # so it's efficient to examine the file structure. for field in image_fields: arr = data_dict[field][:n_img] if field in compress_fields: # gain/mask compressed with gzip level 1, but not # checksummed as we would have to implement this. kw = dict( compression='gzip', compression_opts=1, shuffle=True ) else: # Uncompressed data can easily be checksummed by HDF5's # filter pipeline. This should be cheap to compute. kw = {'fletcher32': True} if arr.ndim > 1: kw['chunks'] = (1,) + arr.shape[1:] # 1 chunk = 1 image image_grp.create_dataset( field, shape=arr.shape, dtype=arr.dtype, **kw ) # Write the corrected data for field in image_fields: if field in compress_fields: self._write_compressed_frames( image_grp[field], data_dict[field][:n_img], ) else: image_grp[field][:] = data_dict[field][:n_img] def _write_compressed_frames(self, dataset, arr): """Compress gain/mask frames in multiple threads, and save their data This is significantly faster than letting HDF5 do the compression in a single thread. """ def _compress_frame(i): # Equivalent to the HDF5 'shuffle' filter: transpose bytes for better # compression. shuffled = np.ascontiguousarray( arr[i].view(np.uint8).reshape((-1, arr.itemsize)).transpose() ) return i, zlib.compress(shuffled, level=1) with ThreadPool(self.comp_threads) as pool: for i, compressed in pool.imap(_compress_frame, range(len(arr))): # Each frame is 1 complete chunk dataset.id.write_direct_chunk((i, 0, 0), compressed) def cm_correction(self, i_proc, asic): """ Perform common-mode correction of data in shared memory In a given code a complete file is loaded to the memory. Asics common mode correction is calculated based on single image. Cell common mode is calculated across trains and groups of 32 cells. Both corrections are iterative and requires 4 iterations. Correction is performed in chunks of (e.g. 512 images). A complete array of data from one file (256 trains, 352 cells) will take 256 * 352 * 128 * 512 * 4 // 1024**3 = 22 Gb in memory :param i_proc: Index of shared memory array to process :param asic: Asic number to process """ if not self.corr_bools.get("common_mode"): return dark_min = self.cm_dark_min dark_max = self.cm_dark_max fraction = self.cm_dark_fraction n_itr = self.cm_n_itr n_img = self.shared_dict[i_proc]['nImg'][0] cell_id = self.shared_dict[i_proc]['cellId'][:n_img] train_id = self.shared_dict[i_proc]['trainId'][:n_img] cell_ids = cell_id[train_id == train_id[0]] n_cells = cell_ids.size data = self.shared_dict[i_proc]['data'][:n_img].reshape(-1, n_cells, 8, 64, 2, 64) # Loop over iterations for _ in range(n_itr): # Loop over rows of cells # TODO: check what occurs in case of 64 memory cells # as it will have less than 11 iterations first = 0 for cell_row in range(11): last = first + cell_ids[(cell_ids // 32) == cell_row].size if first == last: continue asic_data = data[:, first:last, asic % 8, :, asic // 8, :] # Cell common mode cell_cm_sum, cell_cm_count = \ calgs.sum_and_count_in_range_cell(asic_data, dark_min, dark_max) cell_cm = cell_cm_sum / cell_cm_count # TODO: check files with less 256 trains cell_cm[cell_cm_count < fraction * 32 * 256] = 0 asic_data[...] -= cell_cm[None, None, :, :] # Asics common mode asic_cm_sum, asic_cm_count = \ calgs.sum_and_count_in_range_asic(asic_data, dark_min, dark_max) asic_cm = asic_cm_sum / asic_cm_count asic_cm[asic_cm_count < fraction * 64 * 64] = 0 asic_data[...] -= asic_cm[:, :, None, None] first = last def mask_zero_std(self, i_proc, cells): """ Add bad pixel bit: DATA_STD_IS_ZERO to the mask of bad pixels Pixel is bad if standard deviation for a given pixel and given memory cell is zero :param i_proc: Index of shared memory array to process :param cells: List of cells to be considered """ if not self.corr_bools.get("mask_zero_std"): return module_idx = self.shared_dict[i_proc]['moduleIdx'][0] n_img = self.shared_dict[i_proc]['nImg'][0] data = self.shared_dict[i_proc]['data'][:n_img] cellid = self.shared_dict[i_proc]['cellId'][:n_img] mask_std = self.mask[module_idx] # shape of n_cells, x, y for c in cells: std = np.nanstd(data[cellid == c, ...], axis=0) mask_std[:, c, std == 0] |= BadPixels.DATA_STD_IS_ZERO def offset_correction(self, i_proc: int, first: int, last: int): """ Perform image-wise offset correction for data in shared memory :param first: Index of the first image to be corrected :param last: Index of the last image to be corrected :param i_proc: Index of shared memory array to process """ module_idx = self.shared_dict[i_proc]['moduleIdx'][0] data = self.shared_dict[i_proc]['data'][first:last] rawgain = self.shared_dict[i_proc]['rawgain'][first:last] gain = self.shared_dict[i_proc]['gain'][first:last] cellid = self.shared_dict[i_proc]['cellId'][first:last] if self.gain_mode is AgipdGainMode.ADAPTIVE_GAIN: # first evaluate the gain into 0, 1, 2 --> high, medium, low t0 = self.thresholds[module_idx][0] t1 = self.thresholds[module_idx][1] if self.corr_bools.get("melt_snow"): # load raw_data and rgain to be used during gain_correction self.shared_dict[i_proc]["t0_rgain"][first:last] = rawgain / t0[cellid, ...] # noqa self.shared_dict[i_proc]["raw_data"][first:last] = np.copy(data) # noqa # Often most pixels are in high-gain, so it's more efficient to # set the whole output block to zero than select the right pixels. gain[:] = 0 # exceeding first threshold means data is medium or low gain gain[rawgain > t0[cellid, ...]] = 1 # exceeding also second threshold means data is low gain gain[rawgain > t1[cellid, ...]] = 2 else: # the enum values map 1, 2, 3 to (fixed) gain modes gain[:] = self.gain_mode - 1 offsetb = self.offset[module_idx][:, cellid] # force into high or medium gain if requested if self.corr_bools.get("force_mg_if_below"): gain[(gain == 2) & ((data - offsetb[1]) < self.mg_hard_threshold)] = 1 # noqa if self.corr_bools.get("force_hg_if_below"): gain[(gain > 0) & ((data - offsetb[0]) < self.hg_hard_threshold)] = 0 # noqa # choose constants according to gain setting off = calgs.gain_choose(gain, offsetb) del offsetb # subtract offset data -= off del off def baseline_correction(self, i_proc: int, first: int, last: int): """ Perform image-wise base-line shift correction for data in shared memory via histogram or stripe :param first: Index of the first image to be corrected :param last: Index of the last image to be corrected :param i_proc: Index of shared memory array to process """ # before doing relative gain correction we need to evaluate any # baseline shifts # as they are effectively and additional offset in the data if not any(self.blc_bools): return module_idx = self.shared_dict[i_proc]['moduleIdx'][0] data = self.shared_dict[i_proc]['data'][first:last] bl_shift = self.shared_dict[i_proc]['blShift'][first:last] gain = self.shared_dict[i_proc]['gain'][first:last] cellid = self.shared_dict[i_proc]['cellId'][first:last] # output is saved in sharedmem to pass for correct_agipd() # as this function takes about 3 seconds. self.shared_dict[i_proc]["msk"][first:last] = calgs.gain_choose_int( gain, self.mask[module_idx][:, cellid] ) if hasattr(self, "rel_gain"): # Get the correct rel_gain depending on cell-id self.shared_dict[i_proc]['rel_corr'][first:last] = \ calgs.gain_choose(gain, self.rel_gain[module_idx][:, cellid]) # do this image wise, as the shift is per image for i in range(data.shape[0]): # first correction requested may be to evaluate shift via # noise peak if self.corr_bools.get('blc_noise'): mn_noise = np.nanmean(self.noise[module_idx][0, cellid[i]]) dd, sh = baseline_correct_via_noise(data[i], mn_noise, gain[i], self.baseline_corr_noise_threshold) # noqa # if not we continue with initial data else: dd = data[i] sh = 0 # if we have enough pixels in medium or low gain and # correction via hist matching is requested to this now gcrit = np.count_nonzero(gain[i] > 0) > 1000 if (gcrit and self.corr_bools.get('blc_hmatch') and hasattr(self, "rel_gain")): dd2, sh2 = correct_baseline_via_hist(data[i], self.shared_dict[i_proc]['rel_corr'][first:last][i], # noqa gain[i]) data[i] = np.maximum(dd, dd2) sh = np.minimum(sh, sh2) # finally correct diagonal effects if requested if self.corr_bools.get('corr_asic_diag'): ii = data[i, ...] gg = gain[i, ...] adim = correct_baseline_via_hist_asic(ii, gg) data[i, ...] = adim # if there is not enough medium or low gain data to do an # evaluation, do nothing else: data[i, ...] = dd bl_shift[i] = sh if self.corr_bools.get('blc_stripes'): fmh = self.frac_high_med[module_idx][cellid[i]] dd, sh = baseline_correct_via_stripe(data[i, ...], gain[i, ...], self.shared_dict[i_proc]['msk'][first:last][i, ...], # noqa fmh) data[i, ...] = dd bl_shift[i] = sh def gain_correction(self, i_proc: int, first: int, last: int): """ Perform several image-wise corrections for data in shared memory e.g. Relative gain, FlatField xray correction, ..... :param first: Index of the first image to be corrected :param last: Index of the last image to be corrected :param i_proc: Index of shared memory array to process """ module_idx = self.shared_dict[i_proc]['moduleIdx'][0] data = self.shared_dict[i_proc]['data'][first:last] gain = self.shared_dict[i_proc]['gain'][first:last] cellid = self.shared_dict[i_proc]['cellId'][first:last] mask = self.shared_dict[i_proc]['mask'][first:last] rel_corr = self.shared_dict[i_proc]['rel_corr'][first:last] msk = self.shared_dict[i_proc]['msk'][first:last] # if baseline correction was not requested # msk and rel_corr will still be empty shared_mem arrays if not any(self.blc_bools): msk = calgs.gain_choose_int(gain, self.mask[module_idx][:, cellid]) # same for relative gain and then bad pixel mask if hasattr(self, "rel_gain"): # Get the correct rel_gain depending on cell-id rel_corr = calgs.gain_choose(gain, self.rel_gain[module_idx][:, cellid]) # noqa # Correct for relative gain if self.corr_bools.get("rel_gain") and hasattr(self, "rel_gain"): data *= rel_corr del rel_corr # Adjust medium gain baseline to match highest high gain value if self.corr_bools.get("adjust_mg_baseline"): mgbc = self.md_additional_offset[module_idx][cellid, ...] data[gain == 1] += mgbc[gain == 1] del mgbc # Set negative values for medium gain to 0 # TODO: Probably it would be better to add it to badpixel maps, # not just set to 0 if self.corr_bools.get('blc_set_min'): data[(data < 0) & (gain == 1)] = 0 # Do xray correction if requested # The slopes we have in our constants are already relative # slopeFF = slopeFFpix/avarege(slopeFFpix) # To apply them we have to / not * if self.corr_bools.get("xray_corr"): data /= self.xray_cor[module_idx][cellid, ...] # use sharedmem raw_data and t0_rgain # after calculating it while offset correcting. if self.corr_bools.get('melt_snow'): _ = melt_snowy_pixels(self.shared_dict[i_proc]['raw_data'][first:last], # noqa data, gain, self.shared_dict[i_proc]['t0_rgain'][first:last], # noqa self.snow_resolution) # Inner ASIC borders are matched to the same signal level if self.corr_bools.get("match_asics"): data = match_asic_borders(data, 8, module_idx) # Add any non-finite values to the mask, zero them if self.corr_bools.get("zero_nans"): bidx = ~np.isfinite(data) data[bidx] = 0 msk[bidx] |= BadPixels.VALUE_IS_NAN del bidx # Add pixels with unrealistically high and low values to the mask. # Zero them. if self.corr_bools.get("zero_orange"): bidx = (data < -1e7) | (data > 1e7) data[bidx] = 0 msk[bidx] |= BadPixels.VALUE_OUT_OF_RANGE del bidx # Mask entire ADC if they are noise above a threshold # TODO: Needs clarification if needed, # the returned arg is not used. if self.corr_bools.get("mask_noisy_adc"): _ = make_noisy_adc_mask(msk, self.noisy_adc_threshold) # Copy the data across into the existing shared-memory array mask[...] = msk[...] def get_valid_image_idx( self, idx_base: str, infile: str, raw_format_version: int = 2 ): """Return the indices of valid data""" if raw_format_version == 2: count = np.squeeze(infile[idx_base + "image/count"]) first = np.squeeze(infile[idx_base + "image/first"]) if np.count_nonzero(count != 0) == 0: raise IOError("File has no valid counts") valid = count != 0 idxtrains = np.squeeze(infile["/INDEX/trainId"]) # Train indices are of type=f32 # Validate that train indices values fall # between medianTrain +- 1e4 medianTrain = np.nanmedian(idxtrains) lowok = (idxtrains > medianTrain - 1e4) highok = (idxtrains < medianTrain + 1e4) valid &= lowok & highok # Last index = last valid train + max. number of memory cells last_index = int(first[valid][-1] + count[valid][-1]) first_index = int(first[valid][0]) # do actual validity filtering: validc, validf = count[valid], first[valid] # Creating an array of validated indices. # If all indices were validated this array will be the same, # as what is stored at /DET/image/trainId valid_indices = np.concatenate( [ np.arange(validf[i], validf[i] + validc[i]) for i in range(validf.size) ], axis=0, ) valid_indices = np.squeeze(valid_indices).astype(np.int32) elif raw_format_version == 1: status = np.squeeze(infile[idx_base + "image/status"]) if np.count_nonzero(status != 0) == 0: raise IOError(f"File {infile} has no valid counts") last = np.squeeze(infile[idx_base + "image/last"]) first = np.squeeze(infile[idx_base + "image/first"]) valid = status != 0 last_index = int(last[status != 0][-1]) + 1 first_index = int(first[status != 0][0]) idxtrains = np.squeeze(infile["/INDEX/trainId"]) medianTrain = np.nanmedian(idxtrains) lowok = (idxtrains > medianTrain - 1e4) highok = (idxtrains < medianTrain + 1e4) valid &= lowok & highok valid_indices = None else: raise AttributeError( f"Not a known raw format version: {raw_format_version}") return (valid, first_index, last_index, idxtrains, valid_indices) def apply_selected_pulses(self, i_proc: int) -> int: """Select sharedmem data indices to correct based on selected pulses indices. :param i_proc: the index of sharedmem for a given file/module :return n_img: number of images to correct """ data_dict = self.shared_dict[i_proc] n_img = data_dict['nImg'][0] allpulses = data_dict['pulseId'][:n_img] # Initializing can_calibrate array can_calibrate = self.choose_selected_pulses( allpulses, can_calibrate=[True]*len(allpulses)) # Only select data corresponding to selected pulses # and overwrite data in shared-memory leaving # the required indices to correct data = data_dict['data'][:n_img][can_calibrate] rawgain = data_dict['rawgain'][:n_img][can_calibrate] cellId = data_dict['cellId'][:n_img][can_calibrate] pulseId = data_dict['pulseId'][:n_img][can_calibrate] trainId = data_dict['trainId'][:n_img][can_calibrate] # Overwrite selected number of images based on # selected pulses to correct n_img = np.count_nonzero(can_calibrate) data_dict['nImg'][0] = n_img data_dict['data'][: n_img] = data data_dict['rawgain'][:n_img] = rawgain data_dict['cellId'][:n_img] = cellId data_dict['pulseId'][:n_img] = pulseId data_dict['trainId'][:n_img] = trainId return n_img def validate_selected_pulses(self, allpulses: np.array ) -> Tuple[int, int, int]: """Validate the selected pulses given from the notebook Validate that the given range of pulses to correct are within the cells of raw data. :param allpulses: :return : - first_pulse: first pulse index - last_pulse: last pulse index - pulse_step: step range for the selected pulse indices """ # Calculate the pulse step from the chosen max_pulse range if len(self.rng_pulses) == 3: pulse_step = self.rng_pulses[2] else: pulse_step = 1 # Validate selected pulses range: # 1) Make sure the range max doesn't have non-valid idx. if self.pulses_lst[-1] + pulse_step > int(allpulses[-1]): last_pulse = int(allpulses[-1]) + pulse_step else: last_pulse = int(self.pulses_lst[-1]) + pulse_step # 2) Check if 1st pulse index was out of valid range. # If that is the case only calibrate the last step pulse. if self.pulses_lst[0] >= last_pulse: first_pulse = last_pulse - pulse_step else: first_pulse = self.pulses_lst[0] return first_pulse, last_pulse, pulse_step def choose_selected_pulses(self, allpulses: np.array, can_calibrate: np.array) -> np.array: """ Choose given selected pulse from pulseId array of raw data. The selected pulses range is validated then used to add a booleans in can_calibrate and guide the later appliance. :param allpulses: array of pulseIds from raw data :param can_calibrate: array of booleans for indices to calibrate :return can_calibrate: array of booleans to calibrate depending on selected pulses """ (first_pulse, last_pulse, pulse_step) = self.validate_selected_pulses(allpulses) # collect the pulses to be calibrated cal_pulses = allpulses[first_pulse: last_pulse: pulse_step] # Check if a specific pulses need to be calibrated # or with only simple pulse ranging. if pulse_step > 1 or \ allpulses[first_pulse] >= allpulses[last_pulse]: can_calibrate = np.logical_and(can_calibrate, np.isin(allpulses, cal_pulses)) else: # Check interesection between array of booleans and # array of pulses to calibrate. can_calibrate = np.logical_and(can_calibrate, (allpulses <= np.max(cal_pulses)), (allpulses >= np.min(cal_pulses)) ) return can_calibrate def gen_valid_range(self, first_index: int, last_index: int, max_cells: int, allcells: np.array, allpulses: np.array, valid_indices: Optional[np.array] = None, apply_sel_pulses: Optional[bool] = True ) -> np.array: """ Validate the arrays of image.cellId and image.pulseId to check presence of data and to avoid empty trains. selected pulses range given from the AGIPD correction notebook is taken into account if apply_sel_pulses is True :param first_index: first index of image data :param last_index: last index of image data :param max_cells: number of memory cells to correct :param allcells: array of image.cellsIds of raw data :param allpulses: array of image.pulseIds of raw data :param valid_indices: validated indices of image.data :param apply_sel_pulses: A flag for applying selected pulses after validation for correction :return firange: An array of validated image.data indices to correct # TODO: Ignore rows (32 pulse) of empty pulses even if common-mode is selected """ if valid_indices is not None: allcells = allcells[valid_indices] allpulses = allpulses[valid_indices] else: allcells = allcells[first_index:last_index] allpulses = allpulses[first_index:last_index] can_calibrate = (allcells < max_cells) if not np.any(can_calibrate): return if apply_sel_pulses: can_calibrate = self.choose_selected_pulses( allpulses, can_calibrate=can_calibrate) if valid_indices is None: firange = np.arange(first_index, last_index) else: firange = valid_indices firange = firange[can_calibrate] return firange def copy_and_sanitize_non_cal_data(self, infile, outfile, agipd_base, idx_base, trains): """ Copy and sanitize data in `infile` that is not touched by `correctAGIPD` """ # these are touched in the correct function, do not copy them here dont_copy = ["data", "cellId", "trainId", "pulseId", "status", "length"] dont_copy = [agipd_base + "image/{}".format(do) for do in dont_copy] # don't copy these as we may need to adjust if we filter trains dont_copy += [idx_base + "{}/first".format(do) for do in ["image", ]] dont_copy += [idx_base + "{}/count".format(do) for do in ["image", ]] dont_copy += [idx_base + "{}/last".format(do) for do in ["image", ]] dont_copy += [idx_base + "{}/status".format(do) for do in ["image", ]] # a visitor to copy everything else def visitor(k, item): if k not in dont_copy: if isinstance(item, h5py.Group): outfile.create_group(k) elif isinstance(item, h5py.Dataset): group = str(k).split("/") group = "/".join(group[:-1]) infile.copy(k, outfile[group]) infile.visititems(visitor) # sanitize indices for do in ["image", ]: # uq: INDEX/trainID # fidxv: INDEX/.../image/first idx values # cntsv: INDEX/.../image/counts values # Extract parameters through identifying # unique trains, index and numbers. uq, fidxv, cntsv = np.unique(trains, return_index=True, return_counts=True) # noqa # Validate calculated CORR INDEX contents by checking # difference between trainId stored in RAW data and trains from train_diff = np.isin(np.array(infile["/INDEX/trainId"]), uq, invert=True) # noqa # Insert zeros for missing trains. # fidxv and cntsv should have same length as # raw INDEX/.../image/first and INDEX/.../image/count, # respectively # first_inc = first incrementation first_inc = True for i, diff in enumerate(train_diff): if diff: if i < len(cntsv): cntsv = np.insert(cntsv, i, 0) if i == 0: fidxv = np.insert(fidxv, i, 0) else: fidxv = np.insert(fidxv, i, fidxv[i]) else: # append if at the end of the array cntsv = np.append(cntsv, 0) # increment fidxv once with the # no. of processed mem-cells. if first_inc: fidxv = np.append(fidxv, (2 * fidxv[i-1]) - fidxv[i-2]) first_inc = False else: fidxv = np.append(fidxv, fidxv[i-1]) # save INDEX contents (first, count) in CORR files outfile.create_dataset(idx_base + "{}/first".format(do), fidxv.shape, dtype=fidxv.dtype, data=fidxv, fletcher32=True) outfile.create_dataset(idx_base + "{}/count".format(do), cntsv.shape, dtype=cntsv.dtype, data=cntsv, fletcher32=True) def retrieve_constant_and_time(self, karabo_id: str, karabo_da: str, const_dict: Dict[str, Any], cal_db_interface: str, creation_time: str ) -> Tuple[Dict[str, Any], Dict[str, Any]]: """A wrapper around get_constant_from_db_and_time to get constant image and creation time using a dictionary of the names of the required constant from the data base and their parameter operating conditions. :param karabo_id: karabo identifier :param karabo_da: karabo data aggregator :param const_dict: (Dict) A dictionary with constants to retrieve and their operating conditions. e.g.{ "Offset": ["zeros", (128, 512, memory_cells, 3), "Dark"] "Dark-cond": {Condition-name}: condition-value } :param cal_db_interface: (Str) The port interface for calibrationDBRemote :param creation_time: (Str) Raw data creation time :return: cons_data: (np.array) Constant data when: (Dict) A dictionary with all retrieved constants and their creation-time. {"Constant-Name": "creation-time"} """ when = {} cons_data = {} for cname, cval in const_dict.items(): condition = \ getattr(Conditions, cval[2][0]).AGIPD(**cval[2][1]) cons_data[cname], when[cname] = \ get_constant_from_db_and_time(karabo_id, karabo_da, getattr(Constants.AGIPD, cname)(), # noqa condition, getattr(np, cval[0])(cval[1]), cal_db_interface, creation_time, print_once=0) return cons_data, when def init_constants(self, cons_data, when, module_idx): """ For CI derived gain, a mean multiplication factor of 4.48 compared to medium gain is used, as no reliable CI data for all memory cells exists of the current AGIPD instances. Relative gain is derived both from pulse capacitor as well as low intensity flat field data, information from flat field data is needed to 'calibrate' pulse capacitor data, if there is no available FF data, relative gain for High Gain stage is set to 1: * Relative gain for High gain stage - from the FF data we get the relative slopes of a given pixel and memory cells with respect to all memory cells and all pixels in the module, Please note: Current slopesFF avaialble in calibibration constants are created per pixel only, not per memory cell: rel_high_gain = 1 if only PC data is available rel_high_gain = rel_slopesFF if FF data is also available * Relative gain for Medium gain stage: we derive the factor between high and medium gain using slope information from fits to the linear part of high and medium gain: rfpc_high_medium = m_h/m_m where m_h and m_m is the medium gain slope of given memory cells and pixel and m_h is the high gain slope as above rel_gain_medium = rel_high_gain * rfpc_high_medium With this data the relative gain for the three gain stages evaluates to: rel_high gain = 1 or rel_slopesFF rel_medium gain = rel_high_gain * rfpc_high_medium rel_low gain = _rel_medium gain * 4.48 :param cons_data: A dictionary for each retrieved constant value. :param when: A dictionary for the creation time of each retrieved constant. :param module_idx: A module_idx index :return: """ self.offset[module_idx][...] = cons_data["Offset"].transpose()[...] self.noise[module_idx][...] = cons_data["Noise"].transpose()[...] if self.gain_mode is AgipdGainMode.ADAPTIVE_GAIN: self.thresholds[module_idx][...] = cons_data["ThresholdsDark"].transpose()[:3,...] # noqa if self.corr_bools.get("low_medium_gap"): t0 = self.thresholds[module_idx][0] t1 = self.thresholds[module_idx][1] t1[t1 <= 1.05 * t0] = np.iinfo(np.uint16).max bpixels = cons_data["BadPixelsDark"].astype(np.uint32) if self.corr_bools.get("xray_corr"): if when["BadPixelsFF"]: bpixels |= cons_data["BadPixelsFF"].astype(np.uint32)[..., :bpixels.shape[2], # noqa None] if when["SlopesFF"]: # Checking if constant was retrieved slopesFF = cons_data["SlopesFF"] # This could be used for backward compatibility # for very old SlopesFF constants if len(slopesFF.shape) == 4: slopesFF = slopesFF[..., 0] # This is for backward compatability for old FF constants # (128, 512, mem_cells) if slopesFF.shape[-1] == 2: xray_cor = np.squeeze(slopesFF[..., 0]) xray_cor_med = np.nanmedian(xray_cor) xray_cor[np.isnan(xray_cor)] = xray_cor_med xray_cor[(xray_cor < 0.8) | ( xray_cor > 1.2)] = xray_cor_med xray_cor = np.dstack([xray_cor]*self.max_cells) else: # Memory cell resolved xray_cor correction xray_cor = slopesFF # (128, 512, mem_cells) if xray_cor.shape[-1] < self.max_cells: # When working with new constant with fewer memory # cells, eg. lacking enough FF data or during # development, xray_cor must be expand its last memory # cell to maintain a consistent shape. xray_cor = np.dstack(xray_cor, np.dstack([xray_cor[..., -1]] * (self.max_cells - xray_cor.shape[-1]))) # noqa # This is already done for old constants, # but new constant is absolute and we need to have # global ADU output for the moment xray_cor /= self.ff_gain else: xray_cor = np.ones((128, 512, self.max_cells), np.float32) self.xray_cor[module_idx][...] = xray_cor.transpose()[...] # add additional bad pixel information if any(self.pc_bools): if when["BadPixelsPC"]: bppc = np.moveaxis(cons_data["BadPixelsPC"].astype(np.uint32), 0, 2) bpixels |= bppc[..., :bpixels.shape[2], None] # calculate relative gain from the constants rel_gain = np.ones((128, 512, self.max_cells, 3), np.float32) if when["SlopesPC"]: slopesPC = cons_data["SlopesPC"].astype(np.float32) # This will handle some historical data in a different format # constant dimension injected first if slopesPC.shape[0] == 10 or slopesPC.shape[0] == 11: slopesPC = np.moveaxis(slopesPC, 0, 3) slopesPC = np.moveaxis(slopesPC, 0, 2) # high gain slope from pulse capacitor data pc_high_m = slopesPC[..., :self.max_cells, 0] pc_high_l = slopesPC[..., :self.max_cells, 1] # medium gain slope from pulse capacitor data pc_med_m = slopesPC[..., :self.max_cells, 3] pc_med_l = slopesPC[..., :self.max_cells, 4] # calculate median for slopes pc_high_med = np.nanmedian(pc_high_m, axis=(0, 1)) pc_med_med = np.nanmedian(pc_med_m, axis=(0, 1)) # calculate median for intercepts: pc_high_l_med = np.nanmedian(pc_high_l, axis=(0, 1)) pc_med_l_med = np.nanmedian(pc_med_l, axis=(0, 1)) # sanitize PC data # (it should be done already on the level of constants) # In the following loop, # replace `nan`s across memory cells with # the median value calculated previously. # Then, values outside of the valid range (0.8 and 1.2) # are fixed to the median value. # This is applied for high and medium gain stages for i in range(self.max_cells): pc_high_m[np.isnan(pc_high_m[..., i])] = pc_high_med[i] pc_med_m[np.isnan(pc_med_m[..., i])] = pc_med_med[i] pc_high_l[np.isnan(pc_high_l[..., i])] = pc_high_l_med[i] pc_med_l[np.isnan(pc_med_l[..., i])] = pc_med_l_med[i] pc_high_m[(pc_high_m[..., i] < 0.8 * pc_high_med[i]) | (pc_high_m[..., i] > 1.2 * pc_high_med[i])] = pc_high_med[i] # noqa pc_med_m[(pc_med_m[..., i] < 0.8 * pc_med_med[i]) | (pc_med_m[..., i] > 1.2 * pc_med_med[i])] = pc_med_med[i] # noqa pc_high_l[(pc_high_l[..., i] < 0.8 * pc_high_l_med[i]) | (pc_high_l[..., i] > 1.2 * pc_high_l_med[i])] = pc_high_l_med[i] # noqa pc_med_l[(pc_med_l[..., i] < 0.8 * pc_med_l_med[i]) | (pc_med_l[..., i] > 1.2 * pc_med_l_med[i])] = pc_med_l_med[i] # noqa # ration between HG and MG per pixel per mem cell used # for rel gain calculation frac_high_med_pix = pc_high_m / pc_med_m # average ratio between HG and MG as a function of # mem cell (needed for bls_stripes) # TODO: Per pixel would be more optimal correction frac_high_med = pc_high_med / pc_med_med # calculate additional medium-gain offset md_additional_offset = pc_high_l - pc_med_l * pc_high_m / pc_med_m # noqa # Calculate relative gain. If FF constants are available, # use them for high gain # if not rel_gain is calculated using PC data only # if self.corr_bools.get("xray_corr"): # rel_gain[..., :self.max_cells, 0] /= xray_corr # PC data should be 'calibrated with X-ray data, # if it is not done, it is better to use 1 instead of bias # the results with PC arteffacts. # rel_gain[..., 0] = 1./(pc_high_m / pc_high_ave) rel_gain[..., 1] = rel_gain[..., 0] * frac_high_med_pix rel_gain[..., 2] = rel_gain[..., 1] * 4.48 else: # Intialize with fake calculated parameters of Ones and Zeros md_additional_offset = np.zeros((128, 512, self.max_cells), np.float32) frac_high_med = np.ones((self.max_cells,), np.float32) self.md_additional_offset[module_idx][...] = md_additional_offset.transpose()[...] # noqa self.rel_gain[module_idx][...] = rel_gain[...].transpose() self.frac_high_med[module_idx][...] = frac_high_med self.mask[module_idx][...] = bpixels.transpose()[...] return def initialize_from_yaml( self, karabo_da: str, const_yaml: Dict[str, Any], module_idx: int ) -> Dict[str, Any]: """Initialize calibration constants from a yaml file :param karabo_da: a karabo data aggregator :param const_yaml: from the "retrieved-constants" part of a yaml file from pre-notebook, which consists of metadata of either the constant file path or the empty constant shape, and the creation-time of the retrieved constants :param module_idx: Index of module :return when: dict of retrieved constants with their creation-time """ # string of the device name. cons_data = dict() when = dict() db_module = const_yaml[karabo_da]["physical-detector-unit"] for cname, mdata in const_yaml[karabo_da]["constants"].items(): when[cname] = mdata["creation-time"] if when[cname]: with h5py.File(mdata["file-path"], "r") as cf: cons_data[cname] = np.copy(cf[f"{db_module}/{cname}/0/data"]) # noqa else: # Create empty constant using the list elements cons_data[cname] = getattr(np, mdata["file-path"][0])(mdata["file-path"][1]) # noqa self.init_constants(cons_data, when, module_idx) return when def initialize_from_db(self, karabo_id: str, karabo_da: str, cal_db_interface: str, creation_time: 'datetime.datetime', memory_cells: float, bias_voltage: int, photon_energy: float, gain_setting: float, acquisition_rate: float, module_idx: int, only_dark: bool = False): """ Initialize calibration constants from the calibration database :param karabo_id: karabo identifier :param karabo_da: karabo data aggregator :param cal_db_interface: database interaface port :param creation_time: time for desired calibration constant version :param memory_cells: number of memory cells used for CCV conditions :param bias_voltage: bias voltage used for CCV conditions :param photon_energy: photon energy used for CCV conditions :param gain_setting: gain setting used for CCV conditions :param acquisition_rate: acquistion rate used for CCV conditions :param module_idx: module index to save retrieved CCV in sharedmem :param only_dark: load only dark image derived constants. This implies that a `calfile` is used to load the remaining constants. Useful to reduce DB traffic and interactions for non-frequently changing constants, i.e. such which are not usually updated during a beamtime. The `cal_db_interface` parameter in the `dbparms` tuple may be in one of the following notations: * tcp://host:port to directly identify the host and port to connect to * tcp://host:port_low#port_high to specify a port range from which a random port will be picked. E.g. specifying tcp://max-exfl016:8015#8025 will randomly pick an address in the range max-exfl016:8015 and max-exfl016:8025. The latter notation allows for load-balancing. This routine loads the following constants as given in `iCalibrationDB`: Dark Image Derived ------------------ * Constants.AGIPD.Offset * Constants.AGIPD.Noise * Constants.AGIPD.BadPixelsDark * Constants.AGIPD.ThresholdsDark Pulse Capacitor Derived ----------------------- * Constants.AGIPD.SlopesPC Flat-Field Derived * Constants.AGIPD.SlopesFF """ const_dict = assemble_constant_dict( self.corr_bools, self.pc_bools, memory_cells, bias_voltage, gain_setting, acquisition_rate, photon_energy, beam_energy=None, only_dark=only_dark, ) cons_data, when = self.retrieve_constant_and_time( karabo_id, karabo_da, const_dict, cal_db_interface, creation_time ) self.init_constants(cons_data, when, module_idx) return when def allocate_constants(self, modules, constant_shape): """ Allocate memory for correction constants :param modules: Module indices :param constant_shape: Shape of expected constants (gain, cells, x, y) """ for module_idx in modules: self.offset[module_idx] = sharedmem.empty(constant_shape, dtype="f4") # noqa if self.gain_mode is AgipdGainMode.ADAPTIVE_GAIN: self.thresholds[module_idx] = sharedmem.empty(constant_shape, dtype="f4") # noqa self.noise[module_idx] = sharedmem.empty(constant_shape, dtype="f4") # noqa self.md_additional_offset[module_idx] = sharedmem.empty(constant_shape[1:], dtype="f4") # noqa self.rel_gain[module_idx] = sharedmem.empty(constant_shape, dtype="f4") # noqa self.frac_high_med[module_idx] = sharedmem.empty(constant_shape[1], dtype="f4") # noqa self.mask[module_idx] = sharedmem.empty(constant_shape, dtype="i4") self.xray_cor[module_idx] = sharedmem.empty(constant_shape[1:], dtype="f4") # noqa def allocate_images(self, shape, n_cores_files): """ Allocate memory for image data and variable shared between correction functions :param shape: Shape of expected data (nImg, x, y) :param n_cores_files: Number of files, handled in parallel """ self.shared_dict = [] for i in range(n_cores_files): self.shared_dict.append({}) self.shared_dict[i]["cellId"] = sharedmem.empty(shape[0], dtype="u2") # noqa self.shared_dict[i]["pulseId"] = sharedmem.empty(shape[0], dtype="u8") # noqa self.shared_dict[i]["trainId"] = sharedmem.empty(shape[0], dtype="u8") # noqa self.shared_dict[i]["moduleIdx"] = sharedmem.empty(1, dtype="i4") self.shared_dict[i]["nImg"] = sharedmem.empty(1, dtype="i4") self.shared_dict[i]["mask"] = sharedmem.empty(shape, dtype="u4") self.shared_dict[i]["data"] = sharedmem.empty(shape, dtype="f4") self.shared_dict[i]["rawgain"] = sharedmem.empty(shape, dtype="u2") self.shared_dict[i]["gain"] = sharedmem.empty(shape, dtype="u1") self.shared_dict[i]["blShift"] = sharedmem.empty(shape[0], dtype="f4") # noqa # Parameters shared between image-wise correction functions self.shared_dict[i]["msk"] = sharedmem.empty(shape, dtype="i4") self.shared_dict[i]["raw_data"] = sharedmem.empty(shape, dtype="f4") # noqa self.shared_dict[i]["rel_corr"] = sharedmem.empty(shape, dtype="f4") # noqa self.shared_dict[i]["t0_rgain"] = sharedmem.empty(shape, dtype="u2") # noqa