From 4ccbff34088399058d1612eca8747fa1e96a75ca Mon Sep 17 00:00:00 2001
From: Karim Ahmed <>
Date: Mon, 25 May 2020 13:41:18 +0200
Subject: [PATCH] Fix AGIPD correction and AGIPD PC

 bin/                        |  2 +-
 cal_tools/cal_tools/               | 11 ++-
 .../AGIPD/AGIPD_Correct_and_Verify.ipynb      | 75 +++++++++++++------
 requirements.txt                              |  7 +-
 xfel_calibrate/                   |  2 +-
 5 files changed, 71 insertions(+), 26 deletions(-)

diff --git a/bin/ b/bin/
index 6b43b5065..44be0e93e 100755
--- a/bin/
+++ b/bin/
@@ -52,7 +52,7 @@ fi
 echo "Running script"
-${jupyter_path} nbconvert --to rst --ExecutePreprocessor.timeout=86400 --ExecutePreprocessor.allow_errors=True --TemplateExporter.exclude_input=True --execute ${nb_path}
+${jupyter_path} nbconvert --to rst --ExecutePreprocessor.timeout=36000 --ExecutePreprocessor.allow_errors=True --TemplateExporter.exclude_input=True --execute ${nb_path}
 # stop the cluster if requested
 if [ "${uuid}" != "NO_CLUSTER" ]
diff --git a/cal_tools/cal_tools/ b/cal_tools/cal_tools/
index 1ac77d9b8..bbc628415 100644
--- a/cal_tools/cal_tools/
+++ b/cal_tools/cal_tools/
@@ -323,6 +323,12 @@ class AgipdCorrections:
             print("Threshold medians are {}".format(
                 np.nanmedian(self.thresholds, axis=(0, 1, 2))))
+        # Delete unneeded parameters. NOTE(review): `del d` below only unbinds the loop name `d`; nothing is freed.
+        todel = [offset, rel_gain, xray_cor, mask, noise, thresholds,
+                 swap_axis, frac_high_med, md_additional_offset]
+        for d in todel:
+            if d is not None:
+                del d
     def get_shadowed_stripe(self, data, threshold, fraction):
         Return list of shadowed regions.
@@ -1655,8 +1661,9 @@ class AgipdCorrections:
         for cname, mdata in const_yaml[dname].items():
             when[cname] = mdata["creation-time"]
             if when[cname]:
-                with h5py.File(mdata["file-path"], "r") as cf:
-                    cons_data[cname] = np.copy(cf[f"{dname}/{cname}/0/data"])
+                cf = h5py.File(mdata["file-path"], "r")
+                cons_data[cname] = np.copy(cf[f"{dname}/{cname}/0/data"])
+                cf.close()
                 # Create empty constant using the list elements
                 cons_data[cname] = \
