diff --git a/src/cal_tools/constants.py b/src/cal_tools/constants.py new file mode 100644 index 0000000000000000000000000000000000000000..bef79dc4e7e307f5f2e0fff66c51c00b3613c815 --- /dev/null +++ b/src/cal_tools/constants.py @@ -0,0 +1,315 @@ + +from datetime import datetime, timezone +from struct import pack, unpack +from pathlib import Path +from shutil import copyfile +from hashlib import md5 +import binascii +import time + +import numpy as np +import h5py + +from calibration_client import CalibrationClient +from cal_tools.calcat_interface2 import _get_default_caldb_root, get_client +from cal_tools.tools import run_prop_seq_from_path +from cal_tools.restful_config import calibration_client + + +def write_ccv( + const_path, + pdu_name, pdu_uuid, detector_type, + calibration, conditions, created_at, proposal, runs, + data, dims, key='0' +): + """Write CCV data file. + + Args: + const_path (os.PathLike): Path to CCV file to write + pdu_name (str): Physical detector unit name + pdu_uuid (int): Physical detector unit UUID + detector_type (str): Detector type name + calibration (str): Calibration name + conditions (ConditionsBase): Detector operating conditions + created_at (datetime): Validity start for calibration + proposal (int): Raw data proposal the calibration data is + generated from + runs (Iterable of int): Raw data runs the calibration data is + generated from + data (ndarray): Calibration constant data + dims (Iterable of str): + key (str, optional): + + Returns: + (str) CCV HDF group name. + + """ + + if data.ndims != len(dims): + raise ValueError('data.ndims != len(dims)') + + with h5py.File(const_path, 'a') as const_file: + const_file.attrs['version'] = 0 + + pdu_group = const_file.require_group(pdu_name) + pdu_group.attrs['uuid'] = pdu_uuid + pdu_group.attrs['detector_type'] = detector_type + + calibration_group = pdu_group.require_group(calibration) + + if key is None: + key = str(len(calibration_group)) + + ccv_group = CCVGroup(calibration_group.create_group(key).id) + ccv_group.attrs['created_at'] = created_at.isoformat() + ccv_group.attrs['proposal'] = proposal + ccv_group.attrs['runs'] = np.array(runs, dtype=np.int32) + + opcond_group = ccv_group.create_group('operating_condition') + opcond_dict = conditions.make_dict( + conditions.calibration_types[calibration]) + for db_name, value in opcond_dict.items(): + key = db_name.lower().replace(' ', '_') + dset = opcond_group.create_dataset(key, data=value, + dtype=np.float64) + dset.attrs['lower_deviation'] = 0.0 + dset.attrs['upper_deviation'] = 0.0 + dset.attrs['database_name'] = db_name + + dset = ccv_group.create_dataset('data', data='data') + dset.attrs['dims'] = dims + + return ccv_group.name + + +def inject_ccv(const_src, ccv_root, report_to): + """Inject new CCV into CalCat. + + Args: + const_path (str or Path): Path to CCV data file. + ccv_root (str): CCV HDF group name. + report_to (str): Metadata location. + + Returns: + None + + Raises: + RuntimeError: If CalCat POST request fails. + """ + + pdu_name, calibration, key = ccv_root.split('/') + + with h5py.File(const_src, 'r') as const_file: + pdu_group = const_file[pdu_name] + pdu_uuid = pdu_group.attrs['uuid'] + detector_type = pdu_group.attrs['detector_type'] + + ccv_group = const_file[ccv_root] + proposal, runs = ccv_group.attrs['proposal'], ccv_group.attrs['runs'] + begin_at_str = ccv_group.attrs['begin_at'] + + condition_group = ccv_group['operating_condition'] + + cond_params = [] + + # It's really not ideal we're mixing conditionS and condition now. + for parameter in condition_group: + param_dset = condition_group[parameter] + cond_params.append({ + 'parameter_name': param_dset.attrs['database_name'], + 'value': param_dset, + 'lower_deviation_value': param_dset.attrs['lower_deviation'], + 'upper_deviation_value': param_dset.attrs['upper_deviation'], + 'flg_available': True + }) + + const_rel_path = f'xfel/cal/{detector_type.lower()}/{pdu_name.lower()}' + const_filename = f'cal.{time.time()}.h5' + report_path = Path(report_to).with_suffix('.pdf') + + if proposal and runs: + raw_data_location = 'proposal:{} runs: {}'.format( + proposal, ' '.join([str(x) for x in runs])) + else: + pass # Fallback for non-run based constants + + # Generate condition name. + unique_name = detector_type[:detector_type.index('-Type')] + ' Def' + cond_hash = md5(pdu_name.encode()) + cond_hash.update(pdu_uuid.to_bytes( + length=8, byteorder='little', signed=False))3 + + for param_dict in cond_params: + cond_hash.update(str(param_dict['parameter_name']).encode()) + cond_hash.update(str(param_dict['value']).encode()) + + unique_name += binascii.b2a_base64(cond_hash.digest()).decode() + unique_name = unique_name[:60] + + # Add PDU "UUID" to parameters. + cond_params.append({ + 'parameter_name': 'Detector UUID', + 'value': unpack('d', pack('q', pdu['uuid']))[0], + 'lower_deviation_value': 0.0, + 'upper_deviation_value': 0.0, + 'flg_available': True + }) + + inject_h = { + 'report': { + 'name': report_path.stem, + 'file_path': str(report_path) + }, + 'detector_condition': { + 'name': unique_name, + 'parameters': cond_params + }, + 'calibration_constant': { + 'calibration_name': calibration, + 'detector_type_name': detector_type, + 'flg_auto_approve': True + }, + 'calibration_constant_version': { + 'raw_data_location': raw_data_location, + 'file_name': const_filename, + 'path_to_file': const_rel_path, + 'data_set_name': f'{pdu_name}/{calibration}/0', + 'start_idx': '0', + 'end_idx': '0', + 'begin_validity_at': begin_at_str, + 'end_validity_at': '', + 'begin_at': begin_at_str, + 'pdu_physical_name': pdu_name, + 'flg_good_quality': True + } + } + + resp = CalibrationClient.inject_new_calibration_constant_version(calibration_client(), inject_h) + + if not resp['success']: + raise RuntimeError(resp) + + const_dest = _get_default_caldb_root() / const_rel_path / const_filename + const_dest.parent.mkdir(parents=True, exist_ok=True) + copyfile(const_src, const_dest) + + +def inject_ccv_legacy(in_folder, metadata_folder, runs, calibration, cond, pdu, const_input, begin_at): + """Inject new CCV into CalCat. + + + metadata_folder + calibration -> in file, but pass anyway as there could be multiple + const_input -> doh! + + Recommended way of running this from pycalibration notebooks: + + inject_ccv( + in_folder, # Same as notebook parameter + metadata_folder, # Same as notebook parameter + [run_high, run_mid, run_low], + 'Offset', + current_pdu, # Element of calibration_client.from get_all_phy_det_units_from_detector + out_folder / 'my-constant-file.h5', + get_dir_creation_date(in_folder, run_high) + ) + + Args: + in_folder (str or Path): Root for input data, i.e. *not* containing r{run:04d} folder + metadata_folder (str or Path): Metadata location + runs (Iterable of str or int): Run number(s) used for characterization. + calibration (str): Calibration name, e.g. 'Offset', must exist in CalCat. + cond (ConditionsBase): Operating conditions for injected CCV. + pdu (dict): PDU information, should be passed as given by CalCat. + const_input (str or Path): Path to local calibration constant HDF file. + begin_at (datetime): Begin of calibration constant validity. + + Returns: + None + + Raises: + RuntimeError: If CalCat POST request fails. + + """ + + cond_dict = cond.make_dict(cond.calibration_types[calibration]) + pdu_name = pdu['physical_name'] + detector_type = pdu['detector_type']['name'] + + const_root = f'xfel/cal/{detector_type.lower()}/{pdu_name.lower()}' + const_filename = f'cal.{time.time()}.h5' + report_path = Path(metadata_folder).with_suffix('.pdf') + _, proposal, _ = run_prop_seq_from_path(in_folder) + raw_data_location = 'proposal:{} runs: {}'.format( + proposal, ' '.join([str(x) for x in runs])) + + # Generate condition name. + unique_name = detector_type[:detector_type.index('-Type')] + ' Def' + cond_hash = hashlib.md5(pdu_name.encode()) + cond_hash.update(pdu['uuid'].to_bytes(length=8, byteorder='little', signed=False)) + + for key, value in cond_dict.items(): + cond_hash.update(str(key).encode()) + cond_hash.update(str(value).encode()) + + unique_name += binascii.b2a_base64(cond_hash.digest()).decode() + unique_name = unique_name[:60] + + # Build condition dict for CalCat. + cond_params = [ + { + 'parameter_name': key, + 'value': value, + 'lower_deviation_value': 0.0, + 'upper_deviation_value': 0.0, + 'flg_available': True + } + for key, value in cond_dict.items() + ] + + # Add PDU "UUID" to parameters. + cond_params.append({ + 'parameter_name': 'Detector UUID', + 'value': unpack('d', pack('q', pdu['uuid']))[0], + 'lower_deviation_value': 0.0, + 'upper_deviation_value': 0.0, + 'flg_available': True + }) + + inject_h = { + 'report': { + 'name': report_path.stem, + 'file_path': str(report_path) + }, + 'detector_condition': { + 'name': unique_name, + 'parameters': cond_params + }, + 'calibration_constant': { + 'calibration_name': calibration, + 'detector_type_name': detector_type, + 'flg_auto_approve': True + }, + 'calibration_constant_version': { + 'raw_data_location': raw_data_location, + 'file_name': const_filename, + 'path_to_file': const_root, + 'data_set_name': f'{pdu_name}/{calibration}/0', + 'start_idx': '0', + 'end_idx': '0', + 'begin_validity_at': begin_at.isoformat(), + 'end_validity_at': '', + 'begin_at': begin_at.isoformat(), + 'pdu_physical_name': pdu_name, + 'flg_good_quality': True + } + } + + resp = CalibrationClient.inject_new_calibration_constant_version(calibration_client(), inject_h) + + if not resp['success']: + raise RuntimeError(resp) + + const_path = _get_default_caldb_root() / const_root / const_filename + const_path.parent.mkdir(parents=True, exist_ok=True) + copyfile(const_input, const_path)