diff --git a/setup.py b/setup.py index 2dae071bc05a54899308b6dfe0e0154dd5ba159b..6f173dedca4941c4faaba4349204e75579390192 100644 --- a/setup.py +++ b/setup.py @@ -46,6 +46,7 @@ setup( 'AgipdGain = exdf.data_reduction.builtins:AgipdGain', 'LpdMini = exdf.data_reduction.builtins:LpdMini', 'LitFrames = exdf.data_reduction.builtins:LitFrames', + 'FrameList = exdf.data_reduction.builtins:FrameList', ] }, diff --git a/src/exdf/data_reduction/builtins.py b/src/exdf/data_reduction/builtins.py index 37bc2923ca650d70297e61389acb4ea91b832bf0..10d73f2f9331c469fd3c36808991f4a1e028b82f 100644 --- a/src/exdf/data_reduction/builtins.py +++ b/src/exdf/data_reduction/builtins.py @@ -3,7 +3,7 @@ from logging import getLogger import numpy as np from exdf.data_reduction import ReductionMethod -from extra_data import SourceNameError, by_id +from extra_data import SourceNameError, by_id, RunDirectory def parse_slice(value): @@ -399,3 +399,192 @@ class LitFrames(ReductionMethod): ) if nrec > max_lines: self.log.info(f"... 
class FrameList(ReductionMethod):
    """Select detector frames according to a per-frame class list.

    The candidate list provides, for individual frames identified by
    train ID and pulse ID, a class number. Frames whose class is listed
    in ``--framelist-drop-classes`` are dropped; frames not covered by
    the list are kept only if ``--framelist-keep-missed`` is given.

    The list may be read from the processed data collection itself or
    from a separate EXDF run directory, HDF5 file or CSV file.
    """

    log = getLogger('exdf.data_reduction.builtins.FrameList')

    # Record layout of one candidate list entry:
    # train ID, pulse ID and class number.
    event_type = np.dtype([
        ('tid', np.uint64), ('pid', np.uint64), ('cls', np.uint16)])

    @staticmethod
    def arguments(ap):
        """Register the command line options of this method on `ap`."""
        group = ap.add_argument_group(
            'Candidate frame selection',
            'Allows to filter frames by candidate list')

        group.add_argument(
            '--framelist-det-sources',
            action='store', type=str,
            help='Detector sources to filter',
        )
        group.add_argument(
            '--framelist-file',
            action='store', type=str,
            help='Candidate list file, if none use data collection',
        )
        group.add_argument(
            '--framelist-filetype',
            action='store', type=str,
            choices=['exdf', 'h5', 'csv'], default='exdf',
            help='Type of the candidate list file',
        )
        group.add_argument(
            '--framelist-entry',
            action='store', type=str, default='',
            help='Source name in EXDF format or dataset prefix in other '
                 'formats',
        )
        group.add_argument(
            '--framelist-class-key',
            action='store', type=str,
            help='Key of frame class dataset',
        )
        group.add_argument(
            '--framelist-train-id-key',
            action='store', type=str,
            help='Key of the train id dataset',
        )
        group.add_argument(
            '--framelist-pulse-id-key',
            action='store', type=str,
            help='Key of the pulse id dataset',
        )
        group.add_argument(
            '--framelist-keep-missed',
            action='store_true',
            help='Keep frames that are not covered by the candidate list file',
        )
        group.add_argument(
            '--framelist-drop-classes',
            nargs='+', type=int, default=[0],
            help='The list of class numbers to drop, default [0]',
        )

    def check_keys(self, args):
        """Resolve the class, train ID and pulse ID keys from `args`.

        The train ID and pulse ID keys default to ``trainId`` and
        ``pulseId`` placed under the same dotted prefix as the class key.

        Raises:
            ValueError: if ``--framelist-class-key`` was not given.
        """
        self.class_key = args.framelist_class_key
        if self.class_key is None:
            raise ValueError(
                "The --framelist-class-key option is not specified.")

        # Prefix of the class key up to and including the last dot
        # (empty if the class key contains no dot).
        head, sep, _ = self.class_key.rpartition('.')
        ckey_prefix = head + sep

        self.train_id_key = args.framelist_train_id_key
        if self.train_id_key is None:
            self.train_id_key = ckey_prefix + 'trainId'

        self.pulse_id_key = args.framelist_pulse_id_key
        if self.pulse_id_key is None:
            self.pulse_id_key = ckey_prefix + 'pulseId'

    def from_exdf(self, data, args):
        """Read the candidate list from EXDF data.

        Uses `data` itself when no ``--framelist-file`` is given and the
        run directory at that path otherwise.
        """
        clist = (
            data if args.framelist_file is None
            else RunDirectory(args.framelist_file)
        )
        group = clist[args.framelist_entry]
        tid = group[self.train_id_key].ndarray()
        pid = group[self.pulse_id_key].ndarray()
        cls = group[self.class_key].ndarray()
        return np.rec.fromarrays([tid, pid, cls], dtype=self.event_type)

    def from_h5(self, args):
        """Read the candidate list from the HDF5 file in `args`."""
        import h5py

        # Open explicitly read-only: the list is never modified and older
        # h5py versions default to a writable mode.
        with h5py.File(args.framelist_file, 'r') as f:
            entry = f[args.framelist_entry]
            tid = entry[self.train_id_key][:]
            pid = entry[self.pulse_id_key][:]
            cls = entry[self.class_key][:]
        return np.rec.fromarrays([tid, pid, cls], dtype=self.event_type)

    def from_csv(self, args):
        """Read the candidate list from the CSV file in `args`.

        Column names are the ``--framelist-entry`` prefix followed by the
        respective key.
        """
        import pandas as pd

        ds = pd.read_csv(args.framelist_file)
        prefix = args.framelist_entry
        return np.rec.fromarrays(
            [
                ds[prefix + self.train_id_key],
                ds[prefix + self.pulse_id_key],
                ds[prefix + self.class_key],
            ],
            dtype=self.event_type
        )

    def _read_candidates(self, data, args):
        """Load the candidate list, or log the failure and return None."""
        try:
            if args.framelist_filetype == 'exdf':
                return self.from_exdf(data, args)
            if args.framelist_filetype == 'h5':
                return self.from_h5(args)
            if args.framelist_filetype == 'csv':
                return self.from_csv(args)
            # This branch is for completeness here.
            # The value of the `--framelist-filetype` option
            # should be already checked by `argparse`.
            raise ValueError("Unknown filetype.")
        except FileNotFoundError:
            self.log.error(f"File '{args.framelist_file}' not found.")
        except (SourceNameError, KeyError):
            self.log.error("Required keys are not found in data.")
        except Exception as e:
            self.log.error(f"Exception when reading the candidate list: {e}.")
        return None

    @staticmethod
    def _classify_train(pulse_ids, cand_pids, cand_cls, drop_classes,
                        keep_missed):
        """Build the frame mask of a single train.

        Args:
            pulse_ids: pulse IDs of the detector frames in the train.
            cand_pids: pulse IDs of the candidate list entries of the train.
            cand_cls: class numbers matching `cand_pids`.
            drop_classes: class numbers of frames to drop.
            keep_missed: mask value for frames missing in the list.

        Returns:
            Tuple ``(mask, classified)`` of boolean arrays over the
            frames: `mask` is True for frames to keep, `classified` is
            True for frames found in the candidate list.
        """
        num_frames = len(pulse_ids)
        mask = np.full(num_frames, keep_missed, dtype=bool)
        if len(cand_pids) == 0:
            return mask, np.zeros(num_frames, dtype=bool)

        # Look every frame up by pulse ID instead of relying on the
        # candidate list having the same order as the detector data.
        # With the stable sort and the left-sided search, the first
        # entry wins if a pulse ID is listed more than once.
        order = np.argsort(cand_pids, kind='stable')
        sorted_pids = cand_pids[order]
        sorted_cls = cand_cls[order]

        pos = np.searchsorted(sorted_pids, pulse_ids)
        pos = np.minimum(pos, len(sorted_pids) - 1)
        classified = sorted_pids[pos] == pulse_ids

        # Class numbers repeat between frames, hence `assume_unique`
        # must not be used here.
        mask[classified] = ~np.isin(sorted_cls[pos[classified]], drop_classes)
        return mask, classified

    def __init__(self, data, args):
        """Build the frame selection for all requested detector sources.

        Does nothing if ``--framelist-det-sources`` is not given or the
        candidate list cannot be read (the latter is logged as error).
        The selections are registered through `select_xtdf` and
        `select_entries`, which are not defined in this class
        (presumably provided by `ReductionMethod`).

        Raises:
            ValueError: if ``--framelist-class-key`` was not given.
        """
        if not args.framelist_det_sources:
            return

        self.check_keys(args)
        evt = self._read_candidates(data, args)
        if evt is None:
            return

        # Group by trains. The entries are sorted by train ID first, so
        # that every train occupies one contiguous slice even if the
        # candidate list is not ordered.
        evt = evt[np.argsort(evt.tid, kind='stable')]
        trains, first, count = np.unique(
            evt.tid, return_index=True, return_counts=True)
        drop_classes = np.array(sorted(set(args.framelist_drop_classes)))
        keep_missed = bool(args.framelist_keep_missed)

        # loop over sources
        det = data.select(args.framelist_det_sources, "image.*")
        for source_req in det.instrument_sources:
            # resolve legacy source
            src = data.legacy_sources.get(source_req, source_req)
            self.log.info("select frames in " + src)

            num_drop = 0
            num_keep = 0
            num_missed_frames = 0
            num_missed_trains = 0

            key = data[src, "image.pulseId"].drop_empty_trains()
            for train_id, pulse_ids in key.trains():
                num_frames = len(pulse_ids)

                # `trains` is sorted (np.unique), so a binary search
                # finds the train and is safe for an empty list.
                i = np.searchsorted(trains, np.uint64(train_id))
                if i < len(trains) and trains[i] == train_id:
                    f0 = first[i]
                    fN = f0 + count[i]
                    mask, classified = self._classify_train(
                        pulse_ids, evt.pid[f0:fN], evt.cls[f0:fN],
                        drop_classes, keep_missed)
                    num_missed_frames += (
                        num_frames - int(np.count_nonzero(classified)))
                else:
                    mask = np.full(num_frames, keep_missed, dtype=bool)
                    num_missed_trains += 1

                reduced_num_frames = int(np.count_nonzero(mask))
                num_keep += reduced_num_frames
                num_drop += num_frames - reduced_num_frames

                if src.endswith(':xtdf'):
                    self.select_xtdf(src, by_id[[train_id]], mask)
                else:
                    self.select_entries(src, 'image', by_id[[train_id]], mask)

            # Avoid a division by zero if the source has no frames.
            num_total = num_keep + num_drop
            ratio = num_keep / num_total if num_total else 0.0
            self.log.info(f" - {num_keep} frames keep, {num_drop} frames drop, "
                          f"ratio: {ratio:.2%}")
            self.log.info(f" - decision is missed for {num_missed_trains} trains, "
                          f"{num_missed_frames} frames")