"""Apply data reduction to HDF5 files structured in the European XFEL Data Format (EXDF). """ from argparse import ArgumentParser, RawDescriptionHelpFormatter from pathlib import Path from time import perf_counter import logging import re import sys from pkg_resources import iter_entry_points from extra_data import RunDirectory, open_run from ..data_reduction.red_writer import ReduceWriter, ReduceInitError def _parse_args(argv): argv = argv or sys.argv[1:] ap = ArgumentParser( description='Apply data reduction to HDF5 files structured in the ' 'European XFEL Data Format (EXDF).', formatter_class=RawDescriptionHelpFormatter) ap.add_argument( 'input', metavar='INPUT', type=Path, help='folder for input data or pX:rY[:prefix=raw] to load a proposal ' 'run from /gpfs/exfel/exp directly, currently always assumed to ' 'be a single run.') logging_group = ap.add_mutually_exclusive_group() logging_group.add_argument( '--verbose', '-v', action='store_true', help='set logging filter to DEBUG') logging_group.add_argument( '--quiet', '-q', action='store_true', help='set logging filter to WARNING') logging_group.add_argument( '--silent', '-s', action='store_true', help='set logging filter to ERROR') ap.add_argument( '--exclude-suspect-trains', action='store_true', help='whether trains considered suspect by their origin flag are ' 'excluded from the input data, by default all trains are ' 'considered ') for ep in iter_entry_points('exdf.data_reduction.method'): ep.load().arguments(ap) output_group = ap.add_argument_group( 'output arguments', 'Allows to configure the output location, scope and layout.\n' 'The scope limits what part of the input dataset is contained in the ' 'output in relation to the parts that were changed. It may range from ' 'only writing out the affected sources, the affected aggregators or ' 'producing an entire copy of all changed and unchanged sources.\n' 'The layout defines the way sources are distributed among ' 'aggregators. By default in the `collection` layout, they are ' 'assigned to the same aggregators as found in the input data. The ' '`collapsed` layout on the other hand combines all sources in a new ' 'single aggregator. The `voview` layout appear similar by presenting ' 'all data within a single file, but contains mostly virtual datasets ' 'pointing to the original input files.\n' 'Depending on the output layout, different default filename formats ' 'are used that may be overriden:\n' '- collection: ' 'R{data_category}-R{run:04d}-{aggregator}-S{sequence:05d}.h5\n' '- collapsed: RED-R{run:04d}-FLAT01-S{sequence:0d}.h5\n' '- voview: RED-R{run:04d}-VOVIEW01-S{sequence:05d}.h5') output_group.add_argument( '--output-folder', '-o', metavar='PATH', type=Path, action='store', default='./out', help='folder for output data') output_group.add_argument( '--output-scope', action='store', choices=['sources', 'aggregators', 'all', 'none'], default='aggregators', help='scope of what is contained in output, either only the ' '`sources` touched by data reduction, the `aggregator` ' 'touched, `all` data found in the input or `none` of it ' 'resulting in no actual output data files') output_group.add_argument( '--output-layout', action='store', choices=['collection', 'collapsed', 'voview'], default='collection', help='layout of output files, either as a `collection` of separate ' 'aggregator and sequence files, a single `collapsed` collection ' 'combining all former aggregators into one or a `voview` file ' 'referencing all data via virtual datasets') output_group.add_argument( '--output-sequence-len', metavar='NUM_TRAINS', action='store', type=int, default=-1, help='number of trains per sequence file or non-positive value to ' 'preserve input length (default), only used for `collection` and ' '`collapsed` output layout') output_group.add_argument( '--output-filename', metavar='FORMAT', action='store', type=str, help='filename format for output files with depending on layout may ' 'contain formatting placeholders for: data_category, run, ' 'aggregator, sequence') output_group.add_argument( '--output-version', metavar='VERSION', action='store', type=str, choices=['1.3', '1.2', '1.0', 'same'], default='1.3', help='file format version output files are written in, supported ' 'options are 1.3 (current version), 1.2 (no attributes) 1.0 ' '(no origin flag) or same (as input)') output_group.add_argument( '--to-recipe', action='store', type=Path, help='export to a data reduction recipe') return ap.parse_args(argv) proposal_run_input_pattern = re.compile( r'^p(\d{4}|9\d{5}):r(\d{1,3}):*([a-z]*)$') def _open_input_data(path, **dc_kwargs): dc_kwargs['_use_voview'] = False if not path.is_dir(): m = proposal_run_input_pattern.match(str(path)) if m: return open_run(proposal=m[1], run=m[2], data=m[3] or 'raw', **dc_kwargs) else: raise ValueError('input path neither a directory nor ' 'proposal/run pattern') else: return RunDirectory(path, **dc_kwargs) def _to_recipe(recipe_path, methods, inp_data, argv): headers = ['Generated by exdf-tools 1.0'] meta = inp_data.run_metadata() if meta.keys() >= {'proposalNumber', 'runNumber'}: headers.append('Applied to proposal {proposalNumber}, ' 'run {runNumber}'.format(**meta)) headers.append('Command line used:') argv = argv or sys.argv[1:] num_args = len(argv) i = 0 while i < num_args: if '--to-recipe' in argv[i]: i += 2 continue line = f' {argv[i]}' i += 1 if i < num_args and not argv[i].startswith('-'): line += f' {argv[i]}' i += 1 headers.append(line) from ..data_reduction import recipe with open(recipe_path, 'w') as f: recipe.dump(methods, f, headers=headers) def main(argv=None): args = _parse_args(argv) if args.verbose: level = logging.DEBUG elif args.quiet: level = logging.WARNING elif args.silent: level = logging.ERROR else: level = logging.INFO logging.basicConfig(stream=sys.stdout) logging.getLogger('exdf').setLevel(level) log = logging.getLogger('exdf.cli.reduce') before_open = perf_counter() inp_data = _open_input_data( args.input, inc_suspect_trains=not args.exclude_suspect_trains) after_open = perf_counter() log.info(f'Opened data from {Path(inp_data.files[0].filename).parent} ' f'containing {len(inp_data.all_sources)} sources and ' f'{len(inp_data.train_ids)} trains across {len(inp_data.files)} ' f'files') debug_strs = [f'open={(after_open - before_open):.3g}'] methods = {} for ep in iter_entry_points('exdf.data_reduction.method'): before_method = perf_counter() method = ep.load()(inp_data, args) after_method = perf_counter() debug_strs.append(f'{ep.name}={(after_method - before_method):.3g}') if method: log.info(f'Applying {ep.name} with {len(method)} reduction ' f'operations') methods[ep.name] = method if args.to_recipe: _to_recipe(args.to_recipe, methods, inp_data, argv) try: before_writer = perf_counter() writer = ReduceWriter(inp_data, methods, args.output_scope, args.output_sequence_len, args.output_version) after_writer = perf_counter() debug_strs.append(f'writer={(after_writer - before_writer):.3g}') except ReduceInitError: log.critical('Failed to initialize reduction writer') return log.debug(f'Initialized ({", ".join(debug_strs)})') if args.output_scope == 'none': log.info('Not writing out any data files') return if args.output_filename is not None: output_filename = args.output_filename elif args.output_layout == 'collection': output_filename = 'R{data_category}-R{run:04d}-{aggregator}' \ '-S{sequence:05d}.h5' elif args.output_layout == 'collapsed': output_filename = 'RED-R{run:04d}-FLAT01-S{sequence:05d}.h5' elif args.output_layout == 'voview': output_filename = 'RED-R{run:04d}-VOVIEW01-S{sequence:05d}.h5' args.output_folder.mkdir(parents=True, exist_ok=True) output_path = args.output_folder.resolve() / output_filename if args.output_layout == 'collection': log.info(f'Writing collection to {output_path.parent}') writer.write_collection(output_path) elif args.output_layout == 'collapsed': log.info(f'Writing collapsed collection to {output_path.parent}') writer.write_collapsed(output_path) elif args.output_layout == 'voview': log.info(f'Writing voview to {output_path.parent}') writer.write_voview(output_path) if __name__ == '__main__': main()