diff --git a/src/cal_tools/calcat_interface.py b/src/cal_tools/calcat_interface.py index d72e0a2ec883b63f990ef24dd669dd7990f87b55..4b1e2288af408af050c57a51037206014950ad1a 100644 --- a/src/cal_tools/calcat_interface.py +++ b/src/cal_tools/calcat_interface.py @@ -1,19 +1,16 @@ """Interfaces to calibration constant data.""" -import multiprocessing import re import socket from datetime import date, datetime, time, timezone from functools import lru_cache from os import getenv from pathlib import Path -#from sys import maxsize from weakref import WeakKeyDictionary -import pasha as psh import h5py -import numpy as np +import pasha as psh from calibration_client import CalibrationClient from calibration_client.modules import ( Calibration, @@ -22,7 +19,6 @@ from calibration_client.modules import ( Parameter, PhysicalDetectorUnit, ) -from logging import warning __all__ = [ 'CalCatError', @@ -33,7 +29,7 @@ __all__ = [ 'JUNGFRAU_CalibrationData', 'PNCCD_CalibrationData', 'EPIX100_CalibrationData', - 'Gotthard2_CalibrationData' + 'GOTTHARD2_CalibrationData', ] @@ -217,7 +213,7 @@ class CalCatApi(metaclass=ClientWrapper): metadata.setdefault(mod, dict()) return metadata - # Map calibration ID to calibratio name. + # Map calibration ID to calibration name. cal_id_map = {self.calibration_id(calibration): calibration for calibration in calibrations} calibration_ids = list(cal_id_map.keys()) @@ -258,9 +254,9 @@ class CalCatApi(metaclass=ClientWrapper): raw_data_location=ccv['raw_data_location'], start_idx=ccv['start_idx'], end_idx=ccv['end_idx'], - physical_name=ccv['physical_detector_unit']['physical_name'], + physical_name=ccv[ + 'physical_detector_unit']['physical_name'], ) - return metadata @@ -453,7 +449,6 @@ class CalibrationData: def metadata( self, calibrations=None, event_at=None, snapshot_at=None, - raise_error=True, ): """Query CCV metadata for calibrations, conditions and time. @@ -472,21 +467,15 @@ class CalibrationData: """ metadata = CCVMetadata() - try: - self._api.closest_ccv_by_time_by_condition( - self.detector_name, calibrations or self.calibrations, - self.condition, self.modules, - event_at or self.event_at, snapshot_at or self.snapshot_at, - metadata) - except CalCatError as e: - if not raise_error: - warning(str(e)) - else: - raise + self._api.closest_ccv_by_time_by_condition( + self.detector_name, calibrations or self.calibrations, + self.condition, self.modules, + event_at or self.event_at, snapshot_at or self.snapshot_at, + metadata) return metadata def ndarray( - self, module, calibration, metadata=None, raise_error=True, empty_array=None, + self, module, calibration, metadata, ): """Load CCV data as ndarray. @@ -506,20 +495,40 @@ class CalibrationData: raise ValueError('module not part of this calibration data') if metadata is None: - metadata = self.metadata([calibration], raise_error=raise_error) - if metadata: # In case a constant was not found, empty dict. - return self._load_ccv_data(metadata, module, calibration) - else: - return empty_array + metadata = self.metadata([calibration]) + + return self._load_ccv_data(metadata, module, calibration) + + def _allocate_const_arrays(self, metadata, const_load_mp, const_data): + + for mod, ccv_entry in metadata.items(): + const_data[mod] = {} + for cname, mdata in ccv_entry.items(): + dataset = mdata["dataset"] + with h5py.File(self.caldb_root / mdata["path"], "r") as cf: + shape = cf[f"{dataset}/data"].shape + dtype = cf[f"{dataset}/data"].dtype + + const_data[mod][cname] = const_load_mp.alloc( + shape=shape, dtype=dtype) + + def load_constants_data(self, metadata): + + def load_constant_dataset(wid, index, mod): + for cname, mdata in metadata[mod].items(): + with h5py.File(self.caldb_root / mdata["path"], "r") as cf: + cf[f"{mdata['dataset']}/data"].read_direct( + const_data[mod][cname]) - def _load_ccv_data(self, metadata, module, calibration): - row = metadata[module][calibration] + const_data = dict() + const_load_mp = psh.ProcessContext(num_workers=24) + self._allocate_const_arrays(metadata, const_load_mp, const_data) + const_load_mp.map(load_constant_dataset, list(metadata.keys())) - with h5py.File(self.caldb_root / row['path'], 'r') as f: - return f[row['dataset'] + '/data'][()] + return const_data def ndarray_map( - self, calibrations=None, metadata=None, processes=None, + self, calibrations=None, metadata=None, ): """Load all CCV data in a nested map of ndarrays. @@ -528,31 +537,41 @@ class CalibrationData: or None for all available (default). metadata (CCVMetadata, optional): CCV metadata to load constant for or None to query metadata automatically (default). - processes (Int): Returns: (dict of dict of ndarray): CCV data by module number and calibration constant name. {module: {calibration: ndarray}} """ - from functools import partial if self.caldb_root is None: raise RuntimeError('calibration database store unavailable') if metadata is None: metadata = self.metadata(calibrations) - modno_cname = [ - (modno, cname) for modno, md in metadata.items() for cname in md.keys()] - - load_ccv_data = partial(self._load_ccv_data, metadata) - with multiprocessing.pool.ThreadPool() as pool: - r = pool.starmap(load_ccv_data, modno_cname) + return self.load_constants_data(metadata) + + def data_map( + self, calibrations=None, metadata=None, + ): + """Load all CCV data in a nested map of ndarrays. + + Args: + calibrations (Iterable of str, optional): Calibration constants + or None for all available (default). + metadata (CCVMetadata, optional): CCV metadata to load constant + for or None to query metadata automatically (default). + Returns: + (dict of dict of ndarray): CCV data by module number and + calibration constant name. + {module: {calibration: ndarray}} + """ + if self.caldb_root is None: + raise RuntimeError('calibration database store unavailable') - arr_map = {} - for (mod, cname), val in zip(modno_cname, r): - arr_map.setdefault(mod, {})[cname] = val + if metadata is None: + metadata = self.metadata(calibrations) - return arr_map + return self.load_constants_data(metadata) def _build_condition(self, parameters): cond = dict() @@ -684,7 +703,7 @@ class SplitConditionCalibrationData(CalibrationData): return self._build_condition(self.illuminated_parameters) def metadata( - self, calibrations=None, event_at=None, snapshot_at=None, raise_error=True, + self, calibrations=None, event_at=None, snapshot_at=None, ): """Query CCV metadata for calibrations, conditions and time. @@ -709,27 +728,20 @@ class SplitConditionCalibrationData(CalibrationData): metadata = CCVMetadata() dark_calibrations = self.dark_calibrations & set(calibrations) - try: - if dark_calibrations: - self._api.closest_ccv_by_time_by_condition( - self.detector_name, dark_calibrations, - self.dark_condition, self.modules, - event_at or self.event_at, snapshot_at or self.snapshot_at, - metadata) - - illum_calibrations = self.illuminated_calibrations & set(calibrations) - if illum_calibrations: - self._api.closest_ccv_by_time_by_condition( - self.detector_name, illum_calibrations, - self.illuminated_condition, self.modules, - event_at or self.event_at, snapshot_at or self.snapshot_at, - metadata) - except CalCatError as e: - print(e) - if not raise_error: - warning(str(e)) - else: - raise + if dark_calibrations: + self._api.closest_ccv_by_time_by_condition( + self.detector_name, dark_calibrations, + self.dark_condition, self.modules, + event_at or self.event_at, snapshot_at or self.snapshot_at, + metadata) + + illum_calibrations = self.illuminated_calibrations & set(calibrations) + if illum_calibrations: + self._api.closest_ccv_by_time_by_condition( + self.detector_name, illum_calibrations, + self.illuminated_condition, self.modules, + event_at or self.event_at, snapshot_at or self.snapshot_at, + metadata) return metadata @@ -923,7 +935,16 @@ class JUNGFRAU_CalibrationData(CalibrationData): self.integration_time = integration_time self.sensor_temperature = sensor_temperature self.gain_setting = gain_setting - self.gain_mode = None if gain_mode == 0 else 1 + self.gain_mode = gain_mode + + def _build_condition(self, parameters): + cond = super()._build_condition(parameters) + + # Fix-up some database quirks. + if int(cond.get('Gain mode', -1)) == 0: + del cond['Gain mode'] + + return cond class PNCCD_CalibrationData(CalibrationData): @@ -973,7 +994,7 @@ class EPIX100_CalibrationData(SplitConditionCalibrationData): } illuminated_calibrations = { 'RelativeGainEPix100', - #'BadPixelsFFEPix100', + # 'BadPixelsFFEPix100', } dark_parameters = [ 'Sensor Bias Voltage', @@ -1020,12 +1041,10 @@ class GOTTHARD2_CalibrationData(CalibrationData): } parameters = [ 'Sensor Bias Voltage', - 'Memory cells', - 'Pixels X', - 'Pixels Y', - 'Integration time', - 'Sensor temperature', - 'Gain setting', + 'Exposure time', + 'Exposure period', + 'Acquisition rate', + 'Single photon', ] def __init__(