Skip to content
Snippets Groups Projects
Commit 4922b975 authored by Thomas Kluyver's avatar Thomas Kluyver
Browse files

Merge branch 'feat/metadata-fragments' into 'master'

Support for saving metadata fragments & merging into calibration_metadata.yml

See merge request !781
parents e73a5bc9 f17abcf1
No related branches found
No related tags found
1 merge request!781Support for saving metadata fragments & merging into calibration_metadata.yml
...@@ -10,6 +10,7 @@ from os import environ, listdir, path ...@@ -10,6 +10,7 @@ from os import environ, listdir, path
from os.path import isfile from os.path import isfile
from pathlib import Path from pathlib import Path
from queue import Queue from queue import Queue
from tempfile import NamedTemporaryFile
from time import sleep from time import sleep
from typing import List, Optional, Tuple, Union from typing import List, Optional, Tuple, Union
from urllib.parse import urljoin from urllib.parse import urljoin
...@@ -811,6 +812,24 @@ def module_index_to_qm(index: int, total_modules: int = 16): ...@@ -811,6 +812,24 @@ def module_index_to_qm(index: int, total_modules: int = 16):
return f"Q{quad+1}M{mod+1}" return f"Q{quad+1}M{mod+1}"
def recursive_update(target: dict, source: dict):
"""Recursively merge source into target, checking for conflicts
Conflicting entries will not be copied to target. Returns True if any
conflicts were found.
"""
conflict = False
for k, v2 in source.items():
v1 = target.get(k, None)
if isinstance(v1, dict) and isinstance(v2, dict):
conflict = recursive_update(v1, v2) or conflict
elif (v1 is not None) and (v1 != v2):
conflict = True
else:
target[k] = v2
return conflict
class CalibrationMetadata(dict): class CalibrationMetadata(dict):
"""Convenience class: dictionary stored in metadata YAML file """Convenience class: dictionary stored in metadata YAML file
...@@ -847,6 +866,34 @@ class CalibrationMetadata(dict): ...@@ -847,6 +866,34 @@ class CalibrationMetadata(dict):
with (copy_dir / self._yaml_fn.name).open("w") as fd: with (copy_dir / self._yaml_fn.name).open("w") as fd:
yaml.safe_dump(dict(self), fd) yaml.safe_dump(dict(self), fd)
def add_fragment(self, data: dict):
"""Save metadata to a separate 'fragment' file to be merged later
Avoids a risk of corrupting the main file by writing in parallel.
"""
prefix = f"metadata_frag_j{os.environ.get('SLURM_JOB_ID', '')}_"
with NamedTemporaryFile("w", dir=self._yaml_fn.parent,
prefix=prefix, suffix='.yml', delete=False) as fd:
yaml.safe_dump(data, fd)
def gather_fragments(self):
"""Merge in fragments saved by add_fragment(), then delete them"""
frag_files = list(self._yaml_fn.parent.glob('metadata_frag_*.yml'))
to_delete = []
for fn in frag_files:
with fn.open("r") as fd:
data = yaml.safe_load(fd)
if recursive_update(self, data):
print(f"{fn} contained conflicting metadata. "
f"This file will be left for debugging")
else:
to_delete.append(fn)
self.save()
for fn in to_delete:
fn.unlink()
def save_constant_metadata( def save_constant_metadata(
retrieved_constants: dict, retrieved_constants: dict,
......
...@@ -379,6 +379,7 @@ def finalize(joblist, finaljob, cal_work_dir, out_path, version, title, author, ...@@ -379,6 +379,7 @@ def finalize(joblist, finaljob, cal_work_dir, out_path, version, title, author,
if finaljob: if finaljob:
joblist.append(str(finaljob)) joblist.append(str(finaljob))
metadata = cal_tools.tools.CalibrationMetadata(cal_work_dir) metadata = cal_tools.tools.CalibrationMetadata(cal_work_dir)
metadata.gather_fragments()
job_time_fmt = 'JobID,Start,End,Elapsed,Suspended,State'.split(',') job_time_fmt = 'JobID,Start,End,Elapsed,Suspended,State'.split(',')
job_time_summary = get_job_info(joblist, job_time_fmt) job_time_summary = get_job_info(joblist, job_time_fmt)
......
...@@ -19,6 +19,7 @@ from cal_tools.tools import ( ...@@ -19,6 +19,7 @@ from cal_tools.tools import (
map_seq_files, map_seq_files,
module_index_to_qm, module_index_to_qm,
send_to_db, send_to_db,
recursive_update,
) )
# AGIPD operating conditions. # AGIPD operating conditions.
...@@ -471,3 +472,15 @@ def test_module_index_to_qm(): ...@@ -471,3 +472,15 @@ def test_module_index_to_qm():
with pytest.raises(AssertionError): with pytest.raises(AssertionError):
module_index_to_qm(7, 5) module_index_to_qm(7, 5)
def test_recursive_update():
tgt = {"a": {"b": 1}, "c": 2}
src = {"a": {"d": 3}, "e": 4}
assert recursive_update(tgt, src) is False
assert tgt == {"a": {"b": 1, "d": 3}, "c": 2, "e": 4}
tgt = {"a": {"b": 1}, "c": 2}
src = {"a": {"b": 3}, "e": 4}
assert recursive_update(tgt, src) is True
assert tgt == {"a": {"b": 1}, "c": 2, "e": 4}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment