From 0c02120bfd11d6ced4167b2df6e9a77db5ba713b Mon Sep 17 00:00:00 2001
From: Karim Ahmed <>
Date: Wed, 27 May 2020 23:47:17 +0200
Subject: [PATCH] add more fixes

 cal_tools/cal_tools/               | 82 ++++++++++++++-----
 .../AGIPD/AGIPD_Correct_and_Verify.ipynb      | 32 +++++---                                      |  1 +
 webservice/                      |  2 +-
 webservice/webservice.yaml                    |  4 +-
 xfel_calibrate/                   |  2 +-
 6 files changed, 87 insertions(+), 36 deletions(-)

diff --git a/cal_tools/cal_tools/ b/cal_tools/cal_tools/
index bbc628415..020d86abb 100644
--- a/cal_tools/cal_tools/
+++ b/cal_tools/cal_tools/
@@ -1,7 +1,7 @@
 import copy
 from enum import Enum
 import os
+import gc
 import h5py
 import numpy as np
 from scipy.signal import cwt, ricker
@@ -13,6 +13,8 @@ from cal_tools.enums import BadPixels
 from import get_constant_from_db, get_constant_from_db_and_time
 from iCalibrationDB import Constants, Conditions, Detectors
+import logging
 def get_num_cells(fname, loc, module):
     with h5py.File(fname, "r") as f:
@@ -114,7 +116,7 @@ class AgipdCorrections:
                  cal_det_instance="AGIPD1M1", karabo_data_mode=False,
                  force_hg_if_below=None, force_mg_if_below=None,
                  mask_noisy_adc=False, acquisition_rate=None, gain_setting=None,
-                 corr_bools=None):
+                 corr_bools=None, logfile=None):
         Initialize an AgipdCorrections Class
@@ -192,6 +194,28 @@ class AgipdCorrections:
         self.frac_high_med = 0
         self.md_additional_offset = 0
         self.xray_cor = 0
+        self.logfile = logfile
+        self.logger = logging.getLogger('server_logger2')
+        self.logger.setLevel(logging.DEBUG)
+        # create file handler which logs even debug messages
+        fh = logging.FileHandler(logfile)
+        fh.setLevel(logging.DEBUG)
+        # create console handler with a higher log level
+        ch = logging.StreamHandler()
+        ch.setLevel(logging.ERROR)
+        # create formatter and add it to the handlers
+        formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s', datefmt='%Y-%m-%d %H:%M:%S')
+        ch.setFormatter(formatter)
+        fh.setFormatter(formatter)
+        # add the handlers to logger
+        self.logger.addHandler(ch)
+        self.logger.addHandler(fh)
+        fmt = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
+        logging.basicConfig(filename=logfile,
+                        level=getattr(logging, "DEBUG"),
+                        format=fmt)
         # check if given corr_bools are correct
         tot_corr_bools = ['only_offset', 'adjust_mg_baseline', 'rel_gain',
@@ -323,12 +347,6 @@ class AgipdCorrections:
             print("Threshold medians are {}".format(
                 np.nanmedian(self.thresholds, axis=(0, 1, 2))))
-        # delete unneeded parameters.
-        todel = [offset, rel_gain, xray_cor, mask, noise, thresholds,
-                 swap_axis, frac_high_med, md_additional_offset]
-        for d in todel:
-            if d is not None:
-                del d
     def get_shadowed_stripe(self, data, threshold, fraction):
         Return list of shadowed regions.
@@ -914,7 +932,7 @@ class AgipdCorrections:
         if not self.initialized:
             raise RuntimeError("Must call initialize() first!")
+        self.logger.debug("check init \n")
         if not self.karabo_data_mode:
             agipd_base = self.agipd_base
             cidx = self.cidx
@@ -935,6 +953,7 @@ class AgipdCorrections:
             # this far end of the image index range we are working on
             nidx = int(cidx + irange.size)
+            self.logger.debug("Not karabo mode \n")
             cidx = 1  # do not produce any histograms
             im = irange['']
             trainId = np.squeeze(irange['image.trainId'])
@@ -943,12 +962,15 @@ class AgipdCorrections:
             cellid = np.squeeze(irange['image.cellId'])
             length = np.squeeze(irange['image.length'])
+        self.logger.debug("splitting image \n")
         # split off image and gain into two separate arrays
         im, ga = self.gsfun(im)
+        self.logger.debug("float image \n")
         # we will work on float data from now on
         im = im.astype(np.float32)
+        self.logger.debug("copy raw \n")
         # some correction require us to maintain a copy of the raw data
         raw = copy.copy(im)
@@ -961,6 +983,7 @@ class AgipdCorrections:
             length = length[0::2]
         # first evaluate the gain into 0, 1, 2 --> high, medium, low
+        self.logger.debug("evaluating gain \n")
         gain = np.zeros(ga.shape, np.uint8)
         gain[...] = 0
         t0 = self.thresholds[..., 0]
@@ -971,11 +994,12 @@ class AgipdCorrections:
         gain[(ga > t1[cellid, ...]) & (
                 t1[cellid, ...] > 1.05 * t0[cellid, ...])] = 2
+        self.logger.debug("check  force_mg_if_below \n")
         # force into high or medium gain if requested
         if self.force_mg_if_below is not None and self.force_mg_if_below > 0:
             gain[(gain == 2) & ((im - self.offset[
                 cellid, ..., 1]) < self.force_mg_if_below)] = 1
+        self.logger.debug("check  force_hg_if_below \n")
         if self.force_hg_if_below is not None and self.force_hg_if_below > 0:
             gain[(gain > 0) & ((im - self.offset[
                 cellid, ..., 0]) < self.force_hg_if_below)] = 0
@@ -986,6 +1010,7 @@ class AgipdCorrections:
         # check if any data has zero standard deviation, and mark this in
         # the bad pixel maks
         # this can be done on otherwise not corrected data.
+        self.logger.debug("check  if any data has zero standard deviation \n")
         if self.sig_zero_mask is None:
             self.sig_zero_mask = np.zeros(
                 (self.max_cells, im.shape[1], im.shape[2]), np.uint32)
@@ -993,7 +1018,7 @@ class AgipdCorrections:
                 std = np.nanstd(im[cellid == c, ...], axis=0)
                     c, std == 0] = BadPixels.DATA_STD_IS_ZERO.value
