# reduce.py
"""Apply data reduction to HDF5 files structured in the European XFEL
Data Format (EXDF).
"""

from argparse import ArgumentParser, RawDescriptionHelpFormatter

from pathlib import Path
import logging
import re
import sys

from pkg_resources import iter_entry_points
from extra_data import RunDirectory, open_run

from ..data_reduction.writer import ReduceWriter
def _parse_args(argv):
    """Build the command line parser and parse the given arguments.

    In addition to the arguments declared here, every entry point
    registered in the `exdf.data_reduction.method` group is loaded and
    its `arguments()` hook is called with the parser, allowing each
    reduction method to register its own arguments.

    Args:
        argv (list of str or None): Command line arguments to parse.
            If None or empty, `sys.argv[1:]` is used instead.

    Returns:
        (argparse.Namespace) Parsed arguments.
    """

    argv = argv or sys.argv[1:]

    ap = ArgumentParser(
        description='Apply data reduction to HDF5 files structured in the '
                    'European XFEL Data Format (EXDF).',
        formatter_class=RawDescriptionHelpFormatter)

    ap.add_argument(
        'input', metavar='INPUT', type=Path,
        help='folder for input data or pX:rY[:prefix=raw] to load a proposal '
             'run from /gpfs/exfel/exp directly, currently always assumed to '
             'be a single run')

    # Only one of the logging verbosity flags may be passed at a time.
    logging_group = ap.add_mutually_exclusive_group()

    logging_group.add_argument(
        '--verbose', '-v', action='store_true',
        help='set logging filter to DEBUG')

    logging_group.add_argument(
        '--quiet', '-q', action='store_true',
        help='set logging filter to WARNING')

    logging_group.add_argument(
        '--silent', '-s', action='store_true',
        help='set logging filter to ERROR')

    ap.add_argument(
        '--exclude-suspect-trains', action='store_true',
        help='whether trains considered suspect by their origin flag are '
             'excluded from the input data, by default all trains are '
             'considered ')

    # Let each registered reduction method add its own arguments.
    for ep in iter_entry_points('exdf.data_reduction.method'):
        ep.load().arguments(ap)

    # The parser uses RawDescriptionHelpFormatter, so the explicit line
    # breaks in this group description are kept in the help output.
    output_group = ap.add_argument_group(
        'output arguments',
        'Allows to configure the output location, scope and layout.\n'
        'The scope limits what part of the input dataset is contained in the '
        'output in relation to the parts that were changed. It may range from '
        'only writing out the affected sources, the affected aggregators or '
        'producing an entire copy of all changed and unchanged sources.\n'
        'The layout defines the way sources are distributed among '
        'aggregators. By default in the `collection` layout, they are '
        'assigned to the same aggregators as found in the input data. The '
        '`collapsed` layout on the other hand combines all sources in a new '
        'single aggregator. The `voview` layout appear similar by presenting '
        'all data within a single file, but contains mostly virtual datasets '
        'pointing to the original input files.\n'
        'Depending on the output layout, different default filename formats '
        'are used that may be overriden:\n'
        '- collection: '
        'R{data_category}-R{run:04d}-{aggregator}-S{sequence:05d}.h5\n'
        '- collapsed: RED-R{run:04d}-FLAT01-S{sequence:05d}.h5\n'
        '- voview: RED-R{run:04d}-VOVIEW01-S{sequence:05d}.h5')

    output_group.add_argument(
        '--output-folder', '-o', metavar='PATH', type=Path, action='store',
        help='folder for output data')

    output_group.add_argument(
        '--output-scope', action='store',
        choices=['sources', 'aggregators', 'all', 'none'],
        default='aggregators',
        help='scope of what is contained in output, either only the '
             '`sources` touched by data reduction, the `aggregator` '
             'touched, `all` data found in the input or `none` of it '
             'resulting in no actual output data files')

    # `collection` is the documented default layout (see the group
    # description above), so it is made the explicit default here.
    output_group.add_argument(
        '--output-layout', action='store',
        choices=['collection', 'collapsed', 'voview'],
        default='collection',
        help='layout of output files, either as a `collection` of separate '
             'aggregator and sequence files, a single `collapsed` collection '
             'combining all former aggregators into one or a `voview` file '
             'referencing all data via virtual datasets')

    output_group.add_argument(
        '--output-sequence-len', metavar='NUM_TRAINS', action='store',
        type=int, default=-1,
        help='number of trains per sequence file or non-positive value to '
             'preserve input length (default), only used for `collection` and '
             '`collapsed` output layout')

    output_group.add_argument(
        '--output-filename', metavar='FORMAT', action='store', type=str,
        help='filename format for output files with depending on layout may '
             'contain formatting placeholders for: data_category, run, '
             'aggregator, sequence')

    output_group.add_argument(
        '--output-version', metavar='VERSION', action='store', type=str,
        choices=['1.3', '1.2', '1.0', 'same'], default='1.3',
        help='file format version output files are written in, supported '
             'options are 1.3 (current version), 1.2 (no attributes) 1.0 '
             '(no origin flag) or same (as input)')

    output_group.add_argument(
        '--to-recipe', action='store', type=Path,
        help='export to a data reduction recipe')

    return ap.parse_args(argv)


# Matches the `pX:rY[:prefix]` shorthand for a proposal run: a proposal
# number of four digits (or six digits starting with 9), a run number of
# up to three digits and an optional lowercase data prefix.
proposal_run_input_pattern = re.compile(
    r'^p(\d{4}|9\d{5}):r(\d{1,3}):*([a-z]*)$')


def _open_input_data(path, **dc_kwargs):
    """Open the input data from a directory or a proposal/run string.

    Args:
        path (pathlib.Path): Either an existing directory, which is
            opened via `RunDirectory`, or a string matching
            `proposal_run_input_pattern`, which is opened via
            `open_run`. An empty data prefix falls back to `raw`.
        **dc_kwargs: Passed on to the function opening the data. The
            `_use_voview` entry is always overwritten with False.

    Returns:
        The object returned by `RunDirectory` or `open_run`.

    Raises:
        ValueError: If path is neither a directory nor matches the
            proposal/run pattern.
    """

    dc_kwargs['_use_voview'] = False

    # An existing directory always takes precedence over the pattern.
    if path.is_dir():
        return RunDirectory(path, **dc_kwargs)

    match = proposal_run_input_pattern.match(str(path))

    if match is None:
        raise ValueError('input path neither a directory nor '
                         'proposal/run pattern')

    proposal, run, prefix = match.groups()
    return open_run(proposal=proposal, run=run, data=prefix or 'raw',
                    **dc_kwargs)


def _to_recipe(recipe_path, methods, inp_data, argv):
    """Export the applied reduction methods to a recipe file.

    The recipe is preceded by header lines naming the generating tool,
    the proposal and run it was applied to (only if the run metadata
    contains both `proposalNumber` and `runNumber`) and the command
    line used, excluding the `--to-recipe` option itself.

    Args:
        recipe_path (path-like): File the recipe is written to, any
            existing content is overwritten.
        methods (dict): Reduction methods passed on to `recipe.dump`.
        inp_data: Input data object providing `run_metadata()`.
        argv (list of str or None): Command line arguments used. If
            None or empty, `sys.argv[1:]` is used instead.

    Returns:
        None
    """

    headers = ['Generated by exdf-tools 1.0']
    meta = inp_data.run_metadata()
    if meta.keys() >= {'proposalNumber', 'runNumber'}:
        headers.append('Applied to proposal {proposalNumber}, '
                       'run {runNumber}'.format(**meta))
    headers.append('Command line used:')

    argv = argv or sys.argv[1:]
    num_args = len(argv)
    i = 0
    while i < num_args:
        arg = argv[i]

        if arg == '--to-recipe':
            # Option and its value are two separate tokens.
            i += 2
            continue
        elif arg.startswith('--to-recipe='):
            # Option and value are joined in a single token, so only
            # this one token may be skipped or the following unrelated
            # argument would be lost from the header.
            i += 1
            continue

        line = f'    {arg}'
        i += 1

        # Keep the token directly following an option on the same line
        # if it does not look like an option itself.
        if i < num_args and not argv[i].startswith('-'):
            line += f' {argv[i]}'
            i += 1

        headers.append(line)

    # Imported locally as in the original, only needed for exporting.
    from ..data_reduction import recipe
    with open(recipe_path, 'w') as f:
        recipe.dump(methods, f, headers=headers)


