Skip to content
Snippets Groups Projects
Commit e735ea2f authored by Karim Ahmed's avatar Karim Ahmed
Browse files

update calcat_interface after testing notebooks

parent c62d83ea
No related branches found
No related tags found
1 merge request!738New calcat interface
This commit is part of merge request !774. Comments created here will be created in the context of that merge request.
"""Interfaces to calibration constant data."""
import multiprocessing
import re
import socket
from datetime import date, datetime, time, timezone
from functools import lru_cache
from os import getenv
from pathlib import Path
#from sys import maxsize
from weakref import WeakKeyDictionary
import pasha as psh
import h5py
import numpy as np
import pasha as psh
from calibration_client import CalibrationClient
from calibration_client.modules import (
Calibration,
......@@ -22,7 +19,6 @@ from calibration_client.modules import (
Parameter,
PhysicalDetectorUnit,
)
from logging import warning
__all__ = [
'CalCatError',
......@@ -33,7 +29,7 @@ __all__ = [
'JUNGFRAU_CalibrationData',
'PNCCD_CalibrationData',
'EPIX100_CalibrationData',
'Gotthard2_CalibrationData'
'GOTTHARD2_CalibrationData',
]
......@@ -217,7 +213,7 @@ class CalCatApi(metaclass=ClientWrapper):
metadata.setdefault(mod, dict())
return metadata
# Map calibration ID to calibratio name.
# Map calibration ID to calibration name.
cal_id_map = {self.calibration_id(calibration): calibration
for calibration in calibrations}
calibration_ids = list(cal_id_map.keys())
......@@ -258,9 +254,9 @@ class CalCatApi(metaclass=ClientWrapper):
raw_data_location=ccv['raw_data_location'],
start_idx=ccv['start_idx'],
end_idx=ccv['end_idx'],
physical_name=ccv['physical_detector_unit']['physical_name'],
physical_name=ccv[
'physical_detector_unit']['physical_name'],
)
return metadata
......@@ -453,7 +449,6 @@ class CalibrationData:
def metadata(
self, calibrations=None, event_at=None, snapshot_at=None,
raise_error=True,
):
"""Query CCV metadata for calibrations, conditions and time.
......@@ -472,21 +467,15 @@ class CalibrationData:
"""
metadata = CCVMetadata()
try:
self._api.closest_ccv_by_time_by_condition(
self.detector_name, calibrations or self.calibrations,
self.condition, self.modules,
event_at or self.event_at, snapshot_at or self.snapshot_at,
metadata)
except CalCatError as e:
if not raise_error:
warning(str(e))
else:
raise
self._api.closest_ccv_by_time_by_condition(
self.detector_name, calibrations or self.calibrations,
self.condition, self.modules,
event_at or self.event_at, snapshot_at or self.snapshot_at,
metadata)
return metadata
def ndarray(
self, module, calibration, metadata=None, raise_error=True, empty_array=None,
self, module, calibration, metadata,
):
"""Load CCV data as ndarray.
......@@ -506,20 +495,40 @@ class CalibrationData:
raise ValueError('module not part of this calibration data')
if metadata is None:
metadata = self.metadata([calibration], raise_error=raise_error)
if metadata: # In case a constant was not found, empty dict.
return self._load_ccv_data(metadata, module, calibration)
else:
return empty_array
metadata = self.metadata([calibration])
return self._load_ccv_data(metadata, module, calibration)
def _allocate_const_arrays(self, metadata, const_load_mp, const_data):
for mod, ccv_entry in metadata.items():
const_data[mod] = {}
for cname, mdata in ccv_entry.items():
dataset = mdata["dataset"]
with h5py.File(self.caldb_root / mdata["path"], "r") as cf:
shape = cf[f"{dataset}/data"].shape
dtype = cf[f"{dataset}/data"].dtype
const_data[mod][cname] = const_load_mp.alloc(
shape=shape, dtype=dtype)
def load_constants_data(self, metadata):
def load_constant_dataset(wid, index, mod):
for cname, mdata in metadata[mod].items():
with h5py.File(self.caldb_root / mdata["path"], "r") as cf:
cf[f"{mdata['dataset']}/data"].read_direct(
const_data[mod][cname])
def _load_ccv_data(self, metadata, module, calibration):
row = metadata[module][calibration]
const_data = dict()
const_load_mp = psh.ProcessContext(num_workers=24)
self._allocate_const_arrays(metadata, const_load_mp, const_data)
const_load_mp.map(load_constant_dataset, list(metadata.keys()))
with h5py.File(self.caldb_root / row['path'], 'r') as f:
return f[row['dataset'] + '/data'][()]
return const_data
def ndarray_map(
self, calibrations=None, metadata=None, processes=None,
self, calibrations=None, metadata=None,
):
"""Load all CCV data in a nested map of ndarrays.
......@@ -528,31 +537,41 @@ class CalibrationData:
or None for all available (default).
metadata (CCVMetadata, optional): CCV metadata to load constant
for or None to query metadata automatically (default).
processes (Int):
Returns:
(dict of dict of ndarray): CCV data by module number and
calibration constant name.
{module: {calibration: ndarray}}
"""
from functools import partial
if self.caldb_root is None:
raise RuntimeError('calibration database store unavailable')
if metadata is None:
metadata = self.metadata(calibrations)
modno_cname = [
(modno, cname) for modno, md in metadata.items() for cname in md.keys()]
load_ccv_data = partial(self._load_ccv_data, metadata)
with multiprocessing.pool.ThreadPool() as pool:
r = pool.starmap(load_ccv_data, modno_cname)
return self.load_constants_data(metadata)
def data_map(
self, calibrations=None, metadata=None,
):
"""Load all CCV data in a nested map of ndarrays.
Args:
calibrations (Iterable of str, optional): Calibration constants
or None for all available (default).
metadata (CCVMetadata, optional): CCV metadata to load constant
for or None to query metadata automatically (default).
Returns:
(dict of dict of ndarray): CCV data by module number and
calibration constant name.
{module: {calibration: ndarray}}
"""
if self.caldb_root is None:
raise RuntimeError('calibration database store unavailable')
arr_map = {}
for (mod, cname), val in zip(modno_cname, r):
arr_map.setdefault(mod, {})[cname] = val
if metadata is None:
metadata = self.metadata(calibrations)
return arr_map
return self.load_constants_data(metadata)
def _build_condition(self, parameters):
cond = dict()
......@@ -684,7 +703,7 @@ class SplitConditionCalibrationData(CalibrationData):
return self._build_condition(self.illuminated_parameters)
def metadata(
self, calibrations=None, event_at=None, snapshot_at=None, raise_error=True,
self, calibrations=None, event_at=None, snapshot_at=None,
):
"""Query CCV metadata for calibrations, conditions and time.
......@@ -709,27 +728,20 @@ class SplitConditionCalibrationData(CalibrationData):
metadata = CCVMetadata()
dark_calibrations = self.dark_calibrations & set(calibrations)
try:
if dark_calibrations:
self._api.closest_ccv_by_time_by_condition(
self.detector_name, dark_calibrations,
self.dark_condition, self.modules,
event_at or self.event_at, snapshot_at or self.snapshot_at,
metadata)
illum_calibrations = self.illuminated_calibrations & set(calibrations)
if illum_calibrations:
self._api.closest_ccv_by_time_by_condition(
self.detector_name, illum_calibrations,
self.illuminated_condition, self.modules,
event_at or self.event_at, snapshot_at or self.snapshot_at,
metadata)
except CalCatError as e:
print(e)
if not raise_error:
warning(str(e))
else:
raise
if dark_calibrations:
self._api.closest_ccv_by_time_by_condition(
self.detector_name, dark_calibrations,
self.dark_condition, self.modules,
event_at or self.event_at, snapshot_at or self.snapshot_at,
metadata)
illum_calibrations = self.illuminated_calibrations & set(calibrations)
if illum_calibrations:
self._api.closest_ccv_by_time_by_condition(
self.detector_name, illum_calibrations,
self.illuminated_condition, self.modules,
event_at or self.event_at, snapshot_at or self.snapshot_at,
metadata)
return metadata
......@@ -923,7 +935,16 @@ class JUNGFRAU_CalibrationData(CalibrationData):
self.integration_time = integration_time
self.sensor_temperature = sensor_temperature
self.gain_setting = gain_setting
self.gain_mode = None if gain_mode == 0 else 1
self.gain_mode = gain_mode
def _build_condition(self, parameters):
cond = super()._build_condition(parameters)
# Fix-up some database quirks.
if int(cond.get('Gain mode', -1)) == 0:
del cond['Gain mode']
return cond
class PNCCD_CalibrationData(CalibrationData):
......@@ -973,7 +994,7 @@ class EPIX100_CalibrationData(SplitConditionCalibrationData):
}
illuminated_calibrations = {
'RelativeGainEPix100',
#'BadPixelsFFEPix100',
# 'BadPixelsFFEPix100',
}
dark_parameters = [
'Sensor Bias Voltage',
......@@ -1020,12 +1041,10 @@ class GOTTHARD2_CalibrationData(CalibrationData):
}
parameters = [
'Sensor Bias Voltage',
'Memory cells',
'Pixels X',
'Pixels Y',
'Integration time',
'Sensor temperature',
'Gain setting',
'Exposure time',
'Exposure period',
'Acquisition rate',
'Single photon',
]
def __init__(
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment