diff --git a/src/toolbox_scs/detectors/digitizers_old.py b/src/toolbox_scs/detectors/digitizers_old.py
deleted file mode 100644
index 109df50a0e5b1e88dc496a48cfdeaf885c2ae28a..0000000000000000000000000000000000000000
--- a/src/toolbox_scs/detectors/digitizers_old.py
+++ /dev/null
@@ -1,1766 +0,0 @@
-""" Digitizers related sub-routines
-
-    Copyright (2021) SCS Team.
-
-    (contributions preferably comply with pep8 code structure
-    guidelines.)
-"""
-
-import logging
-import os
-
-import numpy as np
-import xarray as xr
-import matplotlib.pyplot as plt
-from scipy.signal import find_peaks
-from scipy.signal import fftconvolve
-
-from ..misc.bunch_pattern_external import is_pulse_at
-from ..util.exceptions import ToolBoxValueError
-from ..mnemonics_machinery import (mnemonics_to_process,
-                                   mnemonics_for_run)
-from extra_data import open_run
-from extra_data.read_machinery import find_proposal
-from extra.components import XrayPulses, OpticalLaserPulses
-
-__all__ = [
-    'check_peak_params',
-    'get_digitizer_peaks',
-    'get_laser_peaks',
-    'get_peaks',
-    'get_tim_peaks',
-    'digitizer_signal_description',
-    'get_dig_avg_trace',
-    'extract_digitizer_peaks',
-    'load_processed_peaks',
-    'check_processed_peak_params'
-]
-
-log = logging.getLogger(__name__)
-
-
-def peaks_from_raw_trace(traces, pulseStart, pulseStop, baseStart, baseStop,
-                         period=None, npulses=None, extra_dim=None):
-    """
-    Computes peaks from raw digitizer traces by trapezoidal integration.
-
-    Parameters
-    ----------
-    traces: xarray DataArray or numpy array
-        2D array containing the raw traces. If a numpy array is provided,
-        the second dimension is that of the samples.
-    pulseStart: int or list or 1D numpy array
-        trace index of integration start. If a 1D array, each value is the
-        start of one peak and the period and npulses parameters are ignored.
-    pulseStop: int
-        trace index of integration stop.
-    baseStart: int
-        trace index of background start.
-    baseStop: int
-        trace index of background stop.
-    period: int
-        number of samples between two peaks. Ignored if pulseStart is a
-        1D array.
-    npulses: int
-        number of pulses. Ignored if pulseStart is a 1D array.
-    extra_dim: str
-        Name given to the dimension along the peaks. Defaults to 'pulseId'.
-
-    Returns
-    -------
-    xarray DataArray
-    """
-
-    assert len(traces.shape) == 2
-
-    if isinstance(traces, xr.DataArray):
-        ntid = traces.sizes['trainId']
-        coords = traces.coords
-        traces = traces.values
-        if traces.shape[0] != ntid:
-            traces = traces.T
-    else:
-        coords = None
-
-    if hasattr(pulseStart, '__len__'):
-        pulseStart = np.array(pulseStart)
-        pulses = pulseStart - pulseStart[0]
-        pulseStart = pulseStart[0]
-    else:
-        pulses = range(0, npulses*period, period)
-
-    if extra_dim is None:
-        extra_dim = 'pulseId'
-    results = xr.DataArray(np.empty((traces.shape[0], len(pulses))),
-                           coords=coords,
-                           dims=['trainId', extra_dim])
-
-    for i, p in enumerate(pulses):
-        a = pulseStart + p
-        b = pulseStop + p
-        bkga = baseStart + p
-        bkgb = baseStop + p
-        if b > traces.shape[1]:
-            break
-        bg = np.outer(np.median(traces[:, bkga:bkgb], axis=1),
-                      np.ones(b-a))
-        results[:, i] = np.trapz(traces[:, a:b] - bg, axis=1)
-    return results
-
-
-def peaks_from_apd(array, params, digitizer, bpt, bunchPattern):
-    """
-    Extract peak-integrated data according to the bunch pattern.
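-    Pulses are assigned their bunch-pattern pulse id. A minimal, untested
-    sketch (``arr``, ``params`` and ``bpt`` are assumed to be loaded
-    elsewhere):
-
-    >>> peaks = peaks_from_apd(arr, params, 'ADQ412', bpt, 'sase3')
-    >>> 'sa3_pId' in peaks.dims
-    True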
- - Parameters - ---------- - array: xarray DataArray - 2D array containing peak-integrated data - params: dict - peak-integration parameters of the digitizer - digitizer: str - digitizer type, one of {'FastADC', 'ADQ412'} - bpt: xarray DataArray - bunch pattern table - bunchPattern: str - one of {'sase1', 'sase3', 'scs_ppl'}, used to select pulses and - assign name of the pulse id dimension. - - Returns - ------- - xarray DataArray with pulse id coordinates. - """ - if params['enable'] == 0 or (array == 1.).all(): - raise ValueError('The digitizer did not record integrated peaks. ' - 'Consider using raw traces from the same channel ' - 'for peak integration.') - if digitizer == 'FastADC': - min_distance = 24 - if digitizer == 'ADQ412': - min_distance = 440 - period = params['period'] - if period % min_distance != 0: - log.warning(f'Warning: the pulse period ({period} samples) of ' - 'digitizer is not a multiple of the pulse separation at ' - f'4.5 MHz ({min_distance} samples). Pulse id assignment ' - 'is likely to fail.') - stride = int(period/min_distance) - npulses_apd = params['npulses'] - dim_names = {'sase3': 'sa3_pId', 'sase1': 'sa1_pId', 'scs_ppl': 'ol_pId'} - pulse_dim = dim_names[bunchPattern] - arr_dim = [dim for dim in array.dims if dim != 'trainId'][0] - if npulses_apd > array.sizes[arr_dim]: - log.warning(f'The digitizer was set to record {npulses_apd} pulses ' - f'but the array length is only {array.sizes[arr_dim]}.') - npulses_apd = array.sizes[arr_dim] - mask = is_pulse_at(bpt, bunchPattern).rename({'pulse_slot': pulse_dim}) - mask = mask.where(mask.trainId.isin(array.trainId), drop=True) - mask = mask.assign_coords({pulse_dim: np.arange(bpt.sizes['pulse_slot'])}) - pid = np.sort(np.unique(np.where(mask)[1])) - npulses_bpt = len(pid) - apd_coords = np.arange(pid[0], pid[0] + stride*npulses_apd, stride) - noalign = False - if len(np.intersect1d(apd_coords, pid, assume_unique=True)) < npulses_bpt: - log.warning('Not all pulses were recorded. The digitizer ' - 'was set to record pulse ids ' - f'{apd_coords[apd_coords<bpt.sizes["pulse_slot"]]} but the' - 'bunch pattern for' - f' {bunchPattern} is {pid}. Skipping pulse ID alignment.') - noalign = True - array = array.isel({arr_dim: slice(0, npulses_apd)}) - array = array.where(array != 1.) - if noalign: - return array - array = array.rename( - {arr_dim: pulse_dim}).assign_coords({pulse_dim: apd_coords}) - array, mask = xr.align(array, mask, join='inner') - array = array.where(mask, drop=True) - return array - - -def get_peaks(run, - data, - mnemonic, - useRaw=True, - autoFind=True, - integParams=None, - bunchPattern='sase3', - bpt=None, - extra_dim=None, - indices=None, - ): - """ - Extract peaks from one source (channel) of a digitizer. - - Parameters - ---------- - run: extra_data.DataCollection - DataCollection containing the digitizer data - data: xarray DataArray or str - array containing the raw traces or peak-integrated values from the - digitizer. If str, must be one of the ToolBox mnemonics. If None, - the data is loaded via the source and key arguments. - mnemonic: str or dict - ToolBox mnemonic or dict with source and key as in - {'source': 'SCS_UTC1_ADQ/ADC/1:network', - 'key': 'digitizers.channel_1_A.raw.samples'} - useRaw: bool - If True, extract peaks from raw traces. If False, uses the APD (or - peaks) data from the digitizer. - autoFind: bool - If True, finds integration parameters by inspecting the average raw - trace. Only valid if useRaw is True. 
- integParams: dict - dictionnary containing the integration parameters for raw trace - integration: 'pulseStart', 'pulseStop', 'baseStart', 'baseStop', - 'period', 'npulses'. Not used if autoFind is True. All keys are - required when bunch pattern is missing. - bunchPattern: str - match the peaks to the bunch pattern: 'sase1', 'sase3', 'scs_ppl'. - This will dictate the name of the pulse ID coordinates: 'sa1_pId', - 'sa3_pId' or 'scs_ppl'. - bpt: xarray DataArray - bunch pattern table - extra_dim: str - Name given to the dimension along the peaks. If None, the name is given - according to the bunchPattern. - indices: array, slice - indices from the peak-integrated data to retrieve. Only required - when bunch pattern is missing and useRaw is False. - - Returns - ------- - xarray.DataArray containing digitizer peaks with pulse coordinates - """ - arr = data - dim = [d for d in arr.dims if d != 'trainId'][0] - - # Load bunch pattern table - run_mnemonics = mnemonics_for_run(run) - if bpt is None and bunchPattern != 'None': - if 'bunchPatternTable' in run_mnemonics: - m = run_mnemonics['bunchPatternTable'] - bpt = run.get_array(m['source'], m['key'], m['dim']) - pattern = bunchPattern - else: - pattern = bunchPattern - if bunchPattern == 'None': - bpt = None - - # Find digitizer type - m = mnemonic if isinstance(mnemonic, dict) else run_mnemonics[mnemonic] - digitizer = digitizer_type(run, m.get('source')) - - # 1. Peak-integrated data from digitizer - if useRaw is False: - # 1.1 No bunch pattern provided - if bpt is None: - log.info('Missing bunch pattern info.') - if indices is None: - raise TypeError('indices argument must be provided ' - 'when bunch pattern info is missing.') - if extra_dim is None: - extra_dim = 'pulseId' - return arr.isel({dim: indices}).rename({dim: extra_dim}) - - # 1.2 Bunch pattern is provided - if isinstance(mnemonic, dict): - peak_params = channel_peak_params(run, mnemonic.get('source'), - mnemonic.get('key')) - else: - peak_params = channel_peak_params(run, mnemonic) - log.debug(f'Digitizer peak integration parameters: {peak_params}') - return peaks_from_apd(arr, peak_params, digitizer, bpt, bunchPattern) - - # 2. Use raw data from digitizer - # minimum pulse period @ 4.5MHz, according to digitizer type - min_distance = 1 - if digitizer == 'FastADC': - min_distance = 24 - if digitizer == 'ADQ412': - min_distance = 440 - if autoFind: - stride = int(np.max([1, np.floor(arr.sizes['trainId']/200)])) - trace = arr.isel(trainId=slice(0, None, stride)).mean(dim='trainId') - try: - integParams = find_integ_params(trace) - except ValueError as err: - log.warning(f'{err}, trying with averaged trace over all trains.') - trace = arr.mean(dim='trainId') - integParams = find_integ_params(trace) - log.debug(f'Auto find peaks result: {integParams}') - - required_keys = ['pulseStart', 'pulseStop', 'baseStart', - 'baseStop', 'period', 'npulses'] - if integParams is None or not all(name in integParams - for name in required_keys): - raise TypeError('All keys of integParams argument ' - f'{required_keys} are required when ' - 'bunch pattern info is missing.') - - # 2.1. 
No bunch pattern provided - if bpt is None: - log.info('Missing bunch pattern info.') - log.debug(f'Retrieving {integParams["npulses"]} pulses.') - if extra_dim is None: - extra_dim = 'pulseId' - return peaks_from_raw_trace(arr, integParams['pulseStart'], - integParams['pulseStop'], - integParams['baseStart'], - integParams['baseStop'], - integParams['period'], - integParams['npulses'], - extra_dim=extra_dim) - - # 2.2 Bunch pattern is provided - # load mask and extract pulse Id: - dim_names = {'sase3': 'sa3_pId', 'sase1': 'sa1_pId', - 'scs_ppl': 'ol_pId', 'None': 'pulseId'} - extra_dim = dim_names[pattern] - valid_tid = np.intersect1d(arr.trainId, bpt.trainId, assume_unique=True) - mask = is_pulse_at(bpt, pattern) - pattern_changed = ~(mask == mask[0]).all().values - if not pattern_changed: - pid = np.nonzero(mask[0].values)[0] - valid_arr = arr - else: - mask = is_pulse_at(bpt.sel(trainId=valid_tid), pattern) - mask = mask.rename({'pulse_slot': extra_dim}) - mask = mask.assign_coords( - {extra_dim: np.arange(bpt.sizes['pulse_slot'])}) - mask_on = mask.where(mask, drop=True).fillna(False).astype(bool) - log.info(f'Bunch pattern of {pattern} changed during the run.') - pid = mask_on.coords[extra_dim] - # select trains containing pulses - valid_arr = arr.sel(trainId=mask_on.trainId) - npulses = len(pid) - log.debug(f'Bunch pattern: {npulses} pulses for {pattern}.') - if npulses == 1: - period_bpt = 0 - else: - period_bpt = np.min(np.diff(pid)) - if autoFind and period_bpt*min_distance != integParams['period']: - log.warning('The period from the bunch pattern is different than ' - 'that found by the peak-finding algorithm. Either ' - 'the algorithm failed or the bunch pattern source ' - f'({bunchPattern}) is not correct.') - # create array of sample indices for peak integration - sample_id = (pid-pid[0])*min_distance - # override auto find parameters - if isinstance(integParams['pulseStart'], (int, np.integer)): - integParams['pulseStart'] = integParams['pulseStart'] + sample_id - peaks = peaks_from_raw_trace(valid_arr, integParams['pulseStart'], - integParams['pulseStop'], - integParams['baseStart'], - integParams['baseStop'], - integParams['period'], - integParams['npulses'], - extra_dim) - if pattern_changed: - peaks = peaks.where(mask_on, drop=True) - return peaks.assign_coords({extra_dim: pid}) - - -def get_dig_avg_trace(run, mnemonic, ntrains=None): - """ - Compute the average over ntrains evenly spaced accross all trains - of a digitizer trace. - - Parameters - ---------- - run: extra_data.DataCollection - DataCollection containing the digitizer data. - mnemonic: str - ToolBox mnemonic of the digitizer data, e.g. 'MCP2apd'. - ntrains: int - Number of trains used to calculate the average raw trace. - If None, all trains are used. - - Returns - ------- - trace: DataArray - The average digitizer trace - """ - run_mnemonics = mnemonics_for_run(run) - if ntrains is None: - sel = run - else: - total_tid = len(run.train_ids) - stride = int(np.max([1, np.floor(total_tid/ntrains)])) - sel = run.select_trains(np.s_[0:None:stride]) - m = run_mnemonics[mnemonic] - raw_trace = sel.get_array(m['source'], m['key'], m['dim']) - raw_trace = raw_trace.mean(dim='trainId') - return raw_trace - - -def channel_peak_params(run, source, key=None, channel=None, board=None): - """ - Extract peak-integration parameters used by a channel of the digitizer. - - Parameters - ---------- - run: extra_data.DataCollection - DataCollection containing the digitizer data. 
- source: str - ToolBox mnemonic of a digitizer data, e.g. "MCP2apd" or - "FastADC4peaks", or name of digitizer source, e.g. - 'SCS_UTC1_ADQ/ADC/1:network'. - key: str - optional, used in combination of source (if source is not a ToolBox - mnemonics) instead of digitizer, channel and board. - channel: int or str - The digitizer channel for which to retrieve the parameters. If None, - inferred from the source mnemonic or source + key arguments. - board: int - Board of the ADQ412. If None, inferred from the source mnemonic or - source + key arguments. - - Returns - ------- - dict with peak integration parameters. - """ - run_mnemonics = mnemonics_for_run(run) - if source in run_mnemonics: - m = run_mnemonics[source] - source = m['source'] - key = m['key'] - if 'network' in source: - return adq412_channel_peak_params(run, source, key, channel, board) - else: - return fastADC_channel_peak_params(run, source, channel) - - -def fastADC_channel_peak_params(run, source, channel=None): - """ - Extract peak-integration parameters used by a channel of the Fast ADC. - - Parameters - ---------- - run: extra_data.DataCollection - DataCollection containing the digitizer data. - source: str - Name of Fast ADC source, e.g. 'SCS_UTC1_MCP/ADC/1:channel_5.output'. - channel: int - The Fast ADC channel for which to retrieve the parameters. If None, - inferred from the source. - - Returns - ------- - dict with peak integration parameters. - """ - fastADC_keys = { - 'baseStart': ('baselineSettings.baseStart.value', - 'baseStart.value'), - 'baseStop': ('baseStop.value',), - 'baseLength': ('baselineSettings.length.value',), - 'enable': ('enablePeakComputation.value',), - 'pulseStart': ('initialDelay.value',), - 'pulseLength': ('peakSamples.value',), - 'npulses': ('numPulses.value',), - 'period': ('pulsePeriod.value',) - } - if channel is None: - channel = int(source.split(':')[1].split('.')[0].split('_')[1]) - baseName = f'channel_{channel}.' - source = source.split(':')[0] - data = run.select(source).train_from_index(0)[1][source] - result = {} - for mnemo, versions in fastADC_keys.items(): - for v in versions: - key = baseName + v - if key in data: - result[mnemo] = data[key] - if 'baseLength' in result: - result['baseStop'] = result['baseStart'] + \ - result.pop('baseLength') - if 'pulseLength' in result: - result['pulseStop'] = result['pulseStart'] + \ - result.pop('pulseLength') - return result - - -def adq412_channel_peak_params(run, source, key=None, - channel=None, board=None): - """ - Extract peak-integration parameters used by a channel of the ADQ412. - - Parameters - ---------- - run: extra_data.DataCollection - DataCollection containing the digitizer data. - source: str - Nname of ADQ412 source, e.g. 'SCS_UTC1_ADQ/ADC/1:network'. - key: str - optional, e.g. 'digitizers.channel_1_D.apd.pulseIntegral', used in - combination of source instead of channel and board. - channel: int or str - The ADQ412 channel for which to retrieve the parameters. If None, - inferred from the source mnemonic or source + key arguments. - board: int - Board of the ADQ412. If None, inferred from the source mnemonic or - source + key arguments. - - Returns - ------- - dict with peak integration parameters. - """ - if key is None: - if channel is None or board is None: - raise ValueError('key or channel + board arguments are ' - 'missing.') - else: - k = key.split('.')[1].split('_') - ch_to_int = {'A': 0, 'B': 1, 'C': 2, 'D': 3} - channel = ch_to_int[k[2]] - board = k[1] - baseName = f'board{board}.apd.channel_{channel}.' 
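-    # Worked example (illustrative): for key
-    # 'digitizers.channel_1_D.apd.pulseIntegral', k is ['channel', '1', 'D'],
-    # so board = '1', channel = 3 and baseName = 'board1.apd.channel_3.'.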
- source = source.split(':')[0] - adq412_keys = { - 'baseStart': (baseName + 'baseStart.value',), - 'baseStop': (baseName + 'baseStop.value',), - 'enable': (baseName + 'enable.value',), - 'pulseStart': (baseName + 'pulseStart.value',), - 'pulseStop': (baseName + 'pulseStop.value',), - 'initialDelay': (baseName + 'initialDelay.value',), - 'upperLimit': (baseName + 'upperLimit.value',), - 'npulses': (f'board{board}.apd.numberOfPulses.value',) - } - - data = run.select(source).train_from_index(0)[1][source] - result = {} - for mnemo, versions in adq412_keys.items(): - for key in versions: - if key in data: - result[mnemo] = data[key] - result['period'] = result.pop('upperLimit') - \ - result.pop('initialDelay') - return result - - -def find_integ_params(trace, height=None, width=1): - """ - Find integration parameters for peak integration of a raw - digitizer trace. Based on scipy find_peaks(). - - Parameters - ---------- - trace: numpy array or xarray DataArray - The digitier raw trace used to find peaks - height: int - minimum threshold for peak determination - width: int - minimum width of peak - - Returns - ------- - dict with keys 'pulseStart', 'pulseStop', 'baseStart', 'baseStop', - 'period', 'npulses' and values in number of samples. - """ - if isinstance(trace, xr.DataArray): - trace = trace.values - trace_norm = trace - np.median(trace) - trace_norm = -trace_norm if np.mean(trace_norm) < 0 else trace_norm - SNR = np.max(np.abs(trace_norm)) / np.std(trace_norm[:100]) - if SNR < 10: - log.warning('signal-to-noise ratio too low: cannot ' - 'automatically find peaks.') - return {'pulseStart': 2, 'pulseStop': 3, - 'baseStart': 0, 'baseStop': 1, - 'period': 0, 'npulses': 1} - # Compute autocorrelation using fftconvolve - # from "https://stackoverflow.com/questions/12323959/" - # "fast-cross-correlation-method-in-python" - if len(trace_norm) % 2 == 0: - n = len(trace_norm) - else: - n = len(trace_norm) - 1 - hl = int(n/2) - a = trace_norm[:n] - b = np.zeros(2*n) - b[hl: hl + n] = a - # Do an array flipped convolution, which is a correlation. 
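-    # Illustrative check: for a = [1, 0, 1, 0], the zero-padded
-    # autocorrelation peaks at lags 0 and 2, so a 2-sample period would
-    # be detected.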
- ac_trace = fftconvolve(b, a[::-1], mode='valid') - # slower approach: - # ac_trace = np.correlate(trace_norm, trace_norm, mode='same') - n = int(len(ac_trace)/2) - # estimate quality of ac_trace and define min height to detect peaks - factor = np.abs(ac_trace.max() / np.median(ac_trace)) - factor = 3 if factor > 20 else 1.5 - ac_peaks = find_peaks(ac_trace[n:], height=ac_trace[n:].max()/factor) - if len(ac_peaks[0]) == 0: - period = 0 - distance = 1 - elif len(ac_peaks[0]) == 1: - period = ac_peaks[0][0] - distance = max(1, period-6) - else: - period = int(np.median(ac_peaks[0][1:] - ac_peaks[0][:-1])) - distance = max(1, period-6) # smaller than period to account for all peaks - # define min height to detect peaks depending on signal quality - f = np.max([3, np.min([(SNR/10), 4])]) - height = trace_norm.max() / f - peaks, dic = find_peaks(trace_norm, distance=distance, - height=height, width=1) - # filter out peaks that are not periodically spaced - idx = np.ones(len(peaks), dtype=bool) - idx[1:] = np.isclose(peaks[1:] - peaks[:-1], - np.ones(len(peaks[1:]))*period, atol=6) - peaks = peaks[idx] - for d in dic: - dic[d] = dic[d][idx] - npulses = len(peaks) - if npulses == 0: - raise ValueError('Could not automatically find peaks.') - pulseStart = np.mean(dic['left_ips'] - peaks + peaks[0], dtype=int) - pulseStop = np.mean(dic['right_ips'] - peaks + peaks[0], dtype=int) - baseStop = pulseStart - np.mean(peaks - dic['left_ips'])/2 - 1 - baseStop = np.min([baseStop, pulseStart]).astype(int) - baseStop = np.max([baseStop, 0]).astype(int) - baseStart = baseStop - np.mean(dic['widths'])/2 - baseStart = np.max([baseStart, 0]).astype(int) - result = {'pulseStart': pulseStart, 'pulseStop': pulseStop, - 'baseStart': baseStart, 'baseStop': baseStop, - 'period': period, 'npulses': npulses} - return result - - -def get_peak_params(run, mnemonic, raw_trace=None, ntrains=None): - """ - Get the peak region and baseline region of a raw digitizer trace used - to compute the peak integration. These regions are either those of the - digitizer peak-integration settings or those determined in get_tim_peaks - or get_fast_adc_peaks when the inputs are raw traces. - - Parameters - ---------- - run: extra_data.DataCollection - DataCollection containing the digitizer data. - mnemonic: str - ToolBox mnemonic of the digitizer data, e.g. 'MCP2apd'. - raw_trace: optional, 1D numpy array or xarray DataArray - Raw trace to display. If None, the average raw trace over - ntrains of the corresponding channel is loaded (this can - be time-consuming). - ntrains: optional, int - Only used if raw_trace is None. Number of trains used to - calculate the average raw trace of the corresponding channel. - """ - run_mnemonics = mnemonics_for_run(run) - if mnemonic not in run_mnemonics: - raise ToolBoxValueError("mnemonic must be a ToolBox mnemonics") - if "raw" not in mnemonic: - params = channel_peak_params(run, mnemonic) - if 'enable' in params and params['enable'] == 0: - log.warning('The digitizer did not record peak-integrated data.') - else: - if raw_trace is None: - raw_trace = get_dig_avg_trace(run, mnemonic, ntrains) - params = find_integ_params(raw_trace) - return params - - -def find_peak_integration_parameters(run, mnemonic, raw_trace=None, - integParams=None, pattern=None): - ''' - Finds peak integration parameters. 
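-
-    If integParams is None, the parameters are found automatically from the
-    average raw trace; otherwise the provided parameters are checked and,
-    if a bunch pattern is given, adjusted to it.
-
-    Returns
-    -------
-    tuple (params, pulse_ids, regular, raw_trace), where regular indicates
-    whether the pulses are evenly spaced.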
- ''' - run_mnemonics = mnemonics_for_run(run) - digitizer = digitizer_type(run, run_mnemonics[mnemonic]['source']) - pulse_period = 24 if digitizer == "FastADC" else 440 - - if integParams is None: # load raw trace and find peaks - autoFind = True - if raw_trace is None: - raw_trace = get_dig_avg_trace(run, mnemonic) - params = get_peak_params(run, mnemonic, raw_trace) - else: # inspect provided parameters - autoFind = False - required_keys = ['pulseStart', 'pulseStop', 'baseStart', - 'baseStop'] - add_text = '' - if pattern is None and not hasattr(integParams['pulseStart'], - '__len__'): - required_keys += ['period', 'npulses'] - add_text = 'Bunch pattern not provided. ' - if not all(name in integParams for name in required_keys): - raise ValueError(add_text + 'All keys of integParams argument ' - f'{required_keys} are required.') - params = integParams.copy() - - # extract the pulse ids from the parameters (starting at 0) - if hasattr(params['pulseStart'], '__len__'): - if params.get('npulses') is not None and ( - params.get('npulses') != len(params['pulseStart']) ): - log.warning('The number of pulses does not match the length ' - 'of pulseStart. Using length of pulseStart as ' - 'the number of pulses.') - params['npulses'] = len(params['pulseStart']) - pulse_ids_params = ((np.array(params['pulseStart']) - - params['pulseStart'][0]) / pulse_period).astype(int) - else: - pulse_ids_params = np.arange(0, params['npulses'] * params['period'] / pulse_period, - params['period'] / pulse_period).astype(int) - - # Extract pulse_ids and npulses from bunch pattern - pulse_ids, npulses, period = None, None, None - regular = True - if pattern is not None: - bunchPattern = 'sase3' if hasattr(pattern, 'sase') else 'scs_ppl' - if pattern.is_constant_pattern() is False: - log.info('The number of pulses changed during the run.') - pulse_ids = np.unique(pattern.pulse_ids(labelled=False, copy=False)) - npulses, period = None, None - regular = False - else: - pulse_ids = pattern.peek_pulse_ids(labelled=False) - npulses = len(pulse_ids) - if npulses > 1: - periods = np.diff(pulse_ids) - if len(np.unique(periods)) > 1: - regular = False - else: - period = np.unique(periods)[0] * pulse_period - # Compare parameters with bunch pattern - if len(pulse_ids_params) == len(pulse_ids): - if not (pulse_ids_params == pulse_ids - pulse_ids[0]).all(): - log.warning('The provided pulseStart parameters do not match the ' - f'{bunchPattern} bunch pattern pulse ids. Using ' - 'bunch pattern parameters.') - pulse_ids_params = pulse_ids - - if (npulses != params.get('npulses') or - period != params.get('period')): - if autoFind: - add_text = 'Automatically found ' - else: - add_text = 'Provided ' - log.warning(add_text + 'integration parameters ' - f'(npulses={params.get("npulses")}, ' - f'period={params.get("period")}) do not match the ' - f'{bunchPattern} bunch pattern (npulses={npulses}, ' - f'period={period}). 
Using bunch pattern parameters.') - pulse_ids_params = pulse_ids - params['npulses'] = npulses - params['period'] = period - if not hasattr(params['pulseStart'], '__len__'): - start = params['pulseStart'] - else: - start = params['pulseStart'][0] - - params['pulseStart'] = np.array([int(start + (pid - pulse_ids_params[0]) * pulse_period) - for pid in pulse_ids_params]) - return params, pulse_ids_params, regular, raw_trace - - -def check_peak_params(run, mnemonic, raw_trace=None, ntrains=200, params=None, - plot=True, show_all=False, bunchPattern='sase3'): - """ - Checks and plots the peak parameters (pulse window and baseline window - of a raw digitizer trace) used to compute the peak integration. These - parameters are either set by the digitizer peak-integration settings, - or are determined by a peak finding algorithm (used in get_tim_peaks - or get_fast_adc_peaks) when the inputs are raw traces. The parameters - can also be provided manually for visual inspection. The plot either - shows the first and last pulse of the trace or the entire trace. - - Parameters - ---------- - run: extra_data.DataCollection - DataCollection containing the digitizer data. - mnemonic: str - ToolBox mnemonic of the digitizer data, e.g. 'MCP2apd'. - raw_trace: optional, 1D numpy array or xarray DataArray - Raw trace to display. If None, the average raw trace over - ntrains of the corresponding channel is loaded (this can - be time-consuming). - ntrains: optional, int - Only used if raw_trace is None. Number of trains used to - calculate the average raw trace of the corresponding channel. - plot: bool - If True, displays the raw trace and peak integration regions. - show_all: bool - If True, displays the entire raw trace and all peak integration - regions (this can be time-consuming). - If False, shows the first and last pulse according to the bunchPattern. - bunchPattern: optional, str - Only used if plot is True. Checks the bunch pattern against - the digitizer peak parameters and shows potential mismatch. 
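-    params: optional, dict
-        peak-integration parameters to check. If None, they are read from
-        the digitizer settings or found automatically from the raw trace.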
-
-    Returns
-    -------
-    dictionary of peak integration parameters
-    """
-    run_mnemonics = mnemonics_for_run(run)
-    digitizer = digitizer_type(run, run_mnemonics[mnemonic]['source'])
-    pulse_period = 24 if digitizer == "FastADC" else 440
-    if params is None:
-        title = 'Auto-find peak parameters'
-    else:
-        title = 'Provided peak parameters'
-    pattern = None
-    try:
-        if bunchPattern == 'sase3':
-            pattern = XrayPulses(run)
-        if bunchPattern == 'scs_ppl':
-            pattern = OpticalLaserPulses(run)
-    except Exception as e:
-        log.warning(e)
-        bunchPattern = None
-    integParams, pulse_ids, regular, raw_trace = find_peak_integration_parameters(
-        run, mnemonic, raw_trace, params, pattern)
-    if bunchPattern:
-        if regular:
-            print(f'bunch pattern {bunchPattern}: {len(pulse_ids)} pulses,'
-                  f' {(pulse_ids[1] - pulse_ids[0]) * pulse_period} samples '
-                  'between two pulses')
-            print(f'{title}: {integParams["npulses"]} pulses, '
-                  f'{integParams["period"]} samples between two pulses')
-        else:
-            print(f'bunch pattern {bunchPattern}: Not a regular pattern. '
-                  f'{len(pulse_ids)} pulses, pulse_ids='
-                  f'{pulse_ids}.')
-    if plot:
-        if raw_trace is None:
-            raw_trace = get_dig_avg_trace(run, mnemonic)
-        fig, ax = plotPeakIntegrationWindow(raw_trace, integParams,
-                                            show_all=show_all)
-        fig.suptitle(title, size=12)
-    return integParams
-
-
-def plotPeakIntegrationWindow(raw_trace, params, show_all=False):
-    if hasattr(params['pulseStart'], '__len__'):
-        pulseStarts = np.array(params['pulseStart'])
-        pulseStops = params['pulseStop'] + pulseStarts - params['pulseStart'][0]
-        baseStarts = params['baseStart'] + pulseStarts - params['pulseStart'][0]
-        baseStops = params['baseStop'] + pulseStarts - params['pulseStart'][0]
-    else:
-        n = params['npulses']
-        p = params['period']
-        pulseStarts = [params['pulseStart'] + i*p for i in range(n)]
-        pulseStops = [params['pulseStop'] + i*p for i in range(n)]
-        baseStarts = [params['baseStart'] + i*p for i in range(n)]
-        baseStops = [params['baseStop'] + i*p for i in range(n)]
-
-    if show_all:
-        fig, ax = plt.subplots(figsize=(6, 3), constrained_layout=True)
-        for i in range(len(pulseStarts)):
-            lbl = 'baseline' if i == 0 else None
-            lp = 'peak' if i == 0 else None
-            ax.axvline(baseStarts[i], ls='--', color='k')
-            ax.axvline(baseStops[i], ls='--', color='k')
-            ax.axvspan(baseStarts[i], baseStops[i],
-                       alpha=0.5, color='grey', label=lbl)
-            ax.axvline(pulseStarts[i], ls='--', color='r')
-            ax.axvline(pulseStops[i], ls='--', color='r')
-            ax.axvspan(pulseStarts[i], pulseStops[i],
-                       alpha=0.2, color='r', label=lp)
-        ax.plot(raw_trace, color='C0', label='raw trace')
-        ax.legend(fontsize=8)
-        return fig, [ax]
-
-    fig, ax = plt.subplots(1, 2, figsize=(6, 3), constrained_layout=True)
-    for plot in range(2):
-        title = 'First pulse' if plot == 0 else 'Last pulse'
-        i = 0 if plot == 0 else -1
-        ax[plot].axvline(baseStarts[i], ls='--', color='k')
-        ax[plot].axvline(baseStops[i], ls='--', color='k')
-        ax[plot].axvspan(baseStarts[i], baseStops[i],
-                         alpha=0.5, color='grey', label='baseline')
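-        # shade the peak integration window in red next to the grey baseline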
ax[plot].axvline(pulseStarts[i], ls='--', color='r') - ax[plot].axvline(pulseStops[i], ls='--', color='r') - ax[plot].axvspan(pulseStarts[i], pulseStops[i], - alpha=0.2, color='r', label='peak') - xmin = np.max([0, baseStarts[i]-200]) - xmax = np.min([pulseStops[i]+200, raw_trace.size]) - ax[plot].plot(np.arange(xmin, xmax), raw_trace[xmin:xmax], color='C0', - label=title) - ax[plot].legend(fontsize=8) - ax[plot].set_xlim(xmin, xmax) - ax[plot].set_xlabel('digitizer samples') - ax[plot].set_title(title, size=10) - return fig, ax - - -def digitizer_type(run, source): - """ - Finds the digitizer type based on the class Id / name of the source. - Example source: 'SCS_UTC1_MCP/ADC/1'. Defaults to ADQ412 if not found. - """ - ret = None - if '_MCP/ADC/1' in source: - ret = 'FastADC' - if '_ADQ/ADC/1' in source: - ret = 'ADQ412' - if ret is None: - digi_dict = {'FastAdc': 'FastADC', - 'FastAdcLegacy': 'FastADC', - 'AdqDigitizer': 'ADQ412', - 'PyADCChannel': 'FastADC', - 'PyADCChannelLegacy': 'FastADC'} - try: - source = source.split(':')[0] - classId = run.get_run_value(source, 'classId.value') - ret = digi_dict.get(classId) - except Exception as e: - log.warning(str(e)) - log.warning(f'Could not find digitizer type from source {source}.') - ret = 'ADQ412' - return ret - - -def get_tim_peaks(run, mnemonic=None, merge_with=None, - bunchPattern='sase3', integParams=None, - keepAllSase=False): - """ - Automatically computes TIM peaks. Sources can be loaded on the - fly via the mnemonics argument, or processed from an existing data - set (merge_with). The bunch pattern table is used to assign the - pulse id coordinates. - - Parameters - ---------- - run: extra_data.DataCollection - DataCollection containing the digitizer data. - mnemonic: str - mnemonics for TIM, e.g. "MCP2apd". - merge_with: xarray Dataset - If provided, the resulting Dataset will be merged with this - one. The TIM variables of merge_with (if any) will also be - computed and merged. - bunchPattern: str - 'sase1' or 'sase3' or 'scs_ppl', bunch pattern - used to extract peaks. The pulse ID dimension will be named - 'sa1_pId', 'sa3_pId' or 'ol_pId', respectively. - integParams: dict - dictionnary for raw trace integration, e.g. - {'pulseStart':100, 'pulsestop':200, 'baseStart':50, - 'baseStop':99, 'period':24, 'npulses':500}. - If None, integration parameters are computed automatically. - keepAllSase: bool - Only relevant in case of sase-dedicated trains. If True, all - trains are kept, else only those of the bunchPattern are kept. - - Returns - ------- - xarray Dataset with TIM variables substituted by - the peak caclulated values (e.g. "MCP2raw" becomes - "MCP2peaks"), merged with Dataset *merge_with* if provided. - """ - return get_digitizer_peaks(run, mnemonic, merge_with, - bunchPattern, integParams, - keepAllSase) - - -def get_laser_peaks(run, mnemonic=None, merge_with=None, - bunchPattern='scs_ppl', integParams=None): - """ - Extracts laser photodiode signal (peak intensity) from Fast ADC - digitizer. Sources can be loaded on the fly via the mnemonics - argument, and/or processed from an existing data set (merge_with). - The PP laser bunch pattern is used to assign the pulse idcoordinates. - - Parameters - ---------- - run: extra_data.DataCollection - DataCollection containing the digitizer data. - mnemonic: str - mnemonic for FastADC corresponding to laser signal, e.g. - "FastADC2peaks" or 'I0_ILHraw'. - merge_with: xarray Dataset - If provided, the resulting Dataset will be merged with this - one. 
The FastADC variables of merge_with (if any) will also be - computed and merged. - bunchPattern: str - 'sase1' or 'sase3' or 'scs_ppl', bunch pattern - used to extract peaks. - integParams: dict - dictionnary for raw trace integration, e.g. - {'pulseStart':100, 'pulsestop':200, 'baseStart':50, - 'baseStop':99, 'period':24, 'npulses':500}. - If None, integration parameters are computed - automatically. - - Returns - ------- - xarray Dataset with all Fast ADC variables substituted by - the peak caclulated values (e.g. "FastADC2raw" becomes - "FastADC2peaks"). - """ - return get_digitizer_peaks(run, mnemonic, merge_with, - bunchPattern, integParams, False) - - -def get_digitizer_peaks(run, mnemonic, merge_with=None, - bunchPattern='sase3', integParams=None, - keepAllSase=False): - """ - Automatically computes digitizer peaks. A source can be loaded on the - fly via the mnemonic argument, or processed from an existing data set - (merge_with). The bunch pattern table is used to assign the pulse - id coordinates. - - Parameters - ---------- - run: extra_data.DataCollection - DataCollection containing the digitizer data. - mnemonic: str - mnemonic for FastADC or ADQ412, e.g. "I0_ILHraw" or "MCP3apd". - The data is either loaded from the DataCollection or taken from - merge_with. - merge_with: xarray Dataset - If provided, the resulting Dataset will be merged with this one. - bunchPattern: str or dict - 'sase1' or 'sase3' or 'scs_ppl', 'None': bunch pattern - integParams: dict - dictionnary for raw trace integration, e.g. - {'pulseStart':100, 'pulsestop':200, 'baseStart':50, - 'baseStop':99, 'period':24, 'npulses':500}. - If None, integration parameters are computed automatically. - keepAllSase: bool - Only relevant in case of sase-dedicated trains. If True, all - trains are kept, else only those of the bunchPattern are kept. - - Returns - ------- - xarray Dataset with digitizer peak variables. Raw variables are - substituted by the peak caclulated values (e.g. "FastADC2raw" becomes - "FastADC2peaks"). - """ - run_mnemonics = mnemonics_for_run(run) - if mnemonic not in run_mnemonics: - log.warning('Mnemonic not found in run. Skipping.') - return merge_with - if bool(merge_with) and mnemonic in merge_with: - for d in merge_with[mnemonic].dims: - if d in ['sa3_pId', 'ol_pId']: - log.warning(f'{mnemonic} already extracted. 
' - 'Skipping.') - return merge_with - # check if bunch pattern table exists - bpt = None - if bool(merge_with) and 'bunchPatternTable' in merge_with: - bpt = merge_with['bunchPatternTable'] - log.debug('Using bpt from merge_with dataset.') - elif 'bunchPatternTable' in run_mnemonics: - m = run_mnemonics['bunchPatternTable'] - bpt = run.get_array(m['source'], m['key'], m['dim']) - log.debug('Loaded bpt from DataCollection.') - elif 'bunchPatternTable_SA3' in run_mnemonics: - m = run_mnemonics['bunchPatternTable_SA3'] - bpt = run.get_array(m['source'], m['key'], m['dim']) - log.debug('Loaded bpt from DataCollection.') - else: - log.warning('Could not load bunch pattern table.') - # prepare resulting dataset - if bool(merge_with): - mw_ds = merge_with.drop(mnemonic, errors='ignore') - else: - mw_ds = xr.Dataset() - # iterate over mnemonics and merge arrays in dataset - autoFind = True if integParams is None else False - m = run_mnemonics[mnemonic] - useRaw = True if 'raw' in mnemonic else False - if bool(merge_with) and mnemonic in merge_with: - data = merge_with[mnemonic] - else: - data = run.get_array(m['source'], m['key'], m['dim']) - peaks = get_peaks(run, data, mnemonic, - useRaw=useRaw, - autoFind=autoFind, - integParams=integParams, - bunchPattern=bunchPattern, - bpt=bpt) - name = mnemonic.replace('raw', 'peaks').replace('apd', 'peaks') - join = 'outer' if keepAllSase else 'inner' - ds = mw_ds.merge(peaks.rename(name), join=join) - return ds - - -def digitizer_signal_description(run, digitizer=None): - """ - Check for the existence of signal description and return all corresponding - channels in a dictionnary. - - Parameters - ---------- - run: extra_data.DataCollection - DataCollection containing the digitizer data. - digitizer: str or list of str (default=None) - Name of the digitizer: one in ['FastADC', FastADC2', - 'ADQ412', 'ADQ412_2] - If None, all digitizers are used - - Returns - ------- - signal_description: dictionnary containing the signal description of - the digitizer channels. 
- - - Example - ------- - import toolbox_scs as tb - run = tb.open_run(3481, 100) - signals = tb.digitizer_signal_description(run) - signals_fadc2 = tb.digitizer_signal_description(run, 'FastADC2') - """ - if digitizer not in [None, 'FastADC', 'FastADC2']: - raise ValueError('digitizer must be one in ' - '["FastADC", "FastADC2"]') - if digitizer is None: - digitizer = ['FastADC', 'FastADC2', 'ADQ412', 'ADQ412_2'] - else: - digitizer = [digitizer] - def key_fadc(i): - if i > 9: - return None - return f'channel_{i}.signalDescription.value' - def key_adq412(i): - if i > 3: - return None - return f'board1.channel_{i}.description.value' - - sources = {'FastADC': ['SCS_UTC1_MCP/ADC/1', key_fadc], - 'FastADC2': ['SCS_UTC2_FADC/ADC/1', key_fadc], - 'ADQ412': ['SCS_UTC1_ADQ/ADC/1', key_adq412], - 'ADQ412_2': ['SCS_UTC2_ADQ/ADC/1', key_adq412]} - res = {} - for d in digitizer: - if sources[d][0] not in run.all_sources: - continue - if sources[d][1](0) not in run.get_run_values(sources[d][0]): - raise ValueError('No signal description available for ' - f'{d}: {sources[d][0]}') - for ch in range(10): - val = sources[d][1](ch) - if val is None: - break - desc = run.get_run_value(sources[d][0], val) - res[f'{d}_Ch{ch}'] = desc - return res - - -def calibrateTIM(data, rollingWindow=200, mcp=1, plot=False, use_apd=True, intstart=None, - intstop=None, bkgstart=None, bkgstop=None, t_offset=None, npulses_apd=None): - ''' Calibrate TIM signal (Peak-integrated signal) to the slow ion signal of SCS_XGM - (photocurrent read by Keithley, channel 'pulseEnergy.photonFlux.value'). - The aim is to find F so that E_tim_peak[uJ] = F x TIM_peak. For this, we want to - match the SASE3-only average TIM pulse peak per train (TIM_avg) to the slow XGM - signal E_slow. - Since E_slow is the average energy per pulse over all SASE1 and SASE3 - pulses (N1 and N3), we first extract the relative contribution C of the SASE3 pulses - by looking at the pulse-resolved signals of the SA3_XGM in the tunnel. - There, the signal of SASE1 is usually strong enough to be above noise level. - Let TIM_avg be the average of the TIM pulses (SASE3 only). - The calibration factor is then defined as: F = E_slow * C * (N1+N3) / ( N3 * TIM_avg ). - If N3 changes during the run, we locate the indices for which N3 is maximum and define - a window where to apply calibration (indices start/stop). - - Warning: the calibration does not include the transmission by the KB mirrors! - - Inputs: - data: xarray Dataset - rollingWindow: length of running average to calculate TIM_avg - mcp: MCP channel - plot: boolean. If True, plot calibration results. - use_apd: boolean. If False, the TIM pulse peaks are extract from raw traces using - getTIMapd - intstart: trace index of integration start - intstop: trace index of integration stop - bkgstart: trace index of background start - bkgstop: trace index of background stop - t_offset: index separation between two pulses - npulses_apd: number of pulses - - Output: - F: float, TIM calibration factor. 
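-
-    Example (illustrative numbers only): with E_slow = 1000 uJ, C = 0.6,
-        N1 = 20, N3 = 30 and TIM_avg = 2000, the calibration factor is
-        F = 1000 * 0.6 * (20 + 30) / (30 * 2000) = 0.5 uJ per TIM unit.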
- - ''' - start = 0 - stop = None - npulses = data['npulses_sase3'] - ntrains = npulses.shape[0] - if not np.all(npulses == npulses[0]): - start = np.argmax(npulses.values) - stop = ntrains + np.argmax(npulses.values[::-1]) - 1 - if stop - start < rollingWindow: - print('not enough consecutive data points with the largest number of pulses per train') - start += rollingWindow - stop = np.min((ntrains, stop+rollingWindow)) - filteredTIM = getTIMapd(data, mcp, use_apd, intstart, intstop, bkgstart, bkgstop, t_offset, npulses_apd) - sa3contrib = saseContribution(data, 'sase3', 'XTD10_XGM') - avgFast = filteredTIM.mean(axis=1).rolling(trainId=rollingWindow).mean() - ratio = ((data['npulses_sase3']+data['npulses_sase1']) * - data['SCS_photonFlux'] * sa3contrib) / (avgFast*data['npulses_sase3']) - F = float(ratio[start:stop].median().values) - - if plot: - fig = plt.figure(figsize=(8,5)) - ax = plt.subplot(211) - ax.set_title('E[uJ] = {:2e} x TIM (MCP{})'.format(F, mcp)) - ax.plot(data['SCS_photonFlux'], label='SCS XGM slow (all SASE)', color='C0') - slow_avg_sase3 = data['SCS_photonFlux']*(data['npulses_sase1'] - +data['npulses_sase3'])*sa3contrib/data['npulses_sase3'] - ax.plot(slow_avg_sase3, label='SCS XGM slow (SASE3 only)', color='C1') - ax.plot(avgFast*F, label='Calibrated TIM rolling avg', color='C2') - ax.legend(loc='upper left', fontsize=8) - ax.set_ylabel('Energy [$\mu$J]', size=10) - ax.plot(filteredTIM.mean(axis=1)*F, label='Calibrated TIM train avg', alpha=0.2, color='C9') - ax.legend(loc='best', fontsize=8, ncol=2) - plt.xlabel('train in run') - - ax = plt.subplot(234) - xgm_fast = selectSASEinXGM(data) - ax.scatter(filteredTIM, xgm_fast, s=5, alpha=0.1, rasterized=True) - fit, cov = np.polyfit(filteredTIM.values.flatten(),xgm_fast.values.flatten(),1, cov=True) - y=np.poly1d(fit) - x=np.linspace(filteredTIM.min(), filteredTIM.max(), 10) - ax.plot(x, y(x), lw=2, color='r') - ax.set_ylabel('Raw HAMP [$\mu$J]', size=10) - ax.set_xlabel('TIM (MCP{}) signal'.format(mcp), size=10) - ax.annotate(s='y(x) = F x + A\n'+ - 'F = %.3e\n$\Delta$F/F = %.2e\n'%(fit[0],np.abs(np.sqrt(cov[0,0])/fit[0]))+ - 'A = %.3e'%fit[1], - xy=(0.5,0.6), xycoords='axes fraction', fontsize=10, color='r') - print('TIM calibration factor: %e'%(F)) - - ax = plt.subplot(235) - ax.hist(filteredTIM.values.flatten()*F, bins=50, rwidth=0.8) - ax.set_ylabel('number of pulses', size=10) - ax.set_xlabel('Pulse energy MCP{} [uJ]'.format(mcp), size=10) - ax.set_yscale('log') - - ax = plt.subplot(236) - if not use_apd: - pulseStart = intstart - pulseStop = intstop - else: - pulseStart = data.attrs['run'].get_array( - 'SCS_UTC1_ADQ/ADC/1', 'board1.apd.channel_0.pulseStart.value')[0].values - pulseStop = data.attrs['run'].get_array( - 'SCS_UTC1_ADQ/ADC/1', 'board1.apd.channel_0.pulseStop.value')[0].values - - if 'MCP{}raw'.format(mcp) not in data: - tid, data = data.attrs['run'].train_from_index(0) - trace = data['SCS_UTC1_ADQ/ADC/1:network']['digitizers.channel_1_D.raw.samples'] - print('no raw data for MCP{}. 
Loading trace from MCP1'.format(mcp)) - label_trace='MCP1 Voltage [V]' - else: - trace = data['MCP{}raw'.format(mcp)][0] - label_trace='MCP{} Voltage [V]'.format(mcp) - ax.plot(trace[:pulseStop+25], 'o-', ms=2, label='trace') - ax.axvspan(pulseStart, pulseStop, color='C2', alpha=0.2, label='APD region') - ax.axvline(pulseStart, color='gray', ls='--') - ax.axvline(pulseStop, color='gray', ls='--') - ax.set_xlim(pulseStart - 25, pulseStop + 25) - ax.set_ylabel(label_trace, size=10) - ax.set_xlabel('sample #', size=10) - ax.legend(fontsize=8) - - return F - - -''' TIM calibration table - Dict with key= photon energy and value= array of polynomial coefficients for each MCP (1,2,3). - The polynomials correspond to a fit of the logarithm of the calibration factor as a function - of MCP voltage. If P is a polynomial and V the MCP voltage, the calibration factor (in microjoule - per APD signal) is given by -exp(P(V)). - This table was generated from the calibration of March 2019, proposal 900074, semester 201930, - runs 69 - 111 (Ni edge): https://in.xfel.eu/elog/SCS+Beamline/2323 - runs 113 - 153 (Co edge): https://in.xfel.eu/elog/SCS+Beamline/2334 - runs 163 - 208 (Fe edge): https://in.xfel.eu/elog/SCS+Beamline/2349 -''' -tim_calibration_table = { - 705.5: np.array([ - [-6.85344690e-12, 5.00931986e-08, -1.27206912e-04, 1.15596821e-01, -3.15215367e+01], - [ 1.25613942e-11, -5.41566381e-08, 8.28161004e-05, -7.27230153e-02, 3.10984925e+01], - [ 1.14094964e-12, 7.72658935e-09, -4.27504907e-05, 4.07253378e-02, -7.00773062e+00]]), - 779: np.array([ - [ 4.57610777e-12, -2.33282497e-08, 4.65978738e-05, -6.43305156e-02, 3.73958623e+01], - [ 2.96325102e-11, -1.61393276e-07, 3.32600044e-04, -3.28468195e-01, 1.28328844e+02], - [ 1.14521506e-11, -5.81980336e-08, 1.12518434e-04, -1.19072484e-01, 5.37601559e+01]]), - 851: np.array([ - [ 3.15774215e-11, -1.71452934e-07, 3.50316512e-04, -3.40098861e-01, 1.31064501e+02], - [5.36341958e-11, -2.92533156e-07, 6.00574534e-04, -5.71083140e-01, 2.10547161e+02], - [ 3.69445588e-11, -1.97731342e-07, 3.98203522e-04, -3.78338599e-01, 1.41894119e+02]]) -} - - -def timFactorFromTable(voltage, photonEnergy, mcp=1): - ''' Returns an energy calibration factor for TIM integrated peak signal (APD) - according to calibration from March 2019, proposal 900074, semester 201930, - runs 69 - 111 (Ni edge): https://in.xfel.eu/elog/SCS+Beamline/2323 - runs 113 - 153 (Co edge): https://in.xfel.eu/elog/SCS+Beamline/2334 - runs 163 - 208 (Fe edge): https://in.xfel.eu/elog/SCS+Beamline/2349 - Uses the tim_calibration_table declared above. - - Inputs: - voltage: MCP voltage in volts. - photonEnergy: FEL photon energy in eV. Calibration factor is linearly - interpolated between the known values from the calibration table. - mcp: MCP channel (1, 2, or 3). 
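-
-        For example (illustrative): at photonEnergy = 740 eV, the factor is
-        interpolated between the 705.5 eV and 779 eV table entries as
-        f = f_705.5 + (f_779 - f_705.5) * (740 - 705.5) / (779 - 705.5).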
-
-    Output:
-        f: calibration factor in microjoule per APD signal
-    '''
-    energies = np.sort([key for key in tim_calibration_table])
-    if photonEnergy not in energies:
-        if photonEnergy > energies.max():
-            photonEnergy = energies.max()
-        elif photonEnergy < energies.min():
-            photonEnergy = energies.min()
-        else:
-            idx = np.searchsorted(energies, photonEnergy) - 1
-            polyA = np.poly1d(tim_calibration_table[energies[idx]][mcp-1])
-            polyB = np.poly1d(tim_calibration_table[energies[idx+1]][mcp-1])
-            fA = -np.exp(polyA(voltage))
-            fB = -np.exp(polyB(voltage))
-            f = fA + (fB-fA)/(energies[idx+1]-energies[idx])*(photonEnergy - energies[idx])
-            return f
-    poly = np.poly1d(tim_calibration_table[photonEnergy][mcp-1])
-    f = -np.exp(poly(voltage))
-    return f
-
-###############################################################################
-###############################################################################
-###############################################################################
-
-def extract_digitizer_peaks(proposal, runNB, mnemonic, bunchPattern=None,
-                            integParams=None, save=False,
-                            subdir='usr/processed_runs'):
-    """
-    Extracts the peaks from digitizer raw traces and saves them into a file.
-    The calculation is a trapezoidal integration between 'pulseStart' and
-    'pulseStop' with subtraction of a baseline defined as the median between
-    'baseStart' and 'baseStop'.
-    The integration parameters can either be provided using integParams, or
-    determined automatically by a peak-finding algorithm. If the bunchPattern
-    argument is provided, the pulse ids are aligned to it.
-
-    Parameters
-    ----------
-    proposal: int
-        the proposal number
-    runNB: int
-        the run number
-    mnemonic: str
-        the mnemonic containing raw traces. Example: 'XRD_MCP_BIGraw'
-    bunchPattern: str
-        'sase3' or 'scs_ppl'. If provided, checks the bunch pattern table
-        using Extra XrayPulses or OpticalLaserPulses and aligns the pulse
-        ids. If None, the pulse ids are not aligned and indexed between 0
-        and the number of pulses per train.
-    integParams: dict
-        dictionary with keys ['pulseStart', 'pulseStop', 'baseStart',
-        'baseStop', 'period', 'npulses']. If provided, the automatic peak
-        finding is skipped. If bunchPattern is not None, 'period' and
-        'npulses' are adjusted to match the bunch pattern and align the
-        pulse ids.
-    save: bool
-        whether to save the result to a file or not.
-    subdir: str
-        subdirectory. The data is stored in
-        <proposal path>/<subdir>/r{runNB:04d}/f'r{runNB:04d}-digitizers-data.h5'
-    """
-    run = open_run(proposal, runNB)
-    run_mnemonics = mnemonics_for_run(run)
-    mnemo = run_mnemonics.get(mnemonic)
-    if mnemo is None:
-        log.warning('Mnemonic not found. Skipping.')
-        return xr.DataArray()
-    source, key = mnemo['source'], mnemo['key']
-    extra_dim = {'sase3': 'sa3_pId', 'scs_ppl': 'ol_pId'}.get(bunchPattern)
-    if extra_dim is None:
-        extra_dim = 'pulseId'
-    digitizer = digitizer_type(run, source)
-    if digitizer == 'FastADC':
-        pulse_period = 24
-    if digitizer == 'ADQ412':
-        pulse_period = 440
-    pattern = None
-    try:
-        if bunchPattern == 'sase3':
-            pattern = XrayPulses(run)
-        if bunchPattern == 'scs_ppl':
-            pattern = OpticalLaserPulses(run)
-    except Exception as e:
-        log.warning(e)
-
-    # load traces and calculate the average
-    traces = run[source, key].xarray(name=mnemonic, extra_dims=mnemo['dim'])
-    trace = traces.mean('trainId').rename(mnemonic.replace('raw', 'avg'))
-
-    params, pulse_ids, regular, trace = find_peak_integration_parameters(
-        run, mnemonic, trace, integParams, pattern)
-
-    # extract peaks
-    data = peaks_from_raw_trace(traces, **params, extra_dim=extra_dim)
-    data = data.rename(mnemonic.replace('raw', 'peaks'))
-
-    if regular is True:
-        data = data.assign_coords({extra_dim: pulse_ids})
-    else:
-        mask = pattern.pulse_mask(labelled=False)
-        mask = xr.DataArray(mask, dims=['trainId', extra_dim],
-                            coords={'trainId': run[source, key].train_ids,
-                                    extra_dim: np.arange(mask.shape[1])})
-        mask = mask.sel({extra_dim: pulse_ids})
-        data = data.where(mask, drop=True)
-
-    data.attrs['params_keys'] = list(params.keys())
-    if regular:
-        params['pulseStart'] = params['pulseStart'][0]
-    for p in params:
-        if params[p] is None:
-            params[p] = 'None'
-    data.attrs[f'params_{data.name}'] = list(params.values())
-    if save:
-        save_peaks(proposal, runNB, data, trace, subdir)
-    return data
-
-
-def save_peaks(proposal, runNB, peaks, avg_trace, subdir):
-    '''
-    Save the peaks extracted with extract_digitizer_peaks() as well as the
-    average raw trace in a dataset at the proposal + subdir location.
-    If a dataset already exists, the new data is merged with it.
-
-    Parameters
-    ----------
-    proposal: int
-        the proposal number
-    runNB: int
-        the run number
-    peaks: xarray DataArray
-        the 2D-array obtained by extract_digitizer_peaks()
-    avg_trace: xarray DataArray
-        the average raw trace over the trains
-    subdir: str
-        subdirectory. The data is stored in
-        <proposal path>/<subdir>/r{runNB:04d}/f'r{runNB:04d}-digitizers-data.h5'
-
-    Returns
-    -------
-    None
-    '''
-    root = find_proposal(f'p{proposal:06d}')
-    path = os.path.join(root, subdir + f'/r{runNB:04d}/')
-    os.makedirs(path, exist_ok=True)
-    fname = path + f'r{runNB:04d}-digitizers-data.h5'
-    ds_peaks = peaks.to_dataset(promote_attrs=True)
-
-    if os.path.isfile(fname):
-        ds = xr.load_dataset(fname)
-        ds = ds.drop_vars([peaks.name, avg_trace.name], errors='ignore')
-        for dim in ds.dims:
-            if all(dim not in ds[v].dims for v in ds):
-                ds = ds.drop_dims(dim)
-        dim_name = 'sampleId'
-        if 'sampleId' in ds.dims and ds.sizes['sampleId'] != len(avg_trace):
-            dim_name = 'sampleId2'
-        avg_trace = avg_trace.rename({avg_trace.dims[0]: dim_name})
-        if f'params_{peaks.name}' in ds.attrs:
-            del ds.attrs[f'params_{peaks.name}']
-        ds = xr.merge([ds, ds_peaks, avg_trace],
-                      combine_attrs='drop_conflicts', join='inner')
-    else:
-        ds = ds_peaks.merge(avg_trace.rename({avg_trace.dims[0]: 'sampleId'}))
-    ds.to_netcdf(fname, format='NETCDF4')
-    print(f'saved data into {fname}.')
-    return None
-
-
-def load_processed_peaks(proposal, runNB, mnemonic=None,
-                         data='usr/processed_runs', merge_with=None):
-    """
-    Load processed digitizer peaks data.
-
-    Parameters
-    ----------
-    proposal: int
-        the proposal number
-    runNB: int
-        the run number
-    mnemonic: str
-        the mnemonic containing peaks. Example: 'XRD_MCP_BIGpeaks'.
-        If None, the entire dataset is loaded.
-    data: str
-        subdirectory. The data is stored in
-        <proposal path>/<subdir>/r{runNB:04d}/f'r{runNB:04d}-digitizers-data.h5'
-    merge_with: xarray Dataset
-        A dataset to merge the data with.
-
-    Returns
-    -------
-    xarray DataArray if mnemonic is not None and merge_with is None
-    xarray Dataset if mnemonic is None or merge_with is not None.
- - Example - ------- - # load the mono energy and the MCP_BIG peaks - run, ds = tb.load(proposal, runNB, 'nrj') - ds = tb.load_processed_peaks(proposal, runNB,'XRD_MCP_BIGpeaks', merge_with=ds) - """ - if mnemonic is None: - return load_all_processed_peaks(proposal, runNB, data, merge_with) - root = find_proposal(f'p{proposal:06d}') - path = os.path.join(root, data + f'/r{runNB:04d}/') - fname = path + f'r{runNB:04d}-digitizers-data.h5' - if os.path.isfile(fname): - ds = xr.load_dataset(fname) - if mnemonic not in ds: - print(f'Mnemonic {mnemonic} not found in {fname}') - return {} - da = ds[mnemonic] - da.attrs['params_keys'] = ds.attrs['params_keys'] - da.attrs[f'params_{mnemonic}'] = ds.attrs[f'params_{mnemonic}'] - if merge_with is not None: - return merge_with.merge(da.to_dataset(promote_attrs=True), - combine_attrs='drop_conflicts', join='inner') - else: - return da - else: - print(f'Mnemonic {mnemonic} not found in {fname}') - return merge_with - - -def load_all_processed_peaks(proposal, runNB, data='usr/processed_runs', - merge_with=None): - """ - Load processed digitizer peaks dataset. The data contains the peaks, - the average raw trace and the integration parameters (attribute) of - each processed digitizer source. - - Parameters - ---------- - proposal: int - the proposal number - runNB: int - the run number - data: str - subdirectory. The data is stored in - <proposal path>/<subdir>/r{runNB:04d}/f'r{runNB:04d}-digitizers-data.h5' - merge_with: xarray Dataset - A dataset to merge the data with. - - Returns - ------- - xarray Dataset - """ - root = find_proposal(f'p{proposal:06d}') - path = os.path.join(root, data + f'/r{runNB:04d}/') - fname = path + f'r{runNB:04d}-digitizers-data.h5' - if os.path.isfile(fname): - if merge_with is not None: - return merge_with.merge(xr.load_dataset(fname), - combine_attrs='drop_conflicts', join='inner') - return xr.load_dataset(fname) - else: - print(f'{fname} not found.') - return merge_with - - -def check_processed_peak_params(proposal, runNB, mnemonic, data='usr/processed_runs', - plot=True, show_all=False): - """ - Check the integration parameters used to generate the processed peak values. - - Parameters - ---------- - proposal: int - the proposal number - runNB: int - the run number - mnemonic: str - the mnemonic containing peaks. Example: 'XRD_MCP_BIGpeaks'. - If None, the entire dataset is loaded - data: str - subdirectory. The data is stored in - <proposal path>/<subdir>/r{runNB:04d}/f'r{runNB:04d}-digitizers-data.h5' - plot: bool - If True, displays the raw trace and peak integration regions. - show_all: bool - If True, displays the entire raw trace and all peak integration - regions (this can be time-consuming). - If False, shows the first and last pulses. - - Returns - ------- - params: dict - the integration parameters with keys ['pulseStart', 'pulseStop', - 'baseStart', 'baseStop', 'period', 'npulses']. - See `extract_digitizer_peaks()`. 
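-
-    Example
-    -------
-    A minimal sketch (proposal and run number are placeholders):
-
-    >>> params = check_processed_peak_params(2953, 537, 'XRD_MCP_BIGpeaks')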
- """ - root = find_proposal(f'p{proposal:06d}') - path = os.path.join(root, data + f'/r{runNB:04d}/') - fname = path + f'r{runNB:04d}-digitizers-data.h5' - if os.path.isfile(fname): - ds = xr.load_dataset(fname) - if mnemonic.replace('raw', 'peaks') not in ds: - print(f'Mnemonic {mnemonic} not found in {fname}') - return {} - da = ds[mnemonic] - params = dict(zip(ds.attrs['params_keys'], ds.attrs[f'params_{mnemonic}'])) - if plot: - plotPeakIntegrationWindow(ds[mnemonic.replace('peaks', 'avg')], - params, show_all=show_all) - return params - else: - print(f'{fname} not found.') - return {} \ No newline at end of file