Newer
Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
"""Apply data reduction to HDF5 files structured in the European XFEL
Data Format (EXDF).
"""
from argparse import ArgumentParser
from pathlib import Path
import logging
import re
import sys
from pkg_resources import iter_entry_points
from extra_data import RunDirectory, open_run
from exdf.data_reduction.sequence import ReductionSequence
# Matches a proposal run specifier such as 'p1234:r56' or 'p1234:r56:proc',
# capturing the proposal number, the run number and an optional data prefix.
# Proposal numbers are either four digits or six digits starting with 9.
# The prefix group is None when omitted; callers fall back to 'raw' then.
# (The previous pattern used ':*', which wrongly accepted inputs with no or
# repeated colons before the prefix, e.g. 'p1234:r56raw'.)
proposal_run_input_pattern = re.compile(
    r'^p(\d{4}|9\d{5}):r(\d{1,3})(?::([a-z]+))?$')
def _build_parser():
    # Build the ArgumentParser with the core, method-specific and output
    # arguments for the reduction CLI.
    ap = ArgumentParser(
        description='Apply data reduction to HDF5 files structured in the '
                    'European XFEL Data Format (EXDF).'
    )

    ap.add_argument(
        'input', metavar='INPUT', type=Path,
        help='folder for input data or pX:rY[:prefix=raw] to load a proposal '
             'run from /gpfs/exfel/exp directly, currently always assumed to '
             'be a single run.'
    )

    # The verbosity flags are mutually exclusive; they map to a single
    # logging level in main().
    logging_group = ap.add_mutually_exclusive_group()

    logging_group.add_argument(
        '--verbose', '-v', action='store_true',
        help='set logging filter to DEBUG'
    )

    logging_group.add_argument(
        '--quiet', '-q', action='store_true',
        help='set logging filter to WARNING'
    )

    logging_group.add_argument(
        '--silent', '-s', action='store_true',
        help='set logging filter to ERROR'
    )

    # Let every registered reduction method contribute its own arguments.
    # NOTE(review): pkg_resources is deprecated in favour of
    # importlib.metadata.entry_points; kept here to match the module imports.
    for ep in iter_entry_points('exdf.data_reduction.method'):
        ep.load().arguments(ap)

    output_group = ap.add_argument_group(
        'output arguments',
        'Allows to configure the output location, scope and layout.'
    )

    output_group.add_argument(
        '--output', '-o', metavar='PATH', type=Path, action='store',
        default='./out',
        help='folder for output data, or file if ending with .h5 '
             'and used with \'voview\' output layout')

    output_group.add_argument(
        '--output-scope', action='store',
        choices=['sources', 'aggregators', 'all', 'none'],
        default='aggregators',
        help='scope of what is contained in output, either only the '
             '\'sources\' touched by data reduction, the \'aggregator\' '
             'touched, \'all\' data found in the input or \'none\' of it '
             'resulting in no actual output data files'
    )

    output_group.add_argument(
        '--output-layout', action='store',
        choices=['collection', 'voview', 'collapsed'],
        default='collection',
        help='layout of output files, either as a \'collection\' of separate '
             'aggregator and sequence files, as a \'voview\' file containing '
             'virtual datasets for all data or a single \'collapsed\' '
             'collection combining all former aggregators into one'
    )

    output_group.add_argument(
        '--output-sequence-len', action='store', type=int, default=256,
        help='number of trains per sequence file or -1 to preserve input '
             'length, only used for \'collection\' and \'collapsed\' output '
             'layout'
    )

    output_group.add_argument(
        '--to-script', action='store', type=Path,
        help='export reduction operations to a script file'
    )

    # TODO: Whether to use suspect trains or not
    # TODO: Whether to drop entirely empty sources

    return ap


def _export_script(path, methods):
    # Write the compiled reduction operations of every method to a
    # human-readable script file, one section per method.
    with open(path, 'w') as f:
        for name, method in methods.items():
            f.write(f'# {name}\n')

            for instr in method._instructions:
                f.write('{}: {}\n'.format(
                    instr[0], '; '.join([str(x) for x in instr[1:]])))


def main(argv=None):
    """Entry point for the EXDF data reduction command-line tool.

    Parses the command line, opens the input run, compiles all registered
    reduction methods against it and writes the reduced output in the
    requested scope and layout.

    Args:
        argv (list of str, optional): Argument vector, taken from sys.argv
            when omitted.
    """
    ap = _build_parser()
    args = ap.parse_args(argv)

    # Map the mutually exclusive verbosity flags to a logging level.
    if args.verbose:
        level = logging.DEBUG
    elif args.quiet:
        level = logging.WARNING
    elif args.silent:
        level = logging.ERROR
    else:
        level = logging.INFO

    logging.basicConfig(stream=sys.stdout, level=level)
    log = logging.getLogger('exdf.cli.reduce')

    if args.input.is_dir():
        inp_data = RunDirectory(args.input)
    else:
        m = proposal_run_input_pattern.match(str(args.input))

        if m is None:
            # Previously this path fell through and raised a NameError on
            # the first use of inp_data; fail with a proper usage error.
            ap.error(f'{args.input} is neither an existing directory nor a '
                     f'proposal run specifier like pXXXX:rYYY[:prefix]')

        inp_data = open_run(proposal=m[1], run=m[2], data=m[3] or 'raw')
        log.info(f'Found proposal run at '
                 f'{Path(inp_data.files[0].filename).parent}')

    log.info(f'Opened data with {len(inp_data.all_sources)} sources with '
             f'{len(inp_data.train_ids)} trains across {len(inp_data.files)} '
             f'files')

    # Instantiate and compile every registered reduction method; only
    # methods that produced at least one reduction operation are kept.
    methods = {}

    for ep in iter_entry_points('exdf.data_reduction.method'):
        method = ep.load()()
        method.compile(inp_data, args)

        if method:
            log.info(f'Applying {ep.name} with {len(method)} reduction '
                     f'operations')
            methods[ep.name] = method

    if args.to_script:
        _export_script(args.to_script, methods)

    seq = ReductionSequence(inp_data, methods)
    seq.link(args.output_scope)

    if args.output_scope == 'none':
        log.info('Not writing out any data files')
        return

    if args.output_layout == 'collection':
        args.output.mkdir(parents=True, exist_ok=True)
        log.debug(f'Writing collection to {args.output}')
        seq.write_collection(args.output, args.output_sequence_len)

    elif args.output_layout == 'voview':
        # A .h5 suffix (or an existing file) is taken as the output file
        # itself; anything else is treated as a directory.
        if args.output.is_file() or args.output.suffix == '.h5':
            output_path = args.output
        else:
            args.output.mkdir(parents=True, exist_ok=True)
            output_path = args.output / 'output.h5'

        log.debug(f'Writing voview to {output_path}')
        seq.write_voview(output_path)

    elif args.output_layout == 'collapsed':
        # Requires datasets to be in memory for now.
        log.warning('In the current implementation, the entire output data '
                    'must be loaded to memory')

        if input('Still continue? [y/n] ') != 'y':
            return

        args.output.mkdir(parents=True, exist_ok=True)
        log.debug(f'Writing collapsed collection to {args.output}')
        seq.write_collapsed(args.output, args.output_sequence_len)
# Allow running this module directly as a script in addition to the
# installed console entry point.
if __name__ == '__main__':
    main()