From 6075f80293792a0845914a277197c24b18733e57 Mon Sep 17 00:00:00 2001 From: Egor Sobolev <egor.sobolev@xfel.eu> Date: Fri, 7 Mar 2025 14:44:09 +0100 Subject: [PATCH 1/2] Support legacy sources --- src/exdf/write/datafile.py | 28 ++++++++++++++++++++++++++++ src/exdf/write/sd_writer.py | 32 +++++++++++++++++++++++++++++--- 2 files changed, 57 insertions(+), 3 deletions(-) diff --git a/src/exdf/write/datafile.py b/src/exdf/write/datafile.py index 82c2cab..b72537a 100644 --- a/src/exdf/write/datafile.py +++ b/src/exdf/write/datafile.py @@ -324,6 +324,34 @@ class DataFile(h5py.File): return InstrumentSource(self.create_group(f'INSTRUMENT/{source}').id, source) + def create_legacy_source(self, legacy_source, target_source): + """Create a legacy source. + + A legacy source allows accessing an instrument source under a + different name, primarily for the purpose of backwards + compatibility. It inserts soft links under the legacy name in + the INSTRUMENT and INDEX groups, pointing to the respective + groups of the target source. + + Args: + legacy_source (str): Legacy source. + target_source (str or InstrumentSource): Target source. 
+ + Returns: + None + """ + + if isinstance(target_source, InstrumentSource): + target_source = target_source.name[1:].partition('/')[2] + elif not isinstance(target_source, str): + raise ValueError('target_source must be str or InstrumentSource') + + self.file[f'/INSTRUMENT/{legacy_source}'] = h5py.SoftLink( + f'/INSTRUMENT/{target_source}') + self.file[f'/INDEX/{legacy_source}'] = h5py.SoftLink( + f'/INDEX/{target_source}') + self.__instrument_sources.add(legacy_source) + def create_metadata(self, like=None, *, creation_date=None, update_date=None, proposal=0, run=0, sequence=None, data_format_version='1.2', diff --git a/src/exdf/write/sd_writer.py b/src/exdf/write/sd_writer.py index a02691b..a53769e 100644 --- a/src/exdf/write/sd_writer.py +++ b/src/exdf/write/sd_writer.py @@ -123,7 +123,8 @@ class SourceDataWriter: after_control = perf_counter() self.write_instrument( - f, [sd for sd in sources if sd.is_instrument]) + f, [sd for sd in sources + if sd.is_instrument and not sd.is_legacy]) after_instrument = perf_counter() after_close = perf_counter() @@ -150,6 +151,7 @@ class SourceDataWriter: train_ids, *index_dsets = get_index_root_data(sources) control_indices, instrument_indices = build_sources_index(sources) + legacy_sources, legacy_source_channels = get_legacy_sources(sources) f.create_metadata( like=sources[0], @@ -159,7 +161,9 @@ class SourceDataWriter: instrument_channels=[ f'{source}/{index_group}' for source, index_group_counts in instrument_indices.items() - for index_group in index_group_counts.keys()]) + for index_group in index_group_counts.keys() + ] + legacy_source_channels + ) f.create_dataset('METADATA/dataWriter', data=b'exdf-tools', shape=(1,)) if not self.with_origin(): @@ -176,6 +180,9 @@ class SourceDataWriter: instrument_src = f.create_instrument_source(source) instrument_src.create_index(**index_group_counts) + for source in legacy_sources: + f.create_legacy_source(source.source, source.canonical_name) + def write_control(self, f, 
sources): """Write CONTROL and RUN data. @@ -351,11 +358,15 @@ def get_index_root_data(sources): # Collect train IDs for this sequence. train_ids = np.zeros(0, dtype=np.uint64) for sd in sources: + if sd.is_legacy: + continue train_ids = np.union1d(train_ids, sd.train_ids) # Collect input files by index keys (source / index_group). files_by_index_keys = {} for sd in sources: + if sd.is_legacy: + continue for key in sd.keys(): kd = sd[key] index_key = f'{sd.source}/{kd.index_group}' @@ -458,7 +469,7 @@ def build_sources_index(sources): for sd in sources: if sd.is_control: control_indices[sd.source] = sd.data_counts(labelled=False) - else: + elif not sd.is_legacy: instrument_indices[sd.source] = { grp: sd.data_counts(labelled=False, index_group=grp) for grp in sd.index_groups} @@ -466,6 +477,21 @@ def build_sources_index(sources): return control_indices, instrument_indices +def get_legacy_sources(sources): + legacy_sources = [] + channels = [] + for source in sources: + if not source.is_legacy: + continue + legacy_sources.append(source) + if source.source.endswith(":xtdf"): + channels.append(source.source + "/image") + else: + channels.extend( + (f"{source.source}/{grp}" in source.index_groups)) + return legacy_sources, channels + + def get_key_attributes(sd): if sd.is_control: section = 'RUN' -- GitLab From 2019d0665e5de73f40a8635a39b35f8a34fb105d Mon Sep 17 00:00:00 2001 From: Egor Sobolev <egor.sobolev@xfel.eu> Date: Tue, 11 Mar 2025 14:53:08 +0100 Subject: [PATCH 2/2] Apply suggestions: style and legacy control data protection Co-authored-by: Philipp Schmidt <philipp.schmidt@xfel.eu> --- src/exdf/write/sd_writer.py | 22 +++++++++++++--------- 1 file changed, 13 insertions(+), 9 deletions(-) diff --git a/src/exdf/write/sd_writer.py b/src/exdf/write/sd_writer.py index a53769e..c371da9 100644 --- a/src/exdf/write/sd_writer.py +++ b/src/exdf/write/sd_writer.py @@ -119,7 +119,8 @@ class SourceDataWriter: after_base = perf_counter() self.write_control( - f, [sd for sd 
in sources if sd.is_control]) + f, [sd for sd in sources + if sd.is_control and not sd.is_legacy]) after_control = perf_counter() self.write_instrument( @@ -180,8 +181,8 @@ class SourceDataWriter: instrument_src = f.create_instrument_source(source) instrument_src.create_index(**index_group_counts) - for source in legacy_sources: - f.create_legacy_source(source.source, source.canonical_name) + for sd in legacy_sources: + f.create_legacy_source(sd.source, sd.canonical_name) def write_control(self, f, sources): """Write CONTROL and RUN data. @@ -480,15 +481,18 @@ def build_sources_index(sources): def get_legacy_sources(sources): legacy_sources = [] channels = [] - for source in sources: - if not source.is_legacy: + for sd in sources: + if not sd.is_legacy: continue - legacy_sources.append(source) - if source.source.endswith(":xtdf"): - channels.append(source.source + "/image") + if sd.is_control: + raise ValueError( + "Legacy source name is not supported for CONTROL data") + legacy_sources.append(sd) + if sd.source.endswith(":xtdf"): + channels.append(sd.source + "/image") else: channels.extend( - (f"{source.source}/{grp}" in source.index_groups)) + (f"{sd.source}/{grp}" for grp in sd.index_groups)) return legacy_sources, channels -- GitLab