Skip to content
Snippets Groups Projects

[Generic] Injection code for DynamicFF corrections

Merged Philipp Schmidt requested to merge feat/shimadzu-injection into feat/shimadzu-correction
All threads resolved!
1 file
+ 327
0
Compare changes
  • Side-by-side
  • Inline
+ 327
0
from datetime import datetime, timezone
from struct import pack, unpack
from pathlib import Path
from shutil import copyfile
from hashlib import md5
import binascii
import time
import numpy as np
import h5py
from calibration_client import CalibrationClient
from cal_tools.calcat_interface2 import _get_default_caldb_root, get_client
from cal_tools.tools import run_prop_seq_from_path
from cal_tools.restful_config import calibration_client
def write_ccv(
const_path,
pdu_name, pdu_uuid, detector_type,
calibration, conditions, created_at, proposal, runs,
data, dims, key='0'
):
"""Write CCV data file.
Args:
const_path (os.PathLike): Path to CCV file to write
pdu_name (str): Physical detector unit name
pdu_uuid (int): Physical detector unit UUID
detector_type (str): Detector type name
calibration (str): Calibration name
conditions (ConditionsBase): Detector operating conditions
created_at (datetime): Validity start for calibration
proposal (int): Raw data proposal the calibration data is
generated from
runs (Iterable of int): Raw data runs the calibration data is
generated from
data (ndarray): Calibration constant data
dims (Iterable of str):
key (str, optional):
Returns:
(str) CCV HDF group name.
"""
if data.ndim != len(dims):
raise ValueError('data.ndims != len(dims)')
with h5py.File(const_path, 'a') as const_file:
const_file.attrs['version'] = 0
pdu_group = const_file.require_group(pdu_name)
pdu_group.attrs['uuid'] = pdu_uuid
pdu_group.attrs['detector_type'] = detector_type
calibration_group = pdu_group.require_group(calibration)
if key is None:
key = str(len(calibration_group))
ccv_group = calibration_group.create_group(key)
ccv_group.attrs['begin_at'] = created_at.isoformat()
ccv_group.attrs['proposal'] = proposal
ccv_group.attrs['runs'] = np.array(runs, dtype=np.int32)
ccv_group_name = ccv_group.name
opcond_group = ccv_group.create_group('operating_condition')
opcond_dict = conditions.make_dict(
conditions.calibration_types[calibration])
for db_name, value in opcond_dict.items():
key = db_name.lower().replace(' ', '_')
dset = opcond_group.create_dataset(key, data=value,
dtype=np.float64)
dset.attrs['lower_deviation'] = 0.0
dset.attrs['upper_deviation'] = 0.0
dset.attrs['database_name'] = db_name
dset = ccv_group.create_dataset('data', data=data)
dset.attrs['dims'] = dims
return ccv_group_name
def inject_ccv(const_src, ccv_root, report_to):
"""Inject new CCV into CalCat.
Args:
const_path (str or Path): Path to CCV data file.
ccv_root (str): CCV HDF group name.
report_to (str): Metadata location.
Returns:
None
Raises:
RuntimeError: If CalCat POST request fails.
"""
pdu_name, calibration, key = ccv_root.lstrip('/').split('/')
with h5py.File(const_src, 'r') as const_file:
pdu_group = const_file[pdu_name]
pdu_uuid = pdu_group.attrs['uuid']
detector_type = pdu_group.attrs['detector_type']
ccv_group = const_file[ccv_root]
proposal, runs = ccv_group.attrs['proposal'], ccv_group.attrs['runs']
begin_at_str = ccv_group.attrs['begin_at']
condition_group = ccv_group['operating_condition']
cond_params = []
# It's really not ideal we're mixing conditionS and condition now.
for parameter in condition_group:
param_dset = condition_group[parameter]
cond_params.append({
'parameter_name': param_dset.attrs['database_name'],
'value': float(param_dset[()]),
'lower_deviation_value': param_dset.attrs['lower_deviation'],
'upper_deviation_value': param_dset.attrs['upper_deviation'],
'flg_available': True
})
const_rel_path = f'xfel/cal/{detector_type.lower()}/{pdu_name.lower()}'
const_filename = f'cal.{time.time()}.h5'
if proposal and len(runs) > 0:
raw_data_location = 'proposal:{} runs: {}'.format(
proposal, ' '.join([str(x) for x in runs]))
else:
pass # Fallback for non-run based constants
# Generate condition name.
unique_name = detector_type[:detector_type.index('-Type')] + ' Def'
cond_hash = md5(pdu_name.encode())
cond_hash.update(int(pdu_uuid).to_bytes(
length=8, byteorder='little', signed=False))
for param_dict in cond_params:
cond_hash.update(str(param_dict['parameter_name']).encode())
cond_hash.update(str(param_dict['value']).encode())
unique_name += binascii.b2a_base64(cond_hash.digest()).decode()
unique_name = unique_name[:60]
# Add PDU "UUID" to parameters.
cond_params.append({
'parameter_name': 'Detector UUID',
'value': unpack('d', pack('q', pdu_uuid))[0],
'lower_deviation_value': 0.0,
'upper_deviation_value': 0.0,
'flg_available': True
})
inject_h = {
'detector_condition': {
'name': unique_name,
'parameters': cond_params
},
'calibration_constant': {
'calibration_name': calibration,
'detector_type_name': detector_type,
'flg_auto_approve': True
},
'calibration_constant_version': {
'raw_data_location': raw_data_location,
'file_name': const_filename,
'path_to_file': const_rel_path,
'data_set_name': f'{pdu_name}/{calibration}/0',
'start_idx': '0',
'end_idx': '0',
'begin_validity_at': begin_at_str,
'end_validity_at': '',
'begin_at': begin_at_str,
'pdu_physical_name': pdu_name,
'flg_good_quality': True
}
}
if report_to:
report_path = Path(report_to).with_suffix('.pdf')
inject_h['report'] = {
'name': report_path.stem,
'file_path': str(report_path)
}
const_dest = _get_default_caldb_root() / const_rel_path / const_filename
const_dest.parent.mkdir(parents=True, exist_ok=True)
copyfile(const_src, const_dest)
resp = CalibrationClient.inject_new_calibration_constant_version(
calibration_client(), inject_h)
if not resp['success']:
const_dest.unlink() # Delete already copied CCV file.
raise RuntimeError(resp)
def inject_ccv_legacy(in_folder, metadata_folder, runs, calibration, cond,
pdu, const_input, begin_at):
"""Inject new CCV into CalCat.
LEGACY VERSION - DO NOT USE
(from before most details were saved in CCV file itself)
metadata_folder
calibration -> in file, but pass anyway as there could be multiple
const_input -> doh!
Recommended way of running this from pycalibration notebooks:
inject_ccv(
in_folder, # Same as notebook parameter
metadata_folder, # Same as notebook parameter
[run_high, run_mid, run_low],
'Offset',
current_pdu, # Element of calibration_client.from
# get_all_phy_det_units_from_detector
out_folder / 'my-constant-file.h5',
get_dir_creation_date(in_folder, run_high)
)
Args:
in_folder (str or Path): Root for input data, i.e. *not*
containing r{run:04d} folder
metadata_folder (str or Path): Metadata location
runs (Iterable of str or int): Run number(s) used for characterization.
calibration (str): Calibration name, e.g. 'Offset', must exist in CalCat.
cond (ConditionsBase): Operating conditions for injected CCV.
pdu (dict): PDU information, should be passed as given by CalCat.
const_input (str or Path): Path to local calibration constant HDF file.
begin_at (datetime): Begin of calibration constant validity.
Returns:
None
Raises:
RuntimeError: If CalCat POST request fails.
"""
cond_dict = cond.make_dict(cond.calibration_types[calibration])
pdu_name = pdu['physical_name']
detector_type = pdu['detector_type']['name']
const_root = f'xfel/cal/{detector_type.lower()}/{pdu_name.lower()}'
const_filename = f'cal.{time.time()}.h5'
report_path = Path(metadata_folder).with_suffix('.pdf')
_, proposal, _ = run_prop_seq_from_path(in_folder)
raw_data_location = 'proposal:{} runs: {}'.format(
proposal, ' '.join([str(x) for x in runs]))
# Generate condition name.
unique_name = detector_type[:detector_type.index('-Type')] + ' Def'
cond_hash = hashlib.md5(pdu_name.encode())
cond_hash.update(pdu['uuid'].to_bytes(
length=8, byteorder='little', signed=False))
for key, value in cond_dict.items():
cond_hash.update(str(key).encode())
cond_hash.update(str(value).encode())
unique_name += binascii.b2a_base64(cond_hash.digest()).decode()
unique_name = unique_name[:60]
# Build condition dict for CalCat.
cond_params = [
{
'parameter_name': key,
'value': value,
'lower_deviation_value': 0.0,
'upper_deviation_value': 0.0,
'flg_available': True
}
for key, value in cond_dict.items()
]
# Add PDU "UUID" to parameters.
cond_params.append({
'parameter_name': 'Detector UUID',
'value': unpack('d', pack('q', pdu['uuid']))[0],
'lower_deviation_value': 0.0,
'upper_deviation_value': 0.0,
'flg_available': True
})
inject_h = {
'report': {
'name': report_path.stem,
'file_path': str(report_path)
},
'detector_condition': {
'name': unique_name,
'parameters': cond_params
},
'calibration_constant': {
'calibration_name': calibration,
'detector_type_name': detector_type,
'flg_auto_approve': True
},
'calibration_constant_version': {
'raw_data_location': raw_data_location,
'file_name': const_filename,
'path_to_file': const_root,
'data_set_name': f'{pdu_name}/{calibration}/0',
'start_idx': '0',
'end_idx': '0',
'begin_validity_at': begin_at.isoformat(),
'end_validity_at': '',
'begin_at': begin_at.isoformat(),
'pdu_physical_name': pdu_name,
'flg_good_quality': True
}
}
resp = CalibrationClient.inject_new_calibration_constant_version(
calibration_client(), inject_h)
if not resp['success']:
raise RuntimeError(resp)
const_path = _get_default_caldb_root() / const_root / const_filename
const_path.parent.mkdir(parents=True, exist_ok=True)
copyfile(const_input, const_path)
Loading