diff --git a/src/exdf/cli/reduce.py b/src/exdf/cli/reduce.py index 0a192bb6d365396b07ebf0600d1a76527e654b15..bdb4a41d74c2ab91ceb3caba00eb81aa3ea58816 100644 --- a/src/exdf/cli/reduce.py +++ b/src/exdf/cli/reduce.py @@ -5,6 +5,7 @@ Data Format (EXDF). from argparse import ArgumentParser, RawDescriptionHelpFormatter from pathlib import Path +from time import perf_counter import logging import re import sys @@ -192,17 +193,24 @@ def main(argv=None): logging.getLogger('exdf').setLevel(level) log = logging.getLogger('exdf.cli.reduce') + before_open = perf_counter() inp_data = _open_input_data( args.input, inc_suspect_trains=not args.exclude_suspect_trains) + after_open = perf_counter() log.info(f'Opened data from {Path(inp_data.files[0].filename).parent} ' f'containing {len(inp_data.all_sources)} sources and ' f'{len(inp_data.train_ids)} trains across {len(inp_data.files)} ' f'files') + debug_strs = [f'open={(after_open - before_open):.3g}'] + methods = {} for ep in iter_entry_points('exdf.data_reduction.method'): + before_method = perf_counter() method = ep.load()(inp_data, args) + after_method = perf_counter() + debug_strs.append(f'{ep.name}={(after_method - before_method):.3g}') if method: log.info(f'Applying {ep.name} with {len(method)} reduction ' @@ -213,12 +221,17 @@ def main(argv=None): _to_recipe(args.to_recipe, methods, inp_data, argv) try: + before_writer = perf_counter() writer = ReduceWriter(inp_data, methods, args.output_scope, args.output_sequence_len, args.output_version) + after_writer = perf_counter() + debug_strs.append(f'writer={(after_writer - before_writer):.3g}') except ReduceInitError: log.critical('Failed to initialize reduction writer') return + log.debug(f'Initialized ({", ".join(debug_strs)})') + if args.output_scope == 'none': log.info('Not writing out any data files') return diff --git a/src/exdf/write/sd_writer.py b/src/exdf/write/sd_writer.py index cdeae23d4ea6831272b3ca5f1c2f824f2d4215c9..9e05ac7f55349b19e1f502e2ab964cc95acf396f 100644 --- a/src/exdf/write/sd_writer.py +++ b/src/exdf/write/sd_writer.py @@ -13,6 +13,8 @@ from functools import reduce from itertools import accumulate from logging import getLogger from operator import or_ +from os.path import basename +from time import perf_counter import numpy as np @@ -106,14 +108,31 @@ class SourceDataWriter: None """ + start = perf_counter() + with DataFile(output_path, 'w', driver='core') as f: + after_open = perf_counter() + self.write_base(f, sources, sequence) + after_base = perf_counter() self.write_control( f, [sd for sd in sources if sd.is_control]) + after_control = perf_counter() self.write_instrument( f, [sd for sd in sources if sd.is_instrument]) + after_instrument = perf_counter() + + after_close = perf_counter() + + log.debug('Sequence {} written (total={:.3g}, open={:.3g}, ' + 'base={:.3g}, control={:.3g}, instrument={:.3g}, ' + 'close={:.3g})'.format( + basename(output_path), + after_close - start, after_open - start, + after_base - after_open, after_control - after_base, + after_instrument - after_control, after_close - after_instrument)) def write_base(self, f, sources, sequence): """Write METADATA, INDEX and source groups. @@ -219,6 +238,8 @@ class SourceDataWriter: None """ + file_base = basename(f.filename) + # Must be re-read at this point, as additional trains could have # been introduced in this sequence. train_ids = np.array(f['INDEX/trainId']) @@ -269,16 +290,42 @@ class SourceDataWriter: for sd in sources: h5source = f.source[sd.source] + start_source = perf_counter() + for index_group in sd.index_groups: mask = masks.get(index_group, np.s_[:]) for key in iter_index_group_keys(keys, index_group): # TODO: Copy by chunk / file if too large + start_key = perf_counter() + + full_data = sd[key].ndarray() + after_read = perf_counter() + + masked_data = full_data[mask] + after_mask = perf_counter() + self.copy_instrument_data( sd.source, key, h5source.key[key], sd[key].train_id_coordinates()[mask], - sd[key].ndarray()[mask]) + masked_data) + + after_copy = perf_counter() + + log.debug('INSTRUMENT/{}/{} written to {} (total={:.3g}, ' + 'read={:.3g} @ {:.3g}M, mask={:.3g}, ' + 'write={:.3g} @ {:.3g}M)'.format( + sd.source, key, file_base, + after_copy - start_key, after_read - start_key, + full_data.nbytes / 2**20 / (after_read - start_key), + after_mask - after_read, after_copy - after_mask, + masked_data.nbytes / 2**20 / (after_copy - after_mask))) + + after_source = perf_counter() + + log.debug('INSTRUMENT/{} written to {} (total={:.3g})'.format( + sd.source, file_base, after_source - start_source)) def get_index_root_data(sources):