From 27a9ca24e24b3c45b585ca3efb3feb3c7b107b00 Mon Sep 17 00:00:00 2001
From: Rafael Gort <rafael.gort@xfel.eu>
Date: Fri, 25 Sep 2020 18:29:18 +0200
Subject: [PATCH] Adapt processing routines to avoid return values; test
 suites to be updated

---
 src/toolbox_scs/detectors/__init__.py        |  3 ++-
 src/toolbox_scs/detectors/dssc.py            | 10 +++++-----
 src/toolbox_scs/detectors/dssc_data.py       |  8 ++++++++
 src/toolbox_scs/detectors/dssc_processing.py | 22 ++++++++++++----------
 4 files changed, 27 insertions(+), 16 deletions(-)

diff --git a/src/toolbox_scs/detectors/__init__.py b/src/toolbox_scs/detectors/__init__.py
index 545a784..8189827 100644
--- a/src/toolbox_scs/detectors/__init__.py
+++ b/src/toolbox_scs/detectors/__init__.py
@@ -3,7 +3,7 @@ from .xgm import (
 from .tim import (
     load_TIM,)
 from .dssc_data import (
-    save_xarray, load_xarray, get_data_formatted)
+    save_xarray, load_xarray, get_data_formatted, save_attributes_h5)
 from .dssc_misc import (
     load_dssc_info, create_dssc_bins, quickmask_DSSC_ASIC,
     get_xgm_formatted, load_mask)
@@ -27,6 +27,7 @@ __all__ = (
     "save_xarray",
     "load_xarray",
     "get_data_formatted",
+    "save_attributes_h5",
     "quickmask_DSSC_ASIC",
     "load_mask",
     # Classes
diff --git a/src/toolbox_scs/detectors/dssc.py b/src/toolbox_scs/detectors/dssc.py
index 2c3e726..333abe4 100644
--- a/src/toolbox_scs/detectors/dssc.py
+++ b/src/toolbox_scs/detectors/dssc.py
@@ -182,7 +182,7 @@ class DSSCBinner:
     # -------------------------------------------------------------------------
     # Data processing
     # -------------------------------------------------------------------------
-    def process_data(self, modules=[],
+    def process_data(self, modules=[], filepath='./',
                      chunksize=512, backend='loky', n_jobs=None,
                      dark_image=None,
                      xgm_normalization=False, normevery=1
@@ -240,6 +240,7 @@ class DSSCBinner:
                 run_nr=self.runnr,
                 module=m,
                 chunksize=chunksize,
+                path=filepath,
                 info=self.info,
                 dssc_binners=self.binners,
                 pulsemask=self.pulsemask,
@@ -252,14 +253,13 @@ class DSSCBinner:
         data = None
         log.info(f'using parallelization backend {backend}')
-        data = joblib.Parallel(n_jobs=njobs, backend=backend) \
+        joblib.Parallel(n_jobs=njobs, backend=backend) \
             (joblib.delayed(process_dssc_data)(**module_jobs[i]) \
                 for i in range(len(mod_list)))
-        data = xr.concat(data, dim='module')
-        data = data.assign_coords(module=("module", np.array(mod_list)))
+        #data = xr.concat(data, dim='module')
+        #data = data.assign_coords(module=("module", np.array(mod_list)))
         log.info(f'Binning done')
-        return data
 
 
 class DSSCFormatter:
diff --git a/src/toolbox_scs/detectors/dssc_data.py b/src/toolbox_scs/detectors/dssc_data.py
index f38afe6..fa918bb 100644
--- a/src/toolbox_scs/detectors/dssc_data.py
+++ b/src/toolbox_scs/detectors/dssc_data.py
@@ -157,6 +157,11 @@ def get_data_formatted(filenames=[], data_list=[]):
     elif any(data_list) is True:
         data = data_list
 
+    mod_list = []
+    for d in data:
+        if 'module' in d.attrs.keys():
+            mod_list.append(d.attrs['module'])
+
     if type(data[0]).__module__ == 'xarray.core.dataset':
         data = xr.concat(data, dim='module')
     elif type(data[0]).__module__ == 'pandas.core.frame':
@@ -164,7 +169,10 @@ def get_data_formatted(filenames=[], data_list=[]):
     elif type(data[0]).__module__ == 'dask.dataframe.core':
         pass
 
+    if mod_list:
+        data = data.assign_coords(module=("module", mod_list))
     data = data.sortby("module")
+    data.attrs.clear()
     return data.transpose('trainId', 'pulse', 'module', 'x', 'y')
diff --git a/src/toolbox_scs/detectors/dssc_processing.py b/src/toolbox_scs/detectors/dssc_processing.py
index fb8eb0d..256e13f 100644
--- a/src/toolbox_scs/detectors/dssc_processing.py
+++ b/src/toolbox_scs/detectors/dssc_processing.py
@@ -4,6 +4,7 @@ comment: contributions should comply with pep8 code structure guidelines.
 """
 
 import logging
+import os
 
 from tqdm import tqdm
 import numpy as np
@@ -14,6 +15,7 @@ import extra_data as ed
 
 from .dssc_misc import get_xgm_formatted
 from ..constants import mnemonics as _mnemonics
+from .dssc_data import save_xarray
 
 log = logging.getLogger(__name__)
@@ -134,11 +136,12 @@ def _load_chunk_xgm(sel, xgm_mnemonic, stride):
     return d
 
 
-def process_dssc_data(proposal, run_nr, module, chunksize, info, dssc_binners,
+def process_dssc_data(proposal, run_nr, module, chunksize, info, dssc_binners,
+                      path='./',
                       pulsemask=None,
                       dark_image=None,
                       xgm_normalization=False,
-                      xgm_mnemonic='SCS_SA3',
+                      xgm_mnemonic='SCS_SA3',
                       normevery=1
                       ):
     """
@@ -195,10 +198,6 @@ def process_dssc_data(proposal, run_nr, module, chunksize, info, dssc_binners,
     # create empty dataset for dssc data to be filled iteratively
     module_data = create_empty_dataset(info, dssc_binners)
 
-    # progress bar for module 15
-    if module == 15:
-        pbar = tqdm(total=len(chunks))
-
     # load data chunk by chunk and merge result into prepared empty dataset
     for chunk in chunks:
         log.debug(f"Module {module}: "
@@ -263,12 +262,15 @@ def process_dssc_data(proposal, run_nr, module, chunksize, info, dssc_binners,
         log.debug(f"Module {module}: "
                   f"processed trains {chunk}:{chunk + chunksize}")
 
-        if module == 15:
-            pbar.update(1)
-
     log.debug(f'Module {module}: calculate mean')
     module_data['data'] = module_data['data'] / module_data['hist']
     module_data = module_data.transpose('trainId', 'pulse', 'x', 'y')
+    module_data.attrs['module'] = module
+
+    log.info(f'saving module {module}')
+    if not os.path.isdir(path):
+        os.mkdir(path)
+    fname = f'run_{run_nr}_module{module}.h5'
+    save_xarray(os.path.join(path, fname), module_data, mode='a')
 
     log.info(f"processing module {module}: done")
-    return module_data
--
GitLab
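A minimal sketch of how the reworked, return-free flow is meant to be driven. The `binner` object, the run number 50, and the `./processed/` directory are hypothetical placeholders; only `process_data`, `get_data_formatted`, and the `run_<run_nr>_module<module>.h5` naming come from the patch itself.

import toolbox_scs.detectors as tbdet

# 'binner' is assumed to be an already-configured DSSCBinner for a
# hypothetical run 50; its construction is outside this patch.
# process_data() no longer returns the binned result: each joblib worker
# now writes 'run_<run_nr>_module<module>.h5' into 'filepath' instead.
binner.process_data(modules=[0, 15], filepath='./processed/', chunksize=512)

# Re-assemble the per-module files afterwards. get_data_formatted() rebuilds
# the 'module' coordinate from the 'module' attribute stored by each worker,
# sorts by module, and returns one dataset with dimensions
# ('trainId', 'pulse', 'module', 'x', 'y').
files = [f'./processed/run_50_module{m}.h5' for m in (0, 15)]
data = tbdet.get_data_formatted(filenames=files)

Splitting processing and assembly this way keeps the parallel workers from shipping large arrays back through joblib: each worker persists its result to disk, and concatenation happens once, in the calling process.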