From 0bef5c63456a9b127e457d191b65527259dc9f8b Mon Sep 17 00:00:00 2001
From: Rafael Gort <rafael.gort@xfel.eu>
Date: Wed, 6 May 2020 22:20:26 +0200
Subject: [PATCH] Added file handlers and prepared skeleton for updated dssc
 class; plot-related routines to be integrated and tested.

---
 .../detectors/{DSSC_cls.py => DSSC_bkp.py}  |   0
 src/toolbox_scs/detectors/__init__.py       |   4 +-
 src/toolbox_scs/detectors/dssc.py           | 424 ++----------------
 src/toolbox_scs/detectors/dssc_data.py      |  58 ++-
 .../plot_dssc.py => detectors/dssc_plot.py} |  10 +-
 src/toolbox_scs/detectors/dssc_routines.py  | 404 +++++++++++++++++
 src/toolbox_scs/test/test_detectors_dssc.py | 119 +++--
 src/toolbox_scs/test/test_misc.py           | 115 +++++
 src/toolbox_scs/util/exceptions.py          |  12 +-
 9 files changed, 694 insertions(+), 452 deletions(-)
 rename src/toolbox_scs/detectors/{DSSC_cls.py => DSSC_bkp.py} (100%)
 rename src/toolbox_scs/{plot/plot_dssc.py => detectors/dssc_plot.py} (81%)
 create mode 100644 src/toolbox_scs/detectors/dssc_routines.py
 create mode 100644 src/toolbox_scs/test/test_misc.py

diff --git a/src/toolbox_scs/detectors/DSSC_cls.py b/src/toolbox_scs/detectors/DSSC_bkp.py
similarity index 100%
rename from src/toolbox_scs/detectors/DSSC_cls.py
rename to src/toolbox_scs/detectors/DSSC_bkp.py
diff --git a/src/toolbox_scs/detectors/__init__.py b/src/toolbox_scs/detectors/__init__.py
index 1b83ec9..4890887 100644
--- a/src/toolbox_scs/detectors/__init__.py
+++ b/src/toolbox_scs/detectors/__init__.py
@@ -2,9 +2,10 @@ from .xgm import (
     load_xgm, cleanXGMdata, matchXgmTimPulseId)
 from .tim import (
     load_TIM,)
