diff --git a/src/exdf/cli/reduce.py b/src/exdf/cli/reduce.py
index 0a192bb6d365396b07ebf0600d1a76527e654b15..be4d7d21af5c664cb99c9613f89e524a62a1baff 100644
--- a/src/exdf/cli/reduce.py
+++ b/src/exdf/cli/reduce.py
@@ -12,7 +12,7 @@ import sys
 from pkg_resources import iter_entry_points
 from extra_data import RunDirectory, open_run
 
-from ..data_reduction.red_writer import ReduceWriter, ReduceInitError
+from ..data_reduction.writer import ReduceWriter
 
 
 def _parse_args(argv):
@@ -212,12 +212,9 @@ def main(argv=None):
     if args.to_recipe:
         _to_recipe(args.to_recipe, methods, inp_data, argv)
 
-    try:
-        writer = ReduceWriter(inp_data, methods, args.output_scope,
-                              args.output_sequence_len, args.output_version)
-    except ReduceInitError:
-        log.critical('Failed to initialize reduction writer')
-        return
+    writer = ReduceWriter(
+        inp_data, methods,
+        args.output_scope, args.output_sequence_len, args.output_version)
 
     if args.output_scope == 'none':
         log.info('Not writing out any data files')
diff --git a/src/exdf/data_reduction/builtins.py b/src/exdf/data_reduction/builtins.py
index 965f40197df9b1fb39d6f1ff9774a616b7799119..a9f3e8884f02d34d34b95bda87fa19f20739e971 100644
--- a/src/exdf/data_reduction/builtins.py
+++ b/src/exdf/data_reduction/builtins.py
@@ -199,8 +199,7 @@ class AgipdGain(ReductionMethod):
                                'explicit gain sources')
                 return
 
-            self.log.info(f'No detector specified, detected {domain} '
-                          f'automatically')
+            self.log.info(f'Detected {domain} automatically')
             agipd_sources = f'{domain}/DET/*CH0:xtdf'
 
         else:
@@ -215,8 +214,6 @@ class AgipdGain(ReductionMethod):
                         # Unfortunately HED uses a different domain for
                         # its AGIPD control devices, hardcode it here
                         # for convenience.
-                        # TODO: This could also be solved via a CalCat
-                        # query.
                         domain = domain.replace('_DET_', '_EXP_')
 
                     control_source = f'{domain}/MDL/FPGA_COMP'
diff --git a/src/exdf/data_reduction/red_writer.py b/src/exdf/data_reduction/writer.py
similarity index 63%
rename from src/exdf/data_reduction/red_writer.py
rename to src/exdf/data_reduction/writer.py
index 190c9cf15064ac4e7acbfab8f808a646a8e75fea..d25422de21179a8b0c27baa17e8dd78c5bca142c 100644
--- a/src/exdf/data_reduction/red_writer.py
+++ b/src/exdf/data_reduction/writer.py
@@ -14,43 +14,6 @@ from exdf.write import SourceDataWriter
 from ..write.datafile import write_compressed_frames
 
 
-def apply_by_source(op_name):
-    def op_decorator(op_func):
-        def op_handler(self):
-            assert isinstance(self, ReduceWriter)
-            for source_glob, *args in self._filter_ops(op_name):
-                for source in fnmatch.filter(self._sources, source_glob):
-                    op_func(self, source, *args)
-                    self._touched_sources.add(source)
-
-        return op_handler
-    return op_decorator
-
-
-def apply_by_key(op_name):
-    def op_decorator(op_func):
-        def op_handler(self):
-            assert isinstance(self, ReduceWriter)
-            for source_glob, key_glob, *args in self._filter_ops(op_name):
-                for source in fnmatch.filter(self._sources, source_glob):
-                    keys = self._custom_keys.get(
-                        source, set(self._data[source].keys()))
-
-                    for key in fnmatch.filter(keys, key_glob):
-                        op_func(source, key, *args)
-
-                    self._touched_sources.add(source)
-
-        return op_handler
-    return op_decorator
-
-
-class ReduceInitError(RuntimeError):
-    def __init__(self, msg):
-        super().__init__(msg)
-        ReduceWriter.log.error(msg)
-
-
 class ReduceWriter(SourceDataWriter):
     log = logging.getLogger('exdf.data_reduction.ReduceWriter')
 
@@ -65,10 +28,8 @@ class ReduceWriter(SourceDataWriter):
         input_version = Version(metadata.get('dataFormatVersion', '1.0'))
 
         if input_version < Version('1.0'):
-            raise ReduceInitError('Currently input files are required to be '
-                                  'EXDF-v1.0+')
-
-        self.log.debug(f'Input data EXDF version {input_version}')
+            raise ValueError('input data is currently required to be '
+                             'EXDF-v1.0 or later')
 
         if version == 'same':
             version = input_version
@@ -78,8 +39,7 @@ class ReduceWriter(SourceDataWriter):
         try:
             self.run_number = int(metadata['runNumber'])
         except KeyError:
-            raise ReduceInitError('runNumber dataset required to be present '
-                                  'in input METADATA')
+            raise ValueError('runNumber dataset required in input METADATA')
 
         self._ops = sum(methods.values(), [])
 
@@ -90,7 +50,7 @@ class ReduceWriter(SourceDataWriter):
         self._sources = sorted(data.all_sources)
         self._touched_sources = set()
 
-        # Only populated for sources/keys that are modified.
+        # Only populated for sources with removed keys or selected trains.
         self._custom_keys = {}  # source -> set(<keys>)
         self._custom_trains = {}  # source -> list(<trains>)
         self._custom_xtdf_masks = {}  # source -> dict(train_id -> mask)
@@ -100,27 +60,124 @@ class ReduceWriter(SourceDataWriter):
         self._subsliced_keys = {}  # (source, key) -> list(<regions>)
         self._compressed_keys = {}  # (source, key) -> level
 
+        # TODO: Raise error if rechunking is overwritten!
+        # TODO: make partial copies a list of slices!
+
         # Collect reductions resulting from operations.
-        # This is the most efficient order of operations to minimize
-        # more expensive operations for source or trains that may not
-        # end up being selected.
-        self._handle_remove_sources()
-        self._handle_remove_keys()
-        self._handle_select_trains()
-        self._handle_select_entries()
-        self._handle_select_xtdf()
-        self._handle_rechunk_keys()
-        self._handle_subslice_keys()
-        self._handle_compress_keys()
-
-        custom_entry_sources = {x[0] for x in self._custom_entry_masks.keys()}
-        if custom_entry_sources & self._custom_xtdf_masks.keys():
-            raise ReduceInitError('Source may not be affected by both '
-                                  'select-entries and select-xtdf operations')
-
-        if self._rechunked_keys.keys() & self._compressed_keys.keys():
-            raise ReduceInitError('Key may not be affected by both '
-                                  'compress-keys and rechunk-keys')
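+        # remove-sources: drop matching sources from the output entirely.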
+        for source_glob, in self._filter_ops('remove-sources'):
+            for source in fnmatch.filter(self._sources, source_glob):
+                self._touched_sources.add(source)
+                self._sources.remove(source)
+
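+        # remove-keys: drop matching keys from a source's key set.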
+        for source_glob, key_glob in self._filter_ops('remove-keys'):
+            for source in fnmatch.filter(self._sources, source_glob):
+                self._touched_sources.add(source)
+
+                keys = self._custom_keys.setdefault(
+                    source, set(self._data[source].keys()))
+
+                for key in fnmatch.filter(keys, key_glob):
+                    keys.remove(key)
+
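+        # select-trains: restrict matching sources to the selected trains.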
+        for source_glob, train_sel in self._filter_ops('select-trains'):
+            for source in fnmatch.filter(self._sources, source_glob):
+                self._touched_sources.add(source)
+                train_ids = self._custom_trains.setdefault(
+                    source, list(self._data.train_ids))
+
+                self._custom_trains[source] = select_train_ids(
+                    train_ids, train_sel)
+
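+        # select-entries: mask entries per train for the given index group.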
+        for source_glob, index_group, train_sel, entry_sel in self._filter_ops(
+            'select-entries'
+        ):
+            for source in fnmatch.filter(self._sources, source_glob):
+                if index_group not in self._data[source].index_groups:
+                    raise ValueError(f'{index_group} is not an index group '
+                                     f'of {source}')
+
+                new_mask = self._get_entry_masks(
+                    source, index_group, train_sel, entry_sel)
+
+                self._touched_sources.add(source)
+                self._custom_entry_masks.setdefault(
+                    (source, index_group), {}).update(new_mask)
+
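+        # select-xtdf: mask image entries per train for XTDF sources.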
+        for source_glob, train_sel, entry_sel in self._filter_ops(
+            'select-xtdf'
+        ):
+            for source in fnmatch.filter(self._sources, source_glob):
+                if not source.endswith(':xtdf'):
+                    # Simply ignore matches without trailing :xtdf.
+                    continue
+
+                if not self._is_xtdf_source(source):
+                    # Raise exception if essentials are missing.
+                    raise ValueError(f'{source} is not a valid XTDF source')
+
+                new_mask = self._get_entry_masks(
+                    source, 'image', train_sel, entry_sel)
+
+                self._touched_sources.add(source)
+                self._custom_xtdf_masks.setdefault(source, {}).update(new_mask)
+
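+        # Check for sources affected by both entry mask operations.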
+        if (
+            {x[0] for x in self._custom_entry_masks.keys()} &
+            self._custom_xtdf_masks.keys()
+        ):
+            raise ValueError('a source may not be affected by both '
+                             'select-entries and select-xtdf operations')
+
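+        # rechunk-keys: override HDF5 chunking of matching instrument keys.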
+        for source_glob, key_glob, chunking in self._filter_ops(
+            'rechunk-keys'
+        ):
+            for source in fnmatch.filter(self._sources, source_glob):
+                if not self._data[source].is_instrument:
+                    raise ValueError(
+                        f'rechunking keys is only supported for instrument '
+                        f'sources, but {source_glob} matches '
+                        f'{self._data[source].section}/{source}')
+
+                self._touched_sources.add(source)
+
+                keys = self._custom_keys.get(
+                    source, set(self._data[source].keys()))
+
+                for key in fnmatch.filter(keys, key_glob):
+                    old_chunking = self._rechunked_keys.setdefault(
+                        (source, key), chunking)
+
+                    if old_chunking != chunking:
+                        raise ValueError(
+                            f'reduction sequence yields conflicting chunks '
+                            f'for {source}.{key}: {old_chunking}, {chunking}')
+
+                    self._rechunked_keys[(source, key)] = chunking
+
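+        # subslice-keys: only copy the given regions of matching keys.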
+        for source_glob, key_glob, region in self._filter_ops('subslice-keys'):
+            for source in fnmatch.filter(self._sources, source_glob):
+                self._touched_sources.add(source)
+
+                keys = self._custom_keys.get(
+                    source, set(self._data[source].keys()))
+
+                for key in fnmatch.filter(keys, key_glob):
+                    self._subsliced_keys.setdefault((source, key), []).append(
+                        region)
+
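+        # compress-keys: write matching keys with gzip at the given level.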
+        for source_glob, key_glob, level in self._filter_ops('compress-keys'):
+            for source in fnmatch.filter(self._sources, source_glob):
+                self._touched_sources.add(source)
+
+                keys = self._custom_keys.get(
+                    source, set(self._data[source].keys()))
+
+                for key in fnmatch.filter(keys, key_glob):
+                    self._compressed_keys[source, key] = level
+
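+        # Compression forces its own chunking, so a key cannot use both.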
+        if self._rechunked_keys.keys() & self._compressed_keys.keys():
+            raise ValueError('a key may not be affected by both '
+                             'compress-keys and rechunk-keys operations')
 
         if self._scope == 'sources':
             self._sources = sorted(
@@ -135,12 +192,8 @@ class ReduceWriter(SourceDataWriter):
                  if (self._data[source].aggregator in touched_aggregators)})
 
         if not self._sources:
-            raise ReduceInitError('Reduction operations and output scope '
-                                  'yield an empty dataset')
-        else:
-            self.log.debug(
-                f'Sources being modified: {sorted(self._touched_sources)}')
-            self.log.debug(f'Sources included in output: {self._sources}')
+            raise ValueError('reduction sequence yields an empty source '
+                             'selection')
 
     def _filter_ops(self, op):
         return [args[1:] for args in self._ops if args[0] == op]
@@ -169,8 +222,8 @@ class ReduceWriter(SourceDataWriter):
                 if count == 0:
                     continue
                 elif max_entry >= count:
-                    raise ReduceInitError(f'Entry index exceeds data counts '
-                                          f'of train {train_id}')
+                    raise ValueError(
+                        f'entry index exceeds data counts of train {train_id}')
 
                 masks[train_id] = np.zeros(count, dtype=bool)
                 masks[train_id][entry_sel] = True
@@ -182,18 +235,16 @@ class ReduceWriter(SourceDataWriter):
                 if count == 0:
                     continue
                 elif mask_len != counts.get(train_id, 0):
-                    raise ReduceInitError(f'Mask length mismatch for '
-                                          f'train {train_id}')
+                    raise ValueError(
+                        f'mask length mismatch for train {train_id}')
 
                 masks[train_id] = entry_sel
 
         else:
-            raise ReduceInitError('Unknown entry mask format')
+            raise ValueError('unknown entry mask format')
 
         return masks
 
-    # Public API
-
     def write_collection(self, output_path):
         outp_data = self._data.select([(s, '*') for s in self._sources])
 
@@ -308,12 +359,10 @@ class ReduceWriter(SourceDataWriter):
         # Keys are guaranteed to never use both custom chunking and
         # compression.
 
-        sourcekey = source, key
-
-        if sourcekey in self._rechunked_keys:
+        if (source, key) in self._rechunked_keys:
             orig_chunks = kwargs['chunks']
 
-            chunks = list(self._rechunked_keys[sourcekey])
+            chunks = list(self._rechunked_keys[source, key])
             assert len(chunks) == len(orig_chunks)
 
             for i, dim_len in enumerate(chunks):
@@ -326,14 +375,14 @@ class ReduceWriter(SourceDataWriter):
 
             kwargs['chunks'] = tuple(chunks)
 
-        elif sourcekey in self._compressed_keys or orig_dset.compression:
+        elif (source, key) in self._compressed_keys or orig_dset.compression:
             # TODO: Maintain more of existing properties, for now it is
             # forced to use gzip and (1, *entry) chunking.
             kwargs['chunks'] = (1,) + kwargs['shape'][1:]
             kwargs['shuffle'] = True
             kwargs['compression'] = 'gzip'
             kwargs['compression_opts'] = self._compressed_keys.setdefault(
-                sourcekey, orig_dset.compression_opts)
+                (source, key), orig_dset.compression_opts)
 
         return kwargs
 
@@ -385,75 +434,3 @@ class ReduceWriter(SourceDataWriter):
 
         else:
             dest[:] = data
-
-    # Reduction operation handlers.
-
-    @apply_by_source('remove-sources')
-    def _handle_remove_sources(self, source):
-        self._touched_sources.add(source)
-        self.log.debug(f'Removing {source}')
-
-    @apply_by_key('remove-keys')
-    def _handle_remove_keys(self, source, key):
-        self._custom_keys[source].remove(key)
-        self.log.debug(f'Removing {source}, {key}')
-
-    @apply_by_source('select-trains')
-    def _handle_select_trains(self, source, train_sel):
-        self._custom_trains[source] = select_train_ids(
-            self._custom_trains.setdefault(source, list(self._data.train_ids)),
-            train_sel)
-        self.log.debug(f'Selecting {len(self._custom_trains[source])} trains '
-                       f'for {source}')
-
-    @apply_by_source('select-entries')
-    def _handle_select_entries(self, source, idx_group, train_sel, entry_sel):
-        if idx_group not in self._data[source].index_groups:
-            raise ReduceInitError(
-                f'{idx_group} not an index group of {source}')
-
-        self._custom_entry_masks.setdefault((source, idx_group), {}).update(
-            self._get_entry_masks(source, idx_group, train_sel, entry_sel))
-        self.log.debug(f'Applying entry selection to {source}, {idx_group}')
-
-    @apply_by_source('select-xtdf')
-    def _handle_select_xtdf(self, source, train_sel, entry_sel):
-        if not source.endswith(':xtdf'):
-            self.log.warning(
-                f'Ignoring non-XTDF source {source} based on name')
-            return
-
-        if not self._is_xtdf_source(source):
-            self.log.warning(
-                f'Ignoring non-XTDF source {source} based on structure')
-            return
-
-        self._custom_xtdf_masks.setdefault(source, {}).update(
-            self._get_entry_masks(source, 'image', train_sel, entry_sel))
-        self.log.debug(f'Applying XTDF selection to {source}')
-
-    @apply_by_key('rechunk-keys')
-    def _handle_rechunk_keys(self, source, key, chunking):
-        if not self._data[source].is_instrument:
-            # Ignore CONTROL sources.
-            return
-
-        old_chunking = self._rechunked_keys.setdefault((source, key), chunking)
-
-        if old_chunking != chunking:
-            raise ReduceInitError(
-                f'Reduction sequence yields conflicting chunks for '
-                f'{source}.{key}: {old_chunking}, {chunking}')
-
-        self._rechunked_keys[(source, key)] = chunking
-        self.log.debug(f'Rechunking {source}, {key} to {chunking}')
-
-    @apply_by_key('subslice-keys')
-    def _handle_subslice_keys(self, source, key, region):
-        self._subsliced_keys.setdefault((source, key), []).append(region)
-        self.log.debug(f'Subslicing {region} of {source}, {key}')
-
-    @apply_by_key('compress-keys')
-    def _handle_compress_keys(self, source, key, level):
-        self._compressed_keys[source, key] = level
-        self.log.debug(f'Compressing {source}, {key}')