From 03f73a1a14d4be23ee3b7c767700017fdbecbcdd Mon Sep 17 00:00:00 2001
From: ahmedk <>
Date: Thu, 23 Feb 2023 11:54:32 +0100
Subject: [PATCH] put back functions in agiplib and fix tests

 .../AGIPD/AGIPD_Correct_and_Verify.ipynb      | 130 +++----------
 src/cal_tools/                     | 175 ++++++++++++++++++
 tests/                       |  44 ++---
 3 files changed, 214 insertions(+), 135 deletions(-)

diff --git a/notebooks/AGIPD/AGIPD_Correct_and_Verify.ipynb b/notebooks/AGIPD/AGIPD_Correct_and_Verify.ipynb
index 62646f280..aa8611fe0 100644
--- a/notebooks/AGIPD/AGIPD_Correct_and_Verify.ipynb
+++ b/notebooks/AGIPD/AGIPD_Correct_and_Verify.ipynb
@@ -163,10 +163,6 @@
     "    LitFrameSelection,\n",
     "from cal_tools.ana_tools import get_range\n",
-    "from cal_tools.calcat_interface import (\n",
-    "    AGIPD_CalibrationData,\n",
-    "    CalCatError,\n",
-    ")\n",
     "from cal_tools.enums import AgipdGainMode, BadPixels\n",
     "from cal_tools.step_timing import StepTimer\n",
     "from import (\n",
@@ -546,57 +542,6 @@
     "# NOTE: this notebook will not overwrite calibration metadata file\n",
     "const_yaml = metadata.get(\"retrieved-constants\", {})\n",
