diff --git a/src/exdf/cli/reduce.py b/src/exdf/cli/reduce.py
index be4d7d21af5c664cb99c9613f89e524a62a1baff..dcef61342c9bb7c98b1fde4728ee8e478f7056e7 100644
--- a/src/exdf/cli/reduce.py
+++ b/src/exdf/cli/reduce.py
@@ -12,7 +12,8 @@ import sys
 from pkg_resources import iter_entry_points
 
 from extra_data import RunDirectory, open_run
-from ..data_reduction.writer import ReduceWriter
+from ..data_reduction.writer import ReduceWriter, \
+    InputDataError, ReduceMethodError
 
 
 def _parse_args(argv):
@@ -212,9 +213,18 @@ def main(argv=None):
     if args.to_recipe:
         _to_recipe(args.to_recipe, methods, inp_data, argv)
 
-    writer = ReduceWriter(
-        inp_data, methods,
-        args.output_scope, args.output_sequence_len, args.output_version)
+    try:
+        writer = ReduceWriter(inp_data, methods, args.output_scope,
+                              args.output_sequence_len, args.output_version)
+    except InputDataError as e:
+        log.error(str(e))
+        log.critical('Failed to initialize reduction writer for this input')
+        return
+    except ReduceMethodError as e:
+        log.error(str(e))
+        log.critical('Failed to initialize reduction writer for generated '
+                     'reduction operations')
+        return
 
     if args.output_scope == 'none':
         log.info('Not writing out any data files')
diff --git a/src/exdf/data_reduction/writer.py b/src/exdf/data_reduction/writer.py
index 37cf682c534a646677fcc897924bac2da6004e4f..e095fbda45b2555ac8da9a0567f5f1d4ae83bd73 100644
--- a/src/exdf/data_reduction/writer.py
+++ b/src/exdf/data_reduction/writer.py
@@ -43,6 +43,14 @@ def apply_by_key(op_name):
     return op_decorator
 
 
+class InputDataError(ValueError):
+    pass
+
+
+class ReduceMethodError(ValueError):
+    pass
+
+
 class ReduceWriter(SourceDataWriter):
     log = logging.getLogger('exdf.data_reduction.ReduceWriter')
 
@@ -57,8 +65,8 @@ class ReduceWriter(SourceDataWriter):
         input_version = Version(metadata.get('dataFormatVersion', '1.0'))
 
         if input_version < Version('1.0'):
-            raise ValueError('Currently input files are required to be '
-                             'EXDF-v1.0+')
+            raise InputDataError('Currently input files are required '
+                                 'to be EXDF-v1.0+')
 
         if version == 'same':
             version = input_version
@@ -68,7 +76,8 @@ class ReduceWriter(SourceDataWriter):
         try:
             self.run_number = int(metadata['runNumber'])
         except KeyError:
-            raise ValueError('runNumber dataset required in input METADATA')
+            raise InputDataError('runNumber dataset required to be present '
+                                 'in input METADATA')
 
         self._ops = sum(methods.values(), [])
 
@@ -104,14 +113,14 @@ class ReduceWriter(SourceDataWriter):
         custom_entry_sources = {x[0] for x in self._custom_entry_masks.keys()}
 
         if custom_entry_sources & self._custom_xtdf_masks.keys():
-            raise ValueError(
+            raise ReduceMethodError(
                 'Source may not be affected by both select-entries and '
                 'select-xtdf operations')
 
         if self._rechunked_keys.keys() & self._compressed_keys.keys():
-            raise ValueError('Key may not be affected by both '
-                             'compress-keys and rechunk-keys')
+            raise ReduceMethodError('Key may not be affected by both '
+                                    'compress-keys and rechunk-keys')
 
         if self._scope == 'sources':
             self._sources = sorted(
@@ -126,8 +135,8 @@ class ReduceWriter(SourceDataWriter):
                 if (self._data[source].aggregator in touched_aggregators)})
 
         if not self._sources:
-            raise ValueError('reduction sequence yields empty source '
-                             'selection')
+            raise ReduceMethodError('Reduction operations and output '
+                                    'scope yield an empty dataset')
 
     def _filter_ops(self, op):
         return [args[1:] for args in self._ops if args[0] == op]
@@ -156,8 +165,8 @@ class ReduceWriter(SourceDataWriter):
                 if count == 0:
                     continue
                 elif max_entry >= count:
-                    raise ValueError(
-                        f'entry index exceeds data counts of train {train_id}')
+                    raise ReduceMethodError(f'Entry index exceeds data '
+                                            f'counts of train {train_id}')
 
                 masks[train_id] = np.zeros(count, dtype=bool)
                 masks[train_id][entry_sel] = True
@@ -169,13 +178,13 @@ class ReduceWriter(SourceDataWriter):
                 if count == 0:
                     continue
                 elif mask_len != counts.get(train_id, 0):
-                    raise ValueError(
-                        f'mask length mismatch for train {train_id}')
+                    raise ReduceMethodError(f'Mask length mismatch for '
+                                            f'train {train_id}')
 
                 masks[train_id] = entry_sel
 
         else:
-            raise ValueError('unknown entry mask format')
+            raise ReduceMethodError('Unknown entry mask format')
 
         return masks
 
@@ -196,7 +205,7 @@ class ReduceWriter(SourceDataWriter):
     @apply_by_source('select-entries')
     def _handle_select_entries(self, source, idx_group, train_sel, entry_sel):
         if idx_group not in self._data[source].index_groups:
-            raise ValueError(f'{idx_group} not index group of {source}')
+            raise ReduceMethodError(f'{idx_group} not index group of {source}')
 
         self._custom_entry_masks.setdefault((source, idx_group), {}).update(
             self._get_entry_masks(source, idx_group, train_sel, entry_sel))
@@ -209,7 +218,7 @@ class ReduceWriter(SourceDataWriter):
 
         if not self._is_xtdf_source(source):
             # Raise exception if essentials are missing.
-            raise ValueError(f'{source} is not a valid XTDF source')
+            raise ReduceMethodError(f'{source} is not a valid XTDF source')
 
         self._custom_xtdf_masks.setdefault(source, {}).update(
             self._get_entry_masks(source, 'image', train_sel, entry_sel))
@@ -223,7 +232,7 @@ class ReduceWriter(SourceDataWriter):
         old_chunking = self._rechunked_keys.setdefault((source, key), chunking)
 
         if old_chunking != chunking:
-            raise ValueError(
+            raise ReduceMethodError(
                 f'Reduction sequence yields conflicting chunks for '
                 f'{source}.{key}: {old_chunking}, {chunking}')