From 29dc22df2394d1911ab568caafc54fff0c380cf7 Mon Sep 17 00:00:00 2001
From: Philipp Schmidt <philipp.schmidt@xfel.eu>
Date: Mon, 15 May 2023 15:23:12 +0200
Subject: [PATCH] Reorganize packages and replace pyproject.toml with an
 actual setup.py

---
 pyproject.toml                                |  33 ---
 setup.py                                      |  68 +++++
 src/exdf/__init__.py                          |   2 +
 src/{exdf_tools => exdf/cli}/glance.py        |   0
 src/exdf/cli/reduce.py                        | 187 ++++++++++++
 src/exdf/data_reduction/__init__.py           |   2 +
 .../data_reduction/builtins.py}               |  15 +-
 src/exdf/data_reduction/method.py             |  45 +++
 .../data_reduction/sequence.py}               | 275 +++---------------
 src/{exdf_tools/exdf.py => exdf/datafile.py}  |  13 +-
 src/exdf_tools/__init__.py                    |   0
 11 files changed, 356 insertions(+), 284 deletions(-)
 delete mode 100644 pyproject.toml
 create mode 100644 setup.py
 create mode 100644 src/exdf/__init__.py
 rename src/{exdf_tools => exdf/cli}/glance.py (100%)
 create mode 100644 src/exdf/cli/reduce.py
 create mode 100644 src/exdf/data_reduction/__init__.py
 rename src/{exdf_tools/reduction_methods.py => exdf/data_reduction/builtins.py} (96%)
 create mode 100644 src/exdf/data_reduction/method.py
 rename src/{exdf_tools/reduce.py => exdf/data_reduction/sequence.py} (57%)
 rename src/{exdf_tools/exdf.py => exdf/datafile.py} (98%)
 delete mode 100644 src/exdf_tools/__init__.py

diff --git a/pyproject.toml b/pyproject.toml
deleted file mode 100644
index 6ce3026..0000000
--- a/pyproject.toml
+++ /dev/null
@@ -1,33 +0,0 @@
-[build-system]
-requires = ["flit_core >=2,<4"]
-build-backend = "flit_core.buildapi"
-
-[project]
-name = "EXDF-tools"
-description = "Tools to view and and manipulate HDF5 files in the European XFEL Data Format."
-readme = {file = "README.md", content-type = "text/markdown"}
-license = {file = "LICENSE"}
-version = "0.0.1"
-authors = [
-    {name = "European XFEL GmbH", email = "da-support@xfel.eu"},
-]
-classifiers = [
-    "License :: OSI Approved :: BSD License",
-]
-requires-python = ">=3.8"
-dependencies = [
-    "extra_data >= 1.12.0",
-    "uniplot >= 0.10.0",
-]
-
-[tool.flit.module]
-name = "exdf_tools"
-
-[project.scripts]
-exdf-glance = "exdf_tools.glance:main"
-exdf-reduce = "exdf_tools.reduce:main"
-
-[project.entry-points."exdf_tools.reduction_method"]
-manualRemoval = "exdf_tools.reduction_methods:ManualRemoval"
-ppuTrainSequences = "exdf_tools.reduction_methods:PpuTrainSequences"
-agipdGain = "exdf_tools.reduction_methods:AgipdGain"
diff --git a/setup.py b/setup.py
new file mode 100644
index 0000000..58d5774
--- /dev/null
+++ b/setup.py
@@ -0,0 +1,65 @@
+#!/usr/bin/env python
+
+# Distributed under the terms of the BSD 3-Clause License.
+# The full license is in the file LICENSE, distributed with this software.
+#
+# Author: Philipp Schmidt <philipp.schmidt@xfel.eu>
+# Copyright (c) 2020, European X-Ray Free-Electron Laser Facility GmbH.
+# All rights reserved.
+
+from pathlib import Path
+
+from setuptools import setup, find_packages
+
+
+parent_path = Path(__file__).parent
+
+setup(
+    name='exdf-tools',
+    version='1.0.0',
+    description='Libraries and tools to work with HDF5 files in the '
+                'European XFEL Data Format (EXDF).',
+    long_description=(parent_path / 'README.md').read_text(),
+    long_description_content_type='text/markdown',
+    url='',
+    author='European XFEL',
+    author_email='da@xfel.eu',
+    license='BSD-3-Clause',
+
+    package_dir={'': 'src'},
+    packages=find_packages('src'),
+
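+    # Reduction methods are discovered at runtime via the entry point
+    # group 'exdf.data_reduction.method'; other packages may add to it.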
+    entry_points={
+        'console_scripts': [
+            'exdf-glance = exdf.cli.glance:main',
+            'exdf-reduce = exdf.cli.reduce:main'
+        ],
+
+        'exdf.data_reduction.method': [
+            'manualRemoval = exdf.data_reduction.builtins:ManualRemoval',
+            'ppuTrainSequences = exdf.data_reduction.builtins:PpuTrainSequences',
+            'agipdGain = exdf.data_reduction.builtins:AgipdGain'
+        ]
+    },
+
+    python_requires='>=3.8',
+    install_requires=[
+        'h5py',
+        'extra_data',
+    ],
+    extras_require={
+        'glance': ['uniplot'],
+        'test': ['pytest',],
+    },
+
+    classifiers=[
+        'Development Status :: 3 - Alpha',
+        'Intended Audience :: Developers',
+        'Intended Audience :: Science/Research',
+        'License :: OSI Approved :: BSD License',
+        'Operating System :: POSIX :: Linux',
+        'Topic :: Scientific/Engineering :: Information Analysis',
+        'Topic :: Scientific/Engineering :: Physics',
+    ]
+)
diff --git a/src/exdf/__init__.py b/src/exdf/__init__.py
new file mode 100644
index 0000000..11d1d79
--- /dev/null
+++ b/src/exdf/__init__.py
@@ -0,0 +1,2 @@
+
+from .datafile import DataFile
diff --git a/src/exdf_tools/glance.py b/src/exdf/cli/glance.py
similarity index 100%
rename from src/exdf_tools/glance.py
rename to src/exdf/cli/glance.py
diff --git a/src/exdf/cli/reduce.py b/src/exdf/cli/reduce.py
new file mode 100644
index 0000000..7543f70
--- /dev/null
+++ b/src/exdf/cli/reduce.py
@@ -0,0 +1,196 @@
+"""Apply data reduction to HDF5 files structured in the European XFEL
+Data Format (EXDF).
+"""
+
+from argparse import ArgumentParser
+
+from pathlib import Path
+import logging
+import re
+import sys
+
+from pkg_resources import iter_entry_points
+from extra_data import RunDirectory, open_run
+
+from exdf.data_reduction.sequence import ReductionSequence
+
+
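+# Matches input like 'p1234:r100' or 'p900123:r5:proc'; the optional
+# third group selects the data prefix, defaulting to raw.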
+proposal_run_input_pattern = re.compile(
+    r'^p(\d{4}|9\d{5}):r(\d{1,3})(?::([a-z]+))?$')
+
+
+def main(argv=None):
+    ap = ArgumentParser(
+        description='Apply data reduction to HDF5 files structured in the '
+                    'European XFEL Data Format (EXDF).'
+    )
+
+    ap.add_argument(
+        'input', metavar='INPUT', type=Path,
+        help='folder of input data, or pX:rY[:prefix=raw] to load a '
+             'proposal run directly from /gpfs/exfel/exp; currently always '
+             'assumed to be a single run'
+    )
+
+    logging_group = ap.add_mutually_exclusive_group()
+
+    logging_group.add_argument(
+        '--verbose', '-v', action='store_true',
+        help='set logging filter to DEBUG'
+    )
+
+    logging_group.add_argument(
+        '--quiet', '-q', action='store_true',
+        help='set logging filter to WARNING'
+    )
+
+    logging_group.add_argument(
+        '--silent', '-s', action='store_true',
+        help='set logging filter to ERROR'
+    )
+
+    for ep in iter_entry_points('exdf.data_reduction.method'):
+        ep.load().arguments(ap)
+
+    output_group = ap.add_argument_group(
+        'output arguments',
+        'Allows configuring the output location, scope and layout.'
+    )
+
+    output_group.add_argument(
+        '--output', '-o', metavar='PATH', type=Path, action='store',
+        default='./out',
+        help='folder for output data, or a single file if its name ends '
+             'in .h5 and the \'voview\' output layout is used')
+
+    output_group.add_argument(
+        '--output-scope', action='store',
+        choices=['sources', 'aggregators', 'all', 'none'],
+        default='aggregators',
+        help='scope of what is contained in the output: either only the '
+             '\'sources\' touched by data reduction, the \'aggregators\' '
+             'touched, \'all\' data found in the input, or \'none\' of it, '
+             'resulting in no output data files at all'
+    )
+
+    output_group.add_argument(
+        '--output-layout', action='store',
+        choices=['collection', 'voview', 'collapsed'],
+        default='collection',
+        help='layout of output files: either a \'collection\' of separate '
+             'aggregator and sequence files, a \'voview\' file containing '
+             'virtual datasets for all data, or a single \'collapsed\' '
+             'collection combining all former aggregators into one'
+    )
+
+    output_group.add_argument(
+        '--output-sequence-len', action='store', type=int, default=256,
+        help='number of trains per sequence file or -1 to preserve input '
+             'length, only used for \'collection\' and \'collapsed\' output '
+             'layout'
+    )
+
+    output_group.add_argument(
+        '--to-script', action='store', type=Path,
+        help='export reduction operations to a script file'
+    )
+
+    # TODO: Whether to use suspect trains or not
+    # TODO: Whether to drop entirely empty sources
+
+    args = ap.parse_args(argv)
+
+    if args.verbose:
+        level = logging.DEBUG
+    elif args.quiet:
+        level = logging.WARNING
+    elif args.silent:
+        level = logging.ERROR
+    else:
+        level = logging.INFO
+
+    logging.basicConfig(stream=sys.stdout, level=level)
+    log = logging.getLogger('exdf.cli.reduce')
+
+    if not args.input.is_dir():
+        m = proposal_run_input_pattern.match(str(args.input))
+
+        if m:
+            inp_data = open_run(proposal=m[1], run=m[2], data=m[3] or 'raw')
+            log.info(f'Found proposal run at '
+                     f'{Path(inp_data.files[0].filename).parent}')
+        else:
+            # Neither an existing folder nor a valid proposal run pattern.
+            ap.error(f'no such directory or proposal run: {args.input}')
+
+    else:
+        inp_data = RunDirectory(args.input)
+
+    log.info(f'Opened data with {len(inp_data.all_sources)} sources and '
+             f'{len(inp_data.train_ids)} trains across {len(inp_data.files)} '
+             f'files')
+
+    methods = {}
+
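+    # Each entry point is expected to yield a ReductionMethod subclass,
+    # which may emit reduction operations based on the parsed arguments.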
+    for ep in iter_entry_points('exdf.data_reduction.method'):
+        method = ep.load()()
+        method.compile(inp_data, args)
+
+        if method:
+            log.info(f'Applying {ep.name} with {len(method)} reduction '
+                     f'operations')
+            methods[ep.name] = method
+
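+    # The exported script lists one operation per line in the form
+    # '<operation>: <arg>; <arg>; ...', grouped by '# <method>' headers.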
+    if args.to_script:
+        with open(args.to_script, 'w') as f:
+            for name, method in methods.items():
+                f.write(f'# {name}\n')
+
+                for instr in method._instructions:
+                    f.write('{}: {}\n'.format(
+                        instr[0], '; '.join([str(x) for x in instr[1:]])))
+
+    seq = ReductionSequence(inp_data, methods)
+    seq.link(args.output_scope)
+
+    if args.output_scope == 'none':
+        log.info('Not writing out any data files')
+        return
+
+    if args.output_layout == 'collection':
+        args.output.mkdir(parents=True, exist_ok=True)
+
+        log.debug(f'Writing collection to {args.output}')
+        seq.write_collection(args.output, args.output_sequence_len)
+
+    elif args.output_layout == 'voview':
+        if args.output.is_file() or args.output.suffix == '.h5':
+            output_path = args.output
+        else:
+            args.output.mkdir(parents=True, exist_ok=True)
+            output_path = args.output / 'output.h5'
+
+        log.debug(f'Writing voview to {output_path}')
+        seq.write_voview(output_path)
+
+    elif args.output_layout == 'collapsed':
+        # Requires datasets to be in memory for now.
+        log.warning('In the current implementation, the entire output data '
+                    'must be held in memory')
+        if input('Continue anyway? [y/n] ') != 'y':
+            return
+
+        args.output.mkdir(parents=True, exist_ok=True)
+
+        log.debug(f'Writing collapsed collection to {args.output}')
+        seq.write_collapsed(args.output, args.output_sequence_len)
+
+
+if __name__ == '__main__':
+    main()
diff --git a/src/exdf/data_reduction/__init__.py b/src/exdf/data_reduction/__init__.py
new file mode 100644
index 0000000..5cb9488
--- /dev/null
+++ b/src/exdf/data_reduction/__init__.py
@@ -0,0 +1,2 @@
+
+from .method import ReductionMethod
diff --git a/src/exdf_tools/reduction_methods.py b/src/exdf/data_reduction/builtins.py
similarity index 96%
rename from src/exdf_tools/reduction_methods.py
rename to src/exdf/data_reduction/builtins.py
index 5f9c993..88b2594 100644
--- a/src/exdf_tools/reduction_methods.py
+++ b/src/exdf/data_reduction/builtins.py
@@ -4,7 +4,7 @@ from logging import getLogger
 import numpy as np
 
 from extra_data import by_id, SourceNameError
-from exdf_tools.reduce import ReductionMethod
+from exdf.data_reduction import ReductionMethod
 
 
 class ManualRemoval(ReductionMethod):
@@ -35,7 +35,7 @@ class ManualRemoval(ReductionMethod):
             # Easiest would be to copy file and just remove the source.
             for source_glob in args.remove_sources:
                 self.remove_sources(source_glob)
-    
+
         if args.remove_keys:
             # Easiest would be to copy file and just remove the key.
             for sourcekey_str in args.remove_keys:
@@ -45,7 +45,7 @@ class ManualRemoval(ReductionMethod):
         if args.remove_trains:
             for slice_expr in args.remove_trains:
-                # TODO: Convert slice_expr top slice
-                self.remove_trains('*', xd.by_id[args.remove_trains])
+                # TODO: Convert slice_expr to a slice
+                self.remove_trains('*', by_id[args.remove_trains])
 
 
 class PpuTrainSequences(ReductionMethod):  # p3160, r250
@@ -157,7 +157,7 @@ class AgipdGain(ReductionMethod):
 
             self.log.info(f'Found {domain}')
             agipd_sources = f'{domain}/DET/*CH0:xtdf'
-        
+
         else:
             agipd_sources = args.agipd_gain_sources
 
@@ -181,10 +181,9 @@ class AgipdGain(ReductionMethod):
                            f'keys found in data')
             return
 
-        #if gain_mode == 0:
-        #    self.log.error('Data taken in adaptive mode, aborting')
-        #    return
+        if gain_mode == 0:
+            self.log.error('Data taken in adaptive mode, aborting')
+            return
 
         self.partial_copy(agipd_sources, 'image.data',
                           np.s_[0, :, :], (64, 1, 512, 128))
-
diff --git a/src/exdf/data_reduction/method.py b/src/exdf/data_reduction/method.py
new file mode 100644
index 0000000..a1642fc
--- /dev/null
+++ b/src/exdf/data_reduction/method.py
@@ -0,0 +1,58 @@
+
+import logging
+
+
+class ReductionMethod:
+    log = logging.getLogger('exdf.data_reduction.ReductionMethod')
+
+    # Stores a list of instructions and can apply them.
+    # Allows (de-)serialization to and from files.
+    # Convenience interface for reduction methods to emit instructions.
+
+    # Populate some arguments
+    # Check if the method is enabled -> could be in the main body
+    # Actually compile instructions for removal with DataCollection
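+
+    # A typical subclass extends the command line via arguments() and
+    # emits its instructions in compile(). An illustrative sketch (the
+    # names below are placeholders, not an actual built-in method):
+    #
+    #     class MyMethod(ReductionMethod):
+    #         @classmethod
+    #         def arguments(cls, ap):
+    #             ap.add_argument('--my-flag', action='store_true')
+    #
+    #         def compile(self, data, args):
+    #             if args.my_flag:
+    #                 self.remove_sources('SA1_XTD2_XGM/*')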
+
+    def __init__(self):
+        self._instructions = []
+
+    def __bool__(self):
+        # Whether this program contains any instructions.
+        return bool(self._instructions)
+
+    def __len__(self):
+        # How many instructions this program contains.
+        return len(self._instructions)
+
+    def select_trains(self, sources, trains):
+        # init level
+        self._instructions.append(('selectTrains', sources, trains))
+        self.log.debug(f'Emitted {self._instructions[-1]}')
+
+    def remove_sources(self, source_glob):
+        # init level
+        self._instructions.append(('removeSources', source_glob))
+        self.log.debug(f'Emitted {self._instructions[-1]}')
+
+    def remove_keys(self, source_glob, key_glob):
+        # copy_dataset level
+        self._instructions.append(('removeKeys', source_glob, key_glob))
+        self.log.debug(f'Emitted {self._instructions[-1]}')
+
+    def partial_copy(self, sources, key, region, chunks):
+        self._instructions.append(
+            ('partialCopy', sources, key, region, chunks))
+        self.log.debug(f'Emitted {self._instructions[-1]}')
diff --git a/src/exdf_tools/reduce.py b/src/exdf/data_reduction/sequence.py
similarity index 57%
rename from src/exdf_tools/reduce.py
rename to src/exdf/data_reduction/sequence.py
index 85dfca7..49c2947 100644
--- a/src/exdf_tools/reduce.py
+++ b/src/exdf/data_reduction/sequence.py
@@ -1,29 +1,26 @@
-"""Apply data reduction to HDF5 files structured in the European XFEL Data Format (EXDF).
-"""
 
-from argparse import ArgumentParser
 from collections import defaultdict
 from pathlib import Path
 import fnmatch
 import logging
 import re
-import sys
 
 import numpy as np
 
-from pkg_resources import iter_entry_points
-from extra_data import RunDirectory, open_run, by_id
-from extra_data.writer import FileWriter, VirtualFileWriter
+from extra_data import by_id
 from extra_data.read_machinery import select_train_ids
 
-from . import exdf
+from exdf import DataFile
 
 
 exdf_filename_pattern = re.compile(
-    r'([A-Z]+)-R(\d{4})-(\w+)-S(\d{5}).h5')
+    r'([A-Z]+)-R(\d{4})-(\w+)-S(\d{5})\.h5')
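+# e.g. 'RAW-R0250-AGIPD00-S00001.h5' matches as
+# (data category, run, aggregator, sequence).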
 
+
 def get_source_origin(sourcedata):
-    # TODO: Can be replaced by EXtra-data functionality later.
+    # TODO: Move to EXtra-data.
 
     try:
         files = sourcedata.source_files
@@ -52,6 +47,8 @@ def get_source_origin(sourcedata):
 
 
 def sourcedata_drop_empty_trains(sd):
+    # TODO: Move to EXtra-data.
+
     if sd._is_control:
         train_sel = sd[next(iter(sd.keys()))].drop_empty_trains().train_ids
     else:
@@ -68,6 +65,8 @@ def sourcedata_drop_empty_trains(sd):
 
 
 def sourcedata_data_counts(sd):
+    # TODO: Move to EXtra-data
+
     if sd._is_control:
         return sd[next(iter(sd.keys()))].data_counts(labelled=False)
     else:
@@ -84,7 +83,7 @@ def sourcedata_data_counts(sd):
 class ReductionSequence:
     # A sequence of reduction methods.
 
-    log = logging.getLogger('reduce.ReductionSequence')
+    log = logging.getLogger('exdf.data_reduction.ReductionSequence')
 
     def __init__(self, data, methods):
         self._data = data
@@ -129,9 +128,11 @@ class ReductionSequence:
                     source, list(self._data.train_ids))
 
                 self._custom_trains[source] = select_train_ids(
-                    self._custom_trains[source], train_sel)
+                    train_ids, train_sel)
 
-        for source_glob, key, region, chunking in self._filter_ops('partialCopy'):
+        for source_glob, key, region, chunking in self._filter_ops(
+            'partialCopy'
+        ):
             for source in fnmatch.filter(self._sources, source_glob):
                 self._touched_sources.add(source)
                 self._partial_copies[(source, key)] = (region, chunking)
@@ -146,14 +147,18 @@ class ReductionSequence:
 
             self._sources = sorted(
                 {source for source in self._sources
-                if get_source_origin(self._data[source])[2]
-                    in touched_aggregators})
+                 if (get_source_origin(self._data[source])[2]
+                     in touched_aggregators)})
 
         if not self._sources:
             self.log.warn('Linking reduction methods yielded empty source '
                           'selection')
 
     def write_collection(self, output_path, sequence_len=512):
+        if not self._sources:
+            self.log.error('No sources to write to collection.')
+            return
+
         outp_data = self._data.select([(s, '*') for s in self._sources])
 
         # An origin is a pair of (storage_class, aggregator)
@@ -189,7 +194,7 @@ class ReductionSequence:
             # Switch to representation of SourceData objects for
             # per-source tracking of trains.
             origin_sources = {source: origin_data[source]
-                            for source in origin_data.all_sources}
+                              for source in origin_data.all_sources}
 
             # Apply custom train selections.
             for source, train_sel in self._custom_trains.items():
@@ -219,7 +224,7 @@ class ReductionSequence:
 
                 # Select origin data down to what's in this file.
                 seq_sources = {source: sd.select_trains(by_id[seq_train_ids])
-                                for source, sd in origin_sources.items()}
+                               for source, sd in origin_sources.items()}
 
                 seq_path = self._write_sequence(
                     output_path, aggregator, run_no, seq_no, data_category,
@@ -253,13 +258,14 @@ class ReductionSequence:
             else:
                 instrument_indices[sd.source] = sourcedata_data_counts(sd)
 
-        with exdf.File.from_details(folder, aggregator, run, sequence,
-                                    prefix=data_category, mode='w') as f:
-            path = Path(f.filename)
+        with DataFile.from_details(folder, aggregator, run, sequence,
+                                   prefix=data_category, mode='w') as f:
+            output_path = Path(f.filename)
 
             # TODO: Handle old files without extended METADATA?
             # TODO: Handle timestamps, origin
             # TODO: Attributes
+            # TODO: Chunking of run/control data?
 
             # Create METADATA section.
             f.create_metadata(
@@ -275,7 +281,7 @@ class ReductionSequence:
 
             for source, counts in control_indices.items():
                 control_src = f.create_control_source(source)
-                control_src.create_index(len(train_ids))
+                control_src.create_index(len(train_ids), per_train=True)
 
             for source, channel_counts in instrument_indices.items():
                 instrument_src = f.create_instrument_source(source)
@@ -286,8 +292,19 @@
                 exdf_source = f.source[source]
                 sd = sources[source]
 
-                for key, value in sd.run_values().items():
-                    exdf_source.create_run_key(key, value)
+                run_data_leafs = {}
+
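+                # Group flat run values by their key, e.g. 'someKey.value'
+                # and 'someKey.timestamp' are combined into the leafs
+                # 'value' and 'timestamp' of the single key 'someKey'.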
+                for path, value in sd.run_values().items():
+                    key = path[:path.rfind('.')]
+                    leaf = path[path.rfind('.')+1:]
+
+                    run_data_leafs.setdefault(key, {})[leaf] = value
+
+                for key, leafs in run_data_leafs.items():
+                    exdf_source.create_run_key(key, **leafs)
 
                 for key in sd.keys(False):
                     exdf_source.create_key(key,
@@ -332,219 +346,4 @@ class ReductionSequence:
                         full_region = (np.s_[:], *region)
                         exdf_source.key[key][full_region] = data[full_region]
 
-        return path
-
-
-class ReductionMethod:
-    log = logging.getLogger('ReductionMethod')
-
-    # Stores list of instructions and can apply them
-    # Allows (de-)serialization with files
-    # Convenience interface for reduction methods to emit instructions.
-
-    # Populate some arguments
-    # Check if the method is enabled -> could be in the main body
-    # Actually compile instructions for removal with DataCollection
-
-    def __init__(self):
-        self._instructions = []
-
-    def __bool__(self):
-        # Whether this program contains any instructions.
-        return bool(self._instructions)
-
-    def __len__(self):
-        # How many instructions this program contains.
-        return len(self._instructions)
-
-    def select_trains(self, sources, trains):
-        # init level
-        self._instructions.append(('selectTrains', sources, trains))
-        self.log.debug(f'Emitted {self._instructions[-1]}')
-
-    def remove_sources(self, source_glob):
-        # init level
-        self._instructions.append(('removeSources', source_glob))
-        self.log.debug(f'Emitted {self._instructions[-1]}')
-
-    def remove_keys(self, source_glob, key_glob):
-        # copy_dataset level
-        self._instructions.append(('removeKeys', source_glob, key_glob))
-        self.log.debug(f'Emitted {self._instructions[-1]}')
-
-    def partial_copy(self, sources, key, region, chunks):
-        self._instructions.append(
-            ('partialCopy', sources, key, region, chunks))
-        self.log.debug(f'Emitted {self._instructions[-1]}')
-
-
-proposal_run_input_pattern = re.compile(
-    r'^p(\d{4}|9\d{5}):r(\d{1,3}):*([a-z]*)$')
-
-
-def main(argv=None):
-    ap = ArgumentParser(
-        description='Apply data reduction to HDF5 files structured in the '
-                    'European XFEL Data Format (EXDF).'
-    )
-
-    ap.add_argument(
-        'input', metavar='INPUT', type=Path,
-        help='folder for input data or pX:rY[:prefix=raw] to load a proposal '
-             'run from /gpfs/exfel/exp directly, currently always assumed to '
-             'be a single run.'
-    )
-
-    logging_group = ap.add_mutually_exclusive_group()
-
-    logging_group.add_argument(
-        '--verbose', '-v', action='store_true',
-        help='set logging filter to DEBUG'
-    )
-
-    logging_group.add_argument(
-        '--quiet', '-q', action='store_true',
-        help='set logging filter to WARNING'
-    )
-
-    logging_group.add_argument(
-        '--silent', '-s', action='store_true',
-        help='set logging filter to ERROR'
-    )
-
-    for ep in iter_entry_points('exdf_tools.reduction_method'):
-        ep.load().arguments(ap)
-
-    output_group = ap.add_argument_group(
-        'output arguments',
-        'Allows to configure the output location, scope and layout.'
-    )
-
-    output_group.add_argument(
-        '--output', '-o', metavar='PATH', type=Path, action='store',
-        default='./out',
-        help='folder for output data, or file if ending with .h5 '
-              'and used with \'voview\' output layout')
-
-    output_group.add_argument(
-        '--output-scope', action='store',
-        choices=['sources', 'aggregators', 'all', 'none'],
-        default='aggregators',
-        help='scope of what is contained in output, either only the '
-             '\'sources\' touched by data reduction, the \'aggregator\' '
-             'touched, \'all\' data found in the input or \'none\' of it '
-             'resulting in no actual output data files'
-    )
-
-    output_group.add_argument(
-        '--output-layout', action='store',
-        choices=['collection', 'voview', 'collapsed'],
-        default='collection',
-        help='layout of output files, either as a \'collection\' of separate '
-             'aggregator and sequence files, as a \'voview\' file containing '
-             'virtual datasets for all data or a single \'collapsed\' '
-             'collection combining all former aggregators into one'
-    )
-
-    output_group.add_argument(
-        '--output-sequence-len', action='store', type=int, default=256,
-        help='number of trains per sequence file or -1 to preserve input '
-             'length, only used for \'collection\' and \'collapsed\' output '
-             'layout'
-    )
-
-    output_group.add_argument(
-        '--to-script', action='store', type=Path,
-        help='export reduction operations to a script file'
-    )
-
-    # TODO: Whether to use suspect trains or not
-    # TODO: Whether to drop entirely empty sources
-
-    args = ap.parse_args(argv)
-
-    if args.verbose:
-        level = logging.DEBUG
-    elif args.quiet:
-        level = logging.WARNING
-    elif args.silent:
-        level = logging.ERROR
-    else:
-        level = logging.INFO
-
-    logging.basicConfig(stream=sys.stdout, level=level)
-    log = logging.getLogger('reduce.main')
-
-    if not args.input.is_dir():
-        m = proposal_run_input_pattern.match(str(args.input))
-
-        if m:
-            inp_data = open_run(proposal=m[1], run=m[2], data=m[3] or 'raw')
-            log.info(f'Found proposal run at '
-                     f'{Path(inp_data.files[0].filename).parent}')
-
-    else:
-        inp_data = RunDirectory(args.input)
-
-    log.info(f'Opened data with {len(inp_data.all_sources)} sources with '
-             f'{len(inp_data.train_ids)} trains across {len(inp_data.files)} '
-             f'files')
-
-    methods = {}
-
-    for ep in iter_entry_points('exdf_tools.reduction_method'):
-        method = ep.load()()
-        method.compile(inp_data, args)
-
-        if method:
-            log.info(f'Applying {ep.name} with {len(method)} reduction '
-                     f'operations')
-            methods[ep.name] = method
-
-    if args.to_script:
-        with open(args.to_script, 'w') as f:
-            for name, method in methods.items():
-                f.write(f'# {name}\n')
-
-                for instr in method._instructions:
-                    f.write('{}: {}\n'.format(
-                        instr[0], '; '.join([str(x) for x in instr[1:]])))
-
-    seq = ReductionSequence(inp_data, methods)
-    seq.link(args.output_scope)
-
-    if args.output_scope == 'none':
-        log.info(f'Not writing out any data files')
-        return
-
-    if args.output_layout == 'collection':
-        args.output.mkdir(parents=True, exist_ok=True)
-
-        log.debug(f'Writing collection to {args.output}')
-        seq.write_collection(args.output, args.output_sequence_len)
-
-    elif args.output_layout == 'voview':
-        if args.output.is_file() or args.output.suffix == '.h5':
-            output_path = args.output
-        else:
-            args.output.mkdir(parents=True, exist_ok=True)
-            output_path = args.output / 'output.h5'
-
-        log.debug(f'Writing voview to {output_path}')
-        seq.write_voview(output_path)
-
-    elif args.output_layout == 'collapsed':
-        # Requires datasets to be in memory for now.
-        log.warning('In the current implementation, the entire output data '
-                    'must be loaded to memory')
-        if input('Still continue? [y/n] ') != 'y':
-            return
-
-        args.output.mkdir(parents=True, exist_ok=True)
-
-        log.debug(f'Writing collapsed collection to {args.output}')
-        seq.write_collapsed(args.output, args.output_sequence_len)
-
-
-if __name__ == '__main__':
-    main()
+        return output_path
diff --git a/src/exdf_tools/exdf.py b/src/exdf/datafile.py
similarity index 98%
rename from src/exdf_tools/exdf.py
rename to src/exdf/datafile.py
index 89a8684..e6ebb29 100644
--- a/src/exdf_tools/exdf.py
+++ b/src/exdf/datafile.py
@@ -1,5 +1,4 @@
 
-
 from datetime import datetime
 from itertools import chain
 from numbers import Integral
@@ -118,7 +117,7 @@ class KeyIndexer(CustomIndexer):
         return self.parent[escape_key(key)]
 
 
-class File(h5py.File):
+class DataFile(h5py.File):
     """European XFEL HDF5 data file.
 
     This class extends the h5py.File with methods specific to writing
@@ -490,8 +489,11 @@
             except IndexError:
                 # Assume empty lists are string-typed.
                 dtype = self.ascii_dt
+
         elif isinstance(value, np.ndarray):
-            shape = value.shape
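+            # The RUN section stores a single entry for the entire run,
+            # so 1D values gain a leading axis of length 1.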
+            shape = (1, *value.shape) if value.ndim == 1 else value.shape
             dtype = value.dtype
         else:
             shape = (1,)
@@ -505,7 +505,7 @@ class ControlSource(Source):
         self.__run_group.create_dataset(
             f'{key}/timestamp', data=timestamp, shape=(1,), dtype=np.uint64)
 
-    def create_index(self, num_trains):
+    def create_index(self, num_trains, per_train=None):
         """Create source-specific INDEX datasets.
 
         Depending on whether this source has train-varying data or not,
@@ -518,7 +518,12 @@
             None
         """
 
-        if len(self) > 0:
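+        # If not specified, infer whether this source has per-train data
+        # from whether any keys exist for it yet.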
+        if per_train is None:
+            per_train = len(self) > 0
+
+        if per_train:
             count_func = np.ones
             first_func = np.arange
         else:
diff --git a/src/exdf_tools/__init__.py b/src/exdf_tools/__init__.py
deleted file mode 100644
index e69de29..0000000
-- 
GitLab