diff --git a/tests/test_scenes.py b/tests/test_scenes.py new file mode 100644 index 0000000000000000000000000000000000000000..8c15d5982682ee989f458dbdec83971b79c3175f --- /dev/null +++ b/tests/test_scenes.py @@ -0,0 +1,163 @@ +import lxml +from karabo import bound, native + +from calng import ( + CalibrationManager, + DetectorAssembler, + ShmemTrainMatcher, + corrections, + scenes, +) + + +# TODO: spawn device and call requestScene with defaultValues of availableScenes +krb = "{http://karabo.eu/scene}" +configurator = bound.Configurator("PythonDevice") + + +def get_class_schema(device_class): + # TODO: built-in proper way to get full class schema? + if hasattr(device_class, "getClassSchema"): + return device_class.getClassSchema().hash + else: + return configurator.getSchema(device_class).getParameterHash() + + +def attrs_to_dict(attrs): + return {attr.getKey(): attr.getValue() for attr in attrs} + + +# pretend to inject managed keys +def pretend_inject_keys(manager_schema, correction_schema): + for path in correction_schema.paths(): + attrs = attrs_to_dict(correction_schema.getAttributes(path)) + if "managed" in attrs.get("tags", []): + # TODO: check with actual manager, it doesn't just copy + manager_schema.setElement( + f"managedKeys.{path}", correction_schema[path], attrs + ) + + +device_schemata = { + "MOCK/CAL/CORR_00": get_class_schema(corrections.AgipdCorrection.AgipdCorrection), + "MOCK/CAL/MANAGER": get_class_schema(CalibrationManager.CalibrationManager), +} + +pretend_inject_keys( + device_schemata["MOCK/CAL/MANAGER"], device_schemata["MOCK/CAL/CORR_00"] +) + + +class DummyLog: + def INFO(self, text): + print(f"INFO: {text}") + + +def test_autogeneration_no_exceptions(): + for dev_class_name in configurator.getRegisteredClasses(): + if dev_class_name.startswith("Base") or dev_class_name in { + "PythonDevice", + "ZeroMQOutput", + }: + continue + print(dev_class_name) + + dev_class = configurator.baseRegistry[dev_class_name] + + class PretendDevice(dev_class): + def __init__(self, name): + self.config = get_class_schema(dev_class) + self.name = name + self.log = DummyLog() + + def __del__(self): + pass + + def getInstanceId(self): + return self.name + + def getFullSchema(self): + return self.config + + def reply(self, response): + self._response = response + + def generated_scene_data(self, scene_name): + self.requestScene(bound.Hash("name", scene_name)) + return self._response["payload.data"] + + dev = PretendDevice(f"SomeRandom{dev_class}") + for scene_name in attrs_to_dict( + dev.config.getAttributes("availableScenes") + ).get("defaultValue", []) + [""]: + dev.generated_scene_data(scene_name) + # should check that "" is same as "overview", but there are UUIDs + + +def check_keys_exist(tree): + for el in tree.findall(f".//*[@{krb}keys]"): + widget_class = el.attrib[f"{krb}class"] # could use for checking types + widget_keys = el.attrib[f"{krb}keys"] + for key in widget_keys.split(","): + device, _, property_name = key.partition(".") + if device not in device_schemata: + print( + f"Warning: no schema set for {device}, " + f"not checking key {key} of {widget_class}" + ) + continue + assert device_schemata[device].has(property_name) + + +def test_correction_overview(): + device_id = "MOCK/CAL/CORR_00" + svg_string = scenes.correction_device_overview( + device_id, + device_schemata[device_id], + ) + tree = lxml.etree.fromstring(svg_string) + check_keys_exist(tree) + + +def test_correction_constant_overrides(): + device_id = "MOCK/CAL/CORR_00" + svg_string = scenes.correction_device_constant_overrides( + device_id, + device_schemata[device_id], + ) + tree = lxml.etree.fromstring(svg_string) + check_keys_exist(tree) + + +def test_correction_preview(): + device_id = "MOCK/CAL/CORR_00" + svg_string = scenes.correction_device_preview( + device_id, + device_schemata[device_id], + preview_channel="preview.outputRaw", + ) + tree = lxml.etree.fromstring(svg_string) + check_keys_exist(tree) + + +def test_manager_overview(): + # TODO: canonical way to get device schemata + manager_id = "MOCK/CAL/MANAGER" + correction_ids = [f"MOCK/CAL/CORR_{i:02d}" for i in range(16)] + daq_ids = [f"MOCK/DET/DAQ{i}" for i in range(16)] + domain_ids = [ + "MOCK/CAL/CONDITION", + "MOCK/CAL/GEOMETRY", + "MOCK/CAL/MATCHER", + ] + # this currently fails: manager schema has - until it injects managed keys - no managedKeys.frameFilter + svg_string = scenes.manager_device_overview( + manager_id, + device_schemata[manager_id], + correction_ids, + device_schemata[correction_ids[0]], + daq_ids, + domain_ids, + ) + tree = lxml.etree.fromstring(svg_string) + check_keys_exist(tree)