Commit 5ba47ce0 authored by David Hammer's avatar David Hammer

Add metadata for control sources, too

parent 9251066c
......@@ -42,15 +42,6 @@ class ShmemToZMQ(PipeToZeroMQ.PipeToZeroMQ):
new_meta = {}
new_data = {}
# additional unstacked sources
if data.has("unstacked"):
for source in data.get("unstacked").getKeys():
forward, ignore = self._filter_properties(data, source)
new_data[source] = conversion.hash_to_dict(
data.get(f"unstacked.{source}"), paths=forward, version=self.version
)
data.erase("unstacked")
# stacked data from ModuleStacker
for source, handle_string in zip(data.get("sources"), data.get("image.data")):
if source not in self.sources:
......@@ -105,6 +96,16 @@ class ShmemToZMQ(PipeToZeroMQ.PipeToZeroMQ):
new_data[source] = (dic, arr)
# note: assumes version "2.2" of the PipeToZeroMQ packing format
# additional unstacked sources
if data.has("unstacked"):
for source in data.get("unstacked").getKeys():
forward, ignore = self._filter_properties(data, source)
new_data[source] = conversion.hash_to_dict(
data.get(f"unstacked.{source}"), paths=forward, version=self.version
)
new_meta[source] = conversion.meta_to_dict(metadata)
data.erase("unstacked")
# forward data to all connected ZMQ sockets
self._send(new_data, new_meta)
......
  • Owner

    I think I realize now what happened. I'm sorry for not taking note of this earlier; this is almost definitely a consequence of starting out with the ModuleMatcher.

    The original TrainMatcher takes any number of pipeline and control sources, matches their data and sends it over its output pipeline. However, they are not sent as one hash; each source is its own hash, complete with metadata. Basically, you can write any number of data packets on a channel before calling .update(). On the receiver side, all of those packets together are called an input, and there is a callback either for the whole input event or for each individual data packet. In most cases, however, these two are identical, with one data packet per input.
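    A plain-Python stand-in for that packet model (illustrative names only, not the actual Karabo channel API): any number of per-source packets can be written, and update() flushes them as one input event.

```python
# Minimal stand-in for the output-channel protocol described above.
# Any number of (data, metadata) packets may be written before update()
# flushes them to receivers as one input event. Class and attribute
# names here are illustrative, not the real Karabo API.
class FakeOutputChannel:
    def __init__(self):
        self._pending = []
        self.sent_events = []  # each entry is one input event (a list of packets)

    def write(self, data, meta):
        # one packet per source, each carrying its own metadata
        self._pending.append((data, meta))

    def update(self):
        # everything written since the last update() forms one input event
        self.sent_events.append(self._pending)
        self._pending = []


channel = FakeOutputChannel()
channel.write({"image.data": "..."}, {"source": "DET/0CH0:xtdf"})
channel.write({"dataSent.value": 2581}, {"source": "runtopipe_00"})
channel.update()  # receiver sees one input event containing two packets
```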

    The bridge now takes an input event and puts all of the data packets in it into one output for the connected clients, i.e. the multiple hashes are turned into one dictionary.
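    That merging step can be sketched as follows (a stand-in for the bridge's packing, with packets as (data, metadata) pairs and the "source" metadata key assumed):

```python
def pack_input_event(packets):
    """Merge the per-source hashes of one input event into a single
    dict keyed by source name, as the bridge does for ZMQ clients.
    Packets are (data_dict, meta_dict) pairs; the 'source' key in the
    metadata is an assumption for this sketch."""
    output = {}
    for data, meta in packets:
        output[meta["source"]] = (data, meta)
    return output


packets = [
    ({"image.data": "..."}, {"source": "DET/0CH0:xtdf"}),
    ({"dataSent.value": 2581}, {"source": "runtopipe_00"}),
]
merged = pack_input_event(packets)  # one dict, keyed by source
```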

    We should try to use this protocol in order to stay compatible with the vanilla device on each side. This means that the ModuleStacker stacks the modules into one hash, but continues sending any additional non-module pipeline or control data as additional hashes.

    Then, on the bridge side, we do the very same but simply add the support for shared memory handles.
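    For context, the stacking the ModuleStacker performs is essentially an np.stack over the per-module image arrays into one hash. A sketch; the axis position and per-module shapes here are assumptions (shapes loosely mirror the DSSC log further down), not a verified "combiner format":

```python
import numpy as np

# Hypothetical per-module arrays as they might arrive from 16 modules;
# (memory cells, slow scan, fast scan) ordering is an assumption.
modules = [np.zeros((400, 128, 512), dtype=np.float16) for _ in range(16)]

# ModuleStacker-style stacking into one array under a single hash key;
# the module axis position (axis=1) is illustrative only.
stacked = np.stack(modules, axis=1)  # shape (400, 16, 128, 512)
```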

  • Author Owner

    The current state of things is definitely a result of me not understanding the typical protocol well enough and quickly trying to tack on slow data. I actually hadn't thought of control sources (and we didn't have them in testing) until the users started asking for them. I'd hoped we'd have time to make the ModuleStacker redundant and kill it off before redeployment:

    • For preview purposes, a ModuleMatcher should suffice (optionally with ToF and configurable stacking path)
      • Maybe a future assembler device will not need pre-stacked data, in which case we even drop the ModuleMatcher and use a vanilla TrainMatcher
    • For piping out to ZMQ, the bridge - as you mention - handles multiple hashes anyway, so it gains nothing from stacking
      • ShmemToZMQ should just be a regular bridge that replaces the shmem handle under image.data with the actual data (I think we already said something like this in planning, but I have not realized it yet)
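    The idea in the last bullet can be sketched like this: before forwarding, swap the handle string stored under image.data for the array it points to. A dict stands in for the shared-memory lookup; all names and the handle format are illustrative:

```python
import numpy as np

# Stand-in for a shared-memory registry: handle string -> ndarray.
# A real implementation would attach to actual shared memory segments.
shmem_registry = {
    "shmem$calng$0": np.ones((512, 128, 246), dtype=np.float16),
}


def dereference_handles(sources, paths=("image.data",)):
    """Replace shmem handle strings under the given paths with the
    arrays they point to, leaving everything else untouched."""
    for source, (data, meta) in sources.items():
        for path in paths:
            handle = data.get(path)
            if isinstance(handle, str) and handle in shmem_registry:
                data[path] = shmem_registry[handle]
    return sources


sources = {"DET/0CH0:xtdf": ({"image.data": "shmem$calng$0"}, {})}
dereference_handles(sources)  # handle string is now an ndarray
```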

    Incidentally, your explanation also contains the answer to a TODO I left for myself at one point: why one would use KARABO_ON_INPUT instead of KARABO_ON_DATA (I'm using the latter; I will reconsider).
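    The distinction matters here because a per-packet callback (KARABO_ON_DATA style) sees one source's hash at a time, while a per-input callback (KARABO_ON_INPUT style) sees all matched sources of the event together and can pack them into one output. A plain-Python illustration of that difference, not the Karabo API itself:

```python
def on_data(packet, collected):
    # per-packet style: invoked once per source, sees sources in isolation
    collected.append([packet])


def on_input(event, collected):
    # per-input style: invoked once per event, sees all matched sources
    collected.append(list(event))


# One input event with two matched sources (names are illustrative).
event = [("DET/0CH0:xtdf", {}), ("runtopipe_00", {})]

per_packet, per_input = [], []
for packet in event:
    on_data(packet, per_packet)
on_input(event, per_input)
# per_packet: two separate callbacks; per_input: one callback with both
```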

    Edited by David Hammer
  • Author Owner

    Update: just tried it out. Going back to an almost-copy of onInput from PipeToZeroMQ (which includes code duplication, because there is no good place to hook in), one can throw in a TrainMatcher (achtung: it has to be started explicitly after instantiation) and not worry about it so much. https://git.xfel.eu/gitlab/karaboDevices/calng/-/commit/0a2f327760576f323863df5dfcccbb09a10cd70c Part of a glimpse from the test:

    Source 3: 'SQS_DET_DSSC1M-1/DET/2CH0:xtdf' @ 1140040645
    timestamp: 2021-08-25 20:20:47 (1629915647.257587) | delay: 969.00 ms
    data:
     - [ndarray] detector.data, uint8, (416,)
     - [int] detector.trainId, 1140040645
     - [int] header.dataId, 1
     - [int] header.linkId, 1628765276958789561
     - [ndarray] header.magicNumberBegin, int8, (8,)
     - [int] header.majorTrainFormatVersion, 2
     - [int] header.minorTrainFormatVersion, 1
     - [int] header.pulseCount, 400
     - [ndarray] header.reserved, uint8, (16,)
     - [int] header.trainId, 1140040645
     - [ndarray] image.cellId, uint16, (246, 1)
     - [ndarray] image.data, float16, (512, 128, 246)
     - [ndarray] image.length, uint32, (400, 1)
     - [list] image.passport, []
     - [ndarray] image.pulseId, uint16, (246, 1)
     - [ndarray] image.status, uint16, (400, 1)
     - [ndarray] image.trainId, uint64, (400, 1)
     - [str] metadata.source, SQS_DET_DSSC1M-1/DET/2CH0:xtdf
     - [int] metadata.timestamp.tid, 1140040645
     - [ndarray] trailer.checksum, int8, (16,)
     - [ndarray] trailer.magicNumberEnd, int8, (8,)
     - [int] trailer.status, 0
     - [int] trailer.trainId, 1140040645
    metadata:
     - [list] ignored_keys, []
     - [str] source, SQS_DET_DSSC1M-1/DET/2CH0:xtdf
     - [float] timestamp, 1629915647.257587
     - [str] timestamp.frac, 257587673196396544
     - [str] timestamp.sec, 1629915647
     - [int] timestamp.tid, 1140040645
    
    Source 4: 'SQS_DET_DSSC1M-1/DET/3CH0:xtdf' @ 1140040645
    timestamp: 2021-08-25 20:20:48 (1629915648.040651) | delay: 185.94 ms
    data:
     - [ndarray] detector.data, uint8, (416,)
     - [int] detector.trainId, 1140040645
     - [int] header.dataId, 1
     - [int] header.linkId, 1628765276955711119
     - [ndarray] header.magicNumberBegin, int8, (8,)
     - [int] header.majorTrainFormatVersion, 2
     - [int] header.minorTrainFormatVersion, 1
     - [int] header.pulseCount, 400
     - [ndarray] header.reserved, uint8, (16,)
     - [int] header.trainId, 1140040645
     - [ndarray] image.cellId, uint16, (242, 1)
     - [ndarray] image.data, float16, (512, 128, 242)
     - [ndarray] image.length, uint32, (400, 1)
     - [list] image.passport, []
     - [ndarray] image.pulseId, uint16, (242, 1)
     - [ndarray] image.status, uint16, (400, 1)
     - [ndarray] image.trainId, uint64, (400, 1)
     - [str] metadata.source, SQS_DET_DSSC1M-1/DET/3CH0:xtdf
     - [int] metadata.timestamp.tid, 1140040645
     - [ndarray] trailer.checksum, int8, (16,)
     - [ndarray] trailer.magicNumberEnd, int8, (8,)
     - [int] trailer.status, 0
     - [int] trailer.trainId, 1140040645
    metadata:
     - [list] ignored_keys, []
     - [str] source, SQS_DET_DSSC1M-1/DET/3CH0:xtdf
     - [float] timestamp, 1629915648.040651
     - [str] timestamp.frac, 040651608161255424
     - [str] timestamp.sec, 1629915648
     - [int] timestamp.tid, 1140040645
    
    Source 5: 'runtopipe_00' @ 1140040645
    timestamp: 2021-08-25 20:20:48 (1629915648.124762) | delay: 101.83 ms
    data:
     - [int] dataSent.timestamp.tid, 0
     - [float] dataSent.timestamp.timestamp, 1629915648.113888
     - [int] dataSent.value, 2581
    metadata:
     - [list] ignored_keys, []
     - [str] source, runtopipe_00
     - [float] timestamp, 1629915648.124762
     - [str] timestamp.frac, 124762487000000000
     - [str] timestamp.sec, 1629915648
     - [int] timestamp.tid, 1140040645
  • David Hammer @hammerd

    mentioned in commit 78bf590c

  • Owner

    Oh, I honestly thought at some point we wanted to stack the module data for the bridge, but only now do I realize it's been separated back into sources there.

    So, as I understand it, if we want each module to be a separate source in the bridge stream alongside non-module sources, we do not need any custom TrainMatcher at all? Then we only need to figure out how to start/stop the TrainMatcher/bridge from the manager and improve the sources configuration.

  • Author Owner

    I think I may have thought so at some point, too. But I believe the only place where np.stack-ing is currently needed is the FemDataAssembler (which expects the "combiner format").

    So yes, for the bridge output to look like it currently does, we can use a regular TrainMatcher (in the test project, I did just this, plus the one change in the manager to tell ShmemToZMQ which paths to dereference). I put the changes needed for ShmemToZMQ in a separate branch, as it's not worth interrupting the currently running pipeline for aesthetics.

  • David Hammer @hammerd

    mentioned in commit 3566342d