+        self.logger.debug("check the first chunk \n")
         # for feedback we produced histograms for the first chunk
         if cidx == 0:
             H, xe, ye = np.histogram2d(im.flatten(), ga.flatten(),
@@ -1007,31 +1032,31 @@ class AgipdCorrections:
                                        range=[[4000, 8192], [0, 4]])
             self.hists_dig_gain_vs_signal += H
             self.dig_signal_edges = (xe, ye)
+        self.logger.debug("check correct constants depending \n")
         # now get the correct constants depending on cell id
         offsetb = self.offset[cellid, ...]
         tmask = self.mask[cellid, ...]
+        self.logger.debug("choose constants according to gain setting \n")
         # choose constants according to gain setting
         off = np.choose(gain,
                         (offsetb[..., 0], offsetb[..., 1], offsetb[..., 2]))
         msk = np.choose(gain, (tmask[..., 0], tmask[..., 1], tmask[..., 2]))
+        self.logger.debug("get the correct rel_gain depending on cell-id \n")
         # same for relative gain and then bad pixel mask
         if hasattr(self, "rel_gain"):
             # get the correct rel_gain depending on cell-id
             rc = self.rel_gain[cellid, ...]
             rel_cor = np.choose(gain, (rc[..., 0], rc[..., 1], rc[..., 2]))
+        self.logger.debug("scale raw gain for use in the identifying snowy pixels \n")
         # scale raw gain for use in the identifying snowy pixels
         rgain = None
         if self.melt_snow is not False:
             rgain = ga / t0[cellid, ...]
+        self.logger.debug("subtract offset \n")
         # subtract offset
         im -= off
+        self.logger.debug("evaluate any baseline shifts \n")
         # before doing relative gain correction we need to evaluate any
         # baseline shifts
         # as they are effectively and additional offset in the data
@@ -1083,22 +1108,27 @@ class AgipdCorrections:
         # now we can correct for relative gain if requested
         if self.corr_bools.get("rel_gain") and hasattr(self, "rel_gain"):
+            self.logger.debug("correct for relative gain \n")
             im *= rel_cor
         if self.corr_bools.get("adjust_mg_baseline"):
