Skip to content
Snippets Groups Projects
reduce.py 5.93 KiB
Newer Older
"""Apply data reduction to HDF5 files structured in the European XFEL
Data Format (EXDF).
"""

from argparse import ArgumentParser

from pathlib import Path
import logging
import re
import sys

from pkg_resources import iter_entry_points
from extra_data import RunDirectory, open_run

from exdf.data_reduction.sequence import ReductionSequence


# Matches an INPUT given as a proposal/run specification instead of a folder:
#   group 1: proposal number, either four digits or six digits starting
#            with 9
#   group 2: run number, one to three digits
#   group 3: optional data prefix made of lowercase letters (may be empty)
# NOTE(review): ':*' accepts any number of colons, including none, between
# the run number and the prefix - confirm whether that leniency is intended.
proposal_run_input_pattern = re.compile(
    r'^p(\d{4}|9\d{5}):r(\d{1,3}):*([a-z]*)$')


def main(argv=None):
    """Run the EXDF data reduction command-line interface.

    Parses the command line, opens the input data, compiles every
    reduction method registered under the 'exdf.data_reduction.method'
    entry point group, optionally exports the resulting operations to a
    script file and finally writes the reduced data in the requested
    output layout.

    Args:
        argv: Sequence of command-line arguments, or None to let
            argparse use the arguments of the running process.

    Returns:
        None.

    Raises:
        SystemExit: Raised by argparse for invalid arguments, including
            an INPUT that is neither an existing directory nor a
            pX:rY[:prefix] proposal run specification.
    """
    ap = ArgumentParser(
        description='Apply data reduction to HDF5 files structured in the '
                    'European XFEL Data Format (EXDF).'
    )

    ap.add_argument(
        'input', metavar='INPUT', type=Path,
        help='folder for input data or pX:rY[:prefix=raw] to load a proposal '
             'run from /gpfs/exfel/exp directly, currently always assumed to '
             'be a single run.'
    )

    # Only one verbosity flag may be given at a time.
    logging_group = ap.add_mutually_exclusive_group()

    logging_group.add_argument(
        '--verbose', '-v', action='store_true',
        help='set logging filter to DEBUG'
    )

    logging_group.add_argument(
        '--quiet', '-q', action='store_true',
        help='set logging filter to WARNING'
    )

    logging_group.add_argument(
        '--silent', '-s', action='store_true',
        help='set logging filter to ERROR'
    )

    # Let every registered reduction method add its own arguments to the
    # parser before parsing.
    for ep in iter_entry_points('exdf.data_reduction.method'):
        ep.load().arguments(ap)

    output_group = ap.add_argument_group(
        'output arguments',
        'Allows to configure the output location, scope and layout.'
    )

    output_group.add_argument(
        '--output', '-o', metavar='PATH', type=Path, action='store',
        default='./out',
        help='folder for output data, or file if ending with .h5 '
             'and used with \'voview\' output layout')

    output_group.add_argument(
        '--output-scope', action='store',
        choices=['sources', 'aggregators', 'all', 'none'],
        default='aggregators',
        help='scope of what is contained in output, either only the '
             '\'sources\' touched by data reduction, the \'aggregator\' '
             'touched, \'all\' data found in the input or \'none\' of it '
             'resulting in no actual output data files'
    )

    output_group.add_argument(
        '--output-layout', action='store',
        choices=['collection', 'voview', 'collapsed'],
        default='collection',
        help='layout of output files, either as a \'collection\' of separate '
             'aggregator and sequence files, as a \'voview\' file containing '
             'virtual datasets for all data or a single \'collapsed\' '
             'collection combining all former aggregators into one'
    )

    output_group.add_argument(
        '--output-sequence-len', action='store', type=int, default=256,
        help='number of trains per sequence file or -1 to preserve input '
             'length, only used for \'collection\' and \'collapsed\' output '
             'layout'
    )

    output_group.add_argument(
        '--to-script', action='store', type=Path,
        help='export reduction operations to a script file'
    )

    # TODO: Whether to use suspect trains or not
    # TODO: Whether to drop entirely empty sources

    args = ap.parse_args(argv)

    if args.verbose:
        level = logging.DEBUG
    elif args.quiet:
        level = logging.WARNING
    elif args.silent:
        level = logging.ERROR
    else:
        level = logging.INFO

    logging.basicConfig(stream=sys.stdout, level=level)
    log = logging.getLogger('exdf.cli.reduce')

    if args.input.is_dir():
        inp_data = RunDirectory(args.input)
    else:
        # Not a folder, so INPUT must be a pX:rY[:prefix] specification.
        m = proposal_run_input_pattern.match(str(args.input))

        if m is None:
            # Without this check inp_data would stay unbound and the code
            # below would fail with an unrelated NameError. ap.error()
            # prints the usage message and exits.
            ap.error(f'input {args.input} is neither an existing folder '
                     f'nor a proposal run of the form pX:rY[:prefix]')

        # An omitted or empty prefix falls back to raw data.
        inp_data = open_run(proposal=m[1], run=m[2], data=m[3] or 'raw')
        log.info(f'Found proposal run at '
                 f'{Path(inp_data.files[0].filename).parent}')

    log.info(f'Opened data with {len(inp_data.all_sources)} sources with '
             f'{len(inp_data.train_ids)} trains across {len(inp_data.files)} '
             f'files')

    # Maps entry point name to the compiled method instance.
    methods = {}

    for ep in iter_entry_points('exdf.data_reduction.method'):
        method = ep.load()()
        method.compile(inp_data, args)

        # Only keep methods that evaluate as true after compilation;
        # presumably those with at least one operation - TODO confirm.
        if method:
            log.info(f'Applying {ep.name} with {len(method)} reduction '
                     f'operations')
            methods[ep.name] = method

    if args.to_script:
        # One '# name' header per method, followed by one line per
        # instruction formatted as 'first: rest; rest; ...'.
        with open(args.to_script, 'w') as f:
            for name, method in methods.items():
                f.write(f'# {name}\n')

                for instr in method._instructions:
                    f.write('{}: {}\n'.format(
                        instr[0], '; '.join([str(x) for x in instr[1:]])))

    seq = ReductionSequence(inp_data, methods)
    seq.link(args.output_scope)

    if args.output_scope == 'none':
        log.info('Not writing out any data files')
        return

    if args.output_layout == 'collection':
        args.output.mkdir(parents=True, exist_ok=True)

        log.debug(f'Writing collection to {args.output}')
        seq.write_collection(args.output, args.output_sequence_len)

    elif args.output_layout == 'voview':
        # An existing file or a path ending in .h5 is used as the output
        # file itself, anything else is treated as a folder to create.
        if args.output.is_file() or args.output.suffix == '.h5':
            output_path = args.output
        else:
            args.output.mkdir(parents=True, exist_ok=True)
            output_path = args.output / 'output.h5'

        log.debug(f'Writing voview to {output_path}')
        seq.write_voview(output_path)

    elif args.output_layout == 'collapsed':
        # Requires datasets to be in memory for now.
        log.warning('In the current implementation, the entire output data '
                    'must be loaded to memory')
        if input('Still continue? [y/n] ') != 'y':
            return

        args.output.mkdir(parents=True, exist_ok=True)

        log.debug(f'Writing collapsed collection to {args.output}')
        seq.write_collapsed(args.output, args.output_sequence_len)


# Run the command-line interface when this file is executed as a script.
if __name__ == '__main__':
    main()