Skip to content
Snippets Groups Projects
Commit 9ab86160 authored by Philipp Schmidt's avatar Philipp Schmidt
Browse files

Add verbose profiling to debug logging

parent 11d67887
No related branches found
No related tags found
1 merge request!7Add verbose profiling to debug logging
...@@ -5,6 +5,7 @@ Data Format (EXDF). ...@@ -5,6 +5,7 @@ Data Format (EXDF).
from argparse import ArgumentParser, RawDescriptionHelpFormatter from argparse import ArgumentParser, RawDescriptionHelpFormatter
from pathlib import Path from pathlib import Path
from time import perf_counter
import logging import logging
import re import re
import sys import sys
...@@ -192,17 +193,24 @@ def main(argv=None): ...@@ -192,17 +193,24 @@ def main(argv=None):
logging.getLogger('exdf').setLevel(level) logging.getLogger('exdf').setLevel(level)
log = logging.getLogger('exdf.cli.reduce') log = logging.getLogger('exdf.cli.reduce')
before_open = perf_counter()
inp_data = _open_input_data( inp_data = _open_input_data(
args.input, inc_suspect_trains=not args.exclude_suspect_trains) args.input, inc_suspect_trains=not args.exclude_suspect_trains)
after_open = perf_counter()
log.info(f'Opened data from {Path(inp_data.files[0].filename).parent} ' log.info(f'Opened data from {Path(inp_data.files[0].filename).parent} '
f'containing {len(inp_data.all_sources)} sources and ' f'containing {len(inp_data.all_sources)} sources and '
f'{len(inp_data.train_ids)} trains across {len(inp_data.files)} ' f'{len(inp_data.train_ids)} trains across {len(inp_data.files)} '
f'files') f'files')
debug_strs = [f'open={(after_open - before_open):.3g}']
methods = {} methods = {}
for ep in iter_entry_points('exdf.data_reduction.method'): for ep in iter_entry_points('exdf.data_reduction.method'):
before_method = perf_counter()
method = ep.load()(inp_data, args) method = ep.load()(inp_data, args)
after_method = perf_counter()
debug_strs.append(f'{ep.name}={(after_method - before_method):.3g}')
if method: if method:
log.info(f'Applying {ep.name} with {len(method)} reduction ' log.info(f'Applying {ep.name} with {len(method)} reduction '
...@@ -213,12 +221,17 @@ def main(argv=None): ...@@ -213,12 +221,17 @@ def main(argv=None):
_to_recipe(args.to_recipe, methods, inp_data, argv) _to_recipe(args.to_recipe, methods, inp_data, argv)
try: try:
before_writer = perf_counter()
writer = ReduceWriter(inp_data, methods, args.output_scope, writer = ReduceWriter(inp_data, methods, args.output_scope,
args.output_sequence_len, args.output_version) args.output_sequence_len, args.output_version)
after_writer = perf_counter()
debug_strs.append(f'writer={(after_writer - before_writer):.3g}')
except ReduceInitError: except ReduceInitError:
log.critical('Failed to initialize reduction writer') log.critical('Failed to initialize reduction writer')
return return
log.debug(f'Initialized ({", ".join(debug_strs)})')
if args.output_scope == 'none': if args.output_scope == 'none':
log.info('Not writing out any data files') log.info('Not writing out any data files')
return return
......
...@@ -13,6 +13,8 @@ from functools import reduce ...@@ -13,6 +13,8 @@ from functools import reduce
from itertools import accumulate from itertools import accumulate
from logging import getLogger from logging import getLogger
from operator import or_ from operator import or_
from os.path import basename
from time import perf_counter
import numpy as np import numpy as np
...@@ -106,14 +108,31 @@ class SourceDataWriter: ...@@ -106,14 +108,31 @@ class SourceDataWriter:
None None
""" """
start = perf_counter()
with DataFile(output_path, 'w', driver='core') as f: with DataFile(output_path, 'w', driver='core') as f:
after_open = perf_counter()
self.write_base(f, sources, sequence) self.write_base(f, sources, sequence)
after_base = perf_counter()
self.write_control( self.write_control(
f, [sd for sd in sources if sd.is_control]) f, [sd for sd in sources if sd.is_control])
after_control = perf_counter()
self.write_instrument( self.write_instrument(
f, [sd for sd in sources if sd.is_instrument]) f, [sd for sd in sources if sd.is_instrument])
after_instrument = perf_counter()
after_close = perf_counter()
log.debug('Sequence {} written (total={:.3g}, open={:.3g}, '
'base={:.3g}, control={:.3g}, instrument={:.3g}, '
'close={:.3g})'.format(
basename(output_path),
after_close - start, after_open - start,
after_base - after_open, after_control - after_base,
after_instrument - after_control, after_close - after_instrument))
def write_base(self, f, sources, sequence): def write_base(self, f, sources, sequence):
"""Write METADATA, INDEX and source groups. """Write METADATA, INDEX and source groups.
...@@ -219,6 +238,8 @@ class SourceDataWriter: ...@@ -219,6 +238,8 @@ class SourceDataWriter:
None None
""" """
file_base = basename(f.filename)
# Must be re-read at this point, as additional trains could have # Must be re-read at this point, as additional trains could have
# been introduced in this sequence. # been introduced in this sequence.
train_ids = np.array(f['INDEX/trainId']) train_ids = np.array(f['INDEX/trainId'])
...@@ -269,16 +290,42 @@ class SourceDataWriter: ...@@ -269,16 +290,42 @@ class SourceDataWriter:
for sd in sources: for sd in sources:
h5source = f.source[sd.source] h5source = f.source[sd.source]
start_source = perf_counter()
for index_group in sd.index_groups: for index_group in sd.index_groups:
mask = masks.get(index_group, np.s_[:]) mask = masks.get(index_group, np.s_[:])
for key in iter_index_group_keys(keys, index_group): for key in iter_index_group_keys(keys, index_group):
# TODO: Copy by chunk / file if too large # TODO: Copy by chunk / file if too large
start_key = perf_counter()
full_data = sd[key].ndarray()
after_read = perf_counter()
masked_data = full_data[mask]
after_mask = perf_counter()
self.copy_instrument_data( self.copy_instrument_data(
sd.source, key, h5source.key[key], sd.source, key, h5source.key[key],
sd[key].train_id_coordinates()[mask], sd[key].train_id_coordinates()[mask],
sd[key].ndarray()[mask]) masked_data)
after_copy = perf_counter()
log.debug('INSTRUMENT/{}/{} written to {} (total={:.3g}, '
'read={:.3g} @ {:.3g}M, mask={:.3g}, '
'write={:.3g} @ {:.3g}M)'.format(
sd.source, key, file_base,
after_copy - start_key, after_read - start_key,
full_data.nbytes / 2**20 / (after_read - start_key),
after_mask - after_read, after_copy - after_mask,
masked_data.nbytes / 2**20 / (after_copy - after_mask)))
after_source = perf_counter()
log.debug('INSTRUMENT/{} written to {} (total={:.3g})'.format(
sd.source, file_base, after_source - start_source))
def get_index_root_data(sources): def get_index_root_data(sources):
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment