import binascii
import time
from hashlib import md5
from pathlib import Path
from shutil import copyfile
from struct import pack, unpack
from typing import List, Optional, Union

import h5py
import numpy as np
from calibration_client import CalibrationClient

from cal_tools.calcat_interface2 import (
    CalCatAPIError,
    _get_default_caldb_root,
)
from cal_tools.restful_config import calibration_client

# Names returned by generate_unique_condition_name() are truncated to this
# many characters.
CONDITION_NAME_MAX_LENGTH = 60


class InjectAPIError(CalCatAPIError):
    """Base class for errors raised while injecting a CCV."""


class CCVAlreadyInjectedError(InjectAPIError):
    """Raised by inject_ccv() when CalCat reports `begin_at` as taken."""


def write_ccv(
    const_path,
    pdu_name, pdu_uuid, detector_type,
    calibration, conditions, created_at, proposal, runs,
    data, dims, key='0', deviations=None,
):
    """Write CCV data file.

    The file is opened in append mode, so several CCVs can be stored in
    the same file under `/<pdu_name>/<calibration>/<key>`.

    Args:
        const_path (os.PathLike): Path to CCV file to write
        pdu_name (str): Physical detector unit name
        pdu_uuid (int): Physical detector unit UUID
        detector_type (str): Detector type name
        calibration (str): Calibration name
        conditions (ConditionsBase): Detector operating conditions
        created_at (datetime): Validity start for calibration
        proposal (int): Raw data proposal the calibration data is
            generated from
        runs (Iterable of int): Raw data runs the calibration data is
            generated from
        data (ndarray): Calibration constant data
        dims (Iterable of str): Dimension names for the constant data.
        key (str, optional): Key added in constant file dataset when
            constant data are stored. Defaults to '0'. If None, the
            number of entries already present in the calibration group
            is used as key.
        deviations (dict, optional): Deviation values for operating
            conditions, keyed by the lower-case, underscore-separated
            condition name. Each value can be a tuple (lower, upper) or
            a single value for symmetric deviations. Defaults to None,
            meaning no deviations.
            e.g. {"integration_time": 0.025}

    Returns:
        (str) CCV HDF group name.

    Raises:
        ValueError: If the number of dimension names does not match the
            number of dimensions of `data`.
    """
    if data.ndim != len(dims):
        raise ValueError('data.ndims != len(dims)')

    # Avoid a shared mutable default argument.
    if deviations is None:
        deviations = {}

    with h5py.File(const_path, 'a') as const_file:
        const_file.attrs['version'] = 0

        pdu_group = const_file.require_group(pdu_name)
        pdu_group.attrs['uuid'] = pdu_uuid
        pdu_group.attrs['detector_type'] = detector_type

        calibration_group = pdu_group.require_group(calibration)

        if key is None:
            key = str(len(calibration_group))

        ccv_group = calibration_group.create_group(key)
        ccv_group.attrs['begin_at'] = created_at.isoformat()
        ccv_group.attrs['proposal'] = proposal
        ccv_group.attrs['runs'] = np.array(runs, dtype=np.int32)
        ccv_group_name = ccv_group.name

        opcond_group = ccv_group.create_group('operating_condition')
        opcond_dict = conditions.make_dict(
            conditions.calibration_types[calibration])

        for db_name, value in opcond_dict.items():
            # Dataset names are a normalised form of the database name;
            # the original name is kept as an attribute for injection.
            cond_name = db_name.lower().replace(' ', '_')
            dset = opcond_group.create_dataset(
                cond_name, data=value, dtype=np.float64)

            deviation = deviations.get(cond_name, (0.0, 0.0))
            # A single number (including NumPy scalars) is applied
            # symmetrically, anything else is unpacked as (lower, upper).
            if isinstance(deviation, (int, float, np.integer, np.floating)):
                lower_dev = upper_dev = deviation
            else:
                lower_dev, upper_dev = deviation

            dset.attrs['lower_deviation'] = lower_dev
            dset.attrs['upper_deviation'] = upper_dev
            dset.attrs['database_name'] = db_name

        dset = ccv_group.create_dataset('data', data=data)
        dset.attrs['dims'] = dims

    return ccv_group_name


