from euxfel_bunch_pattern import indices_at_sase # Installable from PyPI import numpy as np import string from scipy.interpolate import interp1d from pathlib import Path # List of detector infos from external txt file: # name, digitizer, channel, angle, sample_rate detectors = np.genfromtxt(Path(__file__).parent / 'detectors.txt', names=True, dtype=('|U5', '|U4', '|U3', '<f8', '<i4')) def correct_adq_common_mode(trace, region=np.s_[1000:], sym=8): """Baseline subtraction based on common mode. Since ADQ digitizers always interleave multiple ADCs per channel to sample a trace, regular baseline subtraction will cause an ADC-dependent common mode to appear. This correction directly uses a per-ADC baseline instead to perform include this in the substraction. Empirical testing has shown that a symmetry of 8 (should be 4 for non-interleaved) regularly shows better results in suppressing high frequency signals from the common mode. Going 16 or higher is not recommend in most cases, as additional artifacts appear. Parameters ---------- trace : array_like Vector to correct, will be modified in-place. region : slice, optional Region to use for computing baseline. sym : int, optional Periodic symmetry of ADC common mode. Returns ------- trace : array_like Corrected vector, same shape as trace. """ trace = trace.astype(np.float32) for x in range(sym): trace[..., x::sym] -= trace[..., region][..., x::sym].mean() return trace def separate_pulses(traces, ppt, adq_train_region=np.s_[2200:], adq_pulse_region=np.s_[:2000], adq_sample_rate=4, sase=3): """Separate train into seperate pulses using the pulse pattern table. Parameters ---------- traces : array_like Array of traces to be split up ppt : array_like Pulse pattern table from time server device. adq_train_region : slice, optional Region containing actual signal for the entire train. adq_pulse_region : slice, optional Region after pulse separation containing actual signal for the pulse. adq_pulse_region : slice, optional Region after pulse separation containing actual signal for the pulse. adq_sample_rate : array_like or int, optional Sample rate for all digitizer channels in GS/s. sase : int, optional Which SASE the experiments run at, 3 by default. Returns ------- traces : array_like Separated vector of trains. """ traces = traces[..., adq_train_region] pulse_ids = indices_at_sase(ppt, sase=sase) num_pulses = len(pulse_ids) if num_pulses < 2: return traces[..., adq_pulse_region].reshape(traces.shape[0], 1, -1) else: # ADQ412s run at a clock factor of 440 relative to the PPT un-interleaved. pulse_spacing = (pulse_ids[1] - pulse_ids[0]) * 220 * adq_sample_rate traces_aligned = traces[..., :(num_pulses * pulse_spacing)] assert num_pulses * pulse_spacing == traces_aligned.shape[-1] # digitizer traces are too short for pulse pattern? return traces_aligned.reshape(*(traces_aligned.shape[:-1]), num_pulses, pulse_spacing)[..., adq_pulse_region] # Polynomial computation using Horner's scheme '''def f(t, *coeffs): x = 1/t result = 0 for coeff in coeffs: result = x * result + coeff return result def df(t, *coeffs): x = 1/t c = 1 - len(coeffs) result = 0 for idx, coeff in enumerate(coeffs): fac = c + idx result = x * result + coeff * fac return result * x''' # Old version def f(t, a, b): return a * t + b def df(t, a, b): return 0 * t + a def energy_calibration(traces, calib_params, sample_rate, valid_energies, energy_nodes=None, model=f, model_derivative=df): """Calibrate time of flight traces to energy spectra using given coefficients. Parameters ---------- traces : array_like Traces to be calibrated, shape (detectors, ..., samples). calib_params : array_like Calibration parameters passed to the model, first dimension needs to be number of function arguments. sample_rate : array_like or int, optional Sample rate for all digitizer channels in GS/s. valid_energies : slice Slice applied to input allowing to discard divergent values. energy_nodes : array_like, optional Energy values to be evaluated in interpolation, if you want. model : function or int, optional Mapping from time to energy with arguments (t, *args), or an integer defining the polynomial order. Defaults to a polynomial with the parameter inversed. model_derivative : function, optional Derivative of `model' function. Returns ------- energy : ndarray Energy values corresponding to time of flight. spectra : ndarray Reweighted spectra. resampled_spectra : ndarray Spectra resampled according to `energy_nodes`. time : ndarray Times of flight corresponding to samples. """ # Convert digitizer channels into ToF [ns] time = np.arange(traces.shape[-1])[:, np.newaxis] / sample_rate # Calibrate ToF to energy energy = f(time, *calib_params['energy'].T)[valid_energies].T dE = df(time, *calib_params['energy'].T)[valid_energies].T spectra = traces[..., valid_energies] / dE[:, None] spectra = np.clip(spectra, 0, None) if energy_nodes is not None: resampled_spectra = [] for e, t, en in zip(energy, spectra, calib_params['enabled']): if en: interp = interp1d(e, t)(energy_nodes) else: newsh = list(spectra.shape)[1:] newsh[-1] = energy_nodes.shape[0] interp = np.full(newsh, np.nan) resampled_spectra.append(interp) else: resampled_spectra = None return energy, spectra, np.array(resampled_spectra), time[valid_energies]