From a66ba964cb92231b5391b1a45315ca731d8f94b0 Mon Sep 17 00:00:00 2001 From: Laurent Mercadier <laurent.mercadier@xfel.eu> Date: Sat, 5 Oct 2024 16:26:08 +0200 Subject: [PATCH] Add extract_pes_spectra() and get_pes_rv() functions --- src/toolbox_scs/detectors/pes.py | 145 +++++++++++++++++++++++++++++-- 1 file changed, 139 insertions(+), 6 deletions(-) diff --git a/src/toolbox_scs/detectors/pes.py b/src/toolbox_scs/detectors/pes.py index c8cbbbb..be5ee30 100644 --- a/src/toolbox_scs/detectors/pes.py +++ b/src/toolbox_scs/detectors/pes.py @@ -11,6 +11,7 @@ import numpy as np import xarray as xr import extra_data as ed import re +from extra.components import XrayPulses, AdqRawChannel from ..misc.bunch_pattern_external import is_sase_3 from ..mnemonics_machinery import (mnemonics_to_process, @@ -20,12 +21,110 @@ from ..constants import mnemonics as _mnemonics __all__ = [ 'get_pes_params', 'get_pes_tof', + 'extract_pes_spectra', ] log = logging.getLogger(__name__) +def extract_pes_spectra(proposal, runNB, mnemonic, + start=0, origin=None, width=None): + """ + Extracts time-of-flight spectra from raw digitizer traces. The spectra + are aligned by pulse Id using the SASE 3 bunch pattern, and have time + coordinates in nanoseconds. + + Parameters + ---------- + proposal:int + The proposal number. + runNB: int + The run number. + mnemonic: str + mnemonic for PES, e.g. "PES_2Araw". + start: int + starting sample of the first spectrum in the raw trace. + origin: int + sample of the raw trace that corresponds to time-of-flight origin. + Used to compute the 'time_ns' coordinates. + If None, computation of 'time_ns' is skipped. + width: int + number of samples per spectra. If None, the number of samples + for 4.5 MHz repetition rate is used. + + Returns + ------- + pes: xarray DataArray + DataArray containing the PES time-of-flight spectra. + + Example + ------- + >>> import toolbox_scs as tb + >>> import toolbox_scs.detectors as tbdet + >>> proposal, runNB = 900447, 12 + >>> pes = tbdet.get_pes_tof(proposal, runNB, 'PES_2Araw') + """ + run = ed.open_run(proposal, runNB) + all_mnemonics = mnemonics_for_run(run) + mnemo = all_mnemonics.get(mnemonic) + if mnemo is None: + print(f'Mnemonic {mnemonic} not found in run. Skipping.') + return None + PULSE_PERIOD = 440 + pattern = XrayPulses(run) + regular = True + if pattern.is_constant_pattern() is False: + print('The number of pulses changed during the run.') + pulse_ids = np.unique(pattern.pulse_ids(labelled=False, copy=False)) + regular = False + else: + pulse_ids = pattern.peek_pulse_ids(labelled=False) + npulses = len(pulse_ids) + period = 1 + if npulses > 1: + periods = np.diff(pulse_ids) + if len(np.unique(periods)) > 1: + regular = False + period = min(periods) + npulses = int((max(pulse_ids) - min(pulse_ids)) / period) + 1 + period *= PULSE_PERIOD + kd = run[mnemo['source'], mnemo['key']] + nsamples = kd.shape[1] + npulses_trace = int(np.floor((nsamples - start) / period)) + if npulses_trace < npulses: + log.warning(f'The digitizer only recorded {npulses_trace} pulses ' + f'out of the {npulses} pulses in the train.') + else: + npulses_trace = npulses + indices = np.array([start + i*period for i in range(npulses_trace + 1)]) + + outp = kd.ndarray() + outp = outp[:, indices[0]:indices[-1]].reshape([outp.shape[0], + npulses_trace, period]) + spectra = xr.DataArray(outp, dims=['trainId', 'sa3_pId', 'sampleId'], + coords={'trainId': kd.train_id_coordinates(), + 'sa3_pId': pulse_ids[:npulses_trace], + 'sampleId': np.arange(period)}) + if width is None: + width = PULSE_PERIOD + spectra = spectra.isel(sampleId=slice(0, width), drop=True) + + spectra.attrs['start'] = start + if origin is not None: + try: + adq = AdqRawChannel(run, mnemonic.split('_')[1].split('raw')[0]) + sample_rate = adq.sampling_rate + except Exception as e: + log.warning(e) + sample_rate = 1986111111.1111112 + time_ns = (np.arange(spectra.sizes['sampleId']) + - origin)/sample_rate * 1e9 + spectra = spectra.assign_coords(time_ns=('sampleId', time_ns)) + spectra.attrs['origin'] = origin + return spectra.rename(mnemonic.replace('raw', 'spectrum')) + + def get_pes_tof(run, mnemonics=None, merge_with=None, start=31390, width=300, origin=None, width_ns=None, subtract_baseline=True, @@ -153,7 +252,7 @@ def get_pes_tof(run, mnemonics=None, merge_with=None, return ds -def get_pes_params(run): +def get_pes_params(run, channel=None): """ Extract PES parameters for a given extra_data DataCollection. Parameters are gas, binding energy, voltages of the MPOD. @@ -162,7 +261,10 @@ def get_pes_params(run): ---------- run: extra_data.DataCollection DataCollection containing the digitizer data - + channel: str + Channel name, e.g. '2A'. If None, or if the channel is not + found in the data, the retardation voltage for all channels is + retrieved. Returns ------- params: dict @@ -181,11 +283,39 @@ def get_pes_params(run): if 'gas' not in params: params['gas'] = 'unknown' log.warning('Could not find which PES gas was used.') - voltages = get_pes_voltages(run) - params.update(voltages) + mpod_mapper = 'SA3_XTD10_PES/MDL/MPOD_MAPPER' + if mpod_mapper in run.all_sources: + if channel is None: + channel = [f'{a//4 + 1}{b}' for a, b in enumerate( + ['A', 'B', 'C', 'D']*4)] + else: + channel = [channel] + for c in channel: + rv = get_pes_rv(run, c, mpod_mapper) + if rv is not None: + params[f'{c}_RV'] = rv + elif 'SA3_XTD10_PES/MDL/DAQ_MPOD' in run.all_sources: + voltages = get_pes_voltages(run) + if voltages is not None: + params.update(voltages) + return params +def get_pes_rv(run, channel, mpod_mapper='SA3_XTD10_PES/MDL/MPOD_MAPPER'): + channel_to_group = {'4D': 1, '4B': 1, '4C': 1, '4A': 1, + '2A': 2, '1C': 2, '2C': 2, '2D': 2, + '3D': 3, '3B': 3, '2B': 3, '3A': 3, + '1A': 4, '3C': 4, '1B': 4, '1D': 4} + group = channel_to_group.get(channel) + if group is None: + log.warning(f'Could not find group for channel {channel}.') + return None + key = f'Group{group}D.channelMeasurementSenseVoltage.value' + voltage = run[mpod_mapper, key].ndarray().mean() + return voltage + + def get_pes_voltages(run, device='SA3_XTD10_PES/MDL/DAQ_MPOD'): """ Extract PES voltages read by the MDL watchdog of the MPOD device. @@ -200,9 +330,12 @@ def get_pes_voltages(run, device='SA3_XTD10_PES/MDL/DAQ_MPOD'): Returns ------- voltages: dict - dictionnary of voltages + dictionnary of voltages, empty if device not found. """ - a = re.compile('[u]\d{3}.value') + if device not in run.all_sources: + log.warning(f'Could not find {device} in run. Skipping.') + return None + a = re.compile("[u]\d{3}.value") tid, da = run.train_from_index(0, devices=device) voltages = {} for k in da[device]: -- GitLab