def get_condition_dict(
    name: str,
    value: Union[float, str, int, bool],
    lower_deviation: float = 0.0,
    upper_deviation: float = 0.0,
):
    """Build the dictionary describing a single condition parameter.

    Args:
        name (str): Parameter name.
        value (float, str, int or bool): Parameter value. It is converted
            to float if possible and to str otherwise.
        lower_deviation (float): Lower deviation. Defaults to 0.0.
        upper_deviation (float): Upper deviation. Defaults to 0.0.

    Returns:
        dict: Parameter description with the keys `parameter_name`,
            `value`, `lower_deviation_value`, `upper_deviation_value`
            and `flg_available`.
    """

    def to_float_or_string(value):
        """CALCAT expects data to either be float or a string.
        """
        try:  # Any digit or boolean
            return float(value)
        except (TypeError, ValueError, OverflowError):
            # Not convertible to a float, fall back to its string form.
            return str(value)

    return {
        'parameter_name': name,
        'value': to_float_or_string(value),
        'lower_deviation_value': lower_deviation,
        'upper_deviation_value': upper_deviation,
        'flg_available': True
    }


def generate_unique_condition_name(
    detector_type: str,
    pdu_name: str,
    pdu_uuid: float,
    cond_params: List[dict],
):
    """Generate a unique condition name from an MD5 hash.

    The name consists of the detector type up to its '-Type' suffix,
    ' Def' and the base64 encoded MD5 digest of the PDU name, the PDU
    UUID and the name and value of each condition parameter.

    Args:
        detector_type (str): detector type. Must contain '-Type'.
        pdu_name (str): Physical detector unit db name.
        pdu_uuid (float): Physical detector unit db id.
        cond_params (List[dict]): A list of dictionary with each condition
            e.g. [{
                "parameter_name": "Memory Cells",
                "value": 352.0,
                "lower_deviation_value": 0.0,
                "upper_deviation_value": 0.0,
                "flg_available": True
            }]

    Returns:
        str: A unique name used for the table of conditions, at most
            CONDITION_NAME_MAX_LENGTH characters long.

    Raises:
        ValueError: If `detector_type` does not contain '-Type'.
    """
    unique_name = detector_type[:detector_type.index('-Type')] + ' Def'

    cond_hash = md5(pdu_name.encode())
    cond_hash.update(int(pdu_uuid).to_bytes(
        length=8, byteorder='little', signed=False))

    # Only the parameter names and values enter the hash, deviations
    # do not.
    for param_dict in cond_params:
        cond_hash.update(str(param_dict['parameter_name']).encode())
        cond_hash.update(str(param_dict['value']).encode())

    # NOTE: b2a_base64() appends a newline by default, which is part of
    # the name unless truncated below. This is left untouched to keep the
    # generated names unchanged.
    unique_name += binascii.b2a_base64(cond_hash.digest()).decode()
    return unique_name[:CONDITION_NAME_MAX_LENGTH]


def get_raw_data_location(proposal: str, runs: list):
    """Format the raw data location string of a CCV.

    Args:
        proposal (str): Proposal the constant was generated from.
        runs (list): Runs the constant was generated from.

    Returns:
        str: 'proposal:<proposal> runs: <run> <run> ...' or an empty
            string if the proposal is falsy or there are no runs.
    """
    # len() rather than truthiness, as the truth value of an array with
    # several elements is ambiguous (inject_ccv() passes an HDF5
    # attribute here).
    if proposal and len(runs) > 0:
        return (
            f'proposal:{proposal} runs: {" ".join([str(x) for x in runs])}')
    else:
        return ""  # Fallback for non-run based constants


