diff --git a/notebooks/Jungfrau/Jungfrau_dark_analysis_all_gains_burst_mode_NBC.ipynb b/notebooks/Jungfrau/Jungfrau_dark_analysis_all_gains_burst_mode_NBC.ipynb index 2f1b0b0a48df7ccba136ecc2df98061618617a8c..b9555856554f2d7a1292e30cb86572345f6c7c7d 100644 --- a/notebooks/Jungfrau/Jungfrau_dark_analysis_all_gains_burst_mode_NBC.ipynb +++ b/notebooks/Jungfrau/Jungfrau_dark_analysis_all_gains_burst_mode_NBC.ipynb @@ -92,6 +92,7 @@ "from cal_tools import step_timing\n", "from cal_tools.calcat_interface2 import (\n", " CalibrationData,\n", + " CCVAlreadyInjectedError,\n", " JUNGFRAUConditions,\n", ")\n", "from cal_tools.constants import (\n", @@ -100,7 +101,6 @@ " write_ccv,\n", ")\n", "from cal_tools.enums import BadPixels, JungfrauGainMode\n", - "from cal_tools.inject import CCVAlreadyInjectedError, inject_ccv\n", "from cal_tools.jungfrau import jungfraulib\n", "from cal_tools.restful_config import (\n", " extra_calibration_client,\n", diff --git a/src/cal_tools/calcat_interface2.py b/src/cal_tools/calcat_interface2.py index d120b472a50f81843cc983de6f2a0b3c8d84ebdb..42775e51739284399cbf6aeeb1e6e7fd85661ea0 100644 --- a/src/cal_tools/calcat_interface2.py +++ b/src/cal_tools/calcat_interface2.py @@ -30,6 +30,32 @@ class ModuleNameError(KeyError): class CalCatAPIError(requests.HTTPError): """Used when the response includes error details as JSON""" +class InjectAPIError(CalCatAPIError): + ... + + +class CCVAlreadyInjectedError(InjectAPIError): + ... + + +CALIBRATION_CONSTANT_VERSIONS = "calibration_constant_versions" + +def _failed_response(resp): + # TODO: Add more errors if needed + if CALIBRATION_CONSTANT_VERSIONS in resp.url.lstrip("/"): + if resp.status_code == 422: + raise CCVAlreadyInjectedError + elif resp.status_code >= 400: + try: + d = json.loads(resp.content.decode("utf-8")) + except Exception: + resp.raise_for_status() + else: + raise InjectAPIError( + f"Error {resp.status_code} from API: " + f"{d.get('info', 'missing details')}" + ) + class CalCatAPIClient: def __init__(self, base_api_url, oauth_client=None, user_email=""): @@ -79,6 +105,17 @@ class CalCatAPIClient: _headers.update(headers) return self.session.get(url, params=params, headers=_headers, **kwargs) + def post_request(self, relative_url, data=None, headers=None, **kwargs): + """Make a POST request, return the HTTP response object""" + # Base URL may include e.g. '/api/'. This is a prefix for all URLs; + # even if they look like an absolute path. + url = urljoin(self.base_api_url, relative_url.lstrip("/")) + _headers = self.default_headers() + if headers: + _headers.update(headers) + return self.session.post( + url, data=json.dumps(data), headers=_headers, **kwargs) + @staticmethod def _parse_response(resp: requests.Response): if resp.status_code >= 400: @@ -97,11 +134,27 @@ class CalCatAPIClient: else: return json.loads(resp.content.decode("utf-8")) + @staticmethod + def _parse_post_response(resp: requests.Response): + + if resp.status_code >= 400: + _failed_response(resp) + + if resp.content == b"": + return None + else: + return json.loads(resp.content.decode("utf-8")) + def get(self, relative_url, params=None, **kwargs): """Make a GET request, return response content from JSON""" resp = self.get_request(relative_url, params, **kwargs) return self._parse_response(resp) + def post(self, relative_url, params=None, **kwargs): + """Make a POST request, return response content from JSON""" + resp = self.post_request(relative_url, params, **kwargs) + return self._parse_post_response(resp) + _pagination_headers = ( "X-Total-Pages", "X-Count-Per-Page", @@ -133,27 +186,62 @@ class CalCatAPIClient: def detector_by_id(self, det_id): return self.get(f"detectors/{det_id}") + def pdus_by_detector_name(self, det_name, snapshot_at=""): + det_id = self.detector_by_identifier(det_name)['id'] + return self.pdus_by_detector_id(det_id, snapshot_at) + + def get_calibration_constant( + self, calibration_constant: dict): + return self.get( + f"calibrations/{calibration_constant['calibration_id']}/get_calibration_constant", # noqa # noqa + calibration_constant) + # -------------------- # Shortcuts to find 1 of something by an ID-like field (e.g. name) other # than CalCat's own integer IDs. Error on no match or >1 matches. - @lru_cache() - def detector_by_identifier(self, identifier): - # The "identifier", "name" & "karabo_name" fields seem to have the same names - res = self.get("detectors", {"identifier": identifier}) + @lru_cache + def get_by_name(self, endpoint, name, name_key="name"): + res = self.get(endpoint, {name_key: name}) if not res: - raise KeyError(f"No detector with identifier {identifier}") + raise KeyError(f"No {endpoint[:-1]} with name {name}") elif len(res) > 1: - raise ValueError(f"Multiple detectors found with identifier {identifier}") + raise ValueError(f"Multiple {endpoint} found with name {name}") return res[0] - @lru_cache() + def detector_by_identifier(self, identifier): + return self.get_by_name( + "detectors", identifier, name_key="identifier") + def calibration_by_name(self, name): - res = self.get("calibrations", {"name": name}) - if not res: - raise KeyError(f"No calibration with name {name}") - elif len(res) > 1: - raise ValueError(f"Multiple calibrations found with name {name}") - return res[0] + return self.get_by_name("calibrations", name) + + def parameter_by_name(self, name): + return self.get_by_name("parameters", name) + + # Keeping the cache here instead of in other methods + # because it's less likely a new detector type will be edited + # in CalCat without the need to add new ConditionBase class for it. + @lru_cache + def detector_type_by_name(self, name): + return self.get_by_name("detector_types", name) + + def pdu_by_name(self, name): + return self.get_by_name( + "physical_detector_units", name, name_key="physical_name") + + # Posting APIs + def set_expected_condition(self, conditions: dict): + return self.post("conditions/set_expected_condition", conditions) + + def create_calibration_constant(self, calibration_constant: dict): + return self.post("calibration_constants", calibration_constant) + + def get_or_create_report(self, report: dict): + # Based on create or get API + return self.post("reports/set", report) + + def create_calibration_constant_version(self, ccv: dict): + return self.post(CALIBRATION_CONSTANT_VERSIONS, ccv) global_client = None diff --git a/src/cal_tools/constants.py b/src/cal_tools/constants.py index 57718d4cfda6a15f55c08ad40baa40c8209cb6d7..18783a1025d6f423bddc29daf5c6ace7aa6bfde9 100644 --- a/src/cal_tools/constants.py +++ b/src/cal_tools/constants.py @@ -15,20 +15,47 @@ from cal_tools.calcat_interface2 import ( get_default_caldb_root, ) from cal_tools.restful_config import calibration_client +from dataclasses import asdict, dataclass, field +from typing import List, Optional, Tuple, Union +from datetime import datetime, timezone +@dataclass +class ParameterConditionAttribute: + value: str + parameter_id: str = "" # Default value, allowing assignment later + lower_deviation_value: float = 0 + upper_deviation_value: float = 0 + flg_available: str = 'true' + description: str = '' -class CCVAlreadyInjected(UserWarning): - """Exception when same CCV was already injected. - expected response: { - 'success': False, 'status_code': 422, - 'info': 'Error creating calibration_constant_version', - 'app_info': { - 'calibration_constant_id': ['has already been taken'], - 'physical_detector_unit_id': ['has already been taken'], - 'begin_at': ['has already been taken'] - }, 'pagination': {}, 'data': {}} + def validate(self): + if not self.parameter_id: + raise ValueError(f"The parameter_id for '{self.parameter_name}' is not set.") + + + +def generate_unique_cond_name(detector_type, pdu_name, pdu_uuid, cond_params): + # Generate condition name. + unique_name = detector_type[:detector_type.index('-Type')] + ' Def' + cond_hash = md5(pdu_name.encode()) + cond_hash.update(int(pdu_uuid).to_bytes( + length=8, byteorder='little', signed=False)) + + for pname, pattrs in cond_params.items(): + cond_hash.update(pname.encode()) + cond_hash.update(str(pattrs.value).encode()) + + unique_name += binascii.b2a_base64(cond_hash.digest()).decode() + return unique_name[:60] + + +def create_unique_cc_name(det_type, calibration, condition_name): """ - pass + Generating CC name from condition name, + detector type, and calibration name. + """ + cc_name_1 = f'{det_type}_{calibration}' + return f'{cc_name_1[:40]}_{condition_name}' # I guess there is a limit to the name characters? CONDITION_NAME_MAX_LENGTH = 60 @@ -41,13 +68,40 @@ class CCVAlreadyInjectedError(InjectAPIError): ... -def custom_warning_formatter( - message, category, filename, lineno, file=None, line=None): - """Custom warning format to avoid display filename and lineno.""" - return f"{category.__name__}: {message}\n" +def create_unique_ccv_name(start_idx): + # Generate unique name if it doesn't exist + datetime_str = datetime_str = datetime.now( + timezone.utc).strftime('%Y%m%d_%H%M%S') + return f'{datetime_str}_sIdx={start_idx}' + + +def extract_parameter_conditions( + ccv_group: dict, pdu_uuid: int) -> dict: + def _to_string(value): + """Send only accepted value types to CALCAT.""" + if isinstance(value, bool): + value = float(value) + return str(value) + + cond_params = {} + condition_group = ccv_group['operating_condition'] + # It's really not ideal we're mixing conditionS and condition now. + # Get parameter data and create a list of `ParameterConditionAttribute`s + for parameter in condition_group: + param_dset = condition_group[parameter] + param_name = param_dset.attrs['database_name'] + cond_params[param_name] = ParameterConditionAttribute( + value=_to_string(param_dset[()]), + lower_deviation_value=param_dset.attrs['lower_deviation'], + upper_deviation_value=param_dset.attrs['upper_deviation'], + ) + + # Add PDU "UUID" to parameters. + cond_params['Detector UUID'] = ParameterConditionAttribute( + value=_to_string(unpack('d', pack('q', pdu_uuid))[0]), + ) + return cond_params -# Apply the custom warning formatter -warnings.formatwarning = custom_warning_formatter def write_ccv( const_path, @@ -188,6 +242,62 @@ def generate_unique_condition_name( return unique_name[:CONDITION_NAME_MAX_LENGTH] +def get_or_create_calibration_constant( + client: CalCatAPIClient, + calibration: str, + detector_type: str, + condition_id: int, + condition_name: str, +): + cond_id = condition_id + + # Prepare some parameters to set Calibration Constant. + cal_id = client.calibration_by_name(calibration)['id'] + det_type_id = client.detector_type_by_name(detector_type)['id'] + cc_name = create_unique_cc_name(detector_type, calibration, condition_name) + + calibration_constant = dict( + name=cc_name, + calibration_id=cal_id, + condition_id=cond_id, + detector_type_id=det_type_id, + flg_auto_approve = 'true', + flg_available = 'true', + description = "", + ) + resp = client.get_calibration_constant(calibration_constant) + return resp['id'] if resp else client.create_calibration_constant( + calibration_constant)['id'] # calibration constant id + + +def create_condition( + client: CalCatAPIClient, + detector_type: str, + pdu_name: str, + pdu_uuid: float, + cond_params: dict, + ) -> Tuple[int, str]: + # Create condition unique name + cond_name = generate_unique_cond_name( + detector_type, pdu_name, pdu_uuid, cond_params) + + # Add the missing parameter_id value in `ParameterConditionAttribute`s. + for param, cond in cond_params.items(): + cond.parameter_id = client.parameter_by_name(param)['id'] + cond_params[param] = asdict(cond) + + # Create condition table in database, if not available. + cond = dict( + name=cond_name, + parameters_conditions_attributes=list(cond_params.values()), + flg_available = 'true', + event_at = str(datetime.today()), # TODO: Why is this needed? it is not written in swagger. + description = '', + ) + resp = client.set_expected_condition({"condition":cond}) + return resp['id'], cond_name + + def get_raw_data_location(proposal: str, runs: list): if proposal and len(runs) > 0: return ( @@ -196,112 +306,105 @@ def get_raw_data_location(proposal: str, runs: list): return "" # Fallback for non-run based constants -def inject_ccv( - const_src: Union[Path, str], - ccv_root: str, - report_to: Optional[str] = None, -): - """Inject new CCV into CalCat. - - Args: - const_path (str or Path): Path to CCV data file. - ccv_root (str): CCV HDF group name. - report_to (str): Metadata location. +def get_ccv_info_from_file( + cfile: Union[str, Path], pdu: str, ccv_root: str): - Raises: - RuntimeError: If CalCat POST request fails. """ - pdu_name, calibration, _ = ccv_root.lstrip('/').split('/') + Read CCV HDF5 file to get calibration parameters. - with h5py.File(const_src, 'r') as const_file: - if ccv_root not in const_file: - raise ValueError( - f"Invalid HDF5 structure: {ccv_root} not found in file.") + Args: + cfile (str, Path): The CalibrationConstantVersion file path. + pdu (str): The Physical detector unit name for the stored constant. + ccv_root (str): The CCV root dataset path to access the data. - pdu_group = const_file[pdu_name] + Returns: + List[ParameterConditionAttribute], str, float, str, str + """ + with h5py.File(cfile, 'r') as const_file: + pdu_group = const_file[pdu] pdu_uuid = pdu_group.attrs['uuid'] detector_type = pdu_group.attrs['detector_type'] - ccv_group = const_file[ccv_root] + raw_data_location = get_raw_data_location( + ccv_group.attrs['proposal'], + ccv_group.attrs['runs'] + ) + begin_at = ccv_group.attrs['begin_at'] + cond_params = extract_parameter_conditions(ccv_group, pdu_uuid) - proposal, runs = ccv_group.attrs['proposal'], ccv_group.attrs['runs'] - begin_at_str = ccv_group.attrs['begin_at'] - - condition_group = ccv_group['operating_condition'] + return cond_params, begin_at, pdu_uuid, detector_type, raw_data_location - cond_params = [] - # It's really not ideal we're mixing conditionS and condition now. - for parameter in condition_group: - param_dset = condition_group[parameter] - cond_params.append(get_condition_dict( - param_dset.attrs['database_name'], - param_dset[()], - param_dset.attrs['lower_deviation'], - param_dset.attrs['upper_deviation'], - )) - const_rel_path = f'xfel/cal/{detector_type.lower()}/{pdu_name.lower()}' - const_filename = f'cal.{time.time()}.h5' - - unique_name = generate_unique_condition_name( - detector_type, pdu_name, pdu_uuid, cond_params) +def inject_ccv(const_src, ccv_root, report_to=None, client=None): + """Inject new CCV into CalCat. - raw_data_location = get_raw_data_location(proposal, runs) + Args: + const_path (str or Path): Path to CCV data file. + ccv_root (str): CCV HDF group name. + report_to (str): Metadata location. + Raises: + RuntimeError: If CalCat POST request fails. + """ + if client is None: + from cal_tools.calcat_interface2 import get_client + client = get_client() - # Add PDU "UUID" to parameters. - cond_params.append(get_condition_dict( - 'Detector UUID', - unpack('d', pack('q', pdu_uuid))[0] - )) - - inject_h = { - 'detector_condition': { - 'name': unique_name, - 'parameters': cond_params - }, - 'calibration_constant': { - 'calibration_name': calibration, - 'detector_type_name': detector_type, - 'flg_auto_approve': True - }, - 'calibration_constant_version': { - 'raw_data_location': raw_data_location, - 'file_name': const_filename, - 'path_to_file': const_rel_path, - 'data_set_name': f'{pdu_name}/{calibration}/0', - 'start_idx': '0', - 'end_idx': '0', - 'begin_validity_at': begin_at_str, - 'end_validity_at': '', - 'begin_at': begin_at_str, - 'pdu_physical_name': pdu_name, - 'flg_good_quality': True - } - } + pdu_name, calibration, _ = ccv_root.lstrip('/').split('/') - if report_to: - report_path = Path(report_to).absolute().with_suffix('.pdf') - inject_h['report'] = { - 'name': report_path.stem, - 'file_path': str(report_path) - } + ( + cond_params, begin_at, pdu_uuid, detector_type, raw_data_location + ) = get_ccv_info_from_file(const_src, pdu_name, ccv_root) + const_rel_path = f'xfel/cal/{detector_type.lower()}/{pdu_name.lower()}' + const_filename = f'cal.{time.time()}.h5' const_dest = get_default_caldb_root() / const_rel_path / const_filename const_dest.parent.mkdir(parents=True, exist_ok=True) copyfile(const_src, const_dest) - # TODO: Consider catching `RequestException`s - # when bypassing calibration_client - resp = CalibrationClient.inject_new_calibration_constant_version( - calibration_client(), inject_h) - - if not resp['success']: + try: + condition_id, condition_name = create_condition( + client, detector_type, pdu_name, pdu_uuid, cond_params) + + # Create Calibration Constant in database, if not available. + cc_id = get_or_create_calibration_constant( + client, calibration, detector_type, condition_id, condition_name) + + # Create report in database, if not available + report_id = None + if report_to: + report_path = Path(report_to).absolute().with_suffix('.pdf') + resp = client.get_or_create_report(dict( + name=report_path.stem, + file_path=str(report_path), + flg_available='true', + description="", + )) + report_id = resp['id'] + + # Get PDU ID before creating new CCV. + pdu_id = client.pdu_by_name(pdu_name)['id'] + + # Prepare CCV data and inject it to CALCAT. + start_idx = '0' + ccv = dict( + name=create_unique_ccv_name(start_idx), + file_name=const_filename, + path_to_file=const_rel_path, + data_set_name=ccv_root, + calibration_constant_id=cc_id, + physical_detector_unit_id=pdu_id, + raw_data_location=raw_data_location, + report_id=report_id, + begin_validity_at=begin_at, + end_validity_at='', + begin_at=begin_at, + start_idx=start_idx, + end_idx = '0', + flg_deployed = 'true', + flg_good_quality = 'true', + description = '', + ) + client.create_calibration_constant_version(ccv) + except Exception: const_dest.unlink() # Delete already copied CCV file. - # TODO: Remove this when the new injection code is added. - if ( - resp['status_code'] == 422 and - "taken" in resp['app_info'].get("begin_at", [""])[0] - ): - raise CCVAlreadyInjectedError - else: - raise RuntimeError(resp) + raise diff --git a/src/cal_tools/inject.py b/src/cal_tools/inject.py deleted file mode 100644 index b26da79a7da129b7193484878f143adc6a76d028..0000000000000000000000000000000000000000 --- a/src/cal_tools/inject.py +++ /dev/null @@ -1,433 +0,0 @@ -''' -1- Set Condition - a. /parameters [GET] [DONE] - b. /conditions/set_expected_condition [POST] -2. Set CC - a. /calibrations [GET] - b. /detector_types [GET] - c. /calibration_constants/{id} [GET] - d. /calibration_constants [POST] -3. Set Report - a. /reports [GET] - b. /reports/set [POST] -4. Set CCV - a. /physical_detector_units [GET] - b. /calibration_constant_versions [POST] -''' -import binascii -import json -import time -from dataclasses import asdict, dataclass, field -from datetime import datetime, timezone -from functools import lru_cache -from hashlib import md5 -from pathlib import Path -from shutil import copyfile -from struct import pack, unpack -from typing import List, Optional, Union -from urllib.parse import urljoin - -import h5py -import requests - -from cal_tools.calcat_interface2 import ( - CalCatAPIClient, - CalCatAPIError, - _get_default_caldb_root, -) -from cal_tools.restful_config import extra_calibration_client - -CALIBRATION_CONSTANT_VERSIONS = "calibration_constant_versions" - - -class InjectAPIError(CalCatAPIError): - ... - - -class CCVAlreadyInjectedError(InjectAPIError): - ... - - -@dataclass -class ParameterConditionAttribute: - parameter_id: int - value: str - lower_deviation_value: float = 0 - upper_deviation_value: float = 0 - flg_available: str = 'true' - description: str = '' - - -@dataclass(frozen=True) -class Condition: - name: str - parameters_conditions_attributes: List[ParameterConditionAttribute] = field(default_factory=list) - flg_available: str = 'true' - event_at: str = str(datetime.today()) # TODO: Why is this needed? it is not written in swagger. - description: str = '' - - -@dataclass(frozen=True) -class ConditionRequest: - condition: Condition - - -@dataclass(frozen=True) -class CalibrationConstant: - name: str - detector_type_id: int - calibration_id: int - condition_id: int - flg_auto_approve: str = 'true' - flg_available: str = 'true' - description: str = "" - - -@dataclass(frozen=True) -class Report: - name: str - file_path: str - flg_available: str = 'true' - description: str = "" - - -@dataclass(frozen=True) -class CalibrationConstantVersion: - name: str - file_name: str - path_to_file: str - data_set_name: str - calibration_constant_id: int - physical_detector_unit_id: int - begin_validity_at: str - end_validity_at: str - begin_at: str - start_idx: str = '0' - end_idx: str = '0' - flg_deployed: str = 'true' - flg_good_quality: str = 'true' - raw_data_location: str = '' - report_id: Optional[int] = None - description: str = '' - - -def generate_unique_cond_name(detector_type, pdu_name, pdu_uuid, cond_params): - # Generate condition name. - unique_name = detector_type[:detector_type.index('-Type')] + ' Def' - cond_hash = md5(pdu_name.encode()) - cond_hash.update(int(pdu_uuid).to_bytes( - length=8, byteorder='little', signed=False)) - - for pname, pattrs in cond_params.items(): - cond_hash.update(pname.encode()) - cond_hash.update(str(pattrs.value).encode()) - - unique_name += binascii.b2a_base64(cond_hash.digest()).decode() - return unique_name[:60] - - -def create_unique_cc_name(det_type, calibration, condition_name): - """ - Generating CC name from condition name, - detector type, and calibration name. - """ - cc_name_1 = f'{det_type}_{calibration}' - return f'{cc_name_1[:40]}_{condition_name}' # I guess there is a limit to the name characters? - - -def create_unique_ccv_name(start_idx): - # Generate unique name if it doesn't exist - datetime_str = datetime_str = datetime.now( - timezone.utc).strftime('%Y%m%d_%H%M%S') - return f'{datetime_str}_sIdx={start_idx}' - - -def get_raw_data_location(proposal, runs): - if proposal and len(runs) > 0: - return ( - f'proposal:{proposal} runs: {" ".join([str(x) for x in runs])}') - else: - return "" # Fallback for non-run based constants - - -def _failed_response(resp): - # TODO: Add more errors if needed - if CALIBRATION_CONSTANT_VERSIONS in resp.url.lstrip("/"): - if resp.status_code == 422: - raise CCVAlreadyInjectedError - elif resp.status_code >= 400: - try: - d = json.loads(resp.content.decode("utf-8")) - except Exception: - resp.raise_for_status() - else: - raise InjectAPIError( - f"Error {resp.status_code} from API: " - f"{d.get('info', 'missing details')}" - ) - - -class InjectAPI(CalCatAPIClient): - def __init__(self, base_api_url, oauth_client=None, user_email=""): - super().__init__( - base_api_url=base_api_url, - oauth_client=oauth_client, - user_email=user_email - ) - - @staticmethod - def _parse_post_response(resp: requests.Response): - - if resp.status_code >= 400: - _failed_response(resp) - - if resp.content == b"": - return None - else: - return json.loads(resp.content.decode("utf-8")) - - # ------------------ - # Cached wrappers for simple ID lookups of fixed-ish info - # - # N.B. lru_cache behaves oddly with instance methods (it's a global cache, - # with the instance as part of the key), but in this case it should be OK. - @lru_cache - def get_by_name(self, endpoint, name, name_key="name"): - res = self.get(endpoint, {name_key: name}) - if not res: - raise KeyError(f"No {endpoint[:-1]} with name {name}") - elif len(res) > 1: - raise ValueError(f"Multiple {endpoint} found with name {name}") - return res[0] - - @lru_cache - def parameter_by_name(self, name): - return self.get_by_name("parameters", name) - - @lru_cache - def calibration_by_name(self, name): - return self.get_by_name("calibrations", name) - - @lru_cache - def detector_type_by_name(self, name): - return self.get_by_name("detector_types", name) - - @lru_cache - def pdu_by_name(self, name): - return self.get_by_name( - "physical_detector_units", name, name_key="physical_name") - - @lru_cache - def get_report(self, report: Report): - resp = self.get("reports", asdict(report)) - # `Get all reports` response is a list - return resp if not resp else resp[0] - - @lru_cache - def get_calibration_constant( - self, calibration_constant: CalibrationConstant): - return self.get( - f"calibrations/{calibration_constant.calibration_id}/get_calibration_constant", - asdict(calibration_constant) - ) - - def post_request(self, relative_url, data=None, headers=None, **kwargs): - """Make a POST request, return the HTTP response object""" - # Base URL may include e.g. '/api/'. This is a prefix for all URLs; - # even if they look like an absolute path. - url = urljoin(self.base_api_url, relative_url.lstrip("/")) - _headers = self.default_headers() - if headers: - _headers.update(headers) - return self.session.post(url, data=json.dumps(data), headers=_headers, **kwargs) - - def post(self, relative_url, params=None, **kwargs): - """Make a POST request, return response content from JSON""" - resp = self.post_request(relative_url, params, **kwargs) - return self._parse_post_response(resp) - - def set_expected_condition(self, conditions: ConditionRequest): - return self.post("conditions/set_expected_condition", asdict(conditions)) - - def create_calibration_constant( - self, calibration_constant: CalibrationConstant): - return self.post("calibration_constants", asdict(calibration_constant)) - - def get_or_create_report(self, report: Report): - # Based on create or get API - return self.post("reports/set", asdict(report)) - - def create_calibration_constant_version( - self, ccv: CalibrationConstantVersion): - return self.post(CALIBRATION_CONSTANT_VERSIONS, asdict(ccv)) - - def create_new_ccv( - self, - cond_params, - begin_at, - pdu_name, - pdu_uuid, - detector_type, - calibration, - const_rel_path, - const_filename, - ccv_root, - report_to=None, - raw_data_location='', - ): - """Inject new CCV into CalCat.""" - - cond_name = generate_unique_cond_name( - detector_type, pdu_name, pdu_uuid, cond_params) - - # Create condition table in database, if not available. - resp = self.set_condition(cond_name, list(cond_params.values())) - condition_id = resp['id'] - - # Prepare some parameters to set Calibration Constant. - cal_id = self.calibration_by_name(calibration)['id'] - det_type_id = self.detector_type_by_name(detector_type)['id'] - cc_name = create_unique_cc_name(detector_type, calibration, cond_name) - - # Create Calibration Constant in database, if not available. - resp = self.get_or_create_calibration_constant( - cc_name, cal_id, det_type_id, condition_id) - cc_id = resp["id"] - - # Create report in database, if not available - report_id = None - if report_to: - report_path = Path(report_to).absolute().with_suffix('.pdf') - resp = self.get_or_create_report( - Report(name=report_path.stem, file_path=str(report_path))) - report_id = resp["id"] - - # Get PDU ID before creating new CCV. - resp = self.pdu_by_name(pdu_name) - pdu_id = resp["id"] - start_idx = 0 - ccv = CalibrationConstantVersion( - name=create_unique_ccv_name(start_idx), - file_name=const_filename, - path_to_file=const_rel_path, - data_set_name=ccv_root, - calibration_constant_id=cc_id, - physical_detector_unit_id=pdu_id, - raw_data_location=raw_data_location, - report_id=report_id, - begin_validity_at=begin_at, - end_validity_at='', - begin_at=begin_at, - start_idx=start_idx - ) - - return self.create_calibration_constant_version(ccv) - - def set_condition(self, name, parameter_conditions): - cond = Condition( - name=name, parameters_conditions_attributes=parameter_conditions) - return self.set_expected_condition(ConditionRequest(cond)) - - def get_or_create_calibration_constant( - self, name: str, cal_id: int, det_type_id: int, condition_id: int): - cond_id = condition_id - - calibration_constant = CalibrationConstant( - name=name, - calibration_id=cal_id, - condition_id=cond_id, - detector_type_id=det_type_id - ) - resp = self.get_calibration_constant(calibration_constant) - return resp if resp else self.create_calibration_constant( - calibration_constant) - - def set_calibration_constant_version(self, ccv: CalibrationConstantVersion): - return self.create_calibration_constant_version(ccv) - - -def extract_parameter_conditions(client, ccv_group, pdu_uuid): - def _to_string(value): - """Send only accepted value types to CALCAT.""" - if isinstance(value, bool): - value = float(value) - return str(value) - - cond_params = {} - condition_group = ccv_group['operating_condition'] - # It's really not ideal we're mixing conditionS and condition now. - # Get parameter data and create a list of `ParameterConditionAttribute`s - for parameter in condition_group: - param_dset = condition_group[parameter] - param_name = param_dset.attrs['database_name'] - cond_params[param_name] = ParameterConditionAttribute( - parameter_id=client.parameter_by_name(param_name)['id'], - value=_to_string(param_dset[()]), - lower_deviation_value=param_dset.attrs['lower_deviation'], - upper_deviation_value=param_dset.attrs['upper_deviation'], - ) - det_uuid = 'Detector UUID' - # Add PDU "UUID" to parameters. - cond_params[det_uuid] = ParameterConditionAttribute( - value=_to_string(unpack('d', pack('q', pdu_uuid))[0]), - parameter_id=client.parameter_by_name(det_uuid)['id'], - ) - return cond_params - - -def get_ccv_info_from_file( - client: InjectAPI, cfile: Union[str, Path], pdu: str, ccv_root: str): - - with h5py.File(cfile, 'r') as const_file: - pdu_group = const_file[pdu] - pdu_uuid = pdu_group.attrs['uuid'] - detector_type = pdu_group.attrs['detector_type'] - ccv_group = const_file[ccv_root] - raw_data_location = get_raw_data_location( - ccv_group.attrs['proposal'], - ccv_group.attrs['runs'] - ) - begin_at = ccv_group.attrs['begin_at'] - cond_params = extract_parameter_conditions(client, ccv_group, pdu_uuid) - - return cond_params, begin_at, pdu_uuid, detector_type, raw_data_location - - -def inject_ccv( - const_src: Union[str, Path], - ccv_root: str, # pdu/calibration/key - report_to: Optional[Union[str, Path]] = None, - client: Optional[InjectAPI] = None, -): - if client is None: - client = extra_calibration_client(inject=True) - - pdu_name, calibration, _ = ccv_root.lstrip('/').split('/') - - ( - cond_params, begin_at, pdu_uuid, detector_type, raw_data_location - ) = client.get_ccv_info_from_file(client, const_src, pdu_name, ccv_root) - - const_rel_path = f'xfel/cal/{detector_type.lower()}/{pdu_name.lower()}' - const_filename = f'cal.{time.time()}.h5' - const_dest = _get_default_caldb_root() / const_rel_path / const_filename - const_dest.parent.mkdir(parents=True, exist_ok=True) - copyfile(const_src, const_dest) - - try: - client.create_new_ccv( - cond_params, - begin_at, - pdu_name, - pdu_uuid, - detector_type, - calibration, - const_rel_path, - const_filename, - ccv_root, - report_to, - raw_data_location, - ) - except Exception as e: - const_dest.unlink() - raise e