diff --git a/docs/source/calcat_interface.rst b/docs/source/calcat_interface.rst new file mode 100644 index 0000000000000000000000000000000000000000..60b9f1429e024d86149e8dd68e01f72f44be7d3f --- /dev/null +++ b/docs/source/calcat_interface.rst @@ -0,0 +1,29 @@ +CALCAT Interface +================ + +.. module:: cal_tools.calcat_interface + +.. class:: CalCatError + +.. class:: CalibrationData + + .. attribute:: metadata + + .. attribute:: ndarray + + .. attribute:: ndarray_map + +.. class:: SplitConditionCalibrationData + +.. class:: LPD_CalibrationData + +.. class:: DSSC_CalibrationData + +.. class:: JUNGFRAU_CalibrationData + +.. class:: PNCCD_CalibrationData + +.. class:: EPIX100_CalibrationData + +.. class:: GOTTHARD2_CalibrationData + diff --git a/docs/source/index.rst b/docs/source/index.rst index b57a68dee27860df4384878f2116ba92a7c4740f..d45ed2e41742d1ce243e635fa53508107edb789e 100644 --- a/docs/source/index.rst +++ b/docs/source/index.rst @@ -57,6 +57,7 @@ Documentation contents: xfel_calibrate_conf cal_tools_algorithms + calcat_interface .. toctree:: :caption: Development diff --git a/src/cal_tools/calcat_interface.py b/src/cal_tools/calcat_interface.py new file mode 100644 index 0000000000000000000000000000000000000000..a7c8cf158007f8a1e097ee928276dfd92bba22e4 --- /dev/null +++ b/src/cal_tools/calcat_interface.py @@ -0,0 +1,1300 @@ +"""Interfaces to calibration constant data.""" +import re +import socket +from datetime import date, datetime, time, timezone +from functools import lru_cache +from os import getenv +from pathlib import Path +from weakref import WeakKeyDictionary + +import h5py +import pasha as psh +from calibration_client import CalibrationClient +from calibration_client.modules import ( + Calibration, + CalibrationConstantVersion, + Detector, + Parameter, + PhysicalDetectorUnit, +) + +from cal_tools.tools import module_index_to_qm + +__all__ = [ + "CalCatError", + "CalibrationData", + "AGIPD_CalibrationData", + "LPD_CalibrationData", + "DSSC_CalibrationData", + "JUNGFRAU_CalibrationData", + "PNCCD_CalibrationData", + "EPIX100_CalibrationData", + "GOTTHARD2_CalibrationData", +] + + +class CCVMetadata(dict): + """Dictionary for CCV metadata. + + Identical to a regular dict, but with a custom pandas-based + string representation to be easier to read. + """ + + def __str__(self): + """Pretty-print CCV metadata using pandas.""" + + import pandas as pd + + res = { + pdu_idx: { + calibration: ccv_data["ccv_name"] + for calibration, ccv_data in pdu_data.items() + } + for pdu_idx, pdu_data in self.items() + } + + return str(pd.DataFrame.from_dict(res, orient="index")) + + +class CalCatError(Exception): + """CalCat API error.""" + + def __init__(self, response): + super().__init__(response["info"]) + + +class ClientWrapper(type): + """Metaclass to wrap each calibration_client exactly once.""" + + _clients = WeakKeyDictionary() + + def __call__(cls, client): + instance = cls._clients.get(client, None) + + if instance is None: + instance = cls._clients[client] = type.__call__(cls, client) + + return instance + + +class CalCatApi(metaclass=ClientWrapper): + """Internal calibration_client wrapper.""" + + get_detector_keys = [ + "id", + "name", + "identifier", + "karabo_name", + "karabo_id_control", + "description", + ] + get_pdu_keys = [ + "id", + "physical_name", + "karabo_da", + "virtual_device_name", + "detector_type_id", + "detector_id", + "description", + ] + + def __init__(self, client): + self.client = client + + @classmethod + def format_time(cls, dt): + """Parse different ways to specify time to CalCat.""" + + if isinstance(dt, datetime): + return dt.astimezone(timezone.utc).isoformat() + elif isinstance(dt, date): + return cls.format_time(datetime.combine(dt, time())) + + return dt + + def format_cond(self, condition): + """Encode operating condition to CalCat API format. + + Args: + caldata (CalibrationData): Calibration data instance used to + interface with database. + + Returns: + (dict) Operating condition for use in CalCat API. + """ + + return { + "parameters_conditions_attributes": [ + {"parameter_id": self.parameter_id(k), "value": str(v)} + for k, v in condition.items() + ] + } + + @lru_cache() + def detector(self, detector_name): + """Detector metadata.""" + + resp_detector = Detector.get_by_identifier(self.client, detector_name) + + if not resp_detector["success"]: + raise CalCatError(resp_detector) + + return {k: resp_detector["data"][k] for k in self.get_detector_keys} + + @lru_cache() + def physical_detector_units( + self, + detector_id, + pdu_snapshot_at, + module_naming="da" + ): + """Physical detector unit metadata.""" + + resp_pdus = PhysicalDetectorUnit.get_all_by_detector( + self.client, detector_id, self.format_time(pdu_snapshot_at) + ) + + if not resp_pdus["success"]: + raise CalCatError(resp_pdus) + + # Create dict based on requested keys: karabo_da, module number, + # or QxMx naming convention. + if module_naming == "da": + return { + pdu["karabo_da"]: {k: pdu[k] for k in self.get_pdu_keys} + for pdu in resp_pdus["data"] + } + elif module_naming == "modno": + return { + int(pdu["karabo_da"][-2:]): { + k: pdu[k] for k in self.get_pdu_keys + } + for pdu in resp_pdus["data"] + } + elif module_naming == "qm": + return { + module_index_to_qm(int(pdu["karabo_da"][-2:])): { + k: pdu[k] for k in self.get_pdu_keys + } + for pdu in resp_pdus["data"] + } + else: + raise ValueError(f"{module_naming} is unknown!") + + + @lru_cache() + def calibration_id(self, calibration_name): + """ID for a calibration in CalCat.""" + + resp_calibration = Calibration.get_by_name( + self.client, calibration_name + ) + + if not resp_calibration["success"]: + raise CalCatError(resp_calibration) + + return resp_calibration["data"]["id"] + + @lru_cache() + def parameter_id(self, param_name): + """ID for an operating condition parameter in CalCat.""" + + resp_parameter = Parameter.get_by_name(self.client, param_name) + + if not resp_parameter["success"]: + raise CalCatError(resp_parameter) + + return resp_parameter["data"]["id"] + + def closest_ccv_by_time_by_condition( + self, + detector_name, + calibrations, + condition, + modules=None, + event_at=None, + pdu_snapshot_at=None, + metadata=None, + module_naming="da", + ): + """Query bulk CCV metadata from CalCat. + + This method uses the /get_closest_version_by_detector API + to query matching CCVs for PDUs connected to a detector instance + in one go. In particular, it automatically includes the PDU as + an operating condition parameter to allow for a single global + condition rather than PDU-specific ones. + + Args: + detector_name (str): Detector instance name. + calibrations (Iterable of str): Calibrations to query + metadata for. + condition (dict): Mapping of parameter name to value. + modules (Collection of int or None): List of module numbers + or None for all (default). + event_at (datetime, date, str or None): Time at which the + CCVs should have been valid or None for now (default). + pdu_snapshot_at (datetime, date, str or None): Time of database + state to look at or None for now (default). + metadata (dict or None): Mapping to fill for results or + None for a new dictionary (default). + module_naming (str or None): Expected module name convention to be + used as metadata dict keys. Expected values are: + `da`: data aggregator name is used. Default. + `modno`: module index is used. Index is chosen based on last 2 + integers in karabo_da. + `qm`: QxMx naming convention is used. Virtual names for + AGIPD, DSSC, and LPD. + + Returns: + (dict) Nested mapping of module number to calibrations to + CCV metadata. Identical to passed metadata argument if + passed. + """ + event_at = self.format_time(event_at) + pdu_snapshot_at = self.format_time(pdu_snapshot_at) + + if metadata is None: + metadata = CCVMetadata() + + if not calibrations: + # Make sure there are at least empty dictionaries for each + # module. + for mod in modules.values(): + metadata.setdefault(mod, dict()) + return metadata + + # Map calibration ID to calibration name. + cal_id_map = { + self.calibration_id(calibration): calibration + for calibration in calibrations + } + calibration_ids = list(cal_id_map.keys()) + + # Map aggregator to the selected module name. + da_to_modname = { + data['karabo_da']: mod_name for mod_name, data in + self.physical_detector_units( + self.detector(detector_name)['id'], + pdu_snapshot_at, + module_naming=module_naming + ).items() + if not modules or mod_name in modules + } + # The API call supports a single module or all modules, as the + # performance increase is only minor in between. Hence, all + # modules are queried if more than one is selected and filtered + # afterwards, if necessary. + karabo_da = next(iter(da_to_modname)) if len(da_to_modname) == 1 else '', + + resp_versions = CalibrationConstantVersion.get_closest_by_time_by_detector_conditions( # noqa + self.client, + detector_name, + calibration_ids, + self.format_cond(condition), + karabo_da=karabo_da, + event_at=event_at, + pdu_snapshot_at=pdu_snapshot_at, + ) + + if not resp_versions["success"]: + raise CalCatError(resp_versions) + + for ccv in resp_versions["data"]: + try: + mod = da_to_modname[ccv['physical_detector_unit']['karabo_da']] + except KeyError: + # Not included in our modules + continue + + cc = ccv["calibration_constant"] + metadata.setdefault(mod, dict())[ + cal_id_map[cc["calibration_id"]] + ] = dict( + cc_id=cc["id"], + cc_name=cc["name"], + condition_id=cc["condition_id"], + ccv_id=ccv["id"], + ccv_name=ccv["name"], + path=Path(ccv["path_to_file"]) / ccv["file_name"], + dataset=ccv["data_set_name"], + begin_validity_at=ccv["begin_validity_at"], + end_validity_at=ccv["end_validity_at"], + raw_data_location=ccv["raw_data_location"], + start_idx=ccv["start_idx"], + end_idx=ccv["end_idx"], + physical_name=ccv["physical_detector_unit"]["physical_name"], + ) + return metadata + + +class CalibrationData: + """Calibration constants data for detectors. + + European XFEL uses a web app and database to store records about the + characterization of detectors and the data necessary to their + correction and analysis, collectively called CalCat. The default + installation is available at https://in.xfel.eu/calibration. + + A detector is identified by a name (e.g. SPB_DET_AGIPD1M-1) and + consists of one or more detector modules. The modules are a virtual + concept and may be identified by their number (e.g. 3), the Karabo + data aggregator in EuXFEL's DAQ system they're connected to + (e.g. AGIPD05) or a virtual device name describing their relative + location (e.g. Q3M2). + + A detector module is mapped to an actual physical detector unit + (PDU), which may be changed in case of a physical replacement. When + characterization data is inserted into the database, it is attached + to the PDU currently mapped to a module and not the virtual module + itself. + + Characterization data is organized by its type just called + calibration (e.g. Offset or SlopesFF) and the operating condition it + was taken in, which is a mapping of parameter keys to their values + (e.g. Sensor bias voltage or integration time). Any unique + combination of calibration (type) and operating condition is a + calibration constant (CC). Any individual measurement of a CC is + called a calibration constant version (CCV). There may be many CCVs + for any given CC. + + Note that while an authenticated connection to CalCat is possible + from anywhere, the actual calibration data referred to is only + available on the European XFEL computing infrastructure. If no + explicit credentials are supplied, an anonymous read-only connection + is established that is also only available from there. + """ + + calibrations = set() + default_client = None + + def __init__( + self, + detector_name, + modules=None, + client=None, + event_at=None, + module_naming="da", + ): + """Initialize a new CalibrationData object. + + If no calibration-client object is passed or has been created + using Calibration.new_client, an anonymous read-only connection + is established automatically. + + Args: + detector_name (str): Name of detector in CalCat. + modules (Iterable of int, optional): Module numbers to + query for or None for all available (default). + client (CalibrationClient, optional): Client for CalCat + communication, global one by default. + event_at (datetime, date, str or None): Default time at which the + CCVs should have been valid, now if omitted + module_naming (str or None): Expected module name convention to be + used as metadata dict keys. Expected values are: + `da`: data aggregator name is used. Default. + `modno`: module index is used. Index is chosen based on last 2 + integers in karabo_da. + `qm`: QxMx naming convention is used. Virtual names for + AGIPD, DSSC, and LPD. + **condition_params: Operating condition parameters defined + on an instance level. + """ + + self.detector_name = detector_name + self.modules = modules + self.event_at = event_at + self.pdu_snapshot_at = event_at + self.module_naming = module_naming + + if client is None: + + client = ( + self.__class__.default_client + or self.__class__.new_anonymous_client() + ) + + self._api = CalCatApi(client) + + @staticmethod + def new_anonymous_client(): + """Create an anonymous calibration-client object. + + This connection allows read-only access to CalCat using a + facility-provided OAuth reverse proxy. This is only accessible + on the European XFEL computing infrastructure. + """ + + print( + "Access to CalCat via the XFEL OAuth proxy is currently " + "considered in testing, please report any issues to " + "da-support@xfel.eu" + ) + return CalibrationData.new_client( + None, + None, + None, + use_oauth2=False, + base_url="http://exflcalproxy:8080/", + ) + + @staticmethod + def new_client( + client_id, + client_secret, + user_email, + installation="", + base_url="https://in.xfel.eu/{}calibration", + **kwargs, + ): + """Create a new calibration-client object. + + The client object is saved as a class property and is + automatically to any future CalibrationData objects created, if + no other client is passed explicitly. + + Arguments: + client_id (str): Client ID. + client_secret (str): Client secret. + user_email (str): LDAP user email. + installation (str, optional): Prefix for CalCat + installation, production system by default. + base_url (str, optional): URL template for CalCat + installation, public European XFEL by default. + Any further keyword arguments are passed on to + CalibrationClient.__init__(). + + Returns: + (CalibrationClient) CalCat client. + """ + + base_url = base_url.format(f"{installation}_" if installation else "") + + # Note this is not a classmethod and we're modifying + # CalibrationData directly to use the same object across all + # detector-specific implementations. + CalibrationData.default_client = CalibrationClient( + client_id=client_id, + client_secret=client_secret, + user_email=user_email, + base_api_url=f"{base_url}/api/", + token_url=f"{base_url}/oauth/token", + refresh_url=f"{base_url}/oauth/token", + auth_url=f"{base_url}/oauth/authorize", + scope="", + **kwargs, + ) + return CalibrationData.default_client + + @property + def caldb_root(self): + """Root directory for calibration constant data. + + Returns: + (Path or None) Location of caldb store or + None if not available. + """ + + if not hasattr(CalibrationData, "_caldb_root"): + if getenv("SASE"): + # ONC + CalibrationData._caldb_root = Path("/common/cal/caldb_store") + elif re.match(r"^max-(.+)\.desy\.de$", socket.getfqdn()): + # Maxwell + CalibrationData._caldb_root = Path( + "/gpfs/exfel/d/cal/caldb_store" + ) + else: + # Probably unavailable + CalibrationData._caldb_root = None + + return CalibrationData._caldb_root + + @property + def client(self): + return self._api.client + + @property + def detector(self): + return self._api.detector(self.detector_name) + + @property + def physical_detector_units(self): + return self._api.physical_detector_units( + self.detector["id"], self.pdu_snapshot_at, self.module_naming + ) + + @property + def condition(self): + return self._build_condition(self.parameters) + + def replace(self, **new_kwargs): + """Create a new CalibrationData object with altered values.""" + + keys = { + "detector_name", + "modules", + "client", + "event_at", + "pdu_snapshot_at", + } | {self._simplify_parameter_name(name) for name in self.parameters} + + kwargs = {key: getattr(self, key) for key in keys} + kwargs.update(new_kwargs) + + return self.__class__(**kwargs) + + def metadata( + self, + calibrations=None, + event_at=None, + pdu_snapshot_at=None, + ): + """Query CCV metadata for calibrations, conditions and time. + + Args: + calibrations (Iterable of str, optional): Calibrations to + query metadata for, may be None to retrieve all. + event_at (datetime, date, str or None): Time at which the + CCVs should have been valid, now or default value passed at + initialization time if omitted. + pdu_snapshot_at (datetime, date, str or None): Time of database + state to look at, now or default value passed at + initialization time if omitted. + + Returns: + (CCVMetadata) CCV metadata result. + """ + + metadata = CCVMetadata() + self._api.closest_ccv_by_time_by_condition( + self.detector_name, + calibrations or self.calibrations, + self.condition, + self.modules, + event_at or self.event_at, + pdu_snapshot_at or self.pdu_snapshot_at, + metadata, + module_naming=self.module_naming, + ) + return metadata + + def ndarray( + self, + module, + calibration, + metadata=None, + ): + """Load CCV data as ndarray. + + Args: + module (int): Module number + calibration (str): Calibration constant. + metadata (CCVMetadata, optional): CCV metadata to load + constant data for, may be None to query metadata. + + Returns: + (ndarray): CCV data + """ + import numpy as np + + if self.caldb_root is None: + raise RuntimeError("calibration database store unavailable") + + if self.modules and module not in self.modules: + raise ValueError("module not part of this calibration data") + + if metadata is None: + metadata = self.metadata([calibration]) + + row = metadata[module][calibration] + + with h5py.File(self.caldb_root / row['path'], 'r') as f: + return np.asarray(f[row['dataset'] + '/data']) + + def _allocate_constant_arrays(self, metadata, const_load_mp, const_data): + + for mod, ccv_entry in metadata.items(): + const_data[mod] = {} + for cname, mdata in ccv_entry.items(): + dataset = mdata["dataset"] + with h5py.File(self.caldb_root / mdata["path"], "r") as cf: + shape = cf[f"{dataset}/data"].shape + dtype = cf[f"{dataset}/data"].dtype + + const_data[mod][cname] = const_load_mp.alloc( + shape=shape, dtype=dtype + ) + + def load_constants_from_metadata(self, metadata): + """Load the data for all constants in metadata object. + + Args: + metadata (CCVMetadata, optional): CCV metadata to load + constant data for, may be None to query metadata. + Returns: + (Dict): A dictionary of constant data. + {module: {calibration: ndarray}}. + """ + def _load_constant_dataset(wid, index, mod): + """Load constant dataset from the CCVMetadata `metadata` into + a shared allocated array. + + Args: + mod (str): module key in `metadata` object + """ + for cname, mdata in metadata[mod].items(): + with h5py.File(self.caldb_root / mdata["path"], "r") as cf: + cf[f"{mdata['dataset']}/data"].read_direct( + const_data[mod][cname] + ) + + const_data = dict() + const_load_mp = psh.ProcessContext(num_workers=24) + self._allocate_constant_arrays(metadata, const_load_mp, const_data) + const_load_mp.map(_load_constant_dataset, list(metadata.keys())) + + return const_data + + def ndarray_map( + self, + calibrations=None, + metadata=None, + ): + """Load all CCV data in a nested map of ndarrays. + + Args: + calibrations (Iterable of str, optional): Calibration constants + or None for all available (default). + metadata (CCVMetadata, optional): CCV metadata to load constant + for or None to query metadata automatically (default). + Returns: + (dict of dict of ndarray): CCV data by module number and + calibration constant name. + {module: {calibration: ndarray}} + """ + if self.caldb_root is None: + raise RuntimeError("calibration database store unavailable") + + if metadata is None: + metadata = self.metadata(calibrations) + + return self.load_constants_from_metadata(metadata) + + def _build_condition(self, parameters): + cond = dict() + + for db_name in parameters: + value = getattr(self, self._simplify_parameter_name(db_name), None) + + if value is not None: + cond[db_name] = value + + return cond + + @classmethod + def _from_multimod_detector_data( + cls, + component_cls, + data, + detector, + modules, + client, + ): + if isinstance(detector, component_cls): + detector_name = detector.detector_name + elif detector is None: + detector_name = component_cls._find_detector_name(data) + elif isinstance(detector, str): + detector_name = detector + else: + raise ValueError( + f"detector may be an object of type " + f"{type(cls)}, a string or None" + ) + + source_to_modno = dict( + component_cls._source_matches(data, detector_name) + ) + detector_sources = [data[source] for source in source_to_modno.keys()] + + if modules is None: + modules = sorted(source_to_modno.values()) + + creation_date = cls._determine_data_creation_date(data) + + # Create new CalibrationData object. + caldata = cls( + detector_name, + modules, + client, + creation_date, + creation_date, + ) + + caldata.memory_cells = component_cls._get_memory_cell_count( + detector_sources[0] + ) + caldata.pixels_x = component_cls.module_shape[1] + caldata.pixels_y = component_cls.module_shape[0] + + return caldata, detector_sources + + @staticmethod + def _simplify_parameter_name(name): + """Convert parameter names to valid Python symbols.""" + + return name.lower().replace(" ", "_") + + @staticmethod + def _determine_data_creation_date(data): + """Determine data creation date.""" + + assert data.files, "data contains no files" + + try: + creation_date = data.files[0].metadata()["creationDate"] + except KeyError: + from warnings import warning + + warning( + "Last file modification time used as creation date for old " + "DAQ file format may be unreliable" + ) + + return datetime.fromtimestamp( + Path(data.files[0].filename).lstat().st_mtime + ) + else: + if not data.is_single_run: + from warnings import warning + + warning( + "Sample file used to determine creation date for multi " + "run data" + ) + + return creation_date + + +class SplitConditionCalibrationData(CalibrationData): + """Calibration data with dark and illuminated conditions. + + Some detectors of this kind distinguish between two different + operating conditions depending on whether photons illuminate the + detector or not, correspondingly called the illuminated and dark + conditions. Typically the illuminated condition is a superset of the + dark condition. + + Not all implementations for semiconductor detectors inherit from + this type, but only those that make this distinction such as AGIPD + and LPD. + """ + + dark_calibrations = set() + illuminated_calibrations = set() + dark_parameters = list() + illuminated_parameters = list() + + @property + def calibrations(self): + """Compatibility with CalibrationData.""" + + return self.dark_calibrations | self.illuminated_calibrations + + @property + def parameters(self): + """Compatibility with CalibrationData.""" + + # Removes likely duplicates while preserving order. + return list( + dict.fromkeys(self.dark_parameters + self.illuminated_parameters) + ) + + @property + def condition(self): + """Compatibility with CalibrationData.""" + + cond = dict() + cond.update(self.dark_condition) + cond.update(self.illuminated_condition) + + return cond + + @property + def dark_condition(self): + return self._build_condition(self.dark_parameters) + + @property + def illuminated_condition(self): + return self._build_condition(self.illuminated_parameters) + + def metadata( + self, + calibrations=None, + event_at=None, + pdu_snapshot_at=None, + ): + """Query CCV metadata for calibrations, conditions and time. + + Args: + calibrations (Iterable of str, optional): Calibrations to + query metadata for, may be None to retrieve all. + event_at (datetime, date, str or None): Time at which the + CCVs should have been valid, now or default value passed at + initialization time if omitted. + pdu_snapshot_at (datetime, date, str or None): Time of database + state to look at, now or default value passed at + initialization time if omitted. + + Returns: + (CCVMetadata) CCV metadata result. + """ + + if calibrations is None: + calibrations = ( + self.dark_calibrations | self.illuminated_calibrations + ) + + metadata = CCVMetadata() + + # Calibrations are sorted to ensure using exactly the same query + # for multiple configuration. e.g. This is essential for calparrot. + dark_calibrations = sorted( + self.dark_calibrations & set(calibrations)) + if dark_calibrations: + self._api.closest_ccv_by_time_by_condition( + self.detector_name, + dark_calibrations, + self.dark_condition, + self.modules, + event_at or self.event_at, + pdu_snapshot_at or self.pdu_snapshot_at, + metadata, + module_naming=self.module_naming, + ) + + illum_calibrations = sorted( + self.illuminated_calibrations & set(calibrations)) + if illum_calibrations: + self._api.closest_ccv_by_time_by_condition( + self.detector_name, + illum_calibrations, + self.illuminated_condition, + self.modules, + event_at or self.event_at, + pdu_snapshot_at or self.pdu_snapshot_at, + metadata, + module_naming=self.module_naming, + ) + + return metadata + + +class AGIPD_CalibrationData(SplitConditionCalibrationData): + """Calibration data for the AGIPD detector.""" + + dark_calibrations = { + "Offset", + "Noise", + "ThresholdsDark", + "BadPixelsDark", + "BadPixelsPC", + "SlopesPC", + } + illuminated_calibrations = { + "BadPixelsFF", + "SlopesFF", + } + + dark_parameters = [ + "Sensor Bias Voltage", + "Pixels X", + "Pixels Y", + "Memory cells", + "Acquisition rate", + "Gain setting", + "Gain mode", + "Integration time", + ] + illuminated_parameters = dark_parameters + ["Source energy"] + + def __init__( + self, + detector_name, + sensor_bias_voltage, + memory_cells, + acquisition_rate, + modules=None, + client=None, + event_at=None, + gain_setting=None, + gain_mode=None, + module_naming="da", + integration_time=12, + source_energy=9.2, + pixels_x=512, + pixels_y=128, + ): + super().__init__( + detector_name, + modules, + client, + event_at, + module_naming, + ) + + self.sensor_bias_voltage = sensor_bias_voltage + self.memory_cells = memory_cells + self.pixels_x = pixels_x + self.pixels_y = pixels_y + self.acquisition_rate = acquisition_rate + self.gain_setting = gain_setting + self.gain_mode = gain_mode + self.integration_time = integration_time + self.source_energy = source_energy + + def _build_condition(self, parameters): + cond = super()._build_condition(parameters) + + # Fix-up some database quirks. + if int(cond.get("Gain mode", -1)) == 0: + del cond["Gain mode"] + + if int(cond.get("Integration time", -1)) == 12: + del cond["Integration time"] + + return cond + + +class LPD_CalibrationData(SplitConditionCalibrationData): + """Calibration data for the LPD detector.""" + + dark_calibrations = { + "Offset", + "Noise", + "BadPixelsDark", + } + illuminated_calibrations = { + "RelativeGain", + "GainAmpMap", + "FFMap", + "BadPixelsFF", + } + + dark_parameters = [ + "Sensor Bias Voltage", + "Memory cells", + "Pixels X", + "Pixels Y", + "Feedback capacitor", + ] + illuminated_parameters = dark_parameters + ["Source Energy", "category"] + + def __init__( + self, + detector_name, + sensor_bias_voltage, + memory_cells, + feedback_capacitor=5.0, + pixels_x=256, + pixels_y=256, + source_energy=9.2, + category=1, + modules=None, + client=None, + event_at=None, + module_naming="da", + ): + super().__init__( + detector_name, + modules, + client, + event_at, + module_naming, + ) + + self.sensor_bias_voltage = sensor_bias_voltage + self.memory_cells = memory_cells + self.pixels_x = pixels_x + self.pixels_y = pixels_y + self.feedback_capacitor = feedback_capacitor + self.source_energy = source_energy + self.category = category + + +class DSSC_CalibrationData(CalibrationData): + """Calibration data for the DSSC detetor.""" + + calibrations = { + "Offset", + "Noise", + } + parameters = [ + "Sensor Bias Voltage", + "Memory cells", + "Pixels X", + "Pixels Y", + "Pulse id checksum", + "Acquisition rate", + "Target gain", + "Encoded gain", + ] + + def __init__( + self, + detector_name, + sensor_bias_voltage, + memory_cells, + pulse_id_checksum=None, + acquisition_rate=None, + target_gain=None, + encoded_gain=None, + pixels_x=512, + pixels_y=128, + modules=None, + client=None, + event_at=None, + module_naming="da", + ): + super().__init__( + detector_name, + modules, + client, + event_at, + module_naming, + ) + + self.sensor_bias_voltage = sensor_bias_voltage + self.memory_cells = memory_cells + self.pixels_x = pixels_x + self.pixels_y = pixels_y + self.pulse_id_checksum = pulse_id_checksum + self.acquisition_rate = acquisition_rate + self.target_gain = target_gain + self.encoded_gain = encoded_gain + + +class JUNGFRAU_CalibrationData(CalibrationData): + """Calibration data for the JUNGFRAU detector.""" + + calibrations = { + "Offset10Hz", + "Noise10Hz", + "BadPixelsDark10Hz", + "RelativeGain10Hz", + "BadPixelsFF10Hz", + } + parameters = [ + "Sensor Bias Voltage", + "Memory Cells", + "Pixels X", + "Pixels Y", + "Integration Time", + "Sensor temperature", + "Gain Setting", + ] + + def __init__( + self, + detector_name, + sensor_bias_voltage, + memory_cells, + integration_time, + gain_setting, + gain_mode=None, + sensor_temperature=291, + pixels_x=1024, + pixels_y=512, + modules=None, + client=None, + event_at=None, + module_naming="da", + ): + super().__init__( + detector_name, + modules, + client, + event_at, + module_naming, + ) + + self.sensor_bias_voltage = sensor_bias_voltage + self.memory_cells = memory_cells + self.pixels_x = pixels_x + self.pixels_y = pixels_y + self.integration_time = integration_time + self.sensor_temperature = sensor_temperature + self.gain_setting = gain_setting + self.gain_mode = gain_mode + + def _build_condition(self, parameters): + cond = super()._build_condition(parameters) + + # Fix-up some database quirks. + if int(cond.get("Gain mode", -1)) == 0: + del cond["Gain mode"] + + return cond + + +class PNCCD_CalibrationData(SplitConditionCalibrationData): + """Calibration data for the pnCCD detector.""" + + dark_calibrations = { + "OffsetCCD", + "BadPixelsDarkCCD", + "NoiseCCD", + } + illuminated_calibrations = { + "RelativeGainCCD", + "CTECCD", + } + + dark_parameters = [ + "Sensor Bias Voltage", + "Memory cells", + "Pixels X", + "Pixels Y", + "Integration Time", + "Sensor Temperature", + "Gain Setting", + ] + + illuminated_parameters = dark_parameters + ["Source energy"] + + def __init__( + self, + detector_name, + sensor_bias_voltage, + integration_time, + sensor_temperature, + gain_setting, + source_energy=9.2, + pixels_x=1024, + pixels_y=1024, + modules=None, + client=None, + event_at=None, + module_naming="da", + ): + # Ignore modules for this detector. + super().__init__( + detector_name, + modules, + client, + event_at, + module_naming, + ) + + self.sensor_bias_voltage = sensor_bias_voltage + self.memory_cells = 1 # Ignore memory_cells for this detector + self.pixels_x = pixels_x + self.pixels_y = pixels_y + self.integration_time = integration_time + self.sensor_temperature = sensor_temperature + self.gain_setting = gain_setting + self.source_energy = source_energy + +class EPIX100_CalibrationData(SplitConditionCalibrationData): + """Calibration data for the ePix100 detector.""" + + dark_calibrations = { + "OffsetEPix100", + "NoiseEPix100", + "BadPixelsDarkEPix100", + } + illuminated_calibrations = { + "RelativeGainEPix100", + # 'BadPixelsFFEPix100', + } + dark_parameters = [ + "Sensor Bias Voltage", + "Memory cells", + "Pixels X", + "Pixels Y", + "Integration time", + "Sensor temperature", + "In vacuum", + ] + illuminated_parameters = dark_parameters + ["Source energy"] + + def __init__( + self, + detector_name, + sensor_bias_voltage, + integration_time, + in_vacuum=0, + sensor_temperature=288, + pixels_x=708, + pixels_y=768, + source_energy=9.2, + modules=None, + client=None, + event_at=None, + module_naming="da", + ): + # Ignore modules for this detector. + super().__init__( + detector_name, + modules, + client, + event_at, + module_naming, + ) + + self.sensor_bias_voltage = sensor_bias_voltage + self.integration_time = integration_time + self.memory_cells = 1 # Ignore memory_cells for this detector + self.pixels_x = pixels_x + self.pixels_y = pixels_y + self.in_vacuum = in_vacuum + self.sensor_temperature = sensor_temperature + self.source_energy = source_energy + + +class GOTTHARD2_CalibrationData(CalibrationData): + """Calibration data for the Gotthard II detector.""" + + calibrations = { + "LUTGotthard2" "OffsetGotthard2", + "NoiseGotthard2", + "BadPixelsDarkGotthard2", + "RelativeGainGotthard2", + "BadPixelsFFGotthard2", + } + parameters = [ + "Sensor Bias Voltage", + "Exposure time", + "Exposure period", + "Acquisition rate", + "Single photon", + ] + + def __init__( + self, + detector_name, + sensor_bias_voltage, + exposure_time, + exposure_period, + acquisition_rate, + single_photon, + modules=None, + client=None, + event_at=None, + module_naming="da", + ): + # Ignore modules for this detector. + super().__init__( + detector_name, + modules, + client, + event_at, + module_naming, + ) + + self.sensor_bias_voltage = sensor_bias_voltage + self.exposure_time = exposure_time + self.exposure_period = exposure_period + self.acquisition_rate = acquisition_rate + self.single_photon = single_photon diff --git a/src/cal_tools/restful_config.py b/src/cal_tools/restful_config.py index 7a794408066fe6df43693ff1e0a0937bcf803598..618140c0b372cfd344ca1341697a8ec50eba63b3 100644 --- a/src/cal_tools/restful_config.py +++ b/src/cal_tools/restful_config.py @@ -13,3 +13,20 @@ restful_config = Dynaconf( ], merge_enabled=True, ) + + +def calibration_client(): + from calibration_client import CalibrationClient + + # Create client for CalCat. + calcat_config = restful_config.get('calcat') + return CalibrationClient( + base_api_url=calcat_config['base-api-url'], + use_oauth2=calcat_config['use-oauth2'], + client_id=calcat_config['user-id'], + client_secret=calcat_config['user-secret'], + user_email=calcat_config['user-email'], + token_url=calcat_config['token-url'], + refresh_url=calcat_config['refresh-url'], + auth_url=calcat_config['auth-url'], + scope='') diff --git a/tests/test_calcat_interface.py b/tests/test_calcat_interface.py new file mode 100644 index 0000000000000000000000000000000000000000..b838a07fc16a3a032990affa49781c1d05f76c57 --- /dev/null +++ b/tests/test_calcat_interface.py @@ -0,0 +1,150 @@ +""" +TEST CALCAT INTERFACE: + +All the tests run with client = None +""" + +import numpy as np +import pytest + +from cal_tools.calcat_interface import ( + AGIPD_CalibrationData, + DSSC_CalibrationData, + JUNGFRAU_CalibrationData, + LPD_CalibrationData, +) + + +@pytest.mark.requires_gpfs +@pytest.mark.parametrize( + "mod,mod_naming", + [ + ("AGIPD00", "da"), + ("Q1M1", "qm"), + (0, "modno"), + + ] +) +def test_AGIPD_CalibrationData_metadata(mod, mod_naming): + """Test CalibrationData through AGIPD_CalibrationData + and test metadata() method. + + Args: + mod (str): Module name + mod_naming (str): Selected module naming + """ + agipd_md = AGIPD_CalibrationData( + detector_name="SPB_DET_AGIPD1M-1", + sensor_bias_voltage=300, + memory_cells=352, + acquisition_rate=1.1, + integration_time=12, + source_energy=9.2, + gain_mode=0, + gain_setting=0, + event_at="2020-01-07 13:26:48.00", + modules=[mod], + client=None, + module_naming=mod_naming + ).metadata() + assert isinstance(agipd_md, dict) + # Check correct module naming (da, qm, modno) + assert list(agipd_md.keys())[0] == mod + # closest_ccv_by_time_by_condition + assert [ + "begin_validity_at", + "cc_id", "cc_name", + "ccv_id", "ccv_name", + "condition_id", "dataset", + "end_idx", "end_validity_at", + "path", "physical_name", + "raw_data_location", "start_idx", + ] == sorted(list(agipd_md[mod]["Offset"].keys())) + + +@pytest.mark.parametrize( + "mod,mod_naming", + [ + ("DSSC00", "da"), + ("Q1M1", "qm"), + (0, "modno"), + + ] +) +def test_DSSC_physical_detector_units(mod, mod_naming): + """Test physical_detector_units property and DSSC_CalibrationData. + + Args: + mod (str): Module name + mod_naming (str): Selected module naming + """ + dssc_cal = DSSC_CalibrationData( + detector_name="SCS_DET_DSSC1M-1", + sensor_bias_voltage=300, + memory_cells=400, + pulse_id_checksum=None, + acquisition_rate=None, + target_gain=None, + encoded_gain=None, + event_at="2020-01-07 13:26:48.00", + modules=[mod], + module_naming=mod_naming + ) + pdus = dssc_cal.physical_detector_units + assert isinstance(pdus, dict) + assert len(pdus) == 16 + assert list(pdus.keys())[0] == mod + + +@pytest.mark.requires_gpfs +@pytest.mark.parametrize( + "mod,mod_naming", + [ + ("JNGFR01", "da"), + (1, "modno"), + + ] +) +def test_JUNGFRAU_CalibrationData_ndarray(mod, mod_naming): + """Test ndarray() through JUNGFRAU_CalibrationData. + + Args: + mod (str): Module name + mod_naming (str): Selected module naming + """ + + jf_cal = JUNGFRAU_CalibrationData( + detector_name="SPB_IRDA_JF4M", + sensor_bias_voltage=180, + memory_cells=1, + integration_time=10, + gain_setting=0, + gain_mode=0, + modules=[mod], + module_naming=mod_naming, + event_at="2022-07-11 13:26:48.00", + ) + assert isinstance(jf_cal.ndarray(mod, "Offset10Hz"), np.ndarray) + + +@pytest.mark.requires_gpfs +def test_LPD_CalibrationData_ndarray(): + """Test ndarray_map() through LPD_CalibrationData. + + Args: + mod (str): Module name + mod_naming (str): Selected module naming + """ + + lpd_cal = LPD_CalibrationData( + detector_name="FXE_DET_LPD1M-1", + sensor_bias_voltage=250, + memory_cells=512, + feedback_capacitor=5.0, + event_at="2022-07-11 13:26:48.00", + ) + const_map = lpd_cal.ndarray_map() + assert isinstance(const_map, dict) + assert "LPD00" in const_map.keys() + assert "Offset" in const_map["LPD00"].keys() + assert isinstance(const_map["LPD00"]["Offset"], np.ndarray)