Skip to content
Snippets Groups Projects

Frame filtering by candidate list

Open Egor Sobolev requested to merge feat/candidate-list into feat/legacy
2 unresolved threads
Files
2
@@ -3,7 +3,7 @@ from logging import getLogger
import numpy as np
from exdf.data_reduction import ReductionMethod
from extra_data import SourceNameError, by_id
from extra_data import SourceNameError, by_id, RunDirectory
def parse_slice(value):
@@ -399,3 +399,192 @@ class LitFrames(ReductionMethod):
)
if nrec > max_lines:
self.log.info(f"... {nrec - max_lines + 1} more lines skipped")
class FrameList(ReductionMethod):
log = getLogger('exdf.data_reduction.builtins.FrameList')
event_type = np.dtype([
('tid', np.uint64), ('pid', np.uint64), ('cls', np.uint16)])
@staticmethod
def arguments(ap):
group = ap.add_argument_group(
'Candidate frame selection',
'Allows to filter frames by candidate list')
group.add_argument(
'--framelist-det-sources',
action='store', type=str,
help='Detector sources to filter',
)
group.add_argument(
'--framelist-file',
action='store', type=str,
help='Candidate list file, if none use data collection',
)
group.add_argument(
'--framelist-filetype',
action='store', type=str, choices=['exdf', 'h5', 'csv'], default='exdf',
help='Type of the candidate list file',
)
group.add_argument(
'--framelist-entry',
action='store', type=str, default='',
help='Source name in EXDF format or dataset prefix in other formats',
)
group.add_argument(
'--framelist-class-key',
action='store', type=str,
help='Key of frame class dataset',
)
group.add_argument(
'--framelist-train-id-key',
action='store', type=str,
help='Key of the train id dataset',
)
group.add_argument(
'--framelist-pulse-id-key',
action='store', type=str,
help='Key of the pulse id dataset',
)
group.add_argument(
'--framelist-keep-missed',
action='store_true',
help='Keep frames that are not covered by the candidate list file,
)
group.add_argument(
'--framelist-drop-classes',
nargs='+', type=int, default=[0],
help='The list of class numbers to drop, default [0]',
)
def check_keys(self, args):
self.class_key = args.framelist_class_key
if self.class_key is None:
raise ValueError(
"The --framelist-class-key option is not specified.")
ckey_rpart = self.class_key.rpartition('.')
ckey_prefix = ckey_rpart[0] + ckey_rpart[1]
self.train_id_key = args.framelist_train_id_key
if self.train_id_key is None:
self.train_id_key = ckey_prefix + 'trainId'
self.pulse_id_key = args.framelist_pulse_id_key
if self.pulse_id_key is None:
self.pulse_id_key = ckey_prefix + 'pulseId'
def from_exdf(self, data, args):
clist = (
data if args.framelist_file is None
else RunDirectory(args.framelist_file)
)
source = args.framelist_entry
group = clist[source]
tid = group[self.train_id_key].ndarray()
pid = group[self.pulse_id_key].ndarray()
cls = group[self.class_key].ndarray()
return np.rec.fromarrays([tid, pid, cls], dtype=self.event_type)
def from_h5(self, args):
import h5py
with h5py.File(args.framelist_file) as f:
entry = f[args.framelist_entry]
tid = entry[self.train_id_key][:]
pid = entry[self.pulse_id_key][:]
cls = entry[self.class_key][:]
return np.rec.fromarrays([tid, pid, cls], dtype=self.event_type)
def from_csv(self, args):
import pandas as pd
ds = pd.read_csv(args.framelist_file)
return np.rec.fromarrays(
[
ds[args.framelist_entry + self.train_id_key],
ds[args.framelist_entry + self.pulse_id_key],
ds[args.framelist_entry + self.class_key]
],
dtype=self.event_type
)
def __init__(self, data, args):
if not args.framelist_det_sources:
return
self.check_keys(args)
try:
if args.framelist_filetype == 'exdf':
evt = self.from_exdf(data, args)
elif args.framelist_filetype == 'h5':
evt = self.from_h5(args)
elif args.framelist_filetype == 'csv':
evt = self.from_csv(args)
else:
# This branch is for completeness here
# The value of `--framelist-filetype` options
# should be already checked by the `argparse`
raise ValueError("Unknown filetype.")
except FileNotFoundError:
self.log.error(f"File '{args.framelist_file}' not found.")
return
except (SourceNameError, KeyError):
self.log.error("Required keys are not found in data.")
return
except Exception as e:
self.log.error(f"Exception when reading the candidate list: {e}.")
return
# group by trains
trains, first, count = np.unique(evt.tid, return_index=True, return_counts=True)
drop_classes = np.array(sorted(set(args.framelist_drop_classes)))
# loop over sources
det = data.select(args.framelist_det_sources, "image.*")
for source_req in det.instrument_sources:
# resolve legacy source
src = data.legacy_sources.get(source_req, source_req)
self.log.info("select frames in " + src)
num_drop = 0
num_keep = 0
num_missed_frames = 0
num_missed_trains = 0
key = data[src, "image.pulseId"].drop_empty_trains()
for train_id, pulse_ids in key.trains():
num_frames = len(pulse_ids)
if args.framelist_keep_missed:
mask = np.ones(num_frames, bool)
else:
mask = np.zeros(num_frames, bool)
i = np.argmax(trains == train_id)
if trains[i] == train_id:
f0 = first[i]
fN = f0 + count[i]
classified = np.isin(pulse_ids, evt.pid[f0:fN])
available = np.isin(evt.pid[f0:fN], pulse_ids[classified])
mask[classified] = np.isin(
evt.cls[f0:fN][available],
drop_classes,
assume_unique=True,
invert=True,
)
num_missed_frames += num_frames - np.sum(classified)
else:
num_missed_trains += 1
reduced_num_frames = np.sum(mask)
num_keep += reduced_num_frames
num_drop += num_frames - reduced_num_frames
if src.endswith(':xtdf'):
self.select_xtdf(src, by_id[[train_id]], mask)
else:
self.select_entries(src, 'image', by_id[[train_id]], mask)
ratio = num_keep / (num_keep + num_drop)
self.log.info(f" - {num_keep} frames keep, {num_drop} frames drop, "
f"ratio: {ratio:.2%}")
self.log.info(f" - decision is missed for {num_missed_trains} trains, "
f"{num_missed_frames} frames")
Loading