diff --git a/notebooks/LPD/LPDChar_Darks_NBC.ipynb b/notebooks/LPD/LPDChar_Darks_NBC.ipynb
index c4820ced9a370e962349ecc99ee86cfcb78ddf8d..a96a7ab6d750a3dcdb22da07506f439fc058c68e 100644
--- a/notebooks/LPD/LPDChar_Darks_NBC.ipynb
+++ b/notebooks/LPD/LPDChar_Darks_NBC.ipynb
@@ -22,7 +22,6 @@
    "metadata": {},
    "outputs": [],
    "source": [
-    "cluster_profile = \"noDB\" # The ipcluster profile to use\n",
     "in_folder = \"/gpfs/exfel/exp/FXE/202030/p900121/raw\" # path to input data, required\n",
     "out_folder = \"/gpfs/exfel/data/scratch/ahmedk/test/LPD/\" # path to output to, required\n",
     "metadata_folder = \"\" # Directory containing calibration_metadata.yml when run by xfel-calibrate\n",
@@ -68,6 +67,7 @@
    "outputs": [],
    "source": [
     "import copy\n",
+    "import multiprocessing\n",
     "import os\n",
     "import warnings\n",
     "from collections import OrderedDict\n",
@@ -79,7 +79,8 @@
     "import dateutil.parser\n",
     "import h5py\n",
     "import matplotlib\n",
-    "from ipyparallel import Client\n",
+    "import pasha as psh\n",
+    "import scipy.stats\n",
     "from IPython.display import Latex, Markdown, display\n",
     "\n",
     "matplotlib.use(\"agg\")\n",
@@ -123,9 +124,6 @@
    "metadata": {},
    "outputs": [],
    "source": [
-    "client = Client(profile=cluster_profile)\n",
-    "view = client[:]\n",
-    "view.use_dill()\n",
     "gains = np.arange(3)\n",
     "max_cells = mem_cells\n",
     "cells = np.arange(max_cells)\n",
@@ -200,15 +198,11 @@
    "metadata": {},
    "outputs": [],
    "source": [
-    "# the actual characterization\n",
-    "def characterize_module(cells, bp_thresh, skip_first_ntrains, ntrains, test_for_normality, \n",
-    "                        h5path, h5path_idx, inp):\n",
-    "    import copy\n",
+    "parallel_num_procs = min(6, len(modules)*3)\n",
+    "parallel_num_threads = max(1, multiprocessing.cpu_count() // parallel_num_procs)  # clamp: never 0 threads when procs > cores\n",
     "\n",
-    "    import h5py\n",
-    "    import numpy as np\n",
-    "    import scipy.stats\n",
-    "    from cal_tools.enums import BadPixels\n",
+    "# the actual characterization\n",
+    "def characterize_module(filename, channel, gg, cap):\n",
     "\n",
     "    def splitOffGainLPD(d):\n",
     "        msk = np.zeros(d.shape, np.uint16)\n",
@@ -219,22 +213,19 @@
     "        gain[gain > 2] = 2\n",
     "        return data, gain\n",
     "\n",
-    "    filename, channel, gg, cap = inp\n",
-    "    thresholds_offset_hard, thresholds_offset_sigma, thresholds_noise_hard, thresholds_noise_sigma = bp_thresh\n",
-    "\n",
     "    infile = h5py.File(filename, \"r\")\n",
     "    \n",
-    "    h5path = h5path.format(channel)\n",
-    "    h5path_idx = h5path_idx.format(channel)\n",
-    "    count = infile[f\"{h5path_idx}/count\"][()]\n",
-    "    first = infile[f\"{h5path_idx}/first\"][()]\n",
+    "    instrument_src = h5path.format(channel)\n",
+    "    index_src = h5path_idx.format(channel)\n",
+    "    count = infile[f\"{index_src}/count\"][()]\n",
+    "    first = infile[f\"{index_src}/first\"][()]\n",
     "    valid = count != 0\n",
     "    count, first = count[valid], first[valid]\n",
     "    first_image = int(first[skip_first_ntrains] if first.shape[0] > skip_first_ntrains else 0)\n",
     "    last_image = int(first_image + np.sum(count[skip_first_ntrains:skip_first_ntrains+ntrains]))\n",
     "\n",
-    "    im = np.array(infile[\"{}/data\".format(h5path, channel)][first_image:last_image, ...])\n",
-    "    cellid = np.squeeze(np.array(infile[\"{}/cellId\".format(h5path, channel)][first_image:last_image, ...]))\n",
+    "    im = np.array(infile[f\"{instrument_src}/data\"][first_image:last_image, ...])\n",
+    "    cellid = np.squeeze(np.array(infile[f\"{instrument_src}/cellId\"][first_image:last_image, ...]))\n",
     "    infile.close()\n",
     "\n",
     "    im, g = splitOffGainLPD(im[:, 0, ...])\n",
@@ -243,18 +234,20 @@
     "    im = np.rollaxis(im, 2)\n",
     "    im = np.rollaxis(im, 2, 1)\n",
     "\n",
-    "    offset = np.zeros((im.shape[0], im.shape[1], cells))\n",
-    "    noise = np.zeros((im.shape[0], im.shape[1], cells))\n",
-    "    normal_test = np.zeros((im.shape[0], im.shape[1], cells))\n",
-    "    for cc in range(cells):\n",
+    "    context = psh.context.ThreadContext(num_workers=parallel_num_threads)\n",
+    "    offset = context.alloc(shape=(im.shape[0], im.shape[1], max_cells), dtype=np.float64)\n",
+    "    noise = context.alloc(like=offset)\n",
+    "    normal_test = context.alloc(like=offset)\n",
+    "    def process_cell(worker_id, array_index, cc):\n",
     "        idx = cellid == cc\n",
+    "        im_slice = im[..., idx]\n",
     "        if np.any(idx):\n",
-    "\n",
-    "            offset[..., cc] = np.median(im[:, :, idx], axis=2)\n",
-    "            noise[..., cc] = np.std(im[:, :, idx], axis=2)\n",
+    "            offset[..., cc] = np.median(im_slice, axis=2)\n",
+    "            noise[..., cc] = np.std(im_slice, axis=2)\n",
     "            if test_for_normality:\n",
     "                _, normal_test[..., cc] = scipy.stats.normaltest(\n",
-    "                    im[:, :, idx], axis=2)\n",
+    "                    im_slice, axis=2)\n",
+    "    context.map(process_cell, np.unique(cellid))\n",
     "\n",
     "    # bad pixels\n",
     "    bp = np.zeros(offset.shape, np.uint32)\n",
@@ -279,9 +272,15 @@
     "    bp[~np.isfinite(noise)] |= BadPixels.OFFSET_NOISE_EVAL_ERROR.value\n",
     "\n",
     "    idx = cellid == 12\n",
-    "    return offset, noise, channel, gg, cap, bp, im[12, 12, idx], normal_test\n",
-    "\n",
-    "\n",
+    "    return offset, noise, channel, gg, cap, bp, im[12, 12, idx], normal_test"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
     "offset_g = OrderedDict()\n",
     "noise_g = OrderedDict()\n",
     "badpix_g = OrderedDict()\n",
@@ -313,15 +312,8 @@
     "\n",
     "    gg+=1\n",
     "\n",
-    "p = partial(characterize_module, max_cells,\n",
-    "            (thresholds_offset_hard, thresholds_offset_sigma,\n",
-    "             thresholds_noise_hard, thresholds_noise_sigma),\n",
-    "            skip_first_ntrains, ntrains, test_for_normality,\n",
-    "            h5path, h5path_idx)\n",
-    "\n",
-    "# Don't remove. Used for Debugging.\n",
-    "#results = list(map(p, inp))\n",
-    "results = view.map_sync(p, inp)\n",
+    "with multiprocessing.Pool(processes=parallel_num_procs) as pool:\n",
+    "    results = pool.starmap(characterize_module, inp)\n",
     "\n",
     "for ir, r in enumerate(results):\n",
     "    offset, noise, i, gg, cap, bp, data, normal = r\n",