Skip to content
Snippets Groups Projects

Refactor stacking for reuse and overlappability

Merged David Hammer requested to merge refactor-stacking into master
Files
2
@@ -103,22 +103,19 @@ class ShmemTrainMatcher(TrainMatcher.TrainMatcher):
def on_matched_data(self, train_id, sources):
frame_selection_mask = self._frameselection_friend.get_mask(sources)
# note: should not do stacking and frame selection for now!
with self._stacking_friend.stacking_context as stacker:
concurrent.futures.wait(
[
self._thread_pool.submit(
self._handle_source,
source,
data,
timestamp,
stacker,
frame_selection_mask,
)
for source, (data, timestamp) in sources.items()
]
)
sources.update(stacker.new_source_map)
concurrent.futures.wait(
[
self._thread_pool.submit(
self._handle_source,
source,
data,
timestamp,
frame_selection_mask,
)
for source, (data, timestamp) in sources.items()
]
)
self._stacking_friend.process(sources, self._thread_pool)
# karabo output
if self.output is not None:
@@ -142,9 +139,7 @@ class ShmemTrainMatcher(TrainMatcher.TrainMatcher):
source,
data_hash,
timestamp,
stacker,
frame_selection_mask,
):
self._shmem_handler.dereference_shmem_handles(data_hash)
self._frameselection_friend.apply_mask(source, data_hash, frame_selection_mask)
stacker.process(source, data_hash)
Loading