diff --git a/src/toolbox_scs/detectors/digitizers.py b/src/toolbox_scs/detectors/digitizers.py index 544344544f41b42fe361fdd2938776141cc6bdf4..67c2cac0777a9ce2ff00a488d300e17ecd85f223 100644 --- a/src/toolbox_scs/detectors/digitizers.py +++ b/src/toolbox_scs/detectors/digitizers.py @@ -31,7 +31,6 @@ __all__ = [ 'get_tim_peaks', 'digitizer_signal_description', 'get_dig_avg_trace', - 'extract_digitizer_peaks', 'load_processed_peaks', 'check_processed_peak_params' ] @@ -69,7 +68,7 @@ def peaks_from_raw_trace(traces, pulseStart, pulseStop, baseStart, baseStop, xarray DataArray """ - assert len(traces.shape) == 2 + assert traces.ndim == 2 if isinstance(traces, xr.DataArray): ntid = traces.sizes['trainId'] @@ -85,6 +84,8 @@ def peaks_from_raw_trace(traces, pulseStart, pulseStop, baseStart, baseStop, pulses = pulseStart - pulseStart[0] pulseStart = pulseStart[0] else: + if period is None or period == 0: + period = 1 pulses = range(0, npulses*period, period) if extra_dim is None: @@ -1102,19 +1103,27 @@ def get_laser_peaks(run, mnemonic=None, merge_with=None, bunchPattern, integParams, False) -def get_digitizer_peaks(run, mnemonic, merge_with=None, +def get_digitizer_peaks(proposal, runNB, mnemonic, merge_with=None, bunchPattern='sase3', integParams=None, - keepAllSase=False): + save=False, subdir='usr/processed_runs'): """ - Automatically computes digitizer peaks. A source can be loaded on the - fly via the mnemonic argument, or processed from an existing data set - (merge_with). The bunch pattern table is used to assign the pulse - id coordinates. + Extract the peaks from digitizer raw traces. The result can be merged + to an existing `merge_with` dataset and/or saved into an HDF file. + The calculation is a trapezoidal integration between 'pulseStart' and + 'pulseStop' with subtraction of a baseline defined as the median between + 'baseStart' and 'baseStop'. + The integration parameters can either be provided using integParams, or + computed by a peak finding algorithm if integParams is None. + If the bunchPattern argument is provided, the pulse ids are aligned to it. + If there is a mismatch between the provided parameters or the computed + parameters and the bunch pattern, the bunch pattern parameters prevail. Parameters ---------- - run: extra_data.DataCollection - DataCollection containing the digitizer data. + proposal: int + the proposal number + runNB: int + the run number mnemonic: str mnemonic for FastADC or ADQ412, e.g. "I0_ILHraw" or "MCP3apd". The data is either loaded from the DataCollection or taken from @@ -1128,9 +1137,6 @@ def get_digitizer_peaks(run, mnemonic, merge_with=None, {'pulseStart':100, 'pulsestop':200, 'baseStart':50, 'baseStop':99, 'period':24, 'npulses':500}. If None, integration parameters are computed automatically. - keepAllSase: bool - Only relevant in case of sase-dedicated trains. If True, all - trains are kept, else only those of the bunchPattern are kept. Returns ------- @@ -1138,53 +1144,65 @@ def get_digitizer_peaks(run, mnemonic, merge_with=None, substituted by the peak caclulated values (e.g. "FastADC2raw" becomes "FastADC2peaks"). """ - run_mnemonics = mnemonics_for_run(run) - if mnemonic not in run_mnemonics: - log.warning('Mnemonic not found in run. Skipping.') - return merge_with - if bool(merge_with) and mnemonic in merge_with: - for d in merge_with[mnemonic].dims: - if d in ['sa3_pId', 'ol_pId']: - log.warning(f'{mnemonic} already extracted. 
-                'Skipping.')
-            return merge_with
-    # check if bunch pattern table exists
-    bpt = None
-    if bool(merge_with) and 'bunchPatternTable' in merge_with:
-        bpt = merge_with['bunchPatternTable']
-        log.debug('Using bpt from merge_with dataset.')
-    elif 'bunchPatternTable' in run_mnemonics:
-        m = run_mnemonics['bunchPatternTable']
-        bpt = run.get_array(m['source'], m['key'], m['dim'])
-        log.debug('Loaded bpt from DataCollection.')
-    elif 'bunchPatternTable_SA3' in run_mnemonics:
-        m = run_mnemonics['bunchPatternTable_SA3']
-        bpt = run.get_array(m['source'], m['key'], m['dim'])
-        log.debug('Loaded bpt from DataCollection.')
-    else:
-        log.warning('Could not load bunch pattern table.')
     # prepare resulting dataset
     if bool(merge_with):
         mw_ds = merge_with.drop(mnemonic, errors='ignore')
     else:
         mw_ds = xr.Dataset()
-    # iterate over mnemonics and merge arrays in dataset
-    autoFind = True if integParams is None else False
-    m = run_mnemonics[mnemonic]
-    useRaw = True if 'raw' in mnemonic else False
-    if bool(merge_with) and mnemonic in merge_with:
-        data = merge_with[mnemonic]
-    else:
-        data = run.get_array(m['source'], m['key'], m['dim'])
-    peaks = get_peaks(run, data, mnemonic,
-                      useRaw=useRaw,
-                      autoFind=autoFind,
-                      integParams=integParams,
-                      bunchPattern=bunchPattern,
-                      bpt=bpt)
-    name = mnemonic.replace('raw', 'peaks').replace('apd', 'peaks')
-    join = 'outer' if keepAllSase else 'inner'
-    ds = mw_ds.merge(peaks.rename(name), join=join)
+    run = open_run(proposal, runNB)
+    run_mnemonics = mnemonics_for_run(run)
+    mnemo = run_mnemonics.get(mnemonic)
+    if mnemo is None:
+        log.warning('Mnemonic not found. Skipping.')
+        return xr.DataArray()
+    source, key = mnemo['source'], mnemo['key']
+    extra_dim = {'sase3': 'sa3_pId', 'scs_ppl': 'ol_pId'}.get(bunchPattern)
+    if extra_dim is None:
+        extra_dim = 'pulseId'
+    pattern = None
+    try:
+        if bunchPattern == 'sase3':
+            pattern = XrayPulses(run)
+        elif bunchPattern == 'scs_ppl':
+            pattern = OpticalLaserPulses(run)
+    except Exception as e:
+        log.warning(e)
+
+    # load traces and calculate the average
+    traces = run[source, key].xarray(name=mnemonic, extra_dims=mnemo['dim'])
+    trace = traces.mean('trainId').rename(mnemonic.replace('raw', 'avg'))
+
+    # find integration parameters
+    params, pulse_ids, regular, trace = find_peak_integration_parameters(
+        run, mnemonic, trace, integParams, pattern)
+
+    # extract peaks
+    peaks = peaks_from_raw_trace(traces, **params, extra_dim=extra_dim)
+    peaks = peaks.rename(mnemonic.replace('raw', 'peaks'))
+
+    # align pulse ids
+    if regular:
+        peaks = peaks.assign_coords({extra_dim: pulse_ids})
+    elif pattern:
+        mask = pattern.pulse_mask(labelled=False)
+        mask = xr.DataArray(mask, dims=['trainId', extra_dim],
+                            coords={'trainId': run[source, key].train_ids,
+                                    extra_dim: np.arange(mask.shape[1])})
+        mask = mask.sel({extra_dim: pulse_ids})
+        peaks = peaks.where(mask, drop=True)
+
+    # add peak integration parameters as attributes
+    for p in params:
+        if params[p] is None:
+            params[p] = 'None'
+        peaks.attrs[f'{peaks.name}_{p}'] = params[p]
+
+    # save peaks
+    if save:
+        save_peaks(proposal, runNB, peaks, trace, subdir)
+
+    # merge with existing dataset
+    ds = mw_ds.merge(peaks, join='inner')
     return ds
 
 
@@ -1436,171 +1454,6 @@ def timFactorFromTable(voltage, photonEnergy, mcp=1):
 #############################################################################################
 #############################################################################################
 
-def extract_digitizer_peaks(proposal, runNB, mnemonic, bunchPattern=None,
integParams=None, autoFind=True, save=False, - subdir='usr/processed_runs'): - """ - Extract the peaks from digitizer raw traces and saves them into a file. - The calculation is a trapezoidal integration between 'pulseStart' and - 'pulseStop' with subtraction of a baseline defined as the median between - 'baseStart' and 'baseStop'. - The integration parameters can either be provided using integParams, or - determined by a peak finding algorithm using autoFind. If the bunchPattern - argument is provided, the pulse ids are aligned to it. - - Parameters - ---------- - proposal: int - the proposal number - runNB: int - the run number - mnemonic: str - the mnemonic containing raw traces. Example: 'XRD_MCP_BIGraw' - bunchPattern: str - 'sase3' or 'scs_ppl'. If provided, checks the bunch pattern table using - Extra XrayPulses or OpticalPulses and aligns the pulse ids. - If None, the pulse ids are not aligned and indexed between 0 and the - number of pulses per train. - integParams: dict - dictionnary with keys ['pulseStart', 'pulseStop', 'baseStart', - 'baseStop', 'period', 'npulses']. If provided, autoFind is set to False. - If bunchPattern is not None, 'period' and 'npulses' are adjusted to match - the bunch pattern and align the pulse ids. - autoFind: bool - If True, a peak-finding algorithm is used to determine 'pulseStart', - 'pulseStop', 'baseStart', 'baseStop', and 'period' and 'npulses' if - bunchPattern is None (otherwise the period and npulses from bunchPattern - are used for pulse id alingment). - save: bool - whether to save the result to a file or not. - subdir: str - subdirectory. The data is stored in - <proposal path>/<subdir>/r{runNB:04d}/f'r{runNB:04d}-digitizers-data.h5' - """ - if integParams is None and autoFind is False: - log.warning('integParams not provided and autoFind is False. ' - 'Cannot compute peak integration parameters.') - return xr.DataArray() - - run = open_run(proposal, runNB) - run_mnemonics = mnemonics_for_run(run) - mnemo = run_mnemonics.get(mnemonic) - if mnemo is None: - log.warning('Mnemonic not found. 
Skipping.') - return xr.DataArray() - source, key = mnemo['source'], mnemo['key'] - extra_dim = {'sase3': 'sa3_pId', 'scs_ppl': 'ol_pId'}.get(bunchPattern) - if extra_dim is None: - extra_dim = 'pulseId' - digitizer = digitizer_type(run, source) - if digitizer == 'FastADC': - pulse_period = 24 - if digitizer == 'ADQ412': - pulse_period = 440 - - pattern = None - try: - if bunchPattern == 'sase3': - pattern = XrayPulses(run) - if bunchPattern == 'scs_ppl': - pattern = OpticalLaserPulses(run) - except Exception as e: - log.warning(e) - bunchPattern = None - - # Extract pulse_ids, npulses and period from bunch pattern - pulse_ids, npulses, period = None, None, None - regular = True - if pattern is not None: - if pattern.is_constant_pattern() is False: - log.info('The number of pulses changed during the run.') - pulse_ids = np.unique(pattern.pulse_ids(labelled=False, copy=False)) - npulses, period = None, None - regular = False - else: - pulse_ids = pattern.peek_pulse_ids(labelled=False) - npulses = len(pulse_ids) - if npulses > 1: - periods = np.diff(pulse_ids) - if len(np.unique(periods)) > 1: - regular = False - else: - period = np.unique(periods)[0] * pulse_period - - # Use integration parameters, adjust them to match bunch pattern if necessary - if integParams is not None: - required_keys = ['pulseStart', 'pulseStop', 'baseStart', - 'baseStop'] - if bunchPattern is None: - required_keys += ['period', 'npulses'] - if not all(name in integParams for name in required_keys): - raise TypeError('All keys of integParams argument ' - f'{required_keys} are required.') - params = integParams.copy() - autoFind = False - if pattern is not None: - if (npulses != params.get('npulses') or - period != params.get('period')): - log.warning(f'Integration parameters ' - f'(npulses={params.get("npulses")}, ' - f'period={params.get("period")}) do not match the ' - f'the bunch pattern (npulses={npulses}, ' - f'period={period}). Using bunch pattern parameters.') - params['npulses'] = npulses - params['period'] = period - else: - pulse_ids = np.arange(0, params['npulses'], params['period'], - dtype=np.uint64) - start = params['pulseStart'] - params['pulseStart'] = [start + (pid - pulse_ids[0]) * pulse_period - for pid in pulse_ids] - - # load traces and generate average - traces = run[source, key].xarray(name=mnemonic, extra_dims=mnemo['dim']) - trace = traces.mean('trainId').rename(mnemonic.replace('raw', 'avg')) - - # find peak integration parameters - if autoFind == True: - params = find_integ_params(trace) - if (period is not None and params['period'] != period - or npulses is not None and params['npulses'] != npulses): - log.warning(f'Bunch pattern (npulses={npulses}, period={period}) and ' - f'found integration parameters (npulses={params["npulses"]}, ' - f'period={params["period"]}) do not match. 
Using bunch '
-                        'pattern parameters.')
-            params["npulses"] = npulses
-            params["period"] = period
-        if pulse_ids is None:
-            pulse_ids = np.arange(params['npulses'], dtype=np.uint64)
-        start = params['pulseStart']
-        params['pulseStart'] = [start + (pid - pulse_ids[0]) * pulse_period
-                                for pid in pulse_ids]
-
-    # extract peaks
-    data = peaks_from_raw_trace(traces, **params, extra_dim=extra_dim)
-    data = data.rename(mnemonic.replace('raw', 'peaks'))
-
-    if regular is True:
-        data = data.assign_coords({extra_dim: pulse_ids})
-    else:
-        mask = pattern.pulse_mask(labelled=False)
-        mask = xr.DataArray(mask, dims=['trainId', extra_dim],
-                            coords={'trainId': run[source, key].train_ids,
-                                    extra_dim: np.arange(mask.shape[1])})
-        mask = mask.sel({extra_dim: pulse_ids})
-        data = data.where(mask, drop=True)
-
-    data.attrs['params_keys'] = list(params.keys())
-    if regular:
-        params['pulseStart'] = params['pulseStart'][0]
-    for p in params:
-        if params[p] is None:
-            params[p] = 'None'
-    data.attrs[f'params_{data.name}'] = list(params.values())
-    if save:
-        save_peaks(proposal, runNB, data, trace, subdir)
-    return data
-
 
 def save_peaks(proposal, runNB, peaks, avg_trace, subdir):
     '''
@@ -1620,7 +1473,7 @@ def save_peaks(proposal, runNB, peaks, avg_trace, subdir):
         the average raw trace over the trains
     subdir: str
         subdirectory. The data is stored in
-        <proposal path>/<subdir>/r{runNB:04d}/f'r{runNB:04d}-digitizers-data.h5'
+        <proposal path>/<subdir>/r{runNB:04d}/r{runNB:04d}-digitizers-data.h5
 
     Returns
     -------
@@ -1669,7 +1522,7 @@ def load_processed_peaks(proposal, runNB, mnemonic=None,
        If None, the entire dataset is loaded
     data: str
         subdirectory. The data is stored in
-        <proposal path>/<subdir>/r{runNB:04d}/f'r{runNB:04d}-digitizers-data.h5'
+        <proposal path>/<subdir>/r{runNB:04d}/r{runNB:04d}-digitizers-data.h5
     merge_with: xarray Dataset
         A dataset to merge the data with.
 
@@ -1682,7 +1535,8 @@
     ------
     # load the mono energy and the MCP_BIG peaks
     run, ds = tb.load(proposal, runNB, 'nrj')
-    ds = tb.load_processed_peaks(proposal, runNB,'XRD_MCP_BIGpeaks', merge_with=ds)
+    ds = tb.load_processed_peaks(proposal, runNB, 'XRD_MCP_BIGpeaks',
+                                 merge_with=ds)
     """
     if mnemonic is None:
         return load_all_processed_peaks(proposal, runNB, data, merge_with)
@@ -1692,18 +1546,18 @@
     if os.path.isfile(fname):
         ds = xr.load_dataset(fname)
         if mnemonic not in ds:
-            print(f'Mnemonic {mnemonic} not found in {fname}')
+            log.warning(f'Mnemonic {mnemonic} not found in {fname}')
            return {}
         da = ds[mnemonic]
-        da.attrs['params_keys'] = ds.attrs['params_keys']
-        da.attrs[f'params_{mnemonic}'] = ds.attrs[f'params_{mnemonic}']
+        da.attrs = {k: ds.attrs[k] for k in ds.attrs if f'{mnemonic}_' in k}
         if merge_with is not None:
             return merge_with.merge(da.to_dataset(promote_attrs=True),
-                                    combine_attrs='drop_conflicts', join='inner')
+                                    combine_attrs='drop_conflicts',
+                                    join='inner')
         else:
             return da
     else:
-        print(f'Mnemonic {mnemonic} not found in {fname}')
+        log.warning(f'Mnemonic {mnemonic} not found in {fname}')
         return merge_with
 
 
@@ -1722,7 +1576,7 @@ def load_all_processed_peaks(proposal, runNB, data='usr/processed_runs',
         the run number
     data: str
         subdirectory. The data is stored in
-        <proposal path>/<subdir>/r{runNB:04d}/f'r{runNB:04d}-digitizers-data.h5'
+        <proposal path>/<subdir>/r{runNB:04d}/r{runNB:04d}-digitizers-data.h5
     merge_with: xarray Dataset
         A dataset to merge the data with.
@@ -1736,17 +1590,20 @@ def load_all_processed_peaks(proposal, runNB, data='usr/processed_runs',
     if os.path.isfile(fname):
         if merge_with is not None:
             return merge_with.merge(xr.load_dataset(fname),
-                                    combine_attrs='drop_conflicts', join='inner')
+                                    combine_attrs='drop_conflicts',
+                                    join='inner')
         return xr.load_dataset(fname)
     else:
         print(f'{fname} not found.')
         return merge_with
 
 
-def check_processed_peak_params(proposal, runNB, mnemonic, data='usr/processed_runs',
+def check_processed_peak_params(proposal, runNB, mnemonic,
+                                data='usr/processed_runs',
                                 plot=True, show_all=False):
     """
-    Check the integration parameters used to generate the processed peak values.
+    Check the integration parameters used to generate the processed
+    peak values.
 
     Parameters
     ----------
@@ -1759,7 +1616,7 @@ def check_processed_peak_params(proposal, runNB, mnemonic, data='usr/processed_r
        If None, the entire dataset is loaded
     data: str
         subdirectory. The data is stored in
-        <proposal path>/<subdir>/r{runNB:04d}/f'r{runNB:04d}-digitizers-data.h5'
+        <proposal path>/<subdir>/r{runNB:04d}/r{runNB:04d}-digitizers-data.h5
     plot: bool
         If True, displays the raw trace and peak integration regions.
     show_all: bool
@@ -1780,14 +1637,17 @@ def check_processed_peak_params(proposal, runNB, mnemonic, data='usr/processed_r
     if os.path.isfile(fname):
         ds = xr.load_dataset(fname)
         if mnemonic.replace('raw', 'peaks') not in ds:
-            print(f'Mnemonic {mnemonic} not found in {fname}')
+            log.warning(f'Mnemonic {mnemonic} not found in {fname}')
             return {}
-        da = ds[mnemonic]
-        params = dict(zip(ds.attrs['params_keys'], ds.attrs[f'params_{mnemonic}']))
+        params = {k.replace(f'{mnemonic}_', ''): ds.attrs[k]
+                  for k in ds.attrs if f'{mnemonic}_' in k}
         if plot:
-            plotPeakIntegrationWindow(ds[mnemonic.replace('peaks', 'avg')],
+            title = 'Processed peak parameters'
+            fig, ax = plotPeakIntegrationWindow(ds[mnemonic.replace('peaks', 'avg')],
                                       params, show_all=show_all)
+            fig.suptitle(f'p{proposal} r{runNB} ' + title, size=12)
+        return params
     else:
-        print(f'{fname} not found.')
-        return {}
\ No newline at end of file
+        log.warning(f'{fname} not found.')
+        return {}
diff --git a/src/toolbox_scs/load.py b/src/toolbox_scs/load.py
index 275e555a12a1fcceae21c3983974014957370aa6..e2da7af2f8af6c9728c3df4cec431978ed4bc520
--- a/src/toolbox_scs/load.py
+++ b/src/toolbox_scs/load.py
@@ -237,8 +237,8 @@ def load(proposalNB=None, runNB=None,
             bp = bunchPattern.get(k)
             if bp is None:
                 continue
-            ds = tbdet.get_digitizer_peaks(
-                run, mnemonic=k, merge_with=ds, bunchPattern=bp)
+            ds = tbdet.get_digitizer_peaks(proposalNB, runNB, mnemonic=k,
+                                           merge_with=ds, bunchPattern=bp)
         if extract_xgm:
             for k, v in run_mnemonics.items():
                 if k not in ds or v.get('extract') != 'XGM':
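
Usage sketch (not part of the patch): a minimal walk through the reworked
workflow, assuming the functions are exported as in the examples above
(tb.load_processed_peaks appears in the docstring; tbdet.get_digitizer_peaks
is how load.py calls it). The proposal number, run number and mnemonic are
placeholders.

    import toolbox_scs as tb
    import toolbox_scs.detectors as tbdet

    proposal, runNB = 2953, 100  # placeholder proposal/run

    # Extract peaks from the raw MCP traces, align them to the SASE 3 bunch
    # pattern and save them to
    # <proposal path>/usr/processed_runs/r0100/r0100-digitizers-data.h5
    ds = tbdet.get_digitizer_peaks(proposal, runNB, 'XRD_MCP_BIGraw',
                                   bunchPattern='sase3', save=True)

    # Later, reload the processed peaks and merge them with other data.
    run, data = tb.load(proposal, runNB, 'nrj')
    data = tb.load_processed_peaks(proposal, runNB, 'XRD_MCP_BIGpeaks',
                                   merge_with=data)

    # Inspect the integration windows used for the processed peaks.
    params = tb.check_processed_peak_params(proposal, runNB,
                                            'XRD_MCP_BIGpeaks')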
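The attribute layout also changes: instead of the paired 'params_keys' /
'params_<name>' list attributes, each integration parameter is now stored as
a flat attribute named '<peaks name>_<parameter>'. A short sketch of reading
them back with plain xarray, mirroring the comprehension used in
check_processed_peak_params (file name and mnemonic are illustrative):

    import xarray as xr

    # Open a processed file directly; the path follows the pattern
    # documented in the docstrings above.
    ds = xr.load_dataset('r0100-digitizers-data.h5')

    name = 'XRD_MCP_BIGpeaks'
    # Rebuild the integration-parameter dict from the flat attributes.
    params = {k.replace(f'{name}_', ''): ds.attrs[k]
              for k in ds.attrs if f'{name}_' in k}
    # e.g. params['pulseStart'], params['pulseStop'], params['period'], ...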