From a4d4be9378997169b88fcca7bb012a2476e40950 Mon Sep 17 00:00:00 2001
From: ahmedk <>
Date: Tue, 18 Jul 2023 15:49:44 +0200
Subject: [PATCH] Move db function into NB

1. Move Initialize_from_db to the notebook
2. Remove function and it's test from cal_tools
3. Use CalibrationData Latex table displaying constant timestamps
4. Start storing data in fragment file
5. Avoid errors in case Thresholds or Noise were not retrieved.
6. Update and cleanup cell for adding timing summary
 .../AGIPD/AGIPD_Correct_and_Verify.ipynb      | 242 ++++++++++--------
 src/cal_tools/                     | 110 +-------
 src/cal_tools/             |  11 +-
 tests/                       |  57 -----
 4 files changed, 149 insertions(+), 271 deletions(-)

diff --git a/notebooks/AGIPD/AGIPD_Correct_and_Verify.ipynb b/notebooks/AGIPD/AGIPD_Correct_and_Verify.ipynb
index cfe04e181..3e4d29c52 100644
--- a/notebooks/AGIPD/AGIPD_Correct_and_Verify.ipynb
+++ b/notebooks/AGIPD/AGIPD_Correct_and_Verify.ipynb
@@ -164,6 +164,10 @@
     "    LitFrameSelection,\n",
     "from cal_tools.ana_tools import get_range\n",
+    "from cal_tools.calcat_interface import (\n",
+    "    AGIPD_CalibrationData,\n",
+    "    CalCatError,\n",
+    ")\n",
     "from cal_tools.enums import AgipdGainMode, BadPixels\n",
     "from cal_tools.step_timing import StepTimer\n",
     "from import (\n",
