Skip to content
Snippets Groups Projects

[Generic] Injection code for DynamicFF corrections

Merged Philipp Schmidt requested to merge feat/shimadzu-injection into feat/shimadzu-correction
1 file
+ 327
0
Compare changes
  • Side-by-side
  • Inline
+ 199
0
from datetime import datetime, timezone
from struct import pack, unpack
from pathlib import Path
from shutil import copyfile
from hashlib import md5
import binascii
import time
import numpy as np
import h5py
from calibration_client import CalibrationClient
from cal_tools.calcat_interface2 import _get_default_caldb_root, get_client
from cal_tools.tools import run_prop_seq_from_path
from cal_tools.restful_config import calibration_client
def write_ccv(
const_path,
pdu_name, pdu_uuid, detector_type,
calibration, conditions, created_at, proposal, runs,
data, dims, key='0'
):
"""Write CCV data file.
Args:
const_path (os.PathLike): Path to CCV file to write
pdu_name (str): Physical detector unit name
pdu_uuid (int): Physical detector unit UUID
detector_type (str): Detector type name
calibration (str): Calibration name
conditions (ConditionsBase): Detector operating conditions
created_at (datetime): Validity start for calibration
proposal (int): Raw data proposal the calibration data is
generated from
runs (Iterable of int): Raw data runs the calibration data is
generated from
data (ndarray): Calibration constant data
dims (Iterable of str):
key (str, optional):
Returns:
(str) CCV HDF group name.
"""
if data.ndim != len(dims):
raise ValueError('data.ndims != len(dims)')
with h5py.File(const_path, 'a') as const_file:
const_file.attrs['version'] = 0
pdu_group = const_file.require_group(pdu_name)
pdu_group.attrs['uuid'] = pdu_uuid
pdu_group.attrs['detector_type'] = detector_type
calibration_group = pdu_group.require_group(calibration)
if key is None:
key = str(len(calibration_group))
ccv_group = calibration_group.create_group(key)
ccv_group.attrs['begin_at'] = created_at.isoformat()
ccv_group.attrs['proposal'] = proposal
ccv_group.attrs['runs'] = np.array(runs, dtype=np.int32)
ccv_group_name = ccv_group.name
opcond_group = ccv_group.create_group('operating_condition')
opcond_dict = conditions.make_dict(
conditions.calibration_types[calibration])
for db_name, value in opcond_dict.items():
key = db_name.lower().replace(' ', '_')
dset = opcond_group.create_dataset(key, data=value,
dtype=np.float64)
dset.attrs['lower_deviation'] = 0.0
dset.attrs['upper_deviation'] = 0.0
dset.attrs['database_name'] = db_name
dset = ccv_group.create_dataset('data', data=data)
dset.attrs['dims'] = dims
return ccv_group_name
def inject_ccv(const_src, ccv_root, report_to=None):
"""Inject new CCV into CalCat.
Args:
const_path (str or Path): Path to CCV data file.
ccv_root (str): CCV HDF group name.
report_to (str): Metadata location.
Returns:
None
Raises:
RuntimeError: If CalCat POST request fails.
"""
pdu_name, calibration, key = ccv_root.lstrip('/').split('/')
with h5py.File(const_src, 'r') as const_file:
pdu_group = const_file[pdu_name]
pdu_uuid = pdu_group.attrs['uuid']
detector_type = pdu_group.attrs['detector_type']
ccv_group = const_file[ccv_root]
proposal, runs = ccv_group.attrs['proposal'], ccv_group.attrs['runs']
begin_at_str = ccv_group.attrs['begin_at']
condition_group = ccv_group['operating_condition']
cond_params = []
# It's really not ideal we're mixing conditionS and condition now.
for parameter in condition_group:
param_dset = condition_group[parameter]
cond_params.append({
'parameter_name': param_dset.attrs['database_name'],
'value': float(param_dset[()]),
'lower_deviation_value': param_dset.attrs['lower_deviation'],
'upper_deviation_value': param_dset.attrs['upper_deviation'],
'flg_available': True
})
const_rel_path = f'xfel/cal/{detector_type.lower()}/{pdu_name.lower()}'
const_filename = f'cal.{time.time()}.h5'
if proposal and len(runs) > 0:
raw_data_location = 'proposal:{} runs: {}'.format(
proposal, ' '.join([str(x) for x in runs]))
else:
pass # Fallback for non-run based constants
# Generate condition name.
unique_name = detector_type[:detector_type.index('-Type')] + ' Def'
cond_hash = md5(pdu_name.encode())
cond_hash.update(int(pdu_uuid).to_bytes(
length=8, byteorder='little', signed=False))
for param_dict in cond_params:
cond_hash.update(str(param_dict['parameter_name']).encode())
cond_hash.update(str(param_dict['value']).encode())
unique_name += binascii.b2a_base64(cond_hash.digest()).decode()
unique_name = unique_name[:60]
# Add PDU "UUID" to parameters.
cond_params.append({
'parameter_name': 'Detector UUID',
'value': unpack('d', pack('q', pdu_uuid))[0],
'lower_deviation_value': 0.0,
'upper_deviation_value': 0.0,
'flg_available': True
})
inject_h = {
'detector_condition': {
'name': unique_name,
'parameters': cond_params
},
'calibration_constant': {
'calibration_name': calibration,
'detector_type_name': detector_type,
'flg_auto_approve': True
},
'calibration_constant_version': {
'raw_data_location': raw_data_location,
'file_name': const_filename,
'path_to_file': const_rel_path,
'data_set_name': f'{pdu_name}/{calibration}/0',
'start_idx': '0',
'end_idx': '0',
'begin_validity_at': begin_at_str,
'end_validity_at': '',
'begin_at': begin_at_str,
'pdu_physical_name': pdu_name,
'flg_good_quality': True
}
}
if report_to:
report_path = Path(report_to).absolute().with_suffix('.pdf')
inject_h['report'] = {
'name': report_path.stem,
'file_path': str(report_path)
}
const_dest = _get_default_caldb_root() / const_rel_path / const_filename
const_dest.parent.mkdir(parents=True, exist_ok=True)
copyfile(const_src, const_dest)
resp = CalibrationClient.inject_new_calibration_constant_version(
calibration_client(), inject_h)
if not resp['success']:
const_dest.unlink() # Delete already copied CCV file.
raise RuntimeError(resp)
Loading