From abd3f4cf9346ea961185286cb246a37ee99c2a3b Mon Sep 17 00:00:00 2001
From: David Hammer <dhammer@mailbox.org>
Date: Wed, 7 Apr 2021 13:48:06 +0200
Subject: [PATCH] Recreating showcase of threads for cell processing

---
 .../Characterize_AGIPD_Gain_Darks_NBC.ipynb   | 44 ++++++++++++-------
 setup.py                                      |  1 +
 2 files changed, 28 insertions(+), 17 deletions(-)

diff --git a/notebooks/AGIPD/Characterize_AGIPD_Gain_Darks_NBC.ipynb b/notebooks/AGIPD/Characterize_AGIPD_Gain_Darks_NBC.ipynb
index 722eb0a11..460585328 100644
--- a/notebooks/AGIPD/Characterize_AGIPD_Gain_Darks_NBC.ipynb
+++ b/notebooks/AGIPD/Characterize_AGIPD_Gain_Darks_NBC.ipynb
@@ -86,6 +86,7 @@
     "import h5py\n",
     "import matplotlib\n",
     "import numpy as np\n",
+    "import pasha as psh\n",
     "import tabulate\n",
     "import yaml\n",
     "\n",
@@ -104,9 +105,7 @@
     "    get_gain_setting,\n",
     "    get_num_cells,\n",
     ")\n",
-    "\n",
     "from cal_tools.enums import AgipdGainMode, BadPixels\n",
-    "\n",
     "from cal_tools.plotting import (\n",
     "    create_constant_overview,\n",
     "    plot_badpix_3d,\n",
@@ -381,23 +380,26 @@
     "        ga = np.rollaxis(ga, 2)\n",
     "        ga = np.rollaxis(ga, 2, 1)\n",
     "    \n",
-    "    offset = np.zeros((im.shape[0], im.shape[1], num_cells))\n",
-    "    noise = np.zeros((im.shape[0], im.shape[1], num_cells))\n",
+    "    offset = psh.array((im.shape[0], im.shape[1], num_cells))\n",
+    "    noise = psh.array((im.shape[0], im.shape[1], num_cells))\n",
     "\n",
     "    if fixed_gain_mode:\n",
     "        gains = None\n",
     "        gains_std = None\n",
     "    else:\n",
-    "        gains = np.zeros((im.shape[0], im.shape[1], num_cells))\n",
-    "        gains_std = np.zeros((im.shape[0], im.shape[1], num_cells))\n",
-    "\n",
-    "    for cc in np.unique(cellIds[cellIds < num_cells]):\n",
-    "        cellidx = cellIds == cc\n",
-    "        offset[...,cc] = np.median(im[..., cellidx], axis=2)\n",
-    "        noise[...,cc] = np.std(im[..., cellidx], axis=2)\n",
+    "        gains = psh.array((im.shape[0], im.shape[1], num_cells))\n",
+    "        gains_std = psh.array((im.shape[0], im.shape[1], num_cells))\n",
+    "\n",
+    "    def process_cell(worker_id, array_index, cell_number):  # kernel called by psh.map once per cell number\n",
+    "        cell_slice_index = (cellIds == cell_number)\n",
+    "        im_slice = im[..., cell_slice_index]\n",
+    "        offset[..., cell_number] = np.median(im_slice, axis=2)\n",
+    "        noise[..., cell_number] = np.std(im_slice, axis=2)\n",
     "        if not fixed_gain_mode:\n",
-    "            gains[...,cc] = np.median(ga[..., cellidx], axis=2)\n",
-    "            gains_std[...,cc] = np.std(ga[..., cellidx], axis=2)\n",
+    "            ga_slice = ga[..., cell_slice_index]\n",
+    "            gains[..., cell_number] = np.median(ga_slice, axis=2)\n",
+    "            gains_std[..., cell_number] = np.std(ga_slice, axis=2)\n",
+    "    psh.map(process_cell, np.unique(cellIds[cellIds < num_cells]))  # only cells present in the data; empty slices would yield NaN\n",
     "\n",
     "    # bad pixels\n",
     "    bp = np.zeros(offset.shape, np.uint32)\n",
@@ -422,6 +424,17 @@
     "    return offset, noise, gains, gains_std, gg, bp, num_cells, local_acq_rate"
    ]
   },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "parallel_num_procs = 4\n",
+    "parallel_num_threads = max(1, multiprocessing.cpu_count() // parallel_num_procs)  # at least one thread on hosts with < 4 CPUs\n",
+    "psh.set_default_context(\"threads\", num_workers=parallel_num_threads)"
+   ]
+  },
   {
    "cell_type": "code",
    "execution_count": null,
@@ -435,7 +448,6 @@
     "    gain_g = OrderedDict()\n",
     "    gainstd_g = OrderedDict()\n",
     "    \n",
-    "start = datetime.now()\n",
     "all_cells = []\n",
     "all_acq_rate = []\n",
     "\n",
@@ -452,7 +464,7 @@
     "            continue\n",
     "        inp.append((fname_in, i, gg))\n",
     "\n",
-    "with multiprocessing.Pool() as pool:\n",
+    "with multiprocessing.Pool(processes=parallel_num_procs) as pool:\n",
     "    results = pool.starmap(characterize_module, inp)\n",
     "\n",
     "for offset, noise, gains, gains_std, gg, bp, thiscell, thisacq in results:\n",
@@ -476,8 +488,6 @@
     "            gainstd_g[qm][..., gg] = gains_std\n",
     "\n",
     "\n",
-    "duration = (datetime.now() - start).total_seconds()\n",
-    "\n",
     "max_cells = np.max(all_cells)\n",
     "print(f\"Using {max_cells} memory cells\")\n",
     "acq_rate = np.max(all_acq_rate)\n",
diff --git a/setup.py b/setup.py
index e506c721b..f5b32c6bd 100644
--- a/setup.py
+++ b/setup.py
@@ -76,6 +76,7 @@ setup(
         "iCalibrationDB @ git+ssh://git@git.xfel.eu:10022/detectors/cal_db_interactive.git@2.0.5",  # noqa
         "nbparameterise @ git+ssh://git@git.xfel.eu:10022/detectors/nbparameterise.git@0.3",  # noqa
         "XFELDetectorAnalysis @ git+ssh://git@git.xfel.eu:10022/karaboDevices/pyDetLib.git@2.5.6-2.10.0#subdirectory=lib",  # noqa
+        "pasha @ git+https://github.com/European-XFEL/pasha.git",
         "Cython==0.29.21",
         "Jinja2==2.11.2",
         "astcheck==0.2.5",
-- 
GitLab