import datetime
import json
import os
import re
import zlib
from collections import OrderedDict
from glob import glob
from multiprocessing.pool import ThreadPool
from os import environ, listdir, path
from os.path import isfile
from pathlib import Path
from queue import Queue
from tempfile import NamedTemporaryFile
from time import sleep
from typing import List, Optional, Tuple, Union
from urllib.parse import urljoin
import h5py
import ipykernel
import numpy as np
import requests
import yaml
import zmq
from extra_data import H5File, RunDirectory
from iCalibrationDB import ConstantMetaData, Versions
from notebook.notebookapp import list_running_servers
from .ana_tools import save_dict_to_hdf5
def parse_runs(runs, return_type=str):
    """Parse a run specification into a list of run numbers or names.
    `runs` may be a comma-separated string with optional "start-end" ranges
    (the end value is exclusive, as implemented below), a list/tuple of run
    numbers, or a single run number. With return_type=str, runs are returned
    as zero-padded names such as "r0001".
    """
    pruns = []
if isinstance(runs, str):
for rcomp in runs.split(","):
if "-" in rcomp:
start, end = rcomp.split("-")
pruns += list(range(int(start), int(end)))
else:
pruns += [int(rcomp)]
elif isinstance(runs, (list, tuple)):
pruns = runs
else:
pruns = [runs, ]
if return_type is str:
return ["r{:04d}".format(r) for r in pruns]
else:
return pruns
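# A short usage sketch (run numbers are made up). Note that the end of a
# "start-end" range is exclusive as implemented above:
#   >>> parse_runs("1-3,7")
#   ['r0001', 'r0002', 'r0007']
#   >>> parse_runs([1, 2], return_type=int)
#   [1, 2]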
def run_prop_seq_from_path(filename):
    """Extract run, proposal and sequence numbers from a file path.
    Each value is returned as a string, or None if the pattern is not found.
    """
    run = re.findall(r'.*r([0-9]{4}).*', filename)
run = run[0] if len(run) else None
proposal = re.findall(r'.*p([0-9]{6}).*', filename)
proposal = proposal[0] if len(proposal) else None
sequence = re.findall(r'.*S([0-9]{5}).*', filename)
sequence = sequence[0] if len(sequence) else None
return run, proposal, sequence
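# A hedged example with a made-up file path; each value comes back as a string:
#   >>> run_prop_seq_from_path(
#   ...     "/gpfs/exfel/exp/callab/202031/p900113/raw/r0016/RAW-R0016-AGIPD00-S00002.h5")
#   ('0016', '900113', '00002')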
def map_seq_files(
run_folder: Path,
karabo_das: List[str],
sequences: Optional[List[int]] = None,
) -> Tuple[dict, int]:
"""Glob run_folder and match the files based on the selected
detectors and sequence numbers.
Returns:
Dict: with karabo_das keys and the corresponding sequence files.
Int: for number of all sequence files for all karabo_das to process.
"""
if sequences == [-1]:
sequences = None
if sequences is not None:
sequences = set(int(seq) for seq in sequences)
seq_fn_pat = re.compile(r".*-(?P<da>.*?)-S(?P<seq>.*?)\.h5")
mapped_files = {kda: [] for kda in karabo_das}
total_files = 0
for fn in run_folder.glob("*.h5"):
if (match := seq_fn_pat.match(fn.name)) is not None:
da = match.group("da")
if da in mapped_files and (
sequences is None or int(match.group("seq")) in sequences
):
mapped_files[da].append(fn)
total_files += 1
# Return dict with sorted list of sequence files.
for k in mapped_files:
mapped_files[k].sort()
return mapped_files, total_files
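# Hedged sketch (paths are hypothetical): with files named like
# "RAW-R0016-AGIPD00-S00000.h5" in the run folder, a call such as
#   >>> mapped, n = map_seq_files(Path("/gpfs/.../r0016"), ["AGIPD00"], [0, 1])
# would return {"AGIPD00": [<S00000 file>, <S00001 file>]} and n == 2,
# assuming both sequence files exist.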
def map_modules_from_folder(in_folder, run, path_template, karabo_da,
sequences=None):
"""
Prepare queues of files to process.
Queues are stored in dictionary with module name Q{}M{} as a key
:param in_folder: Input folder with raw data
:param run: Run number
:param path_template: Template for file name
e.g. `RAW-R{:04d}-{}-S{:05d}.h5`
:param karabo_da: List of data aggregators e.g. [AGIPD00, AGIPD01]
:param sequences: List of sequences to be considered
    :return: Dictionary of queues of files, dictionary of module indexes,
        total number of sequences, dictionary of number of sequences per
        module, total size of all matched files in bytes
    """
module_files = OrderedDict()
mod_ids = OrderedDict()
total_sequences = 0
total_file_size = 0
sequences_qm = {}
for inset in karabo_da:
module_idx = int(inset[-2:])
name = module_index_to_qm(module_idx)
module_files[name] = Queue()
sequences_qm[name] = 0
mod_ids[name] = module_idx
if sequences is None:
fname = path_template.format(run, inset, 0).replace("S00000", "S*")
abs_fname = "{}/r{:04d}/{}".format(in_folder, run, fname)
for filename in glob(abs_fname):
module_files[name].put(filename)
total_sequences += 1
sequences_qm[name] += 1
total_file_size += path.getsize(filename)
else:
for sequence in sequences:
fname = path_template.format(run, inset, sequence)
abs_fname = "{}/r{:04d}/{}".format(in_folder, run, fname)
if not isfile(abs_fname):
continue
module_files[name].put(abs_fname)
total_sequences += 1
sequences_qm[name] += 1
total_file_size += path.getsize(abs_fname)
return (module_files, mod_ids, total_sequences,
sequences_qm, total_file_size)
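# Hedged usage sketch (folder and aggregator names are illustrative):
#   >>> files, ids, nseq, seq_per_mod, size = map_modules_from_folder(
#   ...     "/gpfs/.../raw", 16, "RAW-R{:04d}-{}-S{:05d}.h5", ["AGIPD00", "AGIPD01"])
# `files` then maps "Q1M1" and "Q1M2" to Queues of matching sequence files,
# and `ids` maps those names to module indices 0 and 1.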
def map_gain_stages(in_folder, runs, path_template, karabo_da, sequences=None):
"""
Prepare queues of files to process.
Queues are stored in dictionary with module name Q{}M{}
and gain name as a keys
:param in_folder: Input folder with raw data
:param runs: Dictionary of runs with key naming the gain stages
:param path_template: Template for file name
e.g. `RAW-R{:04d}-{}-S{:05d}.h5`
:param karabo_da: List of data aggregators e.g. [AGIPD00, AGIPD01]
:param sequences: List of sequences to be considered
    :return: Dictionary of queues of files per gain stage,
        total number of sequences, total file size in GB
    """
total_sequences = 0
total_file_size = 0
gain_mapped_files = OrderedDict()
for gain, run in runs.items():
mapped_files, _, seq, _, fs = map_modules_from_folder(in_folder, run,
path_template,
karabo_da,
sequences)
total_sequences += seq
total_file_size += fs
gain_mapped_files[gain] = mapped_files
return gain_mapped_files, total_sequences, total_file_size / 1e9
def map_modules_from_files(filelist, file_inset, quadrants, modules_per_quad):
total_sequences = 0
total_file_size = 0
module_files = {}
mod_ids = {}
for quadrant in range(quadrants):
for module in range(modules_per_quad):
name = "Q{}M{}".format(quadrant + 1, module + 1)
module_files[name] = Queue()
            num = quadrant * modules_per_quad + module
mod_ids[name] = num
file_infix = "{}{:02d}".format(file_inset, num)
for file in filelist:
if file_infix in file:
module_files[name].put(file)
total_sequences += 1
total_file_size += path.getsize(file)
return module_files, mod_ids, total_sequences, total_file_size
def gain_map_files(in_folder, runs, sequences, file_inset, quadrants,
mods_per_quad):
total_sequences = 0
total_file_size = 0
gain_mapped_files = OrderedDict()
for gain, run in runs.items():
ginfolder = "{}/{}".format(in_folder, run)
dirlist = listdir(ginfolder)
file_list = []
for entry in dirlist:
            # Consider only .h5 files
abs_entry = "{}/{}".format(ginfolder, entry)
if path.isfile(abs_entry) and path.splitext(abs_entry)[1] == ".h5":
if sequences is None:
file_list.append(abs_entry)
else:
for seq in sequences:
if "{:05d}.h5".format(seq) in abs_entry:
file_list.append(path.abspath(abs_entry))
mapped_files, mod_ids, seq, fs = map_modules_from_files(file_list,
file_inset,
quadrants,
mods_per_quad)
total_sequences += seq
total_file_size += fs
gain_mapped_files[gain] = mapped_files
return gain_mapped_files, total_sequences, total_file_size / 1e9
def get_notebook_name():
"""
Return the full path of the jupyter notebook.
"""
try:
kernel_id = re.search('kernel-(.*).json',
ipykernel.connect.get_connection_file()).group(1)
servers = list_running_servers()
for ss in servers:
response = requests.get(urljoin(ss['url'], 'api/sessions'),
params={'token': ss.get('token', '')})
for nn in json.loads(response.text):
if nn['kernel']['id'] == kernel_id:
return nn['notebook']['path']
    except Exception:
        return environ.get("CAL_NOTEBOOK_NAME", "Unknown Notebook")
def creation_date_file_metadata(
run_folder: Path,
) -> Optional[datetime.datetime]:
"""Get run directory creation date from
METADATA/CreationDate of the oldest file using EXtra-data.
# TODO: update after DAQ store the same date as myMDC.
:param dc: EXtra-data DataCollection for the run directory.
:return Optional[datetime.datetime]: Run creation date.
"""
md_dict = RunDirectory(run_folder).run_metadata()
if md_dict["dataFormatVersion"] != "0.5":
creation_dates = [
H5File(f).run_metadata()["creationDate"]
for f in run_folder.glob("*.h5")
]
return datetime.datetime.strptime(
min(creation_dates), "%Y%m%dT%H%M%S%z")
else:
print("WARNING: input files contains old datasets. "
"No `METADATA/creationDate` to read.")
def creation_date_train_timestamp(
dc: RunDirectory
) -> Optional[datetime.datetime]:
"""Get creation date from the timestamp of the first train.
:param dc: EXtra-data DataCollection for the run directory.
:return Optional[datetime.datetime]: Run creation date.
"""
creation_date = np.datetime64(
dc.select_trains(np.s_[0]).train_timestamps()[0], 'us').item()
if creation_date is None:
print("WARNING: input files contains old datasets without"
" trains timestamps.")
return None
return creation_date.replace(tzinfo=datetime.timezone.utc)
def get_dir_creation_date(directory: Union[str, Path], run: int,
verbosity: int = 0) -> datetime.datetime:
"""Get the directory creation data based on 3 different methods.
1) Return run start time from myMDC. (get_runtime_metadata_client)
2) If myMDC connection is not set,
get the date from the files metadata. (get_runtime_metadata_file)
3) If data files are older than 2020 (dataformatversion == "0.5"),
get the data from the oldest file's modified time.
If the data is not available from either source,
this function will raise a FileNotFoundError.
:param directory: path to a directory which contains runs
(e.g. /gpfs/exfel/data/exp/callab/202031/p900113/raw/).
:param run: run number.
:param verbosity: Level of verbosity (0 - silent)
:return: creation datetime for the directory.
"""
directory = Path(directory, f'r{run:04d}')
# Validate the availability of the input folder.
# And show a clear error message, if it was not found.
try:
dc = RunDirectory(directory)
except FileNotFoundError as e:
raise FileNotFoundError(
"- Failed to read creation time, wrong input folder",
directory) from e
cdate = creation_date_train_timestamp(dc)
if cdate is not None:
# Exposing the method used for reading the creation_date.
print("Reading creation_date from input files metadata"
" `INDEX/timestamp`")
else: # It's an older dataset.
print("Reading creation_date from last modification data "
"for the oldest input file.")
cdate = datetime.datetime.fromtimestamp(
sorted(
directory.glob("*.h5"), key=path.getmtime,
)[0].stat().st_mtime,
tz=datetime.timezone.utc,
)
return cdate
def calcat_creation_time(
in_folder: Path,
run: str,
creation_time: Optional[str] = "",
) -> datetime.datetime:
"""Return the creation time to use with CALCAT."""
# Run's creation time:
if creation_time:
creation_time = datetime.datetime.strptime(
creation_time,
'%Y-%m-%d %H:%M:%S').astimezone(tz=datetime.timezone.utc)
else:
creation_time = get_dir_creation_date(in_folder, run)
return creation_time
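# Hedged usage sketch (values are made up): an explicit creation time string
# is parsed and converted to UTC, otherwise the run directory is inspected
# via get_dir_creation_date().
#   >>> calcat_creation_time("/gpfs/.../raw", 16, "2022-01-01 09:00:00")
#   # -> timezone-aware datetime in UTC
#   >>> calcat_creation_time("/gpfs/.../raw", 16, "")
#   # -> falls back to get_dir_creation_date(in_folder, run)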
def _init_metadata(constant: 'iCalibrationDB.calibration_constant',
condition: 'iCalibrationDB.detector_conditions',
creation_time: Optional[str] = None
) -> 'ConstantMetaData':
"""Initializing a ConstantMetaData class instance and
add the correct creation time of the constant metadata.
"""
metadata = ConstantMetaData()
metadata.calibration_constant = constant
metadata.detector_condition = condition
if creation_time is None:
metadata.calibration_constant_version = Versions.Now()
else:
metadata.calibration_constant_version = Versions.Timespan(
start=creation_time)
return metadata
def save_const_to_h5(db_module: str, karabo_id: str,
constant: 'iCalibrationDB.calibration_constant',
condition: 'iCalibrationDB.detector_conditions',
data: np.array, file_loc: str,
report: str,
creation_time: datetime.datetime,
out_folder: str) -> 'ConstantMetaData':
""" Save constant in h5 file with its metadata
(e.g. db_module, condition, creation_time)
:param db_module: database module (PDU/Physical Detector Unit).
:param karabo_id: karabo identifier.
:param constant: Calibration constant known for given detector.
:param condition: Calibration condition.
:param data: Constant data to save.
:param file_loc: Location of raw data "proposal:{} runs:{} {} {}".
:param creation_time: creation_time for the saved constant.
:param out_folder: path to output folder.
:return: metadata of the saved constant.
"""
metadata = _init_metadata(constant, condition, creation_time)
metadata.calibration_constant_version.raw_data_location = file_loc
dpar = {
parm.name: {
'lower_deviation_value': parm.lower_deviation,
'upper_deviation_value': parm.upper_deviation,
'value': parm.value,
'flg_logarithmic': parm.logarithmic,
}
for parm in metadata.detector_condition.parameters
}
creation_time = metadata.calibration_constant_version.begin_at
raw_data = metadata.calibration_constant_version.raw_data_location
constant_name = metadata.calibration_constant.__class__.__name__
data_to_store = {
'condition': dpar,
'db_module': db_module,
'karabo_id': karabo_id,
'constant': constant_name,
'data': data,
'creation_time': creation_time,
'file_loc': raw_data,
'report': report,
}
ofile = f"{out_folder}/const_{constant_name}_{db_module}.h5"
if isfile(ofile):
print(f'File {ofile} already exists and will be overwritten')
save_dict_to_hdf5(data_to_store, ofile)
return metadata
def get_random_db_interface(cal_db_interface):
"""Return interface to calibration DB with
random (with given range) port.
"""
# Initialize the random generator with a random seed value,
# in case the function was executed within a multiprocessing pool.
np.random.seed()
if "#" in cal_db_interface:
prot, serv, ran = cal_db_interface.split(":")
r1, r2 = ran.split("#")
return ":".join(
[prot, serv, str(np.random.randint(int(r1), int(r2)))])
return cal_db_interface
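# Illustrative example (host name as used elsewhere in this module): a "#" in
# the port field marks a range from which one port is picked at random.
#   >>> get_random_db_interface("tcp://max-exfl016:8015#8025")
#   'tcp://max-exfl016:8019'   # any port in [8015, 8025)
#   >>> get_random_db_interface("tcp://max-exfl016:8015")
#   'tcp://max-exfl016:8015'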
def get_report(out_folder: str, default_path: str = ""):
"""Get the report path from calibration_metadata.yml
stored in the out_folder.
"""
metadata = CalibrationMetadata(out_folder)
report_path = metadata.get("report-path", default_path)
if not report_path:
print("WARNING: No report path will be injected "
"with the constants.\n")
return report_path
def get_pdu_from_db(karabo_id: str, karabo_da: Union[str, list],
constant: 'iCalibrationDB.calibration_constant',
condition: 'iCalibrationDB.detector_conditions',
cal_db_interface: str,
snapshot_at: Optional[datetime.datetime] = None,
timeout: int = 30000) -> List[str]:
"""Return all physical detector units for a
karabo_id and list of karabo_da
:param karabo_id: Karabo identifier.
:param karabo_da: Karabo data aggregator.
    :param constant: Calibration constant object to initialize
        CalibrationConstantMetadata class.
    :param condition: Detector condition object to initialize
        CalibrationConstantMetadata class.
:param cal_db_interface: Interface string, e.g. "tcp://max-exfl016:8015".
:param snapshot_at: Database snapshot.
:param timeout: Calibration Database timeout.
:return: List of physical detector units (db_modules)
"""
if not isinstance(karabo_da, (str, list)):
raise TypeError("karabo_da should either be a list of multiple "
"karabo_da or a string of one karabo_da or 'all'")
metadata = _init_metadata(constant, condition, None)
# CalibrationDBRemote expects a string.
if snapshot_at is not None and hasattr(snapshot_at, 'isoformat'):
snapshot_at = snapshot_at.isoformat()
# A random interface is chosen if there is # for address range.
db_interface = get_random_db_interface(cal_db_interface)
pdu_dicts = metadata.retrieve_pdus_for_detector(receiver=db_interface,
karabo_id=karabo_id,
snapshot_at=snapshot_at,
timeout=timeout)
# Get a list of pdus based on requested karabo_das
if karabo_da == 'all':
db_modules = [d["pdu_physical_name"] for d in pdu_dicts]
else:
k_indices = []
if isinstance(karabo_da, str):
karabo_da = [karabo_da]
# Get indices of dict with the right karabo_da,
# else use None.
for k in karabo_da:
pdu_found = False
for i, d in enumerate(pdu_dicts):
if d["karabo_da"] == k:
k_indices.append(i)
pdu_found = True
break
if not pdu_found:
k_indices.append(None)
db_modules = []
for i in k_indices:
if i is None:
db_modules.append(None)
else:
db_modules.append(pdu_dicts[i]["pdu_physical_name"])
return db_modules
already_printed = {}
def get_from_db(karabo_id: str, karabo_da: str,
constant: 'iCalibrationDB.calibration_constant',
condition: 'iCalibrationDB.detector_conditions',
empty_constant: np.array,
cal_db_interface: str,
creation_time: Optional[datetime.datetime] = None,
verbosity: int = 1,
timeout: int = 30000,
ntries: int = 7,
meta_only: bool = True,
load_data: bool = True,
version_info: bool = False,
doraise: bool = False,
strategy: str = "pdu_closest_by_time"
) -> Tuple[np.array, 'ConstantMetaData']:
"""Return calibration constants and metadata requested from CalDB
This feature uses the karabo-id and karabo-da to retrieve the
desired CCV
:param karabo_id: karabo identifier (detector identifier).
:param karabo_da: karabo data aggregator.
:param constant: Calibration constant known for given detector.
:param condition: Calibration condition.
:param empty_constant: Constant to be returned in case of failure.
:param cal_db_interface: Interface string, e.g. "tcp://max-exfl016:8015"
:param creation_time: Latest time for constant to be created.
:param verbosity: Level of verbosity (0 - silent)
    :param timeout: Timeout for zmq request.
    :param ntries: number of tries to contact the database;
        ntries is set to 7 so that if the timeout started at 30s the last
        timeout will be ~ 1h.
    :param meta_only: Retrieve only metadata via ZMQ. Constants are
        taken directly from the h5 file on maxwell.
    :param load_data: If True, load the constant data from the stored
        h5 file when only metadata was retrieved.
    :param version_info: Flag to show the info for the retrieved Constant.
    :param doraise: if True raise errors during communication with DB.
    :param strategy: Retrieving strategy for calibrationDBRemote.
:return: Calibration constant, metadata.
"""
if version_info:
meta_only = False
metadata = _init_metadata(constant, condition, creation_time)
if karabo_id and karabo_da:
when = None
if creation_time is not None and hasattr(creation_time, 'isoformat'):
when = creation_time.isoformat()
metadata.calibration_constant_version.karabo_id = karabo_id
metadata.calibration_constant_version.karabo_da = karabo_da
# make sure to remove device name from metadata dict before
# retrieving to keep using karabo_id and karabo_da only
# during retrieval. As device_name could have been set after
# retrieval from iCalibrationDB
metadata.calibration_constant_version.device_name = None
while ntries > 0:
this_interface = get_random_db_interface(cal_db_interface)
try:
r = metadata.retrieve(this_interface, timeout=timeout,
when=when, meta_only=meta_only,
version_info=version_info,
strategy=strategy)
if version_info:
return r
break
except zmq.error.Again:
ntries -= 1
timeout *= 2
sleep(np.random.randint(30))
# TODO: reevaluate the need for doraise
# and remove if not needed.
if ntries == 0 and doraise:
raise
except Exception as e:
if verbosity > 0:
print(e)
if 'missing_token' in str(e):
ntries -= 1
else:
ntries = 0
if ntries == 0 and doraise:
raise RuntimeError(f'{e}')
if ntries > 0:
mdata_const = metadata.calibration_constant_version
if load_data and meta_only:
hdf5path = getattr(mdata_const, 'hdf5path', None)
filename = getattr(mdata_const, 'filename', None)
h5path = getattr(mdata_const, 'h5path', None)
if not (hdf5path and filename and h5path):
raise ValueError(
"Wrong metadata received to access the constant data."
f" Retrieved constant filepath is {hdf5path}/{filename}" # noqa
f" and data_set_name is {h5path}."
)
with h5py.File(Path(hdf5path, filename), "r") as f:
metadata.calibration_constant.data = f[f"{h5path}/data"][()] # noqa
# The variant attribute is missing for old constants.
if "variant" in f[h5path].attrs.keys():
metadata.calibration_constant_version.variant = f[h5path].attrs["variant"] # noqa
if verbosity > 0:
if constant.name not in already_printed or verbosity > 1:
already_printed[constant.name] = True
# TODO: Reset mdata_const.begin_at
# if comm_db_success is False.
begin_at = mdata_const.begin_at
print(f"Retrieved {constant.name} "
f"with creation time: {begin_at}")
return constant.data, metadata
else:
return empty_constant, metadata
else:
return empty_constant, None
def send_to_db(db_module: str, karabo_id: str, constant, condition,
file_loc: str, report_path: str, cal_db_interface: str,
creation_time: Optional[datetime.datetime] = None,
timeout: int = 30000,
ntries: int = 7,
doraise: bool = False,
variant: int = 0):
"""Send new calibration constants and metadata requested to CalDB
:param db_module: database module (PDU/Physical Detector Unit)
:param karabo_id: karabo identifier
:param constant: Calibration constant known for given detector
:param condition: Calibration condition
:param file_loc: Location of raw data.
    :param report_path: xfel-calibrate report path to inject along with
        the calibration constant versions to the database.
:param cal_db_interface: Interface string, e.g. "tcp://max-exfl016:8015"
:param creation_time: Latest time for constant to be created
:param timeout: Timeout for zmq request
:param ntries: number of tries to contact the database,
ntries is set to 7 so that if the timeout started
at 30s last timeout will be ~ 1h.
:param doraise: if True raise errors during communication with DB
:param variant: A calibration constant version variant attribute
for the constant file.
"""
success = False
snapshot_at = None
metadata = _init_metadata(constant, condition, creation_time)
if db_module:
# Add injected constant's file source info as a file location
metadata.calibration_constant_version.raw_data_location = file_loc
if report_path:
# calibration_client expects a dict of injected report path
# of at least 2 characters for each key.
if not isinstance(report_path, str) or len(report_path) < 2:
raise TypeError(
"\"report_path\" needs to be a string "
"of at least 2 characters."
)
report = {"name": path.basename(report_path),
"file_path": report_path}
metadata.calibration_constant_version.report_path = report
metadata.calibration_constant_version.karabo_id = karabo_id
metadata.calibration_constant_version.device_name = db_module
metadata.calibration_constant_version.karabo_da = None
metadata.calibration_constant_version.raw_data_location = file_loc
metadata.calibration_constant_version.variant = variant
if constant.data is None:
raise ValueError(
"There is no data available to "
"inject to the database."
)
while ntries > 0:
this_interface = get_random_db_interface(cal_db_interface)
if (
creation_time is not None and
hasattr(creation_time, 'isoformat')
):
# This snapshot will be used only while retrieving
# the correct PDU and appending its UUID.
snapshot_at = creation_time.isoformat()
try:
metadata.send(
this_interface,
snapshot_at=snapshot_at,
timeout=timeout,
)
success = True # TODO: use comm_db_success
break
except zmq.error.Again:
ntries -= 1
timeout *= 2
sleep(np.random.randint(30))
if ntries == 0 and doraise:
raise
except Exception as e:
# TODO: refactor to use custom exception class
# Refactor error message for re-injecting an
# identical CCV to the database.
if all(s in str(e) for s in [
"Error creating calibration_constant_version",
"has already been taken",
]):
print(
f"WARNING: {constant.name} for {db_module}"
" has already been injected with the same "
"parameter conditions."
)
else:
print(f"{e}\n")
if 'missing_token' in str(e):
ntries -= 1
else:
ntries = 0
if ntries == 0 and doraise:
raise RuntimeError(f'{e}')
if success:
print(
f"{constant.name} for {db_module} "
"is injected with creation-time: "
f"{metadata.calibration_constant_version.begin_at}."
)
return metadata
def get_constant_from_db(karabo_id: str, karabo_da: str,
constant, condition, empty_constant,
cal_db_interface: str, creation_time=None,
print_once=True, timeout=30000, ntries=120,
meta_only=True):
"""Return calibration constants requested from CalDB
"""
data, _ = get_from_db(karabo_id, karabo_da, constant,
condition, empty_constant,
cal_db_interface, creation_time,
int(print_once), timeout, ntries, meta_only)
return data
def get_constant_from_db_and_time(karabo_id: str, karabo_da: str,
constant, condition, empty_constant,
cal_db_interface: str, creation_time=None,
print_once=True, timeout=30000, ntries=120):
"""Return calibration constants requested from CalDB,
alongside injection time
"""
data, m = get_from_db(karabo_id, karabo_da, constant,
condition, empty_constant,
cal_db_interface, creation_time,
int(print_once), timeout, ntries)
if m and m.comm_db_success:
return data, m.calibration_constant_version.begin_at
else:
# return None for injection time if communication with db failed.
# reasons (no constant or condition found,
# or network problem)
return data, None
def module_index_to_qm(index: int, total_modules: int = 16):
"""Maps module index (0-indexed) to quadrant + module string (1-indexed)"""
    assert index < total_modules, f'Module index {index} is out of range for {total_modules} modules'
modules_per_quad = total_modules // 4
quad, mod = divmod(index, modules_per_quad)
return f"Q{quad+1}M{mod+1}"
def recursive_update(target: dict, source: dict):
"""Recursively merge source into target, checking for conflicts
Conflicting entries will not be copied to target. Returns True if any
conflicts were found.
"""
conflict = False
for k, v2 in source.items():
v1 = target.get(k, None)
if isinstance(v1, dict) and isinstance(v2, dict):
conflict = recursive_update(v1, v2) or conflict
elif (v1 is not None) and (v1 != v2):
conflict = True
else:
target[k] = v2
return conflict
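# Small deterministic example: the conflicting key "a" keeps its value from
# `target`, and the conflict is reported:
#   >>> t = {"a": 1, "sub": {"x": 1}}
#   >>> recursive_update(t, {"a": 2, "sub": {"y": 2}})
#   True
#   >>> t
#   {'a': 1, 'sub': {'x': 1, 'y': 2}}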
class CalibrationMetadata(dict):
"""Convenience class: dictionary stored in metadata YAML file
If metadata file already exists, it will be loaded (this may override
additional constructor parameters given to this class). Use new=True to
skip loading it.
"""
def __init__(self, output_dir: Union[Path, str], *args, new=False):
        dict.__init__(self, *args)
self._yaml_fn = Path(output_dir) / "calibration_metadata.yml"
if self._yaml_fn.exists():
if new:
# TODO: update after resolving this discussion
# https://git.xfel.eu/detectors/pycalibration/-/merge_requests/624 # noqa
self.save()
else:
with self._yaml_fn.open("r") as fd:
data = yaml.safe_load(fd)
if isinstance(data, dict):
self.update(data)
else:
print(f"Warning: existing {self._yaml_fn} is malformed, "
"will be overwritten")
@property
def filename(self):
return self._yaml_fn
def save(self):
with self._yaml_fn.open("w") as fd:
yaml.safe_dump(dict(self), fd)
def save_copy(self, copy_dir: Path):
with (copy_dir / self._yaml_fn.name).open("w") as fd:
yaml.safe_dump(dict(self), fd)
def add_fragment(self, data: dict):
"""Save metadata to a separate 'fragment' file to be merged later
Avoids a risk of corrupting the main file by writing in parallel.
"""
prefix = f"metadata_frag_j{os.environ.get('SLURM_JOB_ID', '')}_"
with NamedTemporaryFile("w", dir=self._yaml_fn.parent,
prefix=prefix, suffix='.yml', delete=False) as fd:
yaml.safe_dump(data, fd)
def gather_fragments(self):
"""Merge in fragments saved by add_fragment(), then delete them"""
frag_files = list(self._yaml_fn.parent.glob('metadata_frag_*.yml'))
to_delete = []
for fn in frag_files:
with fn.open("r") as fd:
data = yaml.safe_load(fd)
if recursive_update(self, data):
print(f"{fn} contained conflicting metadata. "
f"This file will be left for debugging")
else:
to_delete.append(fn)
self.save()
for fn in to_delete:
fn.unlink()
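# Hedged usage sketch (the output path and stored keys are illustrative only):
#   >>> md = CalibrationMetadata("/path/to/out_folder")
#   >>> md["retrieved-constants"] = {}
#   >>> md.save()  # writes /path/to/out_folder/calibration_metadata.yml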
def save_constant_metadata(
retrieved_constants: dict,
mdata: ConstantMetaData,
constant_name: str,
):
"""Save constant metadata to the input meta data dictionary.
The constant's metadata stored are file path, dataset name,
creation time, and physical detector unit name.
:param retrieved_constants: A dictionary to store the metadata for
the retrieved constant.
:param mdata: A ConstantMetaData object after retrieving trying
to retrieve a constant with get_from_db().
:param constant_name: String for constant name to be used as a key.
:param constants_key: The key name when all constants metadata
will be stored.
"""
mdata_const = mdata.calibration_constant_version
const_mdata = retrieved_constants[constant_name] = dict()
# check if constant was successfully retrieved.
if mdata.comm_db_success:
const_mdata["file-path"] = (
f"{mdata_const.hdf5path}" f"{mdata_const.filename}"
)
const_mdata["dataset-name"] = mdata_const.h5path
const_mdata["creation-time"] = mdata_const.begin_at
else:
const_mdata["file-path"] = None
const_mdata["creation-time"] = None
def load_specified_constants(
retrieved_constants: dict,
empty_constants: Optional[dict] = None,
) -> Tuple[dict, dict]:
"""Load constant data from metadata in the
retrieved_constants dictionary.
:param retrieved_constants: A dict. with the constant filepaths and
dataset-name to read the constant data arrays.
{
'Constant Name': {
'file-path': '/gpfs/.../*.h5',
'dataset-name': '/module_name/...',
'creation-time': str(datetime),},
}
    :param empty_constants: A dict of constant names keys and
        the empty constant array to use in case of non-retrieved constants.
:return constant_data: A dict of constant names keys and their data.
"""
const_data = dict()
when = dict()
for cname, mdata in retrieved_constants.items():
const_data[cname] = dict()
when[cname] = mdata["creation-time"]
if when[cname]:
with h5py.File(mdata["file-path"], "r") as cf:
const_data[cname] = np.copy(
cf[f"{mdata['dataset-name']}/data"])
else:
const_data[cname] = (
empty_constants[cname] if empty_constants else None)
return const_data, when
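# Hedged input sketch (file paths and names are made up), matching the
# structure documented above:
#   retrieved = {
#       "Offset": {
#           "file-path": "/gpfs/.../cal.h5",
#           "dataset-name": "/AGIPD00/Offset/0",
#           "creation-time": "2022-01-01T00:00:00",
#       },
#   }
#   const_data, when = load_specified_constants(retrieved)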
def write_constants_fragment(
out_folder: Path,
det_metadata: dict,
caldb_root: Path,
):
"""Record calibration constants metadata to a fragment file.
Args:
out_folder (Path): The output folder to store the fragment file.
det_metadata (dict): A dictionary with the desired detector metadata.
{karabo_da: {constant_name: metadata}}
caldb_root (Path): The calibration database root path for constant files.
"""
metadata = {"retrieved-constants": {}}
for karabo_da, const_metadata in det_metadata.items():
mod_metadata = {}
mod_metadata["constants"] = {
cname: {
"path": str(caldb_root / ccv_metadata["path"]),
"dataset": ccv_metadata["dataset"],
"creation-time": ccv_metadata["begin_validity_at"],
"ccv_id": ccv_metadata["ccv_id"],
} for cname, ccv_metadata in const_metadata.items()
}
mod_metadata["physical-name"] = list(
const_metadata.values())[0]["physical_name"]
metadata["retrieved-constants"][karabo_da] = mod_metadata
CalibrationMetadata(out_folder).add_fragment(metadata)
def write_compressed_frames(
arr: np.ndarray,
ofile: h5py.File,
dataset_path: str,
comp_threads: int = 1):
"""Compress gain/mask frames in multiple threads, and save their data
This is significantly faster than letting HDF5 do the compression
in a single thread.
"""
def _compress_frame(idx):
# Equivalent to the HDF5 'shuffle' filter: transpose bytes for better
# compression.
shuffled = np.ascontiguousarray(
arr[idx].view(np.uint8).reshape((-1, arr.itemsize)).transpose()
)
return idx, zlib.compress(shuffled, level=1)
# gain/mask compressed with gzip level 1, but not
# checksummed as we would have to implement this.
dataset = ofile.create_dataset(
dataset_path,
shape=arr.shape,
chunks=((1,) + arr.shape[1:]),
compression="gzip",
compression_opts=1,
shuffle=True,
dtype=arr.dtype,
)
with ThreadPool(comp_threads) as pool:
for i, compressed in pool.imap(_compress_frame, range(len(arr))):
# Each frame is 1 complete chunk
chunk_start = (i,) + (0,) * (dataset.ndim - 1)
dataset.id.write_direct_chunk(chunk_start, compressed)
return dataset
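# Hedged usage sketch (file and dataset names are illustrative):
#   with h5py.File("out.h5", "w") as f:
#       write_compressed_frames(mask, f, "INSTRUMENT/.../mask", comp_threads=8)
# The dataset is declared with gzip + shuffle, so HDF5 applies the matching
# decompression filters transparently when the data is read back.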
def reorder_axes(a, from_order, to_order):
"""Rearrange axes of array a from from_order to to_order
This does the same as np.transpose(), but making the before & after axes
more explicit. from_order is a sequence of strings labelling the axes of a,
and to_order is a similar sequence for the axes of the result.
"""
assert len(from_order) == a.ndim
assert sorted(from_order) == sorted(to_order)
from_order = list(from_order)
order = tuple([from_order.index(lbl) for lbl in to_order])
return a.transpose(order)
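# Deterministic example: move the module axis to the front.
#   >>> a = np.zeros((64, 16, 512, 128))   # (cells, modules, slow, fast) - illustrative
#   >>> reorder_axes(a, "cmsf", "mcsf").shape
#   (16, 64, 512, 128)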