"""Interfaces to calibration constant data.""" import re import socket from datetime import date, datetime, time, timezone from functools import lru_cache from os import getenv from pathlib import Path from weakref import WeakKeyDictionary import h5py import pasha as psh from calibration_client import CalibrationClient from calibration_client.modules import ( Calibration, CalibrationConstantVersion, Detector, Parameter, PhysicalDetectorUnit, ) from cal_tools.tools import module_index_to_qm __all__ = [ "CalCatError", "CalibrationData", "AGIPD_CalibrationData", "LPD_CalibrationData", "DSSC_CalibrationData", "JUNGFRAU_CalibrationData", "PNCCD_CalibrationData", "EPIX100_CalibrationData", "GOTTHARD2_CalibrationData", ] class CCVMetadata(dict): """Dictionary for CCV metadata. Identical to a regular dict, but with a custom pandas-based string representation to be easier to read. """ def __str__(self): """Pretty-print CCV metadata using pandas.""" import pandas as pd res = { pdu_idx: { calibration: ccv_data["ccv_name"] for calibration, ccv_data in pdu_data.items() } for pdu_idx, pdu_data in self.items() } return str(pd.DataFrame.from_dict(res, orient="index")) class CalCatError(Exception): """CalCat API error.""" def __init__(self, response): super().__init__(response["info"]) class ClientWrapper(type): """Metaclass to wrap each calibration_client exactly once.""" _clients = WeakKeyDictionary() def __call__(cls, client): instance = cls._clients.get(client, None) if instance is None: instance = cls._clients[client] = type.__call__(cls, client) return instance class CalCatApi(metaclass=ClientWrapper): """Internal calibration_client wrapper.""" get_detector_keys = [ "id", "name", "identifier", "karabo_name", "karabo_id_control", "description", ] get_pdu_keys = [ "id", "physical_name", "karabo_da", "virtual_device_name", "detector_type_id", "detector_id", "description", ] def __init__(self, client): self.client = client @classmethod def format_time(cls, dt): """Parse different ways to specify time to CalCat.""" if isinstance(dt, datetime): return dt.astimezone(timezone.utc).strftime("%Y%m%dT%H%M%S%Z") elif isinstance(dt, date): return cls.format_time(datetime.combine(dt, time())) return dt def format_cond(self, condition): """Encode operating condition to CalCat API format. Args: caldata (CalibrationData): Calibration data instance used to interface with database. Returns: (dict) Operating condition for use in CalCat API. """ return { "parameters_conditions_attributes": [ {"parameter_id": self.parameter_id(k), "value": str(v)} for k, v in condition.items() ] } @lru_cache() def detector(self, detector_name): """Detector metadata.""" resp_detector = Detector.get_by_identifier(self.client, detector_name) if not resp_detector["success"]: raise CalCatError(resp_detector) return {k: resp_detector["data"][k] for k in self.get_detector_keys} @lru_cache() def physical_detector_units( self, detector_id, snapshot_at, module_naming="da", ): """Physical detector unit metadata.""" resp_pdus = PhysicalDetectorUnit.get_all_by_detector( self.client, detector_id, self.format_time(snapshot_at) ) if not resp_pdus["success"]: raise CalCatError(resp_pdus) # Create dict based on requested keys: karabo_da, module number, # or QxMx naming convention. if module_naming == "da": return { pdu["karabo_da"]: {k: pdu[k] for k in self.get_pdu_keys} for pdu in resp_pdus["data"] } elif module_naming == "modno": return { int(pdu["karabo_da"][-2:]): { k: pdu[k] for k in self.get_pdu_keys } for pdu in resp_pdus["data"] } elif module_naming == "qm": return { module_index_to_qm(int(pdu["karabo_da"][-2:])): { k: pdu[k] for k in self.get_pdu_keys } for pdu in resp_pdus["data"] } else: raise ValueError(f"{module_naming} is unknown!") @lru_cache() def calibration_id(self, calibration_name): """ID for a calibration in CalCat.""" resp_calibration = Calibration.get_by_name( self.client, calibration_name ) if not resp_calibration["success"]: raise CalCatError(resp_calibration) return resp_calibration["data"]["id"] @lru_cache() def parameter_id(self, param_name): """ID for an operating condition parameter in CalCat.""" resp_parameter = Parameter.get_by_name(self.client, param_name) if not resp_parameter["success"]: raise CalCatError(resp_parameter) return resp_parameter["data"]["id"] def closest_ccv_by_time_by_condition( self, detector_name, calibrations, condition, modules=None, event_at=None, snapshot_at=None, metadata=None, module_naming="da", ): """Query bulk CCV metadata from CalCat. This method uses the /get_closest_version_by_detector API to query matching CCVs for PDUs connected to a detector instance in one go. In particular, it automatically includes the PDU as an operating condition parameter to allow for a single global condition rather than PDU-specific ones. Args: detector_name (str): Detector instance name. calibrations (Iterable of str): Calibrations to query metadata for. condition (dict): Mapping of parameter name to value. modules (Collection of int or None): List of module numbers or None for all (default). event_at (datetime, date, str or None): Time at which the CCVs should have been valid or None for now (default). snapshot_at (datetime, date, str or None): Time of database state to look at or None for now (default). metadata (dict or None): Mapping to fill for results or None for a new dictionary (default). module_naming (str or None): Expected module name convention to be used as metadata dict keys. Expected values are: `da`: data aggregator name is used. Default. `modno`: module index is used. Index is chosen based on last 2 integers in karabo_da. `qm`: QxMx naming convention is used. Virtual names for AGIPD, DSSC, and LPD. Returns: (dict) Nested mapping of module number to calibrations to CCV metadata. Identical to passed metadata argument if passed. """ event_at = self.format_time(event_at) snapshot_at = self.format_time(snapshot_at) if metadata is None: metadata = CCVMetadata() if not calibrations: # Make sure there are at least empty dictionaries for each # module. for mod in modules.values(): metadata.setdefault(mod, dict()) return metadata # Map calibration ID to calibration name. cal_id_map = { self.calibration_id(calibration): calibration for calibration in calibrations } calibration_ids = list(cal_id_map.keys()) # The API call supports a single module or all modules, as the # performance increase is only minor in between. Hence, all # modules are queried if more than one is selected and filtered # afterwards, if necessary. karabo_da = ( next(iter(modules)) if modules is not None and len(modules) == 1 else "", ) # noqa resp_versions = CalibrationConstantVersion.get_closest_by_time_by_detector_conditions( # noqa self.client, detector_name, calibration_ids, self.format_cond(condition), karabo_da=karabo_da, event_at=event_at, snapshot_at=snapshot_at, ) if not resp_versions["success"]: raise CalCatError(resp_versions) for ccv in resp_versions["data"]: try: if module_naming == "da": mod = ccv["physical_detector_unit"]["karabo_da"] # Can be used for AGIPD, LPD, and DSSC. elif module_naming == "qm": mod = module_index_to_qm( int(ccv["physical_detector_unit"]["karabo_da"][-2:]) ) elif module_naming == "modno": mod = int(ccv["physical_detector_unit"]["karabo_da"][-2:]) else: raise ValueError(f"{module_naming} is unknown!") except KeyError: # Not included in our modules continue cc = ccv["calibration_constant"] metadata.setdefault(mod, dict())[ cal_id_map[cc["calibration_id"]] ] = dict( cc_id=cc["id"], cc_name=cc["name"], condition_id=cc["condition_id"], ccv_id=ccv["id"], ccv_name=ccv["name"], path=Path(ccv["path_to_file"]) / ccv["file_name"], dataset=ccv["data_set_name"], begin_validity_at=ccv["begin_validity_at"], end_validity_at=ccv["end_validity_at"], raw_data_location=ccv["raw_data_location"], start_idx=ccv["start_idx"], end_idx=ccv["end_idx"], physical_name=ccv["physical_detector_unit"]["physical_name"], ) return metadata class CalibrationData: """Calibration constants data for detectors. European XFEL uses a web app and database to store records about the characterization of detectors and the data necessary to their correction and analysis, collectively called CalCat. The default installation is available at https://in.xfel.eu/calibration. A detector is identified by a name (e.g. SPB_DET_AGIPD1M-1) and consists of one or more detector modules. The modules are a virtual concept and may be identified by their number (e.g. 3), the Karabo data aggregator in EuXFEL's DAQ system they're connected to (e.g. AGIPD05) or a virtual device name describing their relative location (e.g. Q3M2). A detector module is mapped to an actual physical detector unit (PDU), which may be changed in case of a physical replacement. When characterization data is inserted into the database, it is attached to the PDU currently mapped to a module and not the virtual module itself. Characterization data is organized by its type just called calibration (e.g. Offset or SlopesFF) and the operating condition it was taken in, which is a mapping of parameter keys to their values (e.g. Sensor bias voltage or integration time). Any unique combination of calibration (type) and operating condition is a calibration constant (CC). Any individual measurement of a CC is called a calibration constant version (CCV). There may be many CCVs for any given CC. Note that while an authenticated connection to CalCat is possible from anywhere, the actual calibration data referred to is only available on the European XFEL computing infrastructure. If no explicit credentials are supplied, an anonymous read-only connection is established that is also only available from there. """ calibrations = set() default_client = None def __init__( self, detector_name, modules=None, client=None, event_at=None, snapshot_at=None, module_naming="da", ): """Initialize a new CalibrationData object. If no calibration-client object is passed or has been created using Calibration.new_client, an anonymous read-only connection is established automatically. Args: detector_name (str): Name of detector in CalCat. modules (Iterable of int, optional): Module numbers to query for or None for all available (default). client (CalibrationClient, optional): Client for CalCat communication, global one by default. event_at (datetime, date, str or None): Default time at which the CCVs should have been valid, now if omitted snapshot_at (datetime, date, str or None): Default time of database state to look at, now if omitted. module_naming (str or None): Expected module name convention to be used as metadata dict keys. Expected values are: `da`: data aggregator name is used. Default. `modno`: module index is used. Index is chosen based on last 2 integers in karabo_da. `qm`: QxMx naming convention is used. Virtual names for AGIPD, DSSC, and LPD. **condition_params: Operating condition parameters defined on an instance level. """ self.detector_name = detector_name self.modules = modules self.event_at = event_at self.snapshot_at = snapshot_at self.module_naming = module_naming if client is None: client = ( self.__class__.default_client or self.__class__.new_anonymous_client() ) self._api = CalCatApi(client) @staticmethod def new_anonymous_client(): """Create an anonymous calibration-client object. This connection allows read-only access to CalCat using a facility-provided OAuth reverse proxy. This is only accessible on the European XFEL computing infrastructure. """ print( "Access to CalCat via the XFEL OAuth proxy is currently " "considered in testing, please report any issues to " "da-support@xfel.eu" ) return CalibrationData.new_client( None, None, None, use_oauth2=False, base_url="http://exflcalproxy:8080/", ) @staticmethod def new_client( client_id, client_secret, user_email, installation="", base_url="https://in.xfel.eu/{}calibration", **kwargs, ): """Create a new calibration-client object. The client object is saved as a class property and is automatically to any future CalibrationData objects created, if no other client is passed explicitly. Arguments: client_id (str): Client ID. client_secret (str): Client secret. user_email (str): LDAP user email. installation (str, optional): Prefix for CalCat installation, production system by default. base_url (str, optional): URL template for CalCat installation, public European XFEL by default. Any further keyword arguments are passed on to CalibrationClient.__init__(). Returns: (CalibrationClient) CalCat client. """ base_url = base_url.format(f"{installation}_" if installation else "") # Note this is not a classmethod and we're modifying # CalibrationData directly to use the same object across all # detector-specific implementations. CalibrationData.default_client = CalibrationClient( client_id=client_id, client_secret=client_secret, user_email=user_email, base_api_url=f"{base_url}/api/", token_url=f"{base_url}/oauth/token", refresh_url=f"{base_url}/oauth/token", auth_url=f"{base_url}/oauth/authorize", scope="", **kwargs, ) return CalibrationData.default_client @property def caldb_root(self): """Root directory for calibration constant data. Returns: (Path or None) Location of caldb store or None if not available. """ if not hasattr(CalibrationData, "_caldb_root"): if getenv("SASE"): # ONC CalibrationData._caldb_root = Path("/common/cal/caldb_store") elif re.match(r"^max-(.+)\.desy\.de$", socket.getfqdn()): # Maxwell CalibrationData._caldb_root = Path( "/gpfs/exfel/d/cal/caldb_store" ) else: # Probably unavailable CalibrationData._caldb_root = None return CalibrationData._caldb_root @property def client(self): return self._api.client @property def detector(self): return self._api.detector(self.detector_name) @property def physical_detector_units(self): return self._api.physical_detector_units( self.detector["id"], self.snapshot_at, self.module_naming ) @property def condition(self): return self._build_condition(self.parameters) def replace(self, **new_kwargs): """Create a new CalibrationData object with altered values.""" keys = { "detector_name", "modules", "client", "event_at", "snapshot_at", } | {self._simplify_parameter_name(name) for name in self.parameters} kwargs = {key: getattr(self, key) for key in keys} kwargs.update(new_kwargs) return self.__class__(**kwargs) def metadata( self, calibrations=None, event_at=None, snapshot_at=None, ): """Query CCV metadata for calibrations, conditions and time. Args: calibrations (Iterable of str, optional): Calibrations to query metadata for, may be None to retrieve all. event_at (datetime, date, str or None): Time at which the CCVs should have been valid, now or default value passed at initialization time if omitted. snapshot_at (datetime, date, str or None): Time of database state to look at, now or default value passed at initialization time if omitted. Returns: (CCVMetadata) CCV metadata result. """ metadata = CCVMetadata() self._api.closest_ccv_by_time_by_condition( self.detector_name, calibrations or self.calibrations, self.condition, self.modules, event_at or self.event_at, snapshot_at or self.snapshot_at, metadata, module_naming=self.module_naming, ) return metadata def ndarray( self, module, calibration, metadata, ): """Load CCV data as ndarray. Args: module (int): Module number calibration (str): Calibration constant. metadata (CCVMetadata, optional): CCV metadata to load constant data for, may be None to query metadata. Returns: (ndarray): CCV data """ if self.caldb_root is None: raise RuntimeError("calibration database store unavailable") if self.modules and module not in self.modules: raise ValueError("module not part of this calibration data") if metadata is None: metadata = self.metadata([calibration]) return self._load_ccv_data(metadata, module, calibration) def _allocate_constant_arrays(self, metadata, const_load_mp, const_data): for mod, ccv_entry in metadata.items(): const_data[mod] = {} for cname, mdata in ccv_entry.items(): dataset = mdata["dataset"] with h5py.File(self.caldb_root / mdata["path"], "r") as cf: shape = cf[f"{dataset}/data"].shape dtype = cf[f"{dataset}/data"].dtype const_data[mod][cname] = const_load_mp.alloc( shape=shape, dtype=dtype ) def load_constants_from_metadata(self, metadata): """Load the data for all constants in metadata object. Args: metadata (CCVMetadata, optional): CCV metadata to load constant data for, may be None to query metadata. Returns: (Dict): A dictionary of constant data. {module: {calibration: ndarray}}. """ def _load_constant_dataset(wid, index, mod): """Load constant dataset from the CCVMetadata `metadata` into a shared allocated array. Args: mod (str): module key in `metadata` object """ for cname, mdata in metadata[mod].items(): with h5py.File(self.caldb_root / mdata["path"], "r") as cf: cf[f"{mdata['dataset']}/data"].read_direct( const_data[mod][cname] ) const_data = dict() const_load_mp = psh.ProcessContext(num_workers=24) self._allocate_constant_arrays(metadata, const_load_mp, const_data) const_load_mp.map(_load_constant_dataset, list(metadata.keys())) return const_data def ndarray_map( self, calibrations=None, metadata=None, ): """Load all CCV data in a nested map of ndarrays. Args: calibrations (Iterable of str, optional): Calibration constants or None for all available (default). metadata (CCVMetadata, optional): CCV metadata to load constant for or None to query metadata automatically (default). Returns: (dict of dict of ndarray): CCV data by module number and calibration constant name. {module: {calibration: ndarray}} """ if self.caldb_root is None: raise RuntimeError("calibration database store unavailable") if metadata is None: metadata = self.metadata(calibrations) return self.load_constants_from_metadata(metadata) def _build_condition(self, parameters): cond = dict() for db_name in parameters: value = getattr(self, self._simplify_parameter_name(db_name), None) if value is not None: cond[db_name] = value return cond @classmethod def _from_multimod_detector_data( cls, component_cls, data, detector, modules, client, ): if isinstance(detector, component_cls): detector_name = detector.detector_name elif detector is None: detector_name = component_cls._find_detector_name(data) elif isinstance(detector, str): detector_name = detector else: raise ValueError( f"detector may be an object of type " f"{type(cls)}, a string or None" ) source_to_modno = dict( component_cls._source_matches(data, detector_name) ) detector_sources = [data[source] for source in source_to_modno.keys()] if modules is None: modules = sorted(source_to_modno.values()) creation_date = cls._determine_data_creation_date(data) # Create new CalibrationData object. caldata = cls( detector_name, modules, client, creation_date, creation_date, ) caldata.memory_cells = component_cls._get_memory_cell_count( detector_sources[0] ) caldata.pixels_x = component_cls.module_shape[1] caldata.pixels_y = component_cls.module_shape[0] return caldata, detector_sources @staticmethod def _simplify_parameter_name(name): """Convert parameter names to valid Python symbols.""" return name.lower().replace(" ", "_") @staticmethod def _determine_data_creation_date(data): """Determine data creation date.""" assert data.files, "data contains no files" try: creation_date = data.files[0].metadata()["creationDate"] except KeyError: from warnings import warning warning( "Last file modification time used as creation date for old " "DAQ file format may be unreliable" ) return datetime.fromtimestamp( Path(data.files[0].filename).lstat().st_mtime ) else: if not data.is_single_run: from warnings import warning warning( "Sample file used to determine creation date for multi " "run data" ) return creation_date class SplitConditionCalibrationData(CalibrationData): """Calibration data with dark and illuminated conditions. Some detectors of this kind distinguish between two different operating conditions depending on whether photons illuminate the detector or not, correspondingly called the illuminated and dark conditions. Typically the illuminated condition is a superset of the dark condition. Not all implementations for semiconductor detectors inherit from this type, but only those that make this distinction such as AGIPD and LPD. """ dark_calibrations = set() illuminated_calibrations = set() dark_parameters = list() illuminated_parameters = list() @property def calibrations(self): """Compatibility with CalibrationData.""" return self.dark_calibrations | self.illuminated_calibrations @property def parameters(self): """Compatibility with CalibrationData.""" # Removes likely duplicates while preserving order. return list( dict.fromkeys(self.dark_parameters + self.illuminated_parameters) ) @property def condition(self): """Compatibility with CalibrationData.""" cond = dict() cond.update(self.dark_condition) cond.update(self.illuminated_condition) return cond @property def dark_condition(self): return self._build_condition(self.dark_parameters) @property def illuminated_condition(self): return self._build_condition(self.illuminated_parameters) def metadata( self, calibrations=None, event_at=None, snapshot_at=None, ): """Query CCV metadata for calibrations, conditions and time. Args: calibrations (Iterable of str, optional): Calibrations to query metadata for, may be None to retrieve all. event_at (datetime, date, str or None): Time at which the CCVs should have been valid, now or default value passed at initialization time if omitted. snapshot_at (datetime, date, str or None): Time of database state to look at, now or default value passed at initialization time if omitted. Returns: (CCVMetadata) CCV metadata result. """ if calibrations is None: calibrations = ( self.dark_calibrations | self.illuminated_calibrations ) metadata = CCVMetadata() # Calibrations are sorted to ensure using exactly the same query # for multiple configuration. e.g. This is essential for calparrot. dark_calibrations = sorted( self.dark_calibrations & set(calibrations)) if dark_calibrations: self._api.closest_ccv_by_time_by_condition( self.detector_name, dark_calibrations, self.dark_condition, self.modules, event_at or self.event_at, snapshot_at or self.snapshot_at, metadata, module_naming=self.module_naming, ) illum_calibrations = sorted( self.illuminated_calibrations & set(calibrations)) if illum_calibrations: self._api.closest_ccv_by_time_by_condition( self.detector_name, illum_calibrations, self.illuminated_condition, self.modules, event_at or self.event_at, snapshot_at or self.snapshot_at, metadata, module_naming=self.module_naming, ) return metadata class AGIPD_CalibrationData(SplitConditionCalibrationData): """Calibration data for the AGIPD detector.""" dark_calibrations = { "Offset", "Noise", "ThresholdsDark", "BadPixelsDark", "BadPixelsPC", "SlopesPC", } illuminated_calibrations = { "BadPixelsFF", "SlopesFF", } dark_parameters = [ "Sensor Bias Voltage", "Pixels X", "Pixels Y", "Memory cells", "Acquisition rate", "Gain setting", "Gain mode", "Integration time", ] illuminated_parameters = dark_parameters + ["Source energy"] def __init__( self, detector_name, sensor_bias_voltage, memory_cells, acquisition_rate, modules=None, client=None, event_at=None, snapshot_at=None, gain_setting=None, gain_mode=None, module_naming="da", integration_time=12, source_energy=9.2, pixels_x=512, pixels_y=128, ): super().__init__( detector_name, modules, client, event_at, snapshot_at, module_naming, ) self.sensor_bias_voltage = sensor_bias_voltage self.memory_cells = memory_cells self.pixels_x = pixels_x self.pixels_y = pixels_y self.acquisition_rate = acquisition_rate self.gain_setting = gain_setting self.gain_mode = gain_mode self.integration_time = integration_time self.source_energy = source_energy def _build_condition(self, parameters): cond = super()._build_condition(parameters) # Fix-up some database quirks. if int(cond.get("Gain mode", -1)) == 0: del cond["Gain mode"] if int(cond.get("Integration time", -1)) == 12: del cond["Integration time"] return cond class LPD_CalibrationData(SplitConditionCalibrationData): """Calibration data for the LPD detector.""" dark_calibrations = { "Offset", "Noise", "BadPixelsDark", } illuminated_calibrations = { "RelativeGain", "GainAmpMap", "FFMap", "BadPixelsFF", } dark_parameters = [ "Sensor Bias Voltage", "Memory cells", "Pixels X", "Pixels Y", "Feedback capacitor", ] illuminated_parameters = dark_parameters + ["Source Energy", "category"] def __init__( self, detector_name, sensor_bias_voltage, memory_cells, feedback_capacitor=5.0, pixels_x=256, pixels_y=256, source_energy=9.2, category=1, modules=None, client=None, event_at=None, snapshot_at=None, module_naming="da", ): super().__init__( detector_name, modules, client, event_at, snapshot_at, module_naming, ) self.sensor_bias_voltage = sensor_bias_voltage self.memory_cells = memory_cells self.pixels_x = pixels_x self.pixels_y = pixels_y self.feedback_capacitor = feedback_capacitor self.source_energy = source_energy self.category = category class DSSC_CalibrationData(CalibrationData): """Calibration data for the DSSC detetor.""" calibrations = { "Offset", "Noise", } parameters = [ "Sensor Bias Voltage", "Memory cells", "Pixels X", "Pixels Y", "Pulse id checksum", "Acquisition rate", "Target gain", "Encoded gain", ] def __init__( self, detector_name, sensor_bias_voltage, memory_cells, pulse_id_checksum=None, acquisition_rate=None, target_gain=None, encoded_gain=None, pixels_x=512, pixels_y=128, modules=None, client=None, event_at=None, snapshot_at=None, module_naming="da", ): super().__init__( detector_name, modules, client, event_at, snapshot_at, module_naming, ) self.sensor_bias_voltage = sensor_bias_voltage self.memory_cells = memory_cells self.pixels_x = pixels_x self.pixels_y = pixels_y self.pulse_id_checksum = pulse_id_checksum self.acquisition_rate = acquisition_rate self.target_gain = target_gain self.encoded_gain = encoded_gain class JUNGFRAU_CalibrationData(CalibrationData): """Calibration data for the JUNGFRAU detector.""" calibrations = { "Offset10Hz", "Noise10Hz", "BadPixelsDark10Hz", "RelativeGain10Hz", "BadPixelsFF10Hz", } parameters = [ "Sensor Bias Voltage", "Memory Cells", "Pixels X", "Pixels Y", "Integration Time", "Sensor temperature", "Gain Setting", ] def __init__( self, detector_name, sensor_bias_voltage, memory_cells, integration_time, gain_setting, gain_mode=None, sensor_temperature=291, pixels_x=1024, pixels_y=512, modules=None, client=None, event_at=None, snapshot_at=None, module_naming="da", ): super().__init__( detector_name, modules, client, event_at, snapshot_at, module_naming, ) self.sensor_bias_voltage = sensor_bias_voltage self.memory_cells = memory_cells self.pixels_x = pixels_x self.pixels_y = pixels_y self.integration_time = integration_time self.sensor_temperature = sensor_temperature self.gain_setting = gain_setting self.gain_mode = gain_mode def _build_condition(self, parameters): cond = super()._build_condition(parameters) # Fix-up some database quirks. if int(cond.get("Gain mode", -1)) == 0: del cond["Gain mode"] return cond class PNCCD_CalibrationData(CalibrationData): calibrations = { "OffsetCCD", "BadPixelsDarkCCD", "NoiseCCD", "RelativeGainCCD", "CTECCD", } parameters = [ "Sensor Bias Voltage", "Memory cells", "Pixels X", "Pixels Y", "Integration Time", "Sensor Temperature", "Gain Setting", ] def __init__( self, detector_name, sensor_bias_voltage, integration_time, sensor_temperature, gain_setting, pixels_x=1024, pixels_y=1024, client=None, event_at=None, snapshot_at=None, module_naming="da", ): # Ignore modules for this detector. super().__init__( detector_name, None, client, event_at, snapshot_at, module_naming, ) self.sensor_bias_voltage = sensor_bias_voltage self.memory_cells = 1 # Ignore memory_cells for this detector self.pixels_x = pixels_x self.pixels_y = pixels_y self.integration_time = integration_time self.sensor_temperature = sensor_temperature self.gain_setting = gain_setting class EPIX100_CalibrationData(SplitConditionCalibrationData): dark_calibrations = { "OffsetEPix100", "NoiseEPix100", "BadPixelsDarkEPix100", } illuminated_calibrations = { "RelativeGainEPix100", # 'BadPixelsFFEPix100', } dark_parameters = [ "Sensor Bias Voltage", "Memory cells", "Pixels X", "Pixels Y", "Integration time", "Sensor temperature", "In vacuum", ] illuminated_parameters = dark_parameters + ["Source energy"] def __init__( self, detector_name, sensor_bias_voltage, integration_time, in_vacuum=0, sensor_temperature=288, pixels_x=708, pixels_y=768, source_energy=9.2, client=None, event_at=None, snapshot_at=None, module_naming="da", ): # Ignore modules for this detector. super().__init__( detector_name, None, client, event_at, snapshot_at, module_naming, ) self.sensor_bias_voltage = sensor_bias_voltage self.integration_time = integration_time self.memory_cells = 1 # Ignore memory_cells for this detector self.pixels_x = pixels_x self.pixels_y = pixels_y self.in_vacuum = in_vacuum self.sensor_temperature = sensor_temperature self.source_energy = source_energy class GOTTHARD2_CalibrationData(CalibrationData): calibrations = { "LUTGotthard2" "OffsetGotthard2", "NoiseGotthard2", "BadPixelsDarkGotthard2", "RelativeGainGotthard2", "BadPixelsFFGotthard2", } parameters = [ "Sensor Bias Voltage", "Exposure time", "Exposure period", "Acquisition rate", "Single photon", ] def __init__( self, detector_name, sensor_bias_voltage, exposure_time, exposure_period, acquisition_rate, single_photon, client=None, event_at=None, snapshot_at=None, module_naming="da", ): # Ignore modules for this detector. super().__init__( detector_name, None, client, event_at, snapshot_at, module_naming, ) self.sensor_bias_voltage = sensor_bias_voltage self.exposure_time = exposure_time self.exposure_period = exposure_period self.acquisition_rate = acquisition_rate self.single_photon = single_photon