Skip to content
Snippets Groups Projects

Revised CalCat API

Merged Thomas Kluyver requested to merge calcat-api-2 into master
Compare and Show latest version
2 files
+ 102
33
Compare changes
  • Side-by-side
  • Inline
Files
2
import json
import re
from collections.abc import Mapping
from dataclasses import dataclass
from dataclasses import dataclass, field, replace
from datetime import date, datetime, time, timezone
from functools import lru_cache
from pathlib import Path
@@ -201,7 +201,7 @@ def setup_client(
if oauth_client is None and base_url == CALCAT_PROXY_URL:
try:
# timeout=(connect_timeout, read_timeout)
global_client.get_request('me', timeout=(1, 5))
global_client.get_request("me", timeout=(1, 5))
except requests.ConnectionError as e:
raise RuntimeError(
"Could not connect to calibration catalog proxy. This proxy allows "
@@ -212,12 +212,12 @@ def setup_client(
) from e
_default_caldb_root = ...
_default_caldb_root = None
def _get_default_caldb_root():
global _default_caldb_root
if _default_caldb_root is ...:
if _default_caldb_root is None:
onc_path = Path("/common/cal/caldb_store")
maxwell_path = Path("/gpfs/exfel/d/cal/caldb_store")
if onc_path.is_dir():
@@ -225,7 +225,10 @@ def _get_default_caldb_root():
elif maxwell_path.is_dir():
_default_caldb_root = maxwell_path
else:
_default_caldb_root = None
raise RuntimeError(
f"Neither {onc_path} nor {maxwell_path} was found. If the caldb_store "
"directory is at another location, pass its path as caldb_root."
)
return _default_caldb_root
@@ -237,33 +240,22 @@ class SingleConstant:
CalCat calls this a calibration constant version (CCV).
"""
id: int
version_name: str
constant_id: int
constant_name: str
condition_id: int
path: Path
dataset: str
begin_validity_at: datetime
end_validity_at: datetime
raw_data_location: str
physical_name: str # PDU name
ccv_id: Optional[int]
pdu_name: Optional[str]
_metadata: dict = field(default_factory=dict)
_have_calcat_metadata: bool = False
@classmethod
def from_response(cls, ccv: dict) -> "SingleConstant":
const = ccv["calibration_constant"]
return cls(
id=ccv["id"],
version_name=ccv["name"],
constant_id=const["id"],
constant_name=const["name"],
condition_id=const["condition_id"],
path=Path(ccv["path_to_file"]) / ccv["file_name"],
dataset=ccv["data_set_name"],
begin_validity_at=ccv["begin_validity_at"],
end_validity_at=ccv["end_validity_at"],
raw_data_location=ccv["raw_data_location"],
physical_name=ccv["physical_detector_unit"]["physical_name"],
ccv_id=ccv["id"],
pdu_name=ccv["physical_detector_unit"]["physical_name"],
_metadata=ccv,
_have_calcat_metadata=True,
)
def dataset_obj(self, caldb_root=None) -> h5py.Dataset:
@@ -278,6 +270,36 @@ class SingleConstant:
def ndarray(self, caldb_root=None):
return self.dataset_obj(caldb_root)[:]
def _load_calcat_metadata(self, client=None):
client = client or get_client()
calcat_meta = client.get(f"calibration_constant_versions/{self.ccv_id}")
# Any metadata we already have takes precedence over CalCat, so
# this can't change a value that was previously returned.
self._metadata = calcat_meta | self._metadata
self._have_calcat_metadata = True
def metadata(self, key, client=None):
"""Get a specific metadata field, e.g. 'begin_validity_at'
This may make a request to CalCat if the value is not already known.
"""
if key not in self._metadata and not self._have_calcat_metadata:
if self.ccv_id is None:
raise KeyError(f"{key!r} (no CCV ID to request data from CalCat")
self._load_calcat_metadata(client)
return self._metadata[key]
def metadata_dict(self, client=None):
"""Get a dict of available metadata
If this constant didn't come from CalCat but we have a CalCat CCV ID,
this will fetch metadata from CalCat.
"""
if (not self._have_calcat_metadata) and (self.ccv_id is not None):
self._load_calcat_metadata(client)
return self._metadata.copy()
def prepare_selection(
module_details, module_nums=None, aggregator_names=None, qm_names=None
@@ -306,12 +328,45 @@ def prepare_selection(
@dataclass
class MultiModuleConstant:
class MultiModuleConstant(Mapping):
"""A group of similar constants for several modules of one detector"""
constants: Dict[str, SingleConstant] # Keys e.g. 'LPD00'
module_details: List[Dict]
detector_name: str # e.g. 'HED_DET_AGIPD500K2G'
calibration_name: str
def __repr__(self):
return (
f"<MultiModuleConstant: {self.calibration_name} for "
f"{len(self.constants)} modules of {self.detector_name}>"
)
def __iter__(self):
return iter(self.constants)
def __len__(self):
return len(self.constants)
def __getitem__(self, key):
if key in (None, ""):
raise KeyError(key)
candidate_kdas = set()
if key in self.constants: # Karabo DA name, e.g. 'LPD00'
candidate_kdas.add(key)
for m in self.module_details:
names = (m["module_number"], m["virtual_device_name"], m["physical_name"])
if key in names and m["karabo_da"] in self.constants:
candidate_kdas.add(m["karabo_da"])
if not candidate_kdas:
raise KeyError(key)
elif len(candidate_kdas) > 1:
raise KeyError(f"Ambiguous key: {key} matched {candidate_kdas}")
return self.constants[candidate_kdas.pop()]
def select_modules(
self, module_nums=None, *, aggregator_names=None, qm_names=None
@@ -321,7 +376,7 @@ class MultiModuleConstant:
)
d = {aggr: scv for (aggr, scv) in self.constants.items() if aggr in aggs}
mods = [m for m in self.module_details if m["karabo_da"] in d]
return MultiModuleConstant(d, mods, self.detector_name)
return replace(self, constants=d, module_details=mods)
# These properties label only the modules we have constants for, which may
# be a subset of what's in module_details
@@ -390,7 +445,7 @@ class MultiModuleConstant:
# Dimension labels
dims = ["module"] + ["dim_%d" % i for i in range(ndarr.ndim - 1)]
coords = {"module": modules}
name = self.constants[self.aggregator_names[0]].constant_name
name = self.calibration_name
return xarray.DataArray(ndarr, dims=dims, coords=coords, name=name)
@@ -514,6 +569,10 @@ class CalibrationData(Mapping):
kda = pdu["karabo_da"] = pdu.pop("karabo_da_at_ccv_begin_at")
det_id = pdu["detector_id"] = pdu.pop("detector_id_at_ccv_begin_at")
pdu["virtual_device_name"] = pdu.pop("virtual_device_name_at_ccv_begin_at")
if pdu.get("module_number_at_ccv_begin_at") is not None:
pdu["module_number"] = pdu.pop("module_number_at_ccv_begin_at")
else:
pdu["module_number"] = int(re.findall(r"\d+", kda)[-1])
det_ids.add(det_id)
if kda in pdus:
@@ -540,9 +599,15 @@ class CalibrationData(Mapping):
return cls(constant_groups, module_details, det_name)
def __getitem__(self, key) -> MultiModuleConstant:
return MultiModuleConstant(
self.constant_groups[key], self.module_details, self.detector_name
)
if isinstance(key, str):
return MultiModuleConstant(
self.constant_groups[key], self.module_details, self.detector_name, key
)
elif isinstance(key, tuple) and len(key) == 2:
cal_type, module = key
return self[cal_type][module]
else:
raise TypeError(f"Key should be string or 2-tuple (got {key!r})")
def __iter__(self):
return iter(self.constant_groups)
@@ -550,6 +615,9 @@ class CalibrationData(Mapping):
def __len__(self):
return len(self.constant_groups)
def __contains__(self, item):
return item in self.constant_groups
def __repr__(self):
return (
f"<CalibrationData: {', '.join(sorted(self.constant_groups))} "
Loading