-    "dark_constants = {\"Offset\", \"Noise\", \"BadPixelsDark\"}\n",
-    "if not gain_mode:\n",
-    "    dark_constants.add(\"ThresholdsDark\")\n",
-    "\n",
-    "gain_constants = set()\n",
-    "if any(agipd_corr.pc_bools):\n",
-    "    gain_constants |=  {\"SlopesPC\", \"BadPixelsPC\"}\n",
-    "if agipd_corr.corr_bools.get(\"xray_corr\"):\n",
-    "    gain_constants |= {\"SlopesFF\", \"BadPixelsFF\"}\n",
-    "\n",
-    "all_constants = dark_constants.copy()\n",
-    "all_constants |= gain_constants\n",
-    "\n",
-    "print(f'Preparing constants (FF: {agipd_corr.corr_bools.get(\"xray_corr\", False)}, PC: {any(agipd_corr.pc_bools)}, '\n",
-    "      f'BLC: {any(agipd_corr.blc_bools)})')"
-   ]
-  },
-  {
-   "cell_type": "code",
-   "execution_count": null,
-   "metadata": {},
-   "outputs": [],
-   "source": [
-    "def check_constants_availability(\n",
-    "    const_data: dict, karabo_da: str, \n",
-    ") -> bool :  \n",
-    "    \"\"\"\n",
-    "    Validate the constants availability and raise/warn correspondingly.\n",
-    "    \"\"\"\n",
-    "    missing_dark_constants = dark_constants - set(const_data)\n",
-    "    missing_gain_constants = gain_constants - set(const_data)\n",
-    "\n",
-    "    if missing_gain_constants:\n",
-    "        warning(\n",
-    "            f\"Gain constants {missing_gain_constants} are\"\n",
-    "            f\" not retrieved for {karabo_da}\")\n",
-    "\n",
-    "    if missing_dark_constants:\n",
-    "        warning(f\"Dark constants {missing_dark_constants} are not available\"\n",
-    "                f\" to correct {karabo_da}\")\n",
-    "        return False\n",
-    "    else:\n",
-    "        return True"
-   ]
-  },
-  {
-   "cell_type": "code",
-   "execution_count": null,
-   "metadata": {},
-   "outputs": [],
-   "source": [
     "def retrieve_constants(mod):\n",
     "    \"\"\"\n",
     "    Retrieve calibration constants and load them to shared memory\n",
@@ -604,66 +549,32 @@
     "    Metadata for constants is taken from yml file or retrieved from the DB\n",
     "    \"\"\"\n",
     "    k_da = module_index_to_karabo_da[mod]\n",
-    "    # string of the device name.\n",
-    "    cons_data = dict()\n",
-    "    when = dict()\n",
-    "    variant = dict()\n",
+    "\n",
     "    if k_da in const_yaml:\n",
-    "        db_module = const_yaml[k_da][\"physical-name\"]\n",
-    "        for cname, mdata in const_yaml[k_da][\"constants\"].items():\n",
-    "            base_key = f\"{db_module}/{cname}/0\"\n",
-    "            when[cname] = mdata[\"creation-time\"]\n",
-    "            if when[cname]:\n",
-    "                with h5py.File(mdata[\"path\"], \"r\") as cf:\n",
-    "                    cons_data[cname] = cf[f\"{base_key}/data\"][()]\n",
-    "                    # Set variant to 0 if the attribute is missing\n",
-    "                    # as for old constants.\n",
-    "                    if \"variant\" in cf[base_key].attrs.keys():\n",
-    "                        variant[cname] = cf[base_key].attrs[\"variant\"]\n",
-    "                    else:\n",
-    "                        variant[cname] = 0\n",
+    "        when = agipd_corr.initialize_from_yaml(k_da, const_yaml, mod)\n",
+    "        print(f\"Found constants for {k_da} in calibration_metadata.yml\")\n",
     "    else:  # Contact the database for module's constants.\n",
-    "        agipd_cal = AGIPD_CalibrationData(\n",
-    "            detector_name=karabo_id,\n",
-    "            modules=[k_da],\n",
-    "            sensor_bias_voltage=bias_voltage,\n",
-    "            memory_cells=memory_cells,\n",
-    "            acquisition_rate=acquisition_rate,\n",
+    "        when = agipd_corr.initialize_from_db(\n",
+    "            karabo_id=karabo_id,\n",
+    "            karabo_da=k_da,\n",
+    "            creation_time=creation_time,\n",
+    "            memory_cells=mem_cells_db,\n",
+    "            bias_voltage=bias_voltage,\n",
+    "            photon_energy=9.2,\n",
+    "            gain_setting=gain_setting,\n",
+    "            acquisition_rate=acq_rate,\n",
     "            integration_time=integration_time,\n",
-    "            source_energy=photon_energy,\n",
     "            gain_mode=gain_mode,\n",
-    "            gain_setting=gain_setting,\n",
-    "            event_at=creation_time,\n",
-    "            client=client,\n",
+    "            module_idx=mod,\n",
+    "            client=rest_cfg.calibration_client(),\n",
     "        )\n",
-    "\n",
-    "        for cname in all_constants:\n",
-    "            if cname in gain_constants:\n",
-    "                agipd_cal.gain_mode = None\n",
-    "            else:\n",
-    "                agipd_cal.gain_mode = gain_mode\n",
-    "            try:\n",
-    "                agipd_md = agipd_cal.metadata([cname])[k_da]\n",
-    "            except CalCatError as e:\n",
-    "                pass  # a validation of missing constants is done later.\n",
-    "        for cname in all_constants:\n",
-    "            if agipd_md.get(cname):\n",
-    "                when[cname] = agipd_md[cname][\"begin_validity_at\"]\n",
-    "                dataset = agipd_md[cname][\"dataset\"]\n",
-    "                with h5py.File(agipd_cal.caldb_root / agipd_md[cname][\"path\"], \"r\") as cf:  # noqa\n",
-    "                    cons_data[cname] = np.copy(cf[f\"{dataset}/data\"])\n",
-    "                    variant[cname] = cf[dataset].attrs[\"variant\"] if cf[dataset].attrs.keys() else 0  # noqa\n",
     "        print(f\"Queried CalCat for {k_da}\")\n",
+    "    # `when` is expected to be None if dark constants are not available\n",
+    "    # and correction should be skipped for this module.\n",
+    "    return mod, when, k_da\n",
-    "    mod_const_validated = check_constants_availability(cons_data, k_da)\n",
-    "\n",
-    "    if mod_const_validated:\n",
-    "        agipd_corr.init_constants(cons_data, when, mod, variant)\n",
-    "        return mod, when, k_da\n",
-    "    else:\n",
-    "        print(\"Missing dark constants, hence no correction for this module.\")\n",
-    "        return None, None, k_da\n",
-    "\n",
+    "print(f'Preparing constants (FF: {agipd_corr.corr_bools.get(\"xray_corr\", False)}, PC: {any(agipd_corr.pc_bools)}, '\n",
+    "      f'BLC: {any(agipd_corr.blc_bools)})')\n",
     "with multiprocessing.Pool(processes=len(modules)) as pool:\n",
     "    const_out =, modules)\n",
@@ -683,8 +594,9 @@
     "for i, (modno, when, k_da) in enumerate(const_out):\n",
-    "    if modno is None:\n",
+    "    if when is None:\n",
     "        karabo_da.remove(k_da)\n",
+    "        warning(f\"Skip correcting {k_da} because of missing dark constants.\")\n",
     "        continue\n",
     "    qm = module_index_to_qm(modno)\n",
diff --git a/src/cal_tools/ b/src/cal_tools/
index 66a5c2613..576b07559 100644
--- a/src/cal_tools/
+++ b/src/cal_tools/
@@ -478,6 +478,20 @@ class AgipdCorrections:
+        self.dark_constants = {"Offset", "Noise", "BadPixelsDark"}
+        if not self.gain_mode:
+            self.dark_constants.add("ThresholdsDark")
+        self.gain_constants = set()
+        if any(self.pc_bools):
+            self.gain_constants |=  {"SlopesPC", "BadPixelsPC"}
+        if self.corr_bools.get("xray_corr"):
+            self.gain_constants |= {"SlopesFF", "BadPixelsFF"}
+        self.all_constants = self.dark_constants.copy()
+        self.all_constants |= self.gain_constants
     def read_file(self, i_proc: int, file_name: str,
                   apply_sel_pulses: Optional[bool] = True
                   ) -> int:
@@ -1337,6 +1351,167 @@ class AgipdCorrections:
+    def validate_constants_availability(
+        self, const_data: dict, karabo_da: str, 
+    ) -> bool :  
+        """
+        Validate the constants availability and raise/warn correspondingly.
+        Validation is False, if dark constants are not retrieved.
+        """
+        missing_dark_constants = self.dark_constants - set(const_data)
+        missing_gain_constants = self.gain_constants - set(const_data)
+        if missing_gain_constants:
+            warning(
+                f"Gain constants {missing_gain_constants} are"
+                f" not retrieved for {karabo_da}")
+        if missing_dark_constants:
+            warning(f"Dark constants {missing_dark_constants} are not available"
+                    f" to correct {karabo_da}")
+            return False
+        else:
+            return True
+    def initialize_from_yaml(
+        self, karabo_da: str, const_yaml: Dict[str, Any], module_idx: int,
+    ) -> Dict[str, Any]:
+        """Initialize calibration constants from a yaml file
+        :param karabo_da: a karabo data aggregator
+        :param const_yaml: from the "retrieved-constants" part of a yaml file
+        from pre-notebook, which consists of metadata of either the constant
+        file path or the empty constant shape, and the creation-time of the
+        retrieved constants
+        :param module_idx: Index of module
+        :return when: dict of retrieved constants with their creation-time
+        """
+        cons_data = dict()
+        when = dict()
+        variant = dict()
+        # string of the device name.
+        db_module = const_yaml[karabo_da]["physical-name"]
+        for cname, mdata in const_yaml[karabo_da]["constants"].items():
+            base_key = f"{db_module}/{cname}/0"
+            when[cname] = mdata["creation-time"]
+            if when[cname]:
+                with h5py.File(mdata["path"], "r") as cf:
+                    cons_data[cname] = cf[f"{base_key}/data"][()]
+                    # Set variant to 0 if the attribute is missing
+                    # as for old constants.
+                    if "variant" in cf[base_key].attrs.keys():
+                        variant[cname] = cf[base_key].attrs["variant"]
+                    else:
+                        variant[cname] = 0
+        if self.validate_constants_availability(cons_data, karabo_da):
+            self.init_constants(cons_data, when, module_idx, variant)
+            return when
+        else:
+            # Avoid initializing the constants and return None
+            # to skip correcting this module.
+            return
+    def initialize_from_db(
+        self, karabo_id: str,
+        karabo_da: str,
+        module_idx: int,
+        creation_time: datetime,
+        memory_cells: int,
+        bias_voltage: int,
+        gain_setting: float,
+        gain_mode: int,
+        acquisition_rate: float,
+        integration_time: int,
+        photon_energy: float = 9.2,
+        client=None,
+    ):
+        """ Initialize calibration constants from the calibration database
+        :param karabo_id: karabo detector identifier.
+        :param karabo_da: karabo module data aggregator name.
+        :param module_idx: module index to save retrieved CCV in sharedmem.
+        :param creation_time: time for desired calibration constant version.
+        :param memory_cells: number of memory cells used for CCV conditions.
+        :param bias_voltage: bias voltage used for CCV conditions.
+        :param gain_setting: gain setting used for CCV conditions.
+        :param gain_mode: gain mode used for CCV conditions.
+        :param acquisition_rate: acquistion rate used for CCV conditions.
+        :param integration_time: integration time used for CCV conditions.
+        :param photon_energy: photon energy value used for CCV conditions.
+        :param client: calibration_client object for CalibrationData objects.
+        Loading the following constants based on the
+        correction configuration:
+            Dark Image Derived
+            ------------------
+            * Constants.AGIPD.Offset
+            * Constants.AGIPD.Noise
+            * Constants.AGIPD.BadPixelsDark
+            * Constants.AGIPD.ThresholdsDark
+            Pulse Capacitor Derived
+            -----------------------
+            * Constants.AGIPD.SlopesPC
+            * Constants.AGIPD.BadPixelsPC
+            Flat-Field Derived
+            * Constants.AGIPD.SlopesFF
+            * Constants.AGIPD.BadPixelsFF
+        """
+        from cal_tools.calcat_interface import (
+            AGIPD_CalibrationData,
+            CalCatError,
+        )
+        when = {}
+        cons_data = {}
+        variant = {}
+        agipd_cal = AGIPD_CalibrationData(
+            detector_name=karabo_id,
+            modules=[karabo_da],
+            sensor_bias_voltage=bias_voltage,
+            memory_cells=memory_cells,
+            acquisition_rate=acquisition_rate,
+            integration_time=integration_time,
+            source_energy=photon_energy,
+            gain_mode=gain_mode,
+            gain_setting=gain_setting,
+            event_at=creation_time,
+            client=client,
+        )
+        agipd_md = dict()
+        for cname in self.all_constants:
+            if cname in self.gain_constants:
+                agipd_cal.gain_mode = None
+            else:
+                agipd_cal.gain_mode = gain_mode
+            try:
+                agipd_md = agipd_cal.metadata([cname])[karabo_da]
+            except CalCatError as e:
+                pass  # a validation of missing constants is done later.
+            if agipd_md.get(cname):
+                when[cname] = agipd_md[cname]["begin_validity_at"]
+                dataset = agipd_md[cname]["dataset"]
+                with h5py.File(agipd_cal.caldb_root / agipd_md[cname]["path"], "r") as cf:  # noqa
+                    cons_data[cname] = np.copy(cf[f"{dataset}/data"])
+                    variant[cname] = cf[dataset].attrs["variant"] if cf[dataset].attrs.keys() else 0  # noqa
+        if self.validate_constants_availability(cons_data, karabo_da):
+            self.init_constants(cons_data, when, module_idx, variant)
+            return when
+        else:
+            # Avoid initializing the constants and return None
+            # to skip correcting this module.
+            return
     def allocate_constants(self, modules, constant_shape):
         Allocate memory for correction constants
diff --git a/tests/ b/tests/
index 70b83e39b..937b8b7d1 100644
--- a/tests/
+++ b/tests/
@@ -409,49 +409,41 @@ def test_get_pdu_from_db(_agipd_const_cond):
 # TODO add a marker for accessing zmq end_point
-def test_raise_initialize_from_db():
-    creation_time = datetime.strptime(
-        "2020-01-07 13:26:48.00", "%Y-%m-%d %H:%M:%S.%f")
+def test_no_constants_initialize_from_db():
     agipd_corr = AgipdCorrections(
-        cell_sel=CellRange([0, 500, 1], MEM_CELLS))
-    agipd_corr.allocate_constants(
-        modules=[0],
-        constant_shape=(3, MEM_CELLS, 512, 128))
+        cell_sel=CellRange([0, 352, 1], MEM_CELLS))
-    with pytest.raises(KeyError):
-        agipd_corr.initialize_from_db(
-            karabo_id="TEST_DET_CAL_CI-1",
-            karabo_da="TEST_DET_CAL_DA1",
-            creation_time=creation_time,
-            memory_cells=MEM_CELLS,
-            bias_voltage=BIAS_VOLTAGE,
-            photon_energy=PHOTON_ENERGY,
-            gain_setting=GAIN_SETTING,
-            gain_mode=GAIN_MODE,
-            acquisition_rate=ACQ_RATE,
-            integration_time=INTEGRATION_TIME,
-            module_idx=0,
-        )
+    when = agipd_corr.initialize_from_db(
+        karabo_id="TEST_DET_CAL_CI-1",
+        karabo_da="TEST_DET_CAL_DA1",
+        creation_time="2020-01-07 13:26:48.00",
+        memory_cells=MEM_CELLS,
+        bias_voltage=BIAS_VOLTAGE,
+        photon_energy=PHOTON_ENERGY,
+        gain_setting=GAIN_SETTING,
+        gain_mode=GAIN_MODE,
+        acquisition_rate=ACQ_RATE,
+        integration_time=INTEGRATION_TIME,
+        module_idx=0,
+    )
+    assert when == None
 # TODO add a marker for accessing zmq end_point
 def test_initialize_from_db():
     creation_time = datetime.strptime(
         "2020-01-07 13:26:48.00", "%Y-%m-%d %H:%M:%S.%f")
     agipd_corr = AgipdCorrections(
-        cell_sel=CellRange([0, 500, 1], MEM_CELLS))
+        cell_sel=CellRange([0, 352, 1], MEM_CELLS))
         constant_shape=(3, MEM_CELLS, 512, 128))
-    agipd_corr
     dark_const_time_dict = agipd_corr.initialize_from_db(