"""Apply data reduction to HDF5 files structured in the European XFEL
Data Format (EXDF).
"""
from argparse import ArgumentParser, RawDescriptionHelpFormatter
from pathlib import Path
import logging
import re
import sys
from pkg_resources import iter_entry_points
from extra_data import RunDirectory, open_run
from ..data_reduction.writer import ReduceWriter
def _parse_args(argv):
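    """Parse command-line arguments, falling back to sys.argv[1:]."""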
argv = argv or sys.argv[1:]
ap = ArgumentParser(
description='Apply data reduction to HDF5 files structured in the '
'European XFEL Data Format (EXDF).',
formatter_class=RawDescriptionHelpFormatter)
    ap.add_argument(
        'input', metavar='INPUT', type=Path,
        help='folder for input data or pX:rY[:prefix=raw] to load a proposal '
             'run from /gpfs/exfel/exp directly, currently always assumed to '
             'be raw data')
logging_group = ap.add_mutually_exclusive_group()
logging_group.add_argument(
'--verbose', '-v', action='store_true',
help='set logging filter to DEBUG')
logging_group.add_argument(
'--quiet', '-q', action='store_true',
help='set logging filter to WARNING')
logging_group.add_argument(
'--silent', '-s', action='store_true',
help='set logging filter to ERROR')
    ap.add_argument(
        '--exclude-suspect-trains', action='store_true',
        help='exclude trains considered suspect by their origin flag from '
             'the input data; by default, all trains are included')
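    # Reduction methods are discovered via the 'exdf.data_reduction.method'
    # entry point group; each method adds its own command-line arguments.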
for ep in iter_entry_points('exdf.data_reduction.method'):
ep.load().arguments(ap)
    output_group = ap.add_argument_group(
        'output arguments',
        'Configure the output location, scope and layout.\n'
        'The scope limits which part of the input dataset is contained in '
        'the output, relative to the parts that were changed. It may range '
        'from writing out only the affected sources or the affected '
        'aggregators to producing an entire copy of all changed and '
        'unchanged sources.\n'
        'The layout defines how sources are distributed among aggregators. '
        'In the default `collection` layout, they are assigned to the same '
        'aggregators as found in the input data. The `collapsed` layout, on '
        'the other hand, combines all sources into a single new aggregator. '
        'The `voview` layout appears similar by presenting all data within a '
        'single file, but contains mostly virtual datasets pointing to the '
        'original input files.\n'
        'Depending on the output layout, different default filename formats '
        'are used that may be overridden:\n'
        '- collection: '
        'R{data_category}-R{run:04d}-{aggregator}-S{sequence:05d}.h5\n'
        '- collapsed: RED-R{run:04d}-FLAT01-S{sequence:05d}.h5\n'
        '- voview: RED-R{run:04d}-VOVIEW01-S{sequence:05d}.h5')
output_group.add_argument(
'--output-folder', '-o', metavar='PATH', type=Path, action='store',
default='./out',
help='folder for output data')
output_group.add_argument(
'--output-scope', action='store',
choices=['sources', 'aggregators', 'all', 'none'],
default='aggregators',
        help='scope of what is contained in the output: either only the '
             '`sources` touched by data reduction, the `aggregators` '
             'touched, `all` data found in the input, or `none` of it, '
             'resulting in no actual output data files')
output_group.add_argument(
'--output-layout', action='store',
choices=['collection', 'collapsed', 'voview'],
default='collection',
help='layout of output files, either as a `collection` of separate '
'aggregator and sequence files, a single `collapsed` collection '
'combining all former aggregators into one or a `voview` file '
'referencing all data via virtual datasets')
    output_group.add_argument(
        '--output-sequence-len', metavar='NUM_TRAINS', action='store',
        type=int, default=-1,
        help='number of trains per sequence file, or a non-positive value '
             'to preserve the input length (default); only used for the '
             '`collection` and `collapsed` output layouts')
output_group.add_argument(
'--output-filename', metavar='FORMAT', action='store', type=str,
    help='filename format for output files, which depending on layout '
         'may contain formatting placeholders for: data_category, run, '
         'aggregator, sequence')
output_group.add_argument(
'--output-version', metavar='VERSION', action='store', type=str,
choices=['1.3', '1.2', '1.0', 'same'], default='1.3',
    help='file format version output files are written in; supported '
         'options are 1.3 (current version), 1.2 (no attributes), 1.0 '
         '(no origin flag) or same (as input)')
output_group.add_argument(
'--to-recipe', action='store', type=Path,
help='export to a data reduction recipe')
return ap.parse_args(argv)
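# Matches proposal/run specifiers such as 'p1234:r56' or 'p900123:r7:proc';
# the optional data prefix falls back to 'raw' in _open_input_data().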
proposal_run_input_pattern = re.compile(
    r'^p(\d{4}|9\d{5}):r(\d{1,3}):?([a-z]*)$')
def _open_input_data(path, **dc_kwargs):
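    """Open input as an extra_data DataCollection.

    Accepts either a path to a run directory or a proposal/run specifier
    matched by proposal_run_input_pattern above.
    """
    # Bypass extra_data's virtual overview (voview) files so the returned
    # DataCollection always refers to the actual sequence files on disk.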
dc_kwargs['_use_voview'] = False
if not path.is_dir():
m = proposal_run_input_pattern.match(str(path))
if m:
return open_run(proposal=m[1], run=m[2], data=m[3] or 'raw',
**dc_kwargs)
else:
            raise ValueError('input path is neither a directory nor a '
                             'proposal/run pattern')
else:
return RunDirectory(path, **dc_kwargs)
def _to_recipe(recipe_path, methods, inp_data, argv):
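    """Export the applied reduction methods to a recipe file.

    The recipe header records the exdf-tools version, the proposal and
    run (when available from the run metadata), and the command line used.
    """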
headers = ['Generated by exdf-tools 1.0']
meta = inp_data.run_metadata()
if meta.keys() >= {'proposalNumber', 'runNumber'}:
headers.append('Applied to proposal {proposalNumber}, '
'run {runNumber}'.format(**meta))
headers.append('Command line used:')
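    # Reconstruct the command line for the header, pairing each option with
    # its following value and leaving out --to-recipe and its argument.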
argv = argv or sys.argv[1:]
num_args = len(argv)
i = 0
while i < num_args:
        if argv[i].startswith('--to-recipe'):
            # Also skip the recipe path, unless given as --to-recipe=PATH.
            i += 1 if '=' in argv[i] else 2
            continue
line = f' {argv[i]}'
i += 1
if i < num_args and not argv[i].startswith('-'):
line += f' {argv[i]}'
i += 1
headers.append(line)
from ..data_reduction import recipe
with open(recipe_path, 'w') as f:
recipe.dump(methods, f, headers=headers)
def main(argv=None):
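    """Command-line entry point: configure logging, open the input data,
    apply all discovered reduction methods and write the output files."""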
args = _parse_args(argv)
if args.verbose:
level = logging.DEBUG
elif args.quiet:
level = logging.WARNING
elif args.silent:
level = logging.ERROR
else:
level = logging.INFO
logging.basicConfig(stream=sys.stdout)
logging.getLogger('exdf').setLevel(level)
log = logging.getLogger('exdf.cli.reduce')
inp_data = _open_input_data(
args.input, inc_suspect_trains=not args.exclude_suspect_trains)
log.info(f'Opened data from {Path(inp_data.files[0].filename).parent} '
f'containing {len(inp_data.all_sources)} sources and '
f'{len(inp_data.train_ids)} trains across {len(inp_data.files)} '
f'files')
methods = {}
for ep in iter_entry_points('exdf.data_reduction.method'):
method = ep.load()(inp_data, args)
if method:
log.info(f'Applying {ep.name} with {len(method)} reduction '
f'operations')
methods[ep.name] = method
if args.to_recipe:
_to_recipe(args.to_recipe, methods, inp_data, argv)
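    # The writer is constructed even for --output-scope none; only the
    # actual writing of files below is skipped in that case.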
writer = ReduceWriter(
inp_data, methods,
args.output_scope, args.output_sequence_len, args.output_version)
if args.output_scope == 'none':
log.info('Not writing out any data files')
return
if args.output_filename is not None:
output_filename = args.output_filename
elif args.output_layout == 'collection':
output_filename = 'R{data_category}-R{run:04d}-{aggregator}' \
'-S{sequence:05d}.h5'
elif args.output_layout == 'collapsed':
output_filename = 'RED-R{run:04d}-FLAT01-S{sequence:05d}.h5'
elif args.output_layout == 'voview':
output_filename = 'RED-R{run:04d}-VOVIEW01-S{sequence:05d}.h5'
args.output_folder.mkdir(parents=True, exist_ok=True)
output_path = args.output_folder.resolve() / output_filename
if args.output_layout == 'collection':
log.info(f'Writing collection to {output_path.parent}')
writer.write_collection(output_path)
elif args.output_layout == 'collapsed':
log.info(f'Writing collapsed collection to {output_path.parent}')
writer.write_collapsed(output_path)
elif args.output_layout == 'voview':
log.info(f'Writing voview to {output_path.parent}')
writer.write_voview(output_path)
if __name__ == '__main__':
main()