diff --git a/doc/dssc/DSSCBinner.rst b/doc/dssc/DSSCBinner.rst index 13e0fd56284d629790e7bc30092f42ee244f8f7a..75eddebb147c7914e3bbf3a9712fe534e820808a 100644 --- a/doc/dssc/DSSCBinner.rst +++ b/doc/dssc/DSSCBinner.rst @@ -1,51 +1,58 @@ .. code:: ipython3 - + import os + import logging + import importlib + from time import strftime + + import numpy as np + import xarray as xr + import toolbox_scs as tb import toolbox_scs.detectors as tbdet import toolbox_scs.detectors.dssc_plot as dssc_plot - - + + #logging.basicConfig(level=logging.DEBUG) .. code:: ipython3 - # global settings - proposal = 2212 - - # settings that change non-frequently. - # but we can change the ones that do later. - params = { - 'is_dark': False, - 'bin_variable_name': 'PP800_PhaseShifter', - 'bin_size': 0.03, - 'bin_file': './tmp/scan.h5', - 'framepattern': ['pumped', 'unpumped'], - 'use_xgm': True, - 'xgm_threshold': (0,np.inf)} + # settings that change infrequently. + proposal_nb = 2212 common run ---------- .. code:: ipython3 - # Define run parameters and create a DSSCBinner object - run = 235 - params['is_dark'] = False - run235 = tbdet.DSSCBinner(proposal, run, **params) - - -.. parsed-literal:: + # Define run parameters and create run object + run_nb = 235 + + # Collect information about run + run_obj = tb.load_run(proposal_nb, run_nb, include='*DA*') + detector_info = tbdet.load_dssc_info(proposal_nb, run_nb) + + # Create binners + bins_trainId = tb.get_array(run_obj, 'PP800_PhaseShifter', 0.03) + bins_pulse = ['pumped', 'unpumped'] * 10 + + binner1 = tbdet.create_dssc_bins("trainId", + detector_info['trainIds'], + bins_trainId.values) + binner2 = tbdet.create_dssc_bins("pulse", + np.linspace(0,19,20, dtype=int), + bins_pulse) + binners = {'trainId': binner1, 'pulse': binner2} - File ././tmp/scan.h5 existed: overwritten - File ././tmp/mask.h5 existed: overwritten +.. code:: ipython3 + run235 = tbdet.DSSCBinner(proposal_nb, run_nb, binners=binners, use_xgm=True) .. code:: ipython3 # plot xgm and bin statistics - dssc_plot.plot_xgm_threshold(run235.xgm,run_nr=run) - dssc_plot.plot_1D(run235.bins, run_nr=run, dname='scan_variable') - dssc_plot.plot_hist_1D(run235.bins, run_nr=run, dname='scan_variable') + dssc_plot.plot_xgm_threshold(run235.xgm,run_nr=run_nb) + dssc_plot.plot_binner(binner1, xname='trainId', yname='PhaseShifter', run_nr=run_nb) + dssc_plot.plot_binner_hist(binner1, dname='Phase_Shifter', run_nr=run_nb) @@ -70,11 +77,13 @@ common run # save binned data to file in a predefined format. run235.save_binned_data(binned_data) +Process individual modules +-------------------------- -.. parsed-literal:: - - File processed_runs/run235.h5 existed: overwritten +.. code:: ipython3 + # binning or summing can be done for single modules as well + binned_data = run235.bin_data(use_joblib=True, modules=[5,11], chunksize=248) dark run -------- @@ -82,38 +91,38 @@ dark run .. code:: ipython3 # Define run parameters and create run object - run = 232 - params['is_dark'] = True - run232 = tbdet.DSSCBinner(proposal, run, **params) - - -.. 
parsed-literal::
-
-    File ././tmp/scan.h5 existed: overwritten
-
+    run_nb = 232
+
+    # Collect information about run
+    run_obj = tb.load_run(proposal_nb, run_nb, include='*DA*')
+    detector_info = tbdet.load_dssc_info(proposal_nb, run_nb)
+
+    # Create binners
+    bins_trainId = tb.get_array(run_obj,
+                                'PP800_PhaseShifter',
+                                0.03)
+    bins_pulse = ['image'] * 20
+
+    binner1 = tbdet.create_dssc_bins("trainId",
+                                     detector_info['trainIds'],
+                                     bins_trainId.values)
+    binner2 = tbdet.create_dssc_bins("pulse",
+                                     np.linspace(0,19,20, dtype=int),
+                                     bins_pulse)
+    binners = {'trainId': binner1, 'pulse': binner2}
 
 .. code:: ipython3
 
-    dssc_plot.plot_1D(run232.bins, run_nr=run, dname='scan_variable')
-    dssc_plot.plot_hist_1D(run232.bins, run_nr=run, dname='scan_variable')
-
-
-
-.. image:: output_9_0.png
-
-
-
-.. image:: output_9_1.png
-
+    run232 = tbdet.DSSCBinner(proposal_nb, run_nb, binners=binners)
 
 .. code:: ipython3
 
-    # bin data
+    # bin data using the joblib module for multithreading
     binned_data = run232.bin_data(use_joblib=True, chunksize = 248)
 
 .. code:: ipython3
 
-    # save binned data to file
+    # save binned data to file in a predefined format
     run232.save_binned_data(binned_data, merge_xgm=False)
 
@@ -127,32 +136,26 @@ sum along trains (pulse to pulse changes)
 
 .. code:: ipython3
 
-    run = 89
-    run89 = tbdet.DSSCBinner(proposal, run, use_xgm=True)
-
-
-.. parsed-literal::
-
-    File ././tmp/scan.h5 existed: overwritten
-    File ././tmp/mask.h5 existed: overwritten
-
+    run_nb = 89
+
+    # Collect information about run
+    run_obj = tb.load_run(proposal_nb, run_nb, include='*DA*')
+    detector_info = tbdet.load_dssc_info(proposal_nb, run_nb)
+
+    # Create binner along the trainId dimension (one bin for all values)
+    binner1 = tbdet.create_dssc_bins("trainId",
+                                     detector_info['trainIds'],
+                                     np.ones(len(detector_info['trainIds'])))
+    binners = {'trainId': binner1}
 
 .. code:: ipython3
 
-    summed_data = run89.sum_trains(use_joblib=True, chunksize=248)
+    run89 = tbdet.DSSCBinner(proposal_nb, run_nb, binners=binners)
 
 .. code:: ipython3
 
-    run89.save_summed_data(summed_data)
-
-Process individual modules
---------------------------
+    # a single trainId bin: averages over all trains, keeping pulse structure
+    summed_data = run89.bin_data(use_joblib=False, chunksize=512)
 
 .. code:: ipython3
 
-    run = 235
-    params['is_dark'] = False
-    run235 = tbdet.DSSCBinner(proposal, run, **params)
-
-    binned_data = run235.bin_data(use_joblib=True, modules=[5,11], chunksize=248)
-    summed_data = run235.sum_trains(use_joblib=True, modules=[5,11], chunksize=248)
+    run89.save_summed_data(summed_data)
diff --git a/doc/dssc/dssc_tbdet.rst b/doc/dssc/dssc_tbdet.rst
deleted file mode 100644
index 9736c79c5c5d97f06a9d6fd9191dc663d9aae5bd..0000000000000000000000000000000000000000
--- a/doc/dssc/dssc_tbdet.rst
+++ /dev/null
@@ -1,234 +0,0 @@
-.. code:: ipython3
-
-    import os
-    import logging
-    import importlib
-    import joblib
-    import multiprocessing
-    from time import strftime
-
-    import numpy as np
-    import xarray as xr
-
-    import toolbox_scs as tb
-    import toolbox_scs.detectors as tbdet
-    import toolbox_scs.detectors.dssc_plot as dssc_plot
-
-.. code:: ipython3
-
-    for f in ['tmp', 'images', 'processed_runs']:
-        if not os.path.isdir(f):
-            os.mkdir(f)
-
-
-
-global settings
-~~~~~~~~~~~~~~~
-
-.. code:: ipython3
-
-    proposal = 2212
-
-    framepattern = ['pumped', 'unpumped']
-    # framepattern = ['pumped', 'pumped_dark', 'unpumped', 'unpumped_dark']
-    # framepattern = ['image']
-    stepsize = .05
-    scan_variable_name = 'PP800_PhaseShifter'
-    xgm_min = 0
-    xgm_max = np.inf
-
-    scanfile = './tmp/scan.h5'
-    maskfile = './tmp/mask.h5'
-
-common run
-----------
-
-.. 
code:: ipython3 - - # run specific information - run_nr = 235 - is_dark = False - -.. code:: ipython3 - - # load run metadata - info = tbdet.load_dssc_info(proposal, run_nr) - fpt = info['frames_per_train'] - -.. code:: ipython3 - - # load run - run = tb.load_run(proposal, run_nr, include='*DA*') - -.. code:: ipython3 - - # get a binned array for the source that was "scanned", and save for - # later use. - binned_scan_data = tb.get_binned_array(run, scan_variable_name, stepsize) - binned_scan_data.name = 'scan_variable' - tbdet.save_to_file(binned_scan_data, scanfile, overwrite = True) - - -.. parsed-literal:: - - File ././tmp/scan.h5 existed: overwritten - - -.. code:: ipython3 - - # load xgm data and add coordinates according to given framepattern - if not is_dark: - xgm = tbdet.load_xgm(run) - xgm_frame_coords = tbdet.calc_xgm_frame_indices( - xgm.shape[1], framepattern) - xgm['pulse'] = xgm_frame_coords - -.. code:: ipython3 - - # create a mask that can be used to discard certain dssc frames. - if maskfile is not None: - pulsemask = xr.DataArray( - np.ones([len(run.train_ids), fpt], dtype=bool), - dims=['trainId', 'pulse'], - coords={'trainId': run.train_ids, - 'pulse': range(fpt)}) - - if not is_dark: - n_frames_dark = len([p for p in framepattern if 'dark' in p]) - valid = (xgm > xgm_min) * (xgm < xgm_max) - pulsemask = valid.combine_first(pulsemask).astype(bool) - nrejected = int(valid.size - valid.sum()) - percent_rejected = 100 * nrejected / valid.size - - print(f'rejecting {nrejected} out of {valid.size} pulses' - f'({percent_rejected:.1f}%) due to xgm threshold') - - tbdet.save_to_file(pulsemask, maskfile, overwrite = True) - - -.. parsed-literal:: - - File ././tmp/mask.h5 existed: overwritten - - -.. code:: ipython3 - - # some plots - if not is_dark: - dssc_plot.plot_xgm_threshold(xgm,run_nr=run_nr) - - dssc_plot.plot_1D(binned_scan_data, run_nr=run_nr, dname='scan_variable') - dssc_plot.plot_hist_1D(binned_scan_data, run_nr=run_nr, dname='scan_variable') - - -.. image:: xgm_threshold.png - - - -.. image:: plot1D.png - - - -.. image:: hist1D.png - - -bin data -~~~~~~~~ - -.. code:: ipython3 - - # prepare jobs for parallel binning of dssc frames - - max_GB = 100 - - # max_GB / (8byte * 16modules * 128px * 512px * N_pulses) - chunksize = int(max_GB * 128 // fpt) - chunksize = min(512, chunksize) - print('processing', chunksize, 'trains per chunk') - - jobs = [] - for m in range(16): - jobs.append(dict( - proposal=proposal, - run_nr=run_nr, - module=m, - chunksize=chunksize, - binfile=scanfile, - framepattern=framepattern, - maskfile=None if is_dark else maskfile, - )) - - -.. parsed-literal:: - - processing 512 trains per chunk - - -.. code:: ipython3 - - # bin data using the joblib module - - %%time - print(f'start time:', strftime('%X')) - - module_data = joblib.Parallel(n_jobs=16)(joblib.delayed(tbdet.bin_data)(**jobs[i]) for i in range(16)) - - print('finished:', strftime('%X')) - - module_data = xr.concat(module_data, dim='module') - module_data = module_data.dropna('scan_variable') - module_data['run'] = run_nr - -.. code:: ipython3 - - %%time - # bin data using the multiprocessing module - print(f'start time:', strftime('%X')) - - with multiprocessing.Pool(16) as pool: - module_data = pool.map(tbdet.bin_data_multipr, jobs) - - print('finished:', strftime('%X')) - - module_data = xr.concat(module_data, dim='module') - module_data = module_data.dropna('scan_variable') - module_data['run'] = run_nr - - -Store processed data -~~~~~~~~~~~~~~~~~~~~ - -.. 
code:: ipython3 - - # Merge xgm data into binned module data - - if not is_dark: - pulses_no_dark = [p for p in framepattern if 'dark' not in p] - - if maskfile is not None: - xgm = xgm.where(valid) - xgm = tbdet.split_frames(xgm, pulses_no_dark, prefix='xgm_') - xgm['scan_variable'] = binned_scan_data - xgm = xgm.groupby('scan_variable').mean('trainId') - module_data = xr.merge([module_data, xgm]) - - module_data = module_data.transpose('scan_variable', 'module', 'x', 'y') - -.. code:: ipython3 - - # Save processed data for later use - - prefix = '' - overwrite = True - - save_folder = './processed_runs/' - - fname = f'{prefix}run{run_nr}.h5' - - tbdet.save_to_file(module_data, fname, path=save_folder, overwrite = True) - - -.. parsed-literal:: - - File ./processed_runs/run232.h5 existed: overwritten - diff --git a/doc/dssc/hist1D.png b/doc/dssc/hist1D.png index 6412cb53d046fa92dd794ede27b1f127f422b34a..8bce7dd4b7a74d5456a33613dcb87fc3d273b3c7 100644 Binary files a/doc/dssc/hist1D.png and b/doc/dssc/hist1D.png differ diff --git a/doc/dssc/output_9_0.png b/doc/dssc/output_9_0.png deleted file mode 100644 index 710e3ca9513e1b2606e484b25d9dabb6764019e5..0000000000000000000000000000000000000000 Binary files a/doc/dssc/output_9_0.png and /dev/null differ diff --git a/doc/dssc/output_9_1.png b/doc/dssc/output_9_1.png deleted file mode 100644 index d9491ddc39dbb986d76832a63a37447d89d0f6a3..0000000000000000000000000000000000000000 Binary files a/doc/dssc/output_9_1.png and /dev/null differ diff --git a/doc/dssc/plot1D.png b/doc/dssc/plot1D.png index 10d4851cf7b785d620d18688cf7c0d996233127b..0038c546b44cb5eed462b558f6cd8dd8ca9a1d8d 100644 Binary files a/doc/dssc/plot1D.png and b/doc/dssc/plot1D.png differ diff --git a/doc/howtos.rst b/doc/howtos.rst index 1b48d92fb9f644fa60b4813b74b2931e62622413..aac8797f2d698f250422a21568e4654377b821f7 100644 --- a/doc/howtos.rst +++ b/doc/howtos.rst @@ -17,9 +17,9 @@ detectors (dssc) Most of the functions within toolbox_scs.detectors can be accessed directly. This is useful during development, or when working in a non-standardized way, which is often neccessary during data evaluation. For frequent routines there is the possibility to use dssc objects that guarantee consistent data structure, and reduce the amount of recurring code within the notebook. -* :doc:`bin data using toolbox_scs.tbdet <dssc/dssc_tbdet>`. +* bin data using toolbox_scs.tbdet -> *to be documented*. * :doc:`bin data using the DSSCBinner <dssc/DSSCBinner>`. 
-* post processing -> *to be documented*
+* post processing, data analysis -> *to be documented*
 
 routines
 --------
diff --git a/src/toolbox_scs/__init__.py b/src/toolbox_scs/__init__.py
index fbd1b53d9d4b8088ffbd8fedef452329147e3f6f..fede7d9ce230df11adbd2ff392a0f828da41171b 100644
--- a/src/toolbox_scs/__init__.py
+++ b/src/toolbox_scs/__init__.py
@@ -1,4 +1,4 @@
-from .load import (load, concatenateRuns, get_binned_array,
+from .load import (load, concatenateRuns, get_array,
                    load_run, run_by_path)
 
 from .constants import mnemonics
@@ -7,7 +7,7 @@ __all__ = (
     # functions
     "load",
     "concatenateRuns",
-    "get_binned_array",
+    "get_array",
     "load_run",
     "run_by_path",
     # Classes
diff --git a/src/toolbox_scs/detectors/__init__.py b/src/toolbox_scs/detectors/__init__.py
index e90d281d362c6bb5e9a353a8cf505400f4415b0b..b5c2d16e7d532e44f73dda8cfc5050038b00bad4 100644
--- a/src/toolbox_scs/detectors/__init__.py
+++ b/src/toolbox_scs/detectors/__init__.py
@@ -5,11 +5,10 @@ from .tim import (
 from .dssc_data import (
     save_to_file, load_from_file)
 from .dssc_misc import (
-    load_dssc_info, load_geom, quickmask_DSSC_ASIC, calc_xgm_frame_indices,
-    create_xgm_pulsemask, get_xgm_binned, get_xgm_formatted, load_mask)
+    load_dssc_info, create_dssc_bins, load_geom, quickmask_DSSC_ASIC,
+    calc_xgm_frame_indices, get_xgm_formatted, load_mask)
 from .dssc_processing import (
-    split_frames, bin_data_multipr, sum_trains_multipr,
-    sum_trains, bin_data)
+    bin_data_multipr, bin_data)
 from .dssc import DSSCBinner
 from .azimuthal_integrator import AzimuthalIntegrator
@@ -20,15 +19,11 @@ __all__ = (
     "load_TIM",
     "matchXgmTimPulseId",
     "load_dssc_info",
+    "create_dssc_bins",
     "calc_xgm_frame_indices",
-    "create_xgm_pulsemask",
-    "get_xgm_binned",
     "get_xgm_formatted",
    "bin_data_multipr",
     "bin_data",
-    "sum_trains",
-    "sum_trains_multipr",
-    "split_frames",
     "save_to_file",
     "load_from_file",
     "load_geom",
diff --git a/src/toolbox_scs/detectors/dssc.py b/src/toolbox_scs/detectors/dssc.py
index 4883c60d69c1f2267ffc09dd4bb96c64c593d3f7..1b30bcb36d2cfd8d17fbc67bca337d0a5fc82bab 100644
--- a/src/toolbox_scs/detectors/dssc.py
+++ b/src/toolbox_scs/detectors/dssc.py
@@ -20,35 +20,33 @@ import multiprocessing
 import numpy as np
 import xarray as xr
 
-from ..load import load_run, get_binned_array
+from ..load import load_run, get_array
 from .dssc_data import save_to_file
 from .dssc_misc import (
-    load_dssc_info, create_xgm_pulsemask, get_xgm_formatted,
-    get_xgm_binned)
+    load_dssc_info, get_xgm_formatted)
 from .dssc_processing import (
-    bin_data, bin_data_multipr, sum_trains, sum_trains_multipr)
+    bin_data, bin_data_multipr)
 
-__all__ = ["DSSCBinner", "DSSCImager"]
+__all__ = ["DSSCBinner", "DSSCAnalyzer"]
 
 log = logging.getLogger(__name__)
 
 
 def _setup_dir():
-    for f in ['tmp', 'processed_runs', 'images']:
+    for f in ['processed_runs']:
         if not os.path.isdir(f):
             os.mkdir(f)
 
 
 class DSSCBinner:
     def __init__(self, proposal_nr, run_nr,
-                 is_dark=False, bin_variable_name=None, bin_size=None,
-                 bin_file='./tmp/scan.h5',
-                 framepattern=['image'],
-                 use_xgm=False, xgm_threshold=(0, np.inf),
-                 use_tim=False,
-                 mask_file='./tmp/mask.h5'):
+                 binners={},
+                 use_xgm=False, xgm_threshold=(0, np.inf), xgm_bins=None,
+                 use_tim=False
+                 ):
         """
-        A standardized binner object. Loads an array according to which the
-        dssc images shall be binned.
+        A dssc binner object. Loads and bins the dssc data according to the
+        bins specified in 'binners'. The data can be reduced further through
+        masking using XGM or TIM data.
 
         Parameters
         ----------
@@ -56,26 +54,16 @@ class DSSCBinner:
             proposal number containing run folders
         run_nr: int, str
             run number
-        is_dark: bool
-            In case of a dark run, no xgm data will be loaded
-        bin_variable_name: str
-            A valid name from the list of mnemonics. The dssc frames will be
-            binned according to this data.
-        bin_size: float
-            The step width of the aforementioned array of bins.
-        bin_file: str
-            file in which the bins will be stored temporarily.
-        framepattern: list of strings
-            The name of bins along the pulse dimension.
+        binners: dictionary
+            dictionary containing binners constructed using the
+            'create_dssc_bins' tbdet-method.
         use_xgm: bool
-            in case the run is not a dark run the xgm data can be discarded.
+            in case the run is not a dark run, the xgm data can be used to
+            create a pulsemask for additional data filtering.
         xgm_threshold: tuple
            the lower and upper boundary of xgm values rendering valid data.
         use_tim: bool
-            The frames might also be sorted according to the tim data.
-        mask_file: str
-            From the xgm data a mask is constructed that can be used to filter
-            dssc images during the binning process.
+            -> to be implemented. Same role as 'use_xgm'.
 
         Returns
         -------
@@ -83,31 +71,14 @@ class DSSCBinner:
 
         Raises
         ------
-        ToolBoxValueError: Exception
-            in case of invalid mnemonic, ...
+        to be documented.
 
         Example
         -------
+        1.) quick -> generic bins, no xgm:
         >>> import toolbox_scs.detectors as tbdet
-        >>> # quick -> generic bins, no xgm, proposal 2212, run 235
-        >>> run235 = tbdet.DSSCBinner(2212, 235)
-        >>> # detailed -> set parameters explicitly
-        >>> params = {
-        ...     'is_dark': False,
-        ...     'bin_variable_name': 'PP800_PhaseShifter',
-        ...     'bin_size': 0.03,
-        ...     'bin_file': './tmp/scan.h5',
-        ...     'framepattern': ['pumped', 'unpumped'],
-        ...     'use_xgm': True,
-        ...     'xgm_threshold': (0,np.inf)}
-        >>> run235 = tbdet.DSSCBinner(2212, 235, **params)
-
-        Comments
-        --------
-        binning: The constructed bins have equal spacing. It is possible to
-        overwrite the bins manually (self.bins = ...). A future version of
-        the class will be more general in a sense that it will generically
-        bin along any dimensions of the dssc data.
+        >>> run235 = tbdet.DSSCBinner(proposal_nr=2212, run_nr=235)
+        2.) 
detailed -> see the DSSCBinner how-to in the documentation.
        """
        _setup_dir()

        # ---------------------------------------------------------------------
        # Internal variables
        # ---------------------------------------------------------------------
        self.proposal = proposal_nr
        self.runnr = run_nr
-        self.framepattern = framepattern
-        self.isdark = is_dark
        self.info = load_dssc_info(proposal_nr, run_nr)
-        self.fpt = self.info['frames_per_train']
        self.run = load_run(proposal_nr, run_nr, include='*DA*')
        self.xgmthreshold = xgm_threshold
+        self.binners = binners

        # ---------------------------------------------------------------------
-        # Create file containing bins for later data reduction (binning())
+        # Prepare pulse mask for additional data reduction next to binning
        # ---------------------------------------------------------------------
-        self.binfile = bin_file
-        self.bins = get_binned_array(self.run, bin_variable_name,
-                                     bin_size)
-        self.bins.name = 'scan_variable'
-        save_to_file(self.bins, bin_file, overwrite=True)
-
-        # ---------------------------------------------------------------------
-        # Prepare pulse mask for additional data reduction
-        # ---------------------------------------------------------------------
-        self.maskfile = mask_file
        self.xgm = None
        self.tim = None  # still checked by save_binned_data via merge_tim
-        arr = np.ones([len(self.run.train_ids), self.fpt], dtype=bool)
-        self.pulsemask = xr.DataArray(arr,
+        fpt = self.info['frames_per_train']
+        data = np.ones([len(self.run.train_ids), fpt], dtype=bool)
+        self.pulsemask = xr.DataArray(data,
                                      dims=['trainId', 'pulse'],
                                      coords={'trainId': self.run.train_ids,
-                                             'pulse': range(self.fpt)})
-        if not is_dark:
-            if use_xgm:
-                self.xgm = get_xgm_formatted(self.run, framepattern)
-                self.pulsemask = create_xgm_pulsemask(
-                    self.xgm, self.pulsemask,
-                    framepattern, xgm_threshold)
-                save_to_file(self.pulsemask, mask_file, overwrite=True)
+                                             'pulse': range(fpt)})
+        if use_xgm:
+            self.xgm = get_xgm_formatted(self.run, xgm_bins)
+            valid = (self.xgm > self.xgmthreshold[0]) * \
+                    (self.xgm < self.xgmthreshold[1])
+            self.pulsemask = \
+                valid.combine_first(self.pulsemask).astype(bool)
+
+            nrejected = int(valid.size - valid.sum())
+            percent_rejected = 100 * nrejected / valid.size
+            log.info(f'rejecting {nrejected} out of {valid.size} pulses '
+                     f'({percent_rejected:.1f}%) due to xgm threshold')

        log.debug("Constructed DSSC object")

@@ -159,41 +122,16 @@ class DSSCBinner:
    # -------------------------------------------------------------------------
    # Class internal data handling
    # -------------------------------------------------------------------------
-    def save_binned_data(self, module_data,
-                         path="processed_runs", merge_xgm=True):
+    def save_binned_data(self, data,
+                         path="processed_runs",
+                         merge_xgm=False, merge_tim=False):
        """
        Save binned data in a standardized manner. Alternatively, one can
        use the "save_to_file" method provided via toolbox_scs.detectors.

        Parameters
        ----------
-        module_data: xarray.DataSet
-            previously binned data to be saved to file.
-        path: str
-            folder, where the data should be stored.
-        merge_xgm: bool
-            In case the xgm data has been loaded previously it can be merged
-            with module_data before being stored.
- """ - if merge_xgm and self.xgm is not None: - xgm_data = get_xgm_binned(self.xgm, self.xgmthreshold, - self.bins, self.framepattern) - module_data = xr.merge([module_data, xgm_data]) - module_data = module_data.transpose('scan_variable', 'module', - 'x', 'y') - fname = f'run{self.runnr}.h5' - save_to_file(module_data, fname, - path=path, overwrite=True) - - def save_summed_data(self, module_data, - path="processed_runs", merge_xgm=True): - """ - Save summed data in a standardized manner. Alternatively, one can - use the "save_to_file" method provided via toolbox_scs.detectors. - - Parameters - ---------- - module_data: xarray.DataSet + data: xarray.DataSet previously binned data to be saved to file. path: str folder, where the data should be stored. @@ -202,13 +140,12 @@ class DSSCBinner: with module_data before being stored. """ if merge_xgm and self.xgm is not None: - self.xgm['pulse'] = np.arange(self.fpt, dtype=int) - self.xgm = self.xgm.mean('trainId') - self.xgm.name = 'xgm' - module_data = xr.merge([module_data, self.xgm]) + pass + if merge_tim and self.tim is not None: + pass - fname = f'run{self.runnr}_summed.h5' - save_to_file(module_data, fname, + fname = f'run{self.runnr}.h5' + save_to_file(data, fname, path=path, overwrite=True) # ------------------------------------------------------------------------- @@ -233,13 +170,8 @@ class DSSCBinner: ------- data: xarray.DataArray DataArray containing the processed data. - - Example - ------- - >>> run235 = tbdet.DSSCBinner(2212, 235) - >>> binned_data = run235.bin_data(use_joblib=True, chunksize=248) """ - log.info("Bin data according to bins given in binfile") + log.info("Bin data according to binners") log.info(f'Process {chunksize} trains per chunk') mod_list = modules @@ -254,9 +186,8 @@ class DSSCBinner: run_nr=self.runnr, module=m, chunksize=chunksize, - binfile=self.binfile, - framepattern=self.framepattern, - maskfile=None if self.isdark else self.maskfile, + info=self.info, + dssc_binners=self.binners, )) data = None @@ -270,77 +201,29 @@ class DSSCBinner: data = pool.map(bin_data_multipr, jobs) data = xr.concat(data, dim='module') - data = data.dropna('scan_variable') - data['run'] = self.runnr log.info(f'Binning done') return data - def sum_trains(self, use_joblib=False, modules=[], chunksize=248): - """ - Load and sum dssc data along the train-id dimension. This can be used - to look at intra-train behaviour. - - Parameters - ---------- - use_joblib: bool - use the joblib module for multithreading. If set to false the - multiprocessing module is used. - modules: list of ints - a list containing the module numbers that should be processed. If - empty, all modules are processed. - chunksize: int - The number of trains that should be read in one iterative step. - Returns - ------- - data: xarray.DataArray - DataArray containing the processed data. 
+class DSSCFormatter: + def __init__(self): """ - log.info("Sum dssc frames along trains") - log.info(f'Process {chunksize} trains per chunk') - - mod_list = modules - if any(mod_list) is False: - mod_list = [i for i in range(16)] - n_jobs = len(mod_list) - - jobs = [] - for m in mod_list: - jobs.append(dict( - proposal=self.proposal, - run_nr=self.runnr, - module=m, - chunksize=chunksize, - fpt=self.fpt, - )) - - data = None - if use_joblib: - log.info(f'using joblib module for multithreading') - data = joblib.Parallel(n_jobs=n_jobs) \ - (joblib.delayed(sum_trains)(**jobs[i]) for i in range(n_jobs)) - else: - log.info(f'using multiprocessing module for multithreading') - with multiprocessing.Pool(n_jobs) as pool: - data = pool.map(sum_trains_multipr, jobs) - - data = xr.concat(data, dim='module') - data['run'] = self.runnr - data = data.transpose('pulse', 'module', 'x', 'y') - log.info(f'Processing done') - return data + placeholder for possible future class handling formatting standards + for data produced by DSSCBinner before being handed over to the + DSSCAnalyzer class for actual image operations. + """ + pass -class DSSCImager: +class DSSCAnalyzer: def __init__(self): """ - (Placeholder for possible future class handling geometrical properties. - Its main method could be to call the azimuthal integrator). + Placeholder for future class handling image operation/manipulation + (the actual data evaluation). """ # self.distance = distance # self.pxpitchh = 236 # self.pxpitchv = 204 # self.aspect = self.pxpitchv/self.pxpitchh - # self.geom = None - # self.mask = None + # ..... pass diff --git a/src/toolbox_scs/detectors/dssc_misc.py b/src/toolbox_scs/detectors/dssc_misc.py index de33c0f054ddef723dfc733790f879e785478e63..784b7e421da1137f1ce91e9c751d4b14e039b0dc 100644 --- a/src/toolbox_scs/detectors/dssc_misc.py +++ b/src/toolbox_scs/detectors/dssc_misc.py @@ -6,12 +6,13 @@ import logging import numpy as np +import xarray as xr from imageio import imread import extra_data as ed from extra_geom import DSSC_1MGeometry from .xgm import load_xgm -from .dssc_processing import split_frames as _split_frames +#from .dssc_processing import split_frames as _split_frames log = logging.getLogger(__name__) @@ -37,88 +38,92 @@ def load_dssc_info(proposal, run_nr): module = ed.open_run(proposal, run_nr, include='*DSSC00*') info = module.detector_info('SCS_DET_DSSC1M-1/DET/0CH0:xtdf') + info["trainIds"] = module.train_ids log.debug("Fetched information for DSSC module nr. 0.") return info -def calc_xgm_frame_indices(nbunches, framepattern): +def create_dssc_bins(name, coordinates, bins): """ - Returns a coordinate array for XGM data. The coordinates correspond to - DSSC frame numbers and depend on the number of FEL pulses per train - ("nbunches") and the framepattern. In framepattern, dark DSSC frame - names (i.e., without FEL pulse) _must_ include "dark" as a substring. - - Copyright (c) 2019, Michael Schneider - Copyright (c) 2020, SCS-team - license: BSD 3-Clause License (see LICENSE_BSD for more info) + Creates a single entry for the dssc binner dictionary. The produced xarray + data-array will later be used to perform grouping operations according to + the given bins. Parameters ---------- - nbunches: int - number of bunches per train - framepattern: list - experimental pattern + name: str + name of the coordinate to be binned. + coordinates: numpy.ndarray + the original coordinate values (1D) + bins: numpy.ndarray + the bins (groups) according to which the corresponding dimension should + be binned. 
Returns ------- - frame_indices: numpy.ndarray - coordinate array corresponding to DSSC frame numbers + da: xarray.DataArray + A pre-formatted xarray.DataArray relating the specified dimension with + its bins. + """ + da = xr.DataArray(bins, + dims=[name], + coords={name:coordinates}) + log.debug(f'created dssc bin array for dimension {name}') + return da + +def calc_xgm_frame_indices(pulse_bins): """ + Returns a coordinate array for XGM data. The coordinates correspond to + DSSC frame numbers which are not darks. - n_frames = len(framepattern) - n_data_frames = np.sum(['dark' not in p for p in framepattern]) - frame_max = nbunches * n_frames // n_data_frames + Parameters + ---------- + pulse_bins: list + bins along which the pulse dimension will be binned + + Returns + ------- + frame_indices: numpy.ndarray + coordinate array corresponding to DSSC frame numbers which are not + darks. + """ frame_indices = [] - for i, p in enumerate(framepattern): + for i, p in enumerate(pulse_bins): if 'dark' not in p: - frame_indices.append(np.arange(i, frame_max, n_frames)) + frame_indices.append(i) log.debug("Constructed coordinate array for XGM data.") - return np.sort(np.concatenate(frame_indices)) + return frame_indices -def get_xgm_formatted(run, framepattern): +def get_xgm_formatted(run, pulse_bins): """ Load the xgm data and define coordinates along the pulse dimension. + + Parameters + ---------- + run: extra_data.DataCollection + DataCollection object providing access to the xgm data to be loaded + pulse_bins: list, numpy.ndarray + bins along the pulse dimension not containing darks + + Returns + ------- + xgm: xarray.DataArray + xgm data with coordinate 'pulse'. """ xgm = load_xgm(run) - xgm_frame_coords = calc_xgm_frame_indices(xgm.shape[1], framepattern) + if pulse_bins is None: + xgm_frame_coords = np.linspace(0, xgm.shape[1]-1, xgm.shape[1], + dtype=np.uint64) + else: + xgm_frame_coords = calc_xgm_frame_indices(pulse_bins) xgm['pulse'] = xgm_frame_coords return xgm -def get_xgm_binned(xgm_data, xgm_threshold, bins, framepattern): - """ - Drop out-of bounds xgm data before binning the data along the train-id - dimension (bins) and the pulse dimension (framepattern). - """ - pulses_no_dark = [p for p in framepattern if 'dark' not in p] - valid = (xgm_data > xgm_threshold[0]) * \ - (xgm_data < xgm_threshold[1]) - xgm_data = xgm_data.where(valid) - xgm_data = _split_frames(xgm_data, pulses_no_dark, prefix='xgm_') - xgm_data['scan_variable'] = bins - return xgm_data.groupby('scan_variable').mean('trainId') - - -def create_xgm_pulsemask(xgm, pulsemask, framepattern, xgm_threshold): - """ - Create a mask that can be used to filter dssc images that correspond to - out-of-bounds xgm data. - """ - valid = (xgm > xgm_threshold[0]) * \ - (xgm < xgm_threshold[1]) - pulsemask = valid.combine_first(pulsemask).astype(bool) - - nrejected = int(valid.size - valid.sum()) - percent_rejected = 100 * nrejected / valid.size - log.info(f'rejecting {nrejected} out of {valid.size} pulses' - f'({percent_rejected:.1f}%) due to xgm threshold') - return pulsemask - - def load_geom(geopath=None, quad_pos=None): """ Loads and return the DSSC geometry. 
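
Taken together, the new ``dssc_misc`` helpers replace the old framepattern plumbing: ``create_dssc_bins`` builds the per-dimension binner arrays, and ``get_xgm_formatted`` aligns the XGM pulses with the non-dark DSSC frames. A minimal sketch of the intended interplay, using the proposal/run numbers from the examples elsewhere in this changeset (variable names are illustrative):

.. code:: python

    import numpy as np
    import toolbox_scs as tb
    import toolbox_scs.detectors as tbdet

    run_obj = tb.load_run(2212, 235, include='*DA*')
    info = tbdet.load_dssc_info(2212, 235)

    # name the 20 frames of each train; frame names containing 'dark'
    # would be skipped by calc_xgm_frame_indices
    bins_pulse = ['pumped', 'unpumped'] * 10
    binner_pulse = tbdet.create_dssc_bins("pulse",
                                          np.linspace(0, 19, 20, dtype=int),
                                          bins_pulse)

    # XGM data whose 'pulse' coordinate lines up with the non-dark frames
    xgm = tbdet.get_xgm_formatted(run_obj, bins_pulse)
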
diff --git a/src/toolbox_scs/detectors/dssc_plot.py b/src/toolbox_scs/detectors/dssc_plot.py
index 8a5cbf0a69802c946e20d3475571ab3978ff9edd..470a2f4df382dff6bf75d689e926e3c83fda7473 100644
--- a/src/toolbox_scs/detectors/dssc_plot.py
+++ b/src/toolbox_scs/detectors/dssc_plot.py
@@ -37,29 +37,26 @@ def plot_xgm_threshold(xgm,
         fig.savefig(f'images/run{run_nr}_scan_{tstamp}.png', dpi=200)
 
 
-def plot_1D(data,
-            dname = 'data', run_nr = '', safe_fig = False):
+def plot_binner(binner,
+                yname='data', xname='data',
+                run_nr=''):
 
     fig = plt.figure()
     ax = fig.add_subplot(111)
-    ax.plot(data.trainId, data)
+    ax.plot(binner.values)
 
-    ax.set_ylabel(dname)
-    ax.set_xlabel('trainId')
+    ax.set_ylabel(yname)
+    ax.set_xlabel(xname)
     ax.set_title(f'run: {run_nr}')
 
-    if safe_fig == True:
-        tstamp = strftime('%y%m%d_%H%M')
-        fig.savefig(f'images/run{run_nr}_scan_{tstamp}.png', dpi=200)
-
 
-def plot_hist_1D(data,
-                 dname = 'data', run_nr = '', safe_fig = False):
+def plot_binner_hist(binner,
+                     dname='data', run_nr=''):
 
-    counts = xr.DataArray(np.ones(len(data)),
+    counts = xr.DataArray(np.ones(len(binner.values)),
                           dims=[dname],
-                          coords={dname: data.values},
+                          coords={dname: binner.values},
                           name='counts')
 
     counts = counts.groupby(dname).sum()
@@ -70,13 +67,13 @@ def plot_hist_1D(data,
     ax.plot(counts[dname], counts, 'o', ms=4)
 
     ax.set_xlabel(dname)
-    ax.set_ylabel('number of trains')
+    ax.set_ylabel('counts')
     ax.set_title(f'run {run_nr}')
     ax.grid(True)
 
-    if safe_fig == True:
-        tstamp = strftime('%y%m%d_%H%M')
-        fig.savefig(f'images/run{run_nr}_scan_{tstamp}.png', dpi=200)
 
 
 def plot_hist_processed(hist_data):
diff --git a/src/toolbox_scs/detectors/dssc_processing.py b/src/toolbox_scs/detectors/dssc_processing.py
index 2702ccc9ecd2c309ca5557b1f5f321bcbe111c91..6086d4fa2454da9a654a49d2b0eb1f897a62889a 100644
--- a/src/toolbox_scs/detectors/dssc_processing.py
+++ b/src/toolbox_scs/detectors/dssc_processing.py
@@ -16,46 +16,54 @@ import extra_data as ed
 log = logging.getLogger(__name__)
 
 
-def prepare_module_empty(scan_variable, framepattern):
+def create_empty_dataset(run_info, binners={}):
     """
-    Create empty (zero-valued) DataArray for a single DSSC module
+    Create empty (zero-valued) DataSet for a single DSSC module
     to iteratively add data to.
 
-    Copyright (c) 2019, Michael Schneider
     Copyright (c) 2020, SCS-team
-    license: BSD 3-Clause License (see LICENSE_BSD for more info)
 
     Parameters
     ----------
-    scan_variable : xarray.DataArray
-        xarray DataArray containing the specified scan variable using
-        the trainId as coordinate.
-    framepattern: list of strings
-        example: ['pumped', 'unpumped']
-
+    run_info: dictionary
+        dictionary with the keys 'dims', 'frames_per_train' and 'trainIds',
+        as returned by 'load_dssc_info'.
+    binners: dictionary
+        dictionary of binners created via 'create_dssc_bins'. Dimensions
+        without a binner keep their original coordinates.
 
     Returns
     -------
-    module_data: xarray.Dataset
-        empty DataArray
+    ds: xarray.Dataset
+        empty dataset with the variables 'data' and 'hist'.
     """
+    fpt = run_info['frames_per_train']
+    len_x = run_info['dims'][0]
+    len_y = run_info['dims'][1]
+    defaults = {"trainId": run_info["trainIds"],
+                "pulse": np.linspace(0,fpt-1,fpt, dtype=int),
+                "x": np.linspace(1,len_x,len_x, dtype=int),
+                "y": np.linspace(1,len_y,len_y, dtype=int)}
 
-    len_scan = len(np.unique(scan_variable))
-    dims = ['scan_variable', 'x', 'y']
-    coords = {'scan_variable': np.unique(scan_variable)}
-    shape = [len_scan, 128, 512]
+    dims = []
+    coords = {}
+    shape = []
 
-    empty = xr.DataArray(np.zeros(shape, dtype=float),
-                         dims=dims, coords=coords)
-    empty_sum_count = xr.DataArray(np.zeros(len_scan, dtype=int),
-                                   dims=['scan_variable'])
-    module_data = xr.Dataset()
+    for key in defaults:
+        dims.append(key)
+        # bin values where a binner is given, original coordinates otherwise
+        values = defaults[key]
+        if key in binners:
+            values = binners[key].values
+        # the sorted unique values define the groups (for the NaN-free bins
+        # used here this matches the keys of a pandas groupby, without
+        # requiring an extra pandas import in this module)
+        groups = list(np.unique(values))
+        coords[key] = groups
+        shape.append(len(groups))
 
-    for name in framepattern:
-        module_data[name] = empty.copy()
-        module_data['sum_count_' + name] = empty_sum_count.copy()
+    da_data = xr.DataArray(np.zeros(shape, dtype=np.float64),
+                           coords=coords,
+                           dims=dims)
+    ds = da_data.to_dataset(name="data")
+    ds['hist'] = xr.full_like(ds.data, fill_value=1)
 
-    log.debug("Prepared empty data array for single dssc module")
-    return module_data
+    # ToDo: drop dimensions not used for hist
+
+    log.debug("Prepared empty dataset for single dssc module")
+    return ds
 
 
 def load_chunk_data(sel, sourcename, maxframes=None):
@@ -98,192 +106,28 @@ def load_chunk_data(sel, sourcename, maxframes=None):
     return data.loc[{'pulse': np.s_[:maxframes]}]
 
 
-def merge_chunk_data(module_data, chunk_data, framepattern):
-    """
-    Merge chunk data with prepared dataset for entire module.
-    Aligns on "scan_variable" and sums values for variables
-    ['pumped', 'unpumped', 'sum_count']
-    Concatenates the data along a new dimension ('tmp') and uses
-    the sum() method for automatic dtype conversion
-
-    Copyright (c) 2019, Michael Schneider
-    Copyright (c) 2020, SCS-team
-    license: BSD 3-Clause License (see LICENSE_BSD for more info)
-
-    Parameters
-    ----------
-    module_data: xarray.Dataset
-        module data array to be filled
-    chunk_data: xarray.Dataset
-        loaded chunk of data to be merged into module_data
-    framepattern: list of strings
-        example: ['pumped', 'unpumped', 'sum_count']
-
-    Returns
-    -------
-    module_data: xarray.Dataset
-        merged module data:
-    """
-
-    where = dict(scan_variable=chunk_data.scan_variable)
-    for name in framepattern:
-        for prefix in ['', 'sum_count_']:
-            var = prefix + name
-            summed = xr.concat([module_data[var].loc[where], chunk_data[var]],
-                               dim='tmp').sum('tmp')
-            module_data[var].loc[where] = summed
-
-    log.debug("Merged chunked data")
-    return module_data
-
-
-def split_frames(data, pattern, prefix=''):
-    """
-    Split frames according to "pattern" (possibly repeating) and average over
-    resulting splits.
-
-    Copyright (c) 2019, Michael Schneider
-    Copyright (c) 2020, SCS-team
-    license: BSD 3-Clause License (see LICENSE_BSD for more info)
-
-    Parameters
-    ----------
-    data:
-    pattern:
-        A list of frame names (order matters!). 
Examples: - # 4 DSSC frames, 2 FEL pulses - pattern = ['pumped', 'pumped_dark', 'unpumped', 'unpumped_dark'] - # 2 FEL frames, no intermediate darks - pattern = ['pumped', 'unpumped'] - # no splitting, average over all frames - pattern = ['image'] - - Returns - ------- - dataset: xarray.DataArray - a dataset with data variables named prefix + framename - """ - - n = len(pattern) - dataset = xr.Dataset() - for i, name in enumerate(pattern): - dataset[prefix + name] = data.loc[{'pulse': np.s_[i::n]}].mean('pulse') - return dataset - - -def sum_trains_multipr(job): - """ - This method simply calls the actual routine when using the multiprocessing - module. It just exists due to the slightly different syntax. For more - information have a look at "process_intra_train". - - Returns - ------- - module_data: xarray.Dataset - """ - return sum_trains(**job) - - -def sum_trains(proposal, run_nr, module, chunksize, fpt, - maxframes=None): - """ - Aggregate DSSC data (chunked, to fit into memory) for a single module. - Averages over all trains, but keeps all pulses. - - Copyright (c) 2019, Michael Schneider - Copyright (c) 2020, SCS-team - license: BSD 3-Clause License (see LICENSE_BSD for more info) - - Parameters - ---------- - job: dictionary - Designed for the multiprocessing module - expects a job dictionary with - the following keys: - proposal : int - proposal number - run : int - run number - module : int - DSSC module to process - chunksize : int - number of trains to process simultaneously - fpt : int - frames per train - - Returns - ------- - module_data: xarray.Dataset - - """ - - log.info(f"Processing dssc module {module}: start") - - sourcename = f'SCS_DET_DSSC1M-1/DET/{module}CH0:xtdf' - collection = ed.open_run(proposal, run_nr, include=f'*DSSC{module:02d}*') - fpt = min(fpt, maxframes) if maxframes is not None else fpt - arr = np.zeros([fpt, 128, 512], dtype=float) - dims = ['pulse', 'x', 'y'] - coords = {'pulse': np.arange(fpt, dtype=int)} - module_data = xr.DataArray(arr, dims=dims, coords=coords) - module_data = module_data.to_dataset(name='image') - module_data['sum_count'] = xr.DataArray(np.zeros(fpt, dtype=int), - dims=['pulse']) - ntrains = len(collection.train_ids) - chunks = np.arange(ntrains, step=chunksize) - - if module == 15: - pbar = tqdm(total=len(chunks)) - - for start_index in chunks: - log.debug(f"Module {module}: " - f"load trains {start_index}:{start_index + chunksize}") - - sel = collection.select_trains( - ed.by_index[start_index:start_index + chunksize]) - data = load_chunk_data(sel, sourcename, maxframes) - data = data.to_dataset(name='image') - - data['sum_count'] = xr.full_like(data.image[..., 0, 0], fill_value=1) - data = data.sum('trainId') - - for var in ['image', 'sum_count']: - module_data[var] = xr.concat([module_data[var], data[var]], - dim='tmp').sum('tmp') - log.debug(f"Module {module}: merged chunked data") - - if module == 15: - pbar.update(1) - - module_data['image'] = module_data['image'] / module_data.sum_count - log.info(f"processing module {module}: done") - return module_data - - def bin_data_multipr(job): """ - This method simply calls the actual routine when using the multiprocessing - module. It just exists due to its slightly special syntax. For more - information have a look at "bin_data". + This method simply calls the actual binning routine when using the + multiprocessing module. It exists due to the syntax in the main caller. + For more information have a look at "bin_data". 
Returns ------- module_data: xarray.Dataset - """ return bin_data(**job) -def bin_data(proposal, run_nr, module, chunksize, binfile, - framepattern=['image'], maskfile=None): +def bin_data(proposal, run_nr, module, chunksize, info, dssc_binners, + pulsemask=None): """ Aggregate DSSC data (chunked, to fit into memory) for a single module. Groups by "scan_variable" in given bins - use dummy scan_variable to average over all trains. This implies, that only trains found in bins are considered. - Copyright (c) 2019, Michael Schneider Copyright (c) 2020, SCS-team - license: BSD 3-Clause License (see LICENSE_BSD for more info) Parameters ---------- @@ -295,15 +139,11 @@ def bin_data(proposal, run_nr, module, chunksize, binfile, DSSC module to process chunksize : int number of trains to process simultaneously - binfile : str - name of hdf5 file with xarray.DataArray containing the - bin variable and trainIds - framepattern : list of str - names for the (possibly repeating) intra-train pulses. See - split_dssc_data - pulsemask : str - name of hdf5 file with boolean xarray.DataArray to - select/reject trains and pulses + info: dictionary + dictionary containing keys 'dims', 'frames_per_train', 'total_frames', + 'trainIds' + pulsemask : numpy.ndarray + array of booleans to be used to mask dssc data according to xgm data. Returns ------- @@ -315,59 +155,57 @@ def bin_data(proposal, run_nr, module, chunksize, binfile, collection = ed.open_run(proposal, run_nr, include=f'*DSSC{module:02d}*') ntrains = len(collection.train_ids) - - log.info(f"Processing dssc module {module}: start") - - # read preprocessed bins from file - bins = xr.open_dataarray(binfile, 'data', engine='h5netcdf') - - # read binary pulse/train mask - e.g. from XGM thresholding - if maskfile is not None: - pulsemask = xr.open_dataarray(maskfile, 'data', engine='h5netcdf') - else: - pulsemask = None - - module_data = prepare_module_empty(bins, framepattern) chunks = np.arange(ntrains, step=chunksize) - + log.info(f"Processing dssc module {module}: start") + # progress bar if module == 15: pbar = tqdm(total=len(chunks)) + # create empty dataset to be filled iteratively + module_data = create_empty_dataset(info, dssc_binners) + # load data chunk by chunk and merge - for start_index in chunks: + for chunk in chunks: sel = collection.select_trains( - ed.by_index[start_index:start_index + chunksize]) + ed.by_index[chunk:chunk + chunksize]) nframes = sel.detector_info(sourcename)['total_frames'] - if nframes > 0: # some chunks have no DSSC data at all + if nframes > 0: log.debug(f"Module {module}: " - f"load trains {start_index}:{start_index + chunksize}") - data = load_chunk_data(sel, sourcename) - sum_count = xr.full_like(data[..., 0, 0], fill_value=1) + f"load trains {chunk}:{chunk + chunksize}") - if pulsemask is not None: - data = data.where(pulsemask) - sum_count = sum_count.where(pulsemask) - - data = split_frames(data, framepattern) - sum_count = split_frames(sum_count, framepattern, - prefix='sum_count_') - data = xr.merge([data, sum_count]) + chunk_data = load_chunk_data(sel, sourcename) + # ToDo: drop dimensions not used for hist + chunk_hist = xr.full_like(chunk_data, fill_value=1) - # aligns on trainId, drops non-matching trains - data['scan_variable'] = bins - data = data.groupby('scan_variable').sum('trainId') + # prefiltering -> xgm pulse masking + if pulsemask is not None: + chunk_data = chunk_data.where(pulsemask) + chunk_hist = chunk_hist.where(pulsemask) + chunk_data = chunk_data.to_dataset(name='data') + chunk_data['hist'] 
= chunk_hist + + + # data reduction -> apply binners + for b in dssc_binners: + chunk_data[b+"_binned"] = dssc_binners[b] + chunk_data = chunk_data.groupby(b+"_binned").sum(b) + chunk_data = chunk_data.rename(name_dict={b+"_binned":b}) + + # add chunk data to loaded data + for var in ['data', 'hist']: + module_data[var] = xr.concat([module_data[var], + chunk_data[var]], + dim='tmp').sum('tmp') log.debug(f"Module {module}: " - f"merge trains {start_index}:{start_index + chunksize}") - module_data = merge_chunk_data(module_data, data, framepattern) + f"processed trains {chunk}:{chunk + chunksize}") if module == 15: pbar.update(1) - for name in framepattern: - module_data[name] = module_data[name] / \ - module_data['sum_count_' + name] + module_data['data'] = module_data['data'] / module_data['hist'] + module_data = module_data.transpose('trainId', 'pulse', 'x', 'y') log.info(f"processing module {module}: done") return module_data diff --git a/src/toolbox_scs/load.py b/src/toolbox_scs/load.py index 645ec3e29f1c48143738aa9bd3d4ccbfd41eda5c..c9735342fdd4649036eb3c18b7cb7cdb3eb4f616 100644 --- a/src/toolbox_scs/load.py +++ b/src/toolbox_scs/load.py @@ -239,7 +239,7 @@ def concatenateRuns(runs): return result -def get_binned_array(run, mnemonic_key=None, binsize=None): +def get_array(run, mnemonic_key=None, stepsize=None): """ Loads the required 1D-data and rounds its values to integer multiples of stepsize for consistent grouping (except for stepsize=None). @@ -259,7 +259,7 @@ def get_binned_array(run, mnemonic_key=None, binsize=None): Returns ------- data : xarray.DataArray - xarray DataArray containing the binned array using the trainId as + xarray DataArray containing rounded array values using the trainId as coordinate. Raises @@ -272,7 +272,7 @@ def get_binned_array(run, mnemonic_key=None, binsize=None): >>> import toolbox_scs as tb >>> run = tb.load_run(2212, 235) >>> mnemonic = 'PP800_PhaseShifter' - >>> binned_array = tb.get_binned_array(run, mnemonic, 0.5) + >>> data_PhaseShifter = tb.get_array(run, mnemonic, 0.5) """ try: @@ -286,10 +286,10 @@ def get_binned_array(run, mnemonic_key=None, binsize=None): else: raise ToolBoxValueError("Invalid mnemonic", mnemonic_key) - if binsize is not None: - data = binsize * np.round(data / binsize) - data.name = 'binned_array' - log.debug(f"Constructed binned array for {mnemonic_key}") + if stepsize is not None: + data = stepsize * np.round(data / stepsize) + data.name = 'data' + log.debug(f"Got data for {mnemonic_key}") except ToolBoxValueError as err: log.error(f"{err.message}") raise diff --git a/src/toolbox_scs/test/test_dssc_cls.py b/src/toolbox_scs/test/test_dssc_cls.py index 6bec4772b0607d52bf5b2357f28b263d8653b567..7f2bc39434369832eee0eee2c8949d5c8ddd0a13 100644 --- a/src/toolbox_scs/test/test_dssc_cls.py +++ b/src/toolbox_scs/test/test_dssc_cls.py @@ -18,7 +18,6 @@ log_root = logging.getLogger(__name__) suites = {"no-processing": ( "test_create", - "test_tmpfiles", ), "processing": ( "test_binning", @@ -26,13 +25,7 @@ suites = {"no-processing": ( } -_temp_dirs = ['tmp', 'images', 'processed_runs'] - - -def setup_tmp_dir(): - for d in _temp_dirs: - if not os.path.isdir(d): - os.mkdir(d) +_temp_dirs = ['processed_runs'] def cleanup_tmp_dir(): @@ -48,7 +41,6 @@ class TestDSSC(unittest.TestCase): # --------------------------------------------------------------------- # global test settings # --------------------------------------------------------------------- - setup_tmp_dir() log_root.info("Finished global setup, start tests") @classmethod @@ 
-57,15 +49,37 @@ class TestDSSC(unittest.TestCase): cleanup_tmp_dir() def test_create(self): - params = {'bin_variable_name': 'PP800_PhaseShifter', - 'framepattern': ['pumped', 'unpumped']} + proposal_nb = 2212 + run_nb = 235 + run = tb.load_run(proposal_nb, run_nb, include='*DA*') + run_info = tbdet.load_dssc_info(proposal_nb, run_nb) + bins_trainId = tb.get_array(run, + 'PP800_PhaseShifter', + 0.04) + bins_pulse = ['pumped', 'unpumped'] * 10 + + binner1 = tbdet.create_dssc_bins("trainId", + run_info['trainIds'], + bins_trainId.values) + binner2 = tbdet.create_dssc_bins("pulse", + np.linspace(0,19,20, dtype=int), + bins_pulse) + binners = {'trainId': binner1, 'pulse': binner2} + params = {'binners': binners, + 'use_xgm': True, + 'xgm_threshold' : (0, np.inf), + 'xgm_bins': bins_pulse} # normal - run235 = tbdet.DSSCBinner(2212, 235) + run235 = tbdet.DSSCBinner(proposal_nb, run_nb) del(run235) - run235 = tbdet.DSSCBinner(2212, 235, is_dark=False) - run235 = tbdet.DSSCBinner(2212, 235, is_dark=False, **params) - self.assertEqual(run235.bins.values[0], 7585.5) + run235 = tbdet.DSSCBinner(2212, 235, use_xgm=True) + run235 = tbdet.DSSCBinner(2212, 235, + use_xgm=True, + xgm_bins=bins_pulse) + run235 = tbdet.DSSCBinner(proposal_nb, run_nb, **params) + self.assertEqual(run235.binners['trainId'].values[0], + np.float32(7585.52)) # expected fails with self.assertRaises(FileNotFoundError) as cm: @@ -74,25 +88,31 @@ class TestDSSC(unittest.TestCase): "'/gpfs/exfel/exp/SCS/201901/p002212/raw/r2354'" self.assertEqual(str(cm.exception), err_msg) - def test_tmpfiles(self): - params = {'bin_variable_name': 'PP800_PhaseShifter', - 'framepattern': ['pumped', 'unpumped']} - run235 = tbdet.DSSCBinner(2212, 235, - use_xgm=True, is_dark=False, **params) - self.assertEqual(run235.binfile, './tmp/scan.h5') - self.assertTrue(os.path.isfile('./tmp/mask.h5')) def test_binning(self): - params = {'bin_variable_name': 'PP800_PhaseShifter', - 'is_dark': True, - 'framepattern': ['pumped', 'unpumped']} - - testrun = tbdet.DSSCBinner(2212, 232, **params) + # dark + proposal_nb = 2212 + run_nb = 232 + run = tb.load_run(proposal_nb, run_nb, include='*DA*') + run_info = tbdet.load_dssc_info(proposal_nb, run_nb) + bins_trainId = tb.get_array(run, + 'PP800_PhaseShifter', + 0.03) + bins_pulse = ['pumped', 'unpumped'] * 10 + + binner1 = tbdet.create_dssc_bins("trainId", + run_info['trainIds'], + bins_trainId.values) + binner2 = tbdet.create_dssc_bins("pulse", + np.linspace(0,19,20, dtype=int), + bins_pulse) + binners = {'trainId': binner1, 'pulse': binner2} + run232 = tbdet.DSSCBinner(proposal_nb, run_nb, binners=binners) mod_list = [0,1] - data = testrun.bin_data(use_joblib=False, modules=mod_list) - self.assertEqual(data.run, 232) - data = testrun.bin_data(use_joblib=True, modules=mod_list) - self.assertEqual(data.run, 232) + data = run232.bin_data(use_joblib=False, modules=mod_list) + self.assertIsNotNone(data.data) + data = run232.bin_data(use_joblib=True, modules=mod_list) + self.assertIsNotNone(data.data) def list_suites(): diff --git a/src/toolbox_scs/test/test_dssc_methods.py b/src/toolbox_scs/test/test_dssc_methods.py index 50d098b71f34a7b58ae10639498d9608fbbe3461..5e28a72330cd4523f6549373d701dcb979c6901f 100644 --- a/src/toolbox_scs/test/test_dssc_methods.py +++ b/src/toolbox_scs/test/test_dssc_methods.py @@ -13,7 +13,7 @@ import extra_data as ed import toolbox_scs as tb import toolbox_scs.detectors as tbdet from toolbox_scs.detectors.dssc_processing import (load_chunk_data, - prepare_module_empty, merge_chunk_data) + 
create_empty_dataset) from toolbox_scs.util.exceptions import ToolBoxFileError logging.basicConfig(level=logging.DEBUG) @@ -22,18 +22,26 @@ log_root = logging.getLogger(__name__) suites = {"metafunctions": ( "test_info", + "test_binners", "test_calcindices", "test_createpulsemask", - "test_storage", + #"test_storage", + "test_createempty", "test_loadmergechunk", ), - "image-processing": ( + "binning-pulse": ( "test_info", + "test_binners", + "test_calcindices", + "test_createpulsemask", "test_processmodule", ), - "intra-train-processing": ( + "binning-train": ( "test_info", - "test_intratrain", + "test_binners", + "test_calcindices", + "test_createpulsemask", + "test_processmodule2", ) } @@ -63,25 +71,25 @@ class TestDSSC(unittest.TestCase): cls._proposal = 2212 cls._run_nr = 235 cls._is_dark = False - cls._framepattern = ['pumped', 'unpumped'] - cls._maxframes = None + + cls._bins_trainId_name = 'PP800_PhaseShifter' cls._stepsize = .04 - cls._scan_variable_name = 'PP800_PhaseShifter' + + cls._bins_pulse = ['pumped', 'unpumped'] * 10 + cls._maxframes = None + cls._xgm_min = 0 cls._xgm_max = np.inf - cls._scanfile = './tmp/scan.h5' - cls._maskfile = './tmp/mask.h5' # --------------------------------------------------------------------- setup_tmp_dir() cls._run = tb.load_run(cls._proposal, cls._run_nr, include='*DA*') - cls._scan_variable = tb.get_binned_array(cls._run, - cls._scan_variable_name, - cls._stepsize) - cls._scan_variable.to_netcdf(cls._scanfile, - group='data', mode='w', engine='h5netcdf') + cls._bins_trainId = tb.get_array(cls._run, + cls._bins_trainId_name, + cls._stepsize) + cls._xgm = tbdet.load_xgm(cls._run) log_root.info("Finished global setup, start tests") @@ -95,16 +103,33 @@ class TestDSSC(unittest.TestCase): def test_info(self): cls = self.__class__ - info = tbdet.load_dssc_info(cls._proposal, cls._run_nr) - fpt = info['frames_per_train'] - self.assertEqual(fpt, 20) - cls._fpt = fpt + cls._info = tbdet.load_dssc_info(cls._proposal, cls._run_nr) + self.assertEqual(cls._info['frames_per_train'], 20) + + + def test_binners(self): + cls = self.__class__ + # create 3 different binners manually + bin1 = tbdet.create_dssc_bins("trainId", + cls._info['trainIds'], + cls._bins_trainId.values) + bin2 = tbdet.create_dssc_bins("pulse", + np.linspace(0,19,20, dtype=int), + cls._bins_pulse) + bin3 = tbdet.create_dssc_bins("x", + np.linspace(1,128,128, dtype=int), + np.linspace(1,128,128, dtype=int)) + + # format binner dictionary. 
The 4th binner is constructed automatically + cls._dssc_binners = {"trainId": bin1, + "pulse": bin2, + "x": bin3} + self.assertIsNotNone(cls._dssc_binners) def test_calcindices(self): cls = self.__class__ - xgm_frame_coords = tbdet.calc_xgm_frame_indices( - cls._xgm.shape[1], cls._framepattern) + xgm_frame_coords = tbdet.calc_xgm_frame_indices(cls._bins_pulse) self.assertIsNotNone(xgm_frame_coords) cls._xgm['pulse'] = xgm_frame_coords @@ -112,33 +137,27 @@ class TestDSSC(unittest.TestCase): def test_createpulsemask(self): cls = self.__class__ - data = np.ones([len(cls._run.train_ids), cls._fpt], dtype=bool) + data = np.ones([len(cls._run.train_ids), + cls._info['frames_per_train']], dtype=bool) dimensions = ['trainId', 'pulse'] coordinates = {'trainId': cls._run.train_ids, - 'pulse': range(cls._fpt)} + 'pulse': range(cls._info['frames_per_train'])} pulsemask = xr.DataArray(data, dims=dimensions, coords=coordinates) - + valid = (cls._xgm > cls._xgm_min) * (cls._xgm < cls._xgm_max) - pulsemask = valid.combine_first(pulsemask).astype(bool) - - pulsemask.to_netcdf(cls._maskfile, - group='data', mode='w', engine='h5netcdf') - self.assertIsNotNone(pulsemask) + cls._pulsemask = valid.combine_first(pulsemask).astype(bool) + + self.assertIsNotNone(cls._pulsemask) + + + def test_createempty(self): + cls = self.__class__ + cls._empty_data = create_empty_dataset(cls._info, cls._dssc_binners) + self.assertIsNotNone(cls._empty_data.dims['trainId']) def test_loadmergechunk(self): cls = self.__class__ - try: - pulsemask = xr.open_dataarray(cls._maskfile, 'data', - engine='h5netcdf') - except: - pulsemask = None - - # construct empty - empty_data = prepare_module_empty(cls._scan_variable, - cls._framepattern) - self.assertIsNotNone(empty_data.dims['scan_variable']) - # load single chunk module = 1 chunksize = 512 @@ -151,41 +170,45 @@ class TestDSSC(unittest.TestCase): chunks = np.arange(ntrains, step=chunksize) start_index = chunks[0] - sel = collection.select_trains( - ed.by_index[start_index:start_index + chunksize]) - chunk_data = load_chunk_data(sel, sourcename) - self.assertIsNotNone(chunk_data) - log_root.debug(f"Loaded {ntrains} trains for {sourcename}") - - # merge into empty - sum_count = xr.full_like(chunk_data[..., 0, 0], fill_value=1) - if pulsemask is not None: - chunk_data = chunk_data.where(pulsemask) - sum_count = sum_count.where(pulsemask) - - chunk_data = tbdet.split_frames(chunk_data, cls._framepattern) - sum_count = tbdet.split_frames(sum_count, cls._framepattern, - prefix='sum_count_') - chunk_data = xr.merge([chunk_data, sum_count]) - chunk_data['scan_variable'] = cls._scan_variable - chunk_data = chunk_data.groupby('scan_variable').sum('trainId') - merged_data = merge_chunk_data(empty_data, chunk_data, - cls._framepattern) - self.assertIsNotNone(merged_data) - + module_data = create_empty_dataset(cls._info, cls._dssc_binners) + + for chunk in chunks[0:2]: + sel = collection.select_trains( + ed.by_index[chunk:chunk + chunksize]) + chunk_data = load_chunk_data(sel, sourcename) + self.assertIsNotNone(chunk_data) + log_root.debug(f"Module {module}: " + f"loaded trains {chunk}:{chunk + chunksize}") + + # pulse masking, ToDo: drop unbinned dimensions for hist + chunk_hist = xr.full_like(chunk_data, fill_value=1) + if cls._pulsemask is not None: + chunk_data = chunk_data.where(cls._pulsemask) + chunk_hist = chunk_hist.where(cls._pulsemask) + chunk_data = chunk_data.to_dataset(name='data') + chunk_data['hist'] = chunk_hist + + # apply predefined binning + for b in cls._dssc_binners: + 
chunk_data[b+"_binned"] = cls._dssc_binners[b]
+                chunk_data = chunk_data.groupby(b+"_binned").sum(b)
+                chunk_data = chunk_data.rename(name_dict={b+"_binned":b})
+
+            # merge dsets and format
+            for var in ['data', 'hist']:
+                module_data[var] = xr.concat([module_data[var],
+                                              chunk_data[var]],
+                                             dim='tmp').sum('tmp')
+
        self.assertIsNotNone(module_data)
 
     def test_processmodule(self):
         cls = self.__class__
-        max_GB = 300
-        r_nb = 232
-        info = tbdet.load_dssc_info(cls._proposal, r_nb)
-        fpt = info['frames_per_train']
-        binfile = './tmp/scan.h5'
-        framepattern = ['pumped', 'unpumped']
-        chunksize = int(max_GB * 128 // fpt)
-        chunksize = min(512, chunksize)
-        print('processing', chunksize, 'trains per chunk')
+        chunksize = 512
+        print('processing', chunksize, 'trains per chunk')
 
         jobs = []
         for m in range(2):
@@ -194,8 +217,9 @@ class TestDSSC(unittest.TestCase):
                 run_nr=cls._run_nr,
                 module=m,
                 chunksize=chunksize,
-                binfile=binfile,
-                framepattern=framepattern
+                info=cls._info,
+                dssc_binners=cls._dssc_binners,
+                pulsemask=cls._pulsemask
             ))
 
         print('start processing modules:', strftime('%X'))
@@ -204,53 +228,82 @@ class TestDSSC(unittest.TestCase):
             module_data = pool.map(tbdet.bin_data_multipr, jobs)
         print('finished processing modules:', strftime('%X'))
+        module_data = xr.concat(module_data, dim='module')
+        print(module_data)
         self.assertIsNotNone(module_data)
 
-    def test_intratrain(self):
+    def test_processmodule2(self):
         cls = self.__class__
-        max_GB = 300
-        r_nb = 89
-        info = tbdet.load_dssc_info(cls._proposal, r_nb)
-        fpt = info['frames_per_train']
-        chunksize = int(max_GB * 128 // fpt)
-        chunksize = min(512, chunksize)
-        print('processing', chunksize, 'trains per chunk')
+
+        chunksize = 512
+        info = tbdet.load_dssc_info(2212, 89)
+        # a single bin spanning all trains (run 89 contains 1691 trains)
+        bin1 = tbdet.create_dssc_bins("trainId",
+                                      info['trainIds'],
+                                      np.ones(1691))
+        binners = {'trainId': bin1}
 
         jobs = []
         for m in range(2):
             jobs.append(dict(
                 proposal=cls._proposal,
-                run_nr=r_nb,
+                run_nr=89,
                 module=m,
                 chunksize=chunksize,
-                fpt=fpt,
+                info=info,
+                dssc_binners=binners,
             ))
 
-        print('start processing modules:', strftime('%X'))
+        with multiprocessing.Pool(16) as pool:
+            module_data = pool.map(tbdet.bin_data_multipr, jobs)
+
+        print('finished processing modules:', strftime('%X'))
+        module_data = xr.concat(module_data, dim='module')
+        module_data = module_data.squeeze()
+        print(module_data)
+        self.assertIsNotNone(module_data)
+
+
+    def test_intratrain(self):
+        # kept for reference only; not part of any suite, as it relies on
+        # sum_trains_multipr, which this changeset removes from tbdet
+        cls = self.__class__
+        chunksize = 512
+        print(f'processing {chunksize} trains per chunk')
+
+        jobs = []
+        for m in range(1):
+            jobs.append(dict(
+                proposal=2212,
+                run_nr=89,
+                module=m,
+                chunksize=chunksize,
+                fpt=75,
+            ))
+
+        print('start processing modules:', strftime('%X'))
         with multiprocessing.Pool(16) as pool:
             module_data = pool.map(tbdet.sum_trains_multipr, jobs)
         print('finished processing modules:', strftime('%X'))
+        module_data = xr.concat(module_data, dim='module')
+        module_data = module_data.squeeze()
+        print(module_data)
         self.assertIsNotNone(module_data)
-
+
     def test_storage(self):
         cls = self.__class__
-        tbdet.save_to_file(cls._scan_variable, 'tmp/scan2.h5')
-        tbdet.save_to_file(cls._scan_variable, 'scan3.h5', path = './tmp/')
-        tbdet.save_to_file(cls._scan_variable, 'tmp/scan3.h5',
+        tbdet.save_to_file(cls._bins_trainId, 'tmp/scan2.h5')
+        tbdet.save_to_file(cls._bins_trainId, 'scan3.h5', path = './tmp/')
+        
tbdet.save_to_file(cls._bins_trainId, 'tmp/scan3.h5', overwrite = True) - scandata = tbdet.load_from_file('tmp/scan3.h5') + bins_trainId = tbdet.load_from_file('tmp/scan3.h5') - self.assertIsNotNone(scandata) - maskdata = tbdet.load_from_file('tmp/mask.h5') - self.assertIsNotNone(maskdata) + self.assertIsNotNone(bins_trainId) with self.assertRaises(ToolBoxFileError) as cm: - tbdet.save_to_file(cls._scan_variable, 'scan3.h5', path = './tmp/') + tbdet.save_to_file(cls._bins_trainId, 'scan3.h5', path = './tmp/') self.assertEqual(cm.exception.value, './tmp/scan3.h5') diff --git a/src/toolbox_scs/test/test_top_level.py b/src/toolbox_scs/test/test_top_level.py index 76db5e98fac29513879bffe278fc2787f7bd15e7..9fd549609fdcc29fcefbfa97571a1988c5ffa269 100644 --- a/src/toolbox_scs/test/test_top_level.py +++ b/src/toolbox_scs/test/test_top_level.py @@ -88,13 +88,13 @@ class TestToolbox(unittest.TestCase): # Normal use mnemonic = 'PP800_PhaseShifter' - scan_variable = tb.get_binned_array(cls._ed_run, mnemonic, 0.5) - self.assertTrue = (scan_variable) + data = tb.get_array(cls._ed_run, mnemonic, 0.5) + self.assertTrue = (data) # unknown mnemonic mnemonic = 'blabla' with self.assertRaises(ToolBoxValueError) as cm: - scan_variable = tb.get_binned_array(cls._ed_run, mnemonic, 0.5) + scan_variable = tb.get_array(cls._ed_run, mnemonic, 0.5) excp = cm.exception self.assertEqual(excp.value, mnemonic)
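
For reference, the renamed ``get_array`` feeds directly into the new binning workflow. A sketch assembled from the calls exercised in the tests above (proposal 2212, run 235; the 0.04 bin width and variable names are illustrative):

.. code:: python

    import toolbox_scs as tb
    import toolbox_scs.detectors as tbdet

    run = tb.load_run(2212, 235, include='*DA*')
    info = tbdet.load_dssc_info(2212, 235)

    # round the phase-shifter values to integer multiples of 0.04 ...
    bins_trainId = tb.get_array(run, 'PP800_PhaseShifter', 0.04)

    # ... and use the rounded values as bins along the trainId dimension
    binner = tbdet.create_dssc_bins("trainId",
                                    info['trainIds'],
                                    bins_trainId.values)
    run235 = tbdet.DSSCBinner(2212, 235, binners={'trainId': binner})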