def inject_ccv(
    const_src: Union[Path, str],
    ccv_root: str,
    report_to: Optional[str] = None,
):
    """Inject new CCV into CalCat.

    The CCV file is copied below the default CalDB root before the
    injection request is sent, and removed again if CalCat reports a
    failure.

    Args:
        const_src (str or Path): Path to CCV data file.
        ccv_root (str): CCV HDF group name in the form
            `/<pdu_name>/<calibration>/<key>`, as returned by
            write_ccv().
        report_to (str): Metadata location.

    Raises:
        ValueError: If `ccv_root` does not consist of three components
            or is not found in the CCV data file.
        CCVAlreadyInjectedError: If CalCat rejects the request with
            status 422 because `begin_at` is already taken.
        RuntimeError: If CalCat POST request fails for any other reason.
    """
    pdu_name, calibration, key = ccv_root.lstrip('/').split('/')

    with h5py.File(const_src, 'r') as const_file:
        if ccv_root not in const_file:
            raise ValueError(
                f"Invalid HDF5 structure: {ccv_root} not found in file.")

        pdu_group = const_file[pdu_name]
        pdu_uuid = pdu_group.attrs['uuid']
        detector_type = pdu_group.attrs['detector_type']

        ccv_group = const_file[ccv_root]
        proposal, runs = ccv_group.attrs['proposal'], ccv_group.attrs['runs']
        begin_at_str = ccv_group.attrs['begin_at']

        condition_group = ccv_group['operating_condition']

        cond_params = []
        # It's really not ideal we're mixing conditionS and condition now.
        for parameter in condition_group:
            param_dset = condition_group[parameter]
            cond_params.append(get_condition_dict(
                param_dset.attrs['database_name'],
                param_dset[()],
                param_dset.attrs['lower_deviation'],
                param_dset.attrs['upper_deviation'],
            ))

    const_rel_path = f'xfel/cal/{detector_type.lower()}/{pdu_name.lower()}'
    # The current timestamp makes the name of the copied file unique.
    const_filename = f'cal.{time.time()}.h5'

    # The condition name is generated before the detector UUID parameter
    # is appended, hence the UUID parameter itself is not hashed.
    unique_name = generate_unique_condition_name(
        detector_type, pdu_name, pdu_uuid, cond_params)

    raw_data_location = get_raw_data_location(proposal, runs)

    # Add PDU "UUID" to parameters. The bytes of the 64-bit integer are
    # reinterpreted as a double.
    cond_params.append(get_condition_dict(
        'Detector UUID',
        unpack('d', pack('q', pdu_uuid))[0]
    ))

    inject_h = {
        'detector_condition': {
            'name': unique_name,
            'parameters': cond_params
        },
        'calibration_constant': {
            'calibration_name': calibration,
            'detector_type_name': detector_type,
            'flg_auto_approve': True
        },
        'calibration_constant_version': {
            'raw_data_location': raw_data_location,
            'file_name': const_filename,
            'path_to_file': const_rel_path,
            # Point to the injected CCV group rather than always to key
            # '0', as the file may hold several CCVs per calibration.
            'data_set_name': f'{pdu_name}/{calibration}/{key}',
            'start_idx': '0',
            'end_idx': '0',
            'begin_validity_at': begin_at_str,
            'end_validity_at': '',
            'begin_at': begin_at_str,
            'pdu_physical_name': pdu_name,
            'flg_good_quality': True
        }
    }

    if report_to:
        report_path = Path(report_to).absolute().with_suffix('.pdf')
        inject_h['report'] = {
            'name': report_path.stem,
            'file_path': str(report_path)
        }

    const_dest = _get_default_caldb_root() / const_rel_path / const_filename
    const_dest.parent.mkdir(parents=True, exist_ok=True)
    copyfile(const_src, const_dest)

    # TODO: Consider catching `RequestException`s
    # when bypassing calibration_client
    resp = CalibrationClient.inject_new_calibration_constant_version(
        calibration_client(), inject_h)

    if not resp['success']:
        const_dest.unlink()  # Delete already copied CCV file.
        # TODO: Remove this when the new injection code is added.
        if (
            resp['status_code'] == 422 and
            "taken" in resp['app_info'].get("begin_at", [""])[0]
        ):
            raise CCVAlreadyInjectedError
        else:
            raise RuntimeError(resp)