Skip to content
Snippets Groups Projects
dssclib.py 5.29 KiB
import binascii
import os
import re
import struct
from hashlib import blake2b
from typing import Dict, Optional, Tuple

import h5py
import numpy as np


def get_num_cells(fname, h5path):
    """Return the largest ``cellId`` stored in a file, plus one.

    :param fname: path of the HDF5 file to read.
    :param h5path: HDF5 group that holds the ``cellId`` dataset.
    :return: ``max(cellId) + 1``, or None if the dataset holds no entries.
    """
    with h5py.File(fname, "r") as f:
        # `[()]` reads the whole dataset into memory, so the file can be
        # closed before the values are evaluated.
        cells = f[f"{h5path}/cellId"][()]

    # Comparing an array with `[]` is not an emptiness test (it broadcasts
    # element-wise); check the number of elements instead.
    if np.size(cells) == 0:
        return None
    return np.max(cells) + 1


def get_pulseid_checksum(fname, h5path, h5path_idx):
    """Generate a hash value from the pulse pattern (veto defined).

    The ``pulseId`` values belonging to the first index entry with a
    non-zero ``count`` are hashed with BLAKE2b (8-byte digest) and the digest
    bytes are reinterpreted as a native double.

    :param fname: path of the HDF5 file to read.
    :param h5path: HDF5 group that holds the ``pulseId`` dataset.
    :param h5path_idx: HDF5 group that holds the ``first`` and ``count``
        index datasets.
    :return: the checksum as a float. It is the raw digest read as a double,
        so it carries no numerical meaning and may be any float value.
    :raises IndexError: if every entry of ``count`` is zero.
    """
    with h5py.File(fname, "r") as infile:
        count = np.squeeze(infile[f"{h5path_idx}/count"])
        first = np.squeeze(infile[f"{h5path_idx}/first"])

        filled = count != 0
        starts = first[filled]
        if starts.size == 0:
            raise IndexError(
                f"no entry with non-zero count in {h5path_idx}/count")

        first_index = int(starts[0])
        if starts.size > 1:
            # The selection ends where the second non-empty entry begins.
            stop_index = int(starts[1])
        else:
            # Only one non-empty entry: there is no following start to use,
            # so derive the end from the entry's own length.
            stop_index = first_index + int(count[filled][0])

        pulseids = infile[f"{h5path}/pulseId"][first_index:stop_index]

    bveto = blake2b(pulseids.data, digest_size=8)
    # The 8 digest bytes are exactly the size of one double.
    return struct.unpack('d', bveto.digest())[0]


def _extr_gainparam_conffilename(fileName: str) -> Optional[str]:
    """Extract the target gain from a config file name, if provided.

    The gain is the number that follows the last ``_TG`` marker in the name,
    written either as an integer (``_TG5``) or as a decimal (``_TG1.5``).

    :param fileName: configuration file name.
    :return: the target gain as it is spelled in the name (a string), or
        None if the name contains no ``_TG<number>`` marker.
    """
    # The greedy ".*" makes the search settle on the LAST "_TG" marker.
    # The decimal part is optional and its dot is matched literally, so
    # single-digit gains are found and arbitrary separators are not captured.
    match = re.search(r".*_TG(?P<TG>\d+(?:\.\d+)?)", fileName)
    if match is None:
        return None
    return match.group('TG')


def _get_gain_encoded_val(gainSettingsMap: Dict[str, int]) -> int:
    """Pack the gain settings into a single encoded number.

    The fields are placed at fixed bit offsets: ``csaFbCap`` at bit 0,
    ``csaResistor`` at bit 8, ``fcfEnCap`` at bit 16 and ``trimmed`` at
    bit 32.

    A description of the parameters can be found in:
    https://docs.xfel.eu/share/page/site/dssc/documentlibrary
    Documents> DSSC - Documentation> DSSC - ASIC Documentation.

    NOTE: the sum of the int64 fields is added to a uint64 zero; NumPy
    promotes that combination to float64, so the returned scalar is a
    floating-point value despite the ``int`` annotation.
    """
    fb_cap_field = np.int64(gainSettingsMap['csaFbCap'])
    resistor_field = np.int64(gainSettingsMap['csaResistor'] << 8)
    en_cap_field = np.int64(gainSettingsMap['fcfEnCap'] << 16)
    trimmed_field = np.int64(gainSettingsMap['trimmed'] << 32)

    packed = fb_cap_field + resistor_field + en_cap_field + trimmed_field
    return np.uint64(0) + packed


