From a6994a61e8aac2a88e949c93906ec0cb65c80a93 Mon Sep 17 00:00:00 2001 From: Laurent Mercadier <laurent.mercadier@xfel.eu> Date: Mon, 3 Feb 2025 13:53:16 +0100 Subject: [PATCH] Remove unused digitizer functions and adapt old DSSC code --- src/toolbox_scs/detectors/digitizers.py | 646 +------ src/toolbox_scs/detectors/digitizers_old.py | 1766 +++++++++++++++++++ src/toolbox_scs/detectors/dssc.py | 3 +- src/toolbox_scs/detectors/dssc_misc.py | 12 +- 4 files changed, 1776 insertions(+), 651 deletions(-) create mode 100644 src/toolbox_scs/detectors/digitizers_old.py diff --git a/src/toolbox_scs/detectors/digitizers.py b/src/toolbox_scs/detectors/digitizers.py index 67c2cac..58b3582 100644 --- a/src/toolbox_scs/detectors/digitizers.py +++ b/src/toolbox_scs/detectors/digitizers.py @@ -26,10 +26,6 @@ from extra.components import XrayPulses, OpticalLaserPulses __all__ = [ 'check_peak_params', 'get_digitizer_peaks', - 'get_laser_peaks', - 'get_peaks', - 'get_tim_peaks', - 'digitizer_signal_description', 'get_dig_avg_trace', 'load_processed_peaks', 'check_processed_peak_params' @@ -177,189 +173,6 @@ def peaks_from_apd(array, params, digitizer, bpt, bunchPattern): return array -def get_peaks(run, - data, - mnemonic, - useRaw=True, - autoFind=True, - integParams=None, - bunchPattern='sase3', - bpt=None, - extra_dim=None, - indices=None, - ): - """ - Extract peaks from one source (channel) of a digitizer. - - Parameters - ---------- - run: extra_data.DataCollection - DataCollection containing the digitizer data - data: xarray DataArray or str - array containing the raw traces or peak-integrated values from the - digitizer. If str, must be one of the ToolBox mnemonics. If None, - the data is loaded via the source and key arguments. - mnemonic: str or dict - ToolBox mnemonic or dict with source and key as in - {'source': 'SCS_UTC1_ADQ/ADC/1:network', - 'key': 'digitizers.channel_1_A.raw.samples'} - useRaw: bool - If True, extract peaks from raw traces. If False, uses the APD (or - peaks) data from the digitizer. - autoFind: bool - If True, finds integration parameters by inspecting the average raw - trace. Only valid if useRaw is True. - integParams: dict - dictionnary containing the integration parameters for raw trace - integration: 'pulseStart', 'pulseStop', 'baseStart', 'baseStop', - 'period', 'npulses'. Not used if autoFind is True. All keys are - required when bunch pattern is missing. - bunchPattern: str - match the peaks to the bunch pattern: 'sase1', 'sase3', 'scs_ppl'. - This will dictate the name of the pulse ID coordinates: 'sa1_pId', - 'sa3_pId' or 'scs_ppl'. - bpt: xarray DataArray - bunch pattern table - extra_dim: str - Name given to the dimension along the peaks. If None, the name is given - according to the bunchPattern. - indices: array, slice - indices from the peak-integrated data to retrieve. Only required - when bunch pattern is missing and useRaw is False. 
- - Returns - ------- - xarray.DataArray containing digitizer peaks with pulse coordinates - """ - arr = data - dim = [d for d in arr.dims if d != 'trainId'][0] - - # Load bunch pattern table - run_mnemonics = mnemonics_for_run(run) - if bpt is None and bunchPattern != 'None': - if 'bunchPatternTable' in run_mnemonics: - m = run_mnemonics['bunchPatternTable'] - bpt = run.get_array(m['source'], m['key'], m['dim']) - pattern = bunchPattern - else: - pattern = bunchPattern - if bunchPattern == 'None': - bpt = None - - # Find digitizer type - m = mnemonic if isinstance(mnemonic, dict) else run_mnemonics[mnemonic] - digitizer = digitizer_type(run, m.get('source')) - - # 1. Peak-integrated data from digitizer - if useRaw is False: - # 1.1 No bunch pattern provided - if bpt is None: - log.info('Missing bunch pattern info.') - if indices is None: - raise TypeError('indices argument must be provided ' - 'when bunch pattern info is missing.') - if extra_dim is None: - extra_dim = 'pulseId' - return arr.isel({dim: indices}).rename({dim: extra_dim}) - - # 1.2 Bunch pattern is provided - if isinstance(mnemonic, dict): - peak_params = channel_peak_params(run, mnemonic.get('source'), - mnemonic.get('key')) - else: - peak_params = channel_peak_params(run, mnemonic) - log.debug(f'Digitizer peak integration parameters: {peak_params}') - return peaks_from_apd(arr, peak_params, digitizer, bpt, bunchPattern) - - # 2. Use raw data from digitizer - # minimum pulse period @ 4.5MHz, according to digitizer type - min_distance = 1 - if digitizer == 'FastADC': - min_distance = 24 - if digitizer == 'ADQ412': - min_distance = 440 - if autoFind: - stride = int(np.max([1, np.floor(arr.sizes['trainId']/200)])) - trace = arr.isel(trainId=slice(0, None, stride)).mean(dim='trainId') - try: - integParams = find_integ_params(trace) - except ValueError as err: - log.warning(f'{err}, trying with averaged trace over all trains.') - trace = arr.mean(dim='trainId') - integParams = find_integ_params(trace) - log.debug(f'Auto find peaks result: {integParams}') - - required_keys = ['pulseStart', 'pulseStop', 'baseStart', - 'baseStop', 'period', 'npulses'] - if integParams is None or not all(name in integParams - for name in required_keys): - raise TypeError('All keys of integParams argument ' - f'{required_keys} are required when ' - 'bunch pattern info is missing.') - - # 2.1. 
No bunch pattern provided - if bpt is None: - log.info('Missing bunch pattern info.') - log.debug(f'Retrieving {integParams["npulses"]} pulses.') - if extra_dim is None: - extra_dim = 'pulseId' - return peaks_from_raw_trace(arr, integParams['pulseStart'], - integParams['pulseStop'], - integParams['baseStart'], - integParams['baseStop'], - integParams['period'], - integParams['npulses'], - extra_dim=extra_dim) - - # 2.2 Bunch pattern is provided - # load mask and extract pulse Id: - dim_names = {'sase3': 'sa3_pId', 'sase1': 'sa1_pId', - 'scs_ppl': 'ol_pId', 'None': 'pulseId'} - extra_dim = dim_names[pattern] - valid_tid = np.intersect1d(arr.trainId, bpt.trainId, assume_unique=True) - mask = is_pulse_at(bpt, pattern) - pattern_changed = ~(mask == mask[0]).all().values - if not pattern_changed: - pid = np.nonzero(mask[0].values)[0] - valid_arr = arr - else: - mask = is_pulse_at(bpt.sel(trainId=valid_tid), pattern) - mask = mask.rename({'pulse_slot': extra_dim}) - mask = mask.assign_coords( - {extra_dim: np.arange(bpt.sizes['pulse_slot'])}) - mask_on = mask.where(mask, drop=True).fillna(False).astype(bool) - log.info(f'Bunch pattern of {pattern} changed during the run.') - pid = mask_on.coords[extra_dim] - # select trains containing pulses - valid_arr = arr.sel(trainId=mask_on.trainId) - npulses = len(pid) - log.debug(f'Bunch pattern: {npulses} pulses for {pattern}.') - if npulses == 1: - period_bpt = 0 - else: - period_bpt = np.min(np.diff(pid)) - if autoFind and period_bpt*min_distance != integParams['period']: - log.warning('The period from the bunch pattern is different than ' - 'that found by the peak-finding algorithm. Either ' - 'the algorithm failed or the bunch pattern source ' - f'({bunchPattern}) is not correct.') - # create array of sample indices for peak integration - sample_id = (pid-pid[0])*min_distance - # override auto find parameters - if isinstance(integParams['pulseStart'], (int, np.integer)): - integParams['pulseStart'] = integParams['pulseStart'] + sample_id - peaks = peaks_from_raw_trace(valid_arr, integParams['pulseStart'], - integParams['pulseStop'], - integParams['baseStart'], - integParams['baseStop'], - integParams['period'], - integParams['npulses'], - extra_dim) - if pattern_changed: - peaks = peaks.where(mask_on, drop=True) - return peaks.assign_coords({extra_dim: pid}) - - def get_dig_avg_trace(run, mnemonic, ntrains=None): """ Compute the average over ntrains evenly spaced accross all trains @@ -537,131 +350,6 @@ def adq412_channel_peak_params(run, source, key=None, return result -def find_integ_params(trace, height=None, width=1): - """ - Find integration parameters for peak integration of a raw - digitizer trace. Based on scipy find_peaks(). - - Parameters - ---------- - trace: numpy array or xarray DataArray - The digitier raw trace used to find peaks - height: int - minimum threshold for peak determination - width: int - minimum width of peak - - Returns - ------- - dict with keys 'pulseStart', 'pulseStop', 'baseStart', 'baseStop', - 'period', 'npulses' and values in number of samples. 
- """ - if isinstance(trace, xr.DataArray): - trace = trace.values - trace_norm = trace - np.median(trace) - trace_norm = -trace_norm if np.mean(trace_norm) < 0 else trace_norm - SNR = np.max(np.abs(trace_norm)) / np.std(trace_norm[:100]) - if SNR < 10: - log.warning('signal-to-noise ratio too low: cannot ' - 'automatically find peaks.') - return {'pulseStart': 2, 'pulseStop': 3, - 'baseStart': 0, 'baseStop': 1, - 'period': 0, 'npulses': 1} - # Compute autocorrelation using fftconvolve - # from "https://stackoverflow.com/questions/12323959/" - # "fast-cross-correlation-method-in-python" - if len(trace_norm) % 2 == 0: - n = len(trace_norm) - else: - n = len(trace_norm) - 1 - hl = int(n/2) - a = trace_norm[:n] - b = np.zeros(2*n) - b[hl: hl + n] = a - # Do an array flipped convolution, which is a correlation. - ac_trace = fftconvolve(b, a[::-1], mode='valid') - # slower approach: - # ac_trace = np.correlate(trace_norm, trace_norm, mode='same') - n = int(len(ac_trace)/2) - # estimate quality of ac_trace and define min height to detect peaks - factor = np.abs(ac_trace.max() / np.median(ac_trace)) - factor = 3 if factor > 20 else 1.5 - ac_peaks = find_peaks(ac_trace[n:], height=ac_trace[n:].max()/factor) - if len(ac_peaks[0]) == 0: - period = 0 - distance = 1 - elif len(ac_peaks[0]) == 1: - period = ac_peaks[0][0] - distance = max(1, period-6) - else: - period = int(np.median(ac_peaks[0][1:] - ac_peaks[0][:-1])) - distance = max(1, period-6) # smaller than period to account for all peaks - # define min height to detect peaks depending on signal quality - f = np.max([3, np.min([(SNR/10), 4])]) - height = trace_norm.max() / f - peaks, dic = find_peaks(trace_norm, distance=distance, - height=height, width=1) - # filter out peaks that are not periodically spaced - idx = np.ones(len(peaks), dtype=bool) - idx[1:] = np.isclose(peaks[1:] - peaks[:-1], - np.ones(len(peaks[1:]))*period, atol=6) - peaks = peaks[idx] - for d in dic: - dic[d] = dic[d][idx] - npulses = len(peaks) - if npulses == 0: - raise ValueError('Could not automatically find peaks.') - pulseStart = np.mean(dic['left_ips'] - peaks + peaks[0], dtype=int) - pulseStop = np.mean(dic['right_ips'] - peaks + peaks[0], dtype=int) - baseStop = pulseStart - np.mean(peaks - dic['left_ips'])/2 - 1 - baseStop = np.min([baseStop, pulseStart]).astype(int) - baseStop = np.max([baseStop, 0]).astype(int) - baseStart = baseStop - np.mean(dic['widths'])/2 - baseStart = np.max([baseStart, 0]).astype(int) - result = {'pulseStart': pulseStart, 'pulseStop': pulseStop, - 'baseStart': baseStart, 'baseStop': baseStop, - 'period': period, 'npulses': npulses} - return result - - -def get_peak_params(run, mnemonic, raw_trace=None, ntrains=200): - """ - Get the peak region and baseline region of a raw digitizer trace used - to compute the peak integration. These regions are either those of the - digitizer peak-integration settings or those determined in get_tim_peaks - or get_fast_adc_peaks when the inputs are raw traces. - - Parameters - ---------- - run: extra_data.DataCollection - DataCollection containing the digitizer data. - mnemonic: str - ToolBox mnemonic of the digitizer data, e.g. 'MCP2apd'. - raw_trace: optional, 1D numpy array or xarray DataArray - Raw trace to display. If None, the average raw trace over - ntrains of the corresponding channel is loaded (this can - be time-consuming). - ntrains: optional, int - Only used if raw_trace is None. Number of trains used to - calculate the average raw trace of the corresponding channel. 
- """ - run_mnemonics = mnemonics_for_run(run) - if mnemonic not in run_mnemonics: - raise ToolBoxValueError("mnemonic must be a ToolBox mnemonics") - if "raw" not in mnemonic: - params = channel_peak_params(run, mnemonic) - if 'enable' in params and params['enable'] == 0: - log.warning('The digitizer did not record peak-integrated data.') - title = 'Digitizer peak params' - else: - title = 'Auto-find peak params' - if raw_trace is None: - raw_trace = get_dig_avg_trace(run, mnemonic, ntrains) - params = find_integ_params(raw_trace) - log.debug(f'{title} for {mnemonic}: {params}') - return params - - def find_peaks_in_raw_trace(trace, height=None, width=1, distance=24): """ Find integration parameters for peak integration of a raw @@ -843,8 +531,7 @@ def check_peak_params(proposal, runNB, mnemonic, raw_trace=None, Checks and plots the peak parameters (pulse window and baseline window of a raw digitizer trace) used to compute the peak integration. These parameters are either set by the digitizer peak-integration settings, - or are determined by a peak finding algorithm (used in get_tim_peaks - or get_fast_adc_peaks) when the inputs are raw traces. The parameters + or are determined by a peak finding algorithmes. The parameters can also be provided manually for visual inspection. The plot either shows the first and last pulse of the trace or the entire trace. @@ -1021,88 +708,6 @@ def digitizer_type(run, source): return ret -def get_tim_peaks(run, mnemonic=None, merge_with=None, - bunchPattern='sase3', integParams=None, - keepAllSase=False): - """ - Automatically computes TIM peaks. Sources can be loaded on the - fly via the mnemonics argument, or processed from an existing data - set (merge_with). The bunch pattern table is used to assign the - pulse id coordinates. - - Parameters - ---------- - run: extra_data.DataCollection - DataCollection containing the digitizer data. - mnemonic: str - mnemonics for TIM, e.g. "MCP2apd". - merge_with: xarray Dataset - If provided, the resulting Dataset will be merged with this - one. The TIM variables of merge_with (if any) will also be - computed and merged. - bunchPattern: str - 'sase1' or 'sase3' or 'scs_ppl', bunch pattern - used to extract peaks. The pulse ID dimension will be named - 'sa1_pId', 'sa3_pId' or 'ol_pId', respectively. - integParams: dict - dictionnary for raw trace integration, e.g. - {'pulseStart':100, 'pulsestop':200, 'baseStart':50, - 'baseStop':99, 'period':24, 'npulses':500}. - If None, integration parameters are computed automatically. - keepAllSase: bool - Only relevant in case of sase-dedicated trains. If True, all - trains are kept, else only those of the bunchPattern are kept. - - Returns - ------- - xarray Dataset with TIM variables substituted by - the peak caclulated values (e.g. "MCP2raw" becomes - "MCP2peaks"), merged with Dataset *merge_with* if provided. - """ - return get_digitizer_peaks(run, mnemonic, merge_with, - bunchPattern, integParams, - keepAllSase) - - -def get_laser_peaks(run, mnemonic=None, merge_with=None, - bunchPattern='scs_ppl', integParams=None): - """ - Extracts laser photodiode signal (peak intensity) from Fast ADC - digitizer. Sources can be loaded on the fly via the mnemonics - argument, and/or processed from an existing data set (merge_with). - The PP laser bunch pattern is used to assign the pulse idcoordinates. - - Parameters - ---------- - run: extra_data.DataCollection - DataCollection containing the digitizer data. - mnemonic: str - mnemonic for FastADC corresponding to laser signal, e.g. 
- "FastADC2peaks" or 'I0_ILHraw'. - merge_with: xarray Dataset - If provided, the resulting Dataset will be merged with this - one. The FastADC variables of merge_with (if any) will also be - computed and merged. - bunchPattern: str - 'sase1' or 'sase3' or 'scs_ppl', bunch pattern - used to extract peaks. - integParams: dict - dictionnary for raw trace integration, e.g. - {'pulseStart':100, 'pulsestop':200, 'baseStart':50, - 'baseStop':99, 'period':24, 'npulses':500}. - If None, integration parameters are computed - automatically. - - Returns - ------- - xarray Dataset with all Fast ADC variables substituted by - the peak caclulated values (e.g. "FastADC2raw" becomes - "FastADC2peaks"). - """ - return get_digitizer_peaks(run, mnemonic, merge_with, - bunchPattern, integParams, False) - - def get_digitizer_peaks(proposal, runNB, mnemonic, merge_with=None, bunchPattern='sase3', integParams=None, save=False, subdir='usr/processed_runs'): @@ -1206,255 +811,6 @@ def get_digitizer_peaks(proposal, runNB, mnemonic, merge_with=None, return ds -def digitizer_signal_description(run, digitizer=None): - """ - Check for the existence of signal description and return all corresponding - channels in a dictionnary. - - Parameters - ---------- - run: extra_data.DataCollection - DataCollection containing the digitizer data. - digitizer: str or list of str (default=None) - Name of the digitizer: one in ['FastADC', FastADC2', - 'ADQ412', 'ADQ412_2] - If None, all digitizers are used - - Returns - ------- - signal_description: dictionnary containing the signal description of - the digitizer channels. - - - Example - ------- - import toolbox_scs as tb - run = tb.open_run(3481, 100) - signals = tb.digitizer_signal_description(run) - signals_fadc2 = tb.digitizer_signal_description(run, 'FastADC2') - """ - if digitizer not in [None, 'FastADC', 'FastADC2']: - raise ValueError('digitizer must be one in ' - '["FastADC", "FastADC2"]') - if digitizer is None: - digitizer = ['FastADC', 'FastADC2', 'ADQ412', 'ADQ412_2'] - else: - digitizer = [digitizer] - def key_fadc(i): - if i > 9: - return None - return f'channel_{i}.signalDescription.value' - def key_adq412(i): - if i > 3: - return None - return f'board1.channel_{i}.description.value' - - sources = {'FastADC': ['SCS_UTC1_MCP/ADC/1', key_fadc], - 'FastADC2': ['SCS_UTC2_FADC/ADC/1', key_fadc], - 'ADQ412': ['SCS_UTC1_ADQ/ADC/1', key_adq412], - 'ADQ412_2': ['SCS_UTC2_ADQ/ADC/1', key_adq412]} - res = {} - for d in digitizer: - if sources[d][0] not in run.all_sources: - continue - if sources[d][1](0) not in run.get_run_values(sources[d][0]): - raise ValueError('No signal description available for ' - f'{d}: {sources[d][0]}') - for ch in range(10): - val = sources[d][1](ch) - if val is None: - break - desc = run.get_run_value(sources[d][0], val) - res[f'{d}_Ch{ch}'] = desc - return res - - -def calibrateTIM(data, rollingWindow=200, mcp=1, plot=False, use_apd=True, intstart=None, - intstop=None, bkgstart=None, bkgstop=None, t_offset=None, npulses_apd=None): - ''' Calibrate TIM signal (Peak-integrated signal) to the slow ion signal of SCS_XGM - (photocurrent read by Keithley, channel 'pulseEnergy.photonFlux.value'). - The aim is to find F so that E_tim_peak[uJ] = F x TIM_peak. For this, we want to - match the SASE3-only average TIM pulse peak per train (TIM_avg) to the slow XGM - signal E_slow. 
- Since E_slow is the average energy per pulse over all SASE1 and SASE3 - pulses (N1 and N3), we first extract the relative contribution C of the SASE3 pulses - by looking at the pulse-resolved signals of the SA3_XGM in the tunnel. - There, the signal of SASE1 is usually strong enough to be above noise level. - Let TIM_avg be the average of the TIM pulses (SASE3 only). - The calibration factor is then defined as: F = E_slow * C * (N1+N3) / ( N3 * TIM_avg ). - If N3 changes during the run, we locate the indices for which N3 is maximum and define - a window where to apply calibration (indices start/stop). - - Warning: the calibration does not include the transmission by the KB mirrors! - - Inputs: - data: xarray Dataset - rollingWindow: length of running average to calculate TIM_avg - mcp: MCP channel - plot: boolean. If True, plot calibration results. - use_apd: boolean. If False, the TIM pulse peaks are extract from raw traces using - getTIMapd - intstart: trace index of integration start - intstop: trace index of integration stop - bkgstart: trace index of background start - bkgstop: trace index of background stop - t_offset: index separation between two pulses - npulses_apd: number of pulses - - Output: - F: float, TIM calibration factor. - - ''' - start = 0 - stop = None - npulses = data['npulses_sase3'] - ntrains = npulses.shape[0] - if not np.all(npulses == npulses[0]): - start = np.argmax(npulses.values) - stop = ntrains + np.argmax(npulses.values[::-1]) - 1 - if stop - start < rollingWindow: - print('not enough consecutive data points with the largest number of pulses per train') - start += rollingWindow - stop = np.min((ntrains, stop+rollingWindow)) - filteredTIM = getTIMapd(data, mcp, use_apd, intstart, intstop, bkgstart, bkgstop, t_offset, npulses_apd) - sa3contrib = saseContribution(data, 'sase3', 'XTD10_XGM') - avgFast = filteredTIM.mean(axis=1).rolling(trainId=rollingWindow).mean() - ratio = ((data['npulses_sase3']+data['npulses_sase1']) * - data['SCS_photonFlux'] * sa3contrib) / (avgFast*data['npulses_sase3']) - F = float(ratio[start:stop].median().values) - - if plot: - fig = plt.figure(figsize=(8,5)) - ax = plt.subplot(211) - ax.set_title('E[uJ] = {:2e} x TIM (MCP{})'.format(F, mcp)) - ax.plot(data['SCS_photonFlux'], label='SCS XGM slow (all SASE)', color='C0') - slow_avg_sase3 = data['SCS_photonFlux']*(data['npulses_sase1'] - +data['npulses_sase3'])*sa3contrib/data['npulses_sase3'] - ax.plot(slow_avg_sase3, label='SCS XGM slow (SASE3 only)', color='C1') - ax.plot(avgFast*F, label='Calibrated TIM rolling avg', color='C2') - ax.legend(loc='upper left', fontsize=8) - ax.set_ylabel('Energy [$\mu$J]', size=10) - ax.plot(filteredTIM.mean(axis=1)*F, label='Calibrated TIM train avg', alpha=0.2, color='C9') - ax.legend(loc='best', fontsize=8, ncol=2) - plt.xlabel('train in run') - - ax = plt.subplot(234) - xgm_fast = selectSASEinXGM(data) - ax.scatter(filteredTIM, xgm_fast, s=5, alpha=0.1, rasterized=True) - fit, cov = np.polyfit(filteredTIM.values.flatten(),xgm_fast.values.flatten(),1, cov=True) - y=np.poly1d(fit) - x=np.linspace(filteredTIM.min(), filteredTIM.max(), 10) - ax.plot(x, y(x), lw=2, color='r') - ax.set_ylabel('Raw HAMP [$\mu$J]', size=10) - ax.set_xlabel('TIM (MCP{}) signal'.format(mcp), size=10) - ax.annotate(s='y(x) = F x + A\n'+ - 'F = %.3e\n$\Delta$F/F = %.2e\n'%(fit[0],np.abs(np.sqrt(cov[0,0])/fit[0]))+ - 'A = %.3e'%fit[1], - xy=(0.5,0.6), xycoords='axes fraction', fontsize=10, color='r') - print('TIM calibration factor: %e'%(F)) - - ax = plt.subplot(235) - 
ax.hist(filteredTIM.values.flatten()*F, bins=50, rwidth=0.8) - ax.set_ylabel('number of pulses', size=10) - ax.set_xlabel('Pulse energy MCP{} [uJ]'.format(mcp), size=10) - ax.set_yscale('log') - - ax = plt.subplot(236) - if not use_apd: - pulseStart = intstart - pulseStop = intstop - else: - pulseStart = data.attrs['run'].get_array( - 'SCS_UTC1_ADQ/ADC/1', 'board1.apd.channel_0.pulseStart.value')[0].values - pulseStop = data.attrs['run'].get_array( - 'SCS_UTC1_ADQ/ADC/1', 'board1.apd.channel_0.pulseStop.value')[0].values - - if 'MCP{}raw'.format(mcp) not in data: - tid, data = data.attrs['run'].train_from_index(0) - trace = data['SCS_UTC1_ADQ/ADC/1:network']['digitizers.channel_1_D.raw.samples'] - print('no raw data for MCP{}. Loading trace from MCP1'.format(mcp)) - label_trace='MCP1 Voltage [V]' - else: - trace = data['MCP{}raw'.format(mcp)][0] - label_trace='MCP{} Voltage [V]'.format(mcp) - ax.plot(trace[:pulseStop+25], 'o-', ms=2, label='trace') - ax.axvspan(pulseStart, pulseStop, color='C2', alpha=0.2, label='APD region') - ax.axvline(pulseStart, color='gray', ls='--') - ax.axvline(pulseStop, color='gray', ls='--') - ax.set_xlim(pulseStart - 25, pulseStop + 25) - ax.set_ylabel(label_trace, size=10) - ax.set_xlabel('sample #', size=10) - ax.legend(fontsize=8) - - return F - - -''' TIM calibration table - Dict with key= photon energy and value= array of polynomial coefficients for each MCP (1,2,3). - The polynomials correspond to a fit of the logarithm of the calibration factor as a function - of MCP voltage. If P is a polynomial and V the MCP voltage, the calibration factor (in microjoule - per APD signal) is given by -exp(P(V)). - This table was generated from the calibration of March 2019, proposal 900074, semester 201930, - runs 69 - 111 (Ni edge): https://in.xfel.eu/elog/SCS+Beamline/2323 - runs 113 - 153 (Co edge): https://in.xfel.eu/elog/SCS+Beamline/2334 - runs 163 - 208 (Fe edge): https://in.xfel.eu/elog/SCS+Beamline/2349 -''' -tim_calibration_table = { - 705.5: np.array([ - [-6.85344690e-12, 5.00931986e-08, -1.27206912e-04, 1.15596821e-01, -3.15215367e+01], - [ 1.25613942e-11, -5.41566381e-08, 8.28161004e-05, -7.27230153e-02, 3.10984925e+01], - [ 1.14094964e-12, 7.72658935e-09, -4.27504907e-05, 4.07253378e-02, -7.00773062e+00]]), - 779: np.array([ - [ 4.57610777e-12, -2.33282497e-08, 4.65978738e-05, -6.43305156e-02, 3.73958623e+01], - [ 2.96325102e-11, -1.61393276e-07, 3.32600044e-04, -3.28468195e-01, 1.28328844e+02], - [ 1.14521506e-11, -5.81980336e-08, 1.12518434e-04, -1.19072484e-01, 5.37601559e+01]]), - 851: np.array([ - [ 3.15774215e-11, -1.71452934e-07, 3.50316512e-04, -3.40098861e-01, 1.31064501e+02], - [5.36341958e-11, -2.92533156e-07, 6.00574534e-04, -5.71083140e-01, 2.10547161e+02], - [ 3.69445588e-11, -1.97731342e-07, 3.98203522e-04, -3.78338599e-01, 1.41894119e+02]]) -} - - -def timFactorFromTable(voltage, photonEnergy, mcp=1): - ''' Returns an energy calibration factor for TIM integrated peak signal (APD) - according to calibration from March 2019, proposal 900074, semester 201930, - runs 69 - 111 (Ni edge): https://in.xfel.eu/elog/SCS+Beamline/2323 - runs 113 - 153 (Co edge): https://in.xfel.eu/elog/SCS+Beamline/2334 - runs 163 - 208 (Fe edge): https://in.xfel.eu/elog/SCS+Beamline/2349 - Uses the tim_calibration_table declared above. - - Inputs: - voltage: MCP voltage in volts. - photonEnergy: FEL photon energy in eV. Calibration factor is linearly - interpolated between the known values from the calibration table. - mcp: MCP channel (1, 2, or 3). 
- - Output: - f: calibration factor in microjoule per APD signal - ''' - energies = np.sort([key for key in tim_calibration_table]) - if photonEnergy not in energies: - if photonEnergy > energies.max(): - photonEnergy = energies.max() - elif photonEnergy < energies.min(): - photonEnergy = energies.min() - else: - idx = np.searchsorted(energies, photonEnergy) - 1 - polyA = np.poly1d(tim_calibration_table[energies[idx]][mcp-1]) - polyB = np.poly1d(tim_calibration_table[energies[idx+1]][mcp-1]) - fA = -np.exp(polyA(voltage)) - fB = -np.exp(polyB(voltage)) - f = fA + (fB-fA)/(energies[idx+1]-energies[idx])*(photonEnergy - energies[idx]) - return f - poly = np.poly1d(tim_calibration_table[photonEnergy][mcp-1]) - f = -np.exp(poly(voltage)) - return f - -############################################################################################# -############################################################################################# -############################################################################################# - - def save_peaks(proposal, runNB, peaks, avg_trace, subdir): ''' Save the peaks extracted with extract_digitizer_peaks() as well as the diff --git a/src/toolbox_scs/detectors/digitizers_old.py b/src/toolbox_scs/detectors/digitizers_old.py new file mode 100644 index 0000000..109df50 --- /dev/null +++ b/src/toolbox_scs/detectors/digitizers_old.py @@ -0,0 +1,1766 @@ +""" Digitizers related sub-routines + + Copyright (2021) SCS Team. + + (contributions preferrably comply with pep8 code structure + guidelines.) +""" + +import logging +import os + +import numpy as np +import xarray as xr +import matplotlib.pyplot as plt +from scipy.signal import find_peaks +from scipy.signal import fftconvolve + +from ..misc.bunch_pattern_external import is_pulse_at +from ..util.exceptions import ToolBoxValueError +from ..mnemonics_machinery import (mnemonics_to_process, + mnemonics_for_run) +from extra_data import open_run +from extra_data.read_machinery import find_proposal +from extra.components import XrayPulses, OpticalLaserPulses + +__all__ = [ + 'check_peak_params', + 'get_digitizer_peaks', + 'get_laser_peaks', + 'get_peaks', + 'get_tim_peaks', + 'digitizer_signal_description', + 'get_dig_avg_trace', + 'extract_digitizer_peaks', + 'load_processed_peaks', + 'check_processed_peak_params' +] + +log = logging.getLogger(__name__) + + +def peaks_from_raw_trace(traces, pulseStart, pulseStop, baseStart, baseStop, + period=None, npulses=None, extra_dim=None): + """ + Computes peaks from raw digitizer traces by trapezoidal integration. + + Parameters + ---------- + traces: xarray DataArray or numpy array containing raw traces. If + numpy array is provided, the second dimension is that of the samples. + pulseStart: int or list or 1D-numpy array + trace index of integration start. If 1d array, each value is the start + of one peak. The period and npulses parameters are ignored. + pulseStop: int + trace index of integration stop. + baseStart: int + trace index of background start. + baseStop: int + trace index of background stop. + period: int + number of samples between two peaks. Ignored if intstart is a 1D-array. + npulses: int + number of pulses. Ignored if intstart is a 1D-array. + extra_dim: str + Name given to the dimension along the peaks. Defaults to 'pulseId'. 
+ + Returns + ------- + xarray DataArray + """ + + assert len(traces.shape) == 2 + + if isinstance(traces, xr.DataArray): + ntid = traces.sizes['trainId'] + coords = traces.coords + traces = traces.values + if traces.shape[0] != ntid: + traces = traces.T + else: + coords = None + + if hasattr(pulseStart, '__len__'): + pulseStart = np.array(pulseStart) + pulses = pulseStart - pulseStart[0] + pulseStart = pulseStart[0] + else: + pulses = range(0, npulses*period, period) + + if extra_dim is None: + extra_dim = 'pulseId' + results = xr.DataArray(np.empty((traces.shape[0], len(pulses))), + coords=coords, + dims=['trainId', extra_dim]) + + for i, p in enumerate(pulses): + a = pulseStart + p + b = pulseStop + p + bkga = baseStart + p + bkgb = baseStop + p + if b > traces.shape[1]: + break + bg = np.outer(np.median(traces[:, bkga:bkgb], axis=1), + np.ones(b-a)) + results[:, i] = np.trapz(traces[:, a:b] - bg, axis=1) + return results + + +def peaks_from_apd(array, params, digitizer, bpt, bunchPattern): + """ + Extract peak-integrated data according to the bunch pattern. + + Parameters + ---------- + array: xarray DataArray + 2D array containing peak-integrated data + params: dict + peak-integration parameters of the digitizer + digitizer: str + digitizer type, one of {'FastADC', 'ADQ412'} + bpt: xarray DataArray + bunch pattern table + bunchPattern: str + one of {'sase1', 'sase3', 'scs_ppl'}, used to select pulses and + assign name of the pulse id dimension. + + Returns + ------- + xarray DataArray with pulse id coordinates. + """ + if params['enable'] == 0 or (array == 1.).all(): + raise ValueError('The digitizer did not record integrated peaks. ' + 'Consider using raw traces from the same channel ' + 'for peak integration.') + if digitizer == 'FastADC': + min_distance = 24 + if digitizer == 'ADQ412': + min_distance = 440 + period = params['period'] + if period % min_distance != 0: + log.warning(f'Warning: the pulse period ({period} samples) of ' + 'digitizer is not a multiple of the pulse separation at ' + f'4.5 MHz ({min_distance} samples). Pulse id assignment ' + 'is likely to fail.') + stride = int(period/min_distance) + npulses_apd = params['npulses'] + dim_names = {'sase3': 'sa3_pId', 'sase1': 'sa1_pId', 'scs_ppl': 'ol_pId'} + pulse_dim = dim_names[bunchPattern] + arr_dim = [dim for dim in array.dims if dim != 'trainId'][0] + if npulses_apd > array.sizes[arr_dim]: + log.warning(f'The digitizer was set to record {npulses_apd} pulses ' + f'but the array length is only {array.sizes[arr_dim]}.') + npulses_apd = array.sizes[arr_dim] + mask = is_pulse_at(bpt, bunchPattern).rename({'pulse_slot': pulse_dim}) + mask = mask.where(mask.trainId.isin(array.trainId), drop=True) + mask = mask.assign_coords({pulse_dim: np.arange(bpt.sizes['pulse_slot'])}) + pid = np.sort(np.unique(np.where(mask)[1])) + npulses_bpt = len(pid) + apd_coords = np.arange(pid[0], pid[0] + stride*npulses_apd, stride) + noalign = False + if len(np.intersect1d(apd_coords, pid, assume_unique=True)) < npulses_bpt: + log.warning('Not all pulses were recorded. The digitizer ' + 'was set to record pulse ids ' + f'{apd_coords[apd_coords<bpt.sizes["pulse_slot"]]} but the' + 'bunch pattern for' + f' {bunchPattern} is {pid}. Skipping pulse ID alignment.') + noalign = True + array = array.isel({arr_dim: slice(0, npulses_apd)}) + array = array.where(array != 1.) 
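+    # Slots with no recorded peak hold the value 1. (see the all-ones
+    # check at the top of this function); they were masked out above.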
+ if noalign: + return array + array = array.rename( + {arr_dim: pulse_dim}).assign_coords({pulse_dim: apd_coords}) + array, mask = xr.align(array, mask, join='inner') + array = array.where(mask, drop=True) + return array + + +def get_peaks(run, + data, + mnemonic, + useRaw=True, + autoFind=True, + integParams=None, + bunchPattern='sase3', + bpt=None, + extra_dim=None, + indices=None, + ): + """ + Extract peaks from one source (channel) of a digitizer. + + Parameters + ---------- + run: extra_data.DataCollection + DataCollection containing the digitizer data + data: xarray DataArray or str + array containing the raw traces or peak-integrated values from the + digitizer. If str, must be one of the ToolBox mnemonics. If None, + the data is loaded via the source and key arguments. + mnemonic: str or dict + ToolBox mnemonic or dict with source and key as in + {'source': 'SCS_UTC1_ADQ/ADC/1:network', + 'key': 'digitizers.channel_1_A.raw.samples'} + useRaw: bool + If True, extract peaks from raw traces. If False, uses the APD (or + peaks) data from the digitizer. + autoFind: bool + If True, finds integration parameters by inspecting the average raw + trace. Only valid if useRaw is True. + integParams: dict + dictionnary containing the integration parameters for raw trace + integration: 'pulseStart', 'pulseStop', 'baseStart', 'baseStop', + 'period', 'npulses'. Not used if autoFind is True. All keys are + required when bunch pattern is missing. + bunchPattern: str + match the peaks to the bunch pattern: 'sase1', 'sase3', 'scs_ppl'. + This will dictate the name of the pulse ID coordinates: 'sa1_pId', + 'sa3_pId' or 'scs_ppl'. + bpt: xarray DataArray + bunch pattern table + extra_dim: str + Name given to the dimension along the peaks. If None, the name is given + according to the bunchPattern. + indices: array, slice + indices from the peak-integrated data to retrieve. Only required + when bunch pattern is missing and useRaw is False. + + Returns + ------- + xarray.DataArray containing digitizer peaks with pulse coordinates + """ + arr = data + dim = [d for d in arr.dims if d != 'trainId'][0] + + # Load bunch pattern table + run_mnemonics = mnemonics_for_run(run) + if bpt is None and bunchPattern != 'None': + if 'bunchPatternTable' in run_mnemonics: + m = run_mnemonics['bunchPatternTable'] + bpt = run.get_array(m['source'], m['key'], m['dim']) + pattern = bunchPattern + else: + pattern = bunchPattern + if bunchPattern == 'None': + bpt = None + + # Find digitizer type + m = mnemonic if isinstance(mnemonic, dict) else run_mnemonics[mnemonic] + digitizer = digitizer_type(run, m.get('source')) + + # 1. Peak-integrated data from digitizer + if useRaw is False: + # 1.1 No bunch pattern provided + if bpt is None: + log.info('Missing bunch pattern info.') + if indices is None: + raise TypeError('indices argument must be provided ' + 'when bunch pattern info is missing.') + if extra_dim is None: + extra_dim = 'pulseId' + return arr.isel({dim: indices}).rename({dim: extra_dim}) + + # 1.2 Bunch pattern is provided + if isinstance(mnemonic, dict): + peak_params = channel_peak_params(run, mnemonic.get('source'), + mnemonic.get('key')) + else: + peak_params = channel_peak_params(run, mnemonic) + log.debug(f'Digitizer peak integration parameters: {peak_params}') + return peaks_from_apd(arr, peak_params, digitizer, bpt, bunchPattern) + + # 2. 
Use raw data from digitizer + # minimum pulse period @ 4.5MHz, according to digitizer type + min_distance = 1 + if digitizer == 'FastADC': + min_distance = 24 + if digitizer == 'ADQ412': + min_distance = 440 + if autoFind: + stride = int(np.max([1, np.floor(arr.sizes['trainId']/200)])) + trace = arr.isel(trainId=slice(0, None, stride)).mean(dim='trainId') + try: + integParams = find_integ_params(trace) + except ValueError as err: + log.warning(f'{err}, trying with averaged trace over all trains.') + trace = arr.mean(dim='trainId') + integParams = find_integ_params(trace) + log.debug(f'Auto find peaks result: {integParams}') + + required_keys = ['pulseStart', 'pulseStop', 'baseStart', + 'baseStop', 'period', 'npulses'] + if integParams is None or not all(name in integParams + for name in required_keys): + raise TypeError('All keys of integParams argument ' + f'{required_keys} are required when ' + 'bunch pattern info is missing.') + + # 2.1. No bunch pattern provided + if bpt is None: + log.info('Missing bunch pattern info.') + log.debug(f'Retrieving {integParams["npulses"]} pulses.') + if extra_dim is None: + extra_dim = 'pulseId' + return peaks_from_raw_trace(arr, integParams['pulseStart'], + integParams['pulseStop'], + integParams['baseStart'], + integParams['baseStop'], + integParams['period'], + integParams['npulses'], + extra_dim=extra_dim) + + # 2.2 Bunch pattern is provided + # load mask and extract pulse Id: + dim_names = {'sase3': 'sa3_pId', 'sase1': 'sa1_pId', + 'scs_ppl': 'ol_pId', 'None': 'pulseId'} + extra_dim = dim_names[pattern] + valid_tid = np.intersect1d(arr.trainId, bpt.trainId, assume_unique=True) + mask = is_pulse_at(bpt, pattern) + pattern_changed = ~(mask == mask[0]).all().values + if not pattern_changed: + pid = np.nonzero(mask[0].values)[0] + valid_arr = arr + else: + mask = is_pulse_at(bpt.sel(trainId=valid_tid), pattern) + mask = mask.rename({'pulse_slot': extra_dim}) + mask = mask.assign_coords( + {extra_dim: np.arange(bpt.sizes['pulse_slot'])}) + mask_on = mask.where(mask, drop=True).fillna(False).astype(bool) + log.info(f'Bunch pattern of {pattern} changed during the run.') + pid = mask_on.coords[extra_dim] + # select trains containing pulses + valid_arr = arr.sel(trainId=mask_on.trainId) + npulses = len(pid) + log.debug(f'Bunch pattern: {npulses} pulses for {pattern}.') + if npulses == 1: + period_bpt = 0 + else: + period_bpt = np.min(np.diff(pid)) + if autoFind and period_bpt*min_distance != integParams['period']: + log.warning('The period from the bunch pattern is different than ' + 'that found by the peak-finding algorithm. Either ' + 'the algorithm failed or the bunch pattern source ' + f'({bunchPattern}) is not correct.') + # create array of sample indices for peak integration + sample_id = (pid-pid[0])*min_distance + # override auto find parameters + if isinstance(integParams['pulseStart'], (int, np.integer)): + integParams['pulseStart'] = integParams['pulseStart'] + sample_id + peaks = peaks_from_raw_trace(valid_arr, integParams['pulseStart'], + integParams['pulseStop'], + integParams['baseStart'], + integParams['baseStop'], + integParams['period'], + integParams['npulses'], + extra_dim) + if pattern_changed: + peaks = peaks.where(mask_on, drop=True) + return peaks.assign_coords({extra_dim: pid}) + + +def get_dig_avg_trace(run, mnemonic, ntrains=None): + """ + Compute the average over ntrains evenly spaced accross all trains + of a digitizer trace. 
+ + Parameters + ---------- + run: extra_data.DataCollection + DataCollection containing the digitizer data. + mnemonic: str + ToolBox mnemonic of the digitizer data, e.g. 'MCP2apd'. + ntrains: int + Number of trains used to calculate the average raw trace. + If None, all trains are used. + + Returns + ------- + trace: DataArray + The average digitizer trace + """ + run_mnemonics = mnemonics_for_run(run) + if ntrains is None: + sel = run + else: + total_tid = len(run.train_ids) + stride = int(np.max([1, np.floor(total_tid/ntrains)])) + sel = run.select_trains(np.s_[0:None:stride]) + m = run_mnemonics[mnemonic] + raw_trace = sel.get_array(m['source'], m['key'], m['dim']) + raw_trace = raw_trace.mean(dim='trainId') + return raw_trace + + +def channel_peak_params(run, source, key=None, channel=None, board=None): + """ + Extract peak-integration parameters used by a channel of the digitizer. + + Parameters + ---------- + run: extra_data.DataCollection + DataCollection containing the digitizer data. + source: str + ToolBox mnemonic of a digitizer data, e.g. "MCP2apd" or + "FastADC4peaks", or name of digitizer source, e.g. + 'SCS_UTC1_ADQ/ADC/1:network'. + key: str + optional, used in combination of source (if source is not a ToolBox + mnemonics) instead of digitizer, channel and board. + channel: int or str + The digitizer channel for which to retrieve the parameters. If None, + inferred from the source mnemonic or source + key arguments. + board: int + Board of the ADQ412. If None, inferred from the source mnemonic or + source + key arguments. + + Returns + ------- + dict with peak integration parameters. + """ + run_mnemonics = mnemonics_for_run(run) + if source in run_mnemonics: + m = run_mnemonics[source] + source = m['source'] + key = m['key'] + if 'network' in source: + return adq412_channel_peak_params(run, source, key, channel, board) + else: + return fastADC_channel_peak_params(run, source, channel) + + +def fastADC_channel_peak_params(run, source, channel=None): + """ + Extract peak-integration parameters used by a channel of the Fast ADC. + + Parameters + ---------- + run: extra_data.DataCollection + DataCollection containing the digitizer data. + source: str + Name of Fast ADC source, e.g. 'SCS_UTC1_MCP/ADC/1:channel_5.output'. + channel: int + The Fast ADC channel for which to retrieve the parameters. If None, + inferred from the source. + + Returns + ------- + dict with peak integration parameters. + """ + fastADC_keys = { + 'baseStart': ('baselineSettings.baseStart.value', + 'baseStart.value'), + 'baseStop': ('baseStop.value',), + 'baseLength': ('baselineSettings.length.value',), + 'enable': ('enablePeakComputation.value',), + 'pulseStart': ('initialDelay.value',), + 'pulseLength': ('peakSamples.value',), + 'npulses': ('numPulses.value',), + 'period': ('pulsePeriod.value',) + } + if channel is None: + channel = int(source.split(':')[1].split('.')[0].split('_')[1]) + baseName = f'channel_{channel}.' 
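+    # The settings are read from the device (control) source, so drop
+    # the ':channel_N.output' pipeline suffix before selecting it.
+    # Several key spellings exist across FastADC device versions; the
+    # loop below tries each candidate and keeps whichever is present.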
+ source = source.split(':')[0] + data = run.select(source).train_from_index(0)[1][source] + result = {} + for mnemo, versions in fastADC_keys.items(): + for v in versions: + key = baseName + v + if key in data: + result[mnemo] = data[key] + if 'baseLength' in result: + result['baseStop'] = result['baseStart'] + \ + result.pop('baseLength') + if 'pulseLength' in result: + result['pulseStop'] = result['pulseStart'] + \ + result.pop('pulseLength') + return result + + +def adq412_channel_peak_params(run, source, key=None, + channel=None, board=None): + """ + Extract peak-integration parameters used by a channel of the ADQ412. + + Parameters + ---------- + run: extra_data.DataCollection + DataCollection containing the digitizer data. + source: str + Nname of ADQ412 source, e.g. 'SCS_UTC1_ADQ/ADC/1:network'. + key: str + optional, e.g. 'digitizers.channel_1_D.apd.pulseIntegral', used in + combination of source instead of channel and board. + channel: int or str + The ADQ412 channel for which to retrieve the parameters. If None, + inferred from the source mnemonic or source + key arguments. + board: int + Board of the ADQ412. If None, inferred from the source mnemonic or + source + key arguments. + + Returns + ------- + dict with peak integration parameters. + """ + if key is None: + if channel is None or board is None: + raise ValueError('key or channel + board arguments are ' + 'missing.') + else: + k = key.split('.')[1].split('_') + ch_to_int = {'A': 0, 'B': 1, 'C': 2, 'D': 3} + channel = ch_to_int[k[2]] + board = k[1] + baseName = f'board{board}.apd.channel_{channel}.' + source = source.split(':')[0] + adq412_keys = { + 'baseStart': (baseName + 'baseStart.value',), + 'baseStop': (baseName + 'baseStop.value',), + 'enable': (baseName + 'enable.value',), + 'pulseStart': (baseName + 'pulseStart.value',), + 'pulseStop': (baseName + 'pulseStop.value',), + 'initialDelay': (baseName + 'initialDelay.value',), + 'upperLimit': (baseName + 'upperLimit.value',), + 'npulses': (f'board{board}.apd.numberOfPulses.value',) + } + + data = run.select(source).train_from_index(0)[1][source] + result = {} + for mnemo, versions in adq412_keys.items(): + for key in versions: + if key in data: + result[mnemo] = data[key] + result['period'] = result.pop('upperLimit') - \ + result.pop('initialDelay') + return result + + +def find_integ_params(trace, height=None, width=1): + """ + Find integration parameters for peak integration of a raw + digitizer trace. Based on scipy find_peaks(). + + Parameters + ---------- + trace: numpy array or xarray DataArray + The digitier raw trace used to find peaks + height: int + minimum threshold for peak determination + width: int + minimum width of peak + + Returns + ------- + dict with keys 'pulseStart', 'pulseStop', 'baseStart', 'baseStop', + 'period', 'npulses' and values in number of samples. 
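+
+    Example
+    -------
+    # Illustrative sketch; 'FastADC2raw' is just an example mnemonic.
+    trace = get_dig_avg_trace(run, 'FastADC2raw', ntrains=200)
+    params = find_integ_params(trace)
+    # params holds 'pulseStart', 'pulseStop', 'baseStart', 'baseStop',
+    # 'period' and 'npulses', all in number of samples.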
+ """ + if isinstance(trace, xr.DataArray): + trace = trace.values + trace_norm = trace - np.median(trace) + trace_norm = -trace_norm if np.mean(trace_norm) < 0 else trace_norm + SNR = np.max(np.abs(trace_norm)) / np.std(trace_norm[:100]) + if SNR < 10: + log.warning('signal-to-noise ratio too low: cannot ' + 'automatically find peaks.') + return {'pulseStart': 2, 'pulseStop': 3, + 'baseStart': 0, 'baseStop': 1, + 'period': 0, 'npulses': 1} + # Compute autocorrelation using fftconvolve + # from "https://stackoverflow.com/questions/12323959/" + # "fast-cross-correlation-method-in-python" + if len(trace_norm) % 2 == 0: + n = len(trace_norm) + else: + n = len(trace_norm) - 1 + hl = int(n/2) + a = trace_norm[:n] + b = np.zeros(2*n) + b[hl: hl + n] = a + # Do an array flipped convolution, which is a correlation. + ac_trace = fftconvolve(b, a[::-1], mode='valid') + # slower approach: + # ac_trace = np.correlate(trace_norm, trace_norm, mode='same') + n = int(len(ac_trace)/2) + # estimate quality of ac_trace and define min height to detect peaks + factor = np.abs(ac_trace.max() / np.median(ac_trace)) + factor = 3 if factor > 20 else 1.5 + ac_peaks = find_peaks(ac_trace[n:], height=ac_trace[n:].max()/factor) + if len(ac_peaks[0]) == 0: + period = 0 + distance = 1 + elif len(ac_peaks[0]) == 1: + period = ac_peaks[0][0] + distance = max(1, period-6) + else: + period = int(np.median(ac_peaks[0][1:] - ac_peaks[0][:-1])) + distance = max(1, period-6) # smaller than period to account for all peaks + # define min height to detect peaks depending on signal quality + f = np.max([3, np.min([(SNR/10), 4])]) + height = trace_norm.max() / f + peaks, dic = find_peaks(trace_norm, distance=distance, + height=height, width=1) + # filter out peaks that are not periodically spaced + idx = np.ones(len(peaks), dtype=bool) + idx[1:] = np.isclose(peaks[1:] - peaks[:-1], + np.ones(len(peaks[1:]))*period, atol=6) + peaks = peaks[idx] + for d in dic: + dic[d] = dic[d][idx] + npulses = len(peaks) + if npulses == 0: + raise ValueError('Could not automatically find peaks.') + pulseStart = np.mean(dic['left_ips'] - peaks + peaks[0], dtype=int) + pulseStop = np.mean(dic['right_ips'] - peaks + peaks[0], dtype=int) + baseStop = pulseStart - np.mean(peaks - dic['left_ips'])/2 - 1 + baseStop = np.min([baseStop, pulseStart]).astype(int) + baseStop = np.max([baseStop, 0]).astype(int) + baseStart = baseStop - np.mean(dic['widths'])/2 + baseStart = np.max([baseStart, 0]).astype(int) + result = {'pulseStart': pulseStart, 'pulseStop': pulseStop, + 'baseStart': baseStart, 'baseStop': baseStop, + 'period': period, 'npulses': npulses} + return result + + +def get_peak_params(run, mnemonic, raw_trace=None, ntrains=None): + """ + Get the peak region and baseline region of a raw digitizer trace used + to compute the peak integration. These regions are either those of the + digitizer peak-integration settings or those determined in get_tim_peaks + or get_fast_adc_peaks when the inputs are raw traces. + + Parameters + ---------- + run: extra_data.DataCollection + DataCollection containing the digitizer data. + mnemonic: str + ToolBox mnemonic of the digitizer data, e.g. 'MCP2apd'. + raw_trace: optional, 1D numpy array or xarray DataArray + Raw trace to display. If None, the average raw trace over + ntrains of the corresponding channel is loaded (this can + be time-consuming). + ntrains: optional, int + Only used if raw_trace is None. Number of trains used to + calculate the average raw trace of the corresponding channel. 
+ """ + run_mnemonics = mnemonics_for_run(run) + if mnemonic not in run_mnemonics: + raise ToolBoxValueError("mnemonic must be a ToolBox mnemonics") + if "raw" not in mnemonic: + params = channel_peak_params(run, mnemonic) + if 'enable' in params and params['enable'] == 0: + log.warning('The digitizer did not record peak-integrated data.') + else: + if raw_trace is None: + raw_trace = get_dig_avg_trace(run, mnemonic, ntrains) + params = find_integ_params(raw_trace) + return params + + +def find_peak_integration_parameters(run, mnemonic, raw_trace=None, + integParams=None, pattern=None): + ''' + Finds peak integration parameters. + ''' + run_mnemonics = mnemonics_for_run(run) + digitizer = digitizer_type(run, run_mnemonics[mnemonic]['source']) + pulse_period = 24 if digitizer == "FastADC" else 440 + + if integParams is None: # load raw trace and find peaks + autoFind = True + if raw_trace is None: + raw_trace = get_dig_avg_trace(run, mnemonic) + params = get_peak_params(run, mnemonic, raw_trace) + else: # inspect provided parameters + autoFind = False + required_keys = ['pulseStart', 'pulseStop', 'baseStart', + 'baseStop'] + add_text = '' + if pattern is None and not hasattr(integParams['pulseStart'], + '__len__'): + required_keys += ['period', 'npulses'] + add_text = 'Bunch pattern not provided. ' + if not all(name in integParams for name in required_keys): + raise ValueError(add_text + 'All keys of integParams argument ' + f'{required_keys} are required.') + params = integParams.copy() + + # extract the pulse ids from the parameters (starting at 0) + if hasattr(params['pulseStart'], '__len__'): + if params.get('npulses') is not None and ( + params.get('npulses') != len(params['pulseStart']) ): + log.warning('The number of pulses does not match the length ' + 'of pulseStart. Using length of pulseStart as ' + 'the number of pulses.') + params['npulses'] = len(params['pulseStart']) + pulse_ids_params = ((np.array(params['pulseStart']) + - params['pulseStart'][0]) / pulse_period).astype(int) + else: + pulse_ids_params = np.arange(0, params['npulses'] * params['period'] / pulse_period, + params['period'] / pulse_period).astype(int) + + # Extract pulse_ids and npulses from bunch pattern + pulse_ids, npulses, period = None, None, None + regular = True + if pattern is not None: + bunchPattern = 'sase3' if hasattr(pattern, 'sase') else 'scs_ppl' + if pattern.is_constant_pattern() is False: + log.info('The number of pulses changed during the run.') + pulse_ids = np.unique(pattern.pulse_ids(labelled=False, copy=False)) + npulses, period = None, None + regular = False + else: + pulse_ids = pattern.peek_pulse_ids(labelled=False) + npulses = len(pulse_ids) + if npulses > 1: + periods = np.diff(pulse_ids) + if len(np.unique(periods)) > 1: + regular = False + else: + period = np.unique(periods)[0] * pulse_period + # Compare parameters with bunch pattern + if len(pulse_ids_params) == len(pulse_ids): + if not (pulse_ids_params == pulse_ids - pulse_ids[0]).all(): + log.warning('The provided pulseStart parameters do not match the ' + f'{bunchPattern} bunch pattern pulse ids. 
Using ' + 'bunch pattern parameters.') + pulse_ids_params = pulse_ids + + if (npulses != params.get('npulses') or + period != params.get('period')): + if autoFind: + add_text = 'Automatically found ' + else: + add_text = 'Provided ' + log.warning(add_text + 'integration parameters ' + f'(npulses={params.get("npulses")}, ' + f'period={params.get("period")}) do not match the ' + f'{bunchPattern} bunch pattern (npulses={npulses}, ' + f'period={period}). Using bunch pattern parameters.') + pulse_ids_params = pulse_ids + params['npulses'] = npulses + params['period'] = period + if not hasattr(params['pulseStart'], '__len__'): + start = params['pulseStart'] + else: + start = params['pulseStart'][0] + + params['pulseStart'] = np.array([int(start + (pid - pulse_ids_params[0]) * pulse_period) + for pid in pulse_ids_params]) + return params, pulse_ids_params, regular, raw_trace + + +def check_peak_params(run, mnemonic, raw_trace=None, ntrains=200, params=None, + plot=True, show_all=False, bunchPattern='sase3'): + """ + Checks and plots the peak parameters (pulse window and baseline window + of a raw digitizer trace) used to compute the peak integration. These + parameters are either set by the digitizer peak-integration settings, + or are determined by a peak finding algorithm (used in get_tim_peaks + or get_fast_adc_peaks) when the inputs are raw traces. The parameters + can also be provided manually for visual inspection. The plot either + shows the first and last pulse of the trace or the entire trace. + + Parameters + ---------- + run: extra_data.DataCollection + DataCollection containing the digitizer data. + mnemonic: str + ToolBox mnemonic of the digitizer data, e.g. 'MCP2apd'. + raw_trace: optional, 1D numpy array or xarray DataArray + Raw trace to display. If None, the average raw trace over + ntrains of the corresponding channel is loaded (this can + be time-consuming). + ntrains: optional, int + Only used if raw_trace is None. Number of trains used to + calculate the average raw trace of the corresponding channel. + plot: bool + If True, displays the raw trace and peak integration regions. + show_all: bool + If True, displays the entire raw trace and all peak integration + regions (this can be time-consuming). + If False, shows the first and last pulse according to the bunchPattern. + bunchPattern: optional, str + Only used if plot is True. Checks the bunch pattern against + the digitizer peak parameters and shows potential mismatch. 
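+    params: dict, optional
+        Peak-integration parameters to check against the raw trace and
+        bunch pattern. If None, parameters are loaded from the digitizer
+        settings or determined automatically from the raw trace.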
+
+    Returns
+    -------
+    dictionary of peak integration parameters
+    """
+    run_mnemonics = mnemonics_for_run(run)
+    digitizer = digitizer_type(run, run_mnemonics[mnemonic]['source'])
+    pulse_period = 24 if digitizer == "FastADC" else 440
+    if params is None:
+        title = 'Auto-find peak parameters'
+    else:
+        title = 'Provided peak parameters'
+    pattern = None
+    try:
+        if bunchPattern == 'sase3':
+            pattern = XrayPulses(run)
+        if bunchPattern == 'scs_ppl':
+            pattern = OpticalLaserPulses(run)
+    except Exception as e:
+        log.warning(e)
+        bunchPattern = None
+    integParams, pulse_ids, regular, raw_trace = find_peak_integration_parameters(
+        run, mnemonic, raw_trace, params, pattern)
+    if bunchPattern:
+        if regular:
+            print(f'bunch pattern {bunchPattern}: {len(pulse_ids)} pulses,'
+                  f' {(pulse_ids[1] - pulse_ids[0]) * pulse_period} samples between two pulses')
+            print(title + ': '
+                  f' {integParams["npulses"]} pulses, '
+                  f'{integParams["period"]} samples between two pulses')
+        else:
+            print(f'bunch pattern {bunchPattern}: Not a regular pattern. '
+                  f'{len(pulse_ids)} pulses, pulse_ids='
+                  f'{pulse_ids}.')
+    if plot:
+        if raw_trace is None:
+            raw_trace = get_dig_avg_trace(run, mnemonic)
+        fig, ax = plotPeakIntegrationWindow(raw_trace, integParams,
+                                            show_all=show_all)
+        fig.suptitle(title, size=12)
+    return integParams
+
+
+def plotPeakIntegrationWindow(raw_trace, params, show_all=False):
+    if hasattr(params['pulseStart'], '__len__'):
+        pulseStarts = np.array(params['pulseStart'])
+        pulseStops = params['pulseStop'] + pulseStarts - params['pulseStart'][0]
+        baseStarts = params['baseStart'] + pulseStarts - params['pulseStart'][0]
+        baseStops = params['baseStop'] + pulseStarts - params['pulseStart'][0]
+    else:
+        n = params['npulses']
+        p = params['period']
+        pulseStarts = [params['pulseStart'] + i*p for i in range(n)]
+        pulseStops = [params['pulseStop'] + i*p for i in range(n)]
+        baseStarts = [params['baseStart'] + i*p for i in range(n)]
+        baseStops = [params['baseStop'] + i*p for i in range(n)]
+
+    if show_all:
+        fig, ax = plt.subplots(figsize=(6, 3), constrained_layout=True)
+        for i in range(len(pulseStarts)):
+            lbl = 'baseline' if i == 0 else None
+            lp = 'peak' if i == 0 else None
+            ax.axvline(baseStarts[i], ls='--', color='k')
+            ax.axvline(baseStops[i], ls='--', color='k')
+            ax.axvspan(baseStarts[i], baseStops[i],
+                       alpha=0.5, color='grey', label=lbl)
+            ax.axvline(pulseStarts[i], ls='--', color='r')
+            ax.axvline(pulseStops[i], ls='--', color='r')
+            ax.axvspan(pulseStarts[i], pulseStops[i],
+                       alpha=0.2, color='r', label=lp)
+        ax.plot(raw_trace, color='C0', label='raw trace')
+        ax.legend(fontsize=8)
+        return fig, [ax]
+
+    fig, ax = plt.subplots(1, 2, figsize=(6, 3), constrained_layout=True)
+    for plot in range(2):
+        title = 'First pulse' if plot == 0 else 'Last pulse'
+        i = 0 if plot == 0 else -1
+        ax[plot].axvline(baseStarts[i], ls='--', color='k')
+        ax[plot].axvline(baseStops[i], ls='--', color='k')
+        ax[plot].axvspan(baseStarts[i], baseStops[i],
+                         alpha=0.5, color='grey', label='baseline')
+
+
+
+def plotPeakIntegrationWindow(raw_trace, params, show_all=False):
+    if hasattr(params['pulseStart'], '__len__'):
+        pulseStarts = np.array(params['pulseStart'])
+        pulseStops = params['pulseStop'] + pulseStarts - params['pulseStart'][0]
+        baseStarts = params['baseStart'] + pulseStarts - params['pulseStart'][0]
+        baseStops = params['baseStop'] + pulseStarts - params['pulseStart'][0]
+    else:
+        n = params['npulses']
+        p = params['period']
+        pulseStarts = [params['pulseStart'] + i*p for i in range(n)]
+        pulseStops = [params['pulseStop'] + i*p for i in range(n)]
+        baseStarts = [params['baseStart'] + i*p for i in range(n)]
+        baseStops = [params['baseStop'] + i*p for i in range(n)]
+
+    if show_all:
+        fig, ax = plt.subplots(figsize=(6, 3), constrained_layout=True)
+        for i in range(len(pulseStarts)):
+            lbl = 'baseline' if i == 0 else None
+            lp = 'peak' if i == 0 else None
+            ax.axvline(baseStarts[i], ls='--', color='k')
+            ax.axvline(baseStops[i], ls='--', color='k')
+            ax.axvspan(baseStarts[i], baseStops[i],
+                       alpha=0.5, color='grey', label=lbl)
+            ax.axvline(pulseStarts[i], ls='--', color='r')
+            ax.axvline(pulseStops[i], ls='--', color='r')
+            ax.axvspan(pulseStarts[i], pulseStops[i],
+                       alpha=0.2, color='r', label=lp)
+        ax.plot(raw_trace, color='C0', label='raw trace')
+        ax.legend(fontsize=8)
+        return fig, [ax]
+
+    fig, ax = plt.subplots(1, 2, figsize=(6, 3), constrained_layout=True)
+    for plot in range(2):
+        title = 'First pulse' if plot == 0 else 'Last pulse'
+        i = 0 if plot == 0 else -1
+        ax[plot].axvline(baseStarts[i], ls='--', color='k')
+        ax[plot].axvline(baseStops[i], ls='--', color='k')
+        ax[plot].axvspan(baseStarts[i], baseStops[i],
+                         alpha=0.5, color='grey', label='baseline')
+        ax[plot].axvline(pulseStarts[i], ls='--', color='r')
+        ax[plot].axvline(pulseStops[i], ls='--', color='r')
+        ax[plot].axvspan(pulseStarts[i], pulseStops[i],
+                         alpha=0.2, color='r', label='peak')
+        xmin = np.max([0, baseStarts[i] - 200])
+        xmax = np.min([pulseStops[i] + 200, raw_trace.size])
+        ax[plot].plot(np.arange(xmin, xmax), raw_trace[xmin:xmax],
+                      color='C0', label=title)
+        ax[plot].legend(fontsize=8)
+        ax[plot].set_xlim(xmin, xmax)
+        ax[plot].set_xlabel('digitizer samples')
+        ax[plot].set_title(title, size=10)
+    return fig, ax
+
+
+def digitizer_type(run, source):
+    """
+    Finds the digitizer type based on the class Id / name of the source.
+    Example source: 'SCS_UTC1_MCP/ADC/1'. Defaults to ADQ412 if not found.
+    """
+    ret = None
+    if '_MCP/ADC/1' in source:
+        ret = 'FastADC'
+    if '_ADQ/ADC/1' in source:
+        ret = 'ADQ412'
+    if ret is None:
+        digi_dict = {'FastAdc': 'FastADC',
+                     'FastAdcLegacy': 'FastADC',
+                     'AdqDigitizer': 'ADQ412',
+                     'PyADCChannel': 'FastADC',
+                     'PyADCChannelLegacy': 'FastADC'}
+        try:
+            source = source.split(':')[0]
+            classId = run.get_run_value(source, 'classId.value')
+            ret = digi_dict.get(classId, 'ADQ412')
+        except Exception as e:
+            log.warning(str(e))
+            log.warning(f'Could not find digitizer type from source {source}.')
+            ret = 'ADQ412'
+    return ret
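+
+# Sketch: plotPeakIntegrationWindow can also be fed hand-written integration
+# parameters (the sample indices below are made up for illustration):
+#
+#     avg_trace = get_dig_avg_trace(run, 'FastADC5raw')
+#     params = {'pulseStart': 100, 'pulseStop': 200, 'baseStart': 50,
+#               'baseStop': 99, 'period': 24, 'npulses': 20}
+#     fig, ax = plotPeakIntegrationWindow(avg_trace, params, show_all=True)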
+
+
+def get_tim_peaks(run, mnemonic=None, merge_with=None,
+                  bunchPattern='sase3', integParams=None,
+                  keepAllSase=False):
+    """
+    Automatically computes TIM peaks. Sources can be loaded on the
+    fly via the mnemonics argument, or processed from an existing data
+    set (merge_with). The bunch pattern table is used to assign the
+    pulse id coordinates.
+
+    Parameters
+    ----------
+    run: extra_data.DataCollection
+        DataCollection containing the digitizer data.
+    mnemonic: str
+        mnemonic for TIM, e.g. "MCP2apd".
+    merge_with: xarray Dataset
+        If provided, the resulting Dataset will be merged with this
+        one. The TIM variables of merge_with (if any) will also be
+        computed and merged.
+    bunchPattern: str
+        'sase1', 'sase3' or 'scs_ppl', the bunch pattern
+        used to extract peaks. The pulse ID dimension will be named
+        'sa1_pId', 'sa3_pId' or 'ol_pId', respectively.
+    integParams: dict
+        dictionary for raw trace integration, e.g.
+        {'pulseStart':100, 'pulseStop':200, 'baseStart':50,
+        'baseStop':99, 'period':24, 'npulses':500}.
+        If None, integration parameters are computed automatically.
+    keepAllSase: bool
+        Only relevant in case of sase-dedicated trains. If True, all
+        trains are kept, else only those of the bunchPattern are kept.
+
+    Returns
+    -------
+    xarray Dataset with the TIM variables substituted by
+    the calculated peak values (e.g. "MCP2raw" becomes
+    "MCP2peaks"), merged with Dataset *merge_with* if provided.
+    """
+    return get_digitizer_peaks(run, mnemonic, merge_with,
+                               bunchPattern, integParams,
+                               keepAllSase)
+
+
+def get_laser_peaks(run, mnemonic=None, merge_with=None,
+                    bunchPattern='scs_ppl', integParams=None):
+    """
+    Extracts laser photodiode signal (peak intensity) from Fast ADC
+    digitizer. Sources can be loaded on the fly via the mnemonics
+    argument, and/or processed from an existing data set (merge_with).
+    The PP laser bunch pattern is used to assign the pulse id
+    coordinates.
+
+    Parameters
+    ----------
+    run: extra_data.DataCollection
+        DataCollection containing the digitizer data.
+    mnemonic: str
+        mnemonic for FastADC corresponding to laser signal, e.g.
+        "FastADC2peaks" or 'I0_ILHraw'.
+    merge_with: xarray Dataset
+        If provided, the resulting Dataset will be merged with this
+        one. The FastADC variables of merge_with (if any) will also be
+        computed and merged.
+    bunchPattern: str
+        'sase1', 'sase3' or 'scs_ppl', the bunch pattern
+        used to extract peaks.
+    integParams: dict
+        dictionary for raw trace integration, e.g.
+        {'pulseStart':100, 'pulseStop':200, 'baseStart':50,
+        'baseStop':99, 'period':24, 'npulses':500}.
+        If None, integration parameters are computed
+        automatically.
+
+    Returns
+    -------
+    xarray Dataset with all Fast ADC variables substituted by
+    the calculated peak values (e.g. "FastADC2raw" becomes
+    "FastADC2peaks").
+    """
+    return get_digitizer_peaks(run, mnemonic, merge_with,
+                               bunchPattern, integParams, False)
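+
+# Both wrappers delegate to get_digitizer_peaks; a short usage sketch
+# (run and mnemonics are illustrative):
+#
+#     run, ds = tb.load(900291, 53)
+#     ds = get_tim_peaks(run, 'MCP2apd', merge_with=ds)
+#     ds = get_laser_peaks(run, 'I0_ILHraw', merge_with=ds)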
+
+
+def get_digitizer_peaks(run, mnemonic, merge_with=None,
+                        bunchPattern='sase3', integParams=None,
+                        keepAllSase=False):
+    """
+    Automatically computes digitizer peaks. A source can be loaded on the
+    fly via the mnemonic argument, or processed from an existing data set
+    (merge_with). The bunch pattern table is used to assign the pulse
+    id coordinates.
+
+    Parameters
+    ----------
+    run: extra_data.DataCollection
+        DataCollection containing the digitizer data.
+    mnemonic: str
+        mnemonic for FastADC or ADQ412, e.g. "I0_ILHraw" or "MCP3apd".
+        The data is either loaded from the DataCollection or taken from
+        merge_with.
+    merge_with: xarray Dataset
+        If provided, the resulting Dataset will be merged with this one.
+    bunchPattern: str
+        'sase1', 'sase3', 'scs_ppl' or 'None': the bunch pattern
+        used to extract peaks.
+    integParams: dict
+        dictionary for raw trace integration, e.g.
+        {'pulseStart':100, 'pulseStop':200, 'baseStart':50,
+        'baseStop':99, 'period':24, 'npulses':500}.
+        If None, integration parameters are computed automatically.
+    keepAllSase: bool
+        Only relevant in case of sase-dedicated trains. If True, all
+        trains are kept, else only those of the bunchPattern are kept.
+
+    Returns
+    -------
+    xarray Dataset with digitizer peak variables. Raw variables are
+    substituted by the calculated peak values (e.g. "FastADC2raw" becomes
+    "FastADC2peaks").
+    """
+    run_mnemonics = mnemonics_for_run(run)
+    if mnemonic not in run_mnemonics:
+        log.warning('Mnemonic not found in run. Skipping.')
+        return merge_with
+    if bool(merge_with) and mnemonic in merge_with:
+        for d in merge_with[mnemonic].dims:
+            if d in ['sa3_pId', 'ol_pId']:
+                log.warning(f'{mnemonic} already extracted. Skipping.')
+                return merge_with
+    # check if bunch pattern table exists
+    bpt = None
+    if bool(merge_with) and 'bunchPatternTable' in merge_with:
+        bpt = merge_with['bunchPatternTable']
+        log.debug('Using bpt from merge_with dataset.')
+    elif 'bunchPatternTable' in run_mnemonics:
+        m = run_mnemonics['bunchPatternTable']
+        bpt = run.get_array(m['source'], m['key'], m['dim'])
+        log.debug('Loaded bpt from DataCollection.')
+    elif 'bunchPatternTable_SA3' in run_mnemonics:
+        m = run_mnemonics['bunchPatternTable_SA3']
+        bpt = run.get_array(m['source'], m['key'], m['dim'])
+        log.debug('Loaded bpt from DataCollection.')
+    else:
+        log.warning('Could not load bunch pattern table.')
+    # prepare the resulting dataset
+    if bool(merge_with):
+        mw_ds = merge_with.drop(mnemonic, errors='ignore')
+    else:
+        mw_ds = xr.Dataset()
+    # extract the peaks and merge them into the dataset
+    autoFind = integParams is None
+    m = run_mnemonics[mnemonic]
+    useRaw = 'raw' in mnemonic
+    if bool(merge_with) and mnemonic in merge_with:
+        data = merge_with[mnemonic]
+    else:
+        data = run.get_array(m['source'], m['key'], m['dim'])
+    peaks = get_peaks(run, data, mnemonic,
+                      useRaw=useRaw,
+                      autoFind=autoFind,
+                      integParams=integParams,
+                      bunchPattern=bunchPattern,
+                      bpt=bpt)
+    name = mnemonic.replace('raw', 'peaks').replace('apd', 'peaks')
+    join = 'outer' if keepAllSase else 'inner'
+    ds = mw_ds.merge(peaks.rename(name), join=join)
+    return ds
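+
+# Sketch: extract peaks aligned to the PP laser bunch pattern and merge
+# them into an existing dataset (names are illustrative):
+#
+#     run, ds = tb.load(900291, 53, 'I0_ILHraw')
+#     ds = get_digitizer_peaks(run, 'I0_ILHraw', merge_with=ds,
+#                              bunchPattern='scs_ppl')
+#     # ds then contains 'I0_ILHpeaks' with pulse dimension 'ol_pId'.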
+
+
+def digitizer_signal_description(run, digitizer=None):
+    """
+    Check for the existence of signal description and return all
+    corresponding channels in a dictionary.
+
+    Parameters
+    ----------
+    run: extra_data.DataCollection
+        DataCollection containing the digitizer data.
+    digitizer: str or list of str (default=None)
+        Name of the digitizer: one in ['FastADC', 'FastADC2',
+        'ADQ412', 'ADQ412_2'].
+        If None, all digitizers are used.
+
+    Returns
+    -------
+    signal_description: dictionary containing the signal description of
+        the digitizer channels.
+
+    Example
+    -------
+    import toolbox_scs as tb
+    run = tb.open_run(3481, 100)
+    signals = tb.digitizer_signal_description(run)
+    signals_fadc2 = tb.digitizer_signal_description(run, 'FastADC2')
+    """
+    if digitizer not in [None, 'FastADC', 'FastADC2', 'ADQ412', 'ADQ412_2']:
+        raise ValueError('digitizer must be one in '
+                         '["FastADC", "FastADC2", "ADQ412", "ADQ412_2"]')
+    if digitizer is None:
+        digitizer = ['FastADC', 'FastADC2', 'ADQ412', 'ADQ412_2']
+    else:
+        digitizer = [digitizer]
+
+    def key_fadc(i):
+        if i > 9:
+            return None
+        return f'channel_{i}.signalDescription.value'
+
+    def key_adq412(i):
+        if i > 3:
+            return None
+        return f'board1.channel_{i}.description.value'
+
+    sources = {'FastADC': ['SCS_UTC1_MCP/ADC/1', key_fadc],
+               'FastADC2': ['SCS_UTC2_FADC/ADC/1', key_fadc],
+               'ADQ412': ['SCS_UTC1_ADQ/ADC/1', key_adq412],
+               'ADQ412_2': ['SCS_UTC2_ADQ/ADC/1', key_adq412]}
+    res = {}
+    for d in digitizer:
+        if sources[d][0] not in run.all_sources:
+            continue
+        if sources[d][1](0) not in run.get_run_values(sources[d][0]):
+            raise ValueError('No signal description available for '
+                             f'{d}: {sources[d][0]}')
+        for ch in range(10):
+            val = sources[d][1](ch)
+            if val is None:
+                break
+            desc = run.get_run_value(sources[d][0], val)
+            res[f'{d}_Ch{ch}'] = desc
+    return res
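+
+# The returned dictionary maps '<digitizer>_Ch<channel>' to the recorded
+# signal description, e.g. (entries are illustrative):
+#
+#     {'FastADC_Ch0': 'I0 photodiode', 'ADQ412_Ch0': 'MCP 1', ...}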
+
+
+def calibrateTIM(data, rollingWindow=200, mcp=1, plot=False, use_apd=True,
+                 intstart=None, intstop=None, bkgstart=None, bkgstop=None,
+                 t_offset=None, npulses_apd=None):
+    ''' Calibrate TIM signal (peak-integrated signal) to the slow ion signal
+        of SCS_XGM (photocurrent read by Keithley, channel
+        'pulseEnergy.photonFlux.value').
+        The aim is to find F so that E_tim_peak[uJ] = F x TIM_peak. For this,
+        we want to match the SASE3-only average TIM pulse peak per train
+        (TIM_avg) to the slow XGM signal E_slow.
+        Since E_slow is the average energy per pulse over all SASE1 and SASE3
+        pulses (N1 and N3), we first extract the relative contribution C of
+        the SASE3 pulses by looking at the pulse-resolved signals of the
+        SA3_XGM in the tunnel, where the SASE1 signal is usually strong
+        enough to be above the noise level.
+        Let TIM_avg be the average of the TIM pulses (SASE3 only).
+        The calibration factor is then defined as:
+        F = E_slow * C * (N1+N3) / (N3 * TIM_avg).
+        If N3 changes during the run, we locate the indices for which N3 is
+        maximum and define a window where to apply the calibration
+        (indices start/stop).
+
+        Warning: the calibration does not include the transmission by the
+        KB mirrors!
+
+        Inputs:
+            data: xarray Dataset
+            rollingWindow: length of running average to calculate TIM_avg
+            mcp: MCP channel
+            plot: boolean. If True, plot calibration results.
+            use_apd: boolean. If False, the TIM pulse peaks are extracted
+                from raw traces using getTIMapd
+            intstart: trace index of integration start
+            intstop: trace index of integration stop
+            bkgstart: trace index of background start
+            bkgstop: trace index of background stop
+            t_offset: index separation between two pulses
+            npulses_apd: number of pulses
+
+        Output:
+            F: float, TIM calibration factor.
+    '''
+    start = 0
+    stop = None
+    npulses = data['npulses_sase3']
+    ntrains = npulses.shape[0]
+    if not np.all(npulses == npulses[0]):
+        start = np.argmax(npulses.values)
+        stop = ntrains + np.argmax(npulses.values[::-1]) - 1
+        if stop - start < rollingWindow:
+            print('not enough consecutive data points with the largest '
+                  'number of pulses per train')
+        start += rollingWindow
+        stop = np.min((ntrains, stop + rollingWindow))
+    filteredTIM = getTIMapd(data, mcp, use_apd, intstart, intstop,
+                            bkgstart, bkgstop, t_offset, npulses_apd)
+    sa3contrib = saseContribution(data, 'sase3', 'XTD10_XGM')
+    avgFast = filteredTIM.mean(axis=1).rolling(trainId=rollingWindow).mean()
+    ratio = ((data['npulses_sase3'] + data['npulses_sase1']) *
+             data['SCS_photonFlux'] * sa3contrib) / (
+                avgFast * data['npulses_sase3'])
+    F = float(ratio[start:stop].median().values)
+
+    if plot:
+        fig = plt.figure(figsize=(8, 5))
+        ax = plt.subplot(211)
+        ax.set_title('E[uJ] = {:2e} x TIM (MCP{})'.format(F, mcp))
+        ax.plot(data['SCS_photonFlux'], label='SCS XGM slow (all SASE)',
+                color='C0')
+        slow_avg_sase3 = (data['SCS_photonFlux'] *
+                          (data['npulses_sase1'] + data['npulses_sase3']) *
+                          sa3contrib / data['npulses_sase3'])
+        ax.plot(slow_avg_sase3, label='SCS XGM slow (SASE3 only)', color='C1')
+        ax.plot(avgFast * F, label='Calibrated TIM rolling avg', color='C2')
+        ax.set_ylabel(r'Energy [$\mu$J]', size=10)
+        ax.plot(filteredTIM.mean(axis=1) * F,
+                label='Calibrated TIM train avg', alpha=0.2, color='C9')
+        ax.legend(loc='best', fontsize=8, ncol=2)
+        plt.xlabel('train in run')
+
+        ax = plt.subplot(234)
+        xgm_fast = selectSASEinXGM(data)
+        ax.scatter(filteredTIM, xgm_fast, s=5, alpha=0.1, rasterized=True)
+        fit, cov = np.polyfit(filteredTIM.values.flatten(),
+                              xgm_fast.values.flatten(), 1, cov=True)
+        y = np.poly1d(fit)
+        x = np.linspace(filteredTIM.min(), filteredTIM.max(), 10)
+        ax.plot(x, y(x), lw=2, color='r')
+        ax.set_ylabel(r'Raw HAMP [$\mu$J]', size=10)
+        ax.set_xlabel('TIM (MCP{}) signal'.format(mcp), size=10)
+        ax.annotate('y(x) = F x + A\n' +
+                    'F = %.3e\n$\\Delta$F/F = %.2e\n' % (
+                        fit[0], np.abs(np.sqrt(cov[0, 0]) / fit[0])) +
+                    'A = %.3e' % fit[1],
+                    xy=(0.5, 0.6), xycoords='axes fraction',
+                    fontsize=10, color='r')
+        print('TIM calibration factor: %e' % F)
+
+        ax = plt.subplot(235)
+        ax.hist(filteredTIM.values.flatten() * F, bins=50, rwidth=0.8)
+        ax.set_ylabel('number of pulses', size=10)
+        ax.set_xlabel('Pulse energy MCP{} [uJ]'.format(mcp), size=10)
+        ax.set_yscale('log')
+
+        ax = plt.subplot(236)
+        if not use_apd:
+            pulseStart = intstart
+            pulseStop = intstop
+        else:
+            pulseStart = data.attrs['run'].get_array(
+                'SCS_UTC1_ADQ/ADC/1',
+                'board1.apd.channel_0.pulseStart.value')[0].values
+            pulseStop = data.attrs['run'].get_array(
+                'SCS_UTC1_ADQ/ADC/1',
+                'board1.apd.channel_0.pulseStop.value')[0].values
+
+        if 'MCP{}raw'.format(mcp) not in data:
+            tid, tdata = data.attrs['run'].train_from_index(0)
+            trace = tdata['SCS_UTC1_ADQ/ADC/1:network'][
+                'digitizers.channel_1_D.raw.samples']
+            print('no raw data for MCP{}. Loading trace from MCP1'
+                  .format(mcp))
+            label_trace = 'MCP1 Voltage [V]'
+        else:
+            trace = data['MCP{}raw'.format(mcp)][0]
+            label_trace = 'MCP{} Voltage [V]'.format(mcp)
+        ax.plot(trace[:pulseStop + 25], 'o-', ms=2, label='trace')
+        ax.axvspan(pulseStart, pulseStop, color='C2', alpha=0.2,
+                   label='APD region')
+        ax.axvline(pulseStart, color='gray', ls='--')
+        ax.axvline(pulseStop, color='gray', ls='--')
+        ax.set_xlim(pulseStart - 25, pulseStop + 25)
+        ax.set_ylabel(label_trace, size=10)
+        ax.set_xlabel('sample #', size=10)
+        ax.legend(fontsize=8)
+
+    return F
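+
+# Sketch of a typical calibration on a dataset `data` containing
+# 'npulses_sase1', 'npulses_sase3', 'SCS_photonFlux' and the MCP peak data
+# (the calibrated-energy line is illustrative):
+#
+#     F = calibrateTIM(data, rollingWindow=200, mcp=2, plot=True)
+#     energy_uJ = F * data['MCP2apd']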
+
+
+''' TIM calibration table
+    Dict with key = photon energy and value = array of polynomial
+    coefficients for each MCP (1, 2, 3).
+    The polynomials correspond to a fit of the logarithm of the calibration
+    factor as a function of MCP voltage. If P is a polynomial and V the MCP
+    voltage, the calibration factor (in microjoule per APD signal) is given
+    by -exp(P(V)).
+    This table was generated from the calibration of March 2019,
+    proposal 900074, semester 201930,
+    runs 69 - 111 (Ni edge): https://in.xfel.eu/elog/SCS+Beamline/2323
+    runs 113 - 153 (Co edge): https://in.xfel.eu/elog/SCS+Beamline/2334
+    runs 163 - 208 (Fe edge): https://in.xfel.eu/elog/SCS+Beamline/2349
+'''
+tim_calibration_table = {
+    705.5: np.array([
+        [-6.85344690e-12, 5.00931986e-08, -1.27206912e-04, 1.15596821e-01, -3.15215367e+01],
+        [1.25613942e-11, -5.41566381e-08, 8.28161004e-05, -7.27230153e-02, 3.10984925e+01],
+        [1.14094964e-12, 7.72658935e-09, -4.27504907e-05, 4.07253378e-02, -7.00773062e+00]]),
+    779: np.array([
+        [4.57610777e-12, -2.33282497e-08, 4.65978738e-05, -6.43305156e-02, 3.73958623e+01],
+        [2.96325102e-11, -1.61393276e-07, 3.32600044e-04, -3.28468195e-01, 1.28328844e+02],
+        [1.14521506e-11, -5.81980336e-08, 1.12518434e-04, -1.19072484e-01, 5.37601559e+01]]),
+    851: np.array([
+        [3.15774215e-11, -1.71452934e-07, 3.50316512e-04, -3.40098861e-01, 1.31064501e+02],
+        [5.36341958e-11, -2.92533156e-07, 6.00574534e-04, -5.71083140e-01, 2.10547161e+02],
+        [3.69445588e-11, -1.97731342e-07, 3.98203522e-04, -3.78338599e-01, 1.41894119e+02]])
+}
+
+
+def timFactorFromTable(voltage, photonEnergy, mcp=1):
+    ''' Returns an energy calibration factor for TIM integrated peak signal
+        (APD) according to the calibration of March 2019, proposal 900074,
+        semester 201930,
+        runs 69 - 111 (Ni edge): https://in.xfel.eu/elog/SCS+Beamline/2323
+        runs 113 - 153 (Co edge): https://in.xfel.eu/elog/SCS+Beamline/2334
+        runs 163 - 208 (Fe edge): https://in.xfel.eu/elog/SCS+Beamline/2349
+        Uses the tim_calibration_table declared above.
+
+        Inputs:
+            voltage: MCP voltage in volts.
+            photonEnergy: FEL photon energy in eV. The calibration factor is
+                linearly interpolated between the known values of the
+                calibration table.
+            mcp: MCP channel (1, 2, or 3).
+
+        Output:
+            f: calibration factor in microjoule per APD signal
+    '''
+    energies = np.sort([key for key in tim_calibration_table])
+    if photonEnergy not in energies:
+        if photonEnergy > energies.max():
+            photonEnergy = energies.max()
+        elif photonEnergy < energies.min():
+            photonEnergy = energies.min()
+        else:
+            idx = np.searchsorted(energies, photonEnergy) - 1
+            polyA = np.poly1d(tim_calibration_table[energies[idx]][mcp-1])
+            polyB = np.poly1d(tim_calibration_table[energies[idx+1]][mcp-1])
+            fA = -np.exp(polyA(voltage))
+            fB = -np.exp(polyB(voltage))
+            f = fA + (fB - fA) / (energies[idx+1] - energies[idx]) * (
+                photonEnergy - energies[idx])
+            return f
+    poly = np.poly1d(tim_calibration_table[photonEnergy][mcp-1])
+    f = -np.exp(poly(voltage))
+    return f
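+
+# Sketch: calibration factor for MCP1 at 2000 V and a 780 eV photon energy;
+# the factor is interpolated between the 779 eV and 851 eV table entries:
+#
+#     f = timFactorFromTable(voltage=2000, photonEnergy=780, mcp=1)
+#     pulse_energy_uJ = f * apd_signal  # apd_signal: peak-integrated data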
+
+#############################################################################
+#############################################################################
+#############################################################################
+
+
+def extract_digitizer_peaks(proposal, runNB, mnemonic, bunchPattern=None,
+                            integParams=None, save=False,
+                            subdir='usr/processed_runs'):
+    """
+    Extract the peaks from digitizer raw traces and save them into a file.
+    The calculation is a trapezoidal integration between 'pulseStart' and
+    'pulseStop' with subtraction of a baseline defined as the median between
+    'baseStart' and 'baseStop'.
+    The integration parameters can either be provided using integParams, or
+    determined automatically by a peak-finding algorithm. If the bunchPattern
+    argument is provided, the pulse ids are aligned to it.
+
+    Parameters
+    ----------
+    proposal: int
+        the proposal number
+    runNB: int
+        the run number
+    mnemonic: str
+        the mnemonic containing raw traces. Example: 'XRD_MCP_BIGraw'
+    bunchPattern: str
+        'sase3' or 'scs_ppl'. If provided, checks the bunch pattern table
+        using the Extra XrayPulses or OpticalLaserPulses components and
+        aligns the pulse ids. If None, the pulse ids are not aligned and
+        indexed between 0 and the number of pulses per train.
+    integParams: dict
+        dictionary with keys ['pulseStart', 'pulseStop', 'baseStart',
+        'baseStop', 'period', 'npulses']. If provided, the automatic peak
+        finding is skipped. If bunchPattern is not None, 'period' and
+        'npulses' are adjusted to match the bunch pattern and align the
+        pulse ids.
+    save: bool
+        whether to save the result to a file or not.
+    subdir: str
+        subdirectory. The data is stored in
+        <proposal path>/<subdir>/r{runNB:04d}/r{runNB:04d}-digitizers-data.h5
+    """
+    run = open_run(proposal, runNB)
+    run_mnemonics = mnemonics_for_run(run)
+    mnemo = run_mnemonics.get(mnemonic)
+    if mnemo is None:
+        log.warning('Mnemonic not found. Skipping.')
+        return xr.DataArray()
+    source, key = mnemo['source'], mnemo['key']
+    extra_dim = {'sase3': 'sa3_pId', 'scs_ppl': 'ol_pId'}.get(bunchPattern)
+    if extra_dim is None:
+        extra_dim = 'pulseId'
+    pattern = None
+    try:
+        if bunchPattern == 'sase3':
+            pattern = XrayPulses(run)
+        if bunchPattern == 'scs_ppl':
+            pattern = OpticalLaserPulses(run)
+    except Exception as e:
+        log.warning(e)
+
+    # load traces and calculate the average
+    traces = run[source, key].xarray(name=mnemonic, extra_dims=mnemo['dim'])
+    trace = traces.mean('trainId').rename(mnemonic.replace('raw', 'avg'))
+
+    # find the peak integration parameters
+    params, pulse_ids, regular, trace = find_peak_integration_parameters(
+        run, mnemonic, trace, integParams, pattern)
+
+    # extract the peaks
+    data = peaks_from_raw_trace(traces, **params, extra_dim=extra_dim)
+    data = data.rename(mnemonic.replace('raw', 'peaks'))
+
+    if regular is True:
+        data = data.assign_coords({extra_dim: pulse_ids})
+    else:
+        mask = pattern.pulse_mask(labelled=False)
+        mask = xr.DataArray(mask, dims=['trainId', extra_dim],
+                            coords={'trainId': run[source, key].train_ids,
+                                    extra_dim: np.arange(mask.shape[1])})
+        mask = mask.sel({extra_dim: pulse_ids})
+        data = data.where(mask, drop=True)
+
+    data.attrs['params_keys'] = list(params.keys())
+    if regular:
+        params['pulseStart'] = params['pulseStart'][0]
+    for p in params:
+        if params[p] is None:
+            params[p] = 'None'
+    data.attrs[f'params_{data.name}'] = list(params.values())
+    if save:
+        save_peaks(proposal, runNB, data, trace, subdir)
+    return data
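+
+# Minimal sketch: extract SASE3-aligned peaks from a raw mnemonic and save
+# them under usr/processed_runs (proposal/run/mnemonic are illustrative):
+#
+#     peaks = extract_digitizer_peaks(900291, 53, 'XRD_MCP_BIGraw',
+#                                     bunchPattern='sase3', save=True)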
+
+
+def save_peaks(proposal, runNB, peaks, avg_trace, subdir):
+    '''
+    Save the peaks extracted with extract_digitizer_peaks() as well as the
+    average raw trace in a dataset at the proposal + subdir location.
+    If a dataset already exists, the new data is merged with it.
+
+    Parameters
+    ----------
+    proposal: int
+        the proposal number
+    runNB: int
+        the run number
+    peaks: xarray DataArray
+        the 2D-array obtained by extract_digitizer_peaks()
+    avg_trace: xarray DataArray
+        the average raw trace over the trains
+    subdir: str
+        subdirectory. The data is stored in
+        <proposal path>/<subdir>/r{runNB:04d}/r{runNB:04d}-digitizers-data.h5
+
+    Returns
+    -------
+    None
+    '''
+    root = find_proposal(f'p{proposal:06d}')
+    path = os.path.join(root, subdir + f'/r{runNB:04d}/')
+    os.makedirs(path, exist_ok=True)
+    fname = path + f'r{runNB:04d}-digitizers-data.h5'
+    ds_peaks = peaks.to_dataset(promote_attrs=True)
+
+    if os.path.isfile(fname):
+        ds = xr.load_dataset(fname)
+        ds = ds.drop_vars([peaks.name, avg_trace.name], errors='ignore')
+        for dim in ds.dims:
+            if all(dim not in ds[v].dims for v in ds):
+                ds = ds.drop_dims(dim)
+        dim_name = 'sampleId'
+        if 'sampleId' in ds.dims and ds.sizes['sampleId'] != len(avg_trace):
+            dim_name = 'sampleId2'
+        avg_trace = avg_trace.rename({avg_trace.dims[0]: dim_name})
+        if f'params_{peaks.name}' in ds.attrs:
+            del ds.attrs[f'params_{peaks.name}']
+        ds = xr.merge([ds, ds_peaks, avg_trace],
+                      combine_attrs='drop_conflicts', join='inner')
+    else:
+        ds = ds_peaks.merge(avg_trace.rename({avg_trace.dims[0]: 'sampleId'}))
+    ds.to_netcdf(fname, format='NETCDF4')
+    print(f'saved data into {fname}.')
+    return None
+
+
+def load_processed_peaks(proposal, runNB, mnemonic=None,
+                         data='usr/processed_runs', merge_with=None):
+    """
+    Load processed digitizer peaks data.
+
+    Parameters
+    ----------
+    proposal: int
+        the proposal number
+    runNB: int
+        the run number
+    mnemonic: str
+        the mnemonic containing peaks. Example: 'XRD_MCP_BIGpeaks'.
+        If None, the entire dataset is loaded
+    data: str
+        subdirectory. The data is stored in
+        <proposal path>/<subdir>/r{runNB:04d}/r{runNB:04d}-digitizers-data.h5
+    merge_with: xarray Dataset
+        A dataset to merge the data with.
+
+    Returns
+    -------
+    xarray DataArray if mnemonic is not None and merge_with is None
+    xarray Dataset if mnemonic is None or merge_with is not None.
+
+    Example
+    -------
+    # load the mono energy and the MCP_BIG peaks
+    run, ds = tb.load(proposal, runNB, 'nrj')
+    ds = tb.load_processed_peaks(proposal, runNB, 'XRD_MCP_BIGpeaks',
+                                 merge_with=ds)
+    """
+    if mnemonic is None:
+        return load_all_processed_peaks(proposal, runNB, data, merge_with)
+    root = find_proposal(f'p{proposal:06d}')
+    path = os.path.join(root, data + f'/r{runNB:04d}/')
+    fname = path + f'r{runNB:04d}-digitizers-data.h5'
+    if os.path.isfile(fname):
+        ds = xr.load_dataset(fname)
+        if mnemonic not in ds:
+            print(f'Mnemonic {mnemonic} not found in {fname}')
+            return {}
+        da = ds[mnemonic]
+        da.attrs['params_keys'] = ds.attrs['params_keys']
+        da.attrs[f'params_{mnemonic}'] = ds.attrs[f'params_{mnemonic}']
+        if merge_with is not None:
+            return merge_with.merge(da.to_dataset(promote_attrs=True),
+                                    combine_attrs='drop_conflicts',
+                                    join='inner')
+        else:
+            return da
+    else:
+        print(f'{fname} not found.')
+        return merge_with
+
+
+def load_all_processed_peaks(proposal, runNB, data='usr/processed_runs',
+                             merge_with=None):
+    """
+    Load processed digitizer peaks dataset. The data contains the peaks,
+    the average raw trace and the integration parameters (attribute) of
+    each processed digitizer source.
+
+    Parameters
+    ----------
+    proposal: int
+        the proposal number
+    runNB: int
+        the run number
+    data: str
+        subdirectory. The data is stored in
+        <proposal path>/<subdir>/r{runNB:04d}/r{runNB:04d}-digitizers-data.h5
+    merge_with: xarray Dataset
+        A dataset to merge the data with.
+
+    Returns
+    -------
+    xarray Dataset
+    """
+    root = find_proposal(f'p{proposal:06d}')
+    path = os.path.join(root, data + f'/r{runNB:04d}/')
+    fname = path + f'r{runNB:04d}-digitizers-data.h5'
+    if os.path.isfile(fname):
+        if merge_with is not None:
+            return merge_with.merge(xr.load_dataset(fname),
+                                    combine_attrs='drop_conflicts',
+                                    join='inner')
+        return xr.load_dataset(fname)
+    else:
+        print(f'{fname} not found.')
+        return merge_with
+ """ + root = find_proposal(f'p{proposal:06d}') + path = os.path.join(root, data + f'/r{runNB:04d}/') + fname = path + f'r{runNB:04d}-digitizers-data.h5' + if os.path.isfile(fname): + ds = xr.load_dataset(fname) + if mnemonic.replace('raw', 'peaks') not in ds: + print(f'Mnemonic {mnemonic} not found in {fname}') + return {} + da = ds[mnemonic] + params = dict(zip(ds.attrs['params_keys'], ds.attrs[f'params_{mnemonic}'])) + if plot: + plotPeakIntegrationWindow(ds[mnemonic.replace('peaks', 'avg')], + params, show_all=show_all) + return params + else: + print(f'{fname} not found.') + return {} \ No newline at end of file diff --git a/src/toolbox_scs/detectors/dssc.py b/src/toolbox_scs/detectors/dssc.py index 07b11cc..a50800d 100644 --- a/src/toolbox_scs/detectors/dssc.py +++ b/src/toolbox_scs/detectors/dssc.py @@ -141,7 +141,8 @@ class DSSCBinner: load tim data and construct coordinate array according to corresponding dssc frame number. """ - self.tim = get_tim_formatted(self.run, + self.tim = get_tim_formatted(self.proposal, + self.runnr, self.tim_names, self.dssc_coords_stride) diff --git a/src/toolbox_scs/detectors/dssc_misc.py b/src/toolbox_scs/detectors/dssc_misc.py index 96da909..00c84da 100644 --- a/src/toolbox_scs/detectors/dssc_misc.py +++ b/src/toolbox_scs/detectors/dssc_misc.py @@ -12,7 +12,7 @@ from imageio import imread import extra_data as ed from .xgm import get_xgm -from .digitizers import get_tim_peaks +from .digitizers import get_digitizer_peaks __all__ = [ 'create_dssc_bins', @@ -132,14 +132,16 @@ def get_xgm_formatted(run_obj, xgm_name, dssc_frame_coords): return xgm -def get_tim_formatted(run_obj, tim_names, dssc_frame_coords): +def get_tim_formatted(proposal, runNB, tim_names, dssc_frame_coords): """ Load the tim data and define coordinates along the pulse dimension. Parameters ---------- - run_obj: extra_data.DataCollection - DataCollection object + proposal: int + the proposal number + runNB: int + the run number tim_names: list of str a list of valid mnemonics for tim data sources dssc_frame_coords: int @@ -151,7 +153,7 @@ def get_tim_formatted(run_obj, tim_names, dssc_frame_coords): tim data with coordinate 'pulse'. """ log.debug('load tim data') - tim = get_tim_peaks(run_obj, tim_names) + tim = get_digitizer_peaks(run_obj, tim_names) # average over all tim sources tim = -tim.to_array().mean(dim='variable') pulse_dim = [d for d in tim.dims if d != 'trainId'][0] -- GitLab