diff --git a/VERSION b/VERSION
index 3fe66b1708b122e499c436ebab136407c277be84..32afd27a22df189c6c555942a557eaa26b17fcfb 100644
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-1.0.3-rc1
+1.0.3-rc2
diff --git a/src/toolbox_scs/detectors/__init__.py b/src/toolbox_scs/detectors/__init__.py
index ae82ccb34fd584c7aa54f5b1bd13d84f11acb47b..5059545ee600c791fa6147d42424b2f7dfeb88ef 100644
--- a/src/toolbox_scs/detectors/__init__.py
+++ b/src/toolbox_scs/detectors/__init__.py
@@ -8,7 +8,7 @@ from .dssc_misc import (
     load_dssc_info, create_dssc_bins, load_geom, quickmask_DSSC_ASIC,
     calc_xgm_frame_indices, get_xgm_formatted, load_mask)
 from .dssc_processing import (
-    bin_data_multipr, bin_data)
+    bin_data)
 from .dssc import (
     DSSCBinner, DSSCFormatter, DSSCAnalyzer)
 from .azimuthal_integrator import AzimuthalIntegrator
@@ -23,7 +23,6 @@ __all__ = (
     "create_dssc_bins",
     "calc_xgm_frame_indices",
     "get_xgm_formatted",
-    "bin_data_multipr",
     "bin_data",
     "save_xarray",
     "load_xarray",
diff --git a/src/toolbox_scs/detectors/dssc.py b/src/toolbox_scs/detectors/dssc.py
index 7068b1938ad89c0ee6be11807635ce7530ac2a45..f6e67591cf4cdc5f7e521d214527bb5c0f5c56f2 100644
--- a/src/toolbox_scs/detectors/dssc.py
+++ b/src/toolbox_scs/detectors/dssc.py
@@ -28,7 +28,7 @@ from .dssc_data import (
 from .dssc_misc import (
     load_dssc_info, get_xgm_formatted, load_geom)
 from .dssc_processing import (
-    bin_data, bin_data_multipr, create_empty_dataset)
+    bin_data, create_empty_dataset)
 
 __all__ = ["DSSCBinner", "DSSCFormatter", "DSSCAnalyzer"]
 log = logging.getLogger(__name__)
@@ -172,15 +172,22 @@ class DSSCBinner:
     # -------------------------------------------------------------------------
     # Data processing
     # -------------------------------------------------------------------------
-    def bin_data(self, use_joblib=True, modules=[], chunksize=512):
+    def bin_data(self, modules=[], chunksize=512, backend='loky', n_jobs=None):
         """
         Load and bin dssc data according to self.bins
 
         Parameters
         ----------
-        use_joblib: bool
-            use the joblib module for multithreading. If set to false the
-            multiprocessing module is used.
+        backend: str
+            joblib multiprocessing backend to be used. At the moment it can be
+            any of joblib's standard backends: 'loky' (default),
+            'multiprocessing', 'threading'. Anything other than the default is
+            experimental and not appropriately implemented in the dbdet member
+            function 'bin_data'.
+        n_jobs: int
+            number of CPUs used per sub-process. Note that when using the
+            default backend there is no need to adjust this parameter with the
+            current implementation.
         modules: list of ints
             a list containing the module numbers that should be processed. If
             empty, all modules are processed.
@@ -198,11 +205,14 @@ class DSSCBinner:
         mod_list = modules
         if len(mod_list)==0:
             mod_list = [i for i in range(16)]
-        n_jobs = len(mod_list)
+        log.info(f'Process modules {mod_list}')
+        njobs = n_jobs
+        if njobs==None:
+            njobs = len(mod_list)
 
-        jobs = []
+        module_jobs = []
         for m in mod_list:
-            jobs.append(dict(
+            module_jobs.append(dict(
                 proposal=self.proposal,
                 run_nr=self.runnr,
                 module=m,
@@ -212,14 +222,11 @@ class DSSCBinner:
             ))
 
         data = None
-        if use_joblib:
-            log.info(f'using joblib module for multithreading')
-            data = joblib.Parallel(n_jobs=n_jobs) \
-                (joblib.delayed(bin_data)(**jobs[i]) for i in range(n_jobs))
-        else:
-            log.info(f'using multiprocessing module for multithreading')
-            with multiprocessing.Pool(n_jobs) as pool:
-                data = pool.map(bin_data_multipr, jobs)
+
+        log.info(f'using parallelization backend {backend}')
+        data = joblib.Parallel(n_jobs=njobs, backend=backend) \
+            (joblib.delayed(bin_data)(**module_jobs[i]) \
+             for i in range(len(mod_list)))
 
         data = xr.concat(data, dim='module')
         data = data.assign_coords(module=("module", np.array(mod_list)))
diff --git a/src/toolbox_scs/detectors/dssc_processing.py b/src/toolbox_scs/detectors/dssc_processing.py
index ff7a2fdc8182b4bb0a083defdcd6d51b2a1c20e0..f0ee373b9fc9300a8c72ffc959b90dd56de9f8a2 100644
--- a/src/toolbox_scs/detectors/dssc_processing.py
+++ b/src/toolbox_scs/detectors/dssc_processing.py
@@ -104,19 +104,6 @@ def load_chunk_data(sel, sourcename, maxframes=None):
     return data.loc[{'pulse': np.s_[:maxframes]}]
 
 
-def bin_data_multipr(job):
-    """
-    This method calls the binning routine when using the multiprocessing
-    module. It is needed due to the syntax in the main function call. For more
-    information see "bin_data".
-
-    Returns
-    -------
-    module_data: xarray.Dataset
-    """
-    return bin_data(**job)
-
-
 def bin_data(proposal, run_nr, module, chunksize, info, dssc_binners,
              pulsemask=None):
     """
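
For reference, a minimal usage sketch of the new DSSCBinner.bin_data signature introduced by this change. The tbdet import alias, the binner construction, and the concrete module numbers below are placeholders and not taken from the diff; only the modules, chunksize, backend and n_jobs keyword arguments come from the change itself.

import toolbox_scs.detectors as tbdet  # import alias is an assumption

# Construction of the binner is not part of this change; arguments elided.
binner = tbdet.DSSCBinner(...)

# Default behaviour: joblib's 'loky' backend; with n_jobs=None one job is
# started per selected module (len(modules) jobs).
data = binner.bin_data(modules=[0, 1, 14, 15], chunksize=512)

# Experimental: select another joblib backend explicitly and cap the number
# of parallel jobs.
data_threads = binner.bin_data(modules=[], backend='threading', n_jobs=8)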