From e5e36d8baf145a6b1957760d53f2fe727ef1f4f6 Mon Sep 17 00:00:00 2001
From: Rafael Gort <rafael.gort@xfel.eu>
Date: Mon, 14 Sep 2020 23:26:22 +0200
Subject: [PATCH] Simplified input for xgm-normalization, cleaned code
 structure, updated test suites

---
 src/toolbox_scs/detectors/dssc.py            |  26 +++--
 src/toolbox_scs/detectors/dssc_misc.py       |  17 +--
 src/toolbox_scs/detectors/dssc_processing.py | 105 +++++++++++++------
 src/toolbox_scs/detectors/xgm.py             |   4 +-
 src/toolbox_scs/test/test_dssc_cls.py        |  85 ++++++++++++---
 5 files changed, 168 insertions(+), 69 deletions(-)

diff --git a/src/toolbox_scs/detectors/dssc.py b/src/toolbox_scs/detectors/dssc.py
index e85baa4..2231d4a 100644
--- a/src/toolbox_scs/detectors/dssc.py
+++ b/src/toolbox_scs/detectors/dssc.py
@@ -116,16 +116,16 @@ class DSSCBinner:
             log.info(msg+", no binner created")
             raise ToolBoxValueError(msg, name)
 
-    def load_xgm(self, xgm_frame_coords=None):
+    def load_xgm(self, xgm_coord_stride=1):
         """
         load xgm data and construct coordinate array according to corresponding
         dssc frame number.
         """
-        self.xgm = get_xgm_formatted(self.run, self.xgm_name, xgm_frame_coords)
+        self.xgm = get_xgm_formatted(self.run, self.xgm_name, xgm_coord_stride)
         log.info('loaded formatted xgm data')
 
     def create_xgm_mask(self,
-                        xgm_threshold=(0, np.inf), xgm_frame_coords=None):
+                        xgm_threshold=(0, np.inf), normevery=1):
         """
         creates a mask for dssc frames according to measured xgm intensity.
         """
@@ -137,8 +137,8 @@ class DSSCBinner:
                                       dims=['trainId', 'pulse'],
                                       coords={'trainId': trainIds,
                                               'pulse': range(fpt)})
-        
-        self.load_xgm(xgm_frame_coords=xgm_frame_coords)
+
+        self.load_xgm(xgm_coord_stride=normevery)
         valid = (self.xgm > xgm_threshold[0]) * \
                 (self.xgm < xgm_threshold[1])
         self.pulsemask = \
@@ -186,7 +186,7 @@ class DSSCBinner:
     def process_data(self, modules=[],
                      chunksize=512, backend='loky', n_jobs=None,
                      dark_image=None,
-                     xgm_normalization=False, xgm_frame_coords=None
+                     xgm_normalization=False, normevery=1
                     ):
         """
         Load and bin dssc data according to self.bins
@@ -200,14 +200,20 @@ class DSSCBinner:
             experimental and not appropriately implemented in the dbdet member
             function 'bin_data'.
         n_jobs: int
-            number of cpu's used per sub-process. Note that when using the
-            default backend there is no need to adjust this parameter with the
-            current implementation.
+            inversely proportional to the number of cpus available for one
+            job. Tasks within one job can grab a maximum of n_CPU_tot/n_jobs
+            cpus.
+            Note that when using the default backend there is no need to adjust 
+            this parameter with the current implementation.
         modules: list of ints
             a list containing the module numbers that should be processed. If
             empty, all modules are processed.
         chunksize: int
             The number of trains that should be read in one iterative step.
+        dark_image: xarray.DataArray
+            DataArray with dimensions compatible with the loaded dssc data.
+        normevery: int
+            one out of every normevery dssc frames will be normalized (frames
+            0, normevery, 2*normevery, ...).
 
         Returns
         -------
@@ -241,7 +247,7 @@ class DSSCBinner:
                 dark_image=dark,
                 xgm_normalization=xgm_normalization,
                 xgm_mnemonic=self.xgm_name,
-                xgm_frame_coords=xgm_frame_coords,
+                normevery=normevery,
             ))
 
         data = None
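
Usage sketch (not part of the patch; run numbers, binning setup and the tbdet
alias mirror the test suite) showing the simplified normalization input:

    import numpy as np
    import toolbox_scs.detectors as tbdet

    run_info = tbdet.load_dssc_info(2212, 235)
    fpt = run_info['frames_per_train']
    # one bucket for all trains; label alternating frames image/dark
    # (assumes an even number of frames per train)
    binner1 = tbdet.create_dssc_bins('trainId', run_info['trainIds'],
                                     np.zeros(run_info['number_of_trains']))
    binner2 = tbdet.create_dssc_bins('pulse',
                                     np.linspace(0, fpt-1, fpt, dtype=int),
                                     ['image', 'dark'] * (fpt // 2))
    bin_obj = tbdet.DSSCBinner(2212, 235,
                               binners={'trainId': binner1, 'pulse': binner2})
    # a single integer stride replaces the former xgm_frame_coords array
    bin_obj.create_xgm_mask(xgm_threshold=(300.0, 8000.0), normevery=2)
    data = bin_obj.process_data(modules=[15], chunksize=248,
                                xgm_normalization=True, normevery=2)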
diff --git a/src/toolbox_scs/detectors/dssc_misc.py b/src/toolbox_scs/detectors/dssc_misc.py
index ce9eaee..ea92d02 100644
--- a/src/toolbox_scs/detectors/dssc_misc.py
+++ b/src/toolbox_scs/detectors/dssc_misc.py
@@ -94,7 +94,7 @@ def create_dssc_bins(name, coordinates, bins):
                      'trainId, x, or y')
 
 
-def get_xgm_formatted(run, xgm_name, frame_coords):
+def get_xgm_formatted(run, xgm_name, xgm_coord_stride=1):
     """
     Load the xgm data and define coordinates along the pulse dimension.
 
@@ -102,8 +102,10 @@ def get_xgm_formatted(run, xgm_name, frame_coords):
     ----------
     run: extra_data.DataCollection
         DataCollection object providing access to the xgm data to be loaded
-    pulse_bins: list, numpy.ndarray
-        bins along the pulse dimension not containing darks
+    xgm_name: str
+        valid mnemonic of an xgm source
+    xgm_coord_stride: int
+        defines which dssc frames should be normalized using data from the
+        xgm; an xgm value is assigned to every xgm_coord_stride-th dssc frame.
 
     Returns
     -------
@@ -111,11 +113,10 @@ def get_xgm_formatted(run, xgm_name, frame_coords):
         xgm data with coordinate 'pulse'.
     """
     xgm = load_xgm(run, xgm_name)
-    if frame_coords is None:
-        xgm_frame_coords = np.linspace(0, xgm.shape[1]-1, xgm.shape[1], 
-                                       dtype=np.uint64)
-    else:
-        xgm_frame_coords = frame_coords
+    xgm_frame_coords = np.linspace(0,
+                                   (xgm.shape[1]-1)*xgm_coord_stride, 
+                                   xgm.shape[1],
+                                   dtype=np.uint64)
     xgm = xgm.rename(new_name_or_name_dict={'XGMbunchId':'pulse'})
     xgm['pulse'] = xgm_frame_coords
     return xgm
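
A quick sketch of the pulse coordinates xgm_coord_stride produces, e.g. for an
alternating image/dark pattern where only every 2nd dssc frame has a matching
xgm value:

    import numpy as np

    n_xgm = 5    # xgm values recorded per train
    stride = 2   # every 2nd dssc frame is an image frame
    coords = np.linspace(0, (n_xgm - 1) * stride, n_xgm, dtype=np.uint64)
    print(coords)  # [0 2 4 6 8] -> dssc frame numbers the xgm values map to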
diff --git a/src/toolbox_scs/detectors/dssc_processing.py b/src/toolbox_scs/detectors/dssc_processing.py
index b31e025..fb8eb0d 100644
--- a/src/toolbox_scs/detectors/dssc_processing.py
+++ b/src/toolbox_scs/detectors/dssc_processing.py
@@ -12,6 +12,7 @@ import pandas as pd
 
 import extra_data as ed
 
+from .dssc_misc import get_xgm_formatted
 from ..constants import mnemonics as _mnemonics
 
 log = logging.getLogger(__name__)
@@ -101,16 +102,44 @@ def load_chunk_data(sel, sourcename, maxframes=None):
                         ).unstack('trainId_pulse')
     data = data.transpose('trainId', 'pulse', 'x', 'y')
 
-    log.debug(f"loaded chunk data")
+    log.debug(f"loaded and formatted chunk of dssc data")
     return data.loc[{'pulse': np.s_[:maxframes]}]
 
 
+def _load_chunk_xgm(sel, xgm_mnemonic, stride):
+    """
+    Load selected xgm data. 
+
+    Copyright (c) 2020, SCS-team
+
+    Parameters
+    ----------
+    sel: extra_data.DataCollection
+        a DataCollection or a subset of a DataCollection obtained by its
+        select_trains() method
+    xgm_mnemonic: str
+        a valid mnemonic for an xgm source from the tb.mnemonic dictionary
+    stride: int
+        stride indicating which dssc frames should be normalized using the
+        xgm data.
+
+    Returns
+    -------
+    xarray.DataArray
+    """
+    d = sel.get_array(*_mnemonics[xgm_mnemonic].values())
+    d = d.rename(new_name_or_name_dict={'XGMbunchId':'pulse'})
+    frame_coords = np.linspace(0, (d.shape[1]-1)*stride, d.shape[1],
+                               dtype=int)
+    d = d.assign_coords({'pulse':frame_coords})
+    log.debug(f"loaded and formatted chunk of xgm data")
+    return d
+    
+
 def process_dssc_data(proposal, run_nr, module, chunksize, info, dssc_binners, 
                       pulsemask=None,
                       dark_image=None,
                       xgm_normalization=False,
                       xgm_mnemonic='SCS_SA3',   
-                      xgm_frame_coords=None
+                      normevery=1
                      ):
     """
     Collects and reduces DSSC data  for a single module.
@@ -143,81 +172,89 @@ def process_dssc_data(proposal, run_nr, module, chunksize, info, dssc_binners,
         true if the data should be divided by the corresponding xgm value.
     xgm_mnemonic: str
         Mnemonic of the xgm data to be used for normalization.
-    xgm_frame_coords: numpy.ndarray
-        An array indicating to which dssc frame a xgm value corresponds to.
+    normevery: int
+        One out of every normevery dssc frames will be normalized.
 
     Returns
     -------
     module_data: xarray.Dataset
         xarray datastructure containing data binned according to bins.
     """
-
-    sourcename = f'SCS_DET_DSSC1M-1/DET/{module}CH0:xtdf'
-    collection = ed.open_run(proposal, run_nr,
-                             include=f'*DSSC{module:02d}*')
+    # metadata definition
     ntrains = info['number_of_trains']
     fpt = info['frames_per_train']
     chunks = np.arange(ntrains, step=chunksize)
+    sourcename_dssc = f'SCS_DET_DSSC1M-1/DET/{module}CH0:xtdf'
 
+    # open extra_data run objects
+    collection_DSSC = ed.open_run(proposal, run_nr,
+                                  include=f'*DSSC{module:02d}*')
     collection_DA1 = ed.open_run(proposal, run_nr, include='*DA01*')
     log.info(f"Processing dssc module {module}: start")
 
-    # progress bar
+    # create empty dataset for dssc data to be filled iteratively
+    module_data = create_empty_dataset(info, dssc_binners)
+
+    # progress bar for module 15
     if module == 15:
         pbar = tqdm(total=len(chunks))
 
-    # create empty dataset to be filled iteratively
-    module_data = create_empty_dataset(info, dssc_binners)
-
-    # load data chunk by chunk and merge
+    # load data chunk by chunk and merge result into prepared empty dataset
     for chunk in chunks:
-        sel = collection.select_trains(
-                            ed.by_index[chunk:chunk + chunksize])
-        sel_DA1 = collection_DA1.select_trains(
-                            ed.by_index[chunk:chunk + chunksize])
-
         log.debug(f"Module {module}: "
                   f"load trains {chunk}:{chunk + chunksize}")
 
-        chunk_data = load_chunk_data(sel, sourcename)
+        sel = collection_DSSC.select_trains(
+                                    ed.by_index[chunk:chunk + chunksize])
+        chunk_data = load_chunk_data(sel, sourcename_dssc)
         chunk_hist = xr.full_like(chunk_data[:,:,0,0], fill_value=1)
 
-        # prefiltering -> xgm pulse masking, and related creation of
-        # histogram subset
+        # ---------------------------------------------------------------------
+        # optional blocks -> ToDo: see merge request !89
+        # ---------------------------------------------------------------------
+        # option 1: prefiltering -> xgm pulse masking
         if pulsemask is not None:
             log.debug(f'Module {module}: drop out-of-bounds frames')
+            # relatively slow. ToDo -> wrap using np.ufunc
             chunk_data = chunk_data.where(pulsemask)
             chunk_hist = chunk_hist.where(pulsemask)
-        chunk_data = chunk_data.to_dataset(name='data')
-        chunk_data['hist'] = chunk_hist
 
+        # option 2: subtraction of dark image(s)
         if dark_image is not None:
             log.debug(f'Module {module}: substract dark')
-            chunk_data['data'] = chunk_data['data'] - dark_image
+            chunk_data.values = chunk_data.values - dark_image.values
+            # slower: using xarray directly
+            #chunk_data = chunk_data - dark_image
 
+        # option 3: normalize dssc frames by their xgm values
         if xgm_normalization:
-            # the commented lines use numpys fast vectorized operations
-            # -> to be implemented generally
             log.debug(f'Module {module}: load and format xgm data')
-            data_xgm = sel_DA1.get_array(*_mnemonics[xgm_mnemonic].values())
-            data_xgm = data_xgm.rename(
-                        new_name_or_name_dict={'XGMbunchId':'pulse'})
-            data_xgm = data_xgm.assign_coords({'pulse':xgm_frame_coords})
+            sel_DA1 = collection_DA1.select_trains(
+                                        ed.by_index[chunk:chunk + chunksize])
+            chunk_xgm = _load_chunk_xgm(sel_DA1, xgm_mnemonic, normevery)
 
             log.debug(f'Module {module}: normalize chunk_data using xgm')
-            chunk_data['data'] = chunk_data['data'] / data_xgm
-            #chunk_data['data'].values[:,0::2,:,:] = \
-            #    np.divide(chunk_data['data'][:,0::2,:,:], data_xgm)
+            # the following line uses numpy's fast vectorized array operation;
+            # there is overhead from writing back into the xarray DataArray
+            chunk_data.values[:,0::normevery,:,:] = \
+                np.divide(chunk_data[:,0::normevery,:,:], chunk_xgm)
+            # slow code using xarray directly
+            #chunk_data = chunk_data / chunk_xgm
+        # ---------------------------------------------------------------------
+        # end optional blocks: xarray operations from here on.
+        # ---------------------------------------------------------------------
 
         # data reduction -> apply binners
         log.debug(f'Module {module}: apply binning to chunk_data')
+        chunk_data = chunk_data.to_dataset(name='data')
+        chunk_data['hist'] = chunk_hist
         for b in dssc_binners:
             chunk_data[b+"_binned"] = dssc_binners[b]
             chunk_data = chunk_data.groupby(b+"_binned").sum(b)
             chunk_data = chunk_data.rename(name_dict={b+"_binned":b})
 
-        log.debug(f'Module {module}: merge junk data')
         # add chunk data to loaded data
+        log.debug(f'Module {module}: merge chunk data')
         for var in ['data', 'hist']:
             module_data[var] = xr.concat([module_data[var],
                                          chunk_data[var]],
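
The vectorized normalization above amounts to the following numpy operation (a
minimal sketch with random data standing in for the dssc and xgm arrays):

    import numpy as np

    normevery = 2
    # (trains, pulses, x, y); every 2nd frame is a dark
    frames = np.random.rand(4, 8, 128, 512)
    xgm = np.random.rand(4, 4)  # one xgm value per image frame
    # divide every normevery-th frame by its xgm value; darks stay untouched
    frames[:, 0::normevery] /= xgm[:, :, None, None]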
diff --git a/src/toolbox_scs/detectors/xgm.py b/src/toolbox_scs/detectors/xgm.py
index a7583e4..20e65f5 100644
--- a/src/toolbox_scs/detectors/xgm.py
+++ b/src/toolbox_scs/detectors/xgm.py
@@ -45,10 +45,10 @@ def load_xgm(run, xgm_mnemonic='SCS_SA3'):
     if len(nbunches) == 1:
         nbunches = nbunches[0]
     else:
-        #log.warning('not all trains have same nbunches')
+        log.info('change of pulse pattern in sase3 during the run.')
         nbunches = max(nbunches)
 
-    log.info(f'SASE3 bunches per train: {nbunches}')
+    log.info(f'maximum number of bunches per train in sase3: {nbunches}')
     xgm = run.get_array(*_mnemonics[xgm_mnemonic].values(),
                         roi=ed.by_index[:nbunches])
     return xgm
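
A sketch of the nbunches handling when the sase3 pulse pattern changes during a
run (bunch numbers are made up):

    import numpy as np

    nbunches = np.unique([20, 20, 40])  # bunches per train over the run
    if len(nbunches) == 1:
        nbunches = nbunches[0]
    else:
        # pulse pattern changed; read out up to the maximum bunch number
        nbunches = max(nbunches)
    print(nbunches)  # 40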
diff --git a/src/toolbox_scs/test/test_dssc_cls.py b/src/toolbox_scs/test/test_dssc_cls.py
index d49ed95..e4d7e7a 100644
--- a/src/toolbox_scs/test/test_dssc_cls.py
+++ b/src/toolbox_scs/test/test_dssc_cls.py
@@ -26,6 +26,7 @@ suites = {"no-processing": (
                 #"test_process_masking",
                 #"test_process_normalization",
                 "test_normalization_all",
+                #"test_normalization_all2",
                 )
           }
 
@@ -87,9 +88,8 @@ class TestDSSC(unittest.TestCase):
         run235 = tbdet.DSSCBinner(2212, 235)
         run235.add_binner('trainId', binner1)
         run235.add_binner('pulse', binner2)
-        xgm_coord_dssc = np.linspace(0,999,1000, dtype=int)
         xgm_threshold=(300.0, 8000.0)
-        run235.create_xgm_mask(xgm_threshold, xgm_coord_dssc)
+        run235.create_xgm_mask(xgm_threshold, 1)
 
         self.assertEqual(run235.binners['trainId'].values[0],
                          np.float32(7585.52))
@@ -122,9 +122,11 @@ class TestDSSC(unittest.TestCase):
         binners = {'trainId': binner1, 'pulse': binner2}
         run232 = tbdet.DSSCBinner(proposal_nb, run_nb, binners=binners)
         mod_list = [2,15]
-        data = run232.process_data(modules=mod_list)
+        data = run232.process_data(modules=mod_list, chunksize=128)
         self.assertIsNotNone(data.data)
-        data = run232.process_data(backend='multiprocessing', modules=mod_list)
+        data = run232.process_data(backend='multiprocessing', n_jobs=10,
+                                   modules=mod_list,
+                                   chunksize=128)
         self.assertIsNotNone(data.data)
 
         tbdet.save_xarray('./tmp/run232.h5', data)
@@ -154,9 +156,9 @@ class TestDSSC(unittest.TestCase):
         binners = {'trainId': binner1, 'pulse': binner2}
         bin_obj = tbdet.DSSCBinner(proposal_nb, run_nb, binners)
 
-        xgm_coord_dssc = np.linspace(0,999,1000, dtype=int)
+        dssc_norm_every = 1
         xgm_threshold=(300.0, 8000.0)
-        bin_obj.create_xgm_mask(xgm_threshold, xgm_coord_dssc)
+        bin_obj.create_xgm_mask(xgm_threshold, dssc_norm_every)
 
         data = bin_obj.process_data(modules=[3], chunksize=248)
         self.assertIsNotNone(dark_data.data)
@@ -184,13 +186,13 @@ class TestDSSC(unittest.TestCase):
         binners = {'trainId': binner1, 'pulse': binner2}
         bin_obj = tbdet.DSSCBinner(proposal_nb, run_nb, binners)
 
-        xgm_coord_dssc = np.linspace(0,999,1000, dtype=int)
+        dssc_norm_every = 1
         xgm_threshold=(300.0, 8000.0)
-        bin_obj.create_xgm_mask(xgm_threshold, xgm_coord_dssc)
+        bin_obj.create_xgm_mask(xgm_threshold, dssc_norm_every)
 
         data = bin_obj.process_data(modules=[3], chunksize=248,
                                          xgm_normalization=True,
-                                         xgm_frame_coords=xgm_coord_dssc)
+                                         normevery=dssc_norm_every)
         self.assertIsNotNone(data.data)
 
     def test_normalization_all(self):
@@ -212,7 +214,7 @@ class TestDSSC(unittest.TestCase):
         binners = {'trainId': binner1, 'pulse': binner2}
         bin_obj = tbdet.DSSCBinner(proposal_nb, run_nb, binners)
 
-        dark_data = bin_obj.process_data(modules=[3,4,5,15], chunksize=248)
+        dark_data = bin_obj.process_data(modules=[2,3,4,15], chunksize=248)
         self.assertIsNotNone(dark_data.data)
 
         run_nb = 235
@@ -235,16 +237,69 @@ class TestDSSC(unittest.TestCase):
         binners = {'trainId': binner1, 'pulse': binner2}
         bin_obj = tbdet.DSSCBinner(proposal_nb, run_nb, binners)
 
-        xgm_coord_dssc = np.linspace(0,999,1000, dtype=int)
+        dssc_norm_every = 1
         xgm_threshold=(300.0, 8000.0)
-        bin_obj.create_xgm_mask(xgm_threshold, xgm_coord_dssc)
+        bin_obj.create_xgm_mask(xgm_threshold, dssc_norm_every)
 
-        data = bin_obj.process_data(modules=[3,4,5,15], chunksize=248,
-                                dark_image=dark_data['data'][:,0,0,:,:],
+        data = bin_obj.process_data(modules=[2,3,4,15], chunksize=248,
+                                dark_image=dark_data['data'],
                                 xgm_normalization=True,
-                                xgm_frame_coords=xgm_coord_dssc)
+                                normevery=dssc_norm_every)
         self.assertIsNotNone(data.data)
 
+    def test_normalization_all2(self):
+        proposal_nb = 2530
+
+        # dark
+        run_nb = 49
+        run_info = tbdet.load_dssc_info(proposal_nb, run_nb)
+        fpt = run_info['frames_per_train']
+        n_trains = run_info['number_of_trains']
+        trainIds = run_info['trainIds']
+
+        buckets_train = np.zeros(n_trains)
+        buckets_pulse = ['image', 'dark'] * 99 + ['image_last'] 
+
+        binner1 = tbdet.create_dssc_bins("trainId",
+                                         trainIds,
+                                         buckets_train)
+        binner2 = tbdet.create_dssc_bins("pulse",
+                                         np.linspace(0,fpt-1,fpt, dtype=int),
+                                         buckets_pulse)
+        binners = {'trainId': binner1, 'pulse': binner2}
+        bin_obj = tbdet.DSSCBinner(proposal_nb, run_nb, binners=binners)
+        dark = bin_obj.process_data(modules=[15], chunksize=248)
+
+        # run to normalize
+        run_nb = 50
+        run_info = tbdet.load_dssc_info(proposal_nb, run_nb)
+        fpt = run_info['frames_per_train']
+        n_trains = run_info['number_of_trains']
+        trainIds = run_info['trainIds']
+
+        buckets_train = np.zeros(n_trains)
+        buckets_pulse = ['image', 'dark'] * 99 + ['image_last'] 
+
+        binner1 = tbdet.create_dssc_bins("trainId",
+                                         trainIds,
+                                         buckets_train)
+        binner2 = tbdet.create_dssc_bins("pulse",
+                                         np.linspace(0,fpt-1,fpt, dtype=int),
+                                         buckets_pulse)
+        binners = {'trainId': binner1, 'pulse': binner2}
+        bin_obj = tbdet.DSSCBinner(proposal_nb, run_nb, binners=binners)
+
+        bin_params = {'modules':[15],
+                      'chunksize':248,
+                      'xgm_normalization':True,
+                      'normevery':2,
+                      'dark_image':dark['data'][:,0,0,:,:]
+                     }
+
+        data = bin_obj.process_data(**bin_params)
+        self.assertIsNotNone(data.data)
+
 
 def list_suites():
     print("\nPossible test suites:\n" + "-" * 79)
-- 
GitLab