From 19033cb05428af266120c427bcb9154b88f7f514 Mon Sep 17 00:00:00 2001 From: ahmedk <karim.ahmed@xfel.eu> Date: Wed, 12 Oct 2022 16:21:55 +0200 Subject: [PATCH] Draft: new calcat interface and corresponding updates in the correction notebook --- .../ePix100/Correction_ePix100_NBC.ipynb | 108 +- ...100_retrieve_constants_precorrection.ipynb | 71 +- src/cal_tools/calibration.py | 1027 +++++++++++++++++ src/cal_tools/tools.py | 35 + 4 files changed, 1117 insertions(+), 124 deletions(-) create mode 100644 src/cal_tools/calibration.py diff --git a/notebooks/ePix100/Correction_ePix100_NBC.ipynb b/notebooks/ePix100/Correction_ePix100_NBC.ipynb index 03b9bbded..ca167a235 100644 --- a/notebooks/ePix100/Correction_ePix100_NBC.ipynb +++ b/notebooks/ePix100/Correction_ePix100_NBC.ipynb @@ -98,18 +98,13 @@ "from XFELDetAna import xfelpycaltools as xcal\n", "from cal_tools import h5_copy_except\n", "from cal_tools.epix100 import epix100lib\n", + "from cal_tools.calibration import CalCatError, EPIX100_CalibrationData\n", "from cal_tools.tools import (\n", " calcat_creation_time,\n", - " get_dir_creation_date,\n", - " get_constant_from_db,\n", - " load_specified_constants,\n", + " load_constants_dict,\n", " CalibrationMetadata,\n", ")\n", "from cal_tools.step_timing import StepTimer\n", - "from iCalibrationDB import (\n", - " Conditions,\n", - " Constants,\n", - ")\n", "\n", "warnings.filterwarnings('ignore')\n", "\n", @@ -274,60 +269,35 @@ "metadata": {}, "outputs": [], "source": [ - "cond_dict = {\n", - " \"bias_voltage\": bias_voltage,\n", - " \"integration_time\": integration_time,\n", - " \"temperature\": temperature_k,\n", - " \"in_vacuum\": in_vacuum,\n", - "}\n", - "\n", - "dark_condition = Conditions.Dark.ePix100(**cond_dict)\n", - "\n", - "# update conditions with illuminated conditins.\n", - "cond_dict.update({\n", - " \"photon_energy\": gain_photon_energy\n", - " })\n", - "\n", - "illum_condition = Conditions.Illuminated.ePix100(**cond_dict)\n", + "epix_cal = EPIX100_CalibrationData(\n", + " detector_name=karabo_id,\n", + " sensor_bias_voltage=bias_voltage,\n", + " integration_time=integration_time,\n", + " sensor_temperature=temperature_k,\n", + " in_vacuum=in_vacuum,\n", + " source_energy=gain_photon_energy,\n", + " event_at=creation_time,\n", + " snapshot_at=None,#creation_time,\n", + " )\n", + "constant_names = [\"OffsetEPix100\", \"NoiseEPix100\"]\n", + "if relative_gain:\n", + " constant_names += [\"RelativeGainEPix100\"]\n", "\n", - "const_cond = {\n", - " \"Offset\": dark_condition,\n", - " \"Noise\": dark_condition,\n", - " \"RelativeGain\": illum_condition,\n", - "}" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "empty_constant = np.zeros((708, 768, 1), dtype=np.float32)\n", "if const_yaml: # Used while reproducing corrected data.\n", " print(f\"Using stored constants in {metadata.filename}\")\n", - " const_data, _ = load_specified_constants(const_yaml[karabo_da][\"constants\"])\n", - " for cname, cval in const_data.items():\n", - " if cval is None and cname != \"RelativeGain\":\n", - " const_data[cname] = empty_constant\n", - "else: # First correction attempt.\n", + " const_data, _ = load_constants_dict(const_yaml[karabo_da][\"constants\"])\n", + "else:\n", " const_data = dict()\n", - " for cname, condition in const_cond.items():\n", - " # Avoid retrieving RelativeGain, if not needed for correction.\n", - " if cname == \"RelativeGain\" and not relative_gain:\n", - " const_data[cname] = None\n", - " else:\n", - " const_data[cname] = get_constant_from_db(\n", - " karabo_id=karabo_id,\n", - " karabo_da=karabo_da,\n", - " constant=getattr(Constants.ePix100, cname)(),\n", - " condition=condition,\n", - " empty_constant=None if cname == \"RelativeGain\" else empty_constant,\n", - " cal_db_interface=cal_db_interface,\n", - " creation_time=creation_time,\n", - " print_once=2,\n", - " timeout=cal_db_timeout\n", - " )" + " for cname in constant_names:\n", + " try:\n", + " const_data[cname] = epix_cal.ndarray(module=1, calibration=cname) # TODO: what is this module number?\n", + " except CalCatError as e:\n", + " if cname == \"RelativeGainEPix100\":\n", + " print(\"RelativeGainEPix100 is not found. No gain correction will be applied.\")\n", + " relative_gain = False\n", + " absolute_gain = False\n", + " else:\n", + " raise CalCatError(f\"{cname}: {e}\")" ] }, { @@ -336,14 +306,6 @@ "metadata": {}, "outputs": [], "source": [ - "if relative_gain and const_data.get(\"RelativeGain\", None) is None:\n", - " print(\n", - " \"WARNING: RelativeGain map is requested, but not found.\\n\"\n", - " \"No gain correction will be applied\"\n", - " )\n", - " relative_gain = False\n", - " absolute_gain = False\n", - "\n", "# Initializing some parameters.\n", "hscale = 1\n", "stats = True\n", @@ -398,7 +360,7 @@ " blockSize=commonModeBlockSize, \n", " orientation='block',\n", " nCells=memoryCells, \n", - " noiseMap=const_data['Noise'],\n", + " noiseMap=const_data['NoiseEPix100'],\n", " runParallel=run_parallel,\n", " parallel=run_parallel,\n", " stats=stats,\n", @@ -410,7 +372,7 @@ " blockSize=commonModeBlockSize, \n", " orientation='row',\n", " nCells=memoryCells, \n", - " noiseMap=const_data['Noise'],\n", + " noiseMap=const_data['NoiseEPix100'],\n", " runParallel=run_parallel,\n", " parallel=run_parallel,\n", " stats=stats,\n", @@ -422,7 +384,7 @@ " blockSize=commonModeBlockSize, \n", " orientation='col',\n", " nCells=memoryCells, \n", - " noiseMap=const_data['Noise'],\n", + " noiseMap=const_data['NoiseEPix100'],\n", " runParallel=run_parallel,\n", " parallel=run_parallel,\n", " stats=stats,\n", @@ -438,7 +400,7 @@ "outputs": [], "source": [ "if relative_gain:\n", - " gain_cnst = np.median(const_data[\"RelativeGain\"])\n", + " gain_cnst = np.median(const_data[\"RelativeGainEPix100\"])\n", " hscale = gain_cnst\n", " plot_unit = 'keV'\n", " if photon_energy > 0:\n", @@ -447,7 +409,7 @@ " \n", " gainCorrection = xcal.RelativeGainCorrection(\n", " sensorSize,\n", - " gain_cnst/const_data[\"RelativeGain\"][..., None],\n", + " gain_cnst/const_data[\"RelativeGainEPix100\"][..., None],\n", " nCells=memoryCells,\n", " parallel=run_parallel,\n", " blockSize=blockSize,\n", @@ -483,7 +445,7 @@ "if pattern_classification :\n", " patternClassifier = xcal.PatternClassifier(\n", " [x, y],\n", - " const_data[\"Noise\"],\n", + " const_data[\"NoiseEPix100\"],\n", " split_evt_primary_threshold,\n", " split_evt_secondary_threshold,\n", " split_evt_mip_threshold,\n", @@ -539,7 +501,7 @@ " np.any(d > 0, axis=(0, 1)), d, axis=2)\n", " \n", " # Offset correction.\n", - " d -= const_data[\"Offset\"]\n", + " d -= const_data[\"OffsetEPix100\"]\n", "\n", " histCalOffsetCor.fill(d)\n", " # Common Mode correction.\n", @@ -563,7 +525,7 @@ " it changes the scale (the unit of measurement)\n", " of the data from ADU to either keV or n_of_photons.\n", " But the pattern classification relies on comparing\n", - " data with the noise map, which is still in ADU.\n", + " data with the NoiseEPix100 map, which is still in ADU.\n", "\n", " The best solution is to do a relative gain\n", " correction first and apply the global absolute\n", diff --git a/notebooks/ePix100/ePix100_retrieve_constants_precorrection.ipynb b/notebooks/ePix100/ePix100_retrieve_constants_precorrection.ipynb index 591d3b842..e9dbf4ab1 100644 --- a/notebooks/ePix100/ePix100_retrieve_constants_precorrection.ipynb +++ b/notebooks/ePix100/ePix100_retrieve_constants_precorrection.ipynb @@ -55,15 +55,12 @@ "from extra_data import RunDirectory\n", "from pathlib import Path\n", "\n", + "from cal_tools.calibration import EPIX100_CalibrationData\n", "from cal_tools.epix100 import epix100lib\n", "from cal_tools.tools import (\n", " calcat_creation_time,\n", - " get_dir_creation_date,\n", - " get_from_db,\n", - " save_constant_metadata,\n", " CalibrationMetadata,\n", - ")\n", - "from iCalibrationDB import Conditions, Constants" + ")" ] }, { @@ -144,55 +141,27 @@ "metadata": {}, "outputs": [], "source": [ - "cond_dict = {\n", - " \"bias_voltage\": bias_voltage,\n", - " \"integration_time\": integration_time,\n", - " \"temperature\": temperature_k,\n", - " \"in_vacuum\": in_vacuum,\n", - "}\n", - "\n", - "dark_condition = Conditions.Dark.ePix100(**cond_dict)\n", - "\n", - "# update conditions with illuminated conditions.\n", - "cond_dict.update({\"photon_energy\": gain_photon_energy})\n", - "\n", - "illum_condition = Conditions.Illuminated.ePix100(**cond_dict)\n", - "\n", - "const_cond = {\n", - " \"Offset\": dark_condition,\n", - " \"Noise\": dark_condition,\n", - " \"RelativeGain\": illum_condition,\n", - "}" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ + "epix_cal = EPIX100_CalibrationData(\n", + " detector_name=karabo_id,\n", + " sensor_bias_voltage=bias_voltage,\n", + " integration_time=integration_time,\n", + " sensor_temperature=temperature_k,\n", + " in_vacuum=in_vacuum,\n", + " source_energy=gain_photon_energy,\n", + " event_at=creation_time,\n", + " snapshot_at=None,#creation_time,\n", + " )\n", + "epix_cal_metadata = epix_cal.metadata()\n", "const_data = dict()\n", "mdata_dict = dict()\n", "mdata_dict[\"constants\"] = dict()\n", - "for cname, condition in const_cond.items():\n", - " # Avoid retrieving RelativeGain, if not needed for correction.\n", - " if cname == \"RelativeGain\" and not relative_gain:\n", - " const_data[cname] = None\n", - " else:\n", - " const_data[cname], mdata = get_from_db(\n", - " karabo_id=karabo_id,\n", - " karabo_da=karabo_da,\n", - " constant=getattr(Constants.ePix100, cname)(),\n", - " condition=condition,\n", - " empty_constant=None,\n", - " cal_db_interface=cal_db_interface,\n", - " creation_time=creation_time,\n", - " verbosity=2,\n", - " timeout=cal_db_timeout,\n", - " meta_only=True,\n", - " )\n", - " save_constant_metadata(mdata_dict[\"constants\"], mdata, cname)\n", - "mdata_dict[\"physical-detector-unit\"] = mdata.calibration_constant_version.device_name\n", + "for cname, ccv_metadata in list(epix_cal_metadata.values())[0].items():\n", + " mdata_dict[\"constants\"][cname] = dict()\n", + " mdata_dict[\"constants\"][cname][\"path\"] = str(epix_cal.caldb_root / ccv_metadata[\"path\"])\n", + " mdata_dict[\"constants\"][cname][\"dataset\"] = ccv_metadata[\"dataset\"]\n", + " mdata_dict[\"constants\"][cname][\"creation-time\"] = ccv_metadata[\"begin_validity_at\"]\n", + " print(f\"Retrieved {cname} with creation-time: {ccv_metadata['begin_validity_at']}\")\n", + "mdata_dict[\"physical-detector-unit\"] = ccv_metadata[\"physical_name\"]\n", "retrieved_constants[karabo_da] = mdata_dict\n", "metadata.save()\n", "print(f\"Stored retrieved constants in {metadata.filename}\")" diff --git a/src/cal_tools/calibration.py b/src/cal_tools/calibration.py new file mode 100644 index 000000000..74eb86594 --- /dev/null +++ b/src/cal_tools/calibration.py @@ -0,0 +1,1027 @@ + +"""Interfaces to calibration constant data.""" + +import multiprocessing +import re +import socket +from datetime import date, datetime, time, timezone +from functools import lru_cache +from os import getenv +from pathlib import Path +from sys import maxsize +from weakref import WeakKeyDictionary + +import h5py +import numpy as np +from calibration_client import CalibrationClient +from calibration_client.modules import ( + Calibration, + CalibrationConstantVersion, + Detector, + Parameter, + PhysicalDetectorUnit, +) + +__all__ = [ + 'CalCatError', + 'CalibrationData', + 'AGIPD_CalibrationData', + 'LPD_CalibrationData', + 'DSSC_CalibrationData', + 'JUNGFRAU_CalibrationData', + 'PNCCD_CalibrationData', + 'EPIX100_CalibrationData', + 'Gotthard2_CalibrationData' +] + + +class CCVMetadata(dict): + """Dictionary for CCV metadata. + + Identical to a regular dict, but with a custom pandas-based + string representation to be easier to read. + """ + + def __str__(self): + """Pretty-print CCV metadata using pandas.""" + + import pandas as pd + + res = {pdu_idx: {calibration: ccv_data['ccv_name'] + for calibration, ccv_data in pdu_data.items()} + for pdu_idx, pdu_data in self.items()} + + return str(pd.DataFrame.from_dict(res, orient='index')) + + +class CalCatError(Exception): + """CalCat API error.""" + + def __init__(self, response): + super().__init__(response['info']) + + +class ClientWrapper(type): + """Metaclass to wrap each calibration_client exactly once.""" + + _clients = WeakKeyDictionary() + + def __call__(cls, client): + instance = cls._clients.get(client, None) + + if instance is None: + instance = cls._clients[client] = type.__call__(cls, client) + + return instance + + +class CalCatApi(metaclass=ClientWrapper): + """Internal calibration_client wrapper.""" + + get_detector_keys = [ + 'id', 'name', 'identifier', 'karabo_name', + 'karabo_id_control', 'description'] + get_pdu_keys = [ + 'id', 'physical_name', 'karabo_da', 'virtual_device_name', + 'detector_type_id', 'detector_id', 'description'] + + def __init__(self, client): + self.client = client + + @classmethod + def format_time(cls, dt): + """Parse different ways to specify time to CalCat.""" + + if isinstance(dt, datetime): + return dt.astimezone(timezone.utc).strftime('%Y%m%dT%H%M%S%Z') + elif isinstance(dt, date): + return cls.format_time(datetime.combine(dt, time())) + + return dt + + def format_cond(self, condition): + """Encode operating condition to CalCat API format. + + Args: + caldata (CalibrationData): Calibration data instance used to + interface with database. + + Returns: + (dict) Operating condition for use in CalCat API. + """ + + return {'parameters_conditions_attributes': [ + {'parameter_id': self.parameter_id(k), 'value': str(v)} + for k, v in condition.items()]} + + @lru_cache() + def detector(self, detector_name): + """Detector metadata.""" + + resp_detector = Detector.get_by_identifier( + self.client, detector_name) + + if not resp_detector['success']: + raise CalCatError(resp_detector) + + return {k: resp_detector['data'][k] for k in self.get_detector_keys} + + @lru_cache() + def physical_detector_units(self, detector_id, snapshot_at): + """Physical detector unit metadata.""" + + resp_pdus = PhysicalDetectorUnit.get_all_by_detector( + self.client, detector_id, self.format_time(snapshot_at)) + + if not resp_pdus['success']: + raise CalCatError(resp_pdus) + + return {int(pdu['karabo_da'][-2:]): { + k: pdu[k] for k in self.get_pdu_keys} + for pdu in resp_pdus['data']} + + @lru_cache() + def calibration_id(self, calibration_name): + """ID for a calibration in CalCat.""" + + resp_calibration = Calibration.get_by_name( + self.client, calibration_name) + + if not resp_calibration['success']: + raise CalCatError(resp_calibration) + + return resp_calibration['data']['id'] + + @lru_cache() + def parameter_id(self, param_name): + """ID for an operating condition parameter in CalCat.""" + + resp_parameter = Parameter.get_by_name(self.client, param_name) + + if not resp_parameter['success']: + raise CalCatError(resp_parameter) + + return resp_parameter['data']['id'] + + def closest_ccv_by_time_by_condition( + self, detector_name, calibrations, condition, + modules=None, event_at=None, snapshot_at=None, metadata=None, + ): + """Query bulk CCV metadata from CalCat. + + This method uses the /get_closest_version_by_detector API + to query matching CCVs for PDUs connected to a detector instance + in one go. In particular, it automatically includes the PDU as + an operating condition parameter to allow for a single global + condition rather than PDU-specific ones. + + Args: + detector_name (str): Detector instance name. + calibrations (Iterable of str): Calibrations to query + metadata for. + condition (dict): Mapping of parameter name to value. + modules (Collection of int or None): List of module numbers + or None for all (default). + event_at (datetime, date, str or None): Time at which the + CCVs should have been valid or None for now (default). + snapshot_at (datetime, date, str or None): Time of database + state to look at or None for now (default). + metadata (dict or None): Mapping to fill for results or + None for a new dictionary (default). + + Returns: + (dict) Nested mapping of module number to calibrations to + CCV metadata. Identical to passed metadata argument if + passed. + """ + event_at = self.format_time(event_at) + snapshot_at = self.format_time(snapshot_at) + + # Map aggregator to module number. + da_to_modno = { + data['karabo_da']: modno for modno, data in + self.physical_detector_units( + self.detector(detector_name)['id'], snapshot_at).items() + if not modules or modno in modules} + + if metadata is None: + metadata = CCVMetadata() + + if not calibrations: + # Make sure there are at least empty dictionaries for each + # module. + for modno in da_to_modno.values(): + metadata.setdefault(modno, dict()) + return metadata + + # Map calibration ID to calibratio name. + cal_id_map = {self.calibration_id(calibration): calibration + for calibration in calibrations} + calibration_ids = list(cal_id_map.keys()) + + # The API call supports a single module or all modules, as the + # performance increase is only minor in between. Hence, all + # modules are queried if more than one is selected and filtered + # afterwards, if necessary. + karabo_da = next(iter(da_to_modno)) if len(da_to_modno) == 1 else '', + resp_versions = CalibrationConstantVersion.get_closest_by_time_by_detector_conditions( # noqa + self.client, detector_name, calibration_ids, + self.format_cond(condition), + karabo_da=karabo_da, + event_at=event_at, snapshot_at=snapshot_at) + + if not resp_versions['success']: + raise CalCatError(resp_versions) + + for ccv in resp_versions['data']: + try: + modno = da_to_modno[ccv['physical_detector_unit']['karabo_da']] + except KeyError: + # Not included in our modules + continue + + cc = ccv['calibration_constant'] + metadata.setdefault( + modno, dict())[cal_id_map[cc['calibration_id']]] = dict( + cc_id=cc['id'], + cc_name=cc['name'], + condition_id=cc['condition_id'], + ccv_id=ccv['id'], + ccv_name=ccv['name'], + path=Path(ccv['path_to_file']) / ccv['file_name'], + dataset=ccv['data_set_name'], + begin_validity_at=ccv['begin_validity_at'], + end_validity_at=ccv['end_validity_at'], + raw_data_location=ccv['raw_data_location'], + start_idx=ccv['start_idx'], + end_idx=ccv['end_idx'], + physical_name=ccv['physical_detector_unit']['physical_name'], + ) + + return metadata + + +class CalibrationData: + """Calibration constants data for detectors. + + European XFEL uses a web app and database to store records about the + characterization of detectors and the data necessary to their + correction and analysis, collectively called CalCat. The default + installation is available at https://in.xfel.eu/calibration. + + A detector is identified by a name (e.g. SPB_DET_AGIPD1M-1) and + consists of one or more detector modules. The modules are a virtual + concept and may be identified by their number (e.g. 3), the Karabo + data aggregator in EuXFEL's DAQ system they're connected to + (e.g. AGIPD05) or a virtual device name describing their relative + location (e.g. Q3M2). + + A detector module is mapped to an actual physical detector unit + (PDU), which may be changed in case of a physical replacement. When + characterization data is inserted into the database, it is attached + to the PDU currently mapped to a module and not the virtual module + itself. + + Characterization data is organized by its type just called + calibration (e.g. Offset or SlopesFF) and the operating condition it + was taken in, which is a mapping of parameter keys to their values + (e.g. Sensor bias voltage or integration time). Any unique + combination of calibration (type) and operating condition is a + calibration constant (CC). Any individual measurement of a CC is + called a calibration constant version (CCV). There may be many CCVs + for any given CC. + + Note that while an authenticated connection to CalCat is possible + from anywhere, the actual calibration data referred to is only + available on the European XFEL computing infrastructure. If no + explicit credentials are supplied, an anonymous read-only connection + is established that is also only available from there. + """ + + calibrations = set() + default_client = None + + def __init__(self, detector_name, modules=None, client=None, event_at=None, + snapshot_at=None): + """Initialize a new CalibrationData object. + + If no calibration-client object is passed or has been created + using Calibration.new_client, an anonymous read-only connection + is established automatically. + + Args: + detector_name (str): Name of detector in CalCat. + modules (Iterable of int, optional): Module numbers to + query for or None for all available (default). + client (CalibrationClient, optional): Client for CalCat + communication, global one by default. + event_at (datetime, date, str or None): Default time at which the + CCVs should have been valid, now if omitted + snapshot_at (datetime, date, str or None): Default time of + database state to look at, now if omitted. + **condition_params: Operating condition parameters defined + on an instance level. + """ + + self.detector_name = detector_name + self.modules = modules + self.event_at = event_at + self.snapshot_at = snapshot_at + + if client is None: + client = self.__class__.default_client or \ + self.__class__.new_anonymous_client() + + self._api = CalCatApi(client) + + @staticmethod + def new_anonymous_client(): + """Create an anonymous calibration-client object. + + This connection allows read-only access to CalCat using a + facility-proveded OAuth reverse proxy. This is only accessible + on the European XFEL computing infrastructure. + """ + + print('Access to CalCat via the XFEL OAuth proxy is currently ' + 'considered in testing, please report any issues to ' + 'da-support@xfel.eu') + return CalibrationData.new_client( + None, None, None, use_oauth2=False, + base_url='http://exflcalproxy:8080/') + + @staticmethod + def new_client( + client_id, client_secret, user_email, installation='', + base_url='https://in.xfel.eu/{}calibration', **kwargs, + ): + """Create a new calibration-client object. + + The client object is saved as a class property and is + automatically to any future CalibrationData objects created, if + no other client is passed explicitly. + + Arguments: + client_id (str): Client ID. + client_secret (str): Client secret. + user_email (str): LDAP user email. + installation (str, optional): Prefix for CalCat + installation, production system by default. + base_url (str, optional): URL template for CalCat + installation, public European XFEL by default. + Any further keyword arguments are passed on to + CalibrationClient.__init__(). + + Returns: + (CalibrationClient) CalCat client. + """ + + base_url = base_url.format(f'{installation}_' if installation else '') + + # Note this is not a classmethod and we're modifying + # CalibrationData directly to use the same object across all + # detector-specific implementations. + CalibrationData.default_client = CalibrationClient( + client_id=client_id, + client_secret=client_secret, + user_email=user_email, + base_api_url=f'{base_url}/api/', + token_url=f'{base_url}/oauth/token', + refresh_url=f'{base_url}/oauth/token', + auth_url=f'{base_url}/oauth/authorize', + scope='', + **kwargs + ) + return CalibrationData.default_client + + @property + def caldb_root(self): + """Root directory for calibration constant data. + + Returns: + (Path or None) Location of caldb store or + None if not available. + """ + + if not hasattr(CalibrationData, '_caldb_root'): + if getenv('SASE'): + # ONC + CalibrationData._caldb_root = Path('/common/cal/caldb_store') + elif re.match(r'^max-(.+)\.desy\.de$', socket.getfqdn()): + # Maxwell + CalibrationData._caldb_root = Path( + '/gpfs/exfel/d/cal/caldb_store') + else: + # Probably unavailable + CalibrationData._caldb_root = None + + return CalibrationData._caldb_root + + @property + def client(self): + return self._api.client + + @property + def detector(self): + return self._api.detector(self.detector_name) + + @property + def physical_detector_units(self): + return self._api.physical_detector_units( + self.detector['id'], self.snapshot_at) + + @property + def condition(self): + return self._build_condition(self.parameters) + + def replace(self, **new_kwargs): + """Create a new CalibrationData object with altered values.""" + + keys = { + 'detector_name', 'modules', 'client', 'event_at', 'snapshot_at' + } | { + self._simplify_parameter_name(name)for name in self.parameters + } + + kwargs = {key: getattr(self, key) for key in keys} + kwargs.update(new_kwargs) + + return self.__class__(**kwargs) + + def metadata( + self, calibrations=None, event_at=None, snapshot_at=None, + ): + """Query CCV metadata for calibrations, conditions and time. + + Args: + calibrations (Iterable of str, optional): Calibrations to + query metadata for, may be None to retrieve all. + event_at (datetime, date, str or None): Time at which the + CCVs should have been valid, now or default value passed at + initialization time if omitted. + snapshot_at (datetime, date, str or None): Time of database + state to look at, now or default value passed at + initialization time if omitted. + + Returns: + (CCVMetadata) CCV metadata result. + """ + + metadata = CCVMetadata() + self._api.closest_ccv_by_time_by_condition( + self.detector_name, calibrations or self.calibrations, + self.condition, self.modules, + event_at or self.event_at, snapshot_at or self.snapshot_at, + metadata) + + return metadata + + def ndarray( + self, module, calibration, metadata=None, + ): + """Load CCV data as ndarray. + + Args: + module (int): Module number + calibration (str): Calibration constant. + metadata (CCVMetadata, optional): CCV metadata to load + constant data for, may be None to query metadata. + + Returns: + (ndarray): CCV data + """ + + if self.caldb_root is None: + raise RuntimeError('calibration database store unavailable') + + if self.modules and module not in self.modules: + raise ValueError('module not part of this calibration data') + + if metadata is None: + metadata = self.metadata([calibration]) + + row = metadata[module][calibration] + + with h5py.File(self.caldb_root / row['path'], 'r') as f: + return np.asarray(f[row['dataset'] + '/data']) + + def ndarray_map( + self, calibrations=None, metadata=None, processes=None, + ): + """Load all CCV data in a nested map of ndarrays. + + Args: + calibrations (Iterable of str, optional): Calibration constants + or None for all available (default). + metadata (CCVMetadata, optional): CCV metadata to load constant + for or None to query metadata automatically (default). + + Returns: + (dict of dict of ndarray): CCV data by module number and + calibration constant name. + """ + + if self.caldb_root is None: + raise RuntimeError('calibration database store unavailable') + + if metadata is None: + metadata = self.metadata(calibrations) + + map_arg = [ + (modno, cname) for modno, mdata in metadata.items() for cname in mdata.keys()] # noqa + + with multiprocessing.pool.ThreadPool(processes=processes) as pool: + r = pool.starmap(self.ndarray, map_arg) + + arr_map = {} + for i, (modno, cname) in enumerate(map_arg): + arr_map.setdefault(modno, {})[cname] = r[i] + return arr_map + + def _build_condition(self, parameters): + cond = dict() + + for db_name in parameters: + value = getattr(self, self._simplify_parameter_name(db_name), None) + + if value is not None: + cond[db_name] = value + + return cond + + @classmethod + def _from_multimod_detector_data( + cls, component_cls, data, detector, + modules, client, + ): + if isinstance(detector, component_cls): + detector_name = detector.detector_name + elif detector is None: + detector_name = component_cls._find_detector_name(data) + elif isinstance(detector, str): + detector_name = detector + else: + raise ValueError(f'detector may be an object of type ' + f'{type(cls)}, a string or None') + + source_to_modno = dict(component_cls._source_matches( + data, detector_name)) + detector_sources = [data[source] for source in source_to_modno.keys()] + + if modules is None: + modules = sorted(source_to_modno.values()) + + creation_date = cls._determine_data_creation_date(data) + + # Create new CalibrationData object. + caldata = cls( + detector_name, modules, client, + creation_date, creation_date, + ) + + caldata.memory_cells = component_cls._get_memory_cell_count( + detector_sources[0]) + caldata.pixels_x = component_cls.module_shape[1] + caldata.pixels_y = component_cls.module_shape[0] + + return caldata, detector_sources + + @staticmethod + def _simplify_parameter_name(name): + """Convert parameter names to valid Python symbols.""" + + return name.lower().replace(' ', '_') + + @staticmethod + def _determine_data_creation_date(data): + """Determine data creation date.""" + + assert data.files, 'data contains no files' + + try: + creation_date = data.files[0].metadata()['creationDate'] + except KeyError: + from warnings import warn + warn('Last file modification time used as creation date for old ' + 'DAQ file format may be unreliable') + + return datetime.fromtimestamp( + Path(data.files[0].filename).lstat().st_mtime) + else: + if not data.is_single_run: + from warnings import warn + warn('Sample file used to determine creation date for multi ' + 'run data') + + return creation_date + + +class SplitConditionCalibrationData(CalibrationData): + """Calibration data with dark and illuminated conditions. + + Some detectors of this kind distinguish between two different + operating conditions depending on whether photons illuminate the + detector or not, correspondingly called the illuminated and dark + conditions. Typically the illuminated condition is a superset of the + dark condition. + + Not all implementations for semiconductor detectors inherit from + this type, but only those that make this distinction such as AGIPD + and LPD. + """ + + dark_calibrations = set() + illuminated_calibrations = set() + dark_parameters = list() + illuminated_parameters = list() + + @property + def calibrations(self): + """Compatibility with CalibrationData.""" + + return self.dark_calibrations | self.illuminated_calibrations + + @property + def parameters(self): + """Compatibility with CalibrationData.""" + + # Removes likely duplicates while preserving order. + return list(dict.fromkeys( + self.dark_parameters + self.illuminated_parameters)) + + @property + def condition(self): + """Compatibility with CalibrationData.""" + + cond = dict() + cond.update(self.dark_condition) + cond.update(self.illuminated_condition) + + return cond + + @property + def dark_condition(self): + return self._build_condition(self.dark_parameters) + + @property + def illuminated_condition(self): + return self._build_condition(self.illuminated_parameters) + + def metadata( + self, calibrations=None, event_at=None, snapshot_at=None, + ): + """Query CCV metadata for calibrations, conditions and time. + + Args: + calibrations (Iterable of str, optional): Calibrations to + query metadata for, may be None to retrieve all. + event_at (datetime, date, str or None): Time at which the + CCVs should have been valid, now or default value passed at + initialization time if omitted. + snapshot_at (datetime, date, str or None): Time of database + state to look at, now or default value passed at + initialization time if omitted. + + Returns: + (CCVMetadata) CCV metadata result. + """ + + if calibrations is None: + calibrations = ( + self.dark_calibrations | self.illuminated_calibrations) + + metadata = CCVMetadata() + + dark_calibrations = self.dark_calibrations & set(calibrations) + if dark_calibrations: + self._api.closest_ccv_by_time_by_condition( + self.detector_name, dark_calibrations, + self.dark_condition, self.modules, + event_at or self.event_at, snapshot_at or self.snapshot_at, + metadata) + + illum_calibrations = self.illuminated_calibrations & set(calibrations) + if illum_calibrations: + self._api.closest_ccv_by_time_by_condition( + self.detector_name, illum_calibrations, + self.illuminated_condition, self.modules, + event_at or self.event_at, snapshot_at or self.snapshot_at, + metadata) + + return metadata + + +class AGIPD_CalibrationData(SplitConditionCalibrationData): + """Calibration data for the AGIPD detector.""" + + dark_calibrations = { + 'Offset', + 'Noise', + 'ThresholdsDark', + 'BadPixelsDark', + 'BadPixelsPC', + 'SlopesPC', + } + illuminated_calibrations = { + 'BadPixelsFF', + 'SlopesFF', + } + + dark_parameters = [ + 'Sensor Bias Voltage', + 'Pixels X', + 'Pixels Y', + 'Memory cells', + 'Acquisition rate', + 'Gain setting', + 'Gain mode', + 'Integration time', + ] + illuminated_parameters = dark_parameters + ['Source energy'] + + def __init__( + self, detector_name, sensor_bias_voltage, + memory_cells, acquisition_rate, + modules=None, client=None, + event_at=None, snapshot_at=None, + gain_setting=None, gain_mode=None, + integration_time=12, source_energy=9.2, + pixels_x=512, pixels_y=128, + ): + super().__init__( + detector_name, modules, client, event_at, snapshot_at, + ) + + self.sensor_bias_voltage = sensor_bias_voltage + self.memory_cells = memory_cells + self.pixels_x = pixels_x + self.pixels_y = pixels_y + self.acquisition_rate = acquisition_rate + self.gain_setting = gain_setting + self.gain_mode = gain_mode + self.integration_time = integration_time + self.source_energy = source_energy + + def _build_condition(self, parameters): + cond = super()._build_condition(parameters) + + # Fix-up some database quirks. + if int(cond.get('Gain mode', -1)) == 0: + del cond['Gain mode'] + + if int(cond.get('Integration time', -1)) == 12: + del cond['Integration time'] + + return cond + + +class LPD_CalibrationData(SplitConditionCalibrationData): + """Calibration data for the LPD detector.""" + + dark_calibrations = { + 'Offset', + 'Noise', + 'BadPixelsDark', + } + illuminated_calibrations = { + 'RelativeGain', + 'GainAmpMap', + 'FFMap', + 'BadPixelsFF', + } + + dark_parameters = [ + 'Sensor Bias Voltage', + 'Memory cells', + 'Pixels X', + 'Pixels Y', + 'Feedback capacitor', + ] + illuminated_parameters = dark_parameters + ['Source Energy', 'category'] + + def __init__( + self, detector_name, sensor_bias_voltage, + memory_cells, feedback_capacitor=5.0, + pixels_x=256, pixels_y=256, + source_energy=9.2, category=1, + modules=None, client=None, + event_at=None, snapshot_at=None, + ): + super().__init__( + detector_name, modules, client, event_at, snapshot_at, + ) + + self.sensor_bias_voltage = sensor_bias_voltage + self.memory_cells = memory_cells + self.pixels_x = pixels_x + self.pixels_y = pixels_y + self.feedback_capacitor = feedback_capacitor + self.source_energy = source_energy + self.category = category + + +class DSSC_CalibrationData(CalibrationData): + """Calibration data for the DSSC detetor.""" + + calibrations = { + 'Offset', + 'Noise', + } + parameters = [ + 'Sensor Bias Voltage', + 'Memory cells', + 'Pixels X', + 'Pixels Y', + 'Pulse id checksum', + 'Acquisition rate', + 'Target gain', + 'Encoded gain', + ] + + def __init__( + self, detector_name, + sensor_bias_voltage, memory_cells, + pulse_id_checksum=None, acquisition_rate=None, + target_gain=None, encoded_gain=None, + pixels_x=512, pixels_y=128, + modules=None, client=None, + event_at=None, snapshot_at=None, + ): + super().__init__( + detector_name, modules, client, event_at, snapshot_at, + ) + + self.sensor_bias_voltage = sensor_bias_voltage + self.memory_cells = memory_cells + self.pixels_x = pixels_x + self.pixels_y = pixels_y + self.pulse_id_checksum = pulse_id_checksum + self.acquisition_rate = acquisition_rate + self.target_gain = target_gain + self.encoded_gain = encoded_gain + + +class JUNGFRAU_CalibrationData(CalibrationData): + """Calibration data for the JUNGFRAU detector.""" + + calibrations = { + 'Offset10Hz', + 'Noise10Hz', + 'BadPixelsDark10Hz', + 'RelativeGain10Hz', + 'BadPixelsFF10Hz', + } + parameters = [ + 'Sensor Bias Voltage', + 'Memory Cells', + 'Pixels X', + 'Pixels Y', + 'Integration Time', + 'Sensor temperature', + 'Gain Setting', + ] + + # class GainSetting(Enum): + # dynamicgain = 0 + # dynamichg0 = 1 + + def __init__( + self, detector_name, sensor_bias_voltage, + memory_cells, integration_time, + sensor_temperature, gain_setting, + pixels_x=1024, pixels_y=512, + modules=None, client=None, + event_at=None, snapshot_at=None, + ): + super().__init__( + detector_name, modules, client, event_at, snapshot_at, + ) + + self.sensor_bias_voltage = sensor_bias_voltage + self.memory_cells = memory_cells + self.pixels_x = pixels_x + self.pixels_y = pixels_y + self.integration_time = integration_time + self.sensor_temperature = sensor_temperature + self.gain_setting = gain_setting + + +class PNCCD_CalibrationData(CalibrationData): + calibrations = { + 'OffsetCCD', + 'BadPixelsDarkCCD', + 'NoiseCCD', + 'RelativeGainCCD', + 'CTECCD', + } + parameters = [ + 'Sensor Bias Voltage', + 'Memory cells', + 'Pixels X', + 'Pixels Y', + 'Integration Time', + 'Sensor Temperature', + 'Gain Setting', + ] + + def __init__( + self, detector_name, sensor_bias_voltage, + integration_time, sensor_temperature, + gain_setting, pixels_x=1024, + pixels_y=1024, client=None, + event_at=None, snapshot_at=None, + ): + # Ignore modules for this detector. + super().__init__( + detector_name, None, client, event_at, snapshot_at, + ) + + self.sensor_bias_voltage = sensor_bias_voltage + self.memory_cells = 1 # Ignore memory_cells for this detector + self.pixels_x = pixels_x + self.pixels_y = pixels_y + self.integration_time = integration_time + self.sensor_temperature = sensor_temperature + self.gain_setting = gain_setting + + +class EPIX100_CalibrationData(SplitConditionCalibrationData): + dark_calibrations = { + 'OffsetEPix100', + 'NoiseEPix100', + 'BadPixelsDarkEPix100', + } + illuminated_calibrations = { + 'RelativeGainEPix100', + #'BadPixelsFFEPix100', + } + dark_parameters = [ + 'Sensor Bias Voltage', + 'Memory cells', + 'Pixels X', + 'Pixels Y', + 'Integration time', + 'Sensor temperature', + 'In vacuum', + ] + illuminated_parameters = dark_parameters + ['Source energy'] + + def __init__( + self, detector_name, + sensor_bias_voltage, integration_time, + in_vacuum=0, sensor_temperature=288, + pixels_x=708, pixels_y=768, + source_energy=9.2, client=None, + event_at=None, snapshot_at=None, + ): + # Ignore modules for this detector. + super().__init__( + detector_name, None, client, event_at, snapshot_at, + ) + + self.sensor_bias_voltage = sensor_bias_voltage + self.integration_time = integration_time + self.memory_cells = 1 # Ignore memory_cells for this detector + self.pixels_x = pixels_x + self.pixels_y = pixels_y + self.in_vacuum = in_vacuum + self.sensor_temperature = sensor_temperature + self.source_energy = source_energy + + +class GOTTHARD2_CalibrationData(CalibrationData): + calibrations = { + 'LUTGotthard2' + 'OffsetGotthard2', + 'NoiseGotthard2', + 'BadPixelsDarkGotthard2', + 'RelativeGainGotthard2', + 'BadPixelsFFGotthard2', + } + parameters = [ + 'Sensor Bias Voltage', + 'Memory cells', + 'Pixels X', + 'Pixels Y', + 'Integration time', + 'Sensor temperature', + 'Gain setting', + ] + + def __init__( + self, detector_name, + sensor_bias_voltage, exposure_time, + exposure_period, acquisition_rate, + single_photon, client=None, + event_at=None, snapshot_at=None, + ): + # Ignore modules for this detector. + super().__init__( + detector_name, None, client, event_at, snapshot_at, + ) + + self.sensor_bias_voltage = sensor_bias_voltage + self.exposure_time = exposure_time + self.exposure_period = exposure_period + self.acquisition_rate = acquisition_rate + self.single_photon = single_photon diff --git a/src/cal_tools/tools.py b/src/cal_tools/tools.py index 2d15866ac..88c533282 100644 --- a/src/cal_tools/tools.py +++ b/src/cal_tools/tools.py @@ -880,6 +880,41 @@ def save_constant_metadata( const_mdata["creation-time"] = None +def load_constants_dict( + retrieved_constants: dict, + empty_constants: Optional[dict] = None, + ) -> Tuple[dict, dict]: + """Load constant data from metadata in the + retrieved_constants dictionary. + + :param retrieved_constants: A dict. with the constant filepaths and + dataset-name to read the constant data arrays. + { + 'Constant Name': { + 'file-path': '/gpfs/.../*.h5', + 'dataset-name': '/module_name/...', + 'creation-time': str(datetime),}, + } + :param empty_constants: A dict of constant names keys and + the empty constant array to use in case of not non-retrieved constants. + :return constant_data: A dict of constant names keys and their data. + """ + const_data = dict() + when = dict() + + for cname, mdata in retrieved_constants.items(): + const_data[cname] = dict() + when[cname] = mdata["creation-time"] + if when[cname]: + with h5py.File(mdata["path"], "r") as cf: + const_data[cname] = np.copy( + cf[f"{mdata['dataset']}/data"]) + else: + const_data[cname] = ( + empty_constants[cname] if empty_constants else None) + return const_data, when + + def load_specified_constants( retrieved_constants: dict, empty_constants: Optional[dict] = None, -- GitLab