From 649fa4e717c3526dbec3daa5daa70729bf8cc3bf Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Lo=C3=AFc=20Le=20Guyader?= <loic.le.guyader@xfel.eu>
Date: Wed, 16 Oct 2019 17:37:50 +0200
Subject: [PATCH] adds multiprocessing binning of DSSC data
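
This adds a DSSC class that condenses (bins) DSSC detector data over a scan
variable, processing the 16 modules in parallel with multiprocessing, plus a
reusable azimuthal_integrator helper class.

A rough usage sketch (the semester, proposal, run number and the 'nrj'
mnemonic below are placeholders):

    import ToolBox as tb

    data = tb.DSSC('201901', 'p002212')
    data.open_run(89)
    data.load_xgm()
    data.define_scan('nrj', bins=0.1)
    data.crunch()
    data.save()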

---
 DSSC.py                 | 331 ++++++++++++++++++++++++++++++++++++++++
 __init__.py             |   2 +
 azimuthal_integrator.py |  63 ++++++++
 3 files changed, 396 insertions(+)
 create mode 100644 DSSC.py
 create mode 100644 azimuthal_integrator.py

diff --git a/DSSC.py b/DSSC.py
new file mode 100644
index 0000000..4ea6eb6
--- /dev/null
+++ b/DSSC.py
@@ -0,0 +1,331 @@
+
+import multiprocessing
+from time import strftime
+import tempfile
+import shutil
+from tqdm.auto import tqdm
+import os
+import warnings
+
+import karabo_data as kd
+from karabo_data.geometry2 import DSSC_1MGeometry
+import ToolBox as tb
+import numpy as np
+import xarray as xr
+import matplotlib.pyplot as plt
+import h5py
+
+
+class DSSC:
+    
+    def __init__(self, semester, proposal, topic='SCS'):
+        self.semester = semester
+        self.proposal = proposal
+        self.topic = topic
+        self.tempdir = None
+        self.save_folder = f'/gpfs/exfel/exp/{self.topic}/{self.semester}/{self.proposal}/usr/condensed_runs/'
+        
+        print('DSSC configuration')
+        print(f'Topic: {self.topic}')
+        print(f'Semester: {self.semester}')
+        print(f'Proposal: {self.proposal}')
+        print(f'Default save folder: {self.save_folder}')
+        
+        if not os.path.exists(self.save_folder):
+            warnings.warn(f'Default save folder does not exist: {self.save_folder}')
+        
+    def __del__(self):
+        # delete the temporary folder
+        if self.tempdir:
+            shutil.rmtree(self.tempdir)
+    
+    def open_run(self, run_nr, isDark=False):
+        print('Opening run data with karabo-data')
+        self.run_nr = run_nr
+        self.xgm = None
+        self.scan_vname = None
+        
+        self.run = kd.open_run(self.proposal, self.run_nr)
+        self.isDark = isDark
+        self.plot_title = f'{self.semester} run: {self.run_nr}'
+        
+        self.fpt = self.run.detector_info('SCS_DET_DSSC1M-1/DET/0CH0:xtdf')['frames_per_train']
+        self.nbunches = self.run.get_array('SCS_RR_UTC/MDL/BUNCH_DECODER', 'sase3.nPulses.value')
+        self.nbunches = np.unique(self.nbunches)
+        if len(self.nbunches) == 1:
+            self.nbunches = self.nbunches[0]
+        else:
+            warnings.warn('Not all trains have the same number of SA3 bunches')
+            print(f'nbunches: {self.nbunches}')
+            self.nbunches = self.nbunches[-1]
+            
+        print(f'DSSC frames per train: {self.fpt}')
+        print(f'SA3 bunches per train: {self.nbunches}')
+        
+        if self.tempdir is not None:
+            shutil.rmtree(self.tempdir)
+        
+        self.tempdir = tempfile.mkdtemp()
+        print(f'Temporary directory: {self.tempdir}')
+
+        print('Creating virtual dataset')
+        self.vdslist = self.create_virtual_dssc_datasets(self.run, path=self.tempdir)
+        
+        # create a dummy scan variable for dark runs
+        # for other types of runs, use the DSSC.define_scan method to overwrite it
+        self.scan = xr.DataArray(np.ones_like(self.run.train_ids), dims=['trainId'],
+                                 coords={'trainId': self.run.train_ids})
+        self.scan_vname = 'dummy'
+        self.vds_scan = None
+        
+    def define_scan(self, vname, bins):
+        """
+            vname: variable name for the scan, can be a mnemonic string from ToolBox
+                or a dictionnary with ['source', 'key'] fields
+            bins: step size or bins
+        """
+
+        if type(vname) is dict:
+            self.scan = self.run.get_array(vname['source'], vname['key'])
+        elif type(vname) is str:
+            if vname not in tb.mnemonics:
+                raise ValueError(f'{vname} not found in the ToolBox mnemonics table')
+            self.scan = self.run.get_array(tb.mnemonics[vname]['source'], tb.mnemonics[vname]['key'])
+        else:
+            raise ValueError(f'vname should be a string or a dict. We got {type(vname)}')
+            
+        if (type(bins) is int) or (type(bins) is float):
+            self.scan = bins * np.round(self.scan / bins)
+        else:
+            # TODO: digitize the data
+            raise ValueError('Binning with explicit bin edges is not yet implemented')
+        self.scan_vname = vname
+        
+        self.vds_scan = os.path.join(self.tempdir, 'scan_variable.h5')
+        if os.path.isfile(self.vds_scan):
+            os.remove(self.vds_scan)
+            
+        if self.xgm is None:
+            self.load_xgm()
+
+        self.scan = self.scan.to_dataset(name='scan_variable')
+        self.scan['xgm_pumped'] = self.xgm[:, :self.nbunches:2].mean('dim_0')
+        self.scan['xgm_unpumped'] = self.xgm[:, 1:self.nbunches:2].mean('dim_0')
+        self.scan.to_netcdf(self.vds_scan, group='data')
+
+        self.scan_counts = xr.DataArray(np.ones(len(self.scan['scan_variable'])),
+                                        dims=['scan_variable'],
+                                        coords={'scan_variable': self.scan['scan_variable'].values},
+                                        name='counts')
+        
+        fig, (ax1, ax2) = plt.subplots(nrows=2, figsize=[5, 5])
+        ax1.plot(self.scan.groupby('scan_variable').mean('trainId').coords['scan_variable'].values,
+                 self.scan_counts.groupby('scan_variable').sum(),
+                 'o-', ms=2)
+        ax1.set_xlabel('scan variable')
+        ax1.set_ylabel('# trains')
+        ax1.set_title(self.plot_title)
+        
+        ax2.plot(self.scan['scan_variable'])
+        ax2.set_xlabel('train #')
+        ax2.set_ylabel(f'{vname}')
+
+        plt.tight_layout()
+            
+    def load_xgm(self):
+        if self.xgm is None:
+            self.xgm = self.run.get_array(tb.mnemonics['SCS_SA3']['source'],
+                                          tb.mnemonics['SCS_SA3']['key'], roi=kd.by_index[:self.nbunches])
+            
+    def plot_xgm_hist(self, nbins):
+        if self.xgm is None:
+            self.load_xgm()
+            
+        hist, bins_edges = np.histogram(self.xgm, nbins, density=True)
+        width = 1.0 * (bins_edges[1] - bins_edges[0])
+        bins_center = 0.5*(bins_edges[:-1] + bins_edges[1:])
+        plt.figure()
+        plt.bar(bins_center, hist, align='center', width=width)
+        plt.xlabel(f"{tb.mnemonics['SCS_SA3']['source']}{tb.mnemonics['SCS_SA3']['key']}")
+        plt.ylabel('density')
+        plt.title(self.plot_title)
+        plt.tight_layout()
+        
+    def xgm_filter(self, xgm_low=-np.inf, xgm_high=np.inf):
+        if self.xgm is None:
+            self.load_xgm()
+        
+        if self.isDark:
+            warnings.warn('This run was loaded as dark; filtering on XGM makes no sense. Aborting.')
+            return
+        
+        self.xgm_low = xgm_low
+        self.xgm_high = xgm_high
+        
+        valid = ((self.xgm > self.xgm_low) * (self.xgm < self.xgm_high)).prod('dim_0').astype(bool)
+        xgm_valid = self.xgm.where(valid)
+        xgm_valid = xgm_valid.dropna('trainId')
+        self.scan = self.scan.sel({'trainId': xgm_valid.trainId})
+        nrejected = len(self.run.train_ids) - len(self.scan)
+        print((f'Rejecting {nrejected} out of {len(self.run.train_ids)} trains due to xgm '
+               f'thresholds: [{self.xgm_low}, {self.xgm_high}]'))
+
+    def load_geom(self):
+        quad_pos = [
+            (-124.100,    3.112),  # TR
+            (-133.068, -110.604),  # BR
+            (   0.988, -125.236),  # BL
+            (   4.528,   -4.912)   # TL
+            ]
+        path = '/gpfs/exfel/sw/software/exfel_environments/misc/git/karabo_data/docs/dssc_geo_june19.h5'
+        geom = DSSC_1MGeometry.from_h5_file_and_quad_positions(path, quad_pos)
+        return geom
+
+    def create_virtual_dssc_datasets(self, run, path=''):
+        vds_list = []
+        for m in tqdm(range(16)):
+            vds_filename = os.path.join(path, f'dssc{m}_vds.h5')
+            if os.path.isfile(vds_filename):
+                os.remove(vds_filename)
+            module_vds = run.get_virtual_dataset(f'SCS_DET_DSSC1M-1/DET/{m}CH0:xtdf',
+                                                 'image.data', filename=vds_filename)
+            vds_list.append([vds_filename, module_vds])
+        return vds_list
+    
+    def crunch(self):
+                   
+        if self.vds_scan is None:
+            # probably a dark run with a dummy scan variable
+            self.vds_scan = os.path.join(self.tempdir, 'scan_variable.h5')
+            if os.path.isfile(self.vds_scan):
+                os.remove(self.vds_scan)
+            
+            self.scan = self.scan.to_dataset(name='scan_variable')
+            self.scan.to_netcdf(self.vds_scan, group='data')
+
+        max_GB = 400
+        # max_GB / (8byte * 16modules * 128px * 512px * N_pulses)
+        self.chunksize = int(max_GB * 1024**3 // (8 * 16 * 128 * 512 * self.fpt))
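+        # e.g. for a hypothetical fpt = 75 frames per train:
+        # 400 * 1024**3 // (8 * 16 * 128 * 512 * 75) = 682 trains per chunk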
+        
+        print('processing', self.chunksize, 'trains per chunk')
+                   
+        jobs = []
+        for m in range(16):
+            jobs.append(dict(
+            module=m,
+            fpt=self.fpt,
+            vdf_module=os.path.join(self.tempdir, f'dssc{m}_vds.h5'),
+            chunksize=self.chunksize,
+            vdf_scan=self.vds_scan,
+            nbunches=self.nbunches,
+            run_nr=self.run_nr,
+            ))
+            
+        timestamp = strftime('%X')
+        print(f'start time: {timestamp}')
+
+        with multiprocessing.Pool(16) as pool:
+            module_data = pool.map(process_one_module, jobs)
+    
+        print('finished:', strftime('%X'))
+    
+        # rearrange the multiprocessed data
+        self.module_data = xr.concat(module_data, dim='module')
+        self.module_data['run'] = self.run_nr
+        self.module_data = self.module_data.transpose('scan_variable', 'module', 'x', 'y')
+                   
+        self.module_data = xr.merge([self.module_data, self.scan.groupby('scan_variable').mean('trainId')])
+        self.module_data = self.module_data.squeeze()
+
+    def save(self, save_folder=None, overwrite=False):
+        if save_folder is None:
+            save_folder = self.save_folder
+
+        if self.isDark:
+            fname = f'run{self.run_nr}_el.h5'  # no scan
+        else:
+            fname = f'run{self.run_nr}_by-delay_el.h5'  # run with delay scan (change for other scan types!)
+
+
+        save_path = os.path.join(save_folder, fname)
+        file_exists = os.path.isfile(save_path)
+
+        if not file_exists or (file_exists and overwrite):
+            if file_exists:
+                warnings.warn(f'Overwriting file: {save_path}')
+                os.remove(save_path)
+            self.module_data.to_netcdf(save_path, group='data')
+            os.chmod(save_path, 0o664)
+            print('saving: ', save_path)
+        else:
+            print('file', save_path, 'exists and overwrite is False')
+
+# since 'self' is not picklable, this function has to live outside the DSSC class so that it can
+# be used by the multiprocessing pool.map function
+def process_one_module(job):
+    module = job['module']
+    fpt = job['fpt']
+    data_vdf = job['vdf_module']
+    scan_vdf = job['vdf_scan']
+    chunksize = job['chunksize']
+    nbunches = job['nbunches']
+
+    image_path = f'INSTRUMENT/SCS_DET_DSSC1M-1/DET/{module}CH0:xtdf/image/data'
+    npulse_path = f'INDEX/SCS_DET_DSSC1M-1/DET/{module}CH0:xtdf/image/count'
+    with h5py.File(data_vdf, 'r') as m:
+        all_trainIds = m['INDEX/trainId'][()]
+    n_trains = len(all_trainIds)
+    chunk_start = np.arange(n_trains, step=chunksize, dtype=int)
+
+    # load scan variable
+    scan = xr.open_dataset(scan_vdf, group='data')['scan_variable']
+    scan.name = 'scan'
+    len_scan = len(scan.groupby(scan))
+
+    # create a zero-filled dataset to accumulate the binned data into
+    # (np.zeros rather than np.empty, since the loop below adds to these values in place)
+    module_data = xr.DataArray(np.zeros([len_scan, 128, 512], dtype=np.float64),
+                               dims=['scan_variable', 'x', 'y'],
+                               coords={'scan_variable': np.unique(scan)})
+    module_data = module_data.to_dataset(name='pumped')
+    module_data['unpumped'] = xr.full_like(module_data['pumped'], 0)
+    module_data['sum_count'] = xr.DataArray(np.zeros_like(np.unique(scan)), dims=['scan_variable'])
+    module_data['module'] = module
+
+    # crunching
+    with h5py.File(data_vdf, 'r') as m:
+        #fpt_calc = int(len(m[image_path]) / n_trains)
+        #assert fpt_calc == fpt, f'data length does not match expected value (module {module})'
+        all_trainIds = m['INDEX/trainId'][()]
+        frames_per_train = m[npulse_path][()]
+        trains_with_data = all_trainIds[frames_per_train == fpt]
+        #print(np.unique(pulses_per_train), '/', fpt)
+        #print(len(trains_with_data))
+        chunk_start = np.arange(len(all_trainIds), step=chunksize, dtype=int)
+        trains_start = 0
+                   
+        # This line is the strange hack from https://github.com/tqdm/tqdm/issues/485
+        print(' ', end='', flush=True)
+        for c0 in tqdm(chunk_start, desc=f'pool.map#{module:02d}', position=module):
+            chunk_dssc = np.s_[int(c0 * fpt):int((c0 + chunksize) * fpt)]  # for dssc data
+            data = m[image_path][chunk_dssc].squeeze()
+            data = data.astype(np.float64)
+            n_trains = int(data.shape[0] // fpt)
+            trainIds_chunk = np.unique(trains_with_data[trains_start:trains_start + n_trains])
+            trains_start += n_trains
+            n_trains_actual = len(trainIds_chunk)
+            coords = {'trainId': trainIds_chunk}
+            data = np.reshape(data, [n_trains_actual, fpt, 128, 512])[:, :int(2 * nbunches)]
+            data = xr.DataArray(data, dims=['trainId', 'pulse', 'x', 'y'], coords=coords)
+            data_pumped = (data[:, ::4]).mean('pulse')
+            data_unpumped = (data[:, 2::4]).mean('pulse')
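+            # (assumes alternating pumped/unpumped X-ray pulses and 2 DSSC frames per pulse,
+            #  hence the stride-4 frame selection above)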
+            data = data_pumped.to_dataset(name='pumped')
+            data['unpumped'] = data_unpumped
+            data['sum_count'] = xr.DataArray(np.ones(n_trains_actual), dims=['trainId'], coords=coords)
+            # grouping and summing
+            data['scan_variable'] = scan  # this only adds scan data for matching trainIds
+            data = data.dropna('trainId')
+            data = data.groupby('scan_variable').sum('trainId')
+            where = {'scan_variable': data.scan_variable}
+            for var in ['pumped', 'unpumped', 'sum_count']:
+                module_data[var].loc[where] = module_data[var].loc[where] + data[var]
+    for var in ['pumped', 'unpumped']:
+        module_data[var] = module_data[var] / module_data.sum_count
+    #module_data = module_data.drop('sum_count')
+    return module_data
diff --git a/__init__.py b/__init__.py
index 9266141..86c3187 100644
--- a/__init__.py
+++ b/__init__.py
@@ -3,3 +3,5 @@ from ToolBox.xgm import *
 from ToolBox.XAS import *
 from ToolBox.knife_edge import *
 from ToolBox.Laser_utils import *
+from ToolBox.DSSC import DSSC
+from ToolBox.azimuthal_integrator import azimuthal_integrator
diff --git a/azimuthal_integrator.py b/azimuthal_integrator.py
new file mode 100644
index 0000000..da9a6dd
--- /dev/null
+++ b/azimuthal_integrator.py
@@ -0,0 +1,63 @@
+import numpy as np
+class azimuthal_integrator(object):
+    def __init__(self, imageshape, center, polar_range, dr=2):
+        '''
+        Create a reusable integrator for repeated azimuthal integration of similar
+        images. Calculates array indices for a given parameter set that allows
+        fast recalculation.
+        
+        Parameters
+        ==========
+        imageshape : tuple of ints
+            The shape of the images to be integrated over.
+            
+        center : tuple of ints
+            center coordinates in pixels
+        
+        polar_range : tuple of ints
+            start and stop polar angle (in degrees) to restrict integration to wedges
+        
+        dr : int, default 2
+            radial width of the integration slices, in pixels; the non-square DSSC
+            pixel shape is accounted for in the radial distance calculation.
+        
+        Returns
+        =======
+        ai : azimuthal_integrator instance
+            Instance can directly be called with image data:
+            > az_intensity = ai(image)
+            radial distances and the polar mask are accessible as attributes:
+            > ai.distance
+            > ai.polar_mask
+        '''
+        self.shape = imageshape
+        cx, cy = center
+        sx, sy = imageshape
+        xcoord, ycoord = np.ogrid[:sx, :sy]
+        xcoord -= cx
+        ycoord -= cy
+
+        # distance from center, hexagonal pixel shape taken into account
+        dist_array = np.hypot(xcoord * 204 / 236, ycoord)
+
+        # array of polar angles
+        tmin, tmax = np.deg2rad(np.sort(polar_range)) % np.pi
+        polar_array = np.arctan2(xcoord, ycoord)
+        polar_array = np.mod(polar_array, np.pi)
+        self.polar_mask = (polar_array > tmin) * (polar_array < tmax)
+
+        self.maxdist = min(sx - cx, sy - cy)
+
+        ix, iy = np.indices(dimensions=(sx, sy))
+        self.index_array = np.ravel_multi_index((ix, iy), (sx, sy))
+
+        self.distance = np.array([])
+        self.flat_indices = []
+        for dist in range(dr, self.maxdist, dr):
+            ring_mask = self.polar_mask * (dist_array >= (dist - dr)) * (dist_array < dist)
+            self.flat_indices.append(self.index_array[ring_mask])
+            self.distance = np.append(self.distance, dist)
+    
+    def __call__(self, image):
+        assert self.shape == image.shape, 'image shape does not match'
+        image_flat = image.flatten()
+        return np.array([np.nansum(image_flat[indices]) for indices in self.flat_indices])
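+
+
+# Minimal usage sketch (the module shape, center and angular range below are placeholder values):
+if __name__ == '__main__':
+    ai = azimuthal_integrator(imageshape=(128, 512), center=(64, 256), polar_range=(0, 90), dr=2)
+    test_image = np.random.rand(128, 512)
+    intensity = ai(test_image)  # one integrated value per radial ring
+    print(ai.distance.shape, intensity.shape)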
-- 
GitLab