From b536ff1ce1ba646bb9a89a03c10a37da508b6fc2 Mon Sep 17 00:00:00 2001
From: ahmedk <karim.ahmed@xfel.eu>
Date: Fri, 10 Feb 2023 17:28:25 +0100
Subject: [PATCH] Estimate the available memory to avoid dead kernel for long
 Jungfrau dark runs

---
 ...rk_analysis_all_gains_burst_mode_NBC.ipynb | 84 ++++++++++++++-----
 1 file changed, 61 insertions(+), 23 deletions(-)

diff --git a/notebooks/Jungfrau/Jungfrau_dark_analysis_all_gains_burst_mode_NBC.ipynb b/notebooks/Jungfrau/Jungfrau_dark_analysis_all_gains_burst_mode_NBC.ipynb
index 05bda3f95..80fd57fde 100644
--- a/notebooks/Jungfrau/Jungfrau_dark_analysis_all_gains_burst_mode_NBC.ipynb
+++ b/notebooks/Jungfrau/Jungfrau_dark_analysis_all_gains_burst_mode_NBC.ipynb
@@ -72,8 +72,10 @@
    "outputs": [],
    "source": [
     "import glob\n",
+    "import gc\n",
     "import os\n",
     "import warnings\n",
+    "from logging import warning\n",
     "from pathlib import Path\n",
     "warnings.filterwarnings('ignore')\n",
     "\n",
@@ -82,6 +84,7 @@
     "import multiprocessing\n",
     "import numpy as np\n",
     "import pasha as psh\n",
+    "import psutil\n",
     "import yaml\n",
     "from IPython.display import Markdown, display\n",
     "from extra_data import RunDirectory\n",
@@ -199,11 +202,11 @@
     "\n",
     "# A transperent workaround for old raw data with wrong/missing medium and low settings\n",
     "if med_low_settings == [None, None]:\n",
-    "    print(\"WARNING: run.settings is not stored in the data to read. \"\n",
-    "          f\"Hence assuming gain_mode = {gain_mode} for adaptive old data.\")\n",
+    "    warning(\"run.settings is not stored in the data to read. \"\n",
+    "            f\"Hence assuming gain_mode = {gain_mode} for adaptive old data.\")\n",
     "elif med_low_settings == [\"dynamicgain\", \"forceswitchg1\"]:\n",
-    "    print(f\"WARNING: run.settings for medium and low gain runs are wrong {med_low_settings}. \"\n",
-    "          f\"This is an expected bug for old raw data. Setting gain_mode to {gain_mode}.\")\n",
+    "    warning(f\"run.settings for medium and low gain runs are wrong {med_low_settings}. \"\n",
+    "            f\"This is an expected bug for old raw data. Setting gain_mode to {gain_mode}.\")\n",
     "# Validate that low_med_settings is not a mix of adaptive and fixed settings.\n",
     "elif not (sorted(med_low_settings) in [fixed_settings, dynamic_settings, old_fixed_settings]):  # noqa\n",
     "    raise ValueError(\n",
@@ -313,6 +316,29 @@
     "context = psh.context.ThreadContext(num_workers=multiprocessing.cpu_count())"
    ]
   },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "def calculate_parallel_threads(n_trains, memory_cells):\n",
