diff --git a/.gitignore b/.gitignore
index 7309bfd11c3d8eef40fd646a2d6332a755f02eec..a44e9b685f7c47f2118467fc55437d864bd22971 100644
--- a/.gitignore
+++ b/.gitignore
@@ -2,3 +2,4 @@
 src/*.egg*
 *.pyc
 *__pycache__*
+tmp/
diff --git a/setup.py b/setup.py
index 94f46be08390556815accdc8a57b58426dc651c5..9076e9155e56da1c0d690e125316e83fbf8a7727 100644
--- a/setup.py
+++ b/setup.py
@@ -23,6 +23,6 @@ setup(name='toolbox_scs',
       install_requires=[
           'xarray>=0.13.0', 'numpy', 'matplotlib',
           'pandas', 'scipy', 'h5py', 'h5netcdf',
-          'extra_data', 'euxfel_bunch_pattern',
+          'extra_data', 'euxfel_bunch_pattern>=0.6',
       ],
       )
diff --git a/src/toolbox_scs/__init__.py b/src/toolbox_scs/__init__.py
index 3e4867d43772f40b337d86e6ac92280a19a7c35e..fa7d6ca30e53e2b79fbcabfa191bdff94053c28f 100644
--- a/src/toolbox_scs/__init__.py
+++ b/src/toolbox_scs/__init__.py
@@ -1,4 +1,4 @@
-from .load import (load, concatenateRuns, load_binned_array,
+from .load import (load, concatenateRuns, get_binned_array,
                    load_run, run_by_path)
 from .constants import mnemonics
 
@@ -7,7 +7,7 @@ __all__ = (
     # functions
     "load",
    "concatenateRuns",
-    "load_binned_array",
+    "get_binned_array",
    "load_run",
    "run_by_path",
    # Classes
diff --git a/src/toolbox_scs/detectors/__init__.py b/src/toolbox_scs/detectors/__init__.py
index a3674192a2581bdf706ac6e96c4ac7a4123ddeef..50e9a129e095fa050633ee22541e93a961b1fa08 100644
--- a/src/toolbox_scs/detectors/__init__.py
+++ b/src/toolbox_scs/detectors/__init__.py
@@ -5,10 +5,12 @@ from .tim import (
 from .dssc_data import (
     save_to_file, load_from_file)
 from .dssc_misc import (
-    load_dssc_info, load_geom, quickmask_DSSC_ASIC, calc_xgm_frame_indices)
+    load_dssc_info, load_geom, quickmask_DSSC_ASIC, calc_xgm_frame_indices,
+    create_xgm_pulsemask, get_xgm_binned, get_xgm_formatted)
 from .dssc_processing import (
-    split_frames, bin_data_multipr, process_intra_train, bin_data)
-from .dssc import DSSC
+    split_frames, bin_data_multipr, sum_trains_multipr,
+    sum_trains, bin_data)
+from .dssc import DSSCBinner
 from .azimuthal_integrator import AzimuthalIntegrator
 
 __all__ = (
@@ -23,7 +25,8 @@ __all__ = (
-    "process_intra_train",
+    "sum_trains",
+    "sum_trains_multipr",
     "split_frames",
     # Classes
-    "DSSC",
+    "DSSCBinner",
     "AzimuthalIntegrator",
     # Variables
 )
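
For orientation, the renamed loader keeps the signature shown in the load.py hunk further below: get_binned_array(run, mnemonic_key, binsize) loads 1D data for a mnemonic and rounds it to integer multiples of binsize. A minimal sketch; the proposal/run numbers and the 'nrj' mnemonic are placeholders, not taken from this patch:

    import toolbox_scs as tb

    # placeholder proposal and run number
    run = tb.load_run(2530, 49)
    # 1D data for the given mnemonic, rounded to integer multiples of 0.1
    bins = tb.get_binned_array(run, 'nrj', 0.1)
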
diff --git a/src/toolbox_scs/detectors/dssc.py b/src/toolbox_scs/detectors/dssc.py
index e729c97e8aabda5cdb76a78e2ca31dd21eb39f6a..6d5d5c49fd3c1423a080c397155952768705e826 100644
--- a/src/toolbox_scs/detectors/dssc.py
+++ b/src/toolbox_scs/detectors/dssc.py
@@ -7,10 +7,6 @@ its objects.
 
     comments:
-        - DSSC class methods will mostly use functions defined in other
-          files. The redefinition of its names makes it easy to adapt the
-          code in case we later rename the underlying functions in the
-          course of development.
         - contributions should comply with pep8 code structure guidelines.
         - Plot routines don't fit into objects since they are rather fluent.
          They have been outsourced to dssc_plot.py. Alternatively they could
@@ -24,42 +20,31 @@ import multiprocessing
 import numpy as np
 import xarray as xr
 
-from ..load import (
-    load_run as _load_run,
-    load_binned_array as _load_binned_array,
-)
-from .xgm import (
-    load_xgm as _load_xgm_data,
-    )
-from .dssc_misc import (
-    load_dssc_info as _dssc_info,
-    calc_xgm_frame_indices as _xgm_frame_indices,
-    )
-from .dssc_data import (
-    save_to_file as _save_to_file,
-    load_from_file as _load_from_file,
-    )
-from .dssc_processing import (
-    process_intra_train as _process_intra_train,
-    bin_data_multipr as _bin_data_multipr,
-    bin_data as _bin_data,
-    )
-
+from ..load import load_run, get_binned_array
+from .dssc_data import save_to_file, load_from_file
+from .dssc_misc import (load_dssc_info, calc_xgm_frame_indices,
+                        create_xgm_pulsemask, get_xgm_formatted, get_xgm_binned)
+from .dssc_processing import (bin_data, bin_data_multipr, sum_trains,
+                              sum_trains_multipr)
+
+__all__ = ["DSSCBinner", "DSSCImage"]
 
 log = logging.getLogger(__name__)
 
 
 def _setup_dir():
-    for f in ['tmp', 'images', 'processed_runs']:
+    for f in ['tmp', 'processed_runs', 'images']:
         if not os.path.isdir(f):
             os.mkdir(f)
 
 
-class DSSC:
+class DSSCBinner:
     def __init__(self, proposal_nr, run_nr, is_dark=False,
                  bin_variable_name=None, bin_size=None,
-                 framepattern=['image'], xgm_threshold=(0, np.inf),
-                 detector_distance=1
+                 bin_file='./tmp/scan.h5',
+                 framepattern=['image'],
+                 use_xgm=False, xgm_threshold=(0, np.inf),
+                 use_tim=False,
+                 mask_file='./tmp/mask.h5'
                  ):
         """
         (doc to be done)
@@ -67,113 +52,85 @@ class DSSC:
         _setup_dir()
 
         # ---------------------------------------------------------------------
-        # Internal variables
+        # object (run) properties
         # ---------------------------------------------------------------------
-        self.proposal_nr = proposal_nr
-        self.run_nr = run_nr
+        self.proposal = proposal_nr
+        self.runnr = run_nr
         self.framepattern = framepattern
-        self.is_dark = is_dark
-        self.info = _dssc_info(proposal_nr, run_nr)
+        self.isdark = is_dark
+        self.info = load_dssc_info(proposal_nr, run_nr)
         self.fpt = self.info['frames_per_train']
-        self.run = _load_run(proposal_nr, run_nr, include='*DA*')
-        self.xgm_threshold = xgm_threshold
-
+        self.run = load_run(proposal_nr, run_nr, include='*DA*')
+        self.xgmthreshold = xgm_threshold
         # ---------------------------------------------------------------------
         # Create file containing bins for later data reduction (bin_data())
         # ---------------------------------------------------------------------
-        self.scanfile = './tmp/scan.h5'
-        self.bin_variable = self.get_bin_variable(
-            self.run, bin_variable_name, bin_size)
-        _save_to_file(self.bin_variable, self.scanfile, overwrite = True)
+        self.binfile = bin_file
+        self.bins = get_binned_array(self.run, bin_variable_name,
+                                     bin_size)
+        self.bins.name = 'scan_variable'
+        save_to_file(self.bins, bin_file, overwrite=True)
 
         # ---------------------------------------------------------------------
-        # Additional data reduction through masking
+        # Prepare pulse mask for additional data reduction
         # ---------------------------------------------------------------------
-        self.filter_mask = None
-        self.maskfile = './tmp/mask.h5'
+        self.maskfile = mask_file
         self.xgm = None
-        if not self.is_dark:
-            self.xgm = self.load_xgm()
-            self.create_filter_mask()
+        self.tim = None
+        arr = np.ones([len(self.run.train_ids), self.fpt], dtype=bool)
+        self.pulsemask = xr.DataArray(arr,
+                                      dims=['trainId', 'pulse'],
+                                      coords={'trainId': self.run.train_ids,
+                                              'pulse': range(self.fpt)})
+        if not is_dark:
+            if use_xgm:
+                self.xgm = get_xgm_formatted(self.run, framepattern)
+                self.pulsemask = create_xgm_pulsemask(
+                                    self.xgm, self.pulsemask,
+                                    framepattern, xgm_threshold)
+        save_to_file(self.pulsemask, mask_file, overwrite=True)
 
-        # ---------------------------------------------------------------------
-        # Detector geometry
-        # ---------------------------------------------------------------------
-        self.det_distance = detector_distance
-        self.px_pitch_h = 236
-        self.px_pitch_v = 204
-        self.aspect = self.px_pitch_v/self.px_pitch_h
-        self.geom = None
-        self.mask = None
         log.debug("Constructed DSSC object")
 
     def __del__(self):
-        # cleanup
         pass
 
     # -------------------------------------------------------------------------
-    # Data handling
+    # Class internal data handling
     # -------------------------------------------------------------------------
-    def load_geom(self):
-        pass
-
-    def load_mask(self):
-        pass
-
-    def load_xgm(self):
-        xgm = _load_xgm_data(self.run)
-        xgm_frame_coords = _xgm_frame_indices(xgm.shape[1], self.framepattern)
-        xgm['pulse'] = xgm_frame_coords
-        return xgm
-
-    def create_filter_mask(self):
-        pulsemask = xr.DataArray(
-            np.ones([len(self.run.train_ids), self.fpt], dtype=bool),
-            dims=['trainId', 'pulse'],
-            coords={'trainId': self.run.train_ids,
-                    'pulse': range(self.fpt)})
-        if self.xgm is not None:
-            n_frames_dark = len([p for p in self.framepattern if 'dark' in p])
-            valid = (self.xgm > self.xgm_threshold[0]) * \
-                    (self.xgm < self.xgm_threshold[1])
-            pulsemask = valid.combine_first(pulsemask).astype(bool)
-            nrejected = int(valid.size - valid.sum())
-            percent_rejected = 100 * nrejected / valid.size
-            log.info(f'rejecting {nrejected} out of {valid.size} pulses'
-                     f'({percent_rejected:.1f}%) due to xgm threshold')
-
-        _save_to_file(pulsemask, self.maskfile, overwrite = True)
-
-    def get_bin_variable(self, run, name, stepsize=None):
-        data = _load_binned_array(run, name, stepsize)
-        data.name = 'scan_variable'
-        return data
-
-    def save_binned_data(self):
-        pass
-
-    def merge_xgm_binned(self):
-        #pulses_no_dark = [p for p in framepattern if 'dark' not in p]
-        #if maskfile is not None:
-            #xgm = xgm.where(valid)
-            #xgm = tbdet.split_frames(xgm, pulses_no_dark, prefix='xgm_')
-            #xgm['scan_variable'] = scan_variable
-            #xgm = xgm.groupby('scan_variable').mean('trainId')
-            #module_data = xr.merge([module_data, xgm])
-            #module_data = module_data.transpose(
-            #    'scan_variable', 'module', 'x', 'y')
-        pass
-
+    def save_binned_data(self, module_data,
+                         path="processed_runs", merge_xgm=True):
+        if merge_xgm and self.xgm is not None:
+            xgm_data = get_xgm_binned(self.xgm, self.xgmthreshold,
+                                      self.bins, self.framepattern)
+            module_data = xr.merge([module_data, xgm_data])
+        module_data = module_data.transpose('scan_variable', 'module',
+                                            'x', 'y')
+        fname = f'run{self.runnr}.h5'
+        save_to_file(module_data, fname, path=path, overwrite=True)
+
+    def save_summed_data(self, module_data,
+                         path="processed_runs", merge_xgm=True):
+        if merge_xgm and self.xgm is not None:
+            # operate on a copy so that saving does not mutate self.xgm
+            xgm = self.xgm.copy()
+            xgm['pulse'] = np.arange(self.fpt, dtype=int)
+            xgm = xgm.mean('trainId')
+            xgm.name = 'xgm'
+            module_data = xr.merge([module_data, xgm])
+
+        fname = f'run{self.runnr}_summed.h5'
+        save_to_file(module_data, fname, path=path, overwrite=True)
 
     # -------------------------------------------------------------------------
     # Data processing
     # -------------------------------------------------------------------------
-    def bin_data(self, use_joblib=False, process_modules=[], chunksize = 512):
-        log.info("Binning data according to bins given in scanfile")
-        log.info(f'Processing {chunksize} trains per chunk')
+    def bin_data(self, use_joblib=False, modules=[], chunksize=248):
+        log.info("Binning data according to bins given in binfile")
+        log.info(f'Processing {chunksize} trains per chunk')
 
-        mod_list = process_modules
+        mod_list = modules
         if any(mod_list) is False:
             mod_list = [i for i in range(16)]
         n_jobs = len(mod_list)
@@ -181,28 +138,78 @@
         jobs = []
         for m in mod_list:
             jobs.append(dict(
-                proposal=self.proposal_nr,
-                run_nr=self.run_nr,
+                proposal=self.proposal,
+                run_nr=self.runnr,
                 module=m,
                 chunksize=chunksize,
-                binfile=self.scanfile,
+                binfile=self.binfile,
                 framepattern=self.framepattern,
-                maskfile=None if self.is_dark else self.maskfile,
+                maskfile=None if self.isdark else self.maskfile,
             ))
 
         data = None
         if use_joblib:
+            log.info('using the joblib module for parallel processing')
+            # bin_data below is the module-level function imported from
+            # dssc_processing, not this method
             data = joblib.Parallel(n_jobs=n_jobs) \
-                (joblib.delayed(_bin_data)(**jobs[i]) for i in range(n_jobs))
+                (joblib.delayed(bin_data)(**jobs[i]) for i in range(n_jobs))
         else:
+            log.info('using the multiprocessing module for parallel processing')
             with multiprocessing.Pool(n_jobs) as pool:
-                data = pool.map(_bin_data_multipr, jobs)
+                data = pool.map(bin_data_multipr, jobs)
 
         data = xr.concat(data, dim='module')
         data = data.dropna('scan_variable')
-        data['run'] = self.run_nr
+        data['run'] = self.runnr
         log.info(f'Binning done')
         return data
 
+    def sum_trains(self, use_joblib=False, modules=[], chunksize=248):
+        log.info("Summing DSSC frames along trains")
+        log.info(f'Processing {chunksize} trains per chunk')
+
+        mod_list = modules
+        if not mod_list:
+            # default: process all 16 modules
+            mod_list = [i for i in range(16)]
+        n_jobs = len(mod_list)
+
+        jobs = []
+        for m in mod_list:
+            jobs.append(dict(
+                proposal=self.proposal,
+                run_nr=self.runnr,
+                module=m,
+                chunksize=chunksize,
+                fpt=self.fpt,
+            ))
+
+        data = None
+        if use_joblib:
+            log.info('using the joblib module for parallel processing')
+            data = joblib.Parallel(n_jobs=n_jobs) \
+                (joblib.delayed(sum_trains)(**jobs[i]) for i in range(n_jobs))
+        else:
+            log.info('using the multiprocessing module for parallel processing')
+            with multiprocessing.Pool(n_jobs) as pool:
+                data = pool.map(sum_trains_multipr, jobs)
+
+        data = xr.concat(data, dim='module')
+        data['run'] = self.runnr
+        data = data.transpose('pulse', 'module', 'x', 'y')
+        log.info('Processing done')
+        return data
+
+
+class DSSCImage:
+    def __init__(self,
+                 distance=1, mask_file=''
+                 ):
+        """
+        Placeholder for geometry-related image handling (not implemented yet).
+        """
+        #self.distance = distance
+        #self.pxpitchh = 236  # horizontal pixel pitch in microns
+        #self.pxpitchv = 204  # vertical pixel pitch in microns
+        #self.aspect = self.pxpitchv/self.pxpitchh
+        #self.geom = self.load_geom()
+        #self.imagemask = self.load_mask(mask_file)
        pass
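
Taken together, the DSSCBinner workflow amounts to: construct the object (which writes the bin and pulse-mask files under ./tmp), run bin_data or sum_trains over the modules, and store the result. A minimal usage sketch; the proposal/run numbers, mnemonic and threshold values are hypothetical, not taken from this patch:

    import toolbox_scs.detectors as tbdet

    # dark run: no XGM filtering, frames summed over trains
    dark_binner = tbdet.DSSCBinner(2530, 1613, is_dark=True)
    dark = dark_binner.sum_trains(modules=[15])
    dark_binner.save_summed_data(dark)

    # scan run: bin frames by a slow variable, reject out-of-range pulses
    binner = tbdet.DSSCBinner(2530, 1616,
                              bin_variable_name='PP800_PhaseShifter',
                              bin_size=0.03,
                              framepattern=['pumped', 'unpumped'],
                              use_xgm=True, xgm_threshold=(300, 8000))
    data = binner.bin_data(modules=[15], chunksize=248)
    binner.save_binned_data(data)
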
""" import logging @@ -15,9 +10,11 @@ from tqdm import tqdm import numpy as np import xarray as xr import pandas as pd +from imageio import imread, imsave import extra_data as ed from extra_geom import DSSC_1MGeometry +from .xgm import load_xgm from .dssc_processing import split_frames as _split_frames @@ -57,6 +54,7 @@ def calc_xgm_frame_indices(nbunches, framepattern): Copyright (c) 2019, Michael Schneider Copyright (c) 2020, SCS-team + license: BSD 3-Clause License (see LICENSE_BSD for more info) Parameters ---------- @@ -85,6 +83,36 @@ def calc_xgm_frame_indices(nbunches, framepattern): return np.sort(np.concatenate(frame_indices)) +def get_xgm_formatted(run, framepattern): + xgm = load_xgm(run) + xgm_frame_coords = calc_xgm_frame_indices(xgm.shape[1], framepattern) + xgm['pulse'] = xgm_frame_coords + return xgm + + +def get_xgm_binned(xgm_data, xgm_threshold, bins, framepattern): + pulses_no_dark = [p for p in framepattern if 'dark' not in p] + valid = (xgm_data > xgm_threshold[0]) * \ + (xgm_data < xgm_threshold[1]) + xgm_data = xgm_data.where(valid) + xgm_data = _split_frames(xgm_data, pulses_no_dark, prefix='xgm_') + xgm_data['scan_variable'] = bins + return xgm_data.groupby('scan_variable').mean('trainId') + + +def create_xgm_pulsemask(xgm, pulsemask, framepattern, xgm_threshold): + n_frames_dark = len([p for p in framepattern if 'dark' in p]) + valid = (xgm > xgm_threshold[0]) * \ + (xgm < xgm_threshold[1]) + pulsemask = valid.combine_first(pulsemask).astype(bool) + + nrejected = int(valid.size - valid.sum()) + percent_rejected = 100 * nrejected / valid.size + log.info(f'rejecting {nrejected} out of {valid.size} pulses' + f'({percent_rejected:.1f}%) due to xgm threshold') + return pulsemask + + def load_geom(geopath=None, quad_pos=None): """ Loads and return the DSSC geometry. @@ -121,6 +149,10 @@ def quickmask_DSSC_ASIC(geom, poslist): Returns a mask for the given DSSC geometry with ASICs given in poslist blanked. poslist is a list of (module, row, column) tuples. Each module consists of 2 rows and 8 columns of individual ASICS. + + Copyright (c) 2019, Michael Schneider + Copyright (c) 2020, SCS-team + license: BSD 3-Clause License (see LICENSE_BSD for more info) ''' mask = np.ones([16, 128, 512], dtype=float) # need floats to use NaN for (module, row, col) in poslist: @@ -129,21 +161,27 @@ def quickmask_DSSC_ASIC(geom, poslist): return geom.position_modules_fast(mask)[0] -def merge_xgm_binned(): - """ - if not is_dark: - pulses_no_dark = [p for p in framepattern if 'dark' not in p] - - if maskfile is not None: - xgm = xgm.where(valid) - xgm = _split_frames(xgm, pulses_no_dark, prefix='xgm_') - xgm['scan_variable'] = scan_variable - xgm = xgm.groupby('scan_variable').mean('trainId') - module_data = xr.merge([module_data, xgm]) - - return = module_data.transpose('scan_variable', 'module', 'x', 'y') +def load_mask(fname): + """ + Load a DSSC mask file. 
@@ -121,6 +149,10 @@
     Returns a mask for the given DSSC geometry with ASICs given in poslist
     blanked. poslist is a list of (module, row, column) tuples. Each module
     consists of 2 rows and 8 columns of individual ASICS.
+
+    Copyright (c) 2019, Michael Schneider
+    Copyright (c) 2020, SCS-team
+    license: BSD 3-Clause License (see LICENSE_BSD for more info)
     '''
     mask = np.ones([16, 128, 512], dtype=float)  # need floats to use NaN
     for (module, row, col) in poslist:
@@ -129,21 +161,27 @@
     return geom.position_modules_fast(mask)[0]
 
 
-def merge_xgm_binned():
-    """
-    if not is_dark:
-        pulses_no_dark = [p for p in framepattern if 'dark' not in p]
-
-    if maskfile is not None:
-        xgm = xgm.where(valid)
-        xgm = _split_frames(xgm, pulses_no_dark, prefix='xgm_')
-        xgm['scan_variable'] = scan_variable
-        xgm = xgm.groupby('scan_variable').mean('trainId')
-        module_data = xr.merge([module_data, xgm])
-
-    return = module_data.transpose('scan_variable', 'module', 'x', 'y')
+def load_mask(fname):
+    """
+    Load a DSSC mask file.
+
+    Copyright (c) 2019, Michael Schneider
+    Copyright (c) 2020, SCS-team
+    license: BSD 3-Clause License (see LICENSE_BSD for more info)
+
+    Parameters
+    ----------
+    fname: str
+        string of the filename of the mask file
+
+    Returns
+    -------
+    dssc_mask: np.ndarray
+        mask with 1.0 for valid pixels and np.nan for blanked ones
     """
-    pass
+    dssc_mask = imread(fname)
+    dssc_mask = dssc_mask.astype(float)[..., 0] // 255
+    dssc_mask[dssc_mask == 0] = np.nan
+    return dssc_mask
 
 
 def substract_dark():
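
load_mask reads the mask with imageio.imread and keeps only the first channel, so the expected input is an image file (e.g. an RGB PNG) in which valid pixels are white (255) and blanked pixels black (0); blanked pixels come back as NaN and therefore drop out of later reductions. A hypothetical call (file name invented for illustration):

    import numpy as np
    from toolbox_scs.detectors.dssc_misc import load_mask

    dssc_mask = load_mask('./tmp/mask.png')   # hypothetical mask image
    frame = np.ones(dssc_mask.shape)          # stand-in detector frame
    masked = frame * dssc_mask                # blanked pixels become NaN
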
""" import logging @@ -29,6 +24,7 @@ def prepare_module_empty(scan_variable, framepattern): Copyright (c) 2019, Michael Schneider Copyright (c) 2020, SCS-team + license: BSD 3-Clause License (see LICENSE_BSD for more info) Parameters ---------- @@ -70,6 +66,7 @@ def load_chunk_data(sel, sourcename, maxframes=None): Copyright (c) 2019, Michael Schneider Copyright (c) 2020, SCS-team + license: BSD 3-Clause License (see LICENSE_BSD for more info) Parameters ---------- @@ -112,6 +109,7 @@ def merge_chunk_data(module_data, chunk_data, framepattern): Copyright (c) 2019, Michael Schneider Copyright (c) 2020, SCS-team + license: BSD 3-Clause License (see LICENSE_BSD for more info) Parameters ---------- @@ -147,6 +145,7 @@ def split_frames(data, pattern, prefix=''): Copyright (c) 2019, Michael Schneider Copyright (c) 2020, SCS-team + license: BSD 3-Clause License (see LICENSE_BSD for more info) Parameters ---------- @@ -173,13 +172,28 @@ def split_frames(data, pattern, prefix=''): return dataset -def process_intra_train(job): +def sum_trains_multipr(job): + """ + This method simply calls the actual routine when using the multiprocessing + module. It just exists due to the slightly different syntax. For more + information have a look at "process_intra_train". + + Returns + ------- + module_data: xarray.Dataset + """ + return sum_trains(**job) + + +def sum_trains(proposal, run_nr, module, chunksize, fpt, + maxframes=None): """ Aggregate DSSC data (chunked, to fit into memory) for a single module. Averages over all trains, but keeps all pulses. Copyright (c) 2019, Michael Schneider Copyright (c) 2020, SCS-team + license: BSD 3-Clause License (see LICENSE_BSD for more info) Parameters ---------- @@ -203,23 +217,15 @@ def process_intra_train(job): """ - proposal = job['proposal'] - run_nr = job['run_nr'] - module = job['module'] - chunksize = job['chunksize'] - fpt = job['fpt'] - maxframes = job.get('maxframes', None) - log.info(f"Processing dssc module {module}: start") sourcename = f'SCS_DET_DSSC1M-1/DET/{module}CH0:xtdf' collection = ed.open_run(proposal, run_nr, include=f'*DSSC{module:02d}*') fpt = min(fpt, maxframes) if maxframes is not None else fpt + arr = np.zeros([fpt, 128, 512], dtype=float) dims = ['pulse', 'x', 'y'] coords = {'pulse': np.arange(fpt, dtype=int)} - shape = [fpt, 128, 512] - module_data = xr.DataArray(np.zeros(shape, dtype=float), - dims=dims, coords=coords) + module_data = xr.DataArray(arr, dims=dims, coords=coords) module_data = module_data.to_dataset(name='image') module_data['sum_count'] = xr.DataArray(np.zeros(fpt, dtype=int), dims=['pulse']) @@ -256,46 +262,16 @@ def process_intra_train(job): def bin_data_multipr(job): """ - Entry point for binning routines using the multrprocessing module - - Parameters - ---------- - job: dictionary - Designed for the multiprocessing module - expects a job dictionary with - the following keys: - proposal : int - proposal number - run_nr : int - run number - module : int - DSSC module to process - chunksize : int - number of trains to process simultaneously - binfile : str - name of hdf5 file with xarray.DataArray containing the - scan variable and trainIds - framepattern : list of str - names for the (possibly repeating) intra-train pulses. See - split_dssc_data - pulsemask : str - name of hdf5 file with boolean xarray.DataArray to - select/reject trains and pulses + This method simply calls the actual routine when using the multiprocessing + module. It just exists due to its slightly special syntax. 
@@ -173,13 +172,28 @@ def split_frames(data, pattern, prefix=''):
     return dataset
 
 
-def process_intra_train(job):
+def sum_trains_multipr(job):
+    """
+    Wrapper around "sum_trains" for use with the multiprocessing module,
+    which passes a single job dictionary instead of keyword arguments.
+
+    Returns
+    -------
+    module_data: xarray.Dataset
+    """
+    return sum_trains(**job)
+
+
+def sum_trains(proposal, run_nr, module, chunksize, fpt,
+               maxframes=None):
     """
     Aggregate DSSC data (chunked, to fit into memory) for a single module.
     Averages over all trains, but keeps all pulses.
 
     Copyright (c) 2019, Michael Schneider
     Copyright (c) 2020, SCS-team
+    license: BSD 3-Clause License (see LICENSE_BSD for more info)
 
     Parameters
     ----------
@@ -203,23 +217,15 @@
 
     """
-    proposal = job['proposal']
-    run_nr = job['run_nr']
-    module = job['module']
-    chunksize = job['chunksize']
-    fpt = job['fpt']
-    maxframes = job.get('maxframes', None)
-
     log.info(f"Processing dssc module {module}: start")
 
     sourcename = f'SCS_DET_DSSC1M-1/DET/{module}CH0:xtdf'
     collection = ed.open_run(proposal, run_nr, include=f'*DSSC{module:02d}*')
 
     fpt = min(fpt, maxframes) if maxframes is not None else fpt
+    arr = np.zeros([fpt, 128, 512], dtype=float)
     dims = ['pulse', 'x', 'y']
     coords = {'pulse': np.arange(fpt, dtype=int)}
-    shape = [fpt, 128, 512]
 
-    module_data = xr.DataArray(np.zeros(shape, dtype=float),
-                               dims=dims, coords=coords)
+    module_data = xr.DataArray(arr, dims=dims, coords=coords)
     module_data = module_data.to_dataset(name='image')
     module_data['sum_count'] = xr.DataArray(np.zeros(fpt, dtype=int),
                                             dims=['pulse'])
@@ -256,46 +262,16 @@ def bin_data_multipr(job):
     """
-    Entry point for binning routines using the multrprocessing module
-
-    Parameters
-    ----------
-    job: dictionary
-      Designed for the multiprocessing module - expects a job dictionary with
-      the following keys:
-        proposal : int
-            proposal number
-        run_nr : int
-            run number
-        module : int
-            DSSC module to process
-        chunksize : int
-            number of trains to process simultaneously
-        binfile : str
-            name of hdf5 file with xarray.DataArray containing the
-            scan variable and trainIds
-        framepattern : list of str
-            names for the (possibly repeating) intra-train pulses. See
-            split_dssc_data
-        pulsemask : str
-            name of hdf5 file with boolean xarray.DataArray to
-            select/reject trains and pulses
+    Wrapper around "bin_data" for use with the multiprocessing module,
+    which passes a single job dictionary instead of keyword arguments.
 
     Returns
     -------
     module_data: xarray.Dataset
     """
-    params = {}
-    params['proposal'] = job['proposal']
-    params['run_nr'] = job['run_nr']
-    params['module'] = job['module']
-    params['chunksize'] = job['chunksize']
-    params['binfile'] = job['binfile']
-    params['framepattern'] = job.get('framepattern', ['image'])
-    params['maskfile'] = job.get('maskfile', None)
-
-    return bin_data(**params)
+    return bin_data(**job)
 
 
 def bin_data(proposal, run_nr, module, chunksize, binfile,
@@ -308,6 +284,7 @@ def bin_data(proposal, run_nr, module, chunksize, binfile,
 
     Copyright (c) 2019, Michael Schneider
     Copyright (c) 2020, SCS-team
+    license: BSD 3-Clause License (see LICENSE_BSD for more info)
 
     Parameters
     ----------
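
The *_multipr wrappers exist because multiprocessing.Pool.map hands each worker a single argument, whereas the underlying routines take keyword arguments (which joblib.delayed can pass directly). A sketch of the two call styles, with placeholder job values:

    import multiprocessing

    from toolbox_scs.detectors.dssc_processing import (
        sum_trains, sum_trains_multipr)

    jobs = [dict(proposal=2530, run_nr=1613, module=m,
                 chunksize=248, fpt=64) for m in range(16)]

    # multiprocessing: one job dict per worker, unpacked inside the wrapper
    with multiprocessing.Pool(16) as pool:
        data = pool.map(sum_trains_multipr, jobs)

    # equivalent direct call with keyword arguments (what joblib.delayed does)
    data0 = sum_trains(**jobs[0])
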
diff --git a/src/toolbox_scs/load.py b/src/toolbox_scs/load.py
index d8024bcd800c61cb5fea7a24c06263f24805ae3f..235581a9b57a23271b9cfd90e032b4f88bc055c5 100644
--- a/src/toolbox_scs/load.py
+++ b/src/toolbox_scs/load.py
@@ -240,7 +240,7 @@ def concatenateRuns(runs):
     return result
 
 
-def load_binned_array(run, mnemonic_key=None, binsize=None):
+def get_binned_array(run, mnemonic_key=None, binsize=None):
     """
     Loads the required 1D-data and rounds its values to integer
     multiples of stepsize for consistent grouping (except for stepsize=None).
diff --git a/src/toolbox_scs/test/test_dssc_methods.py b/src/toolbox_scs/test/test_dssc_methods.py
index 3672822b77d7543a11fdaf6aa384a92c37437e9c..b5db1d8ba0a7c396349e7ce3791b9efe73ee5d5e 100644
--- a/src/toolbox_scs/test/test_dssc_methods.py
+++ b/src/toolbox_scs/test/test_dssc_methods.py
@@ -12,9 +12,8 @@ import xarray as xr
 import extra_data as ed
 import toolbox_scs as tb
 import toolbox_scs.detectors as tbdet
-from toolbox_scs.detectors.dssc_routines import (load_chunk_data,
-        prepare_module_empty, split_frames, merge_chunk_data)
-from toolbox_scs.detectors.dssc_data import save_to_file, load_from_file
+from toolbox_scs.detectors.dssc_processing import (load_chunk_data,
+        prepare_module_empty, merge_chunk_data)
 from toolbox_scs.util.exceptions import ToolBoxFileError
 
 logging.basicConfig(level=logging.DEBUG)
@@ -25,12 +24,7 @@ suites = {"metafunctions": (
                 "test_info",
                 "test_calcindices",
                 "test_createpulsemask",
-                "test_storage"
-                ),
-          "processing-related-methods": (
-                "test_info",
-                "test_calcindices",
-                "test_createpulsemask",
+                "test_storage",
                 "test_loadmergechunk",
                 ),
           "image-processing": (
@@ -87,9 +81,9 @@ class TestDSSC(unittest.TestCase):
 
         cls._run = tb.load_run(cls._proposal, cls._run_nr, include='*DA*')
 
-        cls._scan_variable = tb.load_scan_variable(cls._run,
-                                              cls._scan_variable_name,
-                                              cls._stepsize)
+        cls._scan_variable = tb.get_binned_array(cls._run,
+                                                 cls._scan_variable_name,
+                                                 cls._stepsize)
         cls._scan_variable.to_netcdf(cls._scanfile, group='data',
                                      mode='w', engine='h5netcdf')
         cls._xgm = tbdet.load_xgm(cls._run)
@@ -173,8 +167,8 @@ class TestDSSC(unittest.TestCase):
             chunk_data = chunk_data.where(pulsemask)
             sum_count = sum_count.where(pulsemask)
 
-        chunk_data = split_frames(chunk_data, cls._framepattern)
-        sum_count = split_frames(sum_count, cls._framepattern,
+        chunk_data = tbdet.split_frames(chunk_data, cls._framepattern)
+        sum_count = tbdet.split_frames(sum_count, cls._framepattern,
                                  prefix='sum_count_')
         chunk_data = xr.merge([chunk_data, sum_count])
         chunk_data['scan_variable'] = cls._scan_variable
@@ -186,9 +180,9 @@ class TestDSSC(unittest.TestCase):
 
     def test_processmodule(self):
         cls = self.__class__
-        max_GB = 400
+        max_GB = 300
         chunksize = int(max_GB * 128 // cls._fpt)
         chunksize = min(512, chunksize)
         print('processing', chunksize, 'trains per chunk')
 
         jobs = []
@@ -198,15 +192,15 @@ class TestDSSC(unittest.TestCase):
                 run_nr=cls._run_nr,
                 module=m,
                 chunksize=chunksize,
-                scanfile=cls._scanfile,
+                binfile=cls._scanfile,
                 framepattern=cls._framepattern,
                 maskfile=None if cls._is_dark else cls._maskfile,
-                maxframes=cls._maxframes,))
+                ))
 
         print(f'start processing modules:', strftime('%X'))
 
-        with multiprocessing.Pool(2) as pool:
-            module_data = pool.map(tbdet.process_dssc_module, jobs)
+        with multiprocessing.Pool(16) as pool:
+            module_data = pool.map(tbdet.bin_data_multipr, jobs)
 
         print('finished processing modules:', strftime('%X'))
         self.assertIsNotNone(module_data)
@@ -244,17 +238,18 @@ class TestDSSC(unittest.TestCase):
 
     def test_storage(self):
         cls = self.__class__
 
-        save_to_file(cls._scan_variable, 'tmp/scan2.h5')
-        save_to_file(cls._scan_variable, 'scan3.h5', path = './tmp/')
-        save_to_file(cls._scan_variable, 'tmp/scan3.h5', overwrite = True)
-        scandata = load_from_file('tmp/scan3.h5')
+        tbdet.save_to_file(cls._scan_variable, 'tmp/scan2.h5')
+        tbdet.save_to_file(cls._scan_variable, 'scan3.h5', path='./tmp/')
+        tbdet.save_to_file(cls._scan_variable, 'tmp/scan3.h5',
+                           overwrite=True)
+        scandata = tbdet.load_from_file('tmp/scan3.h5')
         self.assertIsNotNone(scandata)
 
-        maskdata = load_from_file('tmp/mask.h5')
+        maskdata = tbdet.load_from_file('tmp/mask.h5')
         self.assertIsNotNone(maskdata)
 
         with self.assertRaises(ToolBoxFileError) as cm:
-            save_to_file(cls._scan_variable, 'scan3.h5', path = './tmp/')
+            tbdet.save_to_file(cls._scan_variable, 'scan3.h5', path='./tmp/')
         self.assertEqual(cm.exception.value, './tmp/scan3.h5')