diff --git a/notebooks/AGIPD/AGIPD_Correct_and_Verify.ipynb b/notebooks/AGIPD/AGIPD_Correct_and_Verify.ipynb
index d3832232a..af39d80a4 100644
--- a/notebooks/AGIPD/AGIPD_Correct_and_Verify.ipynb
+++ b/notebooks/AGIPD/AGIPD_Correct_and_Verify.ipynb
@@ -24,11 +24,10 @@
    "source": [
     "cluster_profile = \"noDB\"\n",
     "in_folder = \"/gpfs/exfel/exp/SPB/202030//p900119/raw\" # the folder to read data from, required\n",
-    "out_folder =  \"/gpfs/exfel/data/scratch/ahmedk/test/AGIPD_SPB4\"  # the folder to output to, required\n",
+    "out_folder =  \"/gpfs/exfel/data/scratch/ahmedk/test/AGIPD_SPB0\"  # the folder to output to, required\n",
     "sequences =  [-1] # sequences to correct, set to -1 for all, range allowed\n",
     "modules = [-1] # modules to correct, set to -1 for all, range allowed\n",
     "run = 80 # runs to process, required\n",
-    "\n",
     "karabo_id = \"SPB_DET_AGIPD1M-1\" # karabo karabo_id\n",
     "karabo_da = ['-1']  # a list of data aggregators names, Default [-1] for selecting all data aggregators\n",
     "receiver_id = \"{}CH0\" # inset for receiver devices\n",
@@ -172,7 +171,7 @@
     "    sequences = None\n",
     "CHUNK_SIZE = 250\n",
-    "MAX_PAR = 32\n",
+    "MAX_PAR = 16\n",
     "if in_folder[-1] == \"/\":\n",
     "    in_folder = in_folder[:-1]\n",
@@ -372,7 +371,7 @@
     "                   bins_gain_vs_signal, bins_signal_low_range, bins_signal_high_range,\n",
     "                   bins_dig_gain_vs_signal, max_pulses, dbparms, fileparms, nodb, chunk_size_idim,\n",
     "                   special_opts, il_mode, loc, dinstance, force_hg_if_below, force_mg_if_below,\n",
-    "                   mask_noisy_adc, acq_rate, gain_setting, corr_bools, h5path, h5path_idx, const_yaml, inp):\n",
+    "                   mask_noisy_adc, acq_rate, gain_setting, corr_bools, h5path, h5path_idx, const_yaml, out_folder, inp):\n",
     "    print(\"foo\")\n",
     "    import numpy as np\n",
     "    import copy\n",
@@ -382,16 +381,17 @@
     "    from datetime import datetime\n",
     "    import re\n",
     "    import os\n",
-    "    from influxdb import InfluxDBClient\n",
+    "    import gc\n",
+    "  #  from influxdb import InfluxDBClient\n",
     "    import subprocess\n",
     "    from iCalibrationDB import Constants, Conditions, Detectors\n",
     "    from cal_tools.enums import BadPixels\n",
     "    from cal_tools.agipdlib import AgipdCorrections, SnowResolution\n",
     "    from cal_tools.agipdlib import get_num_cells, get_acq_rate\n",
-    "    \n",
-    "  \n",
-    "    #client = InfluxDBClient('exflqr18318', 8086, 'root', 'root', 'calstats')\n",
+    "    import logging \n",
+    "    #client = InfluxDBClient('exflqr18318', 8086, 'root', 'root', 'calstats')\n",
+    "    \"\"\"\n",
     "    def create_influx_entry(run, proposal, qm, sequence, filesize, chunksize,\n",
     "                            total_sequences, success, runtime, reason=\"\"):\n",
     "        return {\n",
@@ -418,6 +418,7 @@
     "                \"runtime\": runtime,                \n",
     "            }\n",
     "        }\n",
+    "    \"\"\"\n",
     "    \n",
     "    hists_signal_low = None\n",
     "    hists_signal_high = None\n",
@@ -438,19 +439,44 @@
     "        success = True\n",
     "        reason = \"\"\n",
     "        filename, filename_out, channel, qm = inp\n",
-    "        print(\"Have input\")\n",
+    "        \n",
+    "        \n",
+    "        \n",
+    "        base = os.path.basename(filename_out)\n",
+    "        f = os.path.splitext(base)[0]\n",
+    "        #Create and configure logger\n",
+    "        os.makedirs(f\"{out_folder}/logs\", exist_ok=True)\n",
+    "        logfile=f\"{out_folder}/logs/log_partial_{f}.log\"\n",
+    "        logger = logging.getLogger('server_logger')\n",
+    "        logger.setLevel(logging.DEBUG)\n",
+    "        # create file handler which logs even debug messages\n",
+    "        fh = logging.FileHandler(logfile)\n",
+    "        fh.setLevel(logging.DEBUG)\n",
+    "        # create console handler with a higher log level\n",
+    "        ch = logging.StreamHandler()\n",
+    "        ch.setLevel(logging.ERROR)\n",
+    "        # create formatter and add it to the handlers\n",
+    "        formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s', datefmt='%Y-%m-%d %H:%M:%S')\n",
+    "        ch.setFormatter(formatter)\n",
+    "        fh.setFormatter(formatter)\n",
+    "        # add the handlers to logger\n",
+    "        logger.addHandler(ch)\n",
+    "        logger.addHandler(fh)\n",
+    "        fmt = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'\n",
+    "        logging.basicConfig(filename=logfile,\n",
+    "                        level=getattr(logging, \"DEBUG\"),\n",
+    "                        format=fmt)\n",
+    "        \n",
     "        if max_cells == 0:\n",
     "            max_cells = get_num_cells(filename, loc, channel)\n",
     "            if max_cells is None:\n",
     "                raise ValueError(f\"No raw images found for {qm}\")\n",
     "            else:\n",
     "                cells = np.arange(max_cells)\n",
-    "            \n",
     "        if acq_rate == 0.:\n",
     "            acq_rate = get_acq_rate(filename, loc, channel)\n",
     "        else:\n",
     "            acq_rate = None\n",
-    "\n",
     "        if dbparms[2] == 0:\n",
     "            dbparms[2] = max_cells\n",
     "        if dbparms[5] == 0:\n",
@@ -468,6 +494,7 @@
     "        infile = h5py.File(filename, \"r\", driver=\"core\")\n",
     "        outfile = h5py.File(filename_out, \"w\")\n",
+    "\n",
     "        try:\n",
     "            agipd_corr = AgipdCorrections(infile, outfile, max_cells, channel, max_pulses,\n",
     "                                          bins_gain_vs_signal, bins_signal_low_range,\n",
@@ -480,7 +507,7 @@
     "                                          force_mg_if_below=force_mg_if_below, mask_noisy_adc=mask_noisy_adc,\n",
     "                                          acquisition_rate=acq_rate, gain_setting=gain_setting,\n",
     "                                          corr_bools=corr_bools)\n",
-    "\n",
+    "            logger.debug(\"init_class \\n\")\n",
     "            blc_noise_threshold, blc_hmatch, melt_snow = special_opts\n",
     "            if not corr_bools[\"only_offset\"]:\n",
     "                blc_hmatch = False\n",
@@ -491,9 +518,14 @@
     "                agipd_corr.get_valid_image_idx()\n",
     "            except IOError:\n",
     "                return\n",
+    "            logger.debug(\"getting valid_image \\n\")\n",
+    "            \n",
     "            device = getattr(getattr(Detectors, dinstance), qm)\n",
     "            \n",
     "            # check if there is a yaml file in out_folder that has the device constants.\n",
+    "            slag = fileparms != \"\"\n",
+    "            logger.debug(f\"retrieving constants. yaml: {const_yaml}, not nodb: {not nodb}, fileparms: {slag}, device [{device}] {device.device_name in const_yaml}\\n\") \n",
+    "\n",
     "            if not nodb:\n",
     "                if const_yaml and device.device_name in const_yaml:\n",
     "                    print(fileparms != \"\")\n",
@@ -502,25 +534,26 @@
     "                else:\n",
     "                    when = agipd_corr.initialize_from_db(dbparms, qm,\n",
     "                                                         only_dark=(fileparms != \"\"))\n",
-    "\n",
+    "            \n",
     "            if fileparms != \"\" and not corr_bools[\"only_offset\"]:\n",
     "                agipd_corr.initialize_from_file(fileparms, qm, with_dark=nodb)\n",
     "            print(\"Initialized constants\")\n",
-    "\n",
+    "            \n",
+    "            logger.debug(\"retrieved constants \\n\") \n",
+    "            \n",
     "            for irange in agipd_corr.get_iteration_range():\n",
     "                agipd_corr.correct_agipd(irange)\n",
-    "                print(\"Iterated\")\n",
-    "            print(\"All iterations are finished\")\n",
+    "                #print(\"Iterated\")\n",
+    "            logger.debug(\"All iterations are finished \\n\")\n",
     "            hists, edges = agipd_corr.get_histograms()\n",
     "            hists_signal_low, hists_signal_high, hists_gain_vs_signal, hists_dig_gain_vs_signal, hist_pulses = hists\n",
     "            low_edges, high_edges, signal_edges, dig_signal_edges = edges\n",
     "            gain_stats = np.array(agipd_corr.gain_stats)\n",
-    "            \n",
+    "            logger.debug(\"history_data \\n\")\n",
     "        finally:\n",
     "            outfile.close()\n",
     "            infile.close()\n",
     "            print(\"Closed files\")\n",
-    "        \n",
     "    except Exception as e:\n",
     "        err = f\"Error: {e}\\nError traceback: {traceback.format_exc()}\"\n",
     "        print(err)\n",
@@ -598,6 +631,7 @@
     "        fout = os.path.abspath(\"{}/{}\".format(out_folder, (os.path.split(fname_in)[-1]).replace(\"RAW\", \"CORR\")))\n",
     "        if first:\n",
     "            first_files.append((fname_in, fout))\n",
+    "        \n",
     "        inp.append((fname_in, fout, i,  qm))\n",
     "    first = False\n",
     "    \n",
@@ -608,11 +642,11 @@
     "                    sequences_qm, bins_gain_vs_signal, bins_signal_low_range, bins_signal_high_range,\n",
     "                    bins_dig_gain_vs_signal, max_pulses, dbparms, fileparms, nodb, chunk_size_idim,\n",
     "                    special_opts, il_mode, karabo_id, dinstance, force_hg_if_below, force_mg_if_below,\n",
-    "                    mask_noisy_adc, acq_rate, gain_setting, corr_bools, h5path, h5path_idx, const_yaml)\n",
+    "                    mask_noisy_adc, acq_rate, gain_setting, corr_bools, h5path, h5path_idx, const_yaml, out_folder)\n",
     "        r = view.map_sync(p, inp)\n",
     "        #r = list(map(p, inp))\n",
-    "\n",
+    "        \n",
     "        inp = []\n",
     "        left -= MAX_PAR\n",
@@ -634,7 +668,6 @@
     "                    hists_gain_vs_signal += hg.astype(np.float64)\n",
     "                    hists_dig_gain_vs_signal += hdg.astype(np.float64)\n",
     "                    gain_stats += gs\n",
-    "    \n",
     "    done = all(dones)\n",
     "print(f\"Corrected raw data of {cells} memory cells and {acq_rate} MHz acquisition rate\")"
diff --git a/requirements.txt b/requirements.txt
index ab8532286..7f24418b5 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -8,11 +8,16 @@ extra_geom == 0.8.0
 fabio == 0.9.0
 gitpython == 3.1.0
 h5py == 2.10.0
-influxdb == 5.2.3
 iminuit == 1.3.8
+influxdb == 5.2.3
 ipyparallel == 6.2.4
+ipykernel == 5.1.4
+ipython == 7.12.0
+ipython_genutils == 0.2.0
 jupyter == 1.0.0
+jupyter_client == 5.3.4
 jupyter_console == 6.1.0
+jupyter-core == 4.6.1
 karabo_data == 0.7.0
 lxml == 4.5.0
 metadata_client == 3.0.5
diff --git a/xfel_calibrate/ b/xfel_calibrate/
index 6a9e40efd..c745c0f4c 100755
--- a/xfel_calibrate/
+++ b/xfel_calibrate/
@@ -291,7 +291,7 @@ def balance_sequences(in_folder, run, sequences, sequences_per_node,
         sequences_per_node += 1
         nsplits = len(seq_nums) // sequences_per_node + 1
         print("Changed to {} sequences per node".format(sequences_per_node))
-        print("to have a maximum of 8 concurrent jobs")
+        print(f"to have a maximum of {max_nodes} concurrent jobs")
     return [l.tolist() for l in np.array_split(list(seq_nums), nsplits) if
             l.size > 0]