From 60eb6f61a63b28724658d518ba0a2b68a5c4fdef Mon Sep 17 00:00:00 2001 From: Thomas Kluyver <thomas.kluyver@xfel.eu> Date: Wed, 4 Dec 2024 18:44:53 +0100 Subject: [PATCH] Refactor injection API to wrap CalCatAPIClient instead of subclassing --- src/cal_tools/calcat_interface2.py | 53 +++------ src/cal_tools/constants.py | 178 +++++++---------------------- src/cal_tools/restful_config.py | 36 +++--- 3 files changed, 72 insertions(+), 195 deletions(-) diff --git a/src/cal_tools/calcat_interface2.py b/src/cal_tools/calcat_interface2.py index dd159e011..611692e06 100644 --- a/src/cal_tools/calcat_interface2.py +++ b/src/cal_tools/calcat_interface2.py @@ -29,27 +29,10 @@ class ModuleNameError(KeyError): class CalCatAPIError(requests.HTTPError): """Used when the response includes error details as JSON""" - ... - -class CalibrationConstantNotFound(CalCatAPIError): - ... - + @property + def status_code(self): + return self.response.status_code -def _get_failed_response(resp): - # TODO: Add more errors if needed - if "calibration_constant" in resp.url.lstrip("/"): - if resp.status_code == 404: - raise CalibrationNotFound("Calibration Constant was not found in CALCAT.") - if resp.status_code >= 400: - try: - d = json.loads(resp.content.decode("utf-8")) - except Exception: - resp.raise_for_status() - else: - raise CalCatAPIError( - f"Error {resp.status_code} from API: " - f"{d.get('info', 'missing details')}" - ) class CalCatAPIClient: def __init__(self, base_api_url, oauth_client=None, user_email=""): @@ -89,7 +72,7 @@ class CalCatAPIClient: return dt - def get_request(self, relative_url, params=None, headers=None, **kwargs): + def request(self, method, relative_url, params=None, headers=None, **kwargs): """Make a GET request, return the HTTP response object""" # Base URL may include e.g. '/api/'. This is a prefix for all URLs; # even if they look like an absolute path. @@ -97,7 +80,9 @@ class CalCatAPIClient: _headers = self.default_headers() if headers: _headers.update(headers) - return self.session.get(url, params=params, headers=_headers, **kwargs) + return self.session.request( + method, url, params=params, headers=_headers, **kwargs + ) @staticmethod def _parse_response(resp: requests.Response): @@ -109,7 +94,8 @@ class CalCatAPIClient: else: raise CalCatAPIError( f"Error {resp.status_code} from API: " - f"{d.get('info', 'missing details')}" + f"{d.get('info', 'missing details')}", + response=resp ) if resp.content == b"": @@ -119,7 +105,7 @@ class CalCatAPIClient: def get(self, relative_url, params=None, **kwargs): """Make a GET request, return response content from JSON""" - resp = self.get_request(relative_url, params, **kwargs) + resp = self.request('GET', relative_url, params, **kwargs) return self._parse_response(resp) _pagination_headers = ( @@ -131,7 +117,7 @@ class CalCatAPIClient: def get_paged(self, relative_url, params=None, **kwargs): """Make a GET request, return response content & pagination info""" - resp = self.get_request(relative_url, params, **kwargs) + resp = self.request('GET', relative_url, params, **kwargs) content = self._parse_response(resp) pagination_info = { k[2:].lower().replace("-", "_"): int(resp.headers[k]) @@ -140,6 +126,11 @@ class CalCatAPIClient: } return content, pagination_info + def post(self, relative_url, json, **kwargs): + """Make a POST request, return response content from JSON""" + resp = self.request('POST', relative_url, json=json, **kwargs) + return self._parse_response(resp) + # ------------------ # Cached wrappers for simple ID lookups of fixed-ish info # @@ -153,16 +144,6 @@ class CalCatAPIClient: def detector_by_id(self, det_id): return self.get(f"detectors/{det_id}") - def pdus_by_detector_name(self, det_name, snapshot_at=""): - det_id = self.detector_by_identifier(det_name)['id'] - return self.pdus_by_detector_id(det_id, snapshot_at) - - def get_calibration_constant( - self, calibration_constant: dict): - return self.get( - f"calibrations/{calibration_constant['calibration_id']}/get_calibration_constant", # noqa # noqa - calibration_constant) - # -------------------- # Shortcuts to find 1 of something by an ID-like field (e.g. name) other # than CalCat's own integer IDs. Error on no match or >1 matches. @@ -248,7 +229,7 @@ def setup_client( if oauth_client is None and base_url == CALCAT_PROXY_URL: try: # timeout=(connect_timeout, read_timeout) - global_client.get_request("me", timeout=(1, 5)) + global_client.request("GET", "me", timeout=(1, 5)) except requests.ConnectionError as e: raise RuntimeError( "Could not connect to calibration catalog proxy. This proxy allows " diff --git a/src/cal_tools/constants.py b/src/cal_tools/constants.py index e62940280..ac56c1e7a 100644 --- a/src/cal_tools/constants.py +++ b/src/cal_tools/constants.py @@ -1,5 +1,4 @@ import binascii -import json import time from dataclasses import asdict, dataclass @@ -8,95 +7,52 @@ from hashlib import md5 from pathlib import Path from shutil import copyfile from struct import pack, unpack -from typing import List, Tuple, Union -from urllib.parse import urljoin +from typing import Tuple, Union import h5py import numpy as np -import requests from cal_tools.calcat_interface2 import ( CalCatAPIClient, CalCatAPIError, + get_client, get_default_caldb_root, - CalibrationConstantNotFound ) -from oauth2_xfel_client import Oauth2ClientBackend CONDITION_NAME_MAX_LENGTH = 60 -class InjectAPIError(CalCatAPIError): +class InjectionError(Exception): ... - -class CCVAlreadyInjectedError(InjectAPIError): +class CCVAlreadyInjectedError(InjectionError): ... -def _post_failed_response(resp): - # TODO: Add more errors if needed - if "calibration_constant_versions" in resp.url.lstrip("/"): - if resp.status_code == 422: - raise CCVAlreadyInjectedError - if resp.status_code >= 400: - try: - d = json.loads(resp.content.decode("utf-8")) - except Exception: - resp.raise_for_status() - else: - raise InjectAPIError( - f"Error {resp.status_code} from API: " - f"{d.get('info', 'missing details')}" - ) - - -class InjectAPI(CalCatAPIClient): - def __init__(self, base_api_url, oauth_client=None, user_email=""): - # Call the parent class initializer - super().__init__(base_api_url, oauth_client, user_email) - self.base_api_url = base_api_url - - def post_request(self, relative_url, data=None, headers=None, **kwargs): - """Make a POST request, return the HTTP response object""" - # Base URL may include e.g. '/api/'. This is a prefix for all URLs; - # even if they look like an absolute path. - url = urljoin(self.base_api_url, relative_url.lstrip("/")) - _headers = self.default_headers() - if headers: - _headers.update(headers) - return self.session.post( - url, data=json.dumps(data), headers=_headers, **kwargs) - @staticmethod - def _parse_post_response(resp: requests.Response): - - if resp.status_code >= 400: - _post_failed_response(resp) - - if resp.content == b"": - return None - else: - return json.loads(resp.content.decode("utf-8")) - - def post(self, relative_url, params=None, **kwargs): - """Make a POST request, return response content from JSON""" - resp = self.post_request(relative_url, params, **kwargs) - return self._parse_post_response(resp) +class CalCatInjector: + """Helper methods for requests to CalCat""" + def __init__(self, client: CalCatAPIClient): + self.client = client # Posting APIs def set_expected_condition(self, conditions: dict): - return self.post("conditions/set_expected_condition", conditions) + return self.client.post("conditions/set_expected_condition", conditions) def create_calibration_constant(self, calibration_constant: dict): - return self.post("calibration_constants", calibration_constant) + return self.client.post("calibration_constants", calibration_constant) def get_or_create_report(self, report: dict): # Based on create or get API - return self.post("reports/set", report) + return self.client.post("reports/set", report) def create_calibration_constant_version(self, ccv: dict): - return self.post("calibration_constant_versions", ccv) + try: + return self.client.post("calibration_constant_versions", ccv) + except CalCatAPIError as e: + if e.status_code == 422: + raise CCVAlreadyInjectedError + raise @dataclass @@ -113,7 +69,7 @@ def generate_unique_condition_name( detector_type: str, pdu_name: str, pdu_uuid: float, - cond_params: List[dict], + cond_params: dict, ): """Generate a unique condition using UUID and timestamp. @@ -121,7 +77,7 @@ def generate_unique_condition_name( detector_type (str): detector type. pdu_name (str): Physical detector unit db name. pdu_uuid (float): Physical detector unit db id. - cond_params (List[dict]): A list of dictionary with each condition + cond_params (dict): A list of dictionary with each condition e.g. [{ "parameter_name": "Memory Cells", "value": 352.0, @@ -294,7 +250,7 @@ def get_condition_dict( def get_or_create_calibration_constant( - client: InjectAPI, + injector: CalCatInjector, calibration: str, detector_type: str, condition_id: int, @@ -303,8 +259,8 @@ def get_or_create_calibration_constant( cond_id = condition_id # Prepare some parameters to set Calibration Constant. - cal_id = client.calibration_by_name(calibration)['id'] - det_type_id = client.detector_type_by_name(detector_type)['id'] + cal_id = injector.client.calibration_by_name(calibration)['id'] + det_type_id = injector.client.detector_type_by_name(detector_type)['id'] cc_name = create_unique_cc_name(detector_type, calibration, condition_name) calibration_constant = dict( @@ -317,14 +273,19 @@ def get_or_create_calibration_constant( description = "", ) try: - cc_id = client.get_calibration_constant(calibration_constant) - except CalibrationConstantNotFound: - cc_id = client.create_calibration_constant(calibration_constant)['id'] + cc_id = injector.client.get( + f"calibrations/{cal_id}/get_calibration_constant", # noqa + calibration_constant + ) + except CalCatAPIError as e: + if e.status_code != 404: + raise + cc_id = injector.create_calibration_constant(calibration_constant)['id'] return cc_id def create_condition( - client: CalCatAPIClient, + injector: CalCatInjector, detector_type: str, pdu_name: str, pdu_uuid: float, @@ -336,7 +297,7 @@ def create_condition( # Add the missing parameter_id value in `ParameterConditionAttribute`s. for param, cond in cond_params.items(): - cond.parameter_id = client.parameter_by_name(param)['id'] + cond.parameter_id = injector.client.parameter_by_name(param)['id'] cond_params[param] = asdict(cond) # Create condition table in database, if not available. @@ -347,7 +308,7 @@ def create_condition( event_at = str(datetime.today()), # TODO: Why is this needed? it is not written in swagger. description = '', ) - resp = client.set_expected_condition({"condition":cond}) + resp = injector.set_expected_condition({"condition":cond}) return resp['id'], cond_name @@ -387,74 +348,12 @@ def get_ccv_info_from_file( return cond_params, begin_at, pdu_uuid, detector_type, raw_data_location -global_client = None - - -def get_client(): - """Get the global CalCat API client. - - The default assumes we're running in the DESY network; this is used unless - `setup_client()` has been called to specify otherwise. - """ - global global_client - if global_client is None: - setup_client(CALCAT_PROXY_URL, None, None, None) - return global_client - - -def setup_client( - base_url, - client_id, - client_secret, - user_email, - scope="", - session_token=None, - oauth_retries=3, - oauth_timeout=12, - ssl_verify=True, -): - """Configure the global CalCat API client.""" - global global_client - if client_id is not None: - oauth_client = Oauth2ClientBackend( - client_id=client_id, - client_secret=client_secret, - scope=scope, - token_url=f"{base_url}/oauth/token", - session_token=session_token, - max_retries=oauth_retries, - timeout=oauth_timeout, - ssl_verify=ssl_verify, - ) - else: - oauth_client = None - global_client = InjectAPI( - f"{base_url}/api/", - oauth_client=oauth_client, - user_email=user_email, - ) - - # Check we can connect to exflcalproxy - if oauth_client is None and base_url == CALCAT_PROXY_URL: - try: - # timeout=(connect_timeout, read_timeout) - global_client.get_request("me", timeout=(1, 5)) - except requests.ConnectionError as e: - raise RuntimeError( - "Could not connect to calibration catalog proxy. This proxy allows " - "unauthenticated access inside the XFEL/DESY network. To look up " - "calibration constants from outside, you will need to create an Oauth " - "client ID & secret in the CalCat web interface. You will still not " - "be able to load constants without the constant store folder." - ) from e - - def inject_ccv(const_src, ccv_root, report_to=None, client=None): """Inject new CCV into CalCat. Args: - const_path (str or Path): Path to CCV data file. + const_src (str or Path): Path to CCV data file. ccv_root (str): CCV HDF group name. report_to (str): Metadata location. Raises: @@ -462,6 +361,7 @@ def inject_ccv(const_src, ccv_root, report_to=None, client=None): """ if client is None: client = get_client() + injector = CalCatInjector(client) pdu_name, calibration, _ = ccv_root.lstrip('/').split('/') @@ -477,17 +377,17 @@ def inject_ccv(const_src, ccv_root, report_to=None, client=None): try: condition_id, condition_name = create_condition( - client, detector_type, pdu_name, pdu_uuid, cond_params) + injector, detector_type, pdu_name, pdu_uuid, cond_params) # Create Calibration Constant in database, if not available. cc_id = get_or_create_calibration_constant( - client, calibration, detector_type, condition_id, condition_name) + injector, calibration, detector_type, condition_id, condition_name) # Create report in database, if not available report_id = None if report_to: report_path = Path(report_to).absolute().with_suffix('.pdf') - resp = client.get_or_create_report(dict( + resp = injector.get_or_create_report(dict( name=report_path.stem, file_path=str(report_path), flg_available='true', @@ -518,7 +418,7 @@ def inject_ccv(const_src, ccv_root, report_to=None, client=None): flg_good_quality = 'true', description = '', ) - client.create_calibration_constant_version(ccv) + injector.create_calibration_constant_version(ccv) except Exception: const_dest.unlink() # Delete already copied CCV file. raise diff --git a/src/cal_tools/restful_config.py b/src/cal_tools/restful_config.py index 577d13380..0ee008e61 100644 --- a/src/cal_tools/restful_config.py +++ b/src/cal_tools/restful_config.py @@ -47,7 +47,7 @@ def calibration_client(): def extra_calibration_client(inject=False): """Obtain an initialized CalCatAPIClient object.""" - from cal_tools import calcat_interface2, constants + from cal_tools import calcat_interface2 calcat_config = restful_config.get('calcat') user_id = user_secret = None @@ -57,23 +57,19 @@ def extra_calibration_client(inject=False): base_api_url = calcat_config['base-api-url'].rstrip('/') assert base_api_url.endswith('/api') base_url = base_api_url[:-4] - if inject: - constants.setup_client( - base_url, - client_id=user_id, - client_secret=user_secret, - user_email=calcat_config['user-email'], + if inject and '//exflcalproxy' in base_url: + raise ValueError( + "cal_tools would use exflcalproxy to talk to CalCat, but this " + "provides read-only access, and we want to inject constants. " + "You need to configure a connection to CalCat using Oauth." ) - if calcat_config['caldb-root']: - calcat_interface2.set_default_caldb_root(Path(calcat_config['caldb-root'])) - return constants.get_client() - else: - calcat_interface2.setup_client( - base_url, - client_id=user_id, - client_secret=user_secret, - user_email=calcat_config['user-email'], - ) - if calcat_config['caldb-root']: - calcat_interface2.set_default_caldb_root(Path(calcat_config['caldb-root'])) - return calcat_interface2.get_client() + + calcat_interface2.setup_client( + base_url, + client_id=user_id, + client_secret=user_secret, + user_email=calcat_config['user-email'], + ) + if calcat_config['caldb-root']: + calcat_interface2.set_default_caldb_root(Path(calcat_config['caldb-root'])) + return calcat_interface2.get_client() -- GitLab