-from .dssc import (
+from .dssc_routines import (
     load_dssc_info, calc_xgm_frame_indices, process_dssc_module,
     split_frames, process_intra_train)
+from .dssc import DSSC

 __all__ = (
     # Functions
@@ -17,5 +18,6 @@ __all__ = (
     "process_intra_train",
     "split_frames",
     # Classes
+    "DSSC",
     # Variables
 )
diff --git a/src/toolbox_scs/detectors/dssc.py b/src/toolbox_scs/detectors/dssc.py
index 427206e..43d0927 100644
--- a/src/toolbox_scs/detectors/dssc.py
+++ b/src/toolbox_scs/detectors/dssc.py
@@ -1,393 +1,61 @@
-# -*- coding: utf-8 -*-
-'''
-@author: Michael Schneider, with input from XFEL-CAS and SCS beamline staff
+from .dssc_routines import (
+    load_dssc_info as dssc_info,
+    calc_xgm_frame_indices as xgm_frame_indices
+)
+from .dssc_data import save_to_file, load_from_file
+from .dssc_plot import xgm_threshold
-karabo_data: https://github.com/European-XFEL/karabo_data
-SCS ToolBox: https://git.xfel.eu/gitlab/SCS/ToolBox
-contributions should comply with pep8 code structure guidelines.
+class DSSC:
+    def __init__(self):
+        # setup_dir()
+        pass
-Copyright (c) 2019, Michael Schneider
-All rights reserved.
-''' -import os -import logging -from glob import glob -from time import strftime, sleep -from tqdm import tqdm + def __del__(self): + # cleanup_dir() + pass -import numpy as np -import xarray as xr -import pandas as pd + # ------------------------------------------------------------------------- + # Data handling + # ------------------------------------------------------------------------- + def open_run(self): + pass -import extra_data as ed + def load_geom(self): + pass + def load_scan(self): + pass -log = logging.getLogger(__name__) + def load_mask(self): + pass + def load_xgm(self): + pass -def load_dssc_info(proposal, run_nr): - """ - Loads the first data file for DSSC module 0 (this is hardcoded) - and returns the detector_info dictionary + def xgm_filter(self): + pass - Parameters - ---------- - proposal: str, int - number of proposal - run_nr: str, int - number of run + def load_binned_data(self): + pass - Returns - ------- - info : dictionary - {'dims': tuple, 'frames_per_train': int, 'total_frames': int} - """ + def save_binned_data(self): + pass + # ------------------------------------------------------------------------- + # Data processing + # ------------------------------------------------------------------------- + def process_bin(self): + pass - module = ed.open_run(proposal, run_nr, include='*DSSC00*') - info = module.detector_info('SCS_DET_DSSC1M-1/DET/0CH0:xtdf') - log.debug("Fetched information for DSSC module nr. 0.") - return info + def azimuthal_integration(self): + pass -def calc_xgm_frame_indices(nbunches, framepattern): - """ - Returns a coordinate array for XGM data. The coordinates correspond to - DSSC frame numbers and depend on the number of FEL pulses per train - ("nbunches") and the framepattern. In framepattern, dark DSSC frame - names (i.e., without FEL pulse) _must_ include "dark" as a substring. + # ------------------------------------------------------------------------- + # Data visualization -> tbdet member functions + # ------------------------------------------------------------------------- + #def plot_xgm_threshold(self): + # pass - Parameters - ---------- - nbunches: int - number of bunches per train - framepattern: list - experimental pattern - - Returns - ------- - frame_indices: numpy.ndarray - coordinate array corresponding to DSSC frame numbers - - """ - - n_frames = len(framepattern) - n_data_frames = np.sum(['dark' not in p for p in framepattern]) - frame_max = nbunches * n_frames // n_data_frames - - frame_indices = [] - for i, p in enumerate(framepattern): - if 'dark' not in p: - frame_indices.append(np.arange(i, frame_max, n_frames)) - - log.debug("Constructed coordinate array for XGM data.") - return np.sort(np.concatenate(frame_indices)) - - -def prepare_module_empty(scan_variable, framepattern): - """ - Create empty (zero-valued) DataArray for a single DSSC module - to iteratively add data to. - - Parameters - ---------- - scan_variable : xarray.DataArray - xarray DataArray containing the specified scan variable using - the trainId as coordinate. 
- framepattern: list of strings - example: ['pumped', 'unpumped'] - - Returns - ------- - module_data: xarray.Dataset - empty DataArray - """ - - len_scan = len(np.unique(scan_variable)) - dims = ['scan_variable', 'x', 'y'] - coords = {'scan_variable': np.unique(scan_variable)} - shape = [len_scan, 128, 512] - - empty = xr.DataArray(np.zeros(shape, dtype=float), - dims=dims, coords=coords) - empty_sum_count = xr.DataArray(np.zeros(len_scan, dtype=int), - dims=['scan_variable']) - module_data = xr.Dataset() - - for name in framepattern: - module_data[name] = empty.copy() - module_data['sum_count_' + name] = empty_sum_count.copy() - - log.debug("Prepared empty data array for single dssc module") - return module_data - - -def load_chunk_data(sel, sourcename, maxframes=None): - """ - Load selected DSSC data. The flattened multi-index (trains+pulses) is - unraveled before returning the data. - - Parameters - ---------- - sel: extra_data.DataCollection - a DataCollection or a subset of a DataCollection obtained by its - select_trains() method - sourcename: str - - Returns - ------- - xarray.DataArray - """ - - info = sel.detector_info(sourcename) - fpt = info['frames_per_train'] - frames_total = info['total_frames'] - data = sel.get_array(sourcename, 'image.data', - extra_dims=['_empty_', 'x', 'y'] - ).squeeze() - - tids = np.unique(data.trainId) - data = data.rename(dict(trainId='trainId_pulse')) - - midx = pd.MultiIndex.from_product([sorted(tids), range(fpt)], - names=('trainId', 'pulse')) - data = xr.DataArray(data, - dict(trainId_pulse=midx) - ).unstack('trainId_pulse') - data = data.transpose('trainId', 'pulse', 'x', 'y') - - return data.loc[{'pulse': np.s_[:maxframes]}] - - -def merge_chunk_data(module_data, chunk_data, framepattern): - """ - Merge chunk data with prepared dataset for entire module. - Aligns on "scan_variable" and sums values for variables - ['pumped', 'unpumped', 'sum_count'] - Concatenates the data along a new dimension ('tmp') and uses - the sum() method for automatic dtype conversion - - Parameters - ---------- - module_data: xarray.Dataset - module data array to be filled - chunk_data: xarray.Dataset - loaded chunk of data to be merged into module_data - framepattern: list of strings - example: ['pumped', 'unpumped', 'sum_count'] - - Returns - ------- - module_data: xarray.Dataset - merged module data: - """ - - where = dict(scan_variable=chunk_data.scan_variable) - for name in framepattern: - for prefix in ['', 'sum_count_']: - var = prefix + name - summed = xr.concat([module_data[var].loc[where], chunk_data[var]], - dim='tmp').sum('tmp') - module_data[var].loc[where] = summed - - log.debug("Merged chunked data") - return module_data - - -def split_frames(data, pattern, prefix=''): - """ - Split frames according to "pattern" (possibly repeating) and average over - resulting splits. - - Parameters - ---------- - data: - pattern: - A list of frame names (order matters!). 
Examples: - # 4 DSSC frames, 2 FEL pulses - pattern = ['pumped', 'pumped_dark', 'unpumped', 'unpumped_dark'] - # 2 FEL frames, no intermediate darks - pattern = ['pumped', 'unpumped'] - # no splitting, average over all frames - pattern = ['image'] - - Returns - ------- - dataset: xarray.DataArray - a dataset with data variables named prefix + framename - """ - - n = len(pattern) - dataset = xr.Dataset() - for i, name in enumerate(pattern): - dataset[prefix + name] = data.loc[{'pulse': np.s_[i::n]}].mean('pulse') - return dataset - - -def process_intra_train(job): - """ - Aggregate DSSC data (chunked, to fit into memory) for a single module. - Averages over all trains, but keeps all pulses. - Designed for the multiprocessing module - expects a job dictionary with - the following keys: - proposal : (int) proposal number - run : (int) run number - module : (int) DSSC module to process - chunksize : (int) number of trains to process simultaneously - fpt : (int) frames per train - """ - proposal = job['proposal'] - run_nr = job['run_nr'] - module = job['module'] - chunksize = job['chunksize'] - fpt = job['fpt'] - maxframes = job.get('maxframes', None) # optional - - log.info(f"Processing dssc module {module}: start") - - sourcename = f'SCS_DET_DSSC1M-1/DET/{module}CH0:xtdf' - collection = ed.open_run(proposal, run_nr, include=f'*DSSC{module:02d}*') - - fpt = min(fpt, maxframes) if maxframes is not None else fpt - dims = ['pulse', 'x', 'y'] - coords = {'pulse': np.arange(fpt, dtype=int)} - shape = [fpt, 128, 512] - module_data = xr.DataArray(np.zeros(shape, dtype=float), dims=dims, - coords=coords) - module_data = module_data.to_dataset(name='image') - module_data['sum_count'] = xr.DataArray(np.zeros(fpt, dtype=int), - dims=['pulse']) - - ntrains = len(collection.train_ids) - chunks = np.arange(ntrains, step=chunksize) - if module == 15: - pbar = tqdm(total=len(chunks)) - for start_index in chunks: - log.debug(f"Module {module}: " - f"load trains {start_index}:{start_index + chunksize}") - sel = collection.select_trains( - ed.by_index[start_index:start_index + chunksize]) - data = load_chunk_data(sel, sourcename, maxframes) - data = data.to_dataset(name='image') - - data['sum_count'] = xr.full_like(data.image[..., 0, 0], fill_value=1) - data = data.sum('trainId') - - for var in ['image', 'sum_count']: - # concatenating and using the sum() method automatically takes care - # of dtype casting if necessary - module_data[var] = xr.concat([module_data[var], data[var]], - dim='tmp').sum('tmp') - if module == 15: - pbar.update(1) - - module_data['image'] = module_data['image'] / module_data.sum_count - log.info(f"processing module {module}: done") - return module_data - - -def process_dssc_module(job): - """ - Aggregate DSSC data (chunked, to fit into memory) for a single module. - Groups by "scan_variable" in given scanfile - use dummy scan_variable to - average over all trains. This implies, that only trains found in the - scanfile are considered. - - Parameters - ---------- - job:Â dictionary - Designed for the multiprocessing module - expects a job dictionary with - the following keys: - proposal : int - proposal number - run : int - run number - module : int - DSSC module to process - chunksize : int - number of trains to process simultaneously - scanfile : str - name of hdf5 file with xarray.DataArray containing the - scan variable and trainIds - framepattern : list of str - names for the (possibly repeating) intra-train pulses. 
See - split_dssc_data - pulsemask : str - name of hdf5 file with boolean xarray.DataArray to - select/reject trains and pulses - - Returns - ------- - module_data: xarray.Dataset - - - """ - - proposal = job['proposal'] - run_nr = job['run_nr'] - module = job['module'] - chunksize = job['chunksize'] - scanfile = job['scanfile'] - framepattern = job.get('framepattern', ['image']) - maskfile = job.get('maskfile', None) - - sourcename = f'SCS_DET_DSSC1M-1/DET/{module}CH0:xtdf' - collection = ed.open_run(proposal, run_nr, - include=f'*DSSC{module:02d}*') - ntrains = len(collection.train_ids) - - log.info(f"Processing dssc module {module}: start") - - # read preprocessed scan variable from file - scan = xr.open_dataarray(scanfile, 'data', engine='h5netcdf') - - # read binary pulse/train mask - e.g. from XGM thresholding - if maskfile is not None: - pulsemask = xr.open_dataarray(maskfile, 'data', engine='h5netcdf') - else: - pulsemask = None - - module_data = prepare_module_empty(scan, framepattern) - chunks = np.arange(ntrains, step=chunksize) - - # progress bar - if module == 15: - pbar = tqdm(total=len(chunks)) - - # load data chunk by chunk and merge - for start_index in chunks: - sel = collection.select_trains( - ed.by_index[start_index:start_index + chunksize]) - nframes = sel.detector_info(sourcename)['total_frames'] - if nframes > 0: # some chunks have no DSSC data at all - log.debug(f"Module {module}: " - f"load trains {start_index}:{start_index + chunksize}") - data = load_chunk_data(sel, sourcename) - sum_count = xr.full_like(data[..., 0, 0], fill_value=1) - - if pulsemask is not None: - data = data.where(pulsemask) - sum_count = sum_count.where(pulsemask) - - data = split_frames(data, framepattern) - sum_count = split_frames(sum_count, framepattern, - prefix='sum_count_') - data = xr.merge([data, sum_count]) - - # aligns on trainId, drops non-matching trains - data['scan_variable'] = scan - data = data.groupby('scan_variable').sum('trainId') - - log.debug(f"Module {module}: " - f"merge trains {start_index}:{start_index + chunksize}") - module_data = merge_chunk_data(module_data, data, framepattern) - - if module == 15: - pbar.update(1) - - for name in framepattern: - module_data[name] = module_data[name] / \ - module_data['sum_count_' + name] - - log.info(f"processing module {module}: done") - return module_data \ No newline at end of file + #def plot_xgm_hist(self): + # pass \ No newline at end of file diff --git a/src/toolbox_scs/detectors/dssc_data.py b/src/toolbox_scs/detectors/dssc_data.py index 568abb3..568adf8 100644 --- a/src/toolbox_scs/detectors/dssc_data.py +++ b/src/toolbox_scs/detectors/dssc_data.py @@ -1,22 +1,36 @@ -def safe_mask(): - pass - - -def load_mask(): - pass - - -def safe_dark(): - pass - - -def load_dark(): - pass - - -def safe_processed(): - pass - - -def load_processed(): - pass +import os +import logging +import xarray as xr + +from ..util.exceptions import ToolBoxFileError + +log = logging.getLogger(__name__) + + +def _save_file(data, fname, overwrite): + f_exists = os.path.isfile(fname) + if (f_exists and overwrite ==True): + data.to_netcdf(fname, group='data', mode='w', engine='h5netcdf') + log.warning(f"File {fname} existed: overwritten") + log.info(f"Stored data in file {fname}") + elif f_exists and overwrite == False: + msg = "File already exists." 
+ raise ToolBoxFileError(msg, fname) + else: + data.to_netcdf(fname, group='data', mode='w', engine='h5netcdf') + log.info(f"Stored data in file {fname}") + + +def save_to_file(data, fname, path = './', overwrite = False): + fpath = os.path.join(path, fname) + try: + _save_file(data, fpath, overwrite) + except ToolBoxFileError as err: + err_msg = err.message + log.warning(f"Could not store data: {err_msg}") + err.message = f"{err_msg} set kwarg overwrite = True" + raise err + + +def load_from_file(fname): + return xr.open_dataarray(fname, 'data', engine='h5netcdf') diff --git a/src/toolbox_scs/plot/plot_dssc.py b/src/toolbox_scs/detectors/dssc_plot.py similarity index 81% rename from src/toolbox_scs/plot/plot_dssc.py rename to src/toolbox_scs/detectors/dssc_plot.py index 7475bc4..bd9789f 100644 --- a/src/toolbox_scs/plot/plot_dssc.py +++ b/src/toolbox_scs/detectors/dssc_plot.py @@ -1,11 +1,11 @@ """ DSSC visualization routines - Plotting sub-routines frequently done in combination with dssc analysis. - The initial code is based on: https://github.com/dscran/dssc_process - /blob/master/example_image_process_pulsemask.ipynb +Plotting sub-routines frequently done in combination with dssc analysis. +The initial code is based on: https://github.com/dscran/dssc_process +/blob/master/example_image_process_pulsemask.ipynb - Todo: For visualization of statistical information we could eventually - switch to seaborn: https://seaborn.pydata.org/ +Todo: For visualization of statistical information we could eventually +switch to seaborn: https://seaborn.pydata.org/ """ from time import strftime diff --git a/src/toolbox_scs/detectors/dssc_routines.py b/src/toolbox_scs/detectors/dssc_routines.py new file mode 100644 index 0000000..cc16436 --- /dev/null +++ b/src/toolbox_scs/detectors/dssc_routines.py @@ -0,0 +1,404 @@ +# -*- coding: utf-8 -*- +''' +@author: Michael Schneider, with input from XFEL-CAS and SCS beamline staff + +karabo_data: https://github.com/European-XFEL/karabo_data +SCS ToolBox: https://git.xfel.eu/gitlab/SCS/ToolBox + +contributions should comply with pep8 code structure guidelines. + +Copyright (c) 2019, Michael Schneider +All rights reserved. +''' +import logging +from tqdm import tqdm + +import numpy as np +import xarray as xr +import pandas as pd + +import extra_data as ed + + +log = logging.getLogger(__name__) + + +def load_dssc_info(proposal, run_nr): + """ + Loads the first data file for DSSC module 0 (this is hardcoded) + and returns the detector_info dictionary + + Parameters + ---------- + proposal: str, int + number of proposal + run_nr: str, int + number of run + + Returns + ------- + info : dictionary + {'dims': tuple, 'frames_per_train': int, 'total_frames': int} + """ + + module = ed.open_run(proposal, run_nr, include='*DSSC00*') + info = module.detector_info('SCS_DET_DSSC1M-1/DET/0CH0:xtdf') + log.debug("Fetched information for DSSC module nr. 0.") + return info + + +def calc_xgm_frame_indices(nbunches, framepattern): + """ + Returns a coordinate array for XGM data. The coordinates correspond to + DSSC frame numbers and depend on the number of FEL pulses per train + ("nbunches") and the framepattern. In framepattern, dark DSSC frame + names (i.e., without FEL pulse) _must_ include "dark" as a substring. 
+ + Parameters + ---------- + nbunches: int + number of bunches per train + framepattern: list + experimental pattern + + Returns + ------- + frame_indices: numpy.ndarray + coordinate array corresponding to DSSC frame numbers + + """ + + n_frames = len(framepattern) + n_data_frames = np.sum(['dark' not in p for p in framepattern]) + frame_max = nbunches * n_frames // n_data_frames + + frame_indices = [] + for i, p in enumerate(framepattern): + if 'dark' not in p: + frame_indices.append(np.arange(i, frame_max, n_frames)) + + log.debug("Constructed coordinate array for XGM data.") + return np.sort(np.concatenate(frame_indices)) + + +def prepare_module_empty(scan_variable, framepattern): + """ + Create empty (zero-valued) DataArray for a single DSSC module + to iteratively add data to. + + Parameters + ---------- + scan_variable : xarray.DataArray + xarray DataArray containing the specified scan variable using + the trainId as coordinate. + framepattern: list of strings + example: ['pumped', 'unpumped'] + + Returns + ------- + module_data: xarray.Dataset + empty DataArray + """ + + len_scan = len(np.unique(scan_variable)) + dims = ['scan_variable', 'x', 'y'] + coords = {'scan_variable': np.unique(scan_variable)} + shape = [len_scan, 128, 512] + + empty = xr.DataArray(np.zeros(shape, dtype=float), + dims=dims, coords=coords) + empty_sum_count = xr.DataArray(np.zeros(len_scan, dtype=int), + dims=['scan_variable']) + module_data = xr.Dataset() + + for name in framepattern: + module_data[name] = empty.copy() + module_data['sum_count_' + name] = empty_sum_count.copy() + + log.debug("Prepared empty data array for single dssc module") + return module_data + + +def load_chunk_data(sel, sourcename, maxframes=None): + """ + Load selected DSSC data. The flattened multi-index (trains+pulses) is + unraveled before returning the data. + + Parameters + ---------- + sel: extra_data.DataCollection + a DataCollection or a subset of a DataCollection obtained by its + select_trains() method + sourcename: str + + Returns + ------- + xarray.DataArray + """ + + info = sel.detector_info(sourcename) + fpt = info['frames_per_train'] + data = sel.get_array(sourcename, 'image.data', + extra_dims=['_empty_', 'x', 'y'] + ).squeeze() + + tids = np.unique(data.trainId) + data = data.rename(dict(trainId='trainId_pulse')) + + midx = pd.MultiIndex.from_product([sorted(tids), range(fpt)], + names=('trainId', 'pulse')) + data = xr.DataArray(data, + dict(trainId_pulse=midx) + ).unstack('trainId_pulse') + data = data.transpose('trainId', 'pulse', 'x', 'y') + + return data.loc[{'pulse': np.s_[:maxframes]}] + + +def merge_chunk_data(module_data, chunk_data, framepattern): + """ + Merge chunk data with prepared dataset for entire module. 
+ Aligns on "scan_variable" and sums values for variables + ['pumped', 'unpumped', 'sum_count'] + Concatenates the data along a new dimension ('tmp') and uses + the sum() method for automatic dtype conversion + + Parameters + ---------- + module_data: xarray.Dataset + module data array to be filled + chunk_data: xarray.Dataset + loaded chunk of data to be merged into module_data + framepattern: list of strings + example: ['pumped', 'unpumped', 'sum_count'] + + Returns + ------- + module_data: xarray.Dataset + merged module data: + """ + + where = dict(scan_variable=chunk_data.scan_variable) + for name in framepattern: + for prefix in ['', 'sum_count_']: + var = prefix + name + summed = xr.concat([module_data[var].loc[where], chunk_data[var]], + dim='tmp').sum('tmp') + module_data[var].loc[where] = summed + + log.debug("Merged chunked data") + return module_data + + +def split_frames(data, pattern, prefix=''): + """ + Split frames according to "pattern" (possibly repeating) and average over + resulting splits. + + Parameters + ---------- + data: + pattern: + A list of frame names (order matters!). Examples: + # 4 DSSC frames, 2 FEL pulses + pattern = ['pumped', 'pumped_dark', 'unpumped', 'unpumped_dark'] + # 2 FEL frames, no intermediate darks + pattern = ['pumped', 'unpumped'] + # no splitting, average over all frames + pattern = ['image'] + + Returns + ------- + dataset: xarray.DataArray + a dataset with data variables named prefix + framename + """ + + n = len(pattern) + dataset = xr.Dataset() + for i, name in enumerate(pattern): + dataset[prefix + name] = data.loc[{'pulse': np.s_[i::n]}].mean('pulse') + return dataset + + +def process_intra_train(job): + """ + Aggregate DSSC data (chunked, to fit into memory) for a single module. + Averages over all trains, but keeps all pulses. 
+ + Parameters + ---------- + job: dictionary + Designed for the multiprocessing module - expects a job dictionary with + the following keys: + proposal : int + proposal number + run : int + run number + module : int + DSSC module to process + chunksize : int + number of trains to process simultaneously + fpt : int + frames per train + + Returns + ------- + module_data: xarray.Dataset + + """ + + proposal = job['proposal'] + run_nr = job['run_nr'] + module = job['module'] + chunksize = job['chunksize'] + fpt = job['fpt'] + maxframes = job.get('maxframes', None) + + log.info(f"Processing dssc module {module}: start") + + sourcename = f'SCS_DET_DSSC1M-1/DET/{module}CH0:xtdf' + collection = ed.open_run(proposal, run_nr, include=f'*DSSC{module:02d}*') + fpt = min(fpt, maxframes) if maxframes is not None else fpt + dims = ['pulse', 'x', 'y'] + coords = {'pulse': np.arange(fpt, dtype=int)} + shape = [fpt, 128, 512] + module_data = xr.DataArray(np.zeros(shape, dtype=float), + dims=dims, coords=coords) + module_data = module_data.to_dataset(name='image') + module_data['sum_count'] = xr.DataArray(np.zeros(fpt, dtype=int), + dims=['pulse']) + ntrains = len(collection.train_ids) + chunks = np.arange(ntrains, step=chunksize) + + if module == 15: + pbar = tqdm(total=len(chunks)) + + for start_index in chunks: + log.debug(f"Module {module}: " + f"load trains {start_index}:{start_index + chunksize}") + + sel = collection.select_trains( + ed.by_index[start_index:start_index + chunksize]) + data = load_chunk_data(sel, sourcename, maxframes) + data = data.to_dataset(name='image') + + data['sum_count'] = xr.full_like(data.image[..., 0, 0], fill_value=1) + data = data.sum('trainId') + + for var in ['image', 'sum_count']: + module_data[var] = xr.concat([module_data[var], data[var]], + dim='tmp').sum('tmp') + log.debug(f"Module {module}: merged chunked data") + + if module == 15: + pbar.update(1) + + module_data['image'] = module_data['image'] / module_data.sum_count + log.info(f"processing module {module}: done") + return module_data + + +def process_dssc_module(job): + """ + Aggregate DSSC data (chunked, to fit into memory) for a single module. + Groups by "scan_variable" in given scanfile - use dummy scan_variable to + average over all trains. This implies, that only trains found in the + scanfile are considered. + + Parameters + ---------- + job:Â dictionary + Designed for the multiprocessing module - expects a job dictionary with + the following keys: + proposal : int + proposal number + run : int + run number + module : int + DSSC module to process + chunksize : int + number of trains to process simultaneously + scanfile : str + name of hdf5 file with xarray.DataArray containing the + scan variable and trainIds + framepattern : list of str + names for the (possibly repeating) intra-train pulses. 
See + split_dssc_data + pulsemask : str + name of hdf5 file with boolean xarray.DataArray to + select/reject trains and pulses + + Returns + ------- + module_data: xarray.Dataset + + """ + + proposal = job['proposal'] + run_nr = job['run_nr'] + module = job['module'] + chunksize = job['chunksize'] + scanfile = job['scanfile'] + framepattern = job.get('framepattern', ['image']) + maskfile = job.get('maskfile', None) + + sourcename = f'SCS_DET_DSSC1M-1/DET/{module}CH0:xtdf' + collection = ed.open_run(proposal, run_nr, + include=f'*DSSC{module:02d}*') + ntrains = len(collection.train_ids) + + log.info(f"Processing dssc module {module}: start") + + # read preprocessed scan variable from file + scan = xr.open_dataarray(scanfile, 'data', engine='h5netcdf') + + # read binary pulse/train mask - e.g. from XGM thresholding + if maskfile is not None: + pulsemask = xr.open_dataarray(maskfile, 'data', engine='h5netcdf') + else: + pulsemask = None + + module_data = prepare_module_empty(scan, framepattern) + chunks = np.arange(ntrains, step=chunksize) + + # progress bar + if module == 15: + pbar = tqdm(total=len(chunks)) + + # load data chunk by chunk and merge + for start_index in chunks: + sel = collection.select_trains( + ed.by_index[start_index:start_index + chunksize]) + nframes = sel.detector_info(sourcename)['total_frames'] + if nframes > 0: # some chunks have no DSSC data at all + log.debug(f"Module {module}: " + f"load trains {start_index}:{start_index + chunksize}") + data = load_chunk_data(sel, sourcename) + sum_count = xr.full_like(data[..., 0, 0], fill_value=1) + + if pulsemask is not None: + data = data.where(pulsemask) + sum_count = sum_count.where(pulsemask) + + data = split_frames(data, framepattern) + sum_count = split_frames(sum_count, framepattern, + prefix='sum_count_') + data = xr.merge([data, sum_count]) + + # aligns on trainId, drops non-matching trains + data['scan_variable'] = scan + data = data.groupby('scan_variable').sum('trainId') + + log.debug(f"Module {module}: " + f"merge trains {start_index}:{start_index + chunksize}") + module_data = merge_chunk_data(module_data, data, framepattern) + + if module == 15: + pbar.update(1) + + for name in framepattern: + module_data[name] = module_data[name] / \ + module_data['sum_count_' + name] + + log.info(f"processing module {module}: done") + return module_data diff --git a/src/toolbox_scs/test/test_detectors_dssc.py b/src/toolbox_scs/test/test_detectors_dssc.py index c38374a..3672822 100644 --- a/src/toolbox_scs/test/test_detectors_dssc.py +++ b/src/toolbox_scs/test/test_detectors_dssc.py @@ -9,39 +9,31 @@ from time import strftime import numpy as np import xarray as xr +import extra_data as ed import toolbox_scs as tb import toolbox_scs.detectors as tbdet -from toolbox_scs.detectors.dssc import (load_chunk_data, prepare_module_empty, - split_frames, merge_chunk_data) -import extra_data as ed +from toolbox_scs.detectors.dssc_routines import (load_chunk_data, + prepare_module_empty, split_frames, merge_chunk_data) +from toolbox_scs.detectors.dssc_data import save_to_file, load_from_file +from toolbox_scs.util.exceptions import ToolBoxFileError logging.basicConfig(level=logging.DEBUG) log_root = logging.getLogger(__name__) -# ----------------------------------------------------------------------------- -# global test settings (based on https://github.com/dscran/dssc_process/blob/ -# master/example_image_process_pulsemask.ipynb) -# ----------------------------------------------------------------------------- -proposal = 2212 -run_nr = 
235 -is_dark = False -framepattern = ['pumped', 'unpumped'] -maxframes = None -stepsize = .03 -scan_variable = 'PP800_PhaseShifter' -xgm_min = 0 -xgm_max = np.inf -# ----------------------------------------------------------------------------- - - -suites = {"no-processing": ( +suites = {"metafunctions": ( + "test_info", + "test_calcindices", + "test_createpulsemask", + "test_storage" + ), + "processing-related-methods": ( "test_info", "test_calcindices", "test_createpulsemask", "test_loadmergechunk", ), - "full-processing": ( + "image-processing": ( "test_info", "test_calcindices", "test_createpulsemask", @@ -74,41 +66,59 @@ class TestDSSC(unittest.TestCase): @classmethod def setUpClass(cls): log_root.info("Start global setup") - setup_tmp_dir() + # --------------------------------------------------------------------- + # global test settings (based on https://github.com/dscran/dssc_process + # /blob/master/example_image_process_pulsemask.ipynb) + # --------------------------------------------------------------------- + cls._proposal = 2212 + cls._run_nr = 235 + cls._is_dark = False + cls._framepattern = ['pumped', 'unpumped'] + cls._maxframes = None + cls._stepsize = .04 + cls._scan_variable_name = 'PP800_PhaseShifter' + cls._xgm_min = 0 + cls._xgm_max = np.inf cls._scanfile = './tmp/scan.h5' cls._maskfile = './tmp/mask.h5' + # --------------------------------------------------------------------- + + setup_tmp_dir() + - cls._run = tb.load_run(proposal, run_nr, include='*DA*') + cls._run = tb.load_run(cls._proposal, cls._run_nr, include='*DA*') cls._scan_variable = tb.load_scan_variable(cls._run, - scan_variable, stepsize) - cls._scan_variable.to_netcdf(cls._scanfile, group='data', mode='w', - engine='h5netcdf') + cls._scan_variable_name, + cls._stepsize) + cls._scan_variable.to_netcdf(cls._scanfile, + group='data', mode='w', engine='h5netcdf') cls._xgm = tbdet.load_xgm(cls._run) log_root.info("Finished global setup, start tests") + @classmethod def tearDownClass(cls): log_root.info("Clean up test environment....") cleanup_tmp_dir() + def test_info(self): cls = self.__class__ - info = tbdet.load_dssc_info(proposal, run_nr) + info = tbdet.load_dssc_info(cls._proposal, cls._run_nr) fpt = info['frames_per_train'] self.assertEqual(fpt, 20) cls._fpt = fpt - @unittest.skipIf(is_dark is True, "Dark file given, skip xgm data") + def test_calcindices(self): cls = self.__class__ xgm_frame_coords = tbdet.calc_xgm_frame_indices( - cls._xgm.shape[1], framepattern) + cls._xgm.shape[1], cls._framepattern) self.assertIsNotNone(xgm_frame_coords) - print(xgm_frame_coords) cls._xgm['pulse'] = xgm_frame_coords - @unittest.skipIf(is_dark is True, "No xgm data") + def test_createpulsemask(self): cls = self.__class__ @@ -118,7 +128,7 @@ class TestDSSC(unittest.TestCase): 'pulse': range(cls._fpt)} pulsemask = xr.DataArray(data, dims=dimensions, coords=coordinates) - valid = (cls._xgm > xgm_min) * (cls._xgm < xgm_max) + valid = (cls._xgm > cls._xgm_min) * (cls._xgm < cls._xgm_max) pulsemask = valid.combine_first(pulsemask).astype(bool) pulsemask.to_netcdf(cls._maskfile, @@ -135,7 +145,8 @@ class TestDSSC(unittest.TestCase): pulsemask = None # construct empty - empty_data = prepare_module_empty(cls._scan_variable, framepattern) + empty_data = prepare_module_empty(cls._scan_variable, + cls._framepattern) self.assertIsNotNone(empty_data.dims['scan_variable']) # load single chunk @@ -143,7 +154,7 @@ class TestDSSC(unittest.TestCase): chunksize = 512 sourcename = f'SCS_DET_DSSC1M-1/DET/{module}CH0:xtdf' - collection = 
ed.open_run(proposal, run_nr, + collection = ed.open_run(cls._proposal, cls._run_nr, include=f'*DSSC{module:02d}*') ntrains = len(collection.train_ids) @@ -162,13 +173,14 @@ class TestDSSC(unittest.TestCase): chunk_data = chunk_data.where(pulsemask) sum_count = sum_count.where(pulsemask) - chunk_data = split_frames(chunk_data, framepattern) - sum_count = split_frames(sum_count, framepattern, + chunk_data = split_frames(chunk_data, cls._framepattern) + sum_count = split_frames(sum_count, cls._framepattern, prefix='sum_count_') chunk_data = xr.merge([chunk_data, sum_count]) chunk_data['scan_variable'] = cls._scan_variable chunk_data = chunk_data.groupby('scan_variable').sum('trainId') - merged_data = merge_chunk_data(empty_data, chunk_data, framepattern) + merged_data = merge_chunk_data(empty_data, chunk_data, + cls._framepattern) self.assertIsNotNone(merged_data) @@ -182,14 +194,14 @@ class TestDSSC(unittest.TestCase): jobs = [] for m in range(2): jobs.append(dict( - proposal=proposal, - run_nr=run_nr, + proposal=cls._proposal, + run_nr=cls._run_nr, module=m, chunksize=chunksize, scanfile=cls._scanfile, - framepattern=framepattern, - maskfile=None if is_dark else cls._maskfile, - maxframes=maxframes,)) + framepattern=cls._framepattern, + maskfile=None if cls._is_dark else cls._maskfile, + maxframes=cls._maxframes,)) print(f'start processing modules:', strftime('%X')) @@ -201,9 +213,11 @@ class TestDSSC(unittest.TestCase): def test_intratrain(self): + cls = self.__class__ max_GB = 300 r_nb = 89 - fpt = tbdet.load_dssc_info(proposal, r_nb)['frames_per_train'] + info = tbdet.load_dssc_info(cls._proposal, r_nb) + fpt = info['frames_per_train'] chunksize = int(max_GB * 128 // fpt) chunksize = min(512, chunksize) print('processing', chunksize, 'trains per chunk') @@ -211,7 +225,7 @@ class TestDSSC(unittest.TestCase): jobs = [] for m in range(2): jobs.append(dict( - proposal=proposal, + proposal=cls._proposal, run_nr=r_nb, module=m, chunksize=chunksize, @@ -226,6 +240,23 @@ class TestDSSC(unittest.TestCase): print('finished processing modules:', strftime('%X')) self.assertIsNotNone(module_data) + + def test_storage(self): + cls = self.__class__ + + save_to_file(cls._scan_variable, 'tmp/scan2.h5') + save_to_file(cls._scan_variable, 'scan3.h5', path = './tmp/') + save_to_file(cls._scan_variable, 'tmp/scan3.h5', overwrite = True) + scandata = load_from_file('tmp/scan3.h5') + + self.assertIsNotNone(scandata) + maskdata = load_from_file('tmp/mask.h5') + self.assertIsNotNone(maskdata) + + with self.assertRaises(ToolBoxFileError) as cm: + save_to_file(cls._scan_variable, 'scan3.h5', path = './tmp/') + self.assertEqual(cm.exception.value, './tmp/scan3.h5') + def list_suites(): print("\nPossible test suites:\n" + "-" * 79) diff --git a/src/toolbox_scs/test/test_misc.py b/src/toolbox_scs/test/test_misc.py new file mode 100644 index 0000000..8644aa6 --- /dev/null +++ b/src/toolbox_scs/test/test_misc.py @@ -0,0 +1,115 @@ +import unittest +import logging +import os +import sys +import argparse + +import toolbox_scs as tb +import toolbox_scs.misc as tbm +from toolbox_scs.util.exceptions import ToolBoxPathError + +# ----------------------------------------------------------------------------- +# global test settings +# ----------------------------------------------------------------------------- +proposalNB = 2511 +runNB = 176 +# ----------------------------------------------------------------------------- + +suites = {"bunch-pattern-decoding": ( + "test_isppl", + "test_issase1", + "test_issase3", + 
"test_extractBunchPattern", + "test_pulsePatternInfo", + ) + } + +class TestDataAccess(unittest.TestCase): + @classmethod + def setUpClass(cls): + run = tb.load_run(proposalNB, runNB) + mnemonic = tb.mnemonics["bunchPatternTable"] + cls._bpt = run.get_array(*mnemonic.values()) + + @classmethod + def tearDownClass(cls): + pass + + def setUp(self): + pass + + def tearDown(self): + pass + + def test_isppl(self): + cls = self.__class__ + bpt_decoded = tbm.is_ppl(cls._bpt) + self.assertEqual(bpt_decoded.values[0][0],1) + + def test_issase1(self): + cls = self.__class__ + bpt_decoded = tbm.is_sase_3(cls._bpt) + self.assertEqual(bpt_decoded.values[0][0],0) + + def test_issase3(self): + cls = self.__class__ + bpt_decoded = tbm.is_sase_3(cls._bpt) + self.assertEqual(bpt_decoded.values[0][0],0) + + def test_extractBunchPattern(self): + cls = self.__class__ + bpt_decoded = tbm.extractBunchPattern(cls._bpt, + 'scs_ppl') + self.assertIsNotNone(bpt_decoded) + self.assertEqual(bpt_decoded[0].values[0,1],80) + + def test_pulsePatternInfo(self): + pass + + +def list_suites(): + print("\nPossible test suites:\n" + "-" * 79) + for key in suites: + print(key) + print("-" * 79 + "\n") + +def suite(*tests): + suite = unittest.TestSuite() + for test in tests: + suite.addTest(TestDataAccess(test)) + return suite + + +def start_tests(*cliargs): + logging.basicConfig(level=logging.DEBUG) + log_root = logging.getLogger(__name__) + try: + for test_suite in cliargs: + if test_suite in suites: + runner = unittest.TextTestRunner(verbosity=2) + runner.run(suite(*suites[test_suite])) + else: + log_root.warning( + "Unknown suite: '{}'".format(test_suite)) + pass + except Exception as err: + log_root.error("Unecpected error: {}".format(err), + exc_info=True) + pass + + +if __name__ == '__main__': + parser = argparse.ArgumentParser() + parser.add_argument('--list-suites', + action='store_true', + help='list possible test suites') + parser.add_argument('--run-suites', metavar='S', + nargs='+', action='store', + help='a list of valid test suites') + args = parser.parse_args() + + if args.list_suites: + list_suites() + + if args.run_suites: + start_tests(*args.run_suites) diff --git a/src/toolbox_scs/util/exceptions.py b/src/toolbox_scs/util/exceptions.py index 59f504b..9b8049a 100644 --- a/src/toolbox_scs/util/exceptions.py +++ b/src/toolbox_scs/util/exceptions.py @@ -1,5 +1,7 @@ class ToolBoxError(Exception): - """ Parent Toolbox exception.""" + """ + Parent Toolbox exception. (to be defined) + """ pass @@ -18,4 +20,10 @@ class ToolBoxTypeError(ToolBoxError): class ToolBoxValueError(ToolBoxError): def __init__(self, msg = "", val = None): self.value = val - self.message = msg + " unknown value: " + val \ No newline at end of file + self.message = msg + " unknown value: " + val + + +class ToolBoxFileError(ToolBoxError): + def __init__(self, msg = "", val = ''): + self.value = val + self.message = f"file: {val}, {msg}" \ No newline at end of file -- GitLab