+    "    \"\"\"Calculate the maximum number of parallel threads to use.\n",
+    "    If enough memory is free, each cell constant can be computed in parallel.\n",
+    "    Otherwise the parallel threads are reduced based on estimated memory consumption.\n",
+    "    If the n_trains exceed the free memory of one processes,\n",
+    "    number of trains to process will be reduced.\n",
+    "    \"\"\"\n",
+    "    reduced_trains = n_trains\n",
+    "    available_memory = (psutil.virtual_memory().available >> 30) - (16 * n_trains * (1024 * 512 * 3 + 1) // 1e9)\n",
+    "    parallel_threads = available_memory // ((1024 * 512 * 5 * n_trains) // 1e9)\n",
+    "    if parallel_threads < 1:\n",
+    "        reduced_trains = (available_memory // ((1024 * 512 * 4 / 1e9)))  - 4\n",
+    "        warning(f\"Reducing the processed trains from {n_trains} to {reduced_trains} to fit the free memory.\")\n",
+    "    \n",
+    "    return max(min(memory_cells, int(parallel_threads)), 1), int(reduced_trains)"
+   ]
+  },
   {
    "cell_type": "code",
    "execution_count": null,
@@ -343,7 +369,7 @@
     "\n",
     "    print(f\"\\n- Instrument data path for {mod} is {instrument_src}.\")\n",
     "\n",
-    "    offset_map[mod] = context.alloc(shape=(sensor_size+(memory_cells, 3)), fill=0)\n",
+    "    offset_map[mod] = context.alloc(shape=(sensor_size+(memory_cells, 3)), fill=0, dtype=np.float32)\n",
     "    noise_map[mod] = context.alloc(like=offset_map[mod], fill=0)\n",
     "    bad_pixels_map[mod] = context.alloc(like=offset_map[mod], dtype=np.uint32, fill=0)\n",
     "\n",
@@ -351,46 +377,54 @@
     "\n",
     "        def process_cell(worker_id, array_index, cell_number):\n",
     "            cell_slice_idx = acelltable == cell_number\n",
-    "            thiscell = images[..., cell_slice_idx]\n",
+    "            thiscell = images[..., cell_slice_idx]  # [1024, 512, n_trains]\n",
     "\n",
     "            # Identify cells/trains with images of 0 pixels.\n",
     "            # TODO: An investigation is ongoing by DET to identify reason for these empty images.\n",
-    "            nonzero_adc = np.any(thiscell != 0 , axis=(0, 1))\n",
+    "            nonzero_adc = np.any(thiscell != 0 , axis=(0, 1))  # [n_trains]\n",
     "\n",
     "            # Exclude empty images with 0 pixels, before calculating offset and noise\n",
     "            thiscell = thiscell[..., nonzero_adc]\n",
-    "            offset_map[mod][..., cell_number, gain] = np.mean(thiscell, axis=2)\n",
-    "            noise_map[mod][..., cell_number, gain] = np.std(thiscell, axis=2)\n",
-    "\n",
+    "            offset_map[mod][..., cell_number, gain] = np.mean(  # [1024, 512]\n",
+    "                thiscell, axis=2, dtype=np.float32)\n",
+    "            noise_map[mod][..., cell_number, gain] = np.std(  # [1024, 512]\n",
+    "                thiscell, axis=2, dtype=np.float32)\n",
+    "            del thiscell\n",
     "            # Check if there are wrong bad gain values.\n",
     "            # 1. Exclude empty images.\n",
     "            # 2. Indicate pixels with wrong gain value for any train for each cell.\n",
     "            # TODO: mean is used to use thresholds for accepting gain values, even if not 0 mean value.\n",
-    "            gain_avg = np.mean(\n",
-    "                gain_vals[..., cell_slice_idx][..., nonzero_adc], axis=2)\n",
-    "\n",
+    "            gain_avg = np.mean(  # [1024, 512]\n",
+    "                gain_vals[..., cell_slice_idx][..., nonzero_adc],\n",
+    "                axis=2, dtype=np.float32\n",
+    "            )\n",
+    "            del nonzero_adc\n",
+    "            # [1024, 512]\n",
     "            bad_pixels_map[mod][..., cell_number, gain][gain_avg != raw_g] |= BadPixels.WRONG_GAIN_VALUE.value\n",
+    "            del gain_avg\n",
     "        print(f\"Gain stage {gain}, run {run_n}\")\n",
     "\n",
     "        # load shape of data for memory cells, and detector size (imgs, cells, x, y)\n",
-    "        n_imgs = run_dc[instrument_src, \"data.adc\"].shape[0]\n",
+    "        n_trains = run_dc[instrument_src, \"data.adc\"].shape[0]\n",
     "        # load number of data available, including trains with empty data.\n",
-    "        n_trains = len(run_dc.train_ids)\n",
+    "        all_trains = len(run_dc.train_ids)\n",
     "        instr_dc = run_dc.select(instrument_src, require_all=True)\n",
-    "        empty_trains = n_trains - n_imgs\n",
+    "        empty_trains = all_trains - n_trains\n",
     "        if empty_trains != 0:\n",
-    "            print(f\"\\tWARNING: {mod} has {empty_trains} trains with empty data out of {n_trains} trains\")  # noqa\n",
+    "            print(f\"{mod} has {empty_trains} empty trains out of {all_trains} trains\")\n",
     "        if max_trains > 0:\n",
-    "            n_imgs = min(n_imgs, max_trains)\n",
-    "        print(f\"Processing {n_imgs} images.\")\n",
-    "        # Select only requested number of images to process darks.\n",
-    "        instr_dc = instr_dc.select_trains(np.s_[:n_imgs])\n",
+    "            n_trains = min(n_trains, max_trains)\n",
+    "        print(f\"Processing {n_trains} images.\")\n",
     "\n",
-    "        if n_imgs < min_trains:\n",
+    "        if n_trains < min_trains:\n",
     "            raise ValueError(\n",
     "                f\"Less than {min_trains} trains are available in RAW data.\"\n",
     "                \" Not enough data to process darks.\")\n",
     "\n",
+    "        parallel_processes, n_trains = calculate_parallel_threads(n_trains, memory_cells)\n",
+    "\n",
+    "        # Select only requested number of images to process darks.\n",
+    "        instr_dc = instr_dc.select_trains(np.s_[:n_trains])\n",
     "        images = np.transpose(\n",
     "            instr_dc[instrument_src, \"data.adc\"].ndarray(), (3, 2, 1, 0))\n",
     "        acelltable = np.transpose(instr_dc[instrument_src, \"data.memoryCell\"].ndarray())\n",
@@ -412,8 +446,12 @@
     "            acelltable[1:] = 255\n",
     "\n",
     "        # Calculate offset and noise maps\n",
+    "        context = psh.context.ThreadContext(num_workers=parallel_processes)\n",
     "        context.map(process_cell, range(memory_cells))\n",
-    "\n",
+    "        del images\n",
+    "        del acelltable\n",
+    "        del gain_vals\n",
+    "        gc.collect();\n",
     "    step_timer.done_step(f'Creating Offset and noise constants for a module.')"
    ]
   },
-- 
GitLab