diff --git a/VERSION b/VERSION index 21e8796a09d4f26935ffc4879147879a153f0193..9686a2dfa9a8c70dc792f351213dde698abd0068 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -1.0.3 +1.1a3 diff --git a/src/toolbox_scs/detectors/__init__.py b/src/toolbox_scs/detectors/__init__.py index 5059545ee600c791fa6147d42424b2f7dfeb88ef..818982711c1184848d3a3b4bbf601d4f52a13a6f 100644 --- a/src/toolbox_scs/detectors/__init__.py +++ b/src/toolbox_scs/detectors/__init__.py @@ -3,12 +3,12 @@ from .xgm import ( from .tim import ( load_TIM,) from .dssc_data import ( - save_xarray, load_xarray, get_data_formatted) + save_xarray, load_xarray, get_data_formatted, save_attributes_h5) from .dssc_misc import ( - load_dssc_info, create_dssc_bins, load_geom, quickmask_DSSC_ASIC, - calc_xgm_frame_indices, get_xgm_formatted, load_mask) + load_dssc_info, create_dssc_bins, quickmask_DSSC_ASIC, + get_xgm_formatted, load_mask) from .dssc_processing import ( - bin_data) + process_dssc_data) from .dssc import ( DSSCBinner, DSSCFormatter, DSSCAnalyzer) from .azimuthal_integrator import AzimuthalIntegrator @@ -23,11 +23,11 @@ __all__ = ( "create_dssc_bins", "calc_xgm_frame_indices", "get_xgm_formatted", - "bin_data", + "process_dssc_data", "save_xarray", "load_xarray", "get_data_formatted", - "load_geom", + "save_attributes_h5", "quickmask_DSSC_ASIC", "load_mask", # Classes diff --git a/src/toolbox_scs/detectors/azimuthal_integrator.py b/src/toolbox_scs/detectors/azimuthal_integrator.py index fb56490c1e6b8256976bc725e46b7626d2853bdf..ea39aca87c9d65f289f4fd1579ab2d1e64b5aa07 100644 --- a/src/toolbox_scs/detectors/azimuthal_integrator.py +++ b/src/toolbox_scs/detectors/azimuthal_integrator.py @@ -10,26 +10,30 @@ class AzimuthalIntegrator(object): Create a reusable integrator for repeated azimuthal integration of similar images. Calculates array indices for a given parameter set that allows fast recalculation. - + + Copyright (c) 2019, Michael Schneider + Copyright (c) 2020, SCS-team + license: BSD 3-Clause License (see LICENSE_BSD for more info) + Parameters ========== imageshape : tuple of ints The shape of the images to be integrated over. - + center : tuple of ints center coordinates in pixels - + polar_range : tuple of ints start and stop polar angle (in degrees) to restrict integration to wedges - + dr : int, default 2 radial width of the integration slices. Takes non-square DSSC pixels into account. - + aspect: float, default 204/236 for DSSC aspect ratio of the pixel pitch - + Returns ======= ai : azimuthal_integrator instance @@ -75,7 +79,7 @@ class AzimuthalIntegrator(object): (dist_array < dist) self.flat_indices.append(self.index_array[ring_mask]) self.distance = np.append(self.distance, dist) - + def __call__(self, image): assert self.shape == image.shape, 'image shape does not match' image_flat = image.flatten() diff --git a/src/toolbox_scs/detectors/dssc.py b/src/toolbox_scs/detectors/dssc.py index 189f7d7209a7c185c4bf67880095150a2355dcf1..a5aa64bb8d28cf2a25430e7c782cbf64eb5af3c2 100644 --- a/src/toolbox_scs/detectors/dssc.py +++ b/src/toolbox_scs/detectors/dssc.py @@ -25,9 +25,9 @@ from .dssc_data import ( save_xarray, load_xarray, save_attributes_h5, search_files, get_data_formatted) from .dssc_misc import ( - load_dssc_info, get_xgm_formatted, load_geom) + load_dssc_info, get_xgm_formatted) from .dssc_processing import ( - bin_data, create_empty_dataset) + process_dssc_data, create_empty_dataset) __all__ = ["DSSCBinner", "DSSCFormatter", "DSSCAnalyzer"] log = logging.getLogger(__name__) @@ -36,8 +36,7 @@ log = logging.getLogger(__name__) class DSSCBinner: def __init__(self, proposal_nr, run_nr, binners={}, - use_xgm=False, xgm_threshold=(0, np.inf), xgm_bins=None, - use_tim=False + xgm_name='SCS_SA3', ): """ A dssc binner object. Loads and bins the dssc data according to the @@ -53,13 +52,10 @@ class DSSCBinner: binners: dictionary dictionary containing binners constructed using the 'create_dssc_bins' tbdet-method. - use_xgm: bool - in case run is not a dark the xgm data can be used to create a - pulsemask for additional data filtering. - xgm_threshold: tuple - the lower and upper boundary of xgm values rendering valid data. - use_tim: bool - -> to be implemented. Same role as 'use_xgm' + xgm_name: str + a valid mnemonic key of the XGM data to be used to mask the dssc + frames. Since the xgm is used in several methods its name can be + set here globally. Returns ------- @@ -80,30 +76,15 @@ class DSSCBinner: self.runnr = run_nr self.info = load_dssc_info(proposal_nr, run_nr) self.run = load_run(proposal_nr, run_nr) - self.xgmthreshold = xgm_threshold - self.binners = binners - + self.binners = {} + for b in binners: + self.add_binner(b, binners[b]) + self.xgm_name = xgm_name # --------------------------------------------------------------------- # Prepare pulse mask for additional data reduction next to binning # --------------------------------------------------------------------- self.xgm = None - fpt = self.info['frames_per_train'] - data = np.ones([len(self.run.train_ids), fpt], dtype=bool) - self.pulsemask = xr.DataArray(data, - dims=['trainId', 'pulse'], - coords={'trainId': self.run.train_ids, - 'pulse': range(fpt)}) - if use_xgm: - self.xgm = get_xgm_formatted(self.run, xgm_bins) - valid = (self.xgm > self.xgmthreshold[0]) * \ - (self.xgm < self.xgmthreshold[1]) - self.pulsemask = \ - valid.combine_first(self.pulsemask).astype(bool) - - nrejected = int(valid.size - valid.sum()) - percent_rejected = 100 * nrejected / valid.size - log.info(f'rejecting {nrejected} out of {valid.size} pulses' - f'({percent_rejected:.1f}%) due to xgm threshold') + self.pulsemask = None log.debug("Constructed DSSC object") @@ -135,6 +116,35 @@ class DSSCBinner: log.info(msg+", no binner created") raise ToolBoxValueError(msg, name) + def load_xgm(self, xgm_coord_stride=1): + """ + load xgm data and construct coordinate array according to corresponding + dssc frame number. + """ + self.xgm = get_xgm_formatted(self.run, self.xgm_name, xgm_coord_stride) + + def create_xgm_mask(self, + xgm_threshold=(0, np.inf), normevery=1): + """ + creates a mask for dssc frames according to measured xgm intensity. + """ + fpt = self.info['frames_per_train'] + n_trains = self.info['number_of_trains'] + trainIds = self.info['trainIds'] + data = np.ones([n_trains, fpt], dtype=bool) + self.pulsemask = xr.DataArray(data, + dims=['trainId', 'pulse'], + coords={'trainId': trainIds, + 'pulse': range(fpt)}) + + self.load_xgm(xgm_coord_stride=normevery) + valid = (self.xgm > xgm_threshold[0]) * \ + (self.xgm < xgm_threshold[1]) + self.pulsemask = \ + (valid.combine_first(self.pulsemask).astype(bool))[:,0:fpt] + + log.info(f'created pulse mask used during processing') + def get_info(self): """ Returns the expected shape of the binned dataset, in case binners have @@ -151,8 +161,6 @@ class DSSCBinner: """ Bin the xgm data according to the binners of the dssc data. The result can eventually be merged into the final dataset by the DSSCFormatter. - The xgm data is used internally to create a mask to filter certain dssc - images. Returns ------- @@ -166,15 +174,19 @@ class DSSCBinner: xgm_data[b+"_binned"] = self.binners[b] xgm_data = xgm_data.groupby(b+"_binned").mean(b) xgm_data = xgm_data.rename(name_dict={b+"_binned": b}) - return xgm_data + return xgm_data.transpose('trainId', 'pulse') else: - log.warning("no xgm data has been loaded.") + log.warning("no xgm data. Use load_xgm() to load the xgm data.") pass # ------------------------------------------------------------------------- # Data processing # ------------------------------------------------------------------------- - def bin_data(self, modules=[], chunksize=512, backend='loky', n_jobs=None): + def process_data(self, modules=[], filepath='./', + chunksize=512, backend='loky', n_jobs=None, + dark_image=None, + xgm_normalization=False, normevery=1 + ): """ Load and bin dssc data according to self.bins @@ -187,14 +199,20 @@ class DSSCBinner: experimental and not appropriately implemented in the dbdet member function 'bin_data'. n_jobs: int - number of cpu's used per sub-process. Note that when using the - default backend there is no need to adjust this parameter with the - current implementation. + inversely proportional of the number of cpu's available for one + job. Tasks within one job can grab a maximum of n_CPU_tot/n_jobs of + cpu's. + Note that when using the default backend there is no need to adjust + this parameter with the current implementation. modules: list of ints a list containing the module numbers that should be processed. If empty, all modules are processed. chunksize: int The number of trains that should be read in one iterative step. + dark_image: xarray.DataArray + DataArray with dimensions compatible with the loaded dssc data. + normevery: int + integer indicating which out of normevery frame will be normalized. Returns ------- @@ -214,26 +232,32 @@ class DSSCBinner: module_jobs = [] for m in mod_list: + dark = dark_image + if dark_image is not None: + dark = dark_image.sel(module=m) module_jobs.append(dict( proposal=self.proposal, run_nr=self.runnr, module=m, chunksize=chunksize, + path=filepath, info=self.info, dssc_binners=self.binners, + pulsemask=self.pulsemask, + dark_image=dark, + xgm_normalization=xgm_normalization, + xgm_mnemonic=self.xgm_name, + normevery=normevery, )) data = None log.info(f'using parallelization backend {backend}') - data = joblib.Parallel(n_jobs=njobs, backend=backend) \ - (joblib.delayed(bin_data)(**module_jobs[i]) \ + joblib.Parallel(n_jobs=njobs, backend=backend) \ + (joblib.delayed(process_dssc_data)(**module_jobs[i]) \ for i in range(len(mod_list))) - data = xr.concat(data, dim='module') - data = data.assign_coords(module=("module", np.array(mod_list))) log.info(f'Binning done') - return data class DSSCFormatter: diff --git a/src/toolbox_scs/detectors/dssc_data.py b/src/toolbox_scs/detectors/dssc_data.py index f38afe6148c2ff7316cbcff5add0023cb593b1f7..fa918bb39321d3ef2f052e940397c26ffe803b8f 100644 --- a/src/toolbox_scs/detectors/dssc_data.py +++ b/src/toolbox_scs/detectors/dssc_data.py @@ -157,6 +157,11 @@ def get_data_formatted(filenames=[], data_list=[]): elif any(data_list) is True: data = data_list + mod_list = [] + for d in data: + if 'module' in d.attrs.keys(): + mod_list.append(d.attrs['module']) + if type(data[0]).__module__ == 'xarray.core.dataset': data = xr.concat(data, dim='module') elif type(data[0]).__module__ == 'pandas.core.frame': @@ -164,7 +169,10 @@ def get_data_formatted(filenames=[], data_list=[]): elif type(data[0]).__module__ == 'dask.dataframe.core': pass + if mod_list is not None: + data = data.assign_coords(module=("module", mod_list)) data = data.sortby("module") + data.attrs.clear() return data.transpose('trainId', 'pulse', 'module', 'x', 'y') diff --git a/src/toolbox_scs/detectors/dssc_misc.py b/src/toolbox_scs/detectors/dssc_misc.py index 5f9a2f68f253dd37ce56d845f0880579ce394819..285bea62a4b901ee40ab1bb9c82ac2ed98d5877a 100644 --- a/src/toolbox_scs/detectors/dssc_misc.py +++ b/src/toolbox_scs/detectors/dssc_misc.py @@ -11,8 +11,8 @@ from imageio import imread import extra_data as ed from extra_geom import DSSC_1MGeometry -from .xgm import load_xgm -#from .dssc_processing import split_frames as _split_frames + +from ..constants import mnemonics as _mnemonics log = logging.getLogger(__name__) @@ -35,11 +35,11 @@ def load_dssc_info(proposal, run_nr): info : dictionary {'dims': tuple, 'frames_per_train': int, 'total_frames': int} """ - - module = ed.open_run(proposal, run_nr, include='*DSSC00*') - info = module.detector_info('SCS_DET_DSSC1M-1/DET/0CH0:xtdf') + module = ed.open_run(proposal, run_nr, include='*DSSC01*') + info = module.detector_info('SCS_DET_DSSC1M-1/DET/1CH0:xtdf') + info["number_of_trains"] = len(module.train_ids) info["trainIds"] = module.train_ids - log.debug("Fetched information for DSSC module nr. 0.") + log.debug("Fetched information for DSSC module nr. 1.") return info @@ -95,89 +95,37 @@ def create_dssc_bins(name, coordinates, bins): 'trainId, x, or y') -def calc_xgm_frame_indices(pulse_bins): - """ - Returns a coordinate array for XGM data. The coordinates correspond to - DSSC frame numbers which are not darks. - - Parameters - ---------- - pulse_bins: list - bins along which the pulse dimension will be binned - - Returns - ------- - frame_indices: numpy.ndarray - coordinate array corresponding to DSSC frame numbers which are not - darks. - """ - - frame_indices = [] - for i, p in enumerate(pulse_bins): - if 'dark' not in p: - frame_indices.append(i) - - log.debug("Constructed coordinate array for XGM data.") - return frame_indices - - -def get_xgm_formatted(run, pulse_bins): +def get_xgm_formatted(run, xgm_name, xgm_coord_stride=1): """ Load the xgm data and define coordinates along the pulse dimension. - + Parameters ---------- run: extra_data.DataCollection DataCollection object providing access to the xgm data to be loaded - pulse_bins: list, numpy.ndarray - bins along the pulse dimension not containing darks - + xgm_name: str + valid mnemonic of a xgm source + xgm_coord_stride: int + defines which dssc frames should be normalized using data from the xgm. + Returns ------- xgm: xarray.DataArray xgm data with coordinate 'pulse'. """ - xgm = load_xgm(run) - if pulse_bins is None: - xgm_frame_coords = np.linspace(0, xgm.shape[1]-1, xgm.shape[1], - dtype=np.uint64) - else: - xgm_frame_coords = calc_xgm_frame_indices(pulse_bins) + log.debug('load raw xgm data') + xgm = run.get_array(*_mnemonics[xgm_name].values()) + log.debug('format xgm coordinates') + xgm_frame_coords = np.linspace(0, + (xgm.shape[1]-1)*xgm_coord_stride, + xgm.shape[1], + dtype=np.uint64) + xgm = xgm.rename(new_name_or_name_dict={'XGMbunchId':'pulse'}) xgm['pulse'] = xgm_frame_coords + log.info('loaded formatted xgm data.') return xgm -def load_geom(geopath=None, quad_pos=None): - """ - Loads and return the DSSC geometry. - - Parameters - ---------- - geopath: str - path to the h5 geometry file. If None uses a default file. - quad_pos: list of quadrants tuple position. If None uses a default - position. - - Returns - ------- - geom: extra_geom.DSSC_1MGeometry - loaded geometry object - """ - if quad_pos is None: - quad_pos = [(-124.100, 3.112), # TR - (-133.068, -110.604), # BR - (0.988, -125.236), # BL - (4.528, -4.912) # TL - ] - - if geopath is None: - geopath = '/gpfs/exfel/sw/software/git/EXtra-geom/' \ - 'docs/dssc_geo_june19.h5' - - geom = DSSC_1MGeometry.from_h5_file_and_quad_positions(geopath, quad_pos) - return geom - - def quickmask_DSSC_ASIC(geom, poslist): ''' Returns a mask for the given DSSC geometry with ASICs given in poslist diff --git a/src/toolbox_scs/detectors/dssc_processing.py b/src/toolbox_scs/detectors/dssc_processing.py index 7ce2927435635f72889980ee057a870e7ebbbc84..256e13f4a7d64c439accf6c19dd8fd64e17d28a2 100644 --- a/src/toolbox_scs/detectors/dssc_processing.py +++ b/src/toolbox_scs/detectors/dssc_processing.py @@ -4,6 +4,7 @@ comment: contributions should comply with pep8 code structure guidelines. """ import logging +import os from tqdm import tqdm import numpy as np @@ -12,6 +13,9 @@ import pandas as pd import extra_data as ed +from .dssc_misc import get_xgm_formatted +from ..constants import mnemonics as _mnemonics +from .dssc_data import save_xarray log = logging.getLogger(__name__) @@ -25,7 +29,7 @@ def create_empty_dataset(run_info, binners = {}): Parameters ---------- - + Returns ------- @@ -100,12 +104,46 @@ def load_chunk_data(sel, sourcename, maxframes=None): ).unstack('trainId_pulse') data = data.transpose('trainId', 'pulse', 'x', 'y') - log.debug(f"loaded chunk data") + log.debug(f"loaded and formatted chunk of dssc data") return data.loc[{'pulse': np.s_[:maxframes]}] -def bin_data(proposal, run_nr, module, chunksize, info, dssc_binners, - pulsemask=None): +def _load_chunk_xgm(sel, xgm_mnemonic, stride): + """ + Load selected xgm data. + + Copyright (c) 2020, SCS-team + + Parameters + ---------- + sel: extra_data.DataCollection + a DataCollection or a subset of a DataCollection obtained by its + select_trains() method + xgm_mnemonic: str + a valid mnemonic for an xgm source from the tb.mnemonic dictionary + stride: int + indicating which dssc frames should be normalized using the xgm data. + + Returns + ------- + xarray.DataArray + """ + d = sel.get_array(*_mnemonics[xgm_mnemonic].values()) + d = d.rename(new_name_or_name_dict={'XGMbunchId':'pulse'}) + frame_coords = np.linspace(0,(d.shape[1]-1)*stride, d.shape[1], dtype=int) + d = d.assign_coords({'pulse':frame_coords}) + log.debug(f"loaded and formatted chunk of xgm data") + return d + + +def process_dssc_data(proposal, run_nr, module, chunksize, info, dssc_binners, + path='./', + pulsemask=None, + dark_image=None, + xgm_normalization=False, + xgm_mnemonic='SCS_SA3', + normevery=1 + ): """ Collects and reduces DSSC data for a single module. @@ -124,70 +162,115 @@ def bin_data(proposal, run_nr, module, chunksize, info, dssc_binners, info: dictionary dictionary containing keys 'dims', 'frames_per_train', 'total_frames', 'trainIds' + dssc_binners: dictionary + a dictionary containing binner objects created by the tbdet member + function "create_binner()" pulsemask : numpy.ndarray array of booleans to be used to mask dssc data according to xgm data. + dark_image: xarray.DataArray + an xarray dataarray with matching coordinates with the loaded data. If + dark_image is not None it will be substracted from each individual dssc + frame. + xgm_normalization: bool + true if the data should be divided by the corresponding xgm value. + xgm_mnemonic: str + Mnemonic of the xgm data to be used for normalization. + normevery: int + One out of normevery dssc frames will be normalized. Returns ------- module_data: xarray.Dataset xarray datastructure containing data binned according to bins. """ - - sourcename = f'SCS_DET_DSSC1M-1/DET/{module}CH0:xtdf' - collection = ed.open_run(proposal, run_nr, - include=f'*DSSC{module:02d}*') - ntrains = len(collection.train_ids) + # metadata definition + ntrains = info['number_of_trains'] + fpt = info['frames_per_train'] chunks = np.arange(ntrains, step=chunksize) - log.info(f"Processing dssc module {module}: start") + sourcename_dssc = f'SCS_DET_DSSC1M-1/DET/{module}CH0:xtdf' - # progress bar - if module == 15: - pbar = tqdm(total=len(chunks)) + # open extra_data run objects + collection_DSSC = ed.open_run(proposal, run_nr, + include=f'*DSSC{module:02d}*') + collection_DA1 = ed.open_run(proposal, run_nr, include='*DA01*') + log.info(f"Processing dssc module {module}: start") - # create empty dataset to be filled iteratively + # create empty dataset for dssc data to be filled iteratively module_data = create_empty_dataset(info, dssc_binners) - # load data chunk by chunk and merge + # load data chunk by chunk and merge result into prepared empty dataset for chunk in chunks: - sel = collection.select_trains( - ed.by_index[chunk:chunk + chunksize]) - nframes = sel.detector_info(sourcename)['total_frames'] - if nframes > 0: - log.debug(f"Module {module}: " - f"load trains {chunk}:{chunk + chunksize}") - - chunk_data = load_chunk_data(sel, sourcename) - chunk_hist = xr.full_like(chunk_data[:,:,0,0], fill_value=1) - - # prefiltering -> xgm pulse masking, and related creation of - # histogram subset - if pulsemask is not None: - chunk_data = chunk_data.where(pulsemask) - chunk_hist = chunk_hist.where(pulsemask) - chunk_data = chunk_data.to_dataset(name='data') - chunk_data['hist'] = chunk_hist - - # data reduction -> apply binners - log.debug(f'Module {module}: apply binning to chunk_data') - for b in dssc_binners: - chunk_data[b+"_binned"] = dssc_binners[b] - chunk_data = chunk_data.groupby(b+"_binned").sum(b) - chunk_data = chunk_data.rename(name_dict={b+"_binned":b}) - - # add chunk data to loaded data - for var in ['data', 'hist']: - module_data[var] = xr.concat([module_data[var], - chunk_data[var]], - dim='tmp').sum('tmp') - - log.debug(f"Module {module}: " - f"processed trains {chunk}:{chunk + chunksize}") - - if module == 15: - pbar.update(1) - + log.debug(f"Module {module}: " + f"load trains {chunk}:{chunk + chunksize}") + + sel = collection_DSSC.select_trains( + ed.by_index[chunk:chunk + chunksize]) + chunk_data = load_chunk_data(sel, sourcename_dssc) + chunk_hist = xr.full_like(chunk_data[:,:,0,0], fill_value=1) + + # --------------------------------------------------------------------- + # optional blocks -> ToDo: see merge request !89 + # --------------------------------------------------------------------- + # option 1: prefiltering -> xgm pulse masking + if pulsemask is not None: + log.debug(f'Module {module}: drop out-of-bounds frames') + # relatively slow. ToDo -> wrap using np.ufunc + chunk_data = chunk_data.where(pulsemask) + chunk_hist = chunk_hist.where(pulsemask) + + # option 2: substraction of dark image/s + if dark_image is not None: + log.debug(f'Module {module}: substract dark') + chunk_data.values = chunk_data.values - dark_image.values + # slower: using xarray directly + #chunk_data = chunk_data - dark_image + + # option 3: normalize dssc frames by their xgm values + if xgm_normalization: + log.debug(f'Module {module}: load and format xgm data') + sel_DA1 = collection_DA1.select_trains( + ed.by_index[chunk:chunk + chunksize]) + chunk_xgm = _load_chunk_xgm(sel_DA1, xgm_mnemonic, normevery) + + log.debug(f'Module {module}: normalize chunk_data using xgm') + # the following line uses numpys fast vectorized array operation + # there is overhead coming from rewriting the xarrayDataarray + chunk_data.values[:,0::normevery,:,:] = \ + np.divide(chunk_data[:,0::normevery,:,:], chunk_xgm) + # slow code using xarray directly + #chunk_data = chunk_data / chunk_xgm + # --------------------------------------------------------------------- + # end optional blocks: xarray operations from here on. + # --------------------------------------------------------------------- + + # data reduction -> apply binners + log.debug(f'Module {module}: apply binning to chunk_data') + chunk_data = chunk_data.to_dataset(name='data') + chunk_data['hist'] = chunk_hist + for b in dssc_binners: + chunk_data[b+"_binned"] = dssc_binners[b] + chunk_data = chunk_data.groupby(b+"_binned").sum(b) + chunk_data = chunk_data.rename(name_dict={b+"_binned":b}) + + # add chunk data to loaded data + log.debug(f'Module {module}: merge junk data') + for var in ['data', 'hist']: + module_data[var] = xr.concat([module_data[var], + chunk_data[var]], + dim='tmp').sum('tmp') + + log.debug(f"Module {module}: " + f"processed trains {chunk}:{chunk + chunksize}") + + log.debug(f'Module {module}: calculate mean') module_data['data'] = module_data['data'] / module_data['hist'] module_data = module_data.transpose('trainId', 'pulse', 'x', 'y') + module_data.attrs['module'] = module + + log.info(f'saving module {module}') + if not os.path.isdir(path): + os.mkdir(path) + fname = f'run_{run_nr}_module{module}.h5' + save_xarray(path+fname, module_data, mode='a') log.info(f"processing module {module}: done") - return module_data diff --git a/src/toolbox_scs/detectors/xgm.py b/src/toolbox_scs/detectors/xgm.py index 7d71e0fcb10c2303d54447d1457cc0e858d6049d..20e65f529f082e223112157569ac24a267ac0781 100644 --- a/src/toolbox_scs/detectors/xgm.py +++ b/src/toolbox_scs/detectors/xgm.py @@ -14,11 +14,13 @@ import matplotlib.pyplot as plt from scipy.signal import find_peaks import extra_data as ed +from ..constants import mnemonics as _mnemonics + log = logging.getLogger(__name__) -def load_xgm(run): +def load_xgm(run, xgm_mnemonic='SCS_SA3'): """ Loads XGM data from karabo_data.DataCollection @@ -38,25 +40,18 @@ def load_xgm(run): >>> run = tb.run_by_proposal(2212, 235) >>> xgm_data = tbdet.load_xgm(run) """ - '''''' - nbunches = run.get_array('SCS_RR_UTC/MDL/BUNCH_DECODER', - 'sase3.nPulses.value') + nbunches = run.get_array(*_mnemonics['sase3'].values()) nbunches = np.unique(nbunches) if len(nbunches) == 1: nbunches = nbunches[0] else: - log.warning(f'not all trains have same length DSSC data ' - f'nbunches: {nbunches}') + log.info('change of pulse pattern in sase3 during the run.') nbunches = max(nbunches) - - log.info(f'SASE3 bunches per train: {nbunches}') - xgm = run.get_array('SCS_BLU_XGM/XGM/DOOCS:output', - 'data.intensitySa3TD', - roi=ed.by_index[:nbunches], - extra_dims=['pulse']) - return xgm - + log.info(f'maximum number of bunches per train in sase3: {nbunches}') + xgm = run.get_array(*_mnemonics[xgm_mnemonic].values(), + roi=ed.by_index[:nbunches]) + return xgm def cleanXGMdata(data, npulses=None, sase3First=True): diff --git a/src/toolbox_scs/test/test_detectors_common.py b/src/toolbox_scs/test/test_detectors_common.py deleted file mode 100644 index 7e54d491670ccda2879c97017cebda0e71734afd..0000000000000000000000000000000000000000 --- a/src/toolbox_scs/test/test_detectors_common.py +++ /dev/null @@ -1,121 +0,0 @@ -import unittest -import logging -import os -import sys -import argparse - - -import toolbox_scs as tb -import toolbox_scs.detectors as tbdet -from toolbox_scs.util.exceptions import * - -logging.basicConfig(level=logging.DEBUG) -log_root = logging.getLogger(__name__) - -suites = {"packaging": ( - "test_init",), - "xgm": ( - "test_loadxgm", - "test_cleanxgm", - "test_matchxgmtim",), - "tim": ( - "test_loadtim",) - } - - -def list_suites(): - print("""\nPossible test suites:\n-------------------------""") - for key in suites: - print(key) - print("-------------------------\n") - - -class TestDetectors(unittest.TestCase): - @classmethod - def setUpClass(cls): - log_root.info("Start global setup.") - cls._run = tb.load_run(2212, 235) - - fields = ["sase1", "sase3", "npulses_sase3", - "npulses_sase1", "MCP2apd", "SCS_SA3", "nrj"] - cls._tb_data = tb.load(fields, 235, 2212) - - log_root.info("Finished global setup, start tests.") - - @classmethod - def tearDownClass(cls): - pass - - def setUp(self): - pass - - def tearDown(self): - pass - - def test_init(self): - self.assertEqual(tbdet.__name__, "toolbox_scs.detectors") - - def test_loadxgm(self): - cls = self.__class__ - xgm_data = tbdet.load_xgm(cls._run) - self.assertTrue(xgm_data.values[0][-1]) - - def test_cleanxgm(self): - cls = self.__class__ - data = tbdet.cleanXGMdata(cls._tb_data) - self.assertEqual(data['sa3_pId'].values[-1], 19) - - def test_matchxgmtim(self): - cls = self.__class__ - data = tbdet.matchXgmTimPulseId(cls._tb_data) - self.assertEqual(data['npulses_sase3'].values[0], 20) - - def test_loadtim(self): - cls = self.__class__ - data = tbdet.load_TIM(cls._run) - self.assertEqual(data.name, 'MCP2apd') - - - -def suite(*tests): - suite = unittest.TestSuite() - for test in tests: - suite.addTest(TestDetectors(test)) - return suite - - -def main(*cliargs): - try: - for test_suite in cliargs: - if test_suite in suites: - runner = unittest.TextTestRunner(verbosity=2) - runner.run(suite(*suites[test_suite])) - else: - log_root.warning( - "Unknown suite: '{}'".format(test_suite)) - pass - except Exception as err: - log_root.error("Unecpected error: {}".format(err), - exc_info=True) - pass - - - - - - -if __name__ == '__main__': - parser = argparse.ArgumentParser() - parser.add_argument('--list-suites', - action='store_true', - help='list possible test suites') - parser.add_argument('--run-suites', metavar='S', - nargs='+', action='store', - help='a list of valid test suites') - args = parser.parse_args() - - if args.list_suites: - list_suites() - - if args.run_suites: - main(*args.run_suites) diff --git a/src/toolbox_scs/test/test_dssc_cls.py b/src/toolbox_scs/test/test_dssc_cls.py index 746529fa7b6a19dafbe692f09dde0d175386e192..c43b45f5218c2e05abb25d5251ec2257a626939c 100644 --- a/src/toolbox_scs/test/test_dssc_cls.py +++ b/src/toolbox_scs/test/test_dssc_cls.py @@ -7,6 +7,7 @@ from time import strftime import numpy as np import xarray as xr +import extra_data as ed import toolbox_scs as tb import toolbox_scs.detectors as tbdet @@ -18,11 +19,8 @@ log_root = logging.getLogger(__name__) suites = {"no-processing": ( "test_create", ), - "processing-quick": ( - "test_binning_quick", - ), - "processing-full": ( - "test_binning_xgm", + "processing": ( + "test_normalization_all2", ) } @@ -75,20 +73,18 @@ class TestDSSC(unittest.TestCase): np.linspace(0,19,20, dtype=int), bins_pulse) binners = {'trainId': binner1, 'pulse': binner2} - params = {'binners': binners, - 'use_xgm': True, - 'xgm_threshold' : (0, np.inf), - 'xgm_bins': bins_pulse} + params = {'binners': binners} # normal run235 = tbdet.DSSCBinner(proposal_nb, run_nb) del(run235) - run235 = tbdet.DSSCBinner(2212, 235, use_xgm=True) - run235 = tbdet.DSSCBinner(2212, 235, - use_xgm=True, - xgm_bins=bins_pulse) - run235 = tbdet.DSSCBinner(proposal_nb, run_nb, **params) - print(run235.xgm) + + run235 = tbdet.DSSCBinner(2212, 235) + run235.add_binner('trainId', binner1) + run235.add_binner('pulse', binner2) + xgm_threshold=(300.0, 8000.0) + run235.create_xgm_mask(xgm_threshold, 1) + self.assertEqual(run235.binners['trainId'].values[0], np.float32(7585.52)) @@ -100,68 +96,58 @@ class TestDSSC(unittest.TestCase): self.assertEqual(str(cm.exception), err_msg) - def test_binning_quick(self): + def test_normalization_all2(self): + proposal_nb = 2530 + # dark - proposal_nb = 2212 - run_nb = 232 - run = tb.load_run(proposal_nb, run_nb, include='*DA*') + run_nb = 49 run_info = tbdet.load_dssc_info(proposal_nb, run_nb) - bins_trainId = tb.get_array(run, - 'PP800_PhaseShifter', - 0.03) - bins_pulse = ['pumped', 'unpumped'] * 10 + fpt = run_info['frames_per_train'] + n_trains = run_info['number_of_trains'] + trainIds = run_info['trainIds'] + + buckets_train = np.zeros(n_trains) + buckets_pulse = ['image', 'dark'] * 99 + ['image_last'] binner1 = tbdet.create_dssc_bins("trainId", - run_info['trainIds'], - bins_trainId.values) + trainIds, + buckets_train) binner2 = tbdet.create_dssc_bins("pulse", - np.linspace(0,19,20, dtype=int), - bins_pulse) + np.linspace(0,fpt-1,fpt, dtype=int), + buckets_pulse) binners = {'trainId': binner1, 'pulse': binner2} - run232 = tbdet.DSSCBinner(proposal_nb, run_nb, binners=binners) - mod_list = [2,15] - data = run232.bin_data(modules=mod_list) - self.assertIsNotNone(data.data) - data = run232.bin_data(backend='multiprocessing', modules=mod_list) - self.assertIsNotNone(data.data) - - tbdet.save_xarray(cls._module_data, './tmp/run235.h5') - data = tbdet.load_xarray('./tmp/run232.h5') - self.assertIsNotNone(data.data) - + bin_obj = tbdet.DSSCBinner(proposal_nb, run_nb, binners=binners) + dark = bin_obj.process_data(modules=[15], chunksize=248) - def test_binning_xgm(self): - proposal_nb = 2212 - run_nb = 235 - - run = tb.load_run(proposal_nb, run_nb, include='*DA*') + # run to normalize + run_nb = 50 run_info = tbdet.load_dssc_info(proposal_nb, run_nb) + fpt = run_info['frames_per_train'] + n_trains = run_info['number_of_trains'] + trainIds = run_info['trainIds'] - bins_trainId = tb.get_array(run, - 'PP800_PhaseShifter', - 0.03) - bins_pulse = ['pumped', 'unpumped'] * 10 + buckets_train = np.zeros(n_trains) + buckets_pulse = ['image', 'dark'] * 99 + ['image_last'] binner1 = tbdet.create_dssc_bins("trainId", - run_info['trainIds'], - bins_trainId.values) + trainIds, + buckets_train) binner2 = tbdet.create_dssc_bins("pulse", - np.linspace(0,19,20, dtype=int), - bins_pulse) + np.linspace(0,fpt-1,fpt, dtype=int), + buckets_pulse) binners = {'trainId': binner1, 'pulse': binner2} + bin_obj = tbdet.DSSCBinner(proposal_nb, run_nb, binners=binners) - params = {'binners':binners, - 'use_xgm':True, - 'xgm_threshold':(0, np.inf), - 'xgm_bins':bins_pulse - } + bin_params = {'modules':[15], + 'chunksize':248, + 'xgm_normalization':True, + 'normevery':2, + 'dark_image':dark['data'][:,0,0,:,:] + } - run235 = tbdet.DSSCBinner(proposal_nb, run_nb, **params) - data = run235.bin_data(backend='multiprocessing', modules=[3]) + data = bin_obj.process_data(**bin_params) self.assertIsNotNone(data.data) - xgm_binned = run235.get_xgm_binned() - self.assertIsNotNone(xgm_binned) def list_suites(): diff --git a/src/toolbox_scs/test/test_dssc_methods.py b/src/toolbox_scs/test/test_dssc_methods.py deleted file mode 100644 index d5c804a27799556f509cd07b3d901ffeb9c2392f..0000000000000000000000000000000000000000 --- a/src/toolbox_scs/test/test_dssc_methods.py +++ /dev/null @@ -1,370 +0,0 @@ -import unittest -import logging -import os -import argparse -import shutil -import joblib -from time import strftime - -import numpy as np -import xarray as xr - -import extra_data as ed -import toolbox_scs as tb -import toolbox_scs.detectors as tbdet -from toolbox_scs.detectors.dssc_processing import (load_chunk_data, - create_empty_dataset) -from toolbox_scs.util.exceptions import ToolBoxFileError - -logging.basicConfig(level=logging.DEBUG) -log_root = logging.getLogger(__name__) - - -suites = {"metafunctions": ( - "test_info", - "test_binners", - "test_calcindices", - "test_createpulsemask", - "test_createempty", - "test_loadmergechunk", - "test_storage", - ), - "binning-pulse-train": ( - "test_info", - "test_binners", - "test_calcindices", - "test_createpulsemask", - "test_processmodule", - ), - "binning-train": ( - "test_info", - "test_binners", - "test_calcindices", - "test_createpulsemask", - "test_processmodule2", - ) - } - - -_temp_dirs = ['tmp'] - - -def setup_tmp_dir(): - for d in _temp_dirs: - if not os.path.isdir(d): - os.mkdir(d) - - -def cleanup_tmp_dir(): - for d in _temp_dirs: - shutil.rmtree(d, ignore_errors=True) - log_root.info(f'remove {d}') - - -class TestDSSC(unittest.TestCase): - @classmethod - def setUpClass(cls): - log_root.info("Start global setup") - # --------------------------------------------------------------------- - # global test settings - # --------------------------------------------------------------------- - - setup_tmp_dir() - - # --------------------------------------------------------------------- - log_root.info("Finished global setup, start tests") - - - @classmethod - def tearDownClass(cls): - log_root.info("Clean up test environment....") - cleanup_tmp_dir() - - - def test_info(self): - proposal = 2212 - run_nr = 235 - info = tbdet.load_dssc_info(proposal, run_nr) - self.assertEqual(info['frames_per_train'], 20) - - - def test_binners(self): - cls = self.__class__ - proposal = 2212 - run_nr = 235 - cls._dssc_info_235 = tbdet.load_dssc_info(proposal, run_nr) - cls._run_235 = tb.load_run(proposal, run_nr, include='*DA*') - - # create 3 different binners manually - bins_trainId_name = 'PP800_PhaseShifter' - stepsize = .04 - bins_trainId = tb.get_array(cls._run_235, - 'PP800_PhaseShifter', - stepsize) - bins_pulse = ['pumped', 'unpumped'] * 10 - - - bin1 = tbdet.create_dssc_bins("trainId", - cls._dssc_info_235['trainIds'], - bins_trainId.values) - bin2 = tbdet.create_dssc_bins("pulse", - np.linspace(0,19,20, dtype=int), - bins_pulse) - bin3 = tbdet.create_dssc_bins("x", - np.linspace(1,128,128, dtype=int), - np.linspace(1,128,128, dtype=int)) - # create binner with invalid name - with self.assertRaises(ValueError) as err: - bin2 = tbdet.create_dssc_bins("blabla", - np.linspace(0,19,20, dtype=int), - bins_pulse) - - # format binner dictionary. The 4th binner is constructed automatically - binners = {"trainId": bin1, - "pulse": bin2, - "x": bin3} - self.assertIsNotNone(binners) - cls._binners_235 = {'trainId':bin1,'pulse':bin2} - - - def test_calcindices(self): - cls = self.__class__ - # first test including darks - bins_pulse = ['pumped', 'unpumped', 'pumped_dark', 'unpumped_dark'] * 5 - xgm_frame_coords = tbdet.calc_xgm_frame_indices(bins_pulse) - self.assertIsNotNone(xgm_frame_coords) - - # another test without darks - cls._xgm = tbdet.load_xgm(cls._run_235) - bins_pulse = ['pumped', 'unpumped'] * 10 - xgm_frame_coords = tbdet.calc_xgm_frame_indices(bins_pulse) - self.assertIsNotNone(xgm_frame_coords) - cls._xgm['pulse'] = xgm_frame_coords - - - def test_createpulsemask(self): - cls = self.__class__ - xgm_min = 0 - xgm_max = np.inf - - data = np.ones([len(cls._run_235.train_ids), - cls._dssc_info_235['frames_per_train']], dtype=bool) - dimensions = ['trainId', 'pulse'] - coordinates = {'trainId': cls._run_235.train_ids, - 'pulse': range(cls._dssc_info_235['frames_per_train'])} - pulsemask = xr.DataArray(data, dims=dimensions, coords=coordinates) - - valid = (cls._xgm > xgm_min) * (cls._xgm < xgm_max) - cls._pulsemask = valid.combine_first(pulsemask).astype(bool) - - self.assertIsNotNone(cls._pulsemask) - - - def test_createempty(self): - cls = self.__class__ - - # standard dset - empty_data = create_empty_dataset(cls._dssc_info_235, cls._binners_235) - self.assertIsNotNone(empty_data.dims['trainId']) - - # bin along pulse dimension only - bins_pulse = ['pumped', 'unpumped', 'pumped_dark', 'unpumped_dark'] * 5 - binner_pulse = tbdet.create_dssc_bins("pulse", - np.linspace(0,19,20, dtype=int), - bins_pulse) - empty_data = create_empty_dataset(cls._dssc_info_235, - {'pulse':binner_pulse}) - self.assertEqual(empty_data.pulse.values[1], 'pumped_dark') - - - def test_loadmergechunk(self): - cls = self.__class__ - # load single chunk - proposal = 2212 - run_nr = 235 - module = 1 - chunksize = 512 - sourcename = f'SCS_DET_DSSC1M-1/DET/{module}CH0:xtdf' - - collection = ed.open_run(proposal, run_nr, - include=f'*DSSC{module:02d}*') - - binners = cls._binners_235 - info = cls._dssc_info_235 - pulsemask = cls._pulsemask - - ntrains = len(collection.train_ids) - chunks = np.arange(ntrains, step=chunksize) - start_index = chunks[0] - - module_data = create_empty_dataset(info, binners) - - for chunk in chunks[0:2]: - sel = collection.select_trains( - ed.by_index[chunk:chunk + chunksize]) - log_root.debug(f"Module {module}: " - f"loading trains {chunk}:{chunk + chunksize}") - chunk_data = load_chunk_data(sel, sourcename) - self.assertIsNotNone(chunk_data) - - # pulse masking, and creation of related hist subset. - chunk_hist = xr.full_like(chunk_data[:,:,0,0], fill_value=1) - if pulsemask is not None: - chunk_data = chunk_data.where(pulsemask) - chunk_hist = chunk_hist.where(pulsemask) - chunk_data = chunk_data.to_dataset(name='data') - chunk_data['hist'] = chunk_hist - - # apply predefined binning - log_root.debug(f'Module {module}: apply binning to chunk_data') - for b in binners: - chunk_data[b+"_binned"] = binners[b] - chunk_data = chunk_data.groupby(b+"_binned").sum(b) - chunk_data = chunk_data.rename(name_dict={b+"_binned":b}) - # ToDo: Avoid creation of unnecessary data when binning along x,y - - # merge dsets and format - log_root.debug(f'Module {module}: merge data into prepared dset') - for var in ['data', 'hist']: - module_data[var] = xr.concat([module_data[var], - chunk_data[var]], - dim='tmp').sum('tmp') - - #module_data = module_data.transpose('trainId', 'pulse', 'x', 'y') - cls._module_data = module_data - self.assertIsNotNone(module_data) - - def test_processmodule(self): - cls = self.__class__ - - chunksize = 512 - backend = 'multiprocessing' - mod_list = [1, 15] - n_jobs = len(mod_list) - log_root.info(f'processing {chunksize} trains per chunk') - log_root.info(f'using parallelization backend {backend}') - - module_jobs = [] - for m in mod_list: - module_jobs.append(dict( - proposal=2212, - run_nr=235, - module=m, - chunksize=chunksize, - info=cls._dssc_info_235, - dssc_binners=cls._binners_235, - pulsemask=cls._pulsemask - )) - - print('start processing modules:', strftime('%X')) - - data = joblib.Parallel(n_jobs=n_jobs, backend=backend) \ - (joblib.delayed(tbdet.bin_data)(**module_jobs[i]) \ - for i in range(len(mod_list))) - - print('finished processing modules:', strftime('%X')) - data = xr.concat(data, dim='module') - print(data) - self.assertIsNotNone(data) - - - def test_processmodule2(self): - cls = self.__class__ - - chunksize = 512 - backend = 'multiprocessing' - mod_list = [15, 3] - n_jobs = len(mod_list) - log_root.info(f'processing {chunksize} trains per chunk') - log_root.info(f'using parallelization backend {backend}') - - info = tbdet.load_dssc_info(2212, 89) - bin1 = tbdet.create_dssc_bins("trainId", - info['trainIds'], - np.ones(1691)) - binners = {'trainId': bin1} - - module_jobs = [] - for m in mod_list: - module_jobs.append(dict( - proposal=2212, - run_nr=89, - module=m, - chunksize=chunksize, - info=info, - dssc_binners=binners, - )) - - print('start processing modules:', strftime('%X')) - data = joblib.Parallel(n_jobs=n_jobs, backend=backend) \ - (joblib.delayed(tbdet.bin_data)(**module_jobs[i]) \ - for i in range(len(mod_list))) - print('finished processing modules:', strftime('%X')) - - data = xr.concat(data, dim='module') - data = data.squeeze() - print(data) - self.assertIsNotNone(data) - - - def test_storage(self): - cls = self.__class__ - - tbdet.save_xarray('./tmp/run235.h5', cls._module_data) - tbdet.save_xarray('./tmp/run235.h5', cls._module_data, - mode = 'w') - run235 = tbdet.load_xarray('./tmp/run235.h5') - - self.assertIsNotNone(run235) - #run235.close() - - with self.assertRaises(ToolBoxFileError) as cm: - tbdet.save_xarray('./tmp/run235.h5', - cls._module_data.isel(pulse=0), - mode = 'a') - -def list_suites(): - print("\nPossible test suites:\n" + "-" * 79) - for key in suites: - print(key) - print("-" * 79 + "\n") - - -def suite(*tests): - suite = unittest.TestSuite() - for test in tests: - suite.addTest(TestDSSC(test)) - return suite - - -def main(*cliargs): - try: - for test_suite in cliargs: - if test_suite in suites: - runner = unittest.TextTestRunner(verbosity=2) - runner.run(suite(*suites[test_suite])) - else: - log_root.warning( - "Unknown suite: '{}'".format(test_suite)) - pass - except Exception as err: - log_root.error("Unecpected error: {}".format(err), - exc_info=True) - pass - - -if __name__ == '__main__': - parser = argparse.ArgumentParser() - parser.add_argument('--list-suites', - action='store_true', - help='list possible test suites') - parser.add_argument('--run-suites', metavar='S', - nargs='+', action='store', - help='a list of valid test suites') - args = parser.parse_args() - - if args.list_suites: - list_suites() - - if args.run_suites: - main(*args.run_suites)