diff --git a/notebooks/Jungfrau/Jungfrau_dark_analysis_all_gains_burst_mode_NBC.ipynb b/notebooks/Jungfrau/Jungfrau_dark_analysis_all_gains_burst_mode_NBC.ipynb index 76c1c1991b311d6cd7f8dc1c94dfac439753a50f..273f01c7badceb5119f496a1875d3f6a8793e4d6 100644 --- a/notebooks/Jungfrau/Jungfrau_dark_analysis_all_gains_burst_mode_NBC.ipynb +++ b/notebooks/Jungfrau/Jungfrau_dark_analysis_all_gains_burst_mode_NBC.ipynb @@ -245,7 +245,7 @@ " gain_setting=gain_setting,\n", " gain_mode=gain_mode,\n", ")\n", - "cc = extra_calibration_client()\n", + "cc = extra_calibration_client(inject=True)\n", "det_id = cc.detector_by_identifier(karabo_id)['id']\n", "pdus = pdus_by_detector_id(cc, det_id, snapshot_at=creation_time)\n", "\n", @@ -610,19 +610,19 @@ " constants['Noise10Hz'] = np.moveaxis(noise_map[mod], 0, 1)\n", " constants['BadPixelsDark10Hz'] = np.moveaxis(bad_pixels_map[mod], 0, 1)\n", "\n", - " md = None\n", " for const_name, const_data in constants.items():\n", " with NamedTemporaryFile(dir=out_folder) as tempf:\n", " ccv_root = write_ccv(\n", " tempf.name,\n", - " pdu,\n", - " pdu_to_uuid[pdu],\n", - " detector_info[\"detector_type\"],\n", - " const_name,\n", - " conditions,\n", - " creation_time,\n", - " proposal,[run_high, run_med, run_low],\n", - " const_data,\n", + " pdu_name=pdu,\n", + " pdu_uuid=pdu_to_uuid[pdu],\n", + " detector_type=detector_info[\"detector_type\"],\n", + " calibration=const_name,\n", + " conditions=conditions,\n", + " created_at=creation_time,\n", + " proposal=proposal,\n", + " runs=[run_high, run_med, run_low],\n", + " data=const_data,\n", " dims=[\"fast_scan\", \"slow_scan\", \"cell\", \"gain\"],\n", " deviations={\"integration_time\": time_limits},\n", " )\n", @@ -654,7 +654,7 @@ " f\"• Exposure timeout: {exposure_timeout}\\n\"\n", " f\"• Gain setting: {gain_setting}\\n\"\n", " f\"• Gain mode: {gain_mode}\\n\"\n", - " f\"• Creation time: {md.calibration_constant_version.begin_at if md is not None else creation_time}\\n\") # noqa\n", + " f\"• Creation time: {creation_time}\\n\") # noqa\n", "step_timer.done_step(\"Injecting constants.\")" ] }, @@ -762,7 +762,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.8.10" + "version": "3.11.8" } }, "nbformat": 4, diff --git a/src/cal_tools/calcat_interface2.py b/src/cal_tools/calcat_interface2.py index f22b745f9d56d23a784de63953941f0af4bb92ad..4cab3893fbc61ad1faacab9cea51d2729cf87327 100644 --- a/src/cal_tools/calcat_interface2.py +++ b/src/cal_tools/calcat_interface2.py @@ -29,6 +29,9 @@ class ModuleNameError(KeyError): class CalCatAPIError(requests.HTTPError): """Used when the response includes error details as JSON""" + @property + def status_code(self): + return self.response.status_code class CalCatAPIClient: @@ -46,6 +49,10 @@ class CalCatAPIClient: # Ensure the base URL has a trailing slash self.base_api_url = base_api_url.rstrip("/") + "/" + def __repr__(self): + auth = " (with Oauth)" if self.oauth_client else "" + return f"<CalCatAPIClient for {self.base_api_url}{auth}>" + def default_headers(self): return { "content-type": "application/json", @@ -61,6 +68,8 @@ class CalCatAPIClient: return dt.astimezone(timezone.utc).isoformat() elif isinstance(dt, date): return cls.format_time(datetime.combine(dt, time())) + elif dt is None: + return "" # Not specified - for searches, this usually means now elif not isinstance(dt, str): raise TypeError( f"Timestamp parameter ({dt!r}) must be a string, datetime or " @@ -69,7 +78,7 @@ class CalCatAPIClient: return dt - def get_request(self, relative_url, params=None, headers=None, **kwargs): + def request(self, method, relative_url, params=None, headers=None, **kwargs): """Make a GET request, return the HTTP response object""" # Base URL may include e.g. '/api/'. This is a prefix for all URLs; # even if they look like an absolute path. @@ -77,7 +86,9 @@ class CalCatAPIClient: _headers = self.default_headers() if headers: _headers.update(headers) - return self.session.get(url, params=params, headers=_headers, **kwargs) + return self.session.request( + method, url, params=params, headers=_headers, **kwargs + ) @staticmethod def _parse_response(resp: requests.Response): @@ -89,7 +100,8 @@ class CalCatAPIClient: else: raise CalCatAPIError( f"Error {resp.status_code} from API: " - f"{d.get('info', 'missing details')}" + f"{d.get('info', 'missing details')}", + response=resp ) if resp.content == b"": @@ -99,7 +111,7 @@ class CalCatAPIClient: def get(self, relative_url, params=None, **kwargs): """Make a GET request, return response content from JSON""" - resp = self.get_request(relative_url, params, **kwargs) + resp = self.request('GET', relative_url, params, **kwargs) return self._parse_response(resp) _pagination_headers = ( @@ -111,7 +123,7 @@ class CalCatAPIClient: def get_paged(self, relative_url, params=None, **kwargs): """Make a GET request, return response content & pagination info""" - resp = self.get_request(relative_url, params, **kwargs) + resp = self.request('GET', relative_url, params, **kwargs) content = self._parse_response(resp) pagination_info = { k[2:].lower().replace("-", "_"): int(resp.headers[k]) @@ -120,6 +132,11 @@ class CalCatAPIClient: } return content, pagination_info + def post(self, relative_url, json, **kwargs): + """Make a POST request, return response content from JSON""" + resp = self.request('POST', relative_url, json=json, **kwargs) + return self._parse_response(resp) + # ------------------ # Cached wrappers for simple ID lookups of fixed-ish info # @@ -136,24 +153,35 @@ class CalCatAPIClient: # -------------------- # Shortcuts to find 1 of something by an ID-like field (e.g. name) other # than CalCat's own integer IDs. Error on no match or >1 matches. - @lru_cache() - def detector_by_identifier(self, identifier): - # The "identifier", "name" & "karabo_name" fields seem to have the same names - res = self.get("detectors", {"identifier": identifier}) + @lru_cache + def get_by_name(self, endpoint, name, name_key="name"): + res = self.get(endpoint, {name_key: name}) if not res: - raise KeyError(f"No detector with identifier {identifier}") + raise KeyError(f"No {endpoint[:-1]} with name {name}") elif len(res) > 1: - raise ValueError(f"Multiple detectors found with identifier {identifier}") + raise ValueError(f"Multiple {endpoint} found with name {name}") return res[0] - @lru_cache() + def detector_by_identifier(self, identifier): + return self.get_by_name( + "detectors", identifier, name_key="identifier") + def calibration_by_name(self, name): - res = self.get("calibrations", {"name": name}) - if not res: - raise KeyError(f"No calibration with name {name}") - elif len(res) > 1: - raise ValueError(f"Multiple calibrations found with name {name}") - return res[0] + return self.get_by_name("calibrations", name) + + def parameter_by_name(self, name): + return self.get_by_name("parameters", name) + + # Keeping the cache here instead of in other methods + # because it's less likely a new detector type will be edited + # in CalCat without the need to add new ConditionBase class for it. + @lru_cache + def detector_type_by_name(self, name): + return self.get_by_name("detector_types", name) + + def pdu_by_name(self, name): + return self.get_by_name( + "physical_detector_units", name, name_key="physical_name") global_client = None @@ -207,7 +235,7 @@ def setup_client( if oauth_client is None and base_url == CALCAT_PROXY_URL: try: # timeout=(connect_timeout, read_timeout) - global_client.get_request("me", timeout=(1, 5)) + global_client.request("GET", "me", timeout=(1, 5)) except requests.ConnectionError as e: raise RuntimeError( "Could not connect to calibration catalog proxy. This proxy allows " diff --git a/src/cal_tools/constants.py b/src/cal_tools/constants.py index b8cb345a3855fb6b271ffa7722233f77cef9e9bb..5759e2ecb02132c92e98ceca64a3070cadd4e024 100644 --- a/src/cal_tools/constants.py +++ b/src/cal_tools/constants.py @@ -1,32 +1,120 @@ import binascii +import logging import time +from dataclasses import asdict, dataclass +from datetime import datetime, timezone from hashlib import md5 from pathlib import Path from shutil import copyfile from struct import pack, unpack -from typing import List, Optional, Union +from typing import Tuple, Union import h5py import numpy as np -from calibration_client import CalibrationClient from cal_tools.calcat_interface2 import ( + CalCatAPIClient, CalCatAPIError, + get_client, get_default_caldb_root, ) -from cal_tools.restful_config import calibration_client CONDITION_NAME_MAX_LENGTH = 60 +log = logging.getLogger(__name__) -class InjectAPIError(CalCatAPIError): - ... +class InjectionError(Exception): + ... -class CCVAlreadyInjectedError(InjectAPIError): +class CCVAlreadyInjectedError(InjectionError): ... +@dataclass +class ParameterConditionAttribute: + value: str + lower_deviation_value: float = 0 + upper_deviation_value: float = 0 + flg_available: bool = True + description: str = '' + + +def generate_unique_condition_name( + detector_type: str, + pdu_name: str, + pdu_uuid: float, + cond_params: dict, +): + """Generate a unique condition using UUID and timestamp. + + Args: + detector_type (str): detector type. + pdu_name (str): Physical detector unit db name. + pdu_uuid (float): Physical detector unit db id. + cond_params (dict): Keys DB names, values ParameterConditionAttribute + + Returns: + str: A unique name used for the table of conditions. + """ + unique_name = detector_type[:detector_type.index('-Type')] + ' Def' + cond_hash = md5(pdu_name.encode()) + cond_hash.update(int(pdu_uuid).to_bytes( + length=8, byteorder='little', signed=False)) + + for pname, pattrs in cond_params.items(): + cond_hash.update(pname.encode()) + cond_hash.update(str(pattrs.value).encode()) + + unique_name += binascii.b2a_base64(cond_hash.digest()).decode() + return unique_name[:CONDITION_NAME_MAX_LENGTH] + + +def create_unique_cc_name(det_type, calibration, condition_name): + """ + Generating CC name from condition name, + detector type, and calibration name. + """ + cc_name_1 = f'{det_type}_{calibration}' + # I guess there is a limit to the name characters? + return f'{cc_name_1[:40]}_{condition_name}' + + +def create_unique_ccv_name(start_idx): + # Generate unique name if it doesn't exist + datetime_str = datetime.now( + timezone.utc).strftime('%Y%m%d_%H%M%S') + return f'{datetime_str}_sIdx={start_idx}' + + +def extract_parameter_conditions( + ccv_group: dict, pdu_uuid: int) -> dict: + def _to_string(value): + """Send only accepted value types to CALCAT.""" + if isinstance(value, bool): + value = float(value) + return str(value) + + cond_params = {} + condition_group = ccv_group['operating_condition'] + # It's really not ideal we're mixing conditionS and condition now. + # Get parameter data and create a list of `ParameterConditionAttribute`s + for parameter in condition_group: + param_dset = condition_group[parameter] + param_name = param_dset.attrs['database_name'] + cond_params[param_name] = ParameterConditionAttribute( + value=_to_string(param_dset[()]), + lower_deviation_value=param_dset.attrs['lower_deviation'], + upper_deviation_value=param_dset.attrs['upper_deviation'], + ) + + # Add PDU "UUID" to parameters. + cond_params['Detector UUID'] = ParameterConditionAttribute( + value=_to_string(unpack('d', pack('q', pdu_uuid))[0]), + ) + return cond_params + + def write_ccv( const_path, pdu_name, pdu_uuid, detector_type, @@ -130,40 +218,68 @@ def get_condition_dict( } -def generate_unique_condition_name( +def get_or_create_calibration_constant( + client: CalCatAPIClient, + calibration: str, + detector_type: str, + condition_id: int, + condition_name: str, +): + cond_id = condition_id + + # Prepare some parameters to set Calibration Constant. + cal_id = client.calibration_by_name(calibration)['id'] + det_type_id = client.detector_type_by_name(detector_type)['id'] + cc_name = create_unique_cc_name(detector_type, calibration, condition_name) + + calibration_constant = dict( + name=cc_name, + calibration_id=cal_id, + condition_id=cond_id, + detector_type_id=det_type_id, + flg_auto_approve=True, + flg_available=True, + description="", + ) + try: + cc_id = client.get( + f"calibrations/{cal_id}/get_calibration_constant", + calibration_constant + )['id'] + log.debug("Retrieved existing calibration constant ID %s", cc_id) + except CalCatAPIError as e: + if e.status_code != 404: + raise + cc_id = client.post("calibration_constants", calibration_constant)['id'] + log.debug("Created calibration constant ID %s", cc_id) + return cc_id + + +def create_condition( + client: CalCatAPIClient, detector_type: str, pdu_name: str, pdu_uuid: float, - cond_params: List[dict], -): - """Generate a unique condition using UUID and timestamp. - - Args: - detector_type (str): detector type. - pdu_name (str): Physical detector unit db name. - pdu_uuid (float): Physical detector unit db id. - cond_params (List[dict]): A list of dictionary with each condition - e.g. [{ - "parameter_name": "Memory Cells", - "value": 352.0, - "lower-deviation": 0.0, - "upper-deviation": 0.0 - }] - - Returns: - str: A unique name used for the table of conditions. - """ - unique_name = detector_type[:detector_type.index('-Type')] + ' Def' - cond_hash = md5(pdu_name.encode()) - cond_hash.update(int(pdu_uuid).to_bytes( - length=8, byteorder='little', signed=False)) - - for param_dict in cond_params: - cond_hash.update(str(param_dict['parameter_name']).encode()) - cond_hash.update(str(param_dict['value']).encode()) + cond_params: dict, + ) -> Tuple[int, str]: + # Create condition unique name + cond_name = generate_unique_condition_name( + detector_type, pdu_name, pdu_uuid, cond_params) - unique_name += binascii.b2a_base64(cond_hash.digest()).decode() - return unique_name[:CONDITION_NAME_MAX_LENGTH] + # Create condition in database, if not already there. + cond = dict( + name=cond_name, + parameters_conditions_attributes=[ + asdict(cond) | {"parameter_name": db_name} + for (db_name, cond) in cond_params.items() + ], + flg_available=True, + description='', + ) + resp = client.post( + "conditions/set_expected_condition", {"condition": cond} + ) + return resp['id'], cond_name def get_raw_data_location(proposal: str, runs: list): @@ -174,112 +290,112 @@ def get_raw_data_location(proposal: str, runs: list): return "" # Fallback for non-run based constants -def inject_ccv( - const_src: Union[Path, str], - ccv_root: str, - report_to: Optional[str] = None, -): - """Inject new CCV into CalCat. - - Args: - const_path (str or Path): Path to CCV data file. - ccv_root (str): CCV HDF group name. - report_to (str): Metadata location. +def get_ccv_info_from_file( + cfile: Union[str, Path], pdu: str, ccv_root: str): - Raises: - RuntimeError: If CalCat POST request fails. """ - pdu_name, calibration, _ = ccv_root.lstrip('/').split('/') + Read CCV HDF5 file to get calibration parameters. - with h5py.File(const_src, 'r') as const_file: - if ccv_root not in const_file: - raise ValueError( - f"Invalid HDF5 structure: {ccv_root} not found in file.") + Args: + cfile (str, Path): The CalibrationConstantVersion file path. + pdu (str): The Physical detector unit name for the stored constant. + ccv_root (str): The CCV root dataset path to access the data. - pdu_group = const_file[pdu_name] + Returns: + List[ParameterConditionAttribute], str, float, str, str + """ + with h5py.File(cfile, 'r') as const_file: + pdu_group = const_file[pdu] pdu_uuid = pdu_group.attrs['uuid'] detector_type = pdu_group.attrs['detector_type'] - ccv_group = const_file[ccv_root] + raw_data_location = get_raw_data_location( + ccv_group.attrs['proposal'], + ccv_group.attrs['runs'] + ) + begin_at = ccv_group.attrs['begin_at'] + cond_params = extract_parameter_conditions(ccv_group, pdu_uuid) - proposal, runs = ccv_group.attrs['proposal'], ccv_group.attrs['runs'] - begin_at_str = ccv_group.attrs['begin_at'] - - condition_group = ccv_group['operating_condition'] + return cond_params, begin_at, pdu_uuid, detector_type, raw_data_location - cond_params = [] - # It's really not ideal we're mixing conditionS and condition now. - for parameter in condition_group: - param_dset = condition_group[parameter] - cond_params.append(get_condition_dict( - param_dset.attrs['database_name'], - param_dset[()], - param_dset.attrs['lower_deviation'], - param_dset.attrs['upper_deviation'], - )) - const_rel_path = f'xfel/cal/{detector_type.lower()}/{pdu_name.lower()}' - const_filename = f'cal.{time.time()}.h5' - - unique_name = generate_unique_condition_name( - detector_type, pdu_name, pdu_uuid, cond_params) +def inject_ccv(const_src, ccv_root, report_to=None, client=None): + """Inject new CCV into CalCat. - raw_data_location = get_raw_data_location(proposal, runs) + Args: + const_src (str or Path): Path to CCV data file. + ccv_root (str): CCV HDF group name. + report_to (str): Metadata location. + client (CalCatAPIClient, optional): Client for CalCat API. + Raises: + RuntimeError: If CalCat POST request fails. + """ + if client is None: + client = get_client() - # Add PDU "UUID" to parameters. - cond_params.append(get_condition_dict( - 'Detector UUID', - unpack('d', pack('q', pdu_uuid))[0] - )) - - inject_h = { - 'detector_condition': { - 'name': unique_name, - 'parameters': cond_params - }, - 'calibration_constant': { - 'calibration_name': calibration, - 'detector_type_name': detector_type, - 'flg_auto_approve': True - }, - 'calibration_constant_version': { - 'raw_data_location': raw_data_location, - 'file_name': const_filename, - 'path_to_file': const_rel_path, - 'data_set_name': f'{pdu_name}/{calibration}/0', - 'start_idx': '0', - 'end_idx': '0', - 'begin_validity_at': begin_at_str, - 'end_validity_at': '', - 'begin_at': begin_at_str, - 'pdu_physical_name': pdu_name, - 'flg_good_quality': True - } - } + pdu_name, calibration, _ = ccv_root.lstrip('/').split('/') - if report_to: - report_path = Path(report_to).absolute().with_suffix('.pdf') - inject_h['report'] = { - 'name': report_path.stem, - 'file_path': str(report_path) - } + ( + cond_params, begin_at, pdu_uuid, detector_type, raw_data_location + ) = get_ccv_info_from_file(const_src, pdu_name, ccv_root) + const_rel_path = f'xfel/cal/{detector_type.lower()}/{pdu_name.lower()}' + const_filename = f'cal.{time.time()}.h5' const_dest = get_default_caldb_root() / const_rel_path / const_filename const_dest.parent.mkdir(parents=True, exist_ok=True) copyfile(const_src, const_dest) - # TODO: Consider catching `RequestException`s - # when bypassing calibration_client - resp = CalibrationClient.inject_new_calibration_constant_version( - calibration_client(), inject_h) - - if not resp['success']: + try: + condition_id, condition_name = create_condition( + client, detector_type, pdu_name, pdu_uuid, cond_params) + log.debug("Condition ID: %s & name: %r", condition_id, condition_name) + + # Create Calibration Constant in database, if not available. + cc_id = get_or_create_calibration_constant( + client, calibration, detector_type, condition_id, condition_name) + + # Create report in database, if not available + report_id = None + if report_to: + report_path = Path(report_to).absolute().with_suffix('.pdf') + resp = client.post("reports/set", dict( + name=report_path.name, + file_path=str(report_path), + flg_available=True, + description="", + )) + report_id = resp['id'] + log.debug("CalCat report ID: %s", report_id) + + # Get PDU ID before creating new CCV. + pdu_id = client.pdu_by_name(pdu_name)['id'] + + # Prepare CCV data and inject it to CALCAT. + start_idx = 0 + ccv = dict( + name=create_unique_ccv_name(start_idx), + file_name=const_filename, + path_to_file=const_rel_path, + data_set_name=ccv_root, + calibration_constant_id=cc_id, + physical_detector_unit_id=pdu_id, + raw_data_location=raw_data_location, + report_id=report_id, + begin_validity_at=begin_at, + end_validity_at='', + begin_at=begin_at, + start_idx=start_idx, + end_idx=0, + flg_deployed=True, + flg_good_quality=True, + description='', + ) + try: + client.post("calibration_constant_versions", ccv) + except CalCatAPIError as e: + if e.status_code == 422: + raise CCVAlreadyInjectedError + raise + except Exception: const_dest.unlink() # Delete already copied CCV file. - # TODO: Remove this when the new injection code is added. - if ( - resp['status_code'] == 422 and - "taken" in resp['app_info'].get("begin_at", [""])[0] - ): - raise CCVAlreadyInjectedError - else: - raise RuntimeError(resp) + raise diff --git a/src/cal_tools/restful_config.py b/src/cal_tools/restful_config.py index 671dec619ffe287f26e9d4b415e3e3d86e212948..0ee008e61cc89b43c06824a9494c8aef0ac49627 100644 --- a/src/cal_tools/restful_config.py +++ b/src/cal_tools/restful_config.py @@ -44,7 +44,7 @@ def calibration_client(): scope='') -def extra_calibration_client(): +def extra_calibration_client(inject=False): """Obtain an initialized CalCatAPIClient object.""" from cal_tools import calcat_interface2 @@ -57,6 +57,12 @@ def extra_calibration_client(): base_api_url = calcat_config['base-api-url'].rstrip('/') assert base_api_url.endswith('/api') base_url = base_api_url[:-4] + if inject and '//exflcalproxy' in base_url: + raise ValueError( + "cal_tools would use exflcalproxy to talk to CalCat, but this " + "provides read-only access, and we want to inject constants. " + "You need to configure a connection to CalCat using Oauth." + ) calcat_interface2.setup_client( base_url,