diff --git a/docs/source/calcat_interface.rst b/docs/source/calcat_interface.rst index e9ba50289196d8394d15e356ef3b0a0a047ebf7f..60b9f1429e024d86149e8dd68e01f72f44be7d3f 100644 --- a/docs/source/calcat_interface.rst +++ b/docs/source/calcat_interface.rst @@ -1,15 +1,10 @@ CALCAT Interface ================ - -.. class:: CCVMetadata +.. module:: cal_tools.calcat_interface .. class:: CalCatError -.. class:: ClientWrapper - -.. class:: CalCatApi - .. class:: CalibrationData .. attribute:: metadata diff --git a/notebooks/LPD/LPD_Correct_Fast.ipynb b/notebooks/LPD/LPD_Correct_Fast.ipynb index 4b8dfa4576a2ce551562609a818c18da4063ee6e..397d144a47ac51700f45c7730e40c569bd5306ca 100644 --- a/notebooks/LPD/LPD_Correct_Fast.ipynb +++ b/notebooks/LPD/LPD_Correct_Fast.ipynb @@ -26,7 +26,7 @@ "metadata_folder = '' # Directory containing calibration_metadata.yml when run by xfel-calibrate.\n", "sequences = [-1] # Sequences to correct, use [-1] for all\n", "modules = [-1] # Modules indices to correct, use [-1] for all, only used when karabo_da is empty\n", - "karabo_da = [''] # Data aggregators names to correct, use [''] for all\n", + "karabo_da = ['all'] # Data aggregators names to correct, use [''] for all\n", "run = 10 # run to process, required\n", "\n", "# Source parameters\n", @@ -104,6 +104,7 @@ "\n", "from extra_data.components import LPD1M\n", "\n", + "from cal_tools.calcat_interface import CalCatError, LPD_CalibrationData\n", "from cal_tools.lpdalgs import correct_lpd_frames\n", "from cal_tools.tools import CalibrationMetadata, calcat_creation_time\n", "from cal_tools.files import DataFile\n", @@ -139,11 +140,13 @@ "print(f'Using {creation_time.isoformat()} as creation time')\n", "\n", "# Pick all modules/aggregators or those selected.\n", - "if not karabo_da or karabo_da == ['']:\n", - " if not modules or modules == [-1]:\n", + "if karabo_da == ['all']:\n", + " if modules == [-1]:\n", " modules = list(range(16))\n", "\n", " karabo_da = [f'LPD{i:02d}' for i in modules]\n", + "else:\n", + " modules = [int(x[-2:]) for x in karabo_da]\n", " \n", "# Pick all sequences or those selected.\n", "if not sequences or sequences == [-1]:\n", @@ -242,37 +245,11 @@ " dtype = np.uint32 if calibration_name.startswith('BadPixels') else np.float32\n", "\n", " const_data[(da, calibration_name)] = dict(\n", - " path=Path(ccv['file-path']),\n", - " dataset=ccv['dataset-name'],\n", + " path=Path(ccv['path']),\n", + " dataset=ccv['dataset'],\n", " data=const_load_mp.alloc(shape=(256, 256, mem_cells, 3), dtype=dtype)\n", " )\n", "else: # Retrieve constants from CALCAT.\n", - " dark_calibrations = {\n", - " 1: 'Offset', # np.float32\n", - " 14: 'BadPixelsDark' # should be np.uint32, but is np.float64\n", - " }\n", - "\n", - " dark_condition = [\n", - " dict(parameter_id=1, value=bias_voltage), # Sensor bias voltage\n", - " dict(parameter_id=7, value=mem_cells), # Memory cells\n", - " dict(parameter_id=15, value=capacitor), # Feedback capacitor\n", - " dict(parameter_id=13, value=256), # Pixels X\n", - " dict(parameter_id=14, value=256), # Pixels Y\n", - " ]\n", - "\n", - " illuminated_calibrations = {\n", - " 20: 'BadPixelsFF', # np.uint32\n", - " 42: 'GainAmpMap', # np.float32\n", - " 43: 'FFMap', # np.float32\n", - " 44: 'RelativeGain' # np.float32\n", - " }\n", - "\n", - " illuminated_condition = dark_condition.copy()\n", - " illuminated_condition += [\n", - " dict(parameter_id=3, value=photon_energy), # Source energy\n", - " dict(parameter_id=25, value=category) # category\n", - " ]\n", - "\n", " print('Querying calibration database', end='', flush=True)\n", " start = perf_counter()\n", " for calibrations, condition in [\n", diff --git a/notebooks/LPD/LPD_retrieve_constants_precorrection.ipynb b/notebooks/LPD/LPD_retrieve_constants_precorrection.ipynb index 7dfde91a4fdec82803f954e386b617f347b23008..9d6088af1671e41ebbdce15025b1c26d7327c200 100644 --- a/notebooks/LPD/LPD_retrieve_constants_precorrection.ipynb +++ b/notebooks/LPD/LPD_retrieve_constants_precorrection.ipynb @@ -22,7 +22,7 @@ "out_folder = \"/gpfs/exfel/data/scratch/ahmedk/test/remove/LPD_test\" # the folder to output to, required\n", "metadata_folder = '' # Directory containing calibration_metadata.yml when run by xfel-calibrate.\n", "modules = [-1] # Modules indices to correct, use [-1] for all, only used when karabo_da is empty\n", - "karabo_da = [''] # Data aggregators names to correct, use [''] for all\n", + "karabo_da = ['all'] # Data aggregators names to correct, use [''] for all\n", "run = 10 # run to process, required\n", "\n", "# Source parameters\n", @@ -46,17 +46,14 @@ "outputs": [], "source": [ "from pathlib import Path\n", - "from time import perf_counter\n", - "\n", - "from calibration_client import CalibrationClient\n", - "from calibration_client.modules import CalibrationConstantVersion\n", "\n", + "from cal_tools.calcat_interface import CalCatError, LPD_CalibrationData\n", "from cal_tools.tools import (\n", " CalibrationMetadata,\n", " calcat_creation_time,\n", - " save_constant_metadata,\n", ")\n", - "from cal_tools.restful_config import restful_config" + "from cal_tools.restful_config import restful_config\n", + "from cal_tools.step_timing import StepTimer" ] }, { @@ -76,11 +73,12 @@ "print(f'Using {creation_time.isoformat()} as creation time')\n", "\n", "# Pick all modules/aggregators or those selected.\n", - "if not karabo_da or karabo_da == ['']:\n", - " if not modules or modules == [-1]:\n", + "if karabo_da == ['all']:\n", + " if modules == [-1]:\n", " modules = list(range(16))\n", - "\n", - " karabo_da = [f'LPD{i:02d}' for i in modules] " + " karabo_da = [f'LPD{i:02d}' for i in modules]\n", + "else:\n", + " modules = [int(x[-2:]) for x in karabo_da]" ] }, { @@ -89,18 +87,7 @@ "metadata": {}, "outputs": [], "source": [ - "# Connect to CalCat.\n", - "calcat_config = restful_config['calcat']\n", - "client = CalibrationClient(\n", - " base_api_url=calcat_config['base-api-url'],\n", - " use_oauth2=calcat_config['use-oauth2'],\n", - " client_id=calcat_config['user-id'],\n", - " client_secret=calcat_config['user-secret'],\n", - " user_email=calcat_config['user-email'],\n", - " token_url=calcat_config['token-url'],\n", - " refresh_url=calcat_config['refresh-url'],\n", - " auth_url=calcat_config['auth-url'],\n", - " scope='')" + "step_timer = StepTimer()" ] }, { diff --git a/notebooks/ePix100/Correction_ePix100_NBC.ipynb b/notebooks/ePix100/Correction_ePix100_NBC.ipynb index ca167a235bb06fe55efb4dd208ef4fb2557da468..9a38ee093c612f758ff5818634bc586e4b7c7d27 100644 --- a/notebooks/ePix100/Correction_ePix100_NBC.ipynb +++ b/notebooks/ePix100/Correction_ePix100_NBC.ipynb @@ -98,7 +98,7 @@ "from XFELDetAna import xfelpycaltools as xcal\n", "from cal_tools import h5_copy_except\n", "from cal_tools.epix100 import epix100lib\n", - "from cal_tools.calibration import CalCatError, EPIX100_CalibrationData\n", + "from cal_tools.calcat_interface import CalCatError, EPIX100_CalibrationData\n", "from cal_tools.tools import (\n", " calcat_creation_time,\n", " load_constants_dict,\n", @@ -277,7 +277,7 @@ " in_vacuum=in_vacuum,\n", " source_energy=gain_photon_energy,\n", " event_at=creation_time,\n", - " snapshot_at=None,#creation_time,\n", + " snapshot_at=None, # creation_time, # TODO:.. why None works\n", " )\n", "constant_names = [\"OffsetEPix100\", \"NoiseEPix100\"]\n", "if relative_gain:\n", diff --git a/notebooks/ePix100/ePix100_retrieve_constants_precorrection.ipynb b/notebooks/ePix100/ePix100_retrieve_constants_precorrection.ipynb index e9dbf4ab14dbad830c647f99513057b6f2a2a38c..a8a56f47d2804e16fbacc639fb5d69a688b157a0 100644 --- a/notebooks/ePix100/ePix100_retrieve_constants_precorrection.ipynb +++ b/notebooks/ePix100/ePix100_retrieve_constants_precorrection.ipynb @@ -55,7 +55,7 @@ "from extra_data import RunDirectory\n", "from pathlib import Path\n", "\n", - "from cal_tools.calibration import EPIX100_CalibrationData\n", + "from cal_tools.calcat_interface import EPIX100_CalibrationData\n", "from cal_tools.epix100 import epix100lib\n", "from cal_tools.tools import (\n", " calcat_creation_time,\n", @@ -149,18 +149,25 @@ " in_vacuum=in_vacuum,\n", " source_energy=gain_photon_energy,\n", " event_at=creation_time,\n", - " snapshot_at=None,#creation_time,\n", + " snapshot_at=None, # creation_time, # TODO:.. why None works\n", " )\n", - "epix_cal_metadata = epix_cal.metadata()\n", + "\n", "const_data = dict()\n", "mdata_dict = dict()\n", "mdata_dict[\"constants\"] = dict()\n", - "for cname, ccv_metadata in list(epix_cal_metadata.values())[0].items():\n", - " mdata_dict[\"constants\"][cname] = dict()\n", - " mdata_dict[\"constants\"][cname][\"path\"] = str(epix_cal.caldb_root / ccv_metadata[\"path\"])\n", - " mdata_dict[\"constants\"][cname][\"dataset\"] = ccv_metadata[\"dataset\"]\n", - " mdata_dict[\"constants\"][cname][\"creation-time\"] = ccv_metadata[\"begin_validity_at\"]\n", - " print(f\"Retrieved {cname} with creation-time: {ccv_metadata['begin_validity_at']}\")\n", + "\n", + "constant_names = [\"OffsetEPix100\", \"NoiseEPix100\"]\n", + "if relative_gain:\n", + " constant_names += [\"RelativeGainEPix100\"]\n", + "\n", + "for cname in constant_names:\n", + " # Retrieve metadata for all epix100 constants.\n", + " for cname, ccv_metadata in list(epix_cal.metadata([cname]).values())[0].items():\n", + " mdata_dict[\"constants\"][cname] = dict()\n", + " mdata_dict[\"constants\"][cname][\"path\"] = str(epix_cal.caldb_root / ccv_metadata[\"path\"])\n", + " mdata_dict[\"constants\"][cname][\"dataset\"] = ccv_metadata[\"dataset\"]\n", + " mdata_dict[\"constants\"][cname][\"creation-time\"] = ccv_metadata[\"begin_validity_at\"]\n", + " print(f\"Retrieved {cname} with creation-time: {ccv_metadata['begin_validity_at']}\")\n", "mdata_dict[\"physical-detector-unit\"] = ccv_metadata[\"physical_name\"]\n", "retrieved_constants[karabo_da] = mdata_dict\n", "metadata.save()\n", diff --git a/src/cal_tools/calibration.py b/src/cal_tools/calibration.py deleted file mode 100644 index 74eb8659490619cb6124cc4ff7835bbe7a3e3ee1..0000000000000000000000000000000000000000 --- a/src/cal_tools/calibration.py +++ /dev/null @@ -1,1027 +0,0 @@ - -"""Interfaces to calibration constant data.""" - -import multiprocessing -import re -import socket -from datetime import date, datetime, time, timezone -from functools import lru_cache -from os import getenv -from pathlib import Path -from sys import maxsize -from weakref import WeakKeyDictionary - -import h5py -import numpy as np -from calibration_client import CalibrationClient -from calibration_client.modules import ( - Calibration, - CalibrationConstantVersion, - Detector, - Parameter, - PhysicalDetectorUnit, -) - -__all__ = [ - 'CalCatError', - 'CalibrationData', - 'AGIPD_CalibrationData', - 'LPD_CalibrationData', - 'DSSC_CalibrationData', - 'JUNGFRAU_CalibrationData', - 'PNCCD_CalibrationData', - 'EPIX100_CalibrationData', - 'Gotthard2_CalibrationData' -] - - -class CCVMetadata(dict): - """Dictionary for CCV metadata. - - Identical to a regular dict, but with a custom pandas-based - string representation to be easier to read. - """ - - def __str__(self): - """Pretty-print CCV metadata using pandas.""" - - import pandas as pd - - res = {pdu_idx: {calibration: ccv_data['ccv_name'] - for calibration, ccv_data in pdu_data.items()} - for pdu_idx, pdu_data in self.items()} - - return str(pd.DataFrame.from_dict(res, orient='index')) - - -class CalCatError(Exception): - """CalCat API error.""" - - def __init__(self, response): - super().__init__(response['info']) - - -class ClientWrapper(type): - """Metaclass to wrap each calibration_client exactly once.""" - - _clients = WeakKeyDictionary() - - def __call__(cls, client): - instance = cls._clients.get(client, None) - - if instance is None: - instance = cls._clients[client] = type.__call__(cls, client) - - return instance - - -class CalCatApi(metaclass=ClientWrapper): - """Internal calibration_client wrapper.""" - - get_detector_keys = [ - 'id', 'name', 'identifier', 'karabo_name', - 'karabo_id_control', 'description'] - get_pdu_keys = [ - 'id', 'physical_name', 'karabo_da', 'virtual_device_name', - 'detector_type_id', 'detector_id', 'description'] - - def __init__(self, client): - self.client = client - - @classmethod - def format_time(cls, dt): - """Parse different ways to specify time to CalCat.""" - - if isinstance(dt, datetime): - return dt.astimezone(timezone.utc).strftime('%Y%m%dT%H%M%S%Z') - elif isinstance(dt, date): - return cls.format_time(datetime.combine(dt, time())) - - return dt - - def format_cond(self, condition): - """Encode operating condition to CalCat API format. - - Args: - caldata (CalibrationData): Calibration data instance used to - interface with database. - - Returns: - (dict) Operating condition for use in CalCat API. - """ - - return {'parameters_conditions_attributes': [ - {'parameter_id': self.parameter_id(k), 'value': str(v)} - for k, v in condition.items()]} - - @lru_cache() - def detector(self, detector_name): - """Detector metadata.""" - - resp_detector = Detector.get_by_identifier( - self.client, detector_name) - - if not resp_detector['success']: - raise CalCatError(resp_detector) - - return {k: resp_detector['data'][k] for k in self.get_detector_keys} - - @lru_cache() - def physical_detector_units(self, detector_id, snapshot_at): - """Physical detector unit metadata.""" - - resp_pdus = PhysicalDetectorUnit.get_all_by_detector( - self.client, detector_id, self.format_time(snapshot_at)) - - if not resp_pdus['success']: - raise CalCatError(resp_pdus) - - return {int(pdu['karabo_da'][-2:]): { - k: pdu[k] for k in self.get_pdu_keys} - for pdu in resp_pdus['data']} - - @lru_cache() - def calibration_id(self, calibration_name): - """ID for a calibration in CalCat.""" - - resp_calibration = Calibration.get_by_name( - self.client, calibration_name) - - if not resp_calibration['success']: - raise CalCatError(resp_calibration) - - return resp_calibration['data']['id'] - - @lru_cache() - def parameter_id(self, param_name): - """ID for an operating condition parameter in CalCat.""" - - resp_parameter = Parameter.get_by_name(self.client, param_name) - - if not resp_parameter['success']: - raise CalCatError(resp_parameter) - - return resp_parameter['data']['id'] - - def closest_ccv_by_time_by_condition( - self, detector_name, calibrations, condition, - modules=None, event_at=None, snapshot_at=None, metadata=None, - ): - """Query bulk CCV metadata from CalCat. - - This method uses the /get_closest_version_by_detector API - to query matching CCVs for PDUs connected to a detector instance - in one go. In particular, it automatically includes the PDU as - an operating condition parameter to allow for a single global - condition rather than PDU-specific ones. - - Args: - detector_name (str): Detector instance name. - calibrations (Iterable of str): Calibrations to query - metadata for. - condition (dict): Mapping of parameter name to value. - modules (Collection of int or None): List of module numbers - or None for all (default). - event_at (datetime, date, str or None): Time at which the - CCVs should have been valid or None for now (default). - snapshot_at (datetime, date, str or None): Time of database - state to look at or None for now (default). - metadata (dict or None): Mapping to fill for results or - None for a new dictionary (default). - - Returns: - (dict) Nested mapping of module number to calibrations to - CCV metadata. Identical to passed metadata argument if - passed. - """ - event_at = self.format_time(event_at) - snapshot_at = self.format_time(snapshot_at) - - # Map aggregator to module number. - da_to_modno = { - data['karabo_da']: modno for modno, data in - self.physical_detector_units( - self.detector(detector_name)['id'], snapshot_at).items() - if not modules or modno in modules} - - if metadata is None: - metadata = CCVMetadata() - - if not calibrations: - # Make sure there are at least empty dictionaries for each - # module. - for modno in da_to_modno.values(): - metadata.setdefault(modno, dict()) - return metadata - - # Map calibration ID to calibratio name. - cal_id_map = {self.calibration_id(calibration): calibration - for calibration in calibrations} - calibration_ids = list(cal_id_map.keys()) - - # The API call supports a single module or all modules, as the - # performance increase is only minor in between. Hence, all - # modules are queried if more than one is selected and filtered - # afterwards, if necessary. - karabo_da = next(iter(da_to_modno)) if len(da_to_modno) == 1 else '', - resp_versions = CalibrationConstantVersion.get_closest_by_time_by_detector_conditions( # noqa - self.client, detector_name, calibration_ids, - self.format_cond(condition), - karabo_da=karabo_da, - event_at=event_at, snapshot_at=snapshot_at) - - if not resp_versions['success']: - raise CalCatError(resp_versions) - - for ccv in resp_versions['data']: - try: - modno = da_to_modno[ccv['physical_detector_unit']['karabo_da']] - except KeyError: - # Not included in our modules - continue - - cc = ccv['calibration_constant'] - metadata.setdefault( - modno, dict())[cal_id_map[cc['calibration_id']]] = dict( - cc_id=cc['id'], - cc_name=cc['name'], - condition_id=cc['condition_id'], - ccv_id=ccv['id'], - ccv_name=ccv['name'], - path=Path(ccv['path_to_file']) / ccv['file_name'], - dataset=ccv['data_set_name'], - begin_validity_at=ccv['begin_validity_at'], - end_validity_at=ccv['end_validity_at'], - raw_data_location=ccv['raw_data_location'], - start_idx=ccv['start_idx'], - end_idx=ccv['end_idx'], - physical_name=ccv['physical_detector_unit']['physical_name'], - ) - - return metadata - - -class CalibrationData: - """Calibration constants data for detectors. - - European XFEL uses a web app and database to store records about the - characterization of detectors and the data necessary to their - correction and analysis, collectively called CalCat. The default - installation is available at https://in.xfel.eu/calibration. - - A detector is identified by a name (e.g. SPB_DET_AGIPD1M-1) and - consists of one or more detector modules. The modules are a virtual - concept and may be identified by their number (e.g. 3), the Karabo - data aggregator in EuXFEL's DAQ system they're connected to - (e.g. AGIPD05) or a virtual device name describing their relative - location (e.g. Q3M2). - - A detector module is mapped to an actual physical detector unit - (PDU), which may be changed in case of a physical replacement. When - characterization data is inserted into the database, it is attached - to the PDU currently mapped to a module and not the virtual module - itself. - - Characterization data is organized by its type just called - calibration (e.g. Offset or SlopesFF) and the operating condition it - was taken in, which is a mapping of parameter keys to their values - (e.g. Sensor bias voltage or integration time). Any unique - combination of calibration (type) and operating condition is a - calibration constant (CC). Any individual measurement of a CC is - called a calibration constant version (CCV). There may be many CCVs - for any given CC. - - Note that while an authenticated connection to CalCat is possible - from anywhere, the actual calibration data referred to is only - available on the European XFEL computing infrastructure. If no - explicit credentials are supplied, an anonymous read-only connection - is established that is also only available from there. - """ - - calibrations = set() - default_client = None - - def __init__(self, detector_name, modules=None, client=None, event_at=None, - snapshot_at=None): - """Initialize a new CalibrationData object. - - If no calibration-client object is passed or has been created - using Calibration.new_client, an anonymous read-only connection - is established automatically. - - Args: - detector_name (str): Name of detector in CalCat. - modules (Iterable of int, optional): Module numbers to - query for or None for all available (default). - client (CalibrationClient, optional): Client for CalCat - communication, global one by default. - event_at (datetime, date, str or None): Default time at which the - CCVs should have been valid, now if omitted - snapshot_at (datetime, date, str or None): Default time of - database state to look at, now if omitted. - **condition_params: Operating condition parameters defined - on an instance level. - """ - - self.detector_name = detector_name - self.modules = modules - self.event_at = event_at - self.snapshot_at = snapshot_at - - if client is None: - client = self.__class__.default_client or \ - self.__class__.new_anonymous_client() - - self._api = CalCatApi(client) - - @staticmethod - def new_anonymous_client(): - """Create an anonymous calibration-client object. - - This connection allows read-only access to CalCat using a - facility-proveded OAuth reverse proxy. This is only accessible - on the European XFEL computing infrastructure. - """ - - print('Access to CalCat via the XFEL OAuth proxy is currently ' - 'considered in testing, please report any issues to ' - 'da-support@xfel.eu') - return CalibrationData.new_client( - None, None, None, use_oauth2=False, - base_url='http://exflcalproxy:8080/') - - @staticmethod - def new_client( - client_id, client_secret, user_email, installation='', - base_url='https://in.xfel.eu/{}calibration', **kwargs, - ): - """Create a new calibration-client object. - - The client object is saved as a class property and is - automatically to any future CalibrationData objects created, if - no other client is passed explicitly. - - Arguments: - client_id (str): Client ID. - client_secret (str): Client secret. - user_email (str): LDAP user email. - installation (str, optional): Prefix for CalCat - installation, production system by default. - base_url (str, optional): URL template for CalCat - installation, public European XFEL by default. - Any further keyword arguments are passed on to - CalibrationClient.__init__(). - - Returns: - (CalibrationClient) CalCat client. - """ - - base_url = base_url.format(f'{installation}_' if installation else '') - - # Note this is not a classmethod and we're modifying - # CalibrationData directly to use the same object across all - # detector-specific implementations. - CalibrationData.default_client = CalibrationClient( - client_id=client_id, - client_secret=client_secret, - user_email=user_email, - base_api_url=f'{base_url}/api/', - token_url=f'{base_url}/oauth/token', - refresh_url=f'{base_url}/oauth/token', - auth_url=f'{base_url}/oauth/authorize', - scope='', - **kwargs - ) - return CalibrationData.default_client - - @property - def caldb_root(self): - """Root directory for calibration constant data. - - Returns: - (Path or None) Location of caldb store or - None if not available. - """ - - if not hasattr(CalibrationData, '_caldb_root'): - if getenv('SASE'): - # ONC - CalibrationData._caldb_root = Path('/common/cal/caldb_store') - elif re.match(r'^max-(.+)\.desy\.de$', socket.getfqdn()): - # Maxwell - CalibrationData._caldb_root = Path( - '/gpfs/exfel/d/cal/caldb_store') - else: - # Probably unavailable - CalibrationData._caldb_root = None - - return CalibrationData._caldb_root - - @property - def client(self): - return self._api.client - - @property - def detector(self): - return self._api.detector(self.detector_name) - - @property - def physical_detector_units(self): - return self._api.physical_detector_units( - self.detector['id'], self.snapshot_at) - - @property - def condition(self): - return self._build_condition(self.parameters) - - def replace(self, **new_kwargs): - """Create a new CalibrationData object with altered values.""" - - keys = { - 'detector_name', 'modules', 'client', 'event_at', 'snapshot_at' - } | { - self._simplify_parameter_name(name)for name in self.parameters - } - - kwargs = {key: getattr(self, key) for key in keys} - kwargs.update(new_kwargs) - - return self.__class__(**kwargs) - - def metadata( - self, calibrations=None, event_at=None, snapshot_at=None, - ): - """Query CCV metadata for calibrations, conditions and time. - - Args: - calibrations (Iterable of str, optional): Calibrations to - query metadata for, may be None to retrieve all. - event_at (datetime, date, str or None): Time at which the - CCVs should have been valid, now or default value passed at - initialization time if omitted. - snapshot_at (datetime, date, str or None): Time of database - state to look at, now or default value passed at - initialization time if omitted. - - Returns: - (CCVMetadata) CCV metadata result. - """ - - metadata = CCVMetadata() - self._api.closest_ccv_by_time_by_condition( - self.detector_name, calibrations or self.calibrations, - self.condition, self.modules, - event_at or self.event_at, snapshot_at or self.snapshot_at, - metadata) - - return metadata - - def ndarray( - self, module, calibration, metadata=None, - ): - """Load CCV data as ndarray. - - Args: - module (int): Module number - calibration (str): Calibration constant. - metadata (CCVMetadata, optional): CCV metadata to load - constant data for, may be None to query metadata. - - Returns: - (ndarray): CCV data - """ - - if self.caldb_root is None: - raise RuntimeError('calibration database store unavailable') - - if self.modules and module not in self.modules: - raise ValueError('module not part of this calibration data') - - if metadata is None: - metadata = self.metadata([calibration]) - - row = metadata[module][calibration] - - with h5py.File(self.caldb_root / row['path'], 'r') as f: - return np.asarray(f[row['dataset'] + '/data']) - - def ndarray_map( - self, calibrations=None, metadata=None, processes=None, - ): - """Load all CCV data in a nested map of ndarrays. - - Args: - calibrations (Iterable of str, optional): Calibration constants - or None for all available (default). - metadata (CCVMetadata, optional): CCV metadata to load constant - for or None to query metadata automatically (default). - - Returns: - (dict of dict of ndarray): CCV data by module number and - calibration constant name. - """ - - if self.caldb_root is None: - raise RuntimeError('calibration database store unavailable') - - if metadata is None: - metadata = self.metadata(calibrations) - - map_arg = [ - (modno, cname) for modno, mdata in metadata.items() for cname in mdata.keys()] # noqa - - with multiprocessing.pool.ThreadPool(processes=processes) as pool: - r = pool.starmap(self.ndarray, map_arg) - - arr_map = {} - for i, (modno, cname) in enumerate(map_arg): - arr_map.setdefault(modno, {})[cname] = r[i] - return arr_map - - def _build_condition(self, parameters): - cond = dict() - - for db_name in parameters: - value = getattr(self, self._simplify_parameter_name(db_name), None) - - if value is not None: - cond[db_name] = value - - return cond - - @classmethod - def _from_multimod_detector_data( - cls, component_cls, data, detector, - modules, client, - ): - if isinstance(detector, component_cls): - detector_name = detector.detector_name - elif detector is None: - detector_name = component_cls._find_detector_name(data) - elif isinstance(detector, str): - detector_name = detector - else: - raise ValueError(f'detector may be an object of type ' - f'{type(cls)}, a string or None') - - source_to_modno = dict(component_cls._source_matches( - data, detector_name)) - detector_sources = [data[source] for source in source_to_modno.keys()] - - if modules is None: - modules = sorted(source_to_modno.values()) - - creation_date = cls._determine_data_creation_date(data) - - # Create new CalibrationData object. - caldata = cls( - detector_name, modules, client, - creation_date, creation_date, - ) - - caldata.memory_cells = component_cls._get_memory_cell_count( - detector_sources[0]) - caldata.pixels_x = component_cls.module_shape[1] - caldata.pixels_y = component_cls.module_shape[0] - - return caldata, detector_sources - - @staticmethod - def _simplify_parameter_name(name): - """Convert parameter names to valid Python symbols.""" - - return name.lower().replace(' ', '_') - - @staticmethod - def _determine_data_creation_date(data): - """Determine data creation date.""" - - assert data.files, 'data contains no files' - - try: - creation_date = data.files[0].metadata()['creationDate'] - except KeyError: - from warnings import warn - warn('Last file modification time used as creation date for old ' - 'DAQ file format may be unreliable') - - return datetime.fromtimestamp( - Path(data.files[0].filename).lstat().st_mtime) - else: - if not data.is_single_run: - from warnings import warn - warn('Sample file used to determine creation date for multi ' - 'run data') - - return creation_date - - -class SplitConditionCalibrationData(CalibrationData): - """Calibration data with dark and illuminated conditions. - - Some detectors of this kind distinguish between two different - operating conditions depending on whether photons illuminate the - detector or not, correspondingly called the illuminated and dark - conditions. Typically the illuminated condition is a superset of the - dark condition. - - Not all implementations for semiconductor detectors inherit from - this type, but only those that make this distinction such as AGIPD - and LPD. - """ - - dark_calibrations = set() - illuminated_calibrations = set() - dark_parameters = list() - illuminated_parameters = list() - - @property - def calibrations(self): - """Compatibility with CalibrationData.""" - - return self.dark_calibrations | self.illuminated_calibrations - - @property - def parameters(self): - """Compatibility with CalibrationData.""" - - # Removes likely duplicates while preserving order. - return list(dict.fromkeys( - self.dark_parameters + self.illuminated_parameters)) - - @property - def condition(self): - """Compatibility with CalibrationData.""" - - cond = dict() - cond.update(self.dark_condition) - cond.update(self.illuminated_condition) - - return cond - - @property - def dark_condition(self): - return self._build_condition(self.dark_parameters) - - @property - def illuminated_condition(self): - return self._build_condition(self.illuminated_parameters) - - def metadata( - self, calibrations=None, event_at=None, snapshot_at=None, - ): - """Query CCV metadata for calibrations, conditions and time. - - Args: - calibrations (Iterable of str, optional): Calibrations to - query metadata for, may be None to retrieve all. - event_at (datetime, date, str or None): Time at which the - CCVs should have been valid, now or default value passed at - initialization time if omitted. - snapshot_at (datetime, date, str or None): Time of database - state to look at, now or default value passed at - initialization time if omitted. - - Returns: - (CCVMetadata) CCV metadata result. - """ - - if calibrations is None: - calibrations = ( - self.dark_calibrations | self.illuminated_calibrations) - - metadata = CCVMetadata() - - dark_calibrations = self.dark_calibrations & set(calibrations) - if dark_calibrations: - self._api.closest_ccv_by_time_by_condition( - self.detector_name, dark_calibrations, - self.dark_condition, self.modules, - event_at or self.event_at, snapshot_at or self.snapshot_at, - metadata) - - illum_calibrations = self.illuminated_calibrations & set(calibrations) - if illum_calibrations: - self._api.closest_ccv_by_time_by_condition( - self.detector_name, illum_calibrations, - self.illuminated_condition, self.modules, - event_at or self.event_at, snapshot_at or self.snapshot_at, - metadata) - - return metadata - - -class AGIPD_CalibrationData(SplitConditionCalibrationData): - """Calibration data for the AGIPD detector.""" - - dark_calibrations = { - 'Offset', - 'Noise', - 'ThresholdsDark', - 'BadPixelsDark', - 'BadPixelsPC', - 'SlopesPC', - } - illuminated_calibrations = { - 'BadPixelsFF', - 'SlopesFF', - } - - dark_parameters = [ - 'Sensor Bias Voltage', - 'Pixels X', - 'Pixels Y', - 'Memory cells', - 'Acquisition rate', - 'Gain setting', - 'Gain mode', - 'Integration time', - ] - illuminated_parameters = dark_parameters + ['Source energy'] - - def __init__( - self, detector_name, sensor_bias_voltage, - memory_cells, acquisition_rate, - modules=None, client=None, - event_at=None, snapshot_at=None, - gain_setting=None, gain_mode=None, - integration_time=12, source_energy=9.2, - pixels_x=512, pixels_y=128, - ): - super().__init__( - detector_name, modules, client, event_at, snapshot_at, - ) - - self.sensor_bias_voltage = sensor_bias_voltage - self.memory_cells = memory_cells - self.pixels_x = pixels_x - self.pixels_y = pixels_y - self.acquisition_rate = acquisition_rate - self.gain_setting = gain_setting - self.gain_mode = gain_mode - self.integration_time = integration_time - self.source_energy = source_energy - - def _build_condition(self, parameters): - cond = super()._build_condition(parameters) - - # Fix-up some database quirks. - if int(cond.get('Gain mode', -1)) == 0: - del cond['Gain mode'] - - if int(cond.get('Integration time', -1)) == 12: - del cond['Integration time'] - - return cond - - -class LPD_CalibrationData(SplitConditionCalibrationData): - """Calibration data for the LPD detector.""" - - dark_calibrations = { - 'Offset', - 'Noise', - 'BadPixelsDark', - } - illuminated_calibrations = { - 'RelativeGain', - 'GainAmpMap', - 'FFMap', - 'BadPixelsFF', - } - - dark_parameters = [ - 'Sensor Bias Voltage', - 'Memory cells', - 'Pixels X', - 'Pixels Y', - 'Feedback capacitor', - ] - illuminated_parameters = dark_parameters + ['Source Energy', 'category'] - - def __init__( - self, detector_name, sensor_bias_voltage, - memory_cells, feedback_capacitor=5.0, - pixels_x=256, pixels_y=256, - source_energy=9.2, category=1, - modules=None, client=None, - event_at=None, snapshot_at=None, - ): - super().__init__( - detector_name, modules, client, event_at, snapshot_at, - ) - - self.sensor_bias_voltage = sensor_bias_voltage - self.memory_cells = memory_cells - self.pixels_x = pixels_x - self.pixels_y = pixels_y - self.feedback_capacitor = feedback_capacitor - self.source_energy = source_energy - self.category = category - - -class DSSC_CalibrationData(CalibrationData): - """Calibration data for the DSSC detetor.""" - - calibrations = { - 'Offset', - 'Noise', - } - parameters = [ - 'Sensor Bias Voltage', - 'Memory cells', - 'Pixels X', - 'Pixels Y', - 'Pulse id checksum', - 'Acquisition rate', - 'Target gain', - 'Encoded gain', - ] - - def __init__( - self, detector_name, - sensor_bias_voltage, memory_cells, - pulse_id_checksum=None, acquisition_rate=None, - target_gain=None, encoded_gain=None, - pixels_x=512, pixels_y=128, - modules=None, client=None, - event_at=None, snapshot_at=None, - ): - super().__init__( - detector_name, modules, client, event_at, snapshot_at, - ) - - self.sensor_bias_voltage = sensor_bias_voltage - self.memory_cells = memory_cells - self.pixels_x = pixels_x - self.pixels_y = pixels_y - self.pulse_id_checksum = pulse_id_checksum - self.acquisition_rate = acquisition_rate - self.target_gain = target_gain - self.encoded_gain = encoded_gain - - -class JUNGFRAU_CalibrationData(CalibrationData): - """Calibration data for the JUNGFRAU detector.""" - - calibrations = { - 'Offset10Hz', - 'Noise10Hz', - 'BadPixelsDark10Hz', - 'RelativeGain10Hz', - 'BadPixelsFF10Hz', - } - parameters = [ - 'Sensor Bias Voltage', - 'Memory Cells', - 'Pixels X', - 'Pixels Y', - 'Integration Time', - 'Sensor temperature', - 'Gain Setting', - ] - - # class GainSetting(Enum): - # dynamicgain = 0 - # dynamichg0 = 1 - - def __init__( - self, detector_name, sensor_bias_voltage, - memory_cells, integration_time, - sensor_temperature, gain_setting, - pixels_x=1024, pixels_y=512, - modules=None, client=None, - event_at=None, snapshot_at=None, - ): - super().__init__( - detector_name, modules, client, event_at, snapshot_at, - ) - - self.sensor_bias_voltage = sensor_bias_voltage - self.memory_cells = memory_cells - self.pixels_x = pixels_x - self.pixels_y = pixels_y - self.integration_time = integration_time - self.sensor_temperature = sensor_temperature - self.gain_setting = gain_setting - - -class PNCCD_CalibrationData(CalibrationData): - calibrations = { - 'OffsetCCD', - 'BadPixelsDarkCCD', - 'NoiseCCD', - 'RelativeGainCCD', - 'CTECCD', - } - parameters = [ - 'Sensor Bias Voltage', - 'Memory cells', - 'Pixels X', - 'Pixels Y', - 'Integration Time', - 'Sensor Temperature', - 'Gain Setting', - ] - - def __init__( - self, detector_name, sensor_bias_voltage, - integration_time, sensor_temperature, - gain_setting, pixels_x=1024, - pixels_y=1024, client=None, - event_at=None, snapshot_at=None, - ): - # Ignore modules for this detector. - super().__init__( - detector_name, None, client, event_at, snapshot_at, - ) - - self.sensor_bias_voltage = sensor_bias_voltage - self.memory_cells = 1 # Ignore memory_cells for this detector - self.pixels_x = pixels_x - self.pixels_y = pixels_y - self.integration_time = integration_time - self.sensor_temperature = sensor_temperature - self.gain_setting = gain_setting - - -class EPIX100_CalibrationData(SplitConditionCalibrationData): - dark_calibrations = { - 'OffsetEPix100', - 'NoiseEPix100', - 'BadPixelsDarkEPix100', - } - illuminated_calibrations = { - 'RelativeGainEPix100', - #'BadPixelsFFEPix100', - } - dark_parameters = [ - 'Sensor Bias Voltage', - 'Memory cells', - 'Pixels X', - 'Pixels Y', - 'Integration time', - 'Sensor temperature', - 'In vacuum', - ] - illuminated_parameters = dark_parameters + ['Source energy'] - - def __init__( - self, detector_name, - sensor_bias_voltage, integration_time, - in_vacuum=0, sensor_temperature=288, - pixels_x=708, pixels_y=768, - source_energy=9.2, client=None, - event_at=None, snapshot_at=None, - ): - # Ignore modules for this detector. - super().__init__( - detector_name, None, client, event_at, snapshot_at, - ) - - self.sensor_bias_voltage = sensor_bias_voltage - self.integration_time = integration_time - self.memory_cells = 1 # Ignore memory_cells for this detector - self.pixels_x = pixels_x - self.pixels_y = pixels_y - self.in_vacuum = in_vacuum - self.sensor_temperature = sensor_temperature - self.source_energy = source_energy - - -class GOTTHARD2_CalibrationData(CalibrationData): - calibrations = { - 'LUTGotthard2' - 'OffsetGotthard2', - 'NoiseGotthard2', - 'BadPixelsDarkGotthard2', - 'RelativeGainGotthard2', - 'BadPixelsFFGotthard2', - } - parameters = [ - 'Sensor Bias Voltage', - 'Memory cells', - 'Pixels X', - 'Pixels Y', - 'Integration time', - 'Sensor temperature', - 'Gain setting', - ] - - def __init__( - self, detector_name, - sensor_bias_voltage, exposure_time, - exposure_period, acquisition_rate, - single_photon, client=None, - event_at=None, snapshot_at=None, - ): - # Ignore modules for this detector. - super().__init__( - detector_name, None, client, event_at, snapshot_at, - ) - - self.sensor_bias_voltage = sensor_bias_voltage - self.exposure_time = exposure_time - self.exposure_period = exposure_period - self.acquisition_rate = acquisition_rate - self.single_photon = single_photon