Commit 6d26c1e3 authored by David Hammer

Enable sending image data

parent 727d7fd8
2 merge requests: !59 Interface CrystFEL with Karabo and allow automatic parameter tuning with rcrystfel, !53 Train picker arbiter kernel
@@ -17,6 +17,7 @@ from karabo.bound import (
OVERWRITE_ELEMENT,
SLOT_ELEMENT,
STRING_ELEMENT,
UINT32_ELEMENT,
UINT64_ELEMENT,
VECTOR_STRING_ELEMENT,
Hash,
@@ -28,12 +29,6 @@ from karabo.bound import (
from . import utils
# TODO: make this depend on geometry
MODULE_SIZE_SS = 256
MODULE_SIZE_FS = 256
NUM_MODULES = 16
def reparse_fix_comma_mess(l):
"""So, when you fill out a VECTOR_STRING_ELEMENT in KaraboGUI, you can just type
all the entries you want, separated by commas. Delightful! Except if some line
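The helper's body falls outside this hunk, but the docstring describes the problem: comma-separated entries typed into a VECTOR_STRING_ELEMENT in KaraboGUI can arrive split awkwardly across vector elements. A minimal sketch of what such a re-parser could do, assuming the goal is simply to re-split every entry on commas and drop empty fragments (hypothetical, not the repository's implementation):

def reparse_fix_comma_mess_sketch(entries):
    """Hypothetical re-parser: split each vector entry on commas, strip
    surrounding whitespace, and drop empty fragments so that typing
    'a, b' into one field gives the same result as two fields 'a' and 'b'."""
    fixed = []
    for entry in entries:
        for fragment in entry.split(","):
            fragment = fragment.strip()
            if fragment:
                fixed.append(fragment)
    return fixed

# Example: one comma-separated field plus one plain field
assert reparse_fix_comma_mess_sketch(["a, b", "c"]) == ["a", "b", "c"]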
@@ -92,6 +87,26 @@ class CrystfelRunner(PythonDevice):
.allowedStates(State.STARTED)
.commit(),
# TODO: get these from geometry
NODE_ELEMENT(expected)
.key("dataFormat")
.commit(),
UINT32_ELEMENT(expected)
.key("dataFormat.pixelsSS")
.assignmentMandatory()
.commit(),
UINT32_ELEMENT(expected)
.key("dataFormat.pixelsFS")
.assignmentMandatory()
.commit(),
UINT32_ELEMENT(expected)
.key("dataFormat.numModules")
.assignmentMandatory()
.commit(),
NODE_ELEMENT(expected)
.key("crystfelStats")
.commit(),
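These three mandatory values appear intended to replace the hard-coded MODULE_SIZE_SS, MODULE_SIZE_FS and NUM_MODULES constants shown earlier; the handler below uses them to reshape each (numModules, pixelsSS, pixelsFS) frame into a single (numModules * pixelsSS, pixelsFS) slab before queueing it. A small numpy sketch of that mapping, with 16/256/256 as stand-ins for the configured values:

import numpy as np

# Stand-ins for dataFormat.numModules, dataFormat.pixelsSS, dataFormat.pixelsFS
num_modules, pixels_ss, pixels_fs = 16, 256, 256

# One multi-module frame: (module, slow scan, fast scan)
frame = np.random.rand(num_modules, pixels_ss, pixels_fs).astype(np.float32)

# Stack the modules along the slow-scan axis into one 2D slab,
# the layout the image payload carries per frame
slab = frame.reshape(num_modules * pixels_ss, pixels_fs)
assert slab.shape == (16 * 256, 256)

# Module m occupies rows [m * pixels_ss, (m + 1) * pixels_ss) of the slab
assert np.array_equal(slab[2 * pixels_ss:3 * pixels_ss], frame[2])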
@@ -343,50 +358,73 @@ class CrystfelRunner(PythonDevice):
)
return
peak_count = data_hash.get("peakfinding.numPeaks") # shape: (frame, module)
# TODO: probably rename to ss/fs all around
peak_ss = data_hash.get("peakfinding.peakX") # shape: (frame, module, max_peaks)
peak_fs = data_hash.get("peakfinding.peakY")
peak_intensity = data_hash.get("peakfinding.peakIntensity")
num_frames = peak_count.shape[0]
num_modules = peak_count.shape[1]
# slabbifying the ss coordinates
peak_ss = peak_ss + (
MODULE_SIZE_SS * np.arange(num_modules)[np.newaxis, :, np.newaxis]
)
if self.unsafe_get("crystfelArgs.doPeakfinding"):
# TODO: maybe support passing data even when there are already peaks
images = data_hash.get("image.data")
raise NotImplementedError("Not yet ready for this")
assert all(thing.shape[0] == num_frames for thing in (peak_ss, peak_fs, peak_intensity))
for frame_index in range(num_frames):
ss, fs, intensity = [], [], []
if np.sum(peak_count[frame_index]) < 10:
continue
for module_index in range(num_modules):
peak_count_local = peak_count[frame_index][module_index]
ss.append(peak_ss[frame_index][module_index][:peak_count_local])
fs.append(peak_fs[frame_index][module_index][:peak_count_local])
intensity.append(peak_intensity[frame_index][module_index][:peak_count_local])
payload = {
#"timestamp": time.time(), # TODO: decide on some timestamp to pass
# TODO: send image data if we want to
#"/entry_1/data_1/data": frame.reshape(16 * 256, 256),
self._geom_peak_path: {
# TODO: slabbify
"ss": list(itertools.chain.from_iterable(ss)),
"fs": list(itertools.chain.from_iterable(fs)),
"intensity": list(itertools.chain.from_iterable(intensity)),
},
"native_data_shape": (
MODULE_SIZE_SS,
MODULE_SIZE_FS,
), # TODO: figure out what this should be
}
self._crystfel_queue.put_nowait(msgpack.packb([payload]))
self.set(
"trainQueue.fullness",
len(self._crystfel_queue) / self._crystfel_queue.maxsize * 100
num_frames = images.shape[0]
num_modules = images.shape[1]
# TODO: apply mask to data
for frame_index in range(num_frames):
payload = {
#"timestamp": time.time(), # TODO: decide on some timestamp to pass
# TODO: send image data if we want to
#"/entry_1/data_1/data": frame.reshape(16 * 256, 256),
self._geom_data_path: images[frame_index].reshape(
self.unsafe_get("dataFormat.numModules") * self.unsafe_get("dataFormat.pixelsSS"),
self.unsafe_get("dataFormat.pixelsFS")
),
"native_data_shape": (
self.unsafe_get("dataFormat.pixelsSS"),
self.unsafe_get("dataFormat.pixelsFS"),
), # TODO: figure out what this should be
}
try:
self._crystfel_queue.put_nowait(msgpack.packb([payload]))
except queue.Full:
...  # queue full: drop this frame rather than block
else:
peak_count = data_hash.get("peakfinding.numPeaks") # shape: (frame, module)
# TODO: probably rename to ss/fs all around
peak_ss = data_hash.get("peakfinding.peakX") # shape: (frame, module, max_peaks)
peak_fs = data_hash.get("peakfinding.peakY")
peak_intensity = data_hash.get("peakfinding.peakIntensity")
num_frames = peak_count.shape[0]
num_modules = peak_count.shape[1]
# slabbifying the ss coordinates
peak_ss = peak_ss + (
MODULE_SIZE_SS * np.arange(num_modules)[np.newaxis, :, np.newaxis]
)
assert all(thing.shape[0] == num_frames for thing in (peak_ss, peak_fs, peak_intensity))
for frame_index in range(num_frames):
ss, fs, intensity = [], [], []
# TODO: configurable
if np.sum(peak_count[frame_index]) < 10:
continue
for module_index in range(num_modules):
peak_count_local = peak_count[frame_index][module_index]
ss.append(peak_ss[frame_index][module_index][:peak_count_local])
fs.append(peak_fs[frame_index][module_index][:peak_count_local])
intensity.append(peak_intensity[frame_index][module_index][:peak_count_local])
payload = {
#"timestamp": time.time(), # TODO: decide on some timestamp to pass
# TODO: send image data if we want to
#"/entry_1/data_1/data": frame.reshape(16 * 256, 256),
self._geom_peak_path: {
# TODO: slabbify
"ss": list(itertools.chain.from_iterable(ss)),
"fs": list(itertools.chain.from_iterable(fs)),
"intensity": list(itertools.chain.from_iterable(intensity)),
},
"native_data_shape": (
MODULE_SIZE_SS,
MODULE_SIZE_FS,
), # TODO: figure out what this should be
}
self._crystfel_queue.put_nowait(msgpack.packb([payload]))
self.set(
"frameQueue.fullness",
self._crystfel_queue.qsize() / self._crystfel_queue.maxsize * 100
)
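In the peak branch above, the per-module slow-scan coordinates are shifted into the same slab coordinate system before being flattened, so module m's rows start at m * MODULE_SIZE_SS. A short numpy sketch of that broadcast, with illustrative sizes:

import numpy as np

# Illustrative sizes: 16 modules of 256 slow-scan rows, up to 4 peaks per module
module_size_ss, num_modules, max_peaks = 256, 16, 4

# Per-module peak coordinates, shape (frame, module, max_peaks)
peak_ss = np.zeros((1, num_modules, max_peaks))
peak_ss[0, 2, 0] = 10.0  # one peak at slow-scan row 10 of module 2

# Same broadcast as the handler: offset each module by module_index * module_size_ss
slab_ss = peak_ss + module_size_ss * np.arange(num_modules)[np.newaxis, :, np.newaxis]

# In slab coordinates the peak now sits at 2 * 256 + 10 = 522
assert slab_ss[0, 2, 0] == 522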
def _serve_to_crystfel(self):
def server():