def get_dssc_ctrl_data(in_folder, slow_data_pattern,
                       slow_data_aggregators, run_number, slow_data_path):
    """Obtain DSSC-specific slow data for all 16 modules.

    For each quadrant (1-4) whose slow-data file exists, the target gain,
    the encoded gain code and the operating frequency are read from that
    file and assigned to the four modules of the quadrant.

    :param in_folder: folder that contains the slow-data files.
    :param slow_data_pattern: file-name pattern with two positional format
        fields; they receive the run number zero-padded to 4 digits and the
        aggregator number zero-padded to 2 digits, in that order.
    :param slow_data_aggregators: aggregator numbers; the n-th entry
        (counting from 1) is used for quadrant n.
    :param run_number: run number used to build the file names.
    :param slow_data_path: HDF5 path prefix below ``/RUN/``; the quadrant
        number is appended directly to it.
    :return: tuple ``(targetGainAll, encodedGainAll, operatingFreqAll)`` of
        dicts keyed by module name (``'Q1M1'`` ... ``'Q4M4'``). Modules of a
        quadrant without slow data keep the value None.
    """
    # Returned dictionaries, pre-filled with None for every module.
    module_names = [f'Q{q}M{m}' for q in range(1, 5) for m in range(1, 5)]
    targetGainAll = dict.fromkeys(module_names)
    encodedGainAll = dict.fromkeys(module_names)
    operatingFreqAll = dict.fromkeys(module_names)

    # Map quadrant number -> existing slow-data file.
    ctrlDataFiles = {}
    for quadrant, aggregator in enumerate(slow_data_aggregators, start=1):
        quad_sd_pattern = slow_data_pattern.format(
            "{:04d}".format(run_number), "{:02d}".format(aggregator))
        sd_file = os.path.join(in_folder, quad_sd_pattern)
        if os.path.exists(sd_file):
            ctrlDataFiles[quadrant] = sd_file

    if not ctrlDataFiles:
        print("ERROR: no Slow Control Data found!")
        return targetGainAll, encodedGainAll, operatingFreqAll

    # NOTE(review): this flag is never set to anything else in this function,
    # so the `else` branch below (gain taken from the EPC config file name)
    # is currently unreachable. Kept as is; confirm whether it is still needed.
    daq_format = None

    tGain = {}
    encodedGain = {}
    operatingFreqs = {}
    for quadrant in range(1, 5):
        if quadrant not in ctrlDataFiles:
            print(f"ERROR: no slow data for quadrant {quadrant} is found")
            continue

        quad_path = f"/RUN/{slow_data_path}{quadrant}"
        iramp_path = f"{quad_path}/gain/irampFineTrm/value"

        # Open read-only explicitly: the file is only inspected here, and
        # the default mode differs between h5py versions.
        with h5py.File(ctrlDataFiles[quadrant], "r") as h5file:
            if not daq_format:
                tGain[quadrant] = 0.0  # 0.0 is default value for TG

                # NOTE(review): a value read from the file is compared with
                # the str "Various" below; if the dataset stores bytes that
                # comparison is always False - confirm this is intended.
                irampSettings = (h5file[iramp_path][0]
                                 if iramp_path in h5file else "Various")
            else:
                epcConfig = h5file[
                    f'{quad_path}/epcRegisterFilePath/value'
                ][0].decode("utf-8")
                # Keep only the file name, dropping any leading directories.
                epcConfig = epcConfig[epcConfig.rfind('/') + 1:]

                print(f"EPC configuration: {epcConfig}")
                targGain = _extr_gainparam_conffilename(epcConfig)
                tGain[quadrant] = (
                    float(targGain) if targGain is not None else 0.0)
                irampSettings = h5file[iramp_path][0].decode("utf-8")

            gainSettingsMap = {
                coarseParam: int(
                    h5file[f'{quad_path}/gain/{coarseParam}/value'][0])
                for coarseParam in ['fcfEnCap', 'csaFbCap', 'csaResistor']
            }
            gainSettingsMap['trimmed'] = (
                np.int64(1) if irampSettings == "Various" else np.int64(0))

            encodedGain[quadrant] = _get_gain_encoded_val(gainSettingsMap)

            opFreq = h5file[f'{quad_path}/sequencer/cycleLength/value'][0]
            # The Operating Frequency of the detector should be in MHz.
            # Here the karabo operation mode is converted to acquisition
            # rate: 22 corresponds to 4.5 MHz, 44 to 2.25 MHz, etc.
            operatingFreqs[quadrant] = 4.5 * (22.0 / opFreq)

    # Every module of a quadrant receives that quadrant's values.
    for per_module, per_quadrant in (
            (targetGainAll, tGain),
            (encodedGainAll, encodedGain),
            (operatingFreqAll, operatingFreqs)):
        for quadrant, value in per_quadrant.items():
            for module in range(1, 5):
                per_module[f'Q{quadrant}M{module}'] = value

    return targetGainAll, encodedGainAll, operatingFreqAll