Skip to content
Snippets Groups Projects

Frame filtering by candidate list

Open Egor Sobolev requested to merge feat/candidate-list into feat/legacy
Files
2
@@ -3,7 +3,7 @@ from logging import getLogger
import numpy as np
from exdf.data_reduction import ReductionMethod
from extra_data import SourceNameError, by_id
from extra_data import SourceNameError, by_id, RunDirectory
def parse_slice(value):
@@ -399,3 +399,169 @@ class LitFrames(ReductionMethod):
)
if nrec > max_lines:
self.log.info(f"... {nrec - max_lines + 1} more lines skipped")
class FrameList(ReductionMethod):
    """Select detector frames according to a per-frame candidate list.

    The candidate list assigns a class to frames identified by train ID
    and pulse ID.  A frame is kept when its class is non-zero and dropped
    when its class is zero.  Frames which do not appear in the list are
    dropped unless ``--framelist-keep-missed`` is given.

    The list is read either from an EXDF data collection (the input data
    or a separate run directory), from an HDF5 file or from a CSV file.
    """

    log = getLogger('exdf.data_reduction.builtins.FrameList')

    # Record layout of one candidate list entry: train ID, pulse ID, class.
    event_type = np.dtype([
        ('tid', np.uint64), ('pid', np.uint64), ('cls', np.uint16)])

    @staticmethod
    def arguments(ap):
        """Register the command line options of this method on `ap`."""
        group = ap.add_argument_group(
            'Candidate frame selection',
            'Allows to filter frames by candidate list')

        group.add_argument(
            '--framelist-det-sources',
            action='store', type=str,
            help='Detector sources to filter',
        )
        group.add_argument(
            '--framelist-file',
            action='store', type=str,
            help='Candidate list file, if none use data collection',
        )
        group.add_argument(
            '--framelist-filetype',
            action='store', type=str, choices=['exdf', 'h5', 'csv'],
            default='exdf',
            help='Format of the candidate list file',
        )
        group.add_argument(
            '--framelist-entry',
            action='store', type=str, default='',
            help='Source name in EXDF format or dataset prefix in other '
                 'formats',
        )
        group.add_argument(
            '--framelist-class-key',
            action='store', type=str,
            help='Key of frame class dataset',
        )
        group.add_argument(
            '--framelist-train-id-key',
            action='store', type=str,
            help='Key of train ID dataset',
        )
        group.add_argument(
            '--framelist-pulse-id-key',
            action='store', type=str,
            help='Key of pulse ID dataset',
        )
        group.add_argument(
            '--framelist-keep-missed',
            action='store_true',
            help='Keep frames which are missing in the candidate list',
        )

    @staticmethod
    def _require_file_options(args):
        """Return (train ID key, pulse ID key, class key) for file formats.

        The h5 and csv readers have no way to derive defaults, hence the
        file name and all three keys must be given explicitly.

        Raises:
            ValueError: if any of the required options is not specified.
        """
        required = {
            '--framelist-file': args.framelist_file,
            '--framelist-train-id-key': args.framelist_train_id_key,
            '--framelist-pulse-id-key': args.framelist_pulse_id_key,
            '--framelist-class-key': args.framelist_class_key,
        }
        missing = [opt for opt, value in required.items() if value is None]
        if missing:
            raise ValueError(
                "Required options are not specified: " + ", ".join(missing))

        return (args.framelist_train_id_key, args.framelist_pulse_id_key,
                args.framelist_class_key)

    def from_exdf(self, data, args):
        """Read the candidate list from an EXDF data collection.

        Uses `data` itself when no ``--framelist-file`` is given, otherwise
        opens that path as a run directory.  Train and pulse ID keys
        default to ``trainId``/``pulseId`` next to the class key.

        Raises:
            ValueError: if ``--framelist-class-key`` is not specified.
        """
        clist = (
            data if args.framelist_file is None
            else RunDirectory(args.framelist_file)
        )
        source = args.framelist_entry

        class_key = args.framelist_class_key
        if class_key is None:
            raise ValueError(
                "The --framelist-class-key option is not specified.")

        # Default ID keys live in the same key group as the class key.
        key_group = class_key.rpartition('.')[0]
        train_id_key = args.framelist_train_id_key
        if train_id_key is None:
            train_id_key = key_group + '.trainId'
        pulse_id_key = args.framelist_pulse_id_key
        if pulse_id_key is None:
            pulse_id_key = key_group + '.pulseId'

        group = clist[source]
        cls = group[class_key].ndarray()
        tid = group[train_id_key].ndarray()
        pid = group[pulse_id_key].ndarray()

        return np.rec.fromarrays([tid, pid, cls], dtype=self.event_type)

    def from_h5(self, args):
        """Read the candidate list from datasets of an HDF5 file.

        Raises:
            ValueError: if the file or one of the keys is not specified.
        """
        import h5py

        train_id_key, pulse_id_key, class_key = (
            self._require_file_options(args))

        with h5py.File(args.framelist_file, 'r') as f:
            # An empty entry (the option default) refers to the file root.
            entry = f[args.framelist_entry] if args.framelist_entry else f
            tid = entry[train_id_key][:]
            pid = entry[pulse_id_key][:]
            cls = entry[class_key][:]

        return np.rec.fromarrays([tid, pid, cls], dtype=self.event_type)

    def from_csv(self, args):
        """Read the candidate list from columns of a CSV file.

        Column names are the entry prefix followed by the respective key.

        Raises:
            ValueError: if the file or one of the keys is not specified.
        """
        import pandas as pd

        train_id_key, pulse_id_key, class_key = (
            self._require_file_options(args))

        ds = pd.read_csv(args.framelist_file)
        prefix = args.framelist_entry
        return np.rec.fromarrays(
            [
                ds[prefix + train_id_key],
                ds[prefix + pulse_id_key],
                ds[prefix + class_key],
            ],
            dtype=self.event_type
        )

    def _load_events(self, data, args):
        """Read the candidate list, return None (after logging) on failure."""
        try:
            if args.framelist_filetype == 'exdf':
                return self.from_exdf(data, args)
            elif args.framelist_filetype == 'h5':
                return self.from_h5(args)
            elif args.framelist_filetype == 'csv':
                return self.from_csv(args)
            else:
                raise ValueError("Unknown filetype.")
        except FileNotFoundError:
            self.log.error(f"File '{args.framelist_file}' not found.")
        except (SourceNameError, KeyError):
            self.log.error("Required keys are not found in data.")
        except Exception as e:
            self.log.error(f"Exception when reading the candidate list: {e}.")
        return None

    @staticmethod
    def _match_pulses(pulse_ids, cand_pid, cand_cls):
        """Look up the candidate decision for the frames of one train.

        Frames are matched to candidates by pulse ID value, so the result
        does not depend on the order of the candidate entries.  If a pulse
        ID occurs several times in the candidate list, its first entry is
        used.

        Args:
            pulse_ids: pulse IDs of the detector frames in the train.
            cand_pid: pulse IDs of the candidate entries for the train.
            cand_cls: classes of the candidate entries, same order.

        Returns:
            (classified, keep): `classified` is a boolean mask over
            `pulse_ids` marking frames found in the candidate list, `keep`
            holds the decision (class != 0) for exactly those frames.
        """
        pulse_ids = np.asarray(pulse_ids)
        if len(cand_pid) == 0:
            return np.zeros(pulse_ids.shape, bool), np.zeros(0, bool)

        # Stable sort keeps the first of duplicated pulse IDs in front,
        # which is the entry searchsorted (side='left') then points to.
        order = np.argsort(cand_pid, kind='stable')
        sorted_pid = cand_pid[order]

        pos = np.searchsorted(sorted_pid, pulse_ids)
        # Clip insertion points past the end; they fail the equality test.
        pos = np.minimum(pos, len(sorted_pid) - 1)
        classified = sorted_pid[pos] == pulse_ids
        keep = cand_cls[order][pos[classified]] != 0

        return classified, keep

    def __init__(self, data, args):
        """Build frame selections for all requested detector sources.

        Failures to read the candidate list or missing required options
        are logged as errors and leave the method without any selection.
        """
        if args.framelist_det_sources is None:
            self.log.error(
                "The --framelist-det-sources option is not specified.")
            return

        evt = self._load_events(data, args)
        if evt is None:
            return

        # group by trains; the slices [first, first + count) used below are
        # only valid if entries of the same train are contiguous, which the
        # stable sort guarantees while preserving their relative order
        evt = evt[np.argsort(evt.tid, kind='stable')]
        trains, first, count = np.unique(
            evt.tid, return_index=True, return_counts=True)

        # loop over sources
        det = data.select(args.framelist_det_sources, "image.*")
        for source_req in det.instrument_sources:
            # resolve legacy source
            src = data.legacy_sources.get(source_req, source_req)
            self.log.info("select frames in " + src)

            num_drop = 0
            num_keep = 0
            num_missed_frames = 0
            num_missed_trains = 0

            key = data[src, "image.pulseId"].drop_empty_trains()
            for train_id, pulse_ids in key.trains():
                num_frames = len(pulse_ids)
                # Frames without a decision follow --framelist-keep-missed.
                mask = np.full(num_frames, args.framelist_keep_missed, bool)

                # `trains` is sorted (np.unique), binary search is valid
                # and also copes with an empty candidate list.
                i = np.searchsorted(trains, np.uint64(train_id))
                if i < len(trains) and trains[i] == train_id:
                    f0 = first[i]
                    fN = f0 + count[i]
                    classified, keep = self._match_pulses(
                        pulse_ids, evt.pid[f0:fN], evt.cls[f0:fN])
                    mask[classified] = keep
                    num_missed_frames += (
                        num_frames - int(np.count_nonzero(classified)))
                else:
                    num_missed_trains += 1

                reduced_num_frames = int(np.count_nonzero(mask))
                num_keep += reduced_num_frames
                num_drop += num_frames - reduced_num_frames

                if src.endswith(':xtdf'):
                    self.select_xtdf(src, by_id[[train_id]], mask)
                else:
                    self.select_entries(
                        src, 'image', by_id[[train_id]], mask)

            # A source without any non-empty train has no frames at all.
            num_total = num_keep + num_drop
            ratio = num_keep / num_total if num_total else 0.0
            self.log.info(f" - {num_keep} frames keep, {num_drop} frames drop, "
                          f"ratio: {ratio:.2%}")
            self.log.info(f" - decision is missed for {num_missed_trains} trains, "
                          f"{num_missed_frames} frames")
Loading