Commit 1dae9334 authored by Rafael Gort

Generalized joblib implementation

parent 7c63afde
Tags: 1.0.3-rc1, 1.0.3-rc2
2 merge requests: !98 "WIP: Separate digitizers from XGM, assign absolute pulse ID to XGM and digitizers data", !87 "Introduce package structure, generalized binning principle, ..."
@@ -8,7 +8,7 @@ from .dssc_misc import (
     load_dssc_info, create_dssc_bins, load_geom, quickmask_DSSC_ASIC,
     calc_xgm_frame_indices, get_xgm_formatted, load_mask)
 from .dssc_processing import (
-    bin_data_multipr, bin_data)
+    bin_data)
 from .dssc import (
     DSSCBinner, DSSCFormatter, DSSCAnalyzer)
 from .azimuthal_integrator import AzimuthalIntegrator
@@ -23,7 +23,6 @@ __all__ = (
     "create_dssc_bins",
     "calc_xgm_frame_indices",
     "get_xgm_formatted",
-    "bin_data_multipr",
     "bin_data",
     "save_xarray",
     "load_xarray",
...
@@ -28,7 +28,7 @@ from .dssc_data import (
 from .dssc_misc import (
     load_dssc_info, get_xgm_formatted, load_geom)
 from .dssc_processing import (
-    bin_data, bin_data_multipr, create_empty_dataset)
+    bin_data, create_empty_dataset)
 __all__ = ["DSSCBinner", "DSSCFormatter", "DSSCAnalyzer"]
 log = logging.getLogger(__name__)
@@ -172,15 +172,22 @@ class DSSCBinner:
     # -------------------------------------------------------------------------
     # Data processing
     # -------------------------------------------------------------------------
-    def bin_data(self, use_joblib=True, modules=[], chunksize=512):
+    def bin_data(self, modules=[], chunksize=512, backend='loky', n_jobs=None):
         """
         Load and bin dssc data according to self.bins
         Parameters
         ----------
-        use_joblib: bool
-            use the joblib module for multithreading. If set to false the
-            multiprocessing module is used.
+        backend: str
+            joblib multiprocessing backend to be used. At the moment it can be
+            any of joblib's standard backends: 'loky' (default),
+            'multiprocessing', 'threading'. Anything other than the default is
+            experimental and not appropriately implemented in the dbdet member
+            function 'bin_data'.
+        n_jobs: int
+            number of CPUs used per sub-process. Note that when using the
+            default backend there is no need to adjust this parameter with the
+            current implementation.
         modules: list of ints
            a list containing the module numbers that should be processed. If
            empty, all modules are processed.
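
For orientation, a hedged usage sketch of the new call signature. Everything except bin_data's own parameters is assumed here; in particular, the DSSCBinner constructor arguments are not shown in this diff and are made up for the sketch:

    # hypothetical setup: a DSSCBinner for some proposal/run
    binner = DSSCBinner(proposal=2530, run_nr=49)  # arguments assumed, not from the diff
    # default: 'loky' backend, one job per module
    data = binner.bin_data(chunksize=512)
    # experimental: explicit backend and worker count for a subset of modules
    data = binner.bin_data(modules=[0, 1, 2], backend='threading', n_jobs=3)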
@@ -198,11 +205,14 @@
         mod_list = modules
         if len(mod_list)==0:
             mod_list = [i for i in range(16)]
-        n_jobs = len(mod_list)
         log.info(f'Process modules {mod_list}')
+        njobs = n_jobs
+        if njobs==None:
+            njobs = len(mod_list)
-        jobs = []
+        module_jobs = []
         for m in mod_list:
-            jobs.append(dict(
+            module_jobs.append(dict(
                 proposal=self.proposal,
                 run_nr=self.runnr,
                 module=m,
@@ -212,14 +222,11 @@
             ))
         data = None
-        if use_joblib:
-            log.info(f'using joblib module for multithreading')
-            data = joblib.Parallel(n_jobs=n_jobs) \
-                (joblib.delayed(bin_data)(**jobs[i]) for i in range(n_jobs))
-        else:
-            log.info(f'using multiprocessing module for multithreading')
-            with multiprocessing.Pool(n_jobs) as pool:
-                data = pool.map(bin_data_multipr, jobs)
+        log.info(f'using parallelization backend {backend}')
+        data = joblib.Parallel(n_jobs=njobs, backend=backend) \
+            (joblib.delayed(bin_data)(**module_jobs[i]) \
+             for i in range(len(mod_list)))
         data = xr.concat(data, dim='module')
         data = data.assign_coords(module=("module", np.array(mod_list)))
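
The new code follows joblib's standard Parallel/delayed idiom: build one keyword dictionary per module, then unpack each dictionary into a delayed call. A self-contained sketch of that pattern (the worker function and its fields are illustrative stand-ins, not the package's bin_data):

    import joblib

    def process_module(module, chunksize):
        # stand-in for the real per-module binning routine
        return {'module': module, 'chunksize': chunksize}

    module_jobs = [dict(module=m, chunksize=512) for m in range(16)]
    results = joblib.Parallel(n_jobs=len(module_jobs), backend='loky')(
        joblib.delayed(process_module)(**job) for job in module_jobs)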
...
@@ -104,19 +104,6 @@ def load_chunk_data(sel, sourcename, maxframes=None):
     return data.loc[{'pulse': np.s_[:maxframes]}]
-def bin_data_multipr(job):
-    """
-    This method calls the binning routine when using the multiprocessing
-    module. It is needed due to the syntax in the main function call. For more
-    information see "bin_data".
-    Returns
-    -------
-    module_data: xarray.Dataset
-    """
-    return bin_data(**job)
 def bin_data(proposal, run_nr, module, chunksize, info, dssc_binners,
              pulsemask=None):
     """
...
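
Background on the deletion: multiprocessing.Pool.map passes exactly one positional argument to the worker, so the keyword dictionaries had to be unpacked inside a helper like bin_data_multipr, whereas joblib.delayed accepts keyword arguments directly, making the wrapper redundant. An illustrative contrast (the worker names are made up for this sketch):

    import multiprocessing
    import joblib

    def worker(a, b):
        return a + b

    def worker_wrapper(job):
        # Pool.map hands the worker a single positional argument,
        # hence the unpacking helper
        return worker(**job)

    if __name__ == '__main__':
        jobs = [dict(a=1, b=2), dict(a=3, b=4)]
        with multiprocessing.Pool(2) as pool:
            res_mp = pool.map(worker_wrapper, jobs)
        # joblib needs no wrapper: delayed() takes keyword arguments directly
        res_jl = joblib.Parallel(n_jobs=2)(
            joblib.delayed(worker)(**job) for job in jobs)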