Skip to content
Snippets Groups Projects
Commit 0e9b1ed5 authored by Thomas Kluyver's avatar Thomas Kluyver
Browse files

Backport CalCat CCV metadata API from EXtra

parent 320b21f1
No related branches found
No related tags found
1 merge request!885Revised CalCat API
import json
import re
from collections.abc import Mapping
from dataclasses import dataclass, replace
from dataclasses import dataclass, field, replace
from datetime import date, datetime, time, timezone
from functools import lru_cache
from pathlib import Path
......@@ -244,6 +244,8 @@ class SingleConstant:
dataset: str
ccv_id: Optional[int]
pdu_name: Optional[str]
_metadata: dict = field(default_factory=dict)
_have_calcat_metadata: bool = False
@classmethod
def from_response(cls, ccv: dict) -> "SingleConstant":
......@@ -252,6 +254,8 @@ class SingleConstant:
dataset=ccv["data_set_name"],
ccv_id=ccv["id"],
pdu_name=ccv["physical_detector_unit"]["physical_name"],
_metadata=ccv,
_have_calcat_metadata=True,
)
def dataset_obj(self, caldb_root=None) -> h5py.Dataset:
......@@ -266,6 +270,36 @@ class SingleConstant:
def ndarray(self, caldb_root=None):
return self.dataset_obj(caldb_root)[:]
def _load_calcat_metadata(self, client=None):
client = client or get_client()
calcat_meta = client.get(f"calibration_constant_versions/{self.ccv_id}")
# Any metadata we already have takes precedence over CalCat, so
# this can't change a value that was previously returned.
self._metadata = calcat_meta | self._metadata
self._have_calcat_metadata = True
def metadata(self, key, client=None):
"""Get a specific metadata field, e.g. 'begin_validity_at'
This may make a request to CalCat if the value is not already known.
"""
if key not in self._metadata and not self._have_calcat_metadata:
if self.ccv_id is None:
raise KeyError(f"{key!r} (no CCV ID to request data from CalCat")
self._load_calcat_metadata(client)
return self._metadata[key]
def metadata_dict(self, client=None):
"""Get a dict of available metadata
If this constant didn't come from CalCat but we have a CalCat CCV ID,
this will fetch metadata from CalCat.
"""
if (not self._have_calcat_metadata) and (self.ccv_id is not None):
self._load_calcat_metadata(client)
return self._metadata.copy()
def prepare_selection(
module_details, module_nums=None, aggregator_names=None, qm_names=None
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment