From 8b35dd706534d1e7215d5890f30380f95ed1379e Mon Sep 17 00:00:00 2001 From: Thomas Kluyver <thomas@kluyver.me.uk> Date: Tue, 1 Aug 2023 16:25:59 +0100 Subject: [PATCH] Working on revise CalCat API --- src/cal_tools/calcat_interface.py | 49 ++++- src/cal_tools/calcat_interface2.py | 320 +++++++++++++++++++++++++++++ tests/test_calcat_interface2.py | 39 ++++ 3 files changed, 400 insertions(+), 8 deletions(-) create mode 100644 src/cal_tools/calcat_interface2.py create mode 100644 tests/test_calcat_interface2.py diff --git a/src/cal_tools/calcat_interface.py b/src/cal_tools/calcat_interface.py index 03f3d1437..2d17b22f9 100644 --- a/src/cal_tools/calcat_interface.py +++ b/src/cal_tools/calcat_interface.py @@ -2,6 +2,7 @@ from datetime import date, datetime, time, timezone from functools import lru_cache from pathlib import Path +from typing import Optional, Sequence from weakref import WeakKeyDictionary import h5py @@ -113,8 +114,7 @@ class CalCatApi(metaclass=ClientWrapper): """Encode operating condition to CalCat API format. Args: - caldata (CalibrationData): Calibration data instance used to - interface with database. + condition (dict): Mapping of parameter DB name to value Returns: (dict) Operating condition for use in CalCat API. @@ -192,6 +192,19 @@ class CalCatApi(metaclass=ClientWrapper): return resp_calibration["data"]["id"] + @lru_cache() + def calibration_name(self, calibration_id): + """Name for a calibration in CalCat.""" + + resp_calibration = Calibration.get_by_id( + self.client, calibration_id + ) + + if not resp_calibration["success"]: + raise CalCatError(resp_calibration) + + return resp_calibration["data"]["name"] + @lru_cache() def parameter_id(self, param_name): """ID for an operating condition parameter in CalCat.""" @@ -203,6 +216,30 @@ class CalCatApi(metaclass=ClientWrapper): return resp_parameter["data"]["id"] + def _closest_ccv_by_time_by_condition( + self, + detector_name: str, + calibration_ids: Sequence[int], + condition: dict, + karabo_da: Optional[str]=None, + event_at=None, + pdu_snapshot_at=None, + ): + resp = CalibrationConstantVersion.get_closest_by_time_by_detector_conditions( + self.client, + detector_name, + calibration_ids, + self.format_cond(condition), + karabo_da=karabo_da or "", + event_at=self.format_time(event_at), + pdu_snapshot_at=self.format_time(pdu_snapshot_at), + ) + + if not resp["success"]: + raise CalCatError(resp) + + return resp["data"] + def closest_ccv_by_time_by_condition( self, detector_name, @@ -284,8 +321,7 @@ class CalCatApi(metaclass=ClientWrapper): # afterwards, if necessary. karabo_da = next(iter(da_to_modname)) if len(da_to_modname) == 1 else '', - resp_versions = CalibrationConstantVersion.get_closest_by_time_by_detector_conditions( # noqa - self.client, + resp_data = self._closest_ccv_by_time_by_condition( detector_name, calibration_ids, self.format_cond(condition), @@ -294,10 +330,7 @@ class CalCatApi(metaclass=ClientWrapper): pdu_snapshot_at=pdu_snapshot_at, ) - if not resp_versions["success"]: - raise CalCatError(resp_versions) - - for ccv in resp_versions["data"]: + for ccv in resp_data: try: mod = da_to_modname[ccv['physical_detector_unit']['karabo_da']] except KeyError: diff --git a/src/cal_tools/calcat_interface2.py b/src/cal_tools/calcat_interface2.py new file mode 100644 index 000000000..23d3b21f1 --- /dev/null +++ b/src/cal_tools/calcat_interface2.py @@ -0,0 +1,320 @@ +from collections.abc import Mapping +from dataclasses import dataclass +from datetime import datetime +from pathlib import Path +from typing import Dict, Optional, Sequence, Union + +from calibration_client import CalibrationClient +from calibration_client.modules import CalibrationConstantVersion + +from .calcat_interface import CalCatApi, CalCatError +from .tools import module_index_to_qm + +global_client = None + + +def get_client(): + global global_client + if global_client is None: + setup_client("http://exflcalproxy:8080/", None, None, None) + return global_client + + +def setup_client(base_url, client_id, client_secret, user_email, **kwargs): + global global_client + global_client = CalibrationClient( + use_oauth2=(client_id is None), + client_id=client_id, + client_secret=client_secret, + user_email=user_email, + base_api_url=f"{base_url}/api/", + token_url=f"{base_url}/oauth/token", + refresh_url=f"{base_url}/oauth/token", + auth_url=f"{base_url}/oauth/authorize", + scope="", + **kwargs, + ) + + +@dataclass +class SingleConstantVersion: + """A Calibration Constant Version for 1 detector module""" + + id: int + version_name: str + constant_id: int + constant_name: str + condition_id: int + path: Path + dataset: str + begin_validity_at: datetime + end_validity_at: datetime + raw_data_location: str + physical_name: str # PDU name + + @classmethod + def from_response(cls, ccv: dict) -> "SingleConstantVersion": + const = ccv["calibration_constant"] + return cls( + id=ccv["id"], + version_name=ccv["name"], + constant_id=const["id"], + constant_name=const["name"], + condition_id=const["condition_id"], + path=Path(ccv["path_to_file"]) / ccv["file_name"], + dataset=ccv["data_set_name"], + begin_validity_at=ccv["begin_validity_at"], + end_validity_at=ccv["end_validity_at"], + raw_data_location=ccv["raw_data_location"], + physical_name=ccv["physical_detector_unit"]["physical_name"], + ) + + +@dataclass +class ModulesConstantVersions: + """A group of similar CCVs for several modules of one detector""" + + constants: Dict[str, SingleConstantVersion] # Keys e.g. 'LPD00' + + def select_modules(self, *aggregators) -> "ModulesConstantVersions": + d = {aggr: scv for (aggr, scv) in self.constants if aggr in aggregators} + return ModulesConstantVersions(d) + + @property + def aggregators(self): + return list(self.constants) + + @property + def module_nums(self): + return [int(da[-2:]) for da in self.constants] + + @property + def qm_names(self): + return [module_index_to_qm(n) for n in self.module_nums] + + +class CalibrationData(Mapping): + """Collected constants for a given detector""" + + def __init__(self, constant_groups: Dict[str, Dict[str, SingleConstantVersion]]): + self.constant_groups = { + const_type: ModulesConstantVersions(d) for const_type, d in constant_groups + } + + @classmethod + def from_condition( + cls, + condition: "ConditionsBase", + detector_name, + modules: Optional[Sequence[str]]=None, + calibrations=None, + client=None, + event_at=None, + pdu_snapshot_at=None, + ): + if calibrations is None: + calibrations = set(condition.calibration_types) + + cal_types_by_params_used = {} + for cal_type, params in condition.calibration_types.items(): + if cal_type in calibrations: + cal_types_by_params_used.setdefault(params, []).append(cal_type) + + api = CalCatApi(client or get_client()) + + d = {} + + for params, cal_types in cal_types_by_params_used.items(): + condition_dict = condition.make_dict(params) + + cal_id_map = { + api.calibration_id(calibration): calibration + for calibration in calibrations + } + calibration_ids = list(cal_id_map.keys()) + + query_res = api._closest_ccv_by_time_by_condition( + detector_name, + calibration_ids, + condition_dict, + modules[0] if len(modules) == 1 else '', + event_at, + pdu_snapshot_at or event_at, + ) + + for ccv in query_res: + aggr = ccv["physical_detector_unit"]["karabo_da"] + cal_type = cal_id_map[ccv["calibration_constant"]["calibration_id"]] + + d.setdefault(cal_type, {})[aggr] = SingleConstantVersion.from_response( + ccv + ) + + res = cls(d) + if modules: + res = res.select_modules(modules) + return res + + @classmethod + def from_report( + cls, + report_id_or_path: Union[int, str], + client=None, + ): + client = client or get_client() + api = CalCatApi(client) + + if isinstance(report_id_or_path, int): + resp = CalibrationConstantVersion.get_by_report_id( + client, report_id_or_path + ) + else: + resp = CalibrationConstantVersion.get_by_report_path( + client, report_id_or_path + ) + + if not resp["success"]: + raise CalCatError(resp) + + d = {} + + for ccv in resp["data"]: + aggr = ccv["physical_detector_unit"]["karabo_da"] + cal_type = api.calibration_name( + ccv["calibration_constant"]["calibration_id"] + ) + d.setdefault(cal_type, {})[aggr] = SingleConstantVersion.from_response(ccv) + + return cls(d) + + def __getitem__(self, key): + return self.constant_groups + + def __iter__(self): + return iter(self.constant_groups) + + def __len__(self): + return len(self.constant_groups) + + def select_calibrations(self, *calibrations, require_all=True): + if require_all: + missing = set(calibrations) - set(self.constant_groups) + if missing: + raise KeyError(f"Missing calibrations: {', '.join(sorted(missing))}") + + d = { + cal_type: mcv.constants + for (cal_type, mcv) in self.constant_groups.items() + if cal_type in calibrations + } + # TODO: missing for some modules? + return type(self)(d) + + def select_modules(self, *aggregators): + return type(self)( + { + cal_type: mcv.select_modules(*aggregators).constants + for (cal_type, mcv) in self.constant_groups.items() + } + ) + + def merge(self, other: "CalibrationData") -> "CalibrationData": + d = self.constant_groups.copy() + d.update(other.constant_groups) + return type(self)(d) + + +class ConditionsBase: + calibration_types = {} # For subclasses: {calibration: [parameter names]} + + def make_dict(self, parameters) -> dict: + d = dict() + + for db_name in parameters: + value = getattr(self, db_name.lower().replace(" ", "_")) + if value is not None: + d[db_name] = value + + return d + + +@dataclass +class AGIPDConditions(ConditionsBase): + sensor_bias_voltage: float + memory_cells: int + acquisition_rate: float + gain_setting: Optional[int] + gain_mode: Optional[int] + source_energy: float + integration_time: int = 12 + pixels_x: int = 512 + pixels_y: int = 128 + + _gain_parameters = [ + "Sensor Bias Voltage", + "Pixels X", + "Pixels Y", + "Memory cells", + "Acquisition rate", + "Gain setting", + "Integration time", + ] + _other_dark_parameters = _gain_parameters + ["Gain mode"] + _illuminated_parameters = _gain_parameters + ["Source energy"] + + calibration_types = { + "Offset": _other_dark_parameters, + "Noise": _other_dark_parameters, + "ThresholdsDark": _other_dark_parameters, + "BadPixelsDark": _other_dark_parameters, + "BadPixelsPC": _gain_parameters, + "SlopesPC": _gain_parameters, + "BadPixelsFF": _illuminated_parameters, + "SlopesFF": _illuminated_parameters, + } + + def make_dict(self, parameters): + cond = super().make_dict(parameters) + + # Fix-up some database quirks. + if int(cond.get("Gain mode", -1)) == 0: + del cond["Gain mode"] + + if int(cond.get("Integration time", -1)) == 12: + del cond["Integration time"] + + return cond + + +@dataclass +class LPDConditions(ConditionsBase): + sensor_bias_voltage: float + memory_cells: int + memory_cell_order: Optional[str] = None + feedback_capacitor: float = 5.0 + source_energy: float = 9.2 + category: int = 1 + pixels_x: int = 256 + pixels_y: int = 256 + + _base_params = [ + "Sensor Bias Voltage", + "Memory cells", + "Pixels X", + "Pixels Y", + "Feedback capacitor", + ] + _dark_parameters = _base_params + [ + "Memory cell order", + ] + _illuminated_parameters = _base_params + ["Source Energy", "category"] + + calibration_types = { + "Offset": _dark_parameters, + "Noise": _dark_parameters, + "BadPixelsDark": _dark_parameters, + "RelativeGain": _illuminated_parameters, + "GainAmpMap": _illuminated_parameters, + "FFMap": _illuminated_parameters, + "BadPixelsFF": _illuminated_parameters, + } diff --git a/tests/test_calcat_interface2.py b/tests/test_calcat_interface2.py new file mode 100644 index 000000000..f3507d0e7 --- /dev/null +++ b/tests/test_calcat_interface2.py @@ -0,0 +1,39 @@ +import pytest + +from cal_tools.calcat_interface2 import ( + CalibrationData, AGIPDConditions, SingleConstantVersion, +) + +@pytest.mark.requires_gpfs +def test_AGIPD_CalibrationData_metadata(): + """Test CalibrationData with AGIPD condition + """ + cond = AGIPDConditions( + sensor_bias_voltage=300, + memory_cells=352, + acquisition_rate=1.1, + gain_mode=0, + gain_setting=0, + integration_time=12, + source_energy=9.2, + ) + agipd_cd = CalibrationData.from_condition( + cond, + "SPB_DET_AGIPD1M-1", + event_at="2020-01-07 13:26:48.00", + modules=['AGIPD00'], + ) + assert 'Offset' in agipd_cd + assert set(agipd_cd['Offset'].constants) == {'AGIPD00'} + assert isinstance(agipd_cd['Offset'].constants['AGIPD00'], SingleConstantVersion) + + +@pytest.mark.requires_gpfs +def test_AGIPD_CalibrationData_report(): + """Test CalibrationData with AGIPD condition + """ + # Report ID: https://in.xfel.eu/calibration/reports/3757 + agipd_cd = CalibrationData.from_report(3757) + assert set(agipd_cd) == {'Offset', 'Noise', 'ThresholdsDark', 'BadPixelsDark'} + assert agipd_cd['Offset'].aggregators == [f'AGIPD{n:02}' for n in range(16)] + assert isinstance(agipd_cd['Offset'].constants['AGIPD00'], SingleConstantVersion) -- GitLab