def main(argv=None):
    """Run the data reduction command line tool.

    Parses the arguments, configures logging, opens the input data,
    instantiates every reduction method registered in the
    `exdf.data_reduction.method` entry point group, optionally exports
    them to a recipe and finally writes the output in the selected
    layout unless the output scope is `none`.

    Args:
        argv (list of str, optional): Command line arguments, passed on
            to `_parse_args` and `_to_recipe`.

    Returns:
        None

    Raises:
        ValueError: If output files are to be written but no output
            folder was specified.
    """

    args = _parse_args(argv)

    if args.verbose:
        level = logging.DEBUG
    elif args.quiet:
        level = logging.WARNING
    elif args.silent:
        level = logging.ERROR
    else:
        level = logging.INFO

    logging.basicConfig(stream=sys.stdout)
    logging.getLogger('exdf').setLevel(level)
    log = logging.getLogger('exdf.cli.reduce')

    inp_data = _open_input_data(
        args.input, inc_suspect_trains=not args.exclude_suspect_trains)
    log.info(f'Opened data from {Path(inp_data.files[0].filename).parent} '
             f'containing {len(inp_data.all_sources)} sources and '
             f'{len(inp_data.train_ids)} trains across {len(inp_data.files)} '
             f'files')

    methods = {}
    for ep in iter_entry_points('exdf.data_reduction.method'):
        method = ep.load()(inp_data, args)

        # Falsy methods are not recorded.
        if method:
            log.info(f'Applying {ep.name} with {len(method)} reduction '
                     f'operations')
            methods[ep.name] = method

    if args.to_recipe:
        _to_recipe(args.to_recipe, methods, inp_data, argv)

    writer = ReduceWriter(
        inp_data, methods,
        args.output_scope, args.output_sequence_len, args.output_version)

    if args.output_scope == 'none':
        log.info('Not writing out any data files')
        return

    # Per layout: default filename format, label used in the log message
    # and name of the writer method producing it.
    layouts = {
        'collection': (
            'R{data_category}-R{run:04d}-{aggregator}-S{sequence:05d}.h5',
            'collection', 'write_collection'),
        'collapsed': (
            'RED-R{run:04d}-FLAT01-S{sequence:05d}.h5',
            'collapsed collection', 'write_collapsed'),
        'voview': (
            'RED-R{run:04d}-VOVIEW01-S{sequence:05d}.h5',
            'voview', 'write_voview'),
    }

    # Fall back to the documented default layout if none was passed,
    # rather than failing on an unbound filename or silently writing
    # nothing at all.
    layout = args.output_layout or 'collection'
    default_filename, label, write_method = layouts[layout]

    if args.output_filename is not None:
        output_filename = args.output_filename
    else:
        output_filename = default_filename

    if args.output_folder is None:
        raise ValueError('no output folder specified for writing output '
                         'files, use --output-folder')

    args.output_folder.mkdir(parents=True, exist_ok=True)
    output_path = args.output_folder.resolve() / output_filename

    log.info(f'Writing {label} to {output_path.parent}')
    getattr(writer, write_method)(output_path)