From 0ac4f5f85d6c0ef265e1e3a9e896052571c57845 Mon Sep 17 00:00:00 2001
From: Rafael Gort <rafael.gort@xfel.eu>
Date: Fri, 28 Aug 2020 22:19:43 +0200
Subject: [PATCH] Added functionality to subtract darks and normalize
 according to XGM

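The DSSCBinner.bin_data method can now optionally subtract a dark run
and normalize the DSSC frames pulse-by-pulse using the XGM intensity.
A minimal usage sketch (binners and the previously binned dark run are
assumed to have been created as in the new test cases; proposal and
run numbers are placeholders):

    import toolbox_scs.detectors as tbdet

    bin_obj = tbdet.DSSCBinner(proposal_nb, run_nb, binners=binners)
    data = bin_obj.bin_data(modules=[3], chunksize=128,
                            xgm_normalization=True, norm_every=2,
                            dark_file=dark['data'][:, 0, 0, :, :])

Here 'norm_every' gives the spacing of the frames to be normalized
(every 2nd frame for the typical image/dark alternation) and
'dark_file' takes the dark data to be subtracted.
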
---
 src/toolbox_scs/detectors/dssc.py            | 17 +++-
 src/toolbox_scs/detectors/dssc_processing.py | 91 +++++++++++++-------
 src/toolbox_scs/test/test_dssc_cls.py        | 53 ++++++++++++
 src/toolbox_scs/test/test_dssc_methods.py    | 85 ++++++++++++++++++
 4 files changed, 210 insertions(+), 36 deletions(-)

diff --git a/src/toolbox_scs/detectors/dssc.py b/src/toolbox_scs/detectors/dssc.py
index 189f7d7..ff808c4 100644
--- a/src/toolbox_scs/detectors/dssc.py
+++ b/src/toolbox_scs/detectors/dssc.py
@@ -36,7 +36,7 @@ log = logging.getLogger(__name__)
 class DSSCBinner:
     def __init__(self, proposal_nr, run_nr,
                  binners={},
-                 use_xgm=False, xgm_threshold=(0, np.inf), xgm_bins=None,
+                 use_xgm=False, xgm_threshold=(0, np.inf), xgm_name='SCS_SA3',
                  use_tim=False
                  ):
         """
@@ -58,6 +58,10 @@ class DSSCBinner:
             pulsemask for additional data filtering.
         xgm_threshold: tuple
             the lower and upper boundary of xgm values rendering valid data.
+        xgm_name: str
+            a valid mnemonic key of the XGM data used to mask the DSSC
+            frames.
+            -> ToDo: generalize the load_xgm method.
         use_tim: bool
             -> to be implemented. Same role as 'use_xgm'
 
@@ -82,7 +86,6 @@ class DSSCBinner:
         self.run = load_run(proposal_nr, run_nr)
         self.xgmthreshold = xgm_threshold
         self.binners = binners
-
         # ---------------------------------------------------------------------
         # Prepare pulse mask for additional data reduction next to binning
         # ---------------------------------------------------------------------
@@ -94,7 +97,7 @@ class DSSCBinner:
                                       coords={'trainId': self.run.train_ids,
                                               'pulse': range(fpt)})
         if use_xgm:
-            self.xgm = get_xgm_formatted(self.run, xgm_bins)
+            self.xgm = get_xgm_formatted(self.run, None)
             valid = (self.xgm > self.xgmthreshold[0]) * \
                     (self.xgm < self.xgmthreshold[1])
             self.pulsemask = \
@@ -174,7 +177,10 @@ class DSSCBinner:
     # -------------------------------------------------------------------------
     # Data processing
     # -------------------------------------------------------------------------
-    def bin_data(self, modules=[], chunksize=512, backend='loky', n_jobs=None):
+    def bin_data(self, modules=[], chunksize=512, backend='loky', n_jobs=None,
+                 xgm_normalization=False, norm_every=2,
+                 dark_file=None
+                ):
         """
         Load and bin dssc data according to self.bins
 
@@ -221,6 +227,9 @@ class DSSCBinner:
                 chunksize=chunksize,
                 info=self.info,
                 dssc_binners=self.binners,
+                xgm_normalization=xgm_normalization,
+                norm_every=norm_every,
+                dark_file=dark_file
             ))
 
         data = None
diff --git a/src/toolbox_scs/detectors/dssc_processing.py b/src/toolbox_scs/detectors/dssc_processing.py
index f0ee373..435002b 100644
--- a/src/toolbox_scs/detectors/dssc_processing.py
+++ b/src/toolbox_scs/detectors/dssc_processing.py
@@ -12,6 +12,7 @@ import pandas as pd
 
 import extra_data as ed
 
+from ..constants import mnemonics as _mnemonics
 
 log = logging.getLogger(__name__)
 
@@ -105,7 +106,9 @@ def load_chunk_data(sel, sourcename, maxframes=None):
 
 
 def bin_data(proposal, run_nr, module, chunksize, info, dssc_binners, 
-             pulsemask=None):
+             pulsemask=None,
+             xgm_normalization=False, xgm_mnemonic='SCS_SA3', norm_every=2,
+             dark_file=None):
     """
     Collects and reduces DSSC data  for a single module.
 
@@ -137,7 +140,10 @@ def bin_data(proposal, run_nr, module, chunksize, info, dssc_binners,
     collection = ed.open_run(proposal, run_nr,
                              include=f'*DSSC{module:02d}*')
     ntrains = len(collection.train_ids)
+    fpt = info['frames_per_train']
     chunks = np.arange(ntrains, step=chunksize)
+
+    collection_DA1 = ed.open_run(proposal, run_nr, include='*DA01*')
     log.info(f"Processing dssc module {module}: start")
 
     # progress bar
@@ -151,37 +157,58 @@ def bin_data(proposal, run_nr, module, chunksize, info, dssc_binners,
     for chunk in chunks:
         sel = collection.select_trains(
                             ed.by_index[chunk:chunk + chunksize])
-        nframes = sel.detector_info(sourcename)['total_frames']
-        if nframes > 0:  
-            log.debug(f"Module {module}: "
-                      f"load trains {chunk}:{chunk + chunksize}")
-
-            chunk_data = load_chunk_data(sel, sourcename)
-            chunk_hist = xr.full_like(chunk_data[:,:,0,0], fill_value=1)
-
-            # prefiltering -> xgm pulse masking, and related creation of
-            # histogram subset
-            if pulsemask is not None:
-                chunk_data = chunk_data.where(pulsemask)
-                chunk_hist = chunk_hist.where(pulsemask)
-            chunk_data = chunk_data.to_dataset(name='data')
-            chunk_data['hist'] = chunk_hist
-
-            # data reduction -> apply binners
-            log.debug(f'Module {module}: apply binning to chunk_data')
-            for b in dssc_binners:
-                chunk_data[b+"_binned"] = dssc_binners[b]
-                chunk_data = chunk_data.groupby(b+"_binned").sum(b)
-                chunk_data = chunk_data.rename(name_dict={b+"_binned":b})
-
-            # add chunk data to loaded data
-            for var in ['data', 'hist']:
-                module_data[var] = xr.concat([module_data[var],
-                                             chunk_data[var]],
-                                             dim='tmp').sum('tmp')
-
-            log.debug(f"Module {module}: "
-                      f"processed trains {chunk}:{chunk + chunksize}")            
+        sel_DA1 = collection_DA1.select_trains(
+                            ed.by_index[chunk:chunk + chunksize])
+
+        log.debug(f"Module {module}: "
+                  f"load trains {chunk}:{chunk + chunksize}")
+
+        chunk_data = load_chunk_data(sel, sourcename)
+        chunk_hist = xr.full_like(chunk_data[:,:,0,0], fill_value=1)
+
+        # prefiltering -> xgm pulse masking, and related creation of
+        # histogram subset
+        if pulsemask is not None:
+            chunk_data = chunk_data.where(pulsemask)
+            chunk_hist = chunk_hist.where(pulsemask)
+        chunk_data = chunk_data.to_dataset(name='data')
+        chunk_data['hist'] = chunk_hist
+
+        if dark_file is not None:
+            log.debug('subtract dark')
+            chunk_data['data'] = chunk_data['data'] - dark_file
+
+        if xgm_normalization:
+            log.debug('load and format xgm data')
+            data_xgm = sel_DA1.get_array(*_mnemonics[xgm_mnemonic].values())
+            data_xgm = data_xgm.rename(
+                        new_name_or_name_dict={'XGMbunchId':'pulse'})
+            xgm_shape = data_xgm.shape[1]
+            xgm_coord_arr = np.linspace(0,
+                                        norm_every*(xgm_shape-1),
+                                        xgm_shape, dtype=int)
+            data_xgm = data_xgm.assign_coords({'pulse':xgm_coord_arr})
+
+            log.debug('normalize chunk_data using xgm')
+            chunk_data['data'][:,::norm_every,:,:] =\
+                chunk_data['data'][:,::norm_every,:,:] /\
+                    data_xgm[:,0:int(np.ceil(fpt/norm_every))]
+
+        # data reduction -> apply binners
+        log.debug(f'Module {module}: apply binning to chunk_data')
+        for b in dssc_binners:
+            chunk_data[b+"_binned"] = dssc_binners[b]
+            chunk_data = chunk_data.groupby(b+"_binned").sum(b)
+            chunk_data = chunk_data.rename(name_dict={b+"_binned":b})
+
+        # add chunk data to loaded data
+        for var in ['data', 'hist']:
+            module_data[var] = xr.concat([module_data[var],
+                                         chunk_data[var]],
+                                         dim='tmp').sum('tmp')
+
+        log.debug(f"Module {module}: "
+                  f"processed trains {chunk}:{chunk + chunksize}")            
 
         if module == 15:
             pbar.update(1)
diff --git a/src/toolbox_scs/test/test_dssc_cls.py b/src/toolbox_scs/test/test_dssc_cls.py
index 746529f..392a100 100644
--- a/src/toolbox_scs/test/test_dssc_cls.py
+++ b/src/toolbox_scs/test/test_dssc_cls.py
@@ -23,6 +23,9 @@ suites = {"no-processing": (
                 ),
           "processing-full": (
                 "test_binning_xgm",
+                ),
+          "processing-normalized": (
+                "test_normalization",
                 )
           }
 
@@ -163,6 +166,56 @@ class TestDSSC(unittest.TestCase):
         xgm_binned = run235.get_xgm_binned()
         self.assertIsNotNone(xgm_binned)
 
+    def test_normalization(self):
+        proposal_nb = 2530
+
+        # dark
+        run_nb = 49
+        run_obj = tb.load_run(proposal_nb, run_nb, include='*DA*')
+        dssc_info = tbdet.load_dssc_info(proposal_nb, run_nb)
+        fpt = dssc_info['frames_per_train']
+
+        buckets_train = np.zeros(len(run_obj.train_ids))
+        buckets_pulse = ['image', 'dark'] * 99 + ['image_last']
+
+        binner1 = tbdet.create_dssc_bins("trainId",
+                                         run_obj.train_ids,
+                                         buckets_train)
+        binner2 = tbdet.create_dssc_bins("pulse",
+                                         np.linspace(0, fpt-1, fpt, dtype=int),
+                                         buckets_pulse)
+        binners = {'trainId': binner1, 'pulse': binner2}
+        bin_obj = tbdet.DSSCBinner(proposal_nb, run_nb, binners=binners)
+        dark = bin_obj.bin_data(backend='multiprocessing', modules=[3])
+
+        # run to normalize
+        run_nb = 50
+        run_obj = tb.load_run(proposal_nb, run_nb, include='*DA*')
+        dssc_info = tbdet.load_dssc_info(proposal_nb, run_nb)
+        fpt = dssc_info['frames_per_train']
+
+        buckets_train = np.zeros(len(run_obj.train_ids))
+        buckets_pulse = ['image', 'dark'] * 99 + ['image_last']
+
+        binner1 = tbdet.create_dssc_bins("trainId",
+                                         run_obj.train_ids,
+                                         buckets_train)
+        binner2 = tbdet.create_dssc_bins("pulse",
+                                         np.linspace(0, fpt-1, fpt, dtype=int),
+                                         buckets_pulse)
+        binners = {'trainId': binner1, 'pulse': binner2}
+        bin_obj = tbdet.DSSCBinner(proposal_nb, run_nb, binners=binners)
+
+        bin_params = {'modules':[3],
+                      'chunksize':128,
+                      'xgm_normalization':True,
+                      'norm_every':2,
+                      'dark_file':dark['data'][:,0,0,:,:]
+                     }
+
+        data = bin_obj.bin_data(backend='multiprocessing', **bin_params)
+        self.assertIsNotNone(data.data)
+
 
 def list_suites():
     print("\nPossible test suites:\n" + "-" * 79)
diff --git a/src/toolbox_scs/test/test_dssc_methods.py b/src/toolbox_scs/test/test_dssc_methods.py
index d5c804a..edae993 100644
--- a/src/toolbox_scs/test/test_dssc_methods.py
+++ b/src/toolbox_scs/test/test_dssc_methods.py
@@ -42,6 +42,9 @@ suites = {"metafunctions": (
                 "test_calcindices",
                 "test_createpulsemask",
                 "test_processmodule2",
+                ),
+          "binning-normalize": (
+                "normalize_xgm",
                 )
           }
 
@@ -306,6 +309,88 @@ class TestDSSC(unittest.TestCase):
         print(data)
         self.assertIsNotNone(data)
 
+    def test_normalize_xgm(self):
+        cls = self.__class__
+        # load single chunk
+        proposal = 2530
+        run_nr = 50
+        module = 1
+        chunksize = 128
+        sourcename = f'SCS_DET_DSSC1M-1/DET/{module}CH0:xtdf'
+
+        collection = ed.open_run(proposal, run_nr,
+                                 include=f'*DSSC{module:02d}*')
+        collection_DA1 = ed.open_run(proposal, run_nr, include='*DA01*')
+
+        dssc_info = tbdet.load_dssc_info(proposal, run_nr)
+        fpt = dssc_info['frames_per_train']
+
+        buckets_train = np.zeros(len(collection.train_ids))
+        binner1 = tbdet.create_dssc_bins(
+                    "trainId",collection.train_ids,buckets_train)
+
+        buckets_pulse = ['image', 'dark'] * 99 + ['image_last']
+        binner2 = tbdet.create_dssc_bins(
+                    "pulse",np.linspace(0,fpt-1,fpt, dtype=int),buckets_pulse)
+        binners = {'trainId': binner1, 'pulse': binner2}
+
+
+        ntrains = len(collection.train_ids)
+        chunks = np.arange(ntrains, step=chunksize)
+        start_index = chunks[0]
+
+        module_data = create_empty_dataset(dssc_info, binners)
+
+        for chunk in chunks[0:1]:
+            sel = collection.select_trains(
+                                ed.by_index[chunk:chunk + chunksize])
+            sel_DA1 = collection_DA1.select_trains(
+                                ed.by_index[chunk:chunk + chunksize])
+            log_root.debug(f"Module {module}: "
+                           f"loading trains {chunk}:{chunk + chunksize}")
+            chunk_data = load_chunk_data(sel, sourcename)
+            self.assertIsNotNone(chunk_data)
+
+            # pulse masking, and creation of related hist subset.
+            chunk_hist = xr.full_like(chunk_data[:,:,0,0], fill_value=1)
+            chunk_data = chunk_data.to_dataset(name='data')
+            chunk_data['hist'] = chunk_hist
+
+            log_root.debug('load and format xgm data')
+            xgm_mnem = tb.mnemonics['SCS_SA3'].values()
+            data_xgm = sel_DA1.get_array(*xgm_mnem)
+            data_xgm = data_xgm.rename(
+                            new_name_or_name_dict={'XGMbunchId':'pulse'})
+
+            xgm_shape = data_xgm.shape[1]
+            norm_every = 2
+            xgm_coord_arr = np.linspace(0,
+                                        norm_every*(xgm_shape-1),
+                                        xgm_shape, dtype=int)
+
+            data_xgm = data_xgm.assign_coords({'pulse':xgm_coord_arr})
+            log_root.debug('normalize chunk_data using xgm')
+            chunk_data['data'][:,::norm_every,:,:] =\
+                chunk_data['data'][:,::norm_every,:,:] /\
+                    data_xgm[:,0:int(np.ceil(fpt/norm_every))]
+
+            # apply predefined binning
+            log_root.debug(f'Module {module}: apply binning to chunk_data')
+            for b in binners:
+                chunk_data[b+"_binned"] = binners[b]
+                chunk_data = chunk_data.groupby(b+"_binned").sum(b)
+                chunk_data = chunk_data.rename(name_dict={b+"_binned":b})
+            # ToDo: Avoid creation of unnecessary data when binning along x,y
+
+            # merge dsets and format
+            log_root.debug(f'Module {module}: merge data into prepared dset')
+            for var in ['data', 'hist']:
+                module_data[var] = xr.concat([module_data[var],
+                                             chunk_data[var]],
+                                             dim='tmp').sum('tmp')
+
+        #module_data = module_data.transpose('trainId', 'pulse', 'x', 'y')
+
 
     def test_storage(self):
         cls = self.__class__
-- 
GitLab