+            self.logger.debug("correct adjust_mg_baseline \n")
             mgbc = self.md_additional_offset[cellid, ...]
             im[gain == 1] += mgbc[gain == 1]
         # Set negative values for medium gain to 0
         if self.corr_bools.get('blc_set_min'):
+            self.logger.debug("correct blc_set_min \n")
             im[(im < 0) & (gain == 1)] = 0
         # Do xray correction if requested
         if self.corr_bools.get("xray_corr"):
+            self.logger.debug("correct xray_corr \n")
             im *= self.xray_cor
         # try to identify snowy pixels at this point
         if self.melt_snow is not False:
+            self.logger.debug("correct melt_snow\n")
             ms = self.melt_snow
             im, gain, snowmask = self.melt_snowy_pixels(raw,
@@ -1129,7 +1159,7 @@ class AgipdCorrections:
         msk |= self.sig_zero_mask[cellid, ...]
         if self.melt_snow is not False:
             msk |= snowmask
+        self.logger.debug("for the first chunk output some statistics\n")
         # for the first chunk output some statistics
         if cidx == 0:
             copim = copy.copy(im)
@@ -1164,6 +1194,7 @@ class AgipdCorrections:
         # apply noisy ADC mask if requested
         if self.mask_noisy_adc is not None and self.mask_noisy_adc != 0:
+            self.logger.debug("mask_noisy_adc\n")
             if self.adc_mask is None:
                 self.adc_mask = self.make_noisy_adc_mask(msk)
             msk |= self.adc_mask
@@ -1172,16 +1203,26 @@ class AgipdCorrections:
         sd = 1 if not self.il_mode else 2
         if not self.karabo_data_mode:
+            self.logger.debug("now write out the data\n")
             self.ddset[cidx // sd:nidx // sd, ...] = im
+            self.logger.debug("1\n")
             self.gdset[cidx // sd:nidx // sd, ...] = gain
+            self.logger.debug("2\n")
             self.mdset[cidx // sd:nidx // sd, ...] = msk
+            self.logger.debug("3\n")
             self.outfile[agipd_base + "image/cellId"][cidx:nidx] = cellid
+            self.logger.debug("4\n")
             self.outfile[agipd_base + "image/trainId"][cidx:nidx] = trainId
+            self.logger.debug("5\n")
             self.outfile[agipd_base + "image/pulseId"][cidx:nidx] = pulseId
+            self.logger.debug("6\n")
             self.outfile[agipd_base + "image/status"][cidx:nidx] = status
+            self.logger.debug("7\n")
             self.outfile[agipd_base + "image/length"][cidx:nidx] = length
+            self.logger.debug("8\n")
             self.cidx = nidx
+            self.logger.debug("now write out the data-finished\n")
             irange[''] = im
             irange['image.gain'] = gain
@@ -1193,6 +1234,7 @@ class AgipdCorrections:
             irange['image.length'] = length
             return irange
     def get_valid_image_idx(self):
         """ Return the indices of valid data
@@ -1627,7 +1669,7 @@ class AgipdCorrections:
             # Calculate relative gain
             rel_gain[..., 0] = pc_high_m / pc_high_ave
             rel_gain[..., 1] = pc_med_m / pc_med_ave * frac_high_med
-            rel_gain[..., 2] = rel_gain[..., 1] * 0.233
+            rel_gain[..., 2] = rel_gain[..., 1] * 4.48
             md_additional_offset = None
             rel_gain = None
diff --git a/notebooks/AGIPD/AGIPD_Correct_and_Verify.ipynb b/notebooks/AGIPD/AGIPD_Correct_and_Verify.ipynb
index af39d80a4..cbf04cb74 100644
--- a/notebooks/AGIPD/AGIPD_Correct_and_Verify.ipynb
+++ b/notebooks/AGIPD/AGIPD_Correct_and_Verify.ipynb
@@ -440,13 +440,12 @@
     "        reason = \"\"\n",
     "        filename, filename_out, channel, qm = inp\n",
     "        \n",
-    "        \n",
-    "        \n",
     "        base = os.path.basename(filename_out)\n",
     "        f = os.path.splitext(base)[0]\n",
     "        #Create and configure logger\n",
     "        os.makedirs(f\"{out_folder}/logs\", exist_ok=True)\n",
     "        logfile=f\"{out_folder}/logs/log_partial_{f}.log\"\n",
+    "        logfile_2=f\"{out_folder}/logs/log_partial_{f}_2.log\"\n",
     "        logger = logging.getLogger('server_logger')\n",
     "        logger.setLevel(logging.DEBUG)\n",
     "        # create file handler which logs even debug messages\n",
@@ -466,7 +465,7 @@
     "        logging.basicConfig(filename=logfile,\n",
     "                        level=getattr(logging, \"DEBUG\"),\n",
     "                        format=fmt)\n",
-    "        \n",
+    "\n",
     "        if max_cells == 0:\n",
     "            max_cells = get_num_cells(filename, loc, channel)\n",
     "            if max_cells is None:\n",
@@ -506,7 +505,7 @@
     "                                          cal_det_instance=dinstance, force_hg_if_below=force_hg_if_below,\n",
     "                                          force_mg_if_below=force_mg_if_below, mask_noisy_adc=mask_noisy_adc,\n",
     "                                          acquisition_rate=acq_rate, gain_setting=gain_setting,\n",
-    "                                          corr_bools=corr_bools)\n",
+    "                                          corr_bools=corr_bools, logfile=logfile_2)\n",
     "            logger.debug(\"init_class \\n\")\n",
     "            blc_noise_threshold, blc_hmatch, melt_snow = special_opts\n",
     "            if not corr_bools[\"only_offset\"]:\n",
@@ -519,9 +518,9 @@
     "            except IOError:\n",
     "                return\n",
     "            logger.debug(\"getting valid_image \\n\")\n",
-    "            \n",
+    "\n",
     "            device = getattr(getattr(Detectors, dinstance), qm)\n",
-    "            \n",
+    "\n",
     "            # check if there is a yaml file in out_folder that has the device constants.\n",
     "            slag = fileparms != \"\"\n",
     "            logger.debug(f\"retrieving constants. yaml: {const_yaml}, not nodb: {not nodb}, fileparms: {slag}, device [{device}] {device.device_name in const_yaml}\\n\") \n",
@@ -534,16 +533,19 @@
     "                else:\n",
     "                    when = agipd_corr.initialize_from_db(dbparms, qm,\n",
     "                                                         only_dark=(fileparms != \"\"))\n",
-    "            \n",
+    "\n",
     "            if fileparms != \"\" and not corr_bools[\"only_offset\"]:\n",
     "                agipd_corr.initialize_from_file(fileparms, qm, with_dark=nodb)\n",
     "            print(\"Initialized constants\")\n",
-    "            \n",
+    "\n",
     "            logger.debug(\"retrieved constants \\n\") \n",
-    "            \n",
+    "\n",
     "            for irange in agipd_corr.get_iteration_range():\n",
     "                agipd_corr.correct_agipd(irange)\n",
-    "                #print(\"Iterated\")\n",
+    "               # print(\"Iterated\")\n",
+    "               # logger.debug(f\"{irange}\\n\")\n",
+    "            # clear parameters\n",
+    "            #agipd_corr.clear_constants()\n",
     "            logger.debug(\"All iterations are finished \\n\")\n",
     "            hists, edges = agipd_corr.get_histograms()\n",
     "            hists_signal_low, hists_signal_high, hists_gain_vs_signal, hists_dig_gain_vs_signal, hist_pulses = hists\n",
@@ -553,19 +555,24 @@
     "        finally:\n",
     "            outfile.close()\n",
     "            infile.close()\n",
+    "            logger.debug(\"closed files \\n\")\n",
     "            print(\"Closed files\")\n",
     "    except Exception as e:\n",
     "        err = f\"Error: {e}\\nError traceback: {traceback.format_exc()}\"\n",
     "        print(err)\n",
+    "        logger.debug(f\"{err} \\n\")\n",
     "        success = False\n",
     "        reason = \"Error\"\n",
     "        \n",
     "    finally:\n",
+    "        logger.debug(\"finally\\n\")\n",
     "        run = re.findall(r'.*r([0-9]{4}).*', filename)[0]\n",
     "        proposal = re.findall(r'.*p([0-9]{6}).*', filename)[0]\n",
     "        sequence = re.findall(r'.*S([0-9]{5}).*', filename)[0]\n",
     "        filesize = os.path.getsize(filename)\n",
     "        duration = (\n",
+    "        logger.debug(f\"run:{run}, proposal:{proposal}, qm:{qm}, sequence:{sequence}, filesize:{filesize}, CHUNK_SIZE:{CHUNK_SIZE}, total_sequences:{total_sequences}, success:{success}, duration:{duration}, reason:{reason}\\n\")\n",
+    "        logger.debug(\"finished \\n\")\n",
     "        #influx = create_influx_entry(run, proposal, qm, sequence, filesize, CHUNK_SIZE, total_sequences, success, duration, reason)\n",
     "        #client.write_points([influx])\n",
     "    return (hists_signal_low, hists_signal_high, hists_gain_vs_signal, hists_dig_gain_vs_signal, hist_pulses,\n",
@@ -634,7 +641,6 @@
     "        \n",
     "        inp.append((fname_in, fout, i,  qm))\n",
     "    first = False\n",
-    "    \n",
     "    if len(inp) >= min(MAX_PAR, left):\n",
     "        print(f\"Running {len(inp)} tasks parallel\")\n",
@@ -643,10 +649,10 @@
     "                    bins_dig_gain_vs_signal, max_pulses, dbparms, fileparms, nodb, chunk_size_idim,\n",
     "                    special_opts, il_mode, karabo_id, dinstance, force_hg_if_below, force_mg_if_below,\n",
     "                    mask_noisy_adc, acq_rate, gain_setting, corr_bools, h5path, h5path_idx, const_yaml, out_folder)\n",
-    "\n",
+    "        print(\"MAX_PAR\", MAX_PAR, \"left\", left, \"\\n\")\n",
     "        r = view.map_sync(p, inp)\n",
     "        #r = list(map(p, inp))\n",
-    "        \n",
+    "\n",
     "        inp = []\n",
     "        left -= MAX_PAR\n",
diff --git a/ b/
index e8a538d6a..b93537e26 100644
--- a/
+++ b/
@@ -43,6 +43,7 @@ for ctypes in notebooks.values():
     for nb in ctypes.values():
         data_files += nb.get("dep_notebooks", [])
+        data_files += nb.get("pre_notebooks", [])
     name='European XFEL Offline Calibration',
diff --git a/webservice/ b/webservice/
index 5722c8fc4..a82e50554 100644
--- a/webservice/
+++ b/webservice/
@@ -778,7 +778,7 @@ async def server_runner(config, mode):
                         action=action, instrument=instrument,
                         cycle=cycle, proposal=proposal,
-                        runs="_r".join(wait_runs),
+                        runs="_".join([f"r{r}" for r in wait_runs]),
diff --git a/webservice/webservice.yaml b/webservice/webservice.yaml
index b87d9b7d0..638d08ba8 100644
--- a/webservice/webservice.yaml
+++ b/webservice/webservice.yaml
@@ -31,8 +31,9 @@ correct:
         --slurm-scheduling {sched_prio}
         --request-time {request_time}
         --slurm-name {action}_{instrument}_{detector}_{cycle}_p{proposal}_r{runs}
-        --report-to {action}_{det_instance}_{time_stamp}
+        --report-to /gpfs/exfel/exp/{instrument}/{cycle}/p{proposal}/usr/Reports/{runs}/{det_instance}_{action}_{proposal}_{runs}_{time_stamp}
         --cal-db-timeout 300000
+        --cal-db-interface tcp://max-exfl016:8015#8044
     in-folder: /gpfs/exfel/exp/{instrument}/{cycle}/p{proposal}/raw
@@ -45,4 +46,5 @@ dark:
         --request-time {request_time}
         --slurm-name {action}_{instrument}_{detector}_{cycle}_p{proposal}_r{runs}
         --report-to /gpfs/exfel/d/cal/caldb_store/xfel/reports/{detector}/{instrument}/{det_instance}/{action}/{action}_{proposal}_{runs}_{time_stamp}
+        --cal-db-interface tcp://max-exfl016:8015#8044
diff --git a/xfel_calibrate/ b/xfel_calibrate/
index 494d3686f..3b8dfcc07 100644
--- a/xfel_calibrate/
+++ b/xfel_calibrate/
@@ -8,7 +8,7 @@ notebooks = {
             "dep_notebooks": [
-            "concurrency": {"parameter": "modules",
+            "concurrency": {"parameter": "karabo-da",
                             "default concurrency": list(range(16)),
                             "cluster cores": 8},