Something went wrong on our end
-
Karim Ahmed authoredKarim Ahmed authored
dssclib.py 5.29 KiB
import binascii
import os
import re
import struct
from hashlib import blake2b
from typing import Dict, Optional, Tuple

import h5py
import numpy as np
def get_num_cells(fname, h5path):
    """Return the number of memory cells derived from the cell IDs in a file.

    Reads the ``{h5path}/cellId`` dataset of the HDF5 file ``fname`` and
    returns its largest value plus one.

    :param fname: path of the HDF5 file to read.
    :param h5path: HDF5 path of the group that holds the ``cellId`` dataset.
    :return: ``max(cellId) + 1``, or ``None`` if the dataset holds no values.
    """
    with h5py.File(fname, "r") as f:
        # `[()]` reads the whole dataset into memory, so the file can be
        # closed before the values are evaluated.
        cells = f[f"{h5path}/cellId"][()]

    # Comparing a NumPy array with `[]` is an element-wise comparison and not
    # a reliable emptiness test (it can even raise for mismatching shapes),
    # so check the number of elements instead.
    if cells.size == 0:
        return None

    return np.max(cells) + 1
def get_pulseid_checksum(fname, h5path, h5path_idx):
    """Generate a hash value from the pulse pattern (veto defined).

    The pulse IDs belonging to the first train that has data are hashed with
    BLAKE2b (8 byte digest) and the digest bytes are reinterpreted as a
    double.

    :param fname: path of the HDF5 file to read.
    :param h5path: HDF5 path of the group that holds the ``pulseId`` dataset.
    :param h5path_idx: HDF5 path of the group that holds the ``count`` and
        ``first`` index datasets.
    :return: the checksum as a float.
    :raises IndexError: if every entry of ``count`` is zero.
    """
    with h5py.File(fname, "r") as infile:
        count = np.squeeze(infile[f"{h5path_idx}/count"])
        first = np.squeeze(infile[f"{h5path_idx}/first"])

        # Only trains with a non-zero count contribute pulse IDs.
        has_data = count != 0
        filled_first = first[has_data]

        first_index = int(filled_first[0])
        if filled_first.size > 1:
            # The first train ends where the next non-empty train starts.
            stop_index = int(filled_first[1])
        else:
            # Only one train has data, so there is no "next" start index.
            # NOTE(review): assumes `first`/`count` describe contiguous
            # blocks, as the slice end above already implies - confirm.
            stop_index = first_index + int(count[has_data][0])

        pulseids = infile[f"{h5path}/pulseId"][first_index:stop_index]

    bveto = blake2b(pulseids.data, digest_size=8)
    # Reinterpret the 8 digest bytes as one native double.
    pulseid_checksum = struct.unpack(
        'd', binascii.unhexlify(bveto.hexdigest()))[0]
    return pulseid_checksum
