Skip to content
Snippets Groups Projects

Refactor stacking for reuse and overlappability

Merged David Hammer requested to merge refactor-stacking into master
2 files
+ 159
143
Compare changes
  • Side-by-side
  • Inline
Files
2
@@ -69,7 +69,9 @@ class ShmemTrainMatcher(TrainMatcher.TrainMatcher):
def initialization(self):
super().initialization()
self._shmem_handler = shmem_utils.ShmemCircularBufferReceiver()
self._stacking_friend = StackingFriend(self.get("merge"), self.get("sources"))
self._stacking_friend = StackingFriend(
self, self.get("merge"), self.get("sources")
)
self._frameselection_friend = FrameselectionFriend(self.get("frameSelector"))
self._thread_pool = concurrent.futures.ThreadPoolExecutor(
max_workers=self.get("processingThreads")
@@ -102,22 +104,21 @@ class ShmemTrainMatcher(TrainMatcher.TrainMatcher):
def on_matched_data(self, train_id, sources):
frame_selection_mask = self._frameselection_friend.get_mask(sources)
# note: should not do stacking and frame selection for now!
self._stacking_friend.prepare_stacking_for_train(sources)
concurrent.futures.wait(
[
self._thread_pool.submit(
self._handle_source,
source,
data,
timestamp,
new_sources_map,
frame_selection_mask,
)
for source, (data, timestamp) in sources.items()
]
)
sources.update(new_sources_map)
with self._stacking_friend.stacking_context as stacker:
concurrent.futures.wait(
[
self._thread_pool.submit(
self._handle_source,
source,
data,
timestamp,
stacker,
frame_selection_mask,
)
for source, (data, timestamp) in sources.items()
]
)
sources.update(stacker.new_source_map)
# karabo output
if self.output is not None:
@@ -141,10 +142,9 @@ class ShmemTrainMatcher(TrainMatcher.TrainMatcher):
source,
data_hash,
timestamp,
new_sources_map,
stacker,
frame_selection_mask,
ignore_stacking,
):
self._shmem_handler.dereference_shmem_handles(data_hash)
self._frameselection_friend.apply_mask(source, data_hash, frame_selection_mask)
self._stacking_friend.handle_source(...)
stacker.process(source, data_hash)
Loading