From 1dae93345d72bb2efd3371e8c9b2d5c47e1ee0d2 Mon Sep 17 00:00:00 2001 From: Rafael Gort <rafael.gort@xfel.eu> Date: Thu, 20 Aug 2020 00:35:38 +0200 Subject: [PATCH] Generalized joblib implementation --- VERSION | 2 +- src/toolbox_scs/detectors/__init__.py | 3 +- src/toolbox_scs/detectors/dssc.py | 39 ++++++++++++-------- src/toolbox_scs/detectors/dssc_processing.py | 13 ------- 4 files changed, 25 insertions(+), 32 deletions(-) diff --git a/VERSION b/VERSION index 3fe66b1..32afd27 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -1.0.3-rc1 +1.0.3-rc2 diff --git a/src/toolbox_scs/detectors/__init__.py b/src/toolbox_scs/detectors/__init__.py index ae82ccb..5059545 100644 --- a/src/toolbox_scs/detectors/__init__.py +++ b/src/toolbox_scs/detectors/__init__.py @@ -8,7 +8,7 @@ from .dssc_misc import ( load_dssc_info, create_dssc_bins, load_geom, quickmask_DSSC_ASIC, calc_xgm_frame_indices, get_xgm_formatted, load_mask) from .dssc_processing import ( - bin_data_multipr, bin_data) + bin_data) from .dssc import ( DSSCBinner, DSSCFormatter, DSSCAnalyzer) from .azimuthal_integrator import AzimuthalIntegrator @@ -23,7 +23,6 @@ __all__ = ( "create_dssc_bins", "calc_xgm_frame_indices", "get_xgm_formatted", - "bin_data_multipr", "bin_data", "save_xarray", "load_xarray", diff --git a/src/toolbox_scs/detectors/dssc.py b/src/toolbox_scs/detectors/dssc.py index 7068b19..f6e6759 100644 --- a/src/toolbox_scs/detectors/dssc.py +++ b/src/toolbox_scs/detectors/dssc.py @@ -28,7 +28,7 @@ from .dssc_data import ( from .dssc_misc import ( load_dssc_info, get_xgm_formatted, load_geom) from .dssc_processing import ( - bin_data, bin_data_multipr, create_empty_dataset) + bin_data, create_empty_dataset) __all__ = ["DSSCBinner", "DSSCFormatter", "DSSCAnalyzer"] log = logging.getLogger(__name__) @@ -172,15 +172,22 @@ class DSSCBinner: # ------------------------------------------------------------------------- # Data processing # ------------------------------------------------------------------------- 
- def bin_data(self, use_joblib=True, modules=[], chunksize=512): + def bin_data(self, modules=[], chunksize=512, backend='loky', n_jobs=None): """ Load and bin dssc data according to self.bins Parameters ---------- - use_joblib: bool - use the joblib module for multithreading. If set to false the - multiprocessing module is used. + backend: str + joblib multiprocessing backend to be used. At the moment it can be + any of joblibs standard backends: 'loky' (default), + 'multiprocessing', 'threading'. Anything else than the default is + experimental and not appropriately implemented in the dbdet member + function 'bin_data'. + n_jobs: int + number of cpu's used per sub-process. Note that when using the + default backend there is no need to adjust this parameter with the + current implementation. modules: list of ints a list containing the module numbers that should be processed. If empty, all modules are processed. @@ -198,11 +205,14 @@ class DSSCBinner: mod_list = modules if len(mod_list)==0: mod_list = [i for i in range(16)] - n_jobs = len(mod_list) + log.info(f'Process modules {mod_list}') + njobs = n_jobs + if njobs==None: + njobs = len(mod_list) - jobs = [] + module_jobs = [] for m in mod_list: - jobs.append(dict( + module_jobs.append(dict( proposal=self.proposal, run_nr=self.runnr, module=m, @@ -212,14 +222,11 @@ class DSSCBinner: )) data = None - if use_joblib: - log.info(f'using joblib module for multithreading') - data = joblib.Parallel(n_jobs=n_jobs) \ - (joblib.delayed(bin_data)(**jobs[i]) for i in range(n_jobs)) - else: - log.info(f'using multiprocessing module for multithreading') - with multiprocessing.Pool(n_jobs) as pool: - data = pool.map(bin_data_multipr, jobs) + + log.info(f'using parallelization backend {backend}') + data = joblib.Parallel(n_jobs=njobs, backend=backend) \ + (joblib.delayed(bin_data)(**module_jobs[i]) \ + for i in range(len(mod_list))) data = xr.concat(data, dim='module') data = data.assign_coords(module=("module", 
np.array(mod_list))) diff --git a/src/toolbox_scs/detectors/dssc_processing.py b/src/toolbox_scs/detectors/dssc_processing.py index ff7a2fd..f0ee373 100644 --- a/src/toolbox_scs/detectors/dssc_processing.py +++ b/src/toolbox_scs/detectors/dssc_processing.py @@ -104,19 +104,6 @@ def load_chunk_data(sel, sourcename, maxframes=None): return data.loc[{'pulse': np.s_[:maxframes]}] -def bin_data_multipr(job): - """ - This method calls the binning routine when using the multiprocessing - module. It is needed due to the syntax in the main function call. For more - information see "bin_data". - - Returns - ------- - module_data: xarray.Dataset - """ - return bin_data(**job) - - def bin_data(proposal, run_nr, module, chunksize, info, dssc_binners, pulsemask=None): """ -- GitLab