def _extr_gainparam_conffilename(fileName: str) -> Tuple[int]:
"""extracts target gain from config filename, if provided."""
vals = re.search(".*_TG(?P<TG>\\d+.?\\d+)", fileName)
if vals:
return vals.group('TG')
def _get_gain_encoded_val(gainSettingsMap: Dict[str, int]) -> int:
"""The description of the paramters one can find in:
https://docs.xfel.eu/share/page/site/dssc/documentlibrary
Documents> DSSC - Documentation> DSSC - ASIC Documentation.
"""
result = np.uint64(0)
result += (np.int64(gainSettingsMap['csaFbCap']) +
np.int64(gainSettingsMap['csaResistor'] << 8) +
np.int64(gainSettingsMap['fcfEnCap'] << 16) +
np.int64(gainSettingsMap['trimmed'] << 32))
return result
def get_dssc_ctrl_data(in_folder, slow_data_pattern,
                       slow_data_aggregators, run_number, slow_data_path):
    """Obtain DSSC specific slow data.

    Collects the operating frequency, the target gain and the encoded gain
    code of every quadrant from its slow control data file and assigns the
    values to the four modules of that quadrant.

    :param in_folder: folder that contains the slow data files.
    :param slow_data_pattern: format string of the slow data file name. It
        is formatted with the zero-padded run number (4 digits) and the
        zero-padded aggregator number (2 digits), in that order.
    :param slow_data_aggregators: aggregator numbers; the n-th entry
        (counting from 0) is used for quadrant ``n + 1``.
    :param run_number: run number used to build the file names.
    :param slow_data_path: prefix of the HDF5 path below ``/RUN/``; the
        quadrant number is appended to it.
    :return: tuple ``(target gain, encoded gain, operating frequency)`` of
        dictionaries keyed by module name (``Q1M1`` ... ``Q4M4``). Modules
        of quadrants without slow data keep the value ``None``.
    """
    # Returned dictionaries, pre-filled with None for all 16 modules.
    module_names = [f'Q{q}M{m}' for q in range(1, 5) for m in range(1, 5)]
    targetGainAll = {name: None for name in module_names}
    encodedGainAll = {name: None for name in module_names}
    operatingFreqAll = {name: None for name in module_names}

    # Map quadrant number -> existing slow data file.
    ctrlDataFiles = {}
    for index, aggregator in enumerate(slow_data_aggregators):
        file_name = slow_data_pattern.format(f"{run_number:04d}",
                                             f"{aggregator:02d}")
        file_path = os.path.join(in_folder, file_name)
        if os.path.exists(file_path):
            ctrlDataFiles[index + 1] = file_path

    if not ctrlDataFiles:
        print("ERROR: no Slow Control Data found!")
        return targetGainAll, encodedGainAll, operatingFreqAll

    # NOTE: this flag is never set to a truthy value in this function, so the
    # branch that reads the EPC configuration below is currently not taken.
    daq_format = None

    tGain = {}
    encodedGain = {}
    operatingFreqs = {}
    for quadrant in range(1, 5):
        if quadrant not in ctrlDataFiles:
            print(f"ERROR: no slow data for quadrant {quadrant} is found")
            continue

        quadrant_path = f"/RUN/{slow_data_path}{quadrant}"
        # Open read-only explicitly, like the other readers in this module;
        # the data is only read here.
        with h5py.File(ctrlDataFiles[quadrant], "r") as h5file:
            iramp_path = f"{quadrant_path}/gain/irampFineTrm/value"
            if not daq_format:
                tGain[quadrant] = 0.0  # 0.0 is default value for TG
                irampSettings = (h5file[iramp_path][0]
                                 if iramp_path in h5file else "Various")
            else:
                epcConfig = h5file[
                    f'{quadrant_path}/epcRegisterFilePath/value'
                ][0].decode("utf-8")
                # Keep only the file name of the configuration path.
                epcConfig = epcConfig[epcConfig.rfind('/') + 1:]
                print(f"EPC configuration: {epcConfig}")
                targGain = _extr_gainparam_conffilename(epcConfig)
                tGain[quadrant] = (float(targGain)
                                   if targGain is not None else 0.0)
                irampSettings = h5file[iramp_path][0].decode("utf-8")

            gainSettingsMap = {
                coarseParam: int(
                    h5file[f'{quadrant_path}/gain/{coarseParam}/value'][0]
                )
                for coarseParam in ['fcfEnCap', 'csaFbCap', 'csaResistor']
            }
            gainSettingsMap['trimmed'] = np.int64(
                1) if irampSettings == "Various" else np.int64(0)

            encodedGain[quadrant] = _get_gain_encoded_val(gainSettingsMap)

            opFreq = h5file[
                f'{quadrant_path}/sequencer/cycleLength/value'][0]
            # The Operating Frequency of the detector should be in MHz.
            # Here the karabo operation mode is converted to acquisition
            # rate: 22 corresponds to 4.5 MHz, 44 to 2.25 MHz, etc.
            operatingFreqs[quadrant] = 4.5 * (22.0 / opFreq)

    # Every module of a quadrant gets the value found for that quadrant.
    for per_module, per_quadrant in (
            (targetGainAll, tGain),
            (encodedGainAll, encodedGain),
            (operatingFreqAll, operatingFreqs)):
        for quadrant, value in per_quadrant.items():
            for module in range(1, 5):
                per_module[f'Q{quadrant}M{module}'] = value

    return targetGainAll, encodedGainAll, operatingFreqAll