@@ -481,10 +485,11 @@
+   "attachments": {},
    "cell_type": "markdown",
    "metadata": {},
    "source": [
-    "## Data processing ##"
+    "## Retrieving constants"
@@ -527,7 +532,29 @@
    "metadata": {},
    "outputs": [],
    "source": [
-    "module_index_to_karabo_da = {mod: da for (mod, da) in zip(modules, karabo_da)}"
+    "# set everything up filewise\n",
+    "mapped_files, _, total_sequences, _, _ =  map_modules_from_folder(\n",
+    "    str(in_folder), run, path_template, karabo_da, sequences\n",
+    ")\n",
+    "file_list = []\n",
+    "\n",
+    "# ToDo: Split table over pages\n",
+    "print(f\"Processing a total of {total_sequences} sequence files in chunks of {n_cores_files}\")\n",
+    "table = []\n",
+    "ti = 0\n",
+    "for k, files in mapped_files.items():\n",
+    "    i = 0\n",
+    "    for f in list(files.queue):\n",
+    "        file_list.append(f)\n",
+    "        if i == 0:\n",
+    "            table.append((ti, k, i, f))\n",
+    "        else:\n",
+    "            table.append((ti, \"\", i,  f))\n",
+    "        i += 1\n",
+    "        ti += 1\n",
+    "md = display(Latex(tabulate.tabulate(table, tablefmt='latex',\n",
+    "                                     headers=[\"#\", \"module\", \"# module\", \"file\"])))\n",
+    "file_list = sorted(file_list, key=lambda name: name[-10:])"
@@ -539,24 +566,18 @@
     "# Retrieve calibration constants to RAM\n",
     "agipd_corr.allocate_constants(modules, (3, mem_cells_db, 512, 128))\n",
-    "\n",
-    "from cal_tools.calcat_interface import (\n",
-    "    AGIPD_CalibrationData,\n",
-    "    CalCatError,\n",
-    ")\n",
-    "\n",
     "when = {}\n",
     "cons_data = {}\n",
     "variant = {}\n",
-    "\n",
+    "step_timer.start()\n",
     "agipd_cal = AGIPD_CalibrationData(\n",
     "    detector_name=karabo_id,\n",
-    "    modules=[karabo_da],\n",
+    "    modules=karabo_da,\n",
     "    sensor_bias_voltage=bias_voltage,\n",
     "    memory_cells=mem_cells,\n",
     "    acquisition_rate=acq_rate,\n",
     "    integration_time=integration_time,\n",
-    "    source_energy=photon_energy,\n",
+    "    source_energy=9.2,\n",
     "    gain_mode=gain_mode,\n",
     "    gain_setting=gain_setting,\n",
     "    event_at=creation_time,\n",
@@ -564,60 +585,107 @@
     "    caldb_root=Path(cal_db_root),\n",
+    "dark_constants = [\"Offset\", \"Noise\", \"BadPixelsDark\"]\n",
+    "if not gain_mode:  # Adaptive gain\n",
+    "    dark_constants.append(\"ThresholdsDark\")\n",
-    "dark_constants = [\"Offset\",\"Noise\", \"ThresholdsDark\", \"BadPixelsDark\"]\n",
-    "pc_constants = [\"SlopesPC\", \"BadPixelsPC\"]\n",
-    "ff_constants = agipd_cal.illuminated_calibrations\n",
-    "\n",
-    "agipd_metadata = agipd_cal.metadata()\n",
+    "gain_constants = []\n",
+    "if any(agipd_corr.pc_bools):\n",
+    "    gain_constants += [\"SlopesPC\", \"BadPixelsPC\"]\n",
+    "if agipd_corr.corr_bools.get('xray_corr'):\n",
+    "    gain_constants += agipd_cal.illuminated_calibrations\n",
     "agipd_metadata = agipd_cal.metadata(dark_constants)\n",
-    "for gain_constants in [pc_constants, ff_constants]:\n",
+    "if gain_constants:\n",
+    "    # Gain constants doesn't have gain_mode condition.\n",
+    "    agipd_cal.gain_mode = None\n",
     "    try:\n",
     "        illum_metadata = agipd_cal.metadata(gain_constants)\n",
     "        for key, value in illum_metadata.items():\n",
     "            agipd_metadata.setdefault(key, {}).update(value)\n",
     "    except CalCatError as e:  # TODO: replace when API errors are improved.\n",
     "        warning(f\"CalCatError: {e}\")\n",
-    "\n",
-    "agipd_md = dict()\n",
-    "\n",
-    "for cname in self.all_constants:\n",
-    "    if cname in self.gain_constants:\n",
-    "        agipd_cal.gain_mode = None\n",
-    "    else:\n",
-    "        agipd_cal.gain_mode = gain_mode\n",
-    "    try:\n",
-    "        agipd_md = agipd_cal.metadata([cname])[karabo_da]\n",
-    "    except CalCatError as e:\n",
-    "        pass  # a validation of missing constants is done later.\n",
-    "    if agipd_md.get(cname):\n",
-    "        when[cname] = agipd_md[cname][\"begin_validity_at\"]\n",
-    "        dataset = agipd_md[cname][\"dataset\"]\n",
-    "        with h5py.File(agipd_cal.caldb_root / agipd_md[cname][\"path\"], \"r\") as cf:  # noqa\n",
-    "            cons_data[cname] = np.copy(cf[f\"{dataset}/data\"])\n",
-    "            variant[cname] = cf[dataset].attrs[\"variant\"] if cf[dataset].attrs.keys() else 0  # noqa\n",
-    "\n",
-    "# skip initializing the constants if a dark constant is missing.\n",
-    "if self.validate_constants_availability(cons_data, karabo_da):\n",
-    "    self.init_constants(cons_data, when, module_idx, variant)\n",
-    "    return when\n",
-    "else:\n",
-    "    # Avoid initializing the constants and return None\n",
-    "    # to skip correcting this module.\n",
-    "    return\n",
-    "\n",
+    "step_timer.done_step(\"Constants were retrieved in\")\n",
+    "\n",
+    "print(\"Preparing constants (\"\n",
+    "      f\"FF: {agipd_corr.corr_bools.get('xray_corr', False)}, \"\n",
+    "      f\"PC: {any(agipd_corr.pc_bools)}, \"\n",
+    "      f\"BLC: {any(agipd_corr.blc_bools)})\")\n",
+    "# Display retrieved calibration constants timestamps\n",
+    "agipd_cal.display_markdown_retrieved_constants(metadata=agipd_metadata)"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# Validate constants availability and exclude modules with no offsets.\n",
+    "for da, calibrations in agipd_metadata.items():\n",
+    "    mod = modules[karabo_da.index(da)]\n",
+    "    if {\"Offset\"} - set(calibrations):\n",
+    "        warning(f\"Offset constant is not available to correct {da}.\")\n",
+    "        mapped_files.remove(module_index_to_qm(mod))\n",
+    "        karabo_da.drop(da)\n",
+    "        modules.drop(mod)\n",
+    "\n",
+    "    warn_missing_constants = set(dark_constants + gain_constants)\n",
+    "    warn_missing_constants.remove(\"Offset\")\n",
+    "    warn_missing_constants = warn_missing_constants - set(calibrations)\n",
+    "    if warn_missing_constants:\n",
+    "        warning(f\"Constants {warn_missing_constants} were not retrieved for {da}.\")\n",
+    "\n",
+    "if not mapped_files:  # Offsets are missing for all modules.\n",
+    "    raise Exception(\"Could not find offset constants for any modules, will not correct data.\")"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
     "# Record constant details in YAML metadata\n",
     "    out_folder=(metadata_folder or out_folder),\n",
-    "    det_metadata=jf_metadata,\n",
-    "    caldb_root=jf_cal.caldb_root)\n",
+    "    det_metadata=agipd_metadata,\n",
+    "    caldb_root=agipd_cal.caldb_root)"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# load constants data\n",
+    "def load_constants(da, module):\n",
+    "    \"\"\"\n",
+    "    Initialize constants data from previously retrieved metadata.\n",
+    "\n",
+    "    Args:\n",
+    "        da (str): Data Aggregator (Karabo DA)\n",
+    "        module (int): Module index\n",
+    "\n",
+    "    Returns:\n",
+    "        (int, dict, str): Module index, {constant name: creation time}, Karabo DA\n",
+    "    \"\"\"\n",
+    "    const_data = dict()\n",
+    "    when = dict()\n",
+    "\n",
+    "    for cname, mdata in agipd_metadata[da].items():\n",
+    "        when[cname] = mdata[\"begin_validity_at\"]\n",
+    "        dataset = mdata[\"dataset\"]\n",
+    "        with h5py.File(agipd_cal.caldb_root / mdata[\"path\"], \"r\") as cf:  # noqa\n",
+    "            const_data[cname] = np.copy(cf[f\"{dataset}/data\"])\n",
+    "            variant[cname] = cf[dataset].attrs[\"variant\"] if cf[dataset].attrs.keys() else 0  # noqa\n",
+    "    agipd_corr.init_constants(const_data, when, mod, variant)\n",
+    "    return module, when, da\n",
-    "print(f'Preparing constants (FF: {agipd_corr.corr_bools.get(\"xray_corr\", False)}, PC: {any(agipd_corr.pc_bools)}, '\n",
-    "      f'BLC: {any(agipd_corr.blc_bools)})')\n",
     "with multiprocessing.Pool(processes=len(modules)) as pool:\n",
-    "    const_out =, modules)\n",
+    "    const_out = pool.starmap(load_constants, zip(karabo_da, modules))\n",
     "step_timer.done_step(f'Constants were loaded in ')"
@@ -634,74 +702,28 @@
     "for i, (modno, when, k_da) in enumerate(const_out):\n",
-    "    if when is None:\n",
-    "        karabo_da.remove(k_da)\n",
-    "        warning(f\"Skip correcting {k_da} because of missing dark constants.\")\n",
-    "        continue\n",
-    "\n",
-    "    qm = module_index_to_qm(modno)\n",
-    "\n",
-    "    if k_da not in const_yaml:\n",
-    "        if fst_print:\n",
-    "            print(\"Constants are retrieved with creation time: \")\n",
-    "            fst_print = False\n",
+    "    module_timestamps = {}\n",
-    "        module_timestamps = {}\n",
-    "\n",
-    "        print(f\"{qm}:\")\n",
-    "        for key, item in when.items():\n",
-    "            if hasattr(item, 'strftime'):\n",
-    "                item = item.strftime('%y-%m-%d %H:%M')\n",
-    "            when[key] = item\n",
-    "            print('{:.<12s}'.format(key), item)\n",
-    "\n",
-    "        # Store few time stamps if exists\n",
-    "        # Add NA to keep array structure\n",
-    "        for key in ['Offset', 'SlopesPC', 'SlopesFF']:\n",
-    "            if when and key in when and when[key]:\n",
-    "                module_timestamps[key] = when[key]\n",
-    "            else:\n",
-    "                module_timestamps[key] = \"NA\"\n",
-    "        timestamps[qm] = module_timestamps\n",
-    "\n",
-    "if not karabo_da:  # Raise Error if dark constants are missing for all modules.\n",
-    "    raise ValueError(\"Dark constants are missing for all modules.\")\n",
+    "    # Store few time stamps if exists\n",
+    "    # Add NA to keep array structure\n",
+    "    for key in ['Offset', 'SlopesPC', 'SlopesFF']:\n",
+    "        if key in when and when[key]:\n",
+    "            module_timestamps[key] = when[key]\n",
+    "        else:\n",
+    "            module_timestamps[key] = \"NA\"\n",
+    "    timestamps[module_index_to_qm(modno)] = module_timestamps\n",
     "seq = sequences[0] if sequences else 0\n",
-    "if timestamps:\n",
-    "    with open(f\"{out_folder}/retrieved_constants_s{seq}.yml\",\"w\") as fd:\n",
-    "        yaml.safe_dump({\"time-summary\": {f\"S{seq}\": timestamps}}, fd)"
+    "\n",
+    "with open(f\"{out_folder}/retrieved_constants_s{seq}.yml\",\"w\") as fd:\n",
+    "    yaml.safe_dump({\"time-summary\": {f\"S{seq}\": timestamps}}, fd)"
-   "cell_type": "code",
-   "execution_count": null,
+   "cell_type": "markdown",
    "metadata": {},
-   "outputs": [],
    "source": [
-    "# set everything up filewise\n",
-    "mapped_files, _, total_sequences, _, _ =  map_modules_from_folder(\n",
-    "    str(in_folder), run, path_template, karabo_da, sequences\n",
-    ")\n",
-    "file_list = []\n",
-    "\n",
-    "# ToDo: Split table over pages\n",
-    "print(f\"Processing a total of {total_sequences} sequence files in chunks of {n_cores_files}\")\n",
-    "table = []\n",
-    "ti = 0\n",
-    "for k, files in mapped_files.items():\n",
-    "    i = 0\n",
-    "    for f in list(files.queue):\n",
-    "        file_list.append(f)\n",
-    "        if i == 0:\n",
-    "            table.append((ti, k, i, f))\n",
-    "        else:\n",
-    "            table.append((ti, \"\", i,  f))\n",
-    "        i += 1\n",
-    "        ti += 1\n",
-    "md = display(Latex(tabulate.tabulate(table, tablefmt='latex',\n",
-    "                                     headers=[\"#\", \"module\", \"# module\", \"file\"])))\n",
-    "file_list = sorted(file_list, key=lambda name: name[-10:])"
+    "## Data processing ##"
diff --git a/src/cal_tools/ b/src/cal_tools/
index 530f86c52..a99d5589f 100644
--- a/src/cal_tools/
+++ b/src/cal_tools/
@@ -1189,9 +1189,18 @@ class AgipdCorrections:
         # assuming this method runs in parallel.
         calgs_opts = dict(num_threads=os.cpu_count() // len(self.offset))
-        calgs.transpose_constant(self.offset[module_idx], cons_data['Offset'], **calgs_opts)
-        calgs.transpose_constant(self.noise[module_idx], cons_data['Noise'], **calgs_opts)
-        if self.gain_mode is AgipdGainMode.ADAPTIVE_GAIN:
+        calgs.transpose_constant(
+            self.offset[module_idx], cons_data['Offset'], **calgs_opts)
+        # In case noise wasn't retrieved no need for transposing.
+        if 'Noise' in cons_data:
+            calgs.transpose_constant(
+                self.noise[module_idx], cons_data['Noise'], **calgs_opts)
+        if (
+            self.gain_mode is AgipdGainMode.ADAPTIVE_GAIN and
+            'ThresholdsDark' in cons_data
+        ):
                                      cons_data['ThresholdsDark'][..., :3],
@@ -1373,101 +1382,6 @@ class AgipdCorrections:
             return True
-    def initialize_from_db(
-        self, karabo_id: str, karabo_da: str,
-        creation_time: datetime, memory_cells: float,
-        bias_voltage: int, gain_setting: int,
-        acquisition_rate: float, gain_mode: int,
-        integration_time: int, module_idx: int,
-        photon_energy: float = 9.2,
-        client: "calibration_client.CalibrationClient" = None,  # noqa
-    ):
-        """ Initialize calibration constants from the calibration database
-        :param karabo_id: karabo detector identifier
-        :param karabo_da: karabo module data aggregator name
-        :param module_idx: module index to save retrieved CCV in sharedmem
-        :param creation_time: time for desired calibration constant version
-        :param memory_cells: number of memory cells used for CCV conditions
-        :param bias_voltage: bias voltage used for CCV conditions
-        :param gain_setting: gain setting used for CCV conditions
-        :param gain_mode: gain mode used for CCV conditions
-        :param acquisition_rate: acquistion rate used for CCV conditions
-        :param integration_time: integration time used for CCV conditions
-        :param photon_energy: photon energy value used for CCV conditions
-        :param client: calibration_client object for CalibrationData objects
-        Loading the following constants based on the
-        correction configuration:
-            Dark Image Derived
-            ------------------
-            * Constants.AGIPD.Offset
-            * Constants.AGIPD.Noise
-            * Constants.AGIPD.BadPixelsDark
-            * Constants.AGIPD.ThresholdsDark
-            Pulse Capacitor Derived
-            -----------------------
-            * Constants.AGIPD.SlopesPC
-            * Constants.AGIPD.BadPixelsPC
-            Flat-Field Derived
-            ------------------
-            * Constants.AGIPD.SlopesFF
-            * Constants.AGIPD.BadPixelsFF
-        """
-        from cal_tools.calcat_interface import (
-            AGIPD_CalibrationData,
-            CalCatError,
-        )
-        when = {}
-        cons_data = {}
-        variant = {}
-        agipd_cal = AGIPD_CalibrationData(
-            detector_name=karabo_id,
-            modules=[karabo_da],
-            sensor_bias_voltage=bias_voltage,
-            memory_cells=memory_cells,
-            acquisition_rate=acquisition_rate,
-            integration_time=integration_time,
-            source_energy=photon_energy,
-            gain_mode=gain_mode,
-            gain_setting=gain_setting,
-            event_at=creation_time,
-            client=client,
-        )
-        agipd_md = dict()
-        for cname in self.all_constants:
-            if cname in self.gain_constants:
-                agipd_cal.gain_mode = None
-            else:
-                agipd_cal.gain_mode = gain_mode
-            try:
-                agipd_md = agipd_cal.metadata([cname])[karabo_da]
-            except CalCatError as e:
-                pass  # a validation of missing constants is done later.
-            if agipd_md.get(cname):
-                when[cname] = agipd_md[cname]["begin_validity_at"]
-                dataset = agipd_md[cname]["dataset"]
-                with h5py.File(agipd_cal.caldb_root / agipd_md[cname]["path"], "r") as cf:  # noqa
-                    cons_data[cname] = np.copy(cf[f"{dataset}/data"])
-                    variant[cname] = cf[dataset].attrs["variant"] if cf[dataset].attrs.keys() else 0  # noqa
-        # skip initializing the constants if a dark constant is missing.
-        if self.validate_constants_availability(cons_data, karabo_da):
-            self.init_constants(cons_data, when, module_idx, variant)
-            return when
-        else:
-            # Avoid initializing the constants and return None
-            # to skip correcting this module.
-            return
     def allocate_constants(self, modules, constant_shape):
         Allocate memory for correction constants
diff --git a/src/cal_tools/ b/src/cal_tools/
index 0a84efc6f..f1cd30c32 100644
--- a/src/cal_tools/
+++ b/src/cal_tools/
@@ -176,7 +176,7 @@ class CalCatApi(metaclass=ClientWrapper):
                 for pdu in resp_pdus["data"]
-            raise ValueError(f"{module_naming} is unknown!")
+            raise ValueError(f"{module_naming} is unknown!. Expected da, modno, or qm")
@@ -1037,12 +1037,11 @@ class AGIPD_CalibrationData(SplitConditionCalibrationData):
         cond = super()._build_condition(parameters)
         # Fix-up some database quirks.
-        gain_mode = cond.get("Gain mode", None)
-        if gain_mode is not None and int(gain_mode) == 0:
-            del cond["Gain mode"]
-        elif gain_mode is not None:
+        if cond.get("Gain mode", None):
             cond["Gain mode"] = 1
+        else:
+            cond.pop("Gain mode", None)
         if int(cond.get("Integration time", None)) == 12:
             del cond["Integration time"]
diff --git a/tests/ b/tests/
index 59a14d2cb..0c24cc4f8 100644
--- a/tests/
+++ b/tests/
@@ -407,63 +407,6 @@ def test_get_pdu_from_db(_agipd_const_cond):
-def test_no_constants_initialize_from_db():
-    agipd_corr = AgipdCorrections(
-        max_cells=MEM_CELLS,
-        cell_sel=CellRange([0, 352, 1], MEM_CELLS))
-    when = agipd_corr.initialize_from_db(
-        karabo_id="TEST_DET_CAL_CI-1",
-        karabo_da="TEST_DET_CAL_DA1",
-        creation_time="2020-01-07 13:26:48.00",
-        memory_cells=MEM_CELLS,
-        bias_voltage=BIAS_VOLTAGE,
-        photon_energy=PHOTON_ENERGY,
-        gain_setting=GAIN_SETTING,
-        gain_mode=GAIN_MODE,
-        acquisition_rate=ACQ_RATE,
-        integration_time=INTEGRATION_TIME,
-        module_idx=0,
-    )
-    assert when == None
-def test_initialize_from_db():
-    creation_time = datetime.strptime(
-        "2020-01-07 13:26:48.00", "%Y-%m-%d %H:%M:%S.%f")
-    agipd_corr = AgipdCorrections(
-        max_cells=MEM_CELLS,
-        cell_sel=CellRange([0, 352, 1], MEM_CELLS))
-    agipd_corr.allocate_constants(
-        modules=[0],
-        constant_shape=(3, MEM_CELLS, 512, 128))
-    dark_const_time_dict = agipd_corr.initialize_from_db(
-        karabo_id=AGIPD_KARABO_ID,
-        karabo_da="AGIPD00",
-        creation_time=creation_time,
-        memory_cells=MEM_CELLS,
-        bias_voltage=BIAS_VOLTAGE,
-        photon_energy=PHOTON_ENERGY,
-        gain_setting=GAIN_SETTING,
-        gain_mode=GAIN_MODE,
-        integration_time=INTEGRATION_TIME,
-        acquisition_rate=ACQ_RATE,
-        module_idx=0,
-    )
-    # A retrieved constant has a value of datetime creation_time
-    assert isinstance(dark_const_time_dict["Offset"], str)
-    assert set(dark_const_time_dict.keys()) == {
-        "Offset", "Noise", "ThresholdsDark", "BadPixelsDark"}
 def test_module_index_to_qm():
     assert module_index_to_qm(0) == 'Q1M1'