From 36a51392ab4307e6618c72aa6e1c248015def1ea Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Lo=C3=AFc=20Le=20Guyader?= <loic.le.guyader@xfel.eu>
Date: Tue, 26 Nov 2024 09:14:26 +0100
Subject: [PATCH] Back on master

---
 src/toolbox_scs/detectors/digitizers.py | 371 +-----------------------
 1 file changed, 2 insertions(+), 369 deletions(-)

diff --git a/src/toolbox_scs/detectors/digitizers.py b/src/toolbox_scs/detectors/digitizers.py
index 9dc268d..8367242 100644
--- a/src/toolbox_scs/detectors/digitizers.py
+++ b/src/toolbox_scs/detectors/digitizers.py
@@ -7,7 +7,6 @@
 """
 import logging
 
-import os
 import numpy as np
 import xarray as xr
 
@@ -19,9 +18,6 @@
 from ..misc.bunch_pattern_external import is_pulse_at
 from ..util.exceptions import ToolBoxValueError
 from ..mnemonics_machinery import (mnemonics_to_process, mnemonics_for_run)
-from extra_data import open_run
-from extra_data.read_machinery import find_proposal
-from extra.components import XrayPulses, OpticalLaserPulses
 
 __all__ = [
     'check_peak_params',
@@ -30,10 +26,7 @@ __all__ = [
     'get_peaks',
     'get_tim_peaks',
     'digitizer_signal_description',
-    'get_dig_avg_trace',
-    'extract_digitizer_peaks',
-    'load_processed_peaks',
-    'check_processed_peak_params'
+    'get_dig_avg_trace'
 ]
 
 log = logging.getLogger(__name__)
@@ -737,7 +730,7 @@ def check_peak_params(run, mnemonic, raw_trace=None, ntrains=200, params=None,
     return params
 
 
-def plotPeakIntegrationWindow(raw_trace, params, bp_params=None, show_all=False):
+def plotPeakIntegrationWindow(raw_trace, params, bp_params, show_all=False):
     if show_all:
         fig, ax = plt.subplots(figsize=(6, 3), constrained_layout=True)
         n = params['npulses']
@@ -1252,363 +1245,3 @@ def timFactorFromTable(voltage, photonEnergy, mcp=1):
     poly = np.poly1d(tim_calibration_table[photonEnergy][mcp-1])
     f = -np.exp(poly(voltage))
     return f
-
-#############################################################################################
-#############################################################################################
-#############################################################################################
-
-def extract_digitizer_peaks(proposal, runNB, mnemonic, bunchPattern=None,
-                            integParams=None, autoFind=True, save=True,
-                            subdir='usr/processed_runs'):
-    """
-    Extract the peaks from digitizer raw traces and save them into a file.
-    The calculation is a trapezoidal integration between 'pulseStart' and
-    'pulseStop' with subtraction of a baseline defined as the median between
-    'baseStart' and 'baseStop'.
-    The integration parameters can either be provided using integParams, or
-    determined by a peak-finding algorithm using autoFind. If the
-    bunchPattern argument is provided, the pulse ids are aligned to it.
-
-    Parameters
-    ----------
-    proposal: int
-        the proposal number
-    runNB: int
-        the run number
-    mnemonic: str
-        the mnemonic containing raw traces. Example: 'XRD_MCP_BIGraw'
-    bunchPattern: str
-        'sase3' or 'scs_ppl'. If provided, checks the bunch pattern table
-        using Extra XrayPulses or OpticalLaserPulses and aligns the pulse
-        ids. If None, the pulse ids are not aligned and indexed between 0
-        and the number of pulses per train.
-    integParams: dict
-        dictionary with keys ['pulseStart', 'pulseStop', 'baseStart',
-        'baseStop', 'period', 'npulses']. If provided, autoFind is set to
-        False. If bunchPattern is not None, 'period' and 'npulses' are
-        adjusted to match the bunch pattern and align the pulse ids.
-    autoFind: bool
-        If True, a peak-finding algorithm is used to determine 'pulseStart',
-        'pulseStop', 'baseStart', 'baseStop', and 'period' and 'npulses' if
-        bunchPattern is None (otherwise the period and npulses from
-        bunchPattern are used for pulse id alignment).
-    save: bool
-        whether to save the result to a file or not.
-    subdir: str
-        subdirectory. The data is stored in
-        <proposal path>/<subdir>/r{runNB:04d}/f'r{runNB:04d}-digitizers-data.h5'
-    """
-    if integParams is None and autoFind is False:
-        log.warning('integParams not provided and autoFind is False. '
-                    'Cannot compute peak integration parameters.')
-        return xr.DataArray()
-
-    run = open_run(proposal, runNB)
-    run_mnemonics = mnemonics_for_run(run)
-    mnemo = run_mnemonics.get(mnemonic)
-    if mnemo is None:
-        log.warning('Mnemonic not found. Skipping.')
-        return xr.DataArray()
-    source, key = mnemo['source'], mnemo['key']
-    extra_dim = {'sase3': 'sa3_pId', 'scs_ppl': 'ol_pId'}.get(bunchPattern)
-    if extra_dim is None:
-        extra_dim = 'pulseId'
-    digitizer = digitizer_type(run, source)
-    if digitizer == 'FastADC':
-        pulse_period = 24
-    if digitizer == 'ADQ412':
-        pulse_period = 440
-
-    pattern = None
-    regular = True
-    try:
-        if bunchPattern == 'sase3':
-            pattern = XrayPulses(run)
-        if bunchPattern == 'scs_ppl':
-            pattern = OpticalLaserPulses(run)
-    except Exception as e:
-        print(e)
-        bunchPattern = None
-
-    if integParams is not None:
-        required_keys = ['pulseStart', 'pulseStop', 'baseStart',
-                         'baseStop', 'period', 'npulses']
-        if not all(name in integParams for name in required_keys):
-            raise TypeError('All keys of integParams argument '
-                            f'{required_keys} are required.')
-        params = integParams.copy()
-        autoFind = False
-        if pattern is not None:
-            # use period and npulses from pattern
-            pulse_ids = pattern.peek_pulse_ids(labelled=False)
-            npulses_from_bp = len(pulse_ids)
-            period_from_bp = 0
-            if npulses_from_bp > 1:
-                period_from_bp = min(np.diff(pulse_ids)) * pulse_period
-            if (npulses_from_bp != params['npulses'] or
-                    period_from_bp != params['period']):
-                print(f'Integration parameters (npulses={params["npulses"]}, '
-                      f'period={params["period"]}) do not match '
-                      f'the bunch pattern (npulses={npulses_from_bp}, '
-                      f'period={period_from_bp}). '
-                      'Using bunch pattern parameters.')
-                params['npulses'] = npulses_from_bp
-                params['period'] = period_from_bp
-        else:
-            period = params['period']
-            npulses = params['npulses']
-            pulse_ids = np.arange(params['npulses'], dtype=np.uint64)
-
-    elif pattern is not None:
-        if pattern.is_constant_pattern() is False:
-            print('The number of pulses changed during the run.')
-            pulse_ids = np.unique(pattern.pulse_ids(labelled=False, copy=False))
-            regular = False
-        else:
-            pulse_ids = pattern.peek_pulse_ids(labelled=False)
-        npulses = len(pulse_ids)
-        period = 0
-        if npulses > 1:
-            periods = np.diff(pulse_ids)
-            if len(np.unique(periods)) > 1:
-                regular = False
-            period = min(periods)
-            npulses = int((max(pulse_ids) - min(pulse_ids)) / period) + 1
-        period *= pulse_period
-    else:
-        pulse_ids = npulses = period = None
-
-    # generate average trace
-    traces = run[source, key].xarray(name=mnemonic.replace('raw', 'avg'),
-                                     extra_dims=mnemo['dim'])
-    trace = traces.mean('trainId')
-
-    # find peak integration parameters
-    if autoFind == True:
-        params = find_integ_params(trace)
-        if (period is not None and params['period'] != period
-                or npulses is not None and params['npulses'] != npulses):
-            log.warning(f'Bunch pattern (npulses={npulses}, period={period}) and '
-                        f'found integration parameters (npulses={params["npulses"]}, '
-                        f'period={params["period"]}) do not match. Using bunch '
-                        'pattern parameters.')
-            params['period'] = period
-            params['npulses'] = min(len(trace) // period, npulses)
-            print(params['npulses'])
-        if pulse_ids is None:
-            pulse_ids = np.arange(params['npulses'], dtype=np.uint64)
-
-    if params is None:
-        print('Could not find peak integration parameters.')
-        return xr.DataArray()
-
-    # extract peaks
-    data = peaks_from_raw_trace(traces, **params, extra_dim=extra_dim)
-    data = data.rename(mnemonic.replace('raw', 'peaks'))
-    data = data.assign_coords({extra_dim: pulse_ids})
-
-    if regular is False:
-        period = int(period / pulse_period)
-        mask = pattern.pulse_mask(labelled=False)
-        mask = xr.DataArray(mask, dims=['trainId', extra_dim],
-                            coords={'trainId': run[source, key].train_ids,
-                                    extra_dim: np.arange(mask.shape[1])})
-        mask = mask.sel({extra_dim: slice(pulse_ids[0],
-                                          pulse_ids[0] + npulses*period, period)})
-        data = data.where(mask, drop=True)
-
-    data.attrs['params_keys'] = list(params.keys())
-    data.attrs[f'params_{data.name}'] = list(params.values())
-    if save:
-        save_peaks(proposal, runNB, data, trace, subdir)
-    return data
-
-
-def save_peaks(proposal, runNB, peaks, avg_trace, subdir):
-    '''
-    Save the peaks extracted with extract_digitizer_peaks() as well as the
-    average raw trace in a dataset at the proposal + subdir location.
-    If a dataset already exists, the new data is merged with it.
-
-    Parameters
-    ----------
-    proposal: int
-        the proposal number
-    runNB: int
-        the run number
-    peaks: xarray DataArray
-        the 2D-array obtained by extract_digitizer_peaks()
-    avg_trace: xarray DataArray
-        the average raw trace over the trains
-    subdir: str
-        subdirectory. The data is stored in
-        <proposal path>/<subdir>/r{runNB:04d}/f'r{runNB:04d}-digitizers-data.h5'
-
-    Returns
-    -------
-    None
-    '''
-    root = find_proposal(f'p{proposal:06d}')
-    path = os.path.join(root, subdir + f'/r{runNB:04d}/')
-    os.makedirs(path, exist_ok=True)
-    fname = path + f'r{runNB:04d}-digitizers-data.h5'
-    ds_peaks = peaks.to_dataset(promote_attrs=True)
-
-    if os.path.isfile(fname):
-        ds = xr.load_dataset(fname)
-        ds = ds.drop_vars([peaks.name, avg_trace.name], errors='ignore')
-        for dim in ds.dims:
-            if all(dim not in ds[v].dims for v in ds):
-                ds = ds.drop_dims(dim)
-        dim_name = 'sampleId'
-        if 'sampleId' in ds.dims and ds.sizes['sampleId'] != len(avg_trace):
-            dim_name = 'sampleId2'
-        avg_trace = avg_trace.rename({avg_trace.dims[0]: dim_name})
-        if f'params_{peaks.name}' in ds.attrs:
-            del ds.attrs[f'params_{peaks.name}']
-        ds = xr.merge([ds, ds_peaks, avg_trace],
-                      combine_attrs='drop_conflicts', join='inner')
-    else:
-        ds = ds_peaks.merge(avg_trace.rename({avg_trace.dims[0]: 'sampleId'}))
-    ds.to_netcdf(fname, format='NETCDF4')
-    print(f'saved data into {fname}.')
-    return None
-
-
-def load_processed_peaks(proposal, runNB, mnemonic=None,
-                         data='usr/processed_runs', merge_with=None):
-    """
-    Load processed digitizer peaks data.
-
-    Parameters
-    ----------
-    proposal: int
-        the proposal number
-    runNB: int
-        the run number
-    mnemonic: str
-        the mnemonic containing peaks. Example: 'XRD_MCP_BIGpeaks'.
-        If None, the entire dataset is loaded
-    data: str
-        subdirectory. The data is stored in
-        <proposal path>/<subdir>/r{runNB:04d}/f'r{runNB:04d}-digitizers-data.h5'
-    merge_with: xarray Dataset
-        A dataset to merge the data with.
-
-    Returns
-    -------
-    xarray DataArray if mnemonic is not None and merge_with is None
-    xarray Dataset if mnemonic is None or merge_with is not None.
-
-    Example
-    -------
-    # load the mono energy and the MCP_BIG peaks
-    run, ds = tb.load(proposal, runNB, 'nrj')
-    ds = tb.load_processed_peaks(proposal, runNB, 'XRD_MCP_BIGpeaks', merge_with=ds)
-    """
-    if mnemonic is None:
-        return load_all_processed_peaks(proposal, runNB, data, merge_with)
-    root = find_proposal(f'p{proposal:06d}')
-    path = os.path.join(root, data + f'/r{runNB:04d}/')
-    fname = path + f'r{runNB:04d}-digitizers-data.h5'
-    if os.path.isfile(fname):
-        ds = xr.load_dataset(fname)
-        if mnemonic not in ds:
-            print(f'Mnemonic {mnemonic} not found in {fname}')
-            return {}
-        da = ds[mnemonic]
-        da.attrs['params_keys'] = ds.attrs['params_keys']
-        da.attrs[f'params_{mnemonic}'] = ds.attrs[f'params_{mnemonic}']
-        if merge_with is not None:
-            return merge_with.merge(da.to_dataset(promote_attrs=True),
-                                    combine_attrs='drop_conflicts', join='inner')
-        else:
-            return da
-    else:
-        print(f'Mnemonic {mnemonic} not found in {fname}')
-        return merge_with
-
-
-def load_all_processed_peaks(proposal, runNB, data='usr/processed_runs',
-                             merge_with=None):
-    """
-    Load processed digitizer peaks dataset. The data contains the peaks,
-    the average raw trace and the integration parameters (attribute) of
-    each processed digitizer source.
-
-    Parameters
-    ----------
-    proposal: int
-        the proposal number
-    runNB: int
-        the run number
-    data: str
-        subdirectory. The data is stored in
-        <proposal path>/<subdir>/r{runNB:04d}/f'r{runNB:04d}-digitizers-data.h5'
-    merge_with: xarray Dataset
-        A dataset to merge the data with.
-
-    Returns
-    -------
-    xarray Dataset
-    """
-    root = find_proposal(f'p{proposal:06d}')
-    path = os.path.join(root, data + f'/r{runNB:04d}/')
-    fname = path + f'r{runNB:04d}-digitizers-data.h5'
-    if os.path.isfile(fname):
-        if merge_with is not None:
-            return merge_with.merge(xr.load_dataset(fname),
-                                    combine_attrs='drop_conflicts', join='inner')
-        return xr.load_dataset(fname)
-    else:
-        print(f'{fname} not found.')
-        return merge_with
-
-
-def check_processed_peak_params(proposal, runNB, mnemonic, data='usr/processed_runs',
-                                plot=True, show_all=False):
-    """
-    Check the integration parameters used to generate the processed peak values.
-
-    Parameters
-    ----------
-    proposal: int
-        the proposal number
-    runNB: int
-        the run number
-    mnemonic: str
-        the mnemonic containing peaks. Example: 'XRD_MCP_BIGpeaks'.
-    data: str
-        subdirectory. The data is stored in
-        <proposal path>/<subdir>/r{runNB:04d}/f'r{runNB:04d}-digitizers-data.h5'
-    plot: bool
-        If True, displays the raw trace and peak integration regions.
-    show_all: bool
-        If True, displays the entire raw trace and all peak integration
-        regions (this can be time-consuming).
-        If False, shows the first and last pulses.
-
-    Returns
-    -------
-    params: dict
-        the integration parameters with keys ['pulseStart', 'pulseStop',
-        'baseStart', 'baseStop', 'period', 'npulses'].
-        See `extract_digitizer_peaks()`.
-    """
-    root = find_proposal(f'p{proposal:06d}')
-    path = os.path.join(root, data + f'/r{runNB:04d}/')
-    fname = path + f'r{runNB:04d}-digitizers-data.h5'
-    if os.path.isfile(fname):
-        ds = xr.load_dataset(fname)
-        if mnemonic.replace('raw', 'peaks') not in ds:
-            print(f'Mnemonic {mnemonic} not found in {fname}')
-            return {}
-        da = ds[mnemonic]
-        params = dict(zip(ds.attrs['params_keys'], ds.attrs[f'params_{mnemonic}']))
-        if plot:
-            plotPeakIntegrationWindow(ds[mnemonic.replace('peaks', 'avg')],
-                                      params, show_all=show_all)
-        return params
-    else:
-        print(f'{fname} not found.')
-        return {}
\ No newline at end of file
-- 
GitLab
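
For reference, a minimal usage sketch of the workflow this patch removes, on the pre-revert version of the code. The proposal and run numbers are placeholders, and it is assumed (not confirmed by this patch) that toolbox_scs re-exports these functions at the top level as in the docstring example above:

    import toolbox_scs as tb

    proposal, runNB = 900000, 100  # hypothetical placeholder identifiers

    # integrate MCP peaks, aligning pulse ids to the SASE 3 bunch pattern,
    # and save the result under usr/processed_runs
    peaks = tb.extract_digitizer_peaks(proposal, runNB, 'XRD_MCP_BIGraw',
                                       bunchPattern='sase3')

    # later: reload the processed peaks and inspect the integration windows
    ds = tb.load_processed_peaks(proposal, runNB, 'XRD_MCP_BIGpeaks')
    params = tb.check_processed_peak_params(proposal, runNB,
                                            'XRD_MCP_BIGpeaks')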