From a05cdf3e089af2acdec85825f4b22738be1c9967 Mon Sep 17 00:00:00 2001 From: Philipp Schmidt <philipp.schmidt@xfel.eu> Date: Wed, 29 Nov 2023 14:21:18 +0100 Subject: [PATCH] Rename rows to entries --- src/exdf/data_reduction/method.py | 28 +++++++------- src/exdf/data_reduction/writer.py | 64 +++++++++++++++++-------------- src/exdf/write/sd_writer.py | 2 +- 3 files changed, 50 insertions(+), 44 deletions(-) diff --git a/src/exdf/data_reduction/method.py b/src/exdf/data_reduction/method.py index fd75a00..f5ce700 100644 --- a/src/exdf/data_reduction/method.py +++ b/src/exdf/data_reduction/method.py @@ -9,7 +9,7 @@ from extra_data.read_machinery import select_train_ids log = getLogger('exdf.data_reduction.ReductionMethod') train_sel = TypeVar('train_sel') -row_sel = TypeVar('row_sel') +entry_sel = TypeVar('entry_sel') index_exp = TypeVar('index_exp') @@ -24,7 +24,7 @@ def is_train_selection(x): return True -def is_row_selection(x): +def is_entry_selection(x): if isinstance(x, slice): return True @@ -72,42 +72,42 @@ class ReductionMethod(list): assert is_train_selection(trains) self._emit('select-trains', source_glob, trains) - def select_rows( + def select_entries( self, source_glob: str, index_group: str, trains: train_sel, - rows: row_sel + entries: entry_sel ): assert isinstance(source_glob, str) assert isinstance(index_group, str) assert is_train_selection(trains) - assert is_row_selection(rows) - self._emit('select-rows', source_glob, index_group, trains, rows) + assert is_entry_selection(entries) + self._emit('select-entries', source_glob, index_group, trains, entries) def select_xtdf( self, source_glob: str, trains: train_sel, - rows: row_sel + entries: entry_sel ): - """Slice XTDF data by row. + """Slice XTDF data by entry. - Roughly equivalent to select_rows(source_glob, 'image', - train_sel, row_sel), but only acts on XTDF sources and modifies + Roughly equivalent to select_entries(source_glob, 'image', + trains, entries), but only acts on XTDF sources and modifies header data structures according to slicing. Requires sources to end with :xtdf and have all XTDF keys. Args: source_glob (str): Source glob pattern. - train_sel (train_sel): Train selection. - row_sel (row_sel): Row selection. + trains (train_sel): Train selection. + entries (entry_sel): Entry selection. """ assert isinstance(source_glob, str) assert is_train_selection(trains) - assert is_row_selection(rows) - self._emit('select-xtdf', source_glob, trains, rows) + assert is_entry_selection(entries) + self._emit('select-xtdf', source_glob, trains, entries) def remove_sources( self, diff --git a/src/exdf/data_reduction/writer.py b/src/exdf/data_reduction/writer.py index 005f7dc..27a50ff 100644 --- a/src/exdf/data_reduction/writer.py +++ b/src/exdf/data_reduction/writer.py @@ -54,7 +54,7 @@ class ReduceWriter(SourceDataWriter): self._custom_trains = {} # source -> list(<trains>) self._custom_xtdf_masks = {} # source -> dict(train_id -> mask) self._custom_xtdf_counts = {} # source -> ndarray - self._custom_rows = {} # source -> dict(train_id -> mask) + self._custom_entry_masks = {} # source -> dict(train_id -> mask) self._rechunked_keys = {} # (source, key) -> chunks self._partial_copies = {} # (source, key) -> list(<regions>) @@ -86,20 +86,24 @@ class ReduceWriter(SourceDataWriter): self._custom_trains[source] = select_train_ids( train_ids, train_sel) - for source_glob, index_group, train_sel, row_sel in self._filter_ops( - 'select-rows' + for source_glob, index_group, train_sel, entry_sel in self._filter_ops( + 'select-entries' ): for source in fnmatch.filter(self._sources, source_glob): if index_group not in self._data[source].index_groups: raise ValueError(f'{index_group} not index group of ' f'{source}') + new_mask = self._get_entry_masks( + source, index_group, train_sel, entry_sel) + self._touched_sources.add(source) - self._custom_rows.setdefault((source, index_group), {}).update( - self._get_row_masks(source, index_group, - train_sel, row_sel)) + self._custom_entry_masks.setdefault( + (source, index_group), {}).update(new_mask) - for source_glob, train_sel, row_sel in self._filter_ops('select-xtdf'): + for source_glob, train_sel, entry_sel in self._filter_ops( + 'select-xtdf' + ): for source in fnmatch.filter(self._sources, source_glob): if not source.endswith(':xtdf'): # Simply ignore matches without trailing :xtdf. @@ -109,16 +113,18 @@ class ReduceWriter(SourceDataWriter): # Raise exception if essentials are missing. raise ValueError(f'{source} is not a valid XTDF source') + new_mask = self._get_entry_masks( + source, 'image', train_sel, entry_sel) + self._touched_sources.add(source) - self._custom_xtdf_masks.setdefault(source, {}).update( - self._get_row_masks(source, 'image', train_sel, row_sel)) + self._custom_xtdf_masks.setdefault(source, {}).update(new_mask) if ( - {x[0] for x in self._custom_rows.keys()} & + {x[0] for x in self._custom_entry_masks.keys()} & self._custom_xtdf_masks.keys() ): - raise ValueError('source may not be affected by both select-rows ' - 'and select-xtdf operations') + raise ValueError('source may not be affected by both ' + 'select-entries and select-xtdf operations') for source_glob, key_glob, chunking in self._filter_ops( 'rechunk-keys' @@ -179,7 +185,7 @@ class ReduceWriter(SourceDataWriter): def _is_xtdf_source(self, source): return self._data[source].keys() > {'header.pulseCount', 'image.data'} - def _get_row_masks(self, source, index_group, train_sel, row_sel): + def _get_entry_masks(self, source, index_group, train_sel, entry_sel): train_ids = select_train_ids( self._custom_trains.get(source, list(self._data.train_ids)), train_sel) @@ -187,27 +193,27 @@ class ReduceWriter(SourceDataWriter): .data_counts(index_group=index_group) masks = {} - if isinstance(row_sel, slice): + if isinstance(entry_sel, slice): for train_id, count in counts.items(): if count > 0: masks[train_id] = np.zeros(count, dtype=bool) - masks[train_id][row_sel] = True + masks[train_id][entry_sel] = True - elif np.issubdtype(type(row_sel[0]), np.integer): - max_row = max(row_sel) + elif np.issubdtype(type(entry_sel[0]), np.integer): + max_entry = max(entry_sel) for train_id, count in counts.items(): if count == 0: continue - elif max_row >= count: + elif max_entry >= count: raise ValueError( - f'row index exceeds data counts of train {train_id}') + f'entry index exceeds data counts of train {train_id}') masks[train_id] = np.zeros(count, dtype=bool) - masks[train_id][row_sel] = True + masks[train_id][entry_sel] = True - elif np.issubdtype(type(row_sel[0]), bool): - mask_len = len(row_sel) + elif np.issubdtype(type(entry_sel[0]), bool): + mask_len = len(entry_sel) for train_id, count in counts.items(): if count == 0: @@ -216,10 +222,10 @@ class ReduceWriter(SourceDataWriter): raise ValueError( f'mask length mismatch for train {train_id}') - masks[train_id] = row_sel + masks[train_id] = entry_sel else: - raise ValueError('unknown row mask format') + raise ValueError('unknown entry mask format') return masks @@ -353,10 +359,10 @@ class ReduceWriter(SourceDataWriter): def mask_instrument_data(self, source, index_group, train_ids, counts): if source in self._custom_xtdf_masks and index_group == 'image': custom_masks = self._custom_xtdf_masks[source] - elif (source, index_group) in self._custom_rows: - custom_masks = self._custom_rows[source, index_group] + elif (source, index_group) in self._custom_entry_masks: + custom_masks = self._custom_entry_masks[source, index_group] else: - return # None efficiently selects all rows. + return # None efficiently selects all entries. masks = [] @@ -370,8 +376,8 @@ class ReduceWriter(SourceDataWriter): if source in self._custom_xtdf_masks: # Sources are guaranteed to never use both XTDF and general - # row slicing. In the XTDF case, the new data counts for the - # image index group must be determined to be filled into + # entry slicing. In the XTDF case, the new data counts for + # the image index group must be determined to be filled into # the respective header field. self._custom_xtdf_counts[source] = { diff --git a/src/exdf/write/sd_writer.py b/src/exdf/write/sd_writer.py index d9354af..0d04245 100644 --- a/src/exdf/write/sd_writer.py +++ b/src/exdf/write/sd_writer.py @@ -221,7 +221,7 @@ class SourceDataWriter: # been introduced in this sequence. train_ids = np.array(f['INDEX/trainId']) - # Stores mask for each row per index group. + # Stores mask for each entry per index group. masks = {} for sd in sources: -- GitLab