From 8ecaa1e25a50c519bf16d7879d3ba429afd3da5d Mon Sep 17 00:00:00 2001
From: David Hammer <dhammer@mailbox.org>
Date: Fri, 15 Oct 2021 14:53:09 +0200
Subject: [PATCH] Small fixes, add more CalCat details

---
 src/calng/AgipdCorrection.py    |   2 +-
 src/calng/agipd_gpu_kernels.cpp |   2 +-
 src/calng/base_correction.py    |  19 ++++++
 src/calng/calcat_utils.py       | 102 +++++++++++++++++++++++++-------
 src/calng/utils.py              |  12 +++-
 src/tests/test_calcat_utils.py  |   3 +
 6 files changed, 115 insertions(+), 25 deletions(-)

diff --git a/src/calng/AgipdCorrection.py b/src/calng/AgipdCorrection.py
index 32619701..7ff10526 100644
--- a/src/calng/AgipdCorrection.py
+++ b/src/calng/AgipdCorrection.py
@@ -362,7 +362,7 @@ class AgipdCorrection(BaseCorrection):
             self._override_md_additional_offset = self.get(
                 "corrections.relGainPc.mdAdditionalOffset"
             )
-            self.gpu_runner.md_additional_offset_gpu.override_md_additional_offset(
+            self.gpu_runner.override_md_additional_offset(
                 self._override_md_additional_offset
             )
         else:
diff --git a/src/calng/agipd_gpu_kernels.cpp b/src/calng/agipd_gpu_kernels.cpp
index 192b7193..7548ac40 100644
--- a/src/calng/agipd_gpu_kernels.cpp
+++ b/src/calng/agipd_gpu_kernels.cpp
@@ -75,7 +75,7 @@ extern "C" {
 		const size_t gm_map_stride_y = 3 * gm_map_stride_gain;
 		const size_t gm_map_stride_x = Y * gm_map_stride_y;
 		const size_t gm_map_stride_cell = X * gm_map_stride_x;
-		// TODO: handle multiple maps, multiple strides, multiple limits
+		// note: assuming all maps have same shape (in terms of cells / x / y)
 
 		const size_t map_cell = cell_table[cell];
 
diff --git a/src/calng/base_correction.py b/src/calng/base_correction.py
index a504ca9c..8b575485 100644
--- a/src/calng/base_correction.py
+++ b/src/calng/base_correction.py
@@ -2,6 +2,7 @@ import pathlib
 import threading
 import timeit
 
+import dateutil.parser
 import hashToSchema
 import numpy as np
 from karabo.bound import (
@@ -410,6 +411,24 @@ class BaseCorrection(PythonDevice):
             )
 
     def preReconfigure(self, config):
+        for ts_path in (
+            "constantParameters.deviceMappingSnapshotAt",
+            "constantParameters.constantVersionEventAt",
+        ):
+            if config.has(ts_path):
+                ts_string = config.get(ts_path)
+                if ts_string.strip() == "":
+                    config.set(ts_path, "")
+                else:
+                    try:
+                        timestamp = dateutil.parser.isoparse(ts_string)
+                    except ValueError as error:
+                        self.log_status_warn(f"Failed to parse {ts_path}; {error}")
+                        config.erase(ts_path)
+                    else:
+                        config.set(ts_path, timestamp.isoformat())
+        if config.has("constantParameters.deviceMappingSnapshotAt"):
+            self.calcat_friend.flush_pdu_mapping()
         self._prereconfigure_update_hash = config
 
     def postReconfigure(self):
diff --git a/src/calng/calcat_utils.py b/src/calng/calcat_utils.py
index f7453ff3..9088f058 100644
--- a/src/calng/calcat_utils.py
+++ b/src/calng/calcat_utils.py
@@ -74,7 +74,7 @@ def _add_status_schema_from_enum(schema, prefix, enum_class):
             .initialValue(False)
             .commit(),
             STRING_ELEMENT(schema)
-            .key(f"{constant_node}.createdAt")
+            .key(f"{constant_node}.validFrom")
             .readOnly()
             .initialValue("")
             .commit(),
@@ -182,6 +182,31 @@ class BaseCalcatFriend:
         # Parameters which any detector would probably have (extend this in subclass)
         # TODO: probably switch to floating point for everything, including mem cells
         (
+            STRING_ELEMENT(schema)
+            .key(f"{param_prefix}.deviceMappingSnapshotAt")
+            .displayedName("Snapshot timestamp (for device mapping)")
+            .description(
+                "CalCat supports querying with a specific snapshot of the database. "
+                "When playing back a run from the file system, this feature is useful "
+                "to look up the device mapping at the time of the run. If this field "
+                "is left empty, the latest device mapping is used. Date format should "
+                "be 'YYYY-MM-DD' with optional time of day starting with 'T' followed "
+                "by 'hh:mm:ss.mil+02:00'."
+            )
+            .assignmentOptional()
+            .defaultValue("")
+            .reconfigurable()
+            .commit(),
+            STRING_ELEMENT(schema)
+            .key(f"{param_prefix}.constantVersionEventAt")
+            .displayedName("TODO")
+            .description(
+                "TODO"
+            )
+            .assignmentOptional()
+            .defaultValue("")
+            .reconfigurable()
+            .commit(),
             STRING_ELEMENT(schema)
             .key(f"{param_prefix}.detectorType")
             .displayedName("Detector type name")
@@ -192,15 +217,30 @@ class BaseCalcatFriend:
             .initialValue(detector_type)
             .commit(),
             STRING_ELEMENT(schema)
+            .key(f"{param_prefix}.detectorTypeId")
+            .readOnly()
+            .initialValue("")
+            .commit(),
+            STRING_ELEMENT(schema)
             .key(f"{param_prefix}.detectorName")
             .assignmentOptional()
             .defaultValue("")
             .commit(),
             STRING_ELEMENT(schema)
+            .key(f"{param_prefix}.detectorId")
+            .readOnly()
+            .initialValue("")
+            .commit(),
+            STRING_ELEMENT(schema)
             .key(f"{param_prefix}.karaboDa")
             .assignmentOptional()
             .defaultValue("")
             .commit(),
+            STRING_ELEMENT(schema)
+            .key(f"{param_prefix}.moduleId")
+            .readOnly()
+            .initialValue("")
+            .commit(),
             UINT32_ELEMENT(schema)
             .key(f"{param_prefix}.memoryCells")
             .assignmentOptional()
@@ -224,6 +264,8 @@ class BaseCalcatFriend:
             .reconfigurable()
             .commit(),
         )
+        managed_keys.append(f"{param_prefix}.deviceMappingSnapshotAt")
+        managed_keys.append(f"{param_prefix}.constantVersionEventAt")
         managed_keys.append(f"{param_prefix}.memoryCells")
         managed_keys.append(f"{param_prefix}.pixelsX")
         managed_keys.append(f"{param_prefix}.pixelsY")
@@ -242,7 +284,7 @@ class BaseCalcatFriend:
         self.cached_constants = {}
 
         if not secrets_fn.is_file():
-            self.device.log.WARN(f"Missing CalCat secrets file (expected {secrets_fn})")
+            self.device.log_status_warn(f"Missing CalCat secrets file (expected {secrets_fn})")
         with secrets_fn.open("r") as fd:
             calcat_secrets = json.load(fd)
 
@@ -263,34 +305,45 @@ class BaseCalcatFriend:
             scope="public",
             session_token=None,
         )
-        self.device.log.INFO("CalCat connection established")
+        self.device.log_status_info("CalCat connection established")
 
     def _get_param(self, key):
         """Helper to get value from attached device schema"""
         return self.device.get(f"{self.param_prefix}.{key}")
 
+    def _set_param(self, key, value):
+        self.device.set(f"{self.param_prefix}.{key}", value)
+
+    def _get_status(self, constant, key):
+        return self.device.get(f"{self.status_prefix}.{constant.name}.{key}", value)
+
     def _set_status(self, constant, key, value):
         """Helper to update information about found constants on device"""
         self.device.set(f"{self.status_prefix}.{constant.name}.{key}", value)
 
+
     @functools.cached_property
     def detector_id(self):
         resp = Detector.get_by_identifier(self.client, self._get_param("detectorName"))
         _check_resp(resp, DetectorNotFound)
-        return resp["data"]["id"]
+        res = resp["data"]["id"]
+        self._set_param("detectorId", str(res))
+        return res
 
     @functools.cached_property
     def detector_type_id(self):
         resp = DetectorType.get_by_name(self.client, self._get_param("detectorType"))
         _check_resp(resp, DetectorNotFound)
-        return resp["data"]["id"]
+        res = resp["data"]["id"]
+        self._set_param("detectorTypeId", str(res))
+        return res
 
-    # TODO: support updating mapping maybe?
-    # (just means del self.pdus and the properties depending on it)
+    # TODO: support updating mapping (means del self.pdus and properties using it)
+    # TODO: support snapshot
     @functools.cached_property
     def pdus(self):
-        resp = PhysicalDetectorUnit.get_all_by_detector_id(
-            self.client, self.detector_id
+        resp = PhysicalDetectorUnit.get_all_by_detector(
+            self.client, self.detector_id, self._get_param("deviceMappingSnapshotAt")
         )
         _check_resp(resp)
         for irrelevant_key in ("detector", "detector_type", "flg_available"):
@@ -306,10 +359,16 @@ class BaseCalcatFriend:
     def _karabo_da_to_id(self):
         return {pdu["karabo_da"]: pdu["id"] for pdu in self.pdus}
 
+    def flush_pdu_mapping(self):
+        for attr in ("pdus", "_karabo_da_to_float_uuid", "_karabo_da_to_id"):
+            if hasattr(self, attr):
+                delattr(self, attr)
+        self._set_param("moduleId", "")
+
     @utils.threadsafe_cache
     def calibration_id(self, calibration_name: str):
         resp = Calibration.get_by_name(self.client, calibration_name)
-        # TODO: dump name in exception
+        # TODO: include calibration name in exception
         _check_resp(resp, CalibrationNotFound)
         return resp["data"]["id"]
 
@@ -318,7 +377,7 @@ class BaseCalcatFriend:
         # modifying condition parameter messes with cache
         condition_with_detector = copy.copy(condition)
         condition_with_detector["Detector UUID"] = pdu
-        self.device.log.INFO(f"Look for condition: {condition_with_detector}")
+        self.device.log.DEBUG(f"Look for condition: {condition_with_detector}")
         resp = self.client.search_possible_conditions_from_dict(
             "", condition_with_detector.encode()
         )
@@ -338,12 +397,14 @@ class BaseCalcatFriend:
 
     def get_constant_version(self, constant, snapshot_at=None):
         # TODO: support snapshot
+        # TODO: support creation time
         # TODO: catch exceptions, give warnings appropriately
         karabo_da = self._get_param("karaboDa")
-        self.device.log.DEBUG(f"Will look for {constant} for {karabo_da}")
+        self.device.log_status_info(f"Attempting to find {constant} for {karabo_da}")
 
         if karabo_da not in self._karabo_da_to_float_uuid:
             raise ModuleNotFound(f"Module map did not include {karabo_da}")
+        self._set_param("moduleId", str(self._karabo_da_to_id[karabo_da]))
 
         if isinstance(constant, str):
             constant = self._constant_enum_class[constant]
@@ -370,10 +431,9 @@ class BaseCalcatFriend:
             snapshot_at=None,
         )
         _check_resp(resp)
-        timestamp = resp["data"][
-            "begin_at"
-        ]  # TODO: check which key we like (also has begin_validity_at)
-        self._set_status(constant, "createdAt", timestamp)
+        # TODO: replace with start date and end date
+        timestamp = resp["data"]["begin_validity_at"]
+        self._set_status(constant, "validFrom", timestamp)
         self._set_status(constant, "constantVersionId", resp["data"]["id"])
 
         file_path = (
@@ -403,7 +463,7 @@ class BaseCalcatFriend:
         with h5py.File(file_path, "r") as fd:
             constant_data = np.array(fd[resp["data"]["data_set_name"]]["data"])
         self.cached_constants[constant] = constant_data
-        self._set_status(constant, "createdAt", resp["data"]["begin_at"])
+        self._set_status(constant, "validFrom", resp["data"]["begin_at"])
         self._set_status(constant, "calibrationId", "manual override")
         self._set_status(constant, "conditionId", "manual override")
         self._set_status(constant, "constantId", "manual override")
@@ -414,10 +474,9 @@ class BaseCalcatFriend:
     def get_constant_version_and_call_me_back(
         self, constant, callback, snapshot_at=None
     ):
-        """WIP. Callback function will get constant name, constant data, and hopefully
-        soon also some metadata or whatever.
-        """
+        """Runs get_constant_version in thread, will call callback on completion"""
         # TODO: do we want to use asyncio / "modern" async?
+        # TODO: consider moving out of this class, closer to correction device
         def aux():
             data = self.get_constant_version(constant, snapshot_at)
             callback(constant, data)
@@ -441,7 +500,7 @@ class BaseCalcatFriend:
 
     def flush_constants(self):
         for constant in self._constant_enum_class:
-            self._set_status(constant, "createdAt", "")
+            self._set_status(constant, "validFrom", "")
             self._set_status(constant, "found", False)
 
 
@@ -538,6 +597,7 @@ class AgipdCalcatFriend(BaseCalcatFriend):
         res["Source Energy"] = self._get_param("photonEnergy")
         res["Acquisition rate"] = self._get_param("acquisitionRate")
         res["Gain Setting"] = self._get_param("gainSetting")
+        # TODO: integration time (omit if 12)
         return res
 
 
diff --git a/src/calng/utils.py b/src/calng/utils.py
index 0acf8cfd..762247dc 100644
--- a/src/calng/utils.py
+++ b/src/calng/utils.py
@@ -102,11 +102,19 @@ def shape_after_transpose(input_shape, transpose_pattern, squeeze=True):
 class RepeatingTimer:
     """A timer which will call callback every interval seconds"""
 
-    def __init__(self, interval, callback, timer=timeit.default_timer, start_now=True):
+    def __init__(
+        self,
+        interval,
+        callback,
+        timer=timeit.default_timer,
+        start_now=True,
+        daemon=True,
+    ):
         self.timer = timer
         self.stopped = True
         self.interval = interval
         self.callback = callback
+        self.daemonize = daemon
         if start_now:
             self.start()
 
@@ -126,7 +134,7 @@ class RepeatingTimer:
                 self.callback()
                 self.wakeup_time = self.timer() + self.interval
 
-        self.thread = threading.Thread(target=runner)
+        self.thread = threading.Thread(target=runner, daemon=self.daemonize)
         self.thread.start()
 
     def stop(self):
diff --git a/src/tests/test_calcat_utils.py b/src/tests/test_calcat_utils.py
index 5c465702..537bddd9 100644
--- a/src/tests/test_calcat_utils.py
+++ b/src/tests/test_calcat_utils.py
@@ -37,6 +37,9 @@ class DummyAgipdDevice:
     device_class_schema = Schema()
     managed_keys = []
 
+    def log_status_info(self, msg):
+        print(msg)
+
     @staticmethod
     def expectedParameters(expected):
         calcat_utils.AgipdCalcatFriend.add_schema(
-- 
GitLab