Skip to content
Snippets Groups Projects
lib2828.py 5.96 KiB
Newer Older
Lars Funke's avatar
Lars Funke committed
from euxfel_bunch_pattern import indices_at_sase  # Installable from PyPI
import numpy as np
import string
from scipy.interpolate import interp1d
from pathlib import Path

Lars Funke's avatar
Lars Funke committed

# List of detector infos from external txt file:
# name, digitizer, channel, angle, sample_rate
detectors = np.genfromtxt(Path(__file__).parent / 'detectors.txt',
Lars Funke's avatar
Lars Funke committed
                          names=True, dtype=('|U5', '|U4', '|U3', '<f8', '<i4'))
Lars Funke's avatar
Lars Funke committed

Lars Funke's avatar
Lars Funke committed
def correct_adq_common_mode(trace, region=np.s_[1000:], sym=8):
Lars Funke's avatar
Lars Funke committed
    """Baseline subtraction based on common mode.

Lars Funke's avatar
Lars Funke committed
    Since ADQ digitizers always interleave multiple ADCs per channel to sample
Lars Funke's avatar
Lars Funke committed
    a trace, regular baseline subtraction will cause an ADC-dependent common
Lars Funke's avatar
Lars Funke committed
    mode to appear. This correction directly uses a per-ADC baseline instead
Lars Funke's avatar
Lars Funke committed
    to perform include this in the substraction.

Lars Funke's avatar
Lars Funke committed
    Empirical testing has shown that a symmetry of 8 (should be 4 for
    non-interleaved) regularly shows better results in suppressing high
    frequency signals from the common mode. Going 16 or higher is not recommend
Lars Funke's avatar
Lars Funke committed
    in most cases, as additional artifacts appear.

Lars Funke's avatar
Lars Funke committed
    Parameters
    ----------
Lars Funke's avatar
Lars Funke committed
    trace : array_like
        Vector to correct, will be modified in-place.
Lars Funke's avatar
Lars Funke committed
    region : slice, optional
Lars Funke's avatar
Lars Funke committed
        Region to use for computing baseline.
Lars Funke's avatar
Lars Funke committed
    sym : int, optional
Lars Funke's avatar
Lars Funke committed
        Periodic symmetry of ADC common mode.

Lars Funke's avatar
Lars Funke committed
    Returns
    -------
Lars Funke's avatar
Lars Funke committed
    trace : array_like
        Corrected vector, same shape as trace.
Lars Funke's avatar
Lars Funke committed
    """
Lars Funke's avatar
Lars Funke committed
    trace = trace.astype(np.float32)
Lars Funke's avatar
Lars Funke committed

Lars Funke's avatar
Lars Funke committed
    for x in range(sym):
Lars Funke's avatar
Lars Funke committed
        trace[..., x::sym] -= trace[..., region][..., x::sym].mean()
Lars Funke's avatar
Lars Funke committed

Lars Funke's avatar
Lars Funke committed
    return trace

Lars Funke's avatar
Lars Funke committed

Lars Funke's avatar
Lars Funke committed
def separate_pulses(traces, ppt, adq_train_region=np.s_[2200:], adq_pulse_region=np.s_[:2000], adq_sample_rate=4, sase=3):
Lars Funke's avatar
Lars Funke committed
    """Separate train into seperate pulses using the pulse pattern table.

Lars Funke's avatar
Lars Funke committed
    Parameters
    ----------
Lars Funke's avatar
Lars Funke committed
    traces : array_like
        Array of traces to be split up
    ppt : array_like
        Pulse pattern table from time server device.
    adq_train_region : slice, optional
        Region containing actual signal for the entire train.
    adq_pulse_region : slice, optional
        Region after pulse separation containing actual signal for the pulse.
    adq_pulse_region : slice, optional
        Region after pulse separation containing actual signal for the pulse.
    adq_sample_rate : array_like or int, optional
        Sample rate for all digitizer channels in GS/s.
    sase : int, optional
        Which SASE the experiments run at, 3 by default.

Lars Funke's avatar
Lars Funke committed
    Returns
    -------
Lars Funke's avatar
Lars Funke committed
    traces : array_like
        Separated vector of trains.
Lars Funke's avatar
Lars Funke committed
    """
    traces = traces[..., adq_train_region]
Lars Funke's avatar
Lars Funke committed
    pulse_ids = indices_at_sase(ppt, sase=sase)
    num_pulses = len(pulse_ids)
Lars Funke's avatar
Lars Funke committed
    if num_pulses < 2:
        return traces[..., adq_pulse_region].reshape(traces.shape[0], 1, -1)
Lars Funke's avatar
Lars Funke committed
    else:
Lars Funke's avatar
Lars Funke committed
        # ADQ412s run at a clock factor of 440 relative to the PPT un-interleaved.
Lars Funke's avatar
Lars Funke committed
        pulse_spacing = (pulse_ids[1] - pulse_ids[0]) * 220 * adq_sample_rate
Lars Funke's avatar
Lars Funke committed
        traces_aligned = traces[..., :(num_pulses * pulse_spacing)]
        assert num_pulses * pulse_spacing == traces_aligned.shape[-1] # digitizer traces are too short for pulse pattern?
Lars Funke's avatar
Lars Funke committed
        return traces_aligned.reshape(*(traces_aligned.shape[:-1]), num_pulses, pulse_spacing)[..., adq_pulse_region]
Lars Funke's avatar
Lars Funke committed
# Polynomial computation using Horner's scheme
'''def f(t, *coeffs):
    x = 1/t
    result = 0
    for coeff in coeffs:
        result = x * result + coeff
    return result

def df(t, *coeffs):
    x = 1/t
    c = 1 - len(coeffs)
    result = 0
    for idx, coeff in enumerate(coeffs):
        fac = c + idx
        result = x * result + coeff * fac
    return result * x'''

# Old version
def f(t, a, b):
    return a * t + b

def df(t, a, b):
    return 0 * t + a
Lars Funke's avatar
Lars Funke committed

Lars Funke's avatar
Lars Funke committed
def energy_calibration(traces, calib_params, sample_rate, valid_energies,
Lars Funke's avatar
Lars Funke committed
                       energy_nodes=None, model=f, model_derivative=df):
    """Calibrate time of flight traces to energy spectra using given coefficients.
Lars Funke's avatar
Lars Funke committed

Lars Funke's avatar
Lars Funke committed
    Parameters
    ----------
Lars Funke's avatar
Lars Funke committed
    traces : array_like
        Traces to be calibrated, shape (detectors, ..., samples).
    calib_params : array_like
        Calibration parameters passed to the model, first dimension needs
        to be number of function arguments.
    sample_rate : array_like or int, optional
        Sample rate for all digitizer channels in GS/s.
    valid_energies : slice
        Slice applied to input allowing to discard divergent values.
    energy_nodes : array_like, optional
        Energy values to be evaluated in interpolation, if you want.
Lars Funke's avatar
Lars Funke committed
    model : function or int, optional
        Mapping from time to energy with arguments (t, *args),
        or an integer defining the polynomial order.
        Defaults to a polynomial with the parameter inversed.
Lars Funke's avatar
Lars Funke committed
    model_derivative : function, optional
Lars Funke's avatar
Lars Funke committed
        Derivative of `model' function.
Lars Funke's avatar
Lars Funke committed

Lars Funke's avatar
Lars Funke committed
    Returns
    -------
Lars Funke's avatar
Lars Funke committed
    energy : ndarray
        Energy values corresponding to time of flight.
    spectra : ndarray
        Reweighted spectra.
    resampled_spectra : ndarray
        Spectra resampled according to `energy_nodes`.
    time : ndarray
        Times of flight corresponding to samples.
Lars Funke's avatar
Lars Funke committed
    """
Lars Funke's avatar
Lars Funke committed
    # Convert digitizer channels into ToF [ns]
Lars Funke's avatar
Lars Funke committed
    time = np.arange(traces.shape[-1])[:, np.newaxis] / sample_rate
    
Lars Funke's avatar
Lars Funke committed
    # Calibrate ToF to energy
Lars Funke's avatar
Lars Funke committed
    energy = f(time, *calib_params['energy'].T)[valid_energies].T
    dE = df(time, *calib_params['energy'].T)[valid_energies].T
    spectra = traces[..., valid_energies] / dE[:, None]
    spectra = np.clip(spectra, 0, None)
                    
    if energy_nodes is not None:
Lars Funke's avatar
Lars Funke committed
        resampled_spectra = []
        for e, t, en in zip(energy, spectra, calib_params['enabled']):
            if en:
                interp = interp1d(e, t)(energy_nodes)
            else:
                newsh = list(spectra.shape)[1:]
                newsh[-1] = energy_nodes.shape[0]
                interp = np.full(newsh, np.nan)
            resampled_spectra.append(interp)
    else:
        resampled_spectra = None
Lars Funke's avatar
Lars Funke committed
    return energy, spectra, np.array(resampled_spectra), time[valid_energies]