From 5393914ea8656eacb5dceaf122cda0a68a7cda58 Mon Sep 17 00:00:00 2001
From: Rafael Gort <rafael.gort@xfel.eu>
Date: Thu, 7 May 2020 16:14:28 +0200
Subject: [PATCH] Extended skeleton for dssc class, restructured processing and
 routines modules for easy switch between joblib and multiprocessing

---
 src/toolbox_scs/detectors/__init__.py         |   9 +-
 .../detectors/azimuthal_integrator.py         |   6 +-
 src/toolbox_scs/detectors/dssc.py             |   9 +-
 src/toolbox_scs/detectors/dssc_misc.py        | 114 ++++++++++++
 .../{dssc_routines.py => dssc_processing.py}  | 171 ++++++------------
 5 files changed, 190 insertions(+), 119 deletions(-)
 create mode 100644 src/toolbox_scs/detectors/dssc_misc.py
 rename src/toolbox_scs/detectors/{dssc_routines.py => dssc_processing.py} (77%)

diff --git a/src/toolbox_scs/detectors/__init__.py b/src/toolbox_scs/detectors/__init__.py
index abb5916..d08e171 100644
--- a/src/toolbox_scs/detectors/__init__.py
+++ b/src/toolbox_scs/detectors/__init__.py
@@ -3,8 +3,9 @@ from .xgm import (
 from .tim import (
     load_TIM,)
 from .dssc_routines import (
-    load_dssc_info, calc_xgm_frame_indices, process_dssc_module, split_frames, 
-    process_intra_train)
+    load_dssc_info, load_geom, calc_xgm_frame_indices)
+from .dssc_processing import (
+    bin_data_multipr, process_intra_train, bin_data)
 from .dssc import DSSC
 
 __all__ = (
@@ -14,7 +15,8 @@ __all__ = (
     "matchXgmTimPulseId",
     "load_dssc_info",
     "calc_xgm_frame_indices",
-    "process_dssc_module",
+    "bin_data_multipr",
+    "bin_data",
     "process_intra_train",
     "split_frames",
     # Classes
@@ -36,6 +38,7 @@ clean_ns = [
     'DSSC1module',
     'dssc',
     'dssc_routines',
+    'dssc_processing',
     'dssc_data',
     'dssc_plot',
     'azimuthal_integrator',
diff --git a/src/toolbox_scs/detectors/azimuthal_integrator.py b/src/toolbox_scs/detectors/azimuthal_integrator.py
index 56759ac..e6b4ce3 100644
--- a/src/toolbox_scs/detectors/azimuthal_integrator.py
+++ b/src/toolbox_scs/detectors/azimuthal_integrator.py
@@ -1,5 +1,9 @@
+import logging
+
 import numpy as np
 
+log = logging.getLogger(__name__)
+
 class AzimutalIntegrator(object):
     def __init__(self, imageshape, center, polar_range, dr=2, aspect=204/236):
         '''
@@ -37,7 +41,7 @@ class AzimutalIntegrator(object):
         '''
         self.shape = imageshape
         cx, cy = center
-        print(f'azimuthal center: {center}')
+        log.info(f'azimuthal center: {center}')
         sx, sy = imageshape
         xcoord, ycoord = np.ogrid[:sx, :sy]
         xcoord -= cx
diff --git a/src/toolbox_scs/detectors/dssc.py b/src/toolbox_scs/detectors/dssc.py
index 09a608e..7ba4dba 100644
--- a/src/toolbox_scs/detectors/dssc.py
+++ b/src/toolbox_scs/detectors/dssc.py
@@ -62,7 +62,14 @@ class DSSC:
     # Data processing
     # -------------------------------------------------------------------------
     def process_bin(self):
-        pass
+        
+        #option 1
+        #with multiprocessing.Pool(16) as pool:
+        #    module_data = pool.map(tbdet.bin_data_multipr, jobs)
+        
+        #option 2
+        #module_data = joblib.Parallel(n_jobs=16) \
+        #    (joblib.delayed(tbdet.bin_data)(**jobs[i]) for i in range(16))
 
     def azimuthal_integration(self):
         pass
diff --git a/src/toolbox_scs/detectors/dssc_misc.py b/src/toolbox_scs/detectors/dssc_misc.py
new file mode 100644
index 0000000..013ada2
--- /dev/null
+++ b/src/toolbox_scs/detectors/dssc_misc.py
@@ -0,0 +1,114 @@
+""" 
+    DSSC-related sub-routines.
+
+    original-author: Michael Schneider 
+    authors: SCS-team members
+    
+    license: BSD 3-Clause License (see LICENSE_BSD for more info)
+
+    comment: contributions should comply with pep8 code structure guidelines.
+"""
+import logging
+from joblib import Parallel, delayed
+from tqdm import tqdm
+
+import numpy as np
+import xarray as xr
+import pandas as pd
+
+import extra_data as ed
+
+
+log = logging.getLogger(__name__)
+
+
+def load_dssc_info(proposal, run_nr):
+    """
+    Loads the first data file for DSSC module 0 (this is hardcoded)
+    and returns the detector_info dictionary
+
+    Parameters
+    ----------
+    proposal: str, int
+        number of proposal
+    run_nr: str, int
+        number of run
+
+    Returns
+    -------
+    info : dictionary
+        {'dims': tuple, 'frames_per_train': int, 'total_frames': int}
+    """
+
+    module = ed.open_run(proposal, run_nr, include='*DSSC00*')
+    info = module.detector_info('SCS_DET_DSSC1M-1/DET/0CH0:xtdf')
+    log.debug("Fetched information for DSSC module nr. 0.")
+    return info
+
+
+def calc_xgm_frame_indices(nbunches, framepattern):
+    """
+    Returns a coordinate array for XGM data. The coordinates correspond to
+    DSSC frame numbers and depend on the number of FEL pulses per train
+    ("nbunches") and the framepattern. In framepattern, dark DSSC frame
+    names (i.e., without FEL pulse) _must_ include "dark" as a substring.
+    
+    Copyright (c) 2019, Michael Schneider
+    Copyright (c) 2020, SCS-team
+
+    Parameters
+    ----------
+    nbunches: int
+        number of bunches per train
+    framepattern: list
+        experimental pattern
+
+    Returns
+    -------
+    frame_indices: numpy.ndarray
+        coordinate array corresponding to DSSC frame numbers
+
+    """
+
+    n_frames = len(framepattern)
+    n_data_frames = np.sum(['dark' not in p for p in framepattern])
+    frame_max = nbunches * n_frames // n_data_frames
+
+    frame_indices = []
+    for i, p in enumerate(framepattern):
+        if 'dark' not in p:
+            frame_indices.append(np.arange(i, frame_max, n_frames))
+
+    log.debug("Constructed coordinate array for XGM data.")
+    return np.sort(np.concatenate(frame_indices))
+
+
+def load_geom(geopath=None, quad_pos=None):
+    """ 
+    Loads and return the DSSC geometry.
+
+    Parameters
+    ----------
+    geopath: str 
+        path to the h5 geometry file. If None uses a default file.
+        quad_pos: list of quadrants tuple position. If None uses a default
+        position.
+
+    Returns
+    -------
+    geom: extra_geom.DSSC_1MGeometry
+        loaded geometry object
+        """
+    if quad_pos is None:
+        quad_pos = [(-124.100,    3.112),  # TR
+                    (-133.068, -110.604),  # BR
+                    (   0.988, -125.236),  # BL
+                    (   4.528,   -4.912)   # TL
+                    ]
+
+    if geopath is None:
+        geopath = '/gpfs/exfel/sw/software/git/EXtra-geom/' \
+                  'docs/dssc_geo_june19.h5'
+
+    geom = DSSC_1MGeometry.from_h5_file_and_quad_positions(geopath, quad_pos)
+    return geom
diff --git a/src/toolbox_scs/detectors/dssc_routines.py b/src/toolbox_scs/detectors/dssc_processing.py
similarity index 77%
rename from src/toolbox_scs/detectors/dssc_routines.py
rename to src/toolbox_scs/detectors/dssc_processing.py
index ded1405..0930849 100644
--- a/src/toolbox_scs/detectors/dssc_routines.py
+++ b/src/toolbox_scs/detectors/dssc_processing.py
@@ -9,6 +9,7 @@
     comment: contributions should comply with pep8 code structure guidelines.
 """
 import logging
+from joblib import Parallel, delayed
 from tqdm import tqdm
 
 import numpy as np
@@ -21,98 +22,6 @@ import extra_data as ed
 log = logging.getLogger(__name__)
 
 
-def load_dssc_info(proposal, run_nr):
-    """
-    Loads the first data file for DSSC module 0 (this is hardcoded)
-    and returns the detector_info dictionary
-
-    Parameters
-    ----------
-    proposal: str, int
-        number of proposal
-    run_nr: str, int
-        number of run
-
-    Returns
-    -------
-    info : dictionary
-        {'dims': tuple, 'frames_per_train': int, 'total_frames': int}
-    """
-
-    module = ed.open_run(proposal, run_nr, include='*DSSC00*')
-    info = module.detector_info('SCS_DET_DSSC1M-1/DET/0CH0:xtdf')
-    log.debug("Fetched information for DSSC module nr. 0.")
-    return info
-
-
-def load_geom(geopath=None, quad_pos=None):
-    """ 
-    Loads and return the DSSC geometry.
-
-    Parameters
-    ----------
-    geopath: str 
-        path to the h5 geometry file. If None uses a default file.
-        quad_pos: list of quadrants tuple position. If None uses a default
-        position.
-
-    Returns
-    -------
-    geom: extra_geom.DSSC_1MGeometry
-        loaded geometry object
-        """
-    if quad_pos is None:
-        quad_pos = [(-124.100,    3.112),  # TR
-                    (-133.068, -110.604),  # BR
-                    (   0.988, -125.236),  # BL
-                    (   4.528,   -4.912)   # TL
-                    ]
-
-    if geopath is None:
-        geopath = '/gpfs/exfel/sw/software/git/EXtra-geom/' \
-                  'docs/dssc_geo_june19.h5'
-
-    geom = DSSC_1MGeometry.from_h5_file_and_quad_positions(geopath, quad_pos)
-    return geom
-
-
-def calc_xgm_frame_indices(nbunches, framepattern):
-    """
-    Returns a coordinate array for XGM data. The coordinates correspond to
-    DSSC frame numbers and depend on the number of FEL pulses per train
-    ("nbunches") and the framepattern. In framepattern, dark DSSC frame
-    names (i.e., without FEL pulse) _must_ include "dark" as a substring.
-    
-    Copyright (c) 2019, Michael Schneider
-    Copyright (c) 2020, SCS-team
-
-    Parameters
-    ----------
-    nbunches: int
-        number of bunches per train
-    framepattern: list
-        experimental pattern
-
-    Returns
-    -------
-    frame_indices: numpy.ndarray
-        coordinate array corresponding to DSSC frame numbers
-
-    """
-
-    n_frames = len(framepattern)
-    n_data_frames = np.sum(['dark' not in p for p in framepattern])
-    frame_max = nbunches * n_frames // n_data_frames
-
-    frame_indices = []
-    for i, p in enumerate(framepattern):
-        if 'dark' not in p:
-            frame_indices.append(np.arange(i, frame_max, n_frames))
-
-    log.debug("Constructed coordinate array for XGM data.")
-    return np.sort(np.concatenate(frame_indices))
-
-
 def prepare_module_empty(scan_variable, framepattern):
     """
     Create empty (zero-valued) DataArray for a single DSSC module
@@ -345,30 +254,24 @@ def process_intra_train(job):
     return module_data
 
 
-def process_dssc_module(job):
+def bin_data_multipr(job):
     """
-    Aggregate DSSC data (chunked, to fit into memory) for a single module.
-    Groups by "scan_variable" in given scanfile - use dummy scan_variable to
-    average over all trains. This implies, that only trains found in the
-    scanfile are considered.
-    
-    Copyright (c) 2019, Michael Schneider
-    Copyright (c) 2020, SCS-team
+    Entry point for binning routines using the multrprocessing module
 
     Parameters
     ----------
-    job: dictionary
+    job: dictionary
       Designed for the multiprocessing module - expects a job dictionary with
       the following keys:
         proposal : int
                    proposal number
-        run : int
+        run_nr : int
                 run number
         module : int
                 DSSC module to process
         chunksize : int
                 number of trains to process simultaneously
-        scanfile : str
+        binfile : str
                 name of hdf5 file with xarray.DataArray containing the
                 scan variable and trainIds
         framepattern : list of str
@@ -383,14 +286,54 @@ def process_dssc_module(job):
     module_data: xarray.Dataset
 
     """
+    params = {}
+    params['proposal'] = job['proposal']
+    params['run_nr'] = job['run_nr']
+    params['module'] = job['module']
+    params['chunksize'] = job['chunksize']
+    params['binfile'] = job['binfile']
+    params['framepattern'] = job.get('framepattern', ['image'])
+    params['maskfile'] = job.get('maskfile', None)
 
-    proposal = job['proposal']
-    run_nr = job['run_nr']
-    module = job['module']
-    chunksize = job['chunksize']
-    scanfile = job['scanfile']
-    framepattern = job.get('framepattern', ['image'])
-    maskfile = job.get('maskfile', None)
+    return bin_data(**params)
+
+
+def bin_data(proposal, run_nr, module, chunksize, binfile,
+                        framepattern=['image'], maskfile=None):
+    """
+    Aggregate DSSC data (chunked, to fit into memory) for a single module.
+    Groups by "scan_variable" in given bins - use dummy scan_variable to
+    average over all trains. This implies, that only trains found in bins are 
+    considered.
+    
+    Copyright (c) 2019, Michael Schneider
+    Copyright (c) 2020, SCS-team
+
+    Parameters
+    ----------
+    proposal : int
+        proposal number
+    run_nr : int
+        run number
+    module : int
+        DSSC module to process
+    chunksize : int
+        number of trains to process simultaneously
+    binfile : str
+        name of hdf5 file with xarray.DataArray containing the
+        bin variable and trainIds
+    framepattern : list of str
+        names for the (possibly repeating) intra-train pulses. See
+        split_dssc_data
+    pulsemask : str
+        name of hdf5 file with boolean xarray.DataArray to
+        select/reject trains and pulses
+
+    Returns
+    -------
+    module_data: xarray.Dataset
+        xarray datastructure containing data binned according to bins.
+    """
 
     sourcename = f'SCS_DET_DSSC1M-1/DET/{module}CH0:xtdf'
     collection = ed.open_run(proposal, run_nr,
@@ -399,8 +342,8 @@ def process_dssc_module(job):
 
     log.info(f"Processing dssc module {module}: start")
 
-    # read preprocessed scan variable from file
-    scan = xr.open_dataarray(scanfile, 'data', engine='h5netcdf')
+    # read preprocessed bins from file
+    bins = xr.open_dataarray(binfile, 'data', engine='h5netcdf')
 
     # read binary pulse/train mask - e.g. from XGM thresholding
     if maskfile is not None:
@@ -408,7 +351,7 @@ def process_dssc_module(job):
     else:
         pulsemask = None
 
-    module_data = prepare_module_empty(scan, framepattern)
+    module_data = prepare_module_empty(bins, framepattern)
     chunks = np.arange(ntrains, step=chunksize)
 
     # progress bar
@@ -436,7 +379,7 @@ def process_dssc_module(job):
             data = xr.merge([data, sum_count])
 
             # aligns on trainId, drops non-matching trains
-            data['scan_variable'] = scan
+            data['scan_variable'] = bins
             data = data.groupby('scan_variable').sum('trainId')
 
             log.debug(f"Module {module}: "
-- 
GitLab