Skip to content
Snippets Groups Projects
lib2828.py 5.08 KiB
Newer Older
Lars Funke's avatar
Lars Funke committed
from euxfel_bunch_pattern import indices_at_sase  # Installable from PyPI
import numpy as np
import string
from scipy.interpolate import interp1d
from pathlib import Path

detectors = np.genfromtxt(Path(__file__).parent / 'detectors.txt', 
                          names=True, dtype=('|U5', '|U4', '|U3', '<f8', '<f8'))
Lars Funke's avatar
Lars Funke committed

def correct_adq_common_mode(trace, region, sym):
    """Baseline substraction based on common mode.
    
    Since ADQ digitizers always interleave multiple ADCs per channel to sample
    a trace, regular baseline substraction will cause an ADC-dependant common
    mode to appear. This correction directly uses a per-ADC baseline instead
    to perform include this in the substraction. 
    
    Empirical testing has shown that a symmetry of 8 (should be 4 for
    non-interleaved) regularly shows better results in suppressing high
    frequency signals from the common mode. Going 16 or higher is not recommend
    in most cases, as additional artifacts appear. 
    
Lars Funke's avatar
Lars Funke committed
    Parameters
    ----------
        trace : array_like
            Vector to correct, will be modified in-place.
        region : slice
            Region to use for computing baseline.
        sym : int
            Periodic symmetry of ADC common mode.
Lars Funke's avatar
Lars Funke committed
    Returns
    -------
        trace : array_like
            Corrected vector, same shape as trace.
Lars Funke's avatar
Lars Funke committed
    """
Lars Funke's avatar
Lars Funke committed
    trace = trace.astype(np.float32)
Lars Funke's avatar
Lars Funke committed
    
    for x in range(sym):
Lars Funke's avatar
Lars Funke committed
        trace[x::sym] -= trace[region][x::sym].mean()
Lars Funke's avatar
Lars Funke committed
        
    return trace

Lars Funke's avatar
Lars Funke committed
def separate_pulses(traces, ppt, adq_train_region, adq_pulse_region, adq_sample_rate=4, sase=3):
    """Separate train into seperate pulses using the pulse pattern table
Lars Funke's avatar
Lars Funke committed
    Parameters
    ----------
        traces : array_like
Lars Funke's avatar
Lars Funke committed
            Array of traces to be split up
        ppt : array_like
            Pulse pattern table from time server device.
        adq_train_region : slice, optional
            Region containing actual signal for the entire train.
        adq_pulse_region : slice, optional
            Region after pulse separation containing actual signal for the pulse.
        adq_pulse_region : slice, optional
            Region after pulse separation containing actual signal for the pulse.
        adq_sample_rate : array_like or int, optional
            Sample rate for all digitizer channels in GS/s.
        sase : int, optional
            Which SASE the experiments run at, 3 by default.
Lars Funke's avatar
Lars Funke committed
    Returns
    -------
        (array_like) Separated vector.
Lars Funke's avatar
Lars Funke committed
    """
Lars Funke's avatar
Lars Funke committed
    trace = traces[:, adq_train_region]
Lars Funke's avatar
Lars Funke committed
    pulse_ids = indices_at_sase(ppt, sase=sase)
    num_pulses = len(pulse_ids)
Lars Funke's avatar
Lars Funke committed
    adq_sample_rate = np.asarray(adq_sample_rate)
    if num_pulses < 2:
        return trace[:, adq_pulse_region].reshape(trace.shape[0], 1, -1)
    else:
Lars Funke's avatar
Lars Funke committed
        # ADQ412s run at a clock factor of 440 relative to the PPT un-interleaved.
Lars Funke's avatar
Lars Funke committed
        pulse_spacing = (pulse_ids[1] - pulse_ids[0]) * 220 * adq_sample_rate
        return traces[:, :(len(trace) // pulse_spacing) * pulse_spacing].reshape(
            -1, pulse_spacing)[..., adq_pulse_region]
Lars Funke's avatar
Lars Funke committed
def _f(t, a, b, c, d):
Lars Funke's avatar
Lars Funke committed
        return a / t**3 + b / t**2 + c / t + d
Lars Funke's avatar
Lars Funke committed

def _df(t, a, b, c, d):
Lars Funke's avatar
Lars Funke committed
        return -3 * a / t**4 - 2 * b / t**3 - c / t**2
Lars Funke's avatar
Lars Funke committed
def energy_calibration(traces, calib_params, sample_rate, valid_energies, 
                       energy_nodes=None, model=_f, model_derivative=_df):
    """Calibrate time of flight traces to energy spectra
    
    Parameters
    ----------
        traces : array_like
            Traces to be calibrated
        calib_params : array_like
            Calibration parameters passed to the model, first dimension needs 
            to be number of function arguments
        sample_rate : array_like or int, optional
            Sample rate for all digitizer channels in GS/s.
        energy_nodes : array_like
            Energy values to be evaluated in interpolation
        valid_energies :slice
            Slice applied to input allowing to discard divergent values
        model : function
            Mapping from time to energy. Arguments (t, *args)
        model_derivative : function
            Derivative of `model`
Lars Funke's avatar
Lars Funke committed
        
    Returns
    -------
        energy : ndarray
            Energy values corresponding to time of flight
        spectra : ndarray
            Reweighted spectra
        resampled_spectra : ndarray
            Spectra resampled according to `energy_nodes`
        time : ndarray
            Times of flight corresponding to samples
Lars Funke's avatar
Lars Funke committed
    """
    time = np.arange(traces.shape[2])[:, np.newaxis] / sample_rate
    energy = model(1e-9 * time, *(calib_params.T))[valid_energies].T
    dE = model_derivative(1e-9 * time[:, None], *(calib_params.T))[valid_energies].T
    spectra = traces[:,:,valid_energies] / dE
        
    try:
        resampled_spectra = (np.asarray([interp1d(e, t)(energy_nodes) 
                             for e, t in zip(energy, spectra)])) \
            if energy_nodes is not None else None
    except ValueError as e:
        print(e)
        resampled_spectra = np.full((*traces.shape[:2], len(energy_nodes)), np.nan)
    return energy, spectra, resampled_spectra, time[valid_energies]