Skip to content
Snippets Groups Projects

Draft: Add simple assembler and manual geometry devices using extra_geom

Closed David Hammer requested to merge add-simple-assembler-geometry into add-agipd-device
1 file
+ 20
5
Compare changes
  • Side-by-side
  • Inline
+ 239
0
import pickle
import matplotlib.pyplot as plt
import numpy as np
from karabo.bound import (
DOUBLE_ELEMENT,
IMAGEDATA_ELEMENT,
INT32_ELEMENT,
KARABO_CLASSINFO,
NODE_ELEMENT,
OUTPUT_CHANNEL,
SLOT_ELEMENT,
TABLE_ELEMENT,
VECTOR_CHAR_ELEMENT,
VECTOR_STRING_ELEMENT,
Encoding,
Hash,
ImageData,
PythonDevice,
Schema,
State,
)
from karabo.common.api import KARABO_SCHEMA_DISPLAY_TYPE_SCENES as DT_SCENES
from matplotlib.backends.backend_agg import FigureCanvasAgg
from . import scenes
from ._version import version as deviceVersion
geometry_schema = Schema()
(
VECTOR_CHAR_ELEMENT(geometry_schema)
.key("pickledGeometry")
.displayedName("Pickled geometry")
.assignmentOptional()
.defaultValue([])
.commit()
)
preview_schema = Schema()
(IMAGEDATA_ELEMENT(preview_schema).key("overview").commit())
ModuleColumn = Schema()
(
DOUBLE_ELEMENT(ModuleColumn)
.key("posX")
.assignmentOptional()
.defaultValue(95)
.reconfigurable()
.commit(),
DOUBLE_ELEMENT(ModuleColumn)
.key("posY")
.assignmentOptional()
.defaultValue(564)
.reconfigurable()
.commit(),
INT32_ELEMENT(ModuleColumn)
.key("orientationX")
.assignmentOptional()
.defaultValue(-1)
.reconfigurable()
.commit(),
INT32_ELEMENT(ModuleColumn)
.key("orientationY")
.assignmentOptional()
.defaultValue(-1)
.reconfigurable()
.commit(),
)
@KARABO_CLASSINFO("ManualGeometryBase", deviceVersion)
class ManualGeometryBase(PythonDevice):
@staticmethod
def expectedParameters(expected):
# "mandatory" for geometry serving device
(
OUTPUT_CHANNEL(expected)
.key("geometryOutput")
.dataSchema(geometry_schema)
.commit(),
SLOT_ELEMENT(expected).key("pleaseSendYourGeometry").commit(),
OUTPUT_CHANNEL(expected)
.key("previewOutput")
.dataSchema(preview_schema)
.commit(),
IMAGEDATA_ELEMENT(expected).key("layoutPreview").commit(),
)
# scenes are fun
(
VECTOR_STRING_ELEMENT(expected)
.key("availableScenes")
.setSpecialDisplayType(DT_SCENES)
.readOnly()
.initialValue(["overview"])
.commit(),
)
def update_geom(self):
raise NotImplementedError()
def __init__(self, config):
super().__init__(config)
self.KARABO_SLOT(self.pleaseSendYourGeometry)
self.KARABO_SLOT(self.requestScene)
self.update_geom()
plt.switch_backend("agg")
self.registerInitialFunction(self._initialization)
def _initialization(self):
self.updateState(State.ON)
self.pleaseSendYourGeometry()
def requestScene(self, params):
payload = Hash()
scene_name = params.get("name", default="")
payload["name"] = scene_name
payload["success"] = True
if scene_name == "overview":
payload["data"] = scenes.manual_geometry_overview(
device_id=self.getInstanceId()
)
else:
payload["success"] = False
response = Hash()
response["type"] = "deviceScene"
response["origin"] = self.getInstanceId()
response["payload"] = payload
self.reply(response)
def pleaseSendYourGeometry(self):
self.update_geom()
self.writeChannel("geometryOutput", Hash("pickledGeometry", self.pickled))
axis = self.geom.inspect()
axis.figure.tight_layout(pad=0)
axis.figure.set_facecolor("none")
# axis.figure.set_size_inches(6, 6)
# axis.figure.set_dpi(300)
canvas = FigureCanvasAgg(axis.figure)
canvas.draw()
image_buffer = np.frombuffer(canvas.buffer_rgba(), dtype=np.uint8).reshape(
canvas.get_width_height()[::-1] + (4,)
)
self.set(
"layoutPreview",
ImageData(image_buffer, encoding=Encoding.RGBA, bitsPerPixel=3 * 8),
)
def preReconfigure(self, config):
self._prereconfigure_update_hash = config
def postReconfigure(self):
del self._prereconfigure_update_hash
@KARABO_CLASSINFO("ManualQuadrantsGeometryBase", deviceVersion)
class ManualQuadrantsGeometryBase(ManualGeometryBase):
@staticmethod
def expectedParameters(expected):
# note: subclasses should set better defaults
(NODE_ELEMENT(expected).key("quadrantCorners").commit(),)
for q in range(1, 5):
(
NODE_ELEMENT(expected).key(f"quadrantCorners.Q{q}").commit(),
DOUBLE_ELEMENT(expected)
.key(f"quadrantCorners.Q{q}.x")
.assignmentOptional()
.defaultValue(0)
.reconfigurable()
.commit(),
DOUBLE_ELEMENT(expected)
.key(f"quadrantCorners.Q{q}.y")
.assignmentOptional()
.defaultValue(0)
.reconfigurable()
.commit(),
)
def postReconfigure(self):
if any(
path.startswith("quadrantCorners")
for path in self._prereconfigure_update_hash.getPaths()
):
self.update_geom()
super().postReconfigure()
def update_geom(self):
self.quadrant_corners = tuple(
(self.get(f"quadrantCorners.Q{q}.x"), self.get(f"quadrantCorners.Q{q}.y"))
for q in range(1, 5)
)
self.geom = self.geometry_class.from_quad_positions(self.quadrant_corners)
self.pickled = pickle.dumps(self.geom)
# TODO: send to anyone who asks? make slot for that? send on connect?
self.writeChannel("geometryOutput", Hash("pickledGeometry", self.pickled))
@KARABO_CLASSINFO("ManualModulesGeometryBase", deviceVersion)
class ManualModulesGeometryBase(ManualGeometryBase):
@staticmethod
def expectedParameters(expected):
(
TABLE_ELEMENT(expected)
.key("modules")
.setColumns(ModuleColumn)
.assignmentOptional()
.defaultValue([])
.reconfigurable()
.commit(),
)
def postReconfigure(self):
if self._prereconfigure_update_hash.has("modules"):
self.update_geom()
super().postReconfigure()
def update_geom(self):
modules = self.get("modules")
module_pos = [(module.get("posX"), module.get("posY")) for module in modules]
orientations = [
(module.get("orientationX"), module.get("orientationY"))
for module in modules
]
self.geom = self.geometry_class.from_module_positions(
module_pos, orientations=orientations
)
self.pickled = pickle.dumps(self.geom)
# TODO: send to anyone who asks? make slot for that?
self.writeChannel("geometryOutput", Hash("pickledGeometry", self.pickled))
Loading