
Simplify reduction operation implementations and error handling

Merged: Philipp Schmidt requested to merge feat/error-handling into feat/compress-keys
@@ -68,6 +68,8 @@ class ReduceWriter(SourceDataWriter):
             raise ReduceInitError('Currently input files are required to be '
                                   'EXDF-v1.0+')
+        self.log.debug(f'Input data EXDF version {input_version}')
+
         if version == 'same':
             version = input_version
         else:
@@ -135,6 +137,10 @@ class ReduceWriter(SourceDataWriter):
         if not self._sources:
             raise ReduceInitError('Reduction operations and output scope '
                                   'yield an empty dataset')
+        else:
+            self.log.debug(
+                f'Sources being modified: {sorted(self._touched_sources)}')
+            self.log.debug(f'Sources included in output: {self._sources}')

     def _filter_ops(self, op):
         return [args[1:] for args in self._ops if args[0] == op]
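
Note: _filter_ops assumes each entry in self._ops is a flat tuple whose first
element is the operation name, followed by that operation's arguments. A
minimal sketch of the filtering behaviour, using hypothetical source and key
names:

    # Hypothetical op list in the shape _filter_ops expects: (op_name, *args)
    ops = [
        ('rechunk-keys', 'FXE_DET/CORR', 'data.adc', (1, -1)),
        ('remove-sources', 'FXE_AUX/MOTOR'),
    ]

    # Filtering on one op name strips the leading name from each match.
    rechunk_args = [args[1:] for args in ops if args[0] == 'rechunk-keys']
    assert rechunk_args == [('FXE_DET/CORR', 'data.adc', (1, -1))]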
@@ -302,10 +308,12 @@ class ReduceWriter(SourceDataWriter):
         # Keys are guaranteed to never use both custom chunking and
         # compression.
-        if (source, key) in self._rechunked_keys:
+        sourcekey = source, key
+
+        if sourcekey in self._rechunked_keys:
             orig_chunks = kwargs['chunks']

-            chunks = list(self._rechunked_keys[source, key])
+            chunks = list(self._rechunked_keys[sourcekey])
             assert len(chunks) == len(orig_chunks)

             for i, dim_len in enumerate(chunks):
@@ -318,14 +326,14 @@ class ReduceWriter(SourceDataWriter):
             kwargs['chunks'] = tuple(chunks)

-        elif (source, key) in self._compressed_keys or orig_dset.compression:
+        elif sourcekey in self._compressed_keys or orig_dset.compression:
             # TODO: Maintain more of existing properties, for now it is
             # forced to use gzip and (1, *entry) chunking.
             kwargs['chunks'] = (1,) + kwargs['shape'][1:]
             kwargs['shuffle'] = True
             kwargs['compression'] = 'gzip'
             kwargs['compression_opts'] = self._compressed_keys.setdefault(
-                (source, key), orig_dset.compression_opts)
+                sourcekey, orig_dset.compression_opts)

         return kwargs
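
Note: the kwargs dict assembled here carries standard h5py dataset-creation
arguments, so this branch effectively forces byte-shuffled gzip with one entry
per chunk. A self-contained sketch of the equivalent h5py call, with made-up
file, dataset and shape values:

    import h5py

    # Equivalent of the forced settings above: (1, *entry) chunks,
    # shuffle filter and gzip compression.
    with h5py.File('reduced.h5', 'w') as f:
        f.create_dataset(
            'INSTRUMENT/detector/data', shape=(100, 512, 512), dtype='u2',
            chunks=(1, 512, 512), shuffle=True,
            compression='gzip', compression_opts=1)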
@@ -383,37 +391,46 @@ class ReduceWriter(SourceDataWriter):
     @apply_by_source('remove-sources')
     def _handle_remove_sources(self, source):
         self._touched_sources.add(source)
+        self.log.debug(f'Removing {source}')

     @apply_by_key('remove-keys')
     def _handle_remove_keys(self, source, key):
         self._custom_keys[source].remove(key)
+        self.log.debug(f'Removing {source}, {key}')

     @apply_by_source('select-trains')
     def _handle_select_trains(self, source, train_sel):
         self._custom_trains[source] = select_train_ids(
             self._custom_trains.setdefault(source, list(self._data.train_ids)),
             train_sel)
+        self.log.debug(f'Selecting {len(self._custom_trains[source])} trains '
+                       f'for {source}')

     @apply_by_source('select-entries')
     def _handle_select_entries(self, source, idx_group, train_sel, entry_sel):
         if idx_group not in self._data[source].index_groups:
-            raise ReduceInitError(f'{idx_group} not index group of {source}')
+            raise ReduceInitError(
+                f'{idx_group} not an index group of {source}')

         self._custom_entry_masks.setdefault((source, idx_group), {}).update(
             self._get_entry_masks(source, idx_group, train_sel, entry_sel))
+        self.log.debug(f'Applying entry selection to {source}, {idx_group}')

     @apply_by_source('select-xtdf')
     def _handle_select_xtdf(self, source, train_sel, entry_sel):
         if not source.endswith(':xtdf'):
-            # Simply ignore matches without trailing :xtdf.
+            self.log.warning(
+                f'Ignoring non-XTDF source {source} based on name')
             return

         if not self._is_xtdf_source(source):
-            # Raise exception if essentials are missing.
-            raise ReduceInitError(f'{source} is not a valid XTDF source')
+            self.log.warning(
+                f'Ignoring non-XTDF source {source} based on structure')
+            return

         self._custom_xtdf_masks.setdefault(source, {}).update(
             self._get_entry_masks(source, 'image', train_sel, entry_sel))
+        self.log.debug(f'Applying XTDF selection to {source}')

     @apply_by_key('rechunk-keys')
     def _handle_rechunk_keys(self, source, key, chunking):
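
Note: _handle_select_entries and _handle_select_xtdf accumulate masks through
the same dict idiom: setdefault() creates the per-source mapping on first use,
and update() merges masks from any later operation targeting the same source.
A reduced sketch with made-up train IDs and masks:

    masks = {}

    # The first selection creates the mask dict for a source...
    masks.setdefault('FXE_DET/CORR:xtdf', {}).update({10001: [True, False]})
    # ...and later selections merge into the existing dict.
    masks.setdefault('FXE_DET/CORR:xtdf', {}).update({10002: [False, True]})

    assert masks == {'FXE_DET/CORR:xtdf': {10001: [True, False],
                                           10002: [False, True]}}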
@@ -429,11 +446,14 @@ class ReduceWriter(SourceDataWriter):
                 f'{source}.{key}: {old_chunking}, {chunking}')

         self._rechunked_keys[(source, key)] = chunking
+        self.log.debug(f'Rechunking {source}, {key} to {chunking}')

     @apply_by_key('subslice-keys')
     def _handle_subslice_keys(self, source, key, region):
         self._subsliced_keys.setdefault((source, key), []).append(region)
+        self.log.debug(f'Subslicing {region} of {source}, {key}')

     @apply_by_key('compress-keys')
     def _handle_compress_keys(self, source, key, level):
         self._compressed_keys[source, key] = level
+        self.log.debug(f'Compressing {source}, {key}')
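
Note: the level recorded here is consumed by the setdefault() call in the
dataset-kwargs hunk further up: keys named in a compress-keys operation keep
their explicit level, while keys that were only compressed in the input fall
back to the original dataset's compression_opts. A sketch with hypothetical
keys and levels:

    compressed_keys = {('FXE_DET/CORR', 'data.adc'): 3}

    # An explicitly requested key keeps its level (3)...
    assert compressed_keys.setdefault(('FXE_DET/CORR', 'data.adc'), 1) == 3
    # ...while an input-compressed key adopts the original opts (here 1).
    assert compressed_keys.setdefault(('FXE_DET/CORR', 'data.image'), 1) == 1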