Compare revisions
Commits on Source (19)
Showing with 307 additions and 750 deletions
1.0.3
1.1a3
......@@ -3,12 +3,12 @@ from .xgm import (
from .tim import (
load_TIM,)
from .dssc_data import (
save_xarray, load_xarray, get_data_formatted)
save_xarray, load_xarray, get_data_formatted, save_attributes_h5)
from .dssc_misc import (
load_dssc_info, create_dssc_bins, load_geom, quickmask_DSSC_ASIC,
calc_xgm_frame_indices, get_xgm_formatted, load_mask)
load_dssc_info, create_dssc_bins, quickmask_DSSC_ASIC,
get_xgm_formatted, load_mask)
from .dssc_processing import (
bin_data)
process_dssc_data)
from .dssc import (
DSSCBinner, DSSCFormatter, DSSCAnalyzer)
from .azimuthal_integrator import AzimuthalIntegrator
......@@ -23,11 +23,11 @@ __all__ = (
"create_dssc_bins",
"calc_xgm_frame_indices",
"get_xgm_formatted",
"bin_data",
"process_dssc_data",
"save_xarray",
"load_xarray",
"get_data_formatted",
"load_geom",
"save_attributes_h5",
"quickmask_DSSC_ASIC",
"load_mask",
# Classes
......
......@@ -10,26 +10,30 @@ class AzimuthalIntegrator(object):
Create a reusable integrator for repeated azimuthal integration of
similar images. Calculates array indices for a given parameter set that
allows fast recalculation.
Copyright (c) 2019, Michael Schneider
Copyright (c) 2020, SCS-team
license: BSD 3-Clause License (see LICENSE_BSD for more info)
Parameters
==========
imageshape : tuple of ints
The shape of the images to be integrated over.
center : tuple of ints
center coordinates in pixels
polar_range : tuple of ints
start and stop polar angle (in degrees) to restrict integration to
wedges
dr : int, default 2
radial width of the integration slices. Takes non-square DSSC
pixels into account.
aspect: float, default 204/236 for DSSC
aspect ratio of the pixel pitch
Returns
=======
ai : azimuthal_integrator instance
......@@ -75,7 +79,7 @@ class AzimuthalIntegrator(object):
(dist_array < dist)
self.flat_indices.append(self.index_array[ring_mask])
self.distance = np.append(self.distance, dist)
def __call__(self, image):
assert self.shape == image.shape, 'image shape does not match'
image_flat = image.flatten()
......
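A minimal usage sketch of the integrator described in the docstring above. The parameter names follow that docstring; the image shape, center, and the import path are assumptions for illustration and not part of the diff.

import numpy as np
from toolbox_scs.detectors import AzimuthalIntegrator  # assumed import path

# a single DSSC module image; shape and center are illustrative
image = np.random.rand(128, 512)
ai = AzimuthalIntegrator(imageshape=image.shape,
                         center=(64, 256),
                         polar_range=(0, 180),
                         dr=2,
                         aspect=204/236)
profile = ai(image)  # azimuthally integrated intensity per radial bin (assumed)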
......@@ -25,9 +25,9 @@ from .dssc_data import (
save_xarray, load_xarray, save_attributes_h5,
search_files, get_data_formatted)
from .dssc_misc import (
load_dssc_info, get_xgm_formatted, load_geom)
load_dssc_info, get_xgm_formatted)
from .dssc_processing import (
bin_data, create_empty_dataset)
process_dssc_data, create_empty_dataset)
__all__ = ["DSSCBinner", "DSSCFormatter", "DSSCAnalyzer"]
log = logging.getLogger(__name__)
......@@ -36,8 +36,7 @@ log = logging.getLogger(__name__)
class DSSCBinner:
def __init__(self, proposal_nr, run_nr,
binners={},
use_xgm=False, xgm_threshold=(0, np.inf), xgm_bins=None,
use_tim=False
xgm_name='SCS_SA3',
):
"""
A dssc binner object. Loads and bins the dssc data according to the
......@@ -53,13 +52,10 @@ class DSSCBinner:
binners: dictionary
dictionary containing binners constructed using the
'create_dssc_bins' tbdet-method.
use_xgm: bool
in case run is not a dark the xgm data can be used to create a
pulsemask for additional data filtering.
xgm_threshold: tuple
the lower and upper boundary of xgm values rendering valid data.
use_tim: bool
-> to be implemented. Same role as 'use_xgm'
xgm_name: str
a valid mnemonic key of the XGM data used to mask the dssc
frames. Since the xgm is used in several methods, its name can be
set here globally.
Returns
-------
......@@ -80,30 +76,15 @@ class DSSCBinner:
self.runnr = run_nr
self.info = load_dssc_info(proposal_nr, run_nr)
self.run = load_run(proposal_nr, run_nr)
self.xgmthreshold = xgm_threshold
self.binners = binners
self.binners = {}
for b in binners:
self.add_binner(b, binners[b])
self.xgm_name = xgm_name
# ---------------------------------------------------------------------
# Prepare pulse mask for additional data reduction next to binning
# ---------------------------------------------------------------------
self.xgm = None
fpt = self.info['frames_per_train']
data = np.ones([len(self.run.train_ids), fpt], dtype=bool)
self.pulsemask = xr.DataArray(data,
dims=['trainId', 'pulse'],
coords={'trainId': self.run.train_ids,
'pulse': range(fpt)})
if use_xgm:
self.xgm = get_xgm_formatted(self.run, xgm_bins)
valid = (self.xgm > self.xgmthreshold[0]) * \
(self.xgm < self.xgmthreshold[1])
self.pulsemask = \
valid.combine_first(self.pulsemask).astype(bool)
nrejected = int(valid.size - valid.sum())
percent_rejected = 100 * nrejected / valid.size
log.info(f'rejecting {nrejected} out of {valid.size} pulses'
f'({percent_rejected:.1f}%) due to xgm threshold')
self.pulsemask = None
log.debug("Constructed DSSC object")
......@@ -135,6 +116,35 @@ class DSSCBinner:
log.info(msg+", no binner created")
raise ToolBoxValueError(msg, name)
def load_xgm(self, xgm_coord_stride=1):
"""
Load xgm data and construct a coordinate array matching the corresponding
dssc frame numbers.
"""
self.xgm = get_xgm_formatted(self.run, self.xgm_name, xgm_coord_stride)
def create_xgm_mask(self,
xgm_threshold=(0, np.inf), normevery=1):
"""
Creates a mask for dssc frames according to the measured xgm intensity.
"""
fpt = self.info['frames_per_train']
n_trains = self.info['number_of_trains']
trainIds = self.info['trainIds']
data = np.ones([n_trains, fpt], dtype=bool)
self.pulsemask = xr.DataArray(data,
dims=['trainId', 'pulse'],
coords={'trainId': trainIds,
'pulse': range(fpt)})
self.load_xgm(xgm_coord_stride=normevery)
valid = (self.xgm > xgm_threshold[0]) * \
(self.xgm < xgm_threshold[1])
self.pulsemask = \
(valid.combine_first(self.pulsemask).astype(bool))[:,0:fpt]
log.info(f'created pulse mask used during processing')
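Sketch of how the two methods above are used together; the threshold values are illustrative and mirror the updated tests.

bin_obj.load_xgm()                                     # makes bin_obj.xgm available
bin_obj.create_xgm_mask(xgm_threshold=(300.0, 8000.0), normevery=1)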
def get_info(self):
"""
Returns the expected shape of the binned dataset, in case binners have
......@@ -151,8 +161,6 @@ class DSSCBinner:
"""
Bin the xgm data according to the binners of the dssc data. The result
can eventually be merged into the final dataset by the DSSCFormatter.
The xgm data is used internally to create a mask to filter certain dssc
images.
Returns
-------
......@@ -166,15 +174,19 @@ class DSSCBinner:
xgm_data[b+"_binned"] = self.binners[b]
xgm_data = xgm_data.groupby(b+"_binned").mean(b)
xgm_data = xgm_data.rename(name_dict={b+"_binned": b})
return xgm_data
return xgm_data.transpose('trainId', 'pulse')
else:
log.warning("no xgm data has been loaded.")
log.warning("no xgm data. Use load_xgm() to load the xgm data.")
pass
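A short sketch of the intended call order for the xgm binning above, using the method name get_xgm_binned as it appears in the tests (binners are assumed to have been added already).

bin_obj.load_xgm()                     # required, otherwise the warning above is logged
xgm_binned = bin_obj.get_xgm_binned()  # binned xgm data with dims ('trainId', 'pulse')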
# -------------------------------------------------------------------------
# Data processing
# -------------------------------------------------------------------------
def bin_data(self, modules=[], chunksize=512, backend='loky', n_jobs=None):
def process_data(self, modules=[], filepath='./',
chunksize=512, backend='loky', n_jobs=None,
dark_image=None,
xgm_normalization=False, normevery=1
):
"""
Load and bin dssc data according to self.bins
......@@ -187,14 +199,20 @@ class DSSCBinner:
experimental and not appropriately implemented in the tbdet member
function 'bin_data'.
n_jobs: int
number of cpu's used per sub-process. Note that when using the
default backend there is no need to adjust this parameter with the
current implementation.
inversely proportional to the number of CPUs available per job:
tasks within one job can use at most n_CPU_tot/n_jobs CPUs.
Note that when using the default backend there is no need to adjust
this parameter with the current implementation.
modules: list of ints
a list containing the module numbers that should be processed. If
empty, all modules are processed.
chunksize: int
The number of trains that should be read in one iterative step.
dark_image: xarray.DataArray
DataArray with dimensions compatible with the loaded dssc data.
normevery: int
one out of every 'normevery' dssc frames will be normalized.
Returns
-------
......@@ -214,26 +232,32 @@ class DSSCBinner:
module_jobs = []
for m in mod_list:
dark = dark_image
if dark_image is not None:
dark = dark_image.sel(module=m)
module_jobs.append(dict(
proposal=self.proposal,
run_nr=self.runnr,
module=m,
chunksize=chunksize,
path=filepath,
info=self.info,
dssc_binners=self.binners,
pulsemask=self.pulsemask,
dark_image=dark,
xgm_normalization=xgm_normalization,
xgm_mnemonic=self.xgm_name,
normevery=normevery,
))
data = None
log.info(f'using parallelization backend {backend}')
data = joblib.Parallel(n_jobs=njobs, backend=backend) \
(joblib.delayed(bin_data)(**module_jobs[i]) \
joblib.Parallel(n_jobs=njobs, backend=backend) \
(joblib.delayed(process_dssc_data)(**module_jobs[i]) \
for i in range(len(mod_list)))
data = xr.concat(data, dim='module')
data = data.assign_coords(module=("module", np.array(mod_list)))
log.info(f'Binning done')
return data
class DSSCFormatter:
......
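A processing sketch combining the new options. The module list, chunk size, file path, and the dark-run binner object (dark_bin_obj) are placeholders mirroring the updated tests further below.

# bin a dark run first, then use it to correct and normalize a data run
dark = dark_bin_obj.process_data(modules=[15], chunksize=248, filepath='./tmp/')
data = bin_obj.process_data(modules=[15], chunksize=248, filepath='./tmp/',
                            dark_image=dark['data'][:, 0, 0, :, :],
                            xgm_normalization=True, normevery=2)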
......@@ -157,6 +157,11 @@ def get_data_formatted(filenames=[], data_list=[]):
elif any(data_list) is True:
data = data_list
mod_list = []
for d in data:
if 'module' in d.attrs.keys():
mod_list.append(d.attrs['module'])
if type(data[0]).__module__ == 'xarray.core.dataset':
data = xr.concat(data, dim='module')
elif type(data[0]).__module__ == 'pandas.core.frame':
......@@ -164,7 +169,10 @@ def get_data_formatted(filenames=[], data_list=[]):
elif type(data[0]).__module__ == 'dask.dataframe.core':
pass
if mod_list is not None:
data = data.assign_coords(module=("module", mod_list))
data = data.sortby("module")
data.attrs.clear()
return data.transpose('trainId', 'pulse', 'module', 'x', 'y')
......
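Sketch of re-assembling per-module files into one dataset. The file-name pattern follows what process_dssc_data writes further below; the paths themselves are placeholders, and the function is assumed to load and concatenate the listed files.

import toolbox_scs.detectors as tbdet

filenames = ['./tmp/run_50_module1.h5', './tmp/run_50_module15.h5']
data = tbdet.get_data_formatted(filenames=filenames)
# -> dataset with dims ('trainId', 'pulse', 'module', 'x', 'y'), sorted by module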
......@@ -11,8 +11,8 @@ from imageio import imread
import extra_data as ed
from extra_geom import DSSC_1MGeometry
from .xgm import load_xgm
#from .dssc_processing import split_frames as _split_frames
from ..constants import mnemonics as _mnemonics
log = logging.getLogger(__name__)
......@@ -35,11 +35,11 @@ def load_dssc_info(proposal, run_nr):
info : dictionary
{'dims': tuple, 'frames_per_train': int, 'total_frames': int}
"""
module = ed.open_run(proposal, run_nr, include='*DSSC00*')
info = module.detector_info('SCS_DET_DSSC1M-1/DET/0CH0:xtdf')
module = ed.open_run(proposal, run_nr, include='*DSSC01*')
info = module.detector_info('SCS_DET_DSSC1M-1/DET/1CH0:xtdf')
info["number_of_trains"] = len(module.train_ids)
info["trainIds"] = module.train_ids
log.debug("Fetched information for DSSC module nr. 0.")
log.debug("Fetched information for DSSC module nr. 1.")
return info
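The keys added here are consumed throughout the processing code; a small sketch (proposal/run numbers are placeholders):

import toolbox_scs.detectors as tbdet

info = tbdet.load_dssc_info(2212, 235)
fpt = info['frames_per_train']
n_trains = info['number_of_trains']
train_ids = info['trainIds']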
......@@ -95,89 +95,37 @@ def create_dssc_bins(name, coordinates, bins):
'trainId, x, or y')
def calc_xgm_frame_indices(pulse_bins):
"""
Returns a coordinate array for XGM data. The coordinates correspond to
DSSC frame numbers which are not darks.
Parameters
----------
pulse_bins: list
bins along which the pulse dimension will be binned
Returns
-------
frame_indices: numpy.ndarray
coordinate array corresponding to DSSC frame numbers which are not
darks.
"""
frame_indices = []
for i, p in enumerate(pulse_bins):
if 'dark' not in p:
frame_indices.append(i)
log.debug("Constructed coordinate array for XGM data.")
return frame_indices
def get_xgm_formatted(run, pulse_bins):
def get_xgm_formatted(run, xgm_name, xgm_coord_stride=1):
"""
Load the xgm data and define coordinates along the pulse dimension.
Parameters
----------
run: extra_data.DataCollection
DataCollection object providing access to the xgm data to be loaded
pulse_bins: list, numpy.ndarray
bins along the pulse dimension not containing darks
xgm_name: str
valid mnemonic of a xgm source
xgm_coord_stride: int
defines which dssc frames should be normalized using data from the xgm.
Returns
-------
xgm: xarray.DataArray
xgm data with coordinate 'pulse'.
"""
xgm = load_xgm(run)
if pulse_bins is None:
xgm_frame_coords = np.linspace(0, xgm.shape[1]-1, xgm.shape[1],
dtype=np.uint64)
else:
xgm_frame_coords = calc_xgm_frame_indices(pulse_bins)
log.debug('load raw xgm data')
xgm = run.get_array(*_mnemonics[xgm_name].values())
log.debug('format xgm coordinates')
xgm_frame_coords = np.linspace(0,
(xgm.shape[1]-1)*xgm_coord_stride,
xgm.shape[1],
dtype=np.uint64)
xgm = xgm.rename(new_name_or_name_dict={'XGMbunchId':'pulse'})
xgm['pulse'] = xgm_frame_coords
log.info('loaded formatted xgm data.')
return xgm
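Sketch illustrating the stride-based pulse coordinates introduced above; the run is loaded via tb.load_run as in the tests, and the proposal/run numbers are placeholders.

import toolbox_scs as tb
import toolbox_scs.detectors as tbdet

run = tb.load_run(2212, 235)
xgm = tbdet.get_xgm_formatted(run, 'SCS_SA3', xgm_coord_stride=2)
# with a stride of 2, the pulse coordinates become 0, 2, 4, ...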
def load_geom(geopath=None, quad_pos=None):
"""
Loads and return the DSSC geometry.
Parameters
----------
geopath: str
path to the h5 geometry file. If None uses a default file.
quad_pos: list of quadrants tuple position. If None uses a default
position.
Returns
-------
geom: extra_geom.DSSC_1MGeometry
loaded geometry object
"""
if quad_pos is None:
quad_pos = [(-124.100, 3.112), # TR
(-133.068, -110.604), # BR
(0.988, -125.236), # BL
(4.528, -4.912) # TL
]
if geopath is None:
geopath = '/gpfs/exfel/sw/software/git/EXtra-geom/' \
'docs/dssc_geo_june19.h5'
geom = DSSC_1MGeometry.from_h5_file_and_quad_positions(geopath, quad_pos)
return geom
def quickmask_DSSC_ASIC(geom, poslist):
'''
Returns a mask for the given DSSC geometry with ASICs given in poslist
......
......@@ -4,6 +4,7 @@
comment: contributions should comply with pep8 code structure guidelines.
"""
import logging
import os
from tqdm import tqdm
import numpy as np
......@@ -12,6 +13,9 @@ import pandas as pd
import extra_data as ed
from .dssc_misc import get_xgm_formatted
from ..constants import mnemonics as _mnemonics
from .dssc_data import save_xarray
log = logging.getLogger(__name__)
......@@ -25,7 +29,7 @@ def create_empty_dataset(run_info, binners = {}):
Parameters
----------
Returns
-------
......@@ -100,12 +104,46 @@ def load_chunk_data(sel, sourcename, maxframes=None):
).unstack('trainId_pulse')
data = data.transpose('trainId', 'pulse', 'x', 'y')
log.debug(f"loaded chunk data {sel.train_ids[0]}:{sel.train_ids[1]}")
log.debug(f"loaded and formatted chunk of dssc data")
return data.loc[{'pulse': np.s_[:maxframes]}]
def bin_data(proposal, run_nr, module, chunksize, info, dssc_binners,
pulsemask=None):
def _load_chunk_xgm(sel, xgm_mnemonic, stride):
"""
Load selected xgm data.
Copyright (c) 2020, SCS-team
Parameters
----------
sel: extra_data.DataCollection
a DataCollection or a subset of a DataCollection obtained by its
select_trains() method
xgm_mnemonic: str
a valid mnemonic for an xgm source from the tb.mnemonic dictionary
stride: int
indicating which dssc frames should be normalized using the xgm data.
Returns
-------
xarray.DataArray
"""
d = sel.get_array(*_mnemonics[xgm_mnemonic].values())
d = d.rename(new_name_or_name_dict={'XGMbunchId':'pulse'})
frame_coords = np.linspace(0,(d.shape[1]-1)*stride, d.shape[1], dtype=int)
d = d.assign_coords({'pulse':frame_coords})
log.debug(f"loaded and formatted chunk of xgm data")
return d
def process_dssc_data(proposal, run_nr, module, chunksize, info, dssc_binners,
path='./',
pulsemask=None,
dark_image=None,
xgm_normalization=False,
xgm_mnemonic='SCS_SA3',
normevery=1
):
"""
Collects and reduces DSSC data for a single module.
......@@ -124,70 +162,115 @@ def bin_data(proposal, run_nr, module, chunksize, info, dssc_binners,
info: dictionary
dictionary containing keys 'dims', 'frames_per_train', 'total_frames',
'trainIds'
dssc_binners: dictionary
a dictionary containing binner objects created by the tbdet member
function "create_binner()"
pulsemask : numpy.ndarray
array of booleans to be used to mask dssc data according to xgm data.
dark_image: xarray.DataArray
an xarray dataarray with matching coordinates with the loaded data. If
dark_image is not None it will be subtracted from each individual dssc
frame.
xgm_normalization: bool
true if the data should be divided by the corresponding xgm value.
xgm_mnemonic: str
Mnemonic of the xgm data to be used for normalization.
normevery: int
One out of normevery dssc frames will be normalized.
Returns
-------
module_data: xarray.Dataset
xarray datastructure containing data binned according to bins.
"""
sourcename = f'SCS_DET_DSSC1M-1/DET/{module}CH0:xtdf'
collection = ed.open_run(proposal, run_nr,
include=f'*DSSC{module:02d}*')
ntrains = len(collection.train_ids)
# metadata definition
ntrains = info['number_of_trains']
fpt = info['frames_per_train']
chunks = np.arange(ntrains, step=chunksize)
log.info(f"Processing dssc module {module}: start")
sourcename_dssc = f'SCS_DET_DSSC1M-1/DET/{module}CH0:xtdf'
# progress bar
if module == 15:
pbar = tqdm(total=len(chunks))
# open extra_data run objects
collection_DSSC = ed.open_run(proposal, run_nr,
include=f'*DSSC{module:02d}*')
collection_DA1 = ed.open_run(proposal, run_nr, include='*DA01*')
log.info(f"Processing dssc module {module}: start")
# create empty dataset to be filled iteratively
# create empty dataset for dssc data to be filled iteratively
module_data = create_empty_dataset(info, dssc_binners)
# load data chunk by chunk and merge
# load data chunk by chunk and merge result into prepared empty dataset
for chunk in chunks:
sel = collection.select_trains(
ed.by_index[chunk:chunk + chunksize])
nframes = sel.detector_info(sourcename)['total_frames']
if nframes > 0:
log.debug(f"Module {module}: "
f"load trains {chunk}:{chunk + chunksize}")
chunk_data = load_chunk_data(sel, sourcename)
chunk_hist = xr.full_like(chunk_data[:,:,0,0], fill_value=1)
# prefiltering -> xgm pulse masking, and related creation of
# histogram subset
if pulsemask is not None:
chunk_data = chunk_data.where(pulsemask)
chunk_hist = chunk_hist.where(pulsemask)
chunk_data = chunk_data.to_dataset(name='data')
chunk_data['hist'] = chunk_hist
# data reduction -> apply binners
log.debug(f'Module {module}: apply binning to chunk_data')
for b in dssc_binners:
chunk_data[b+"_binned"] = dssc_binners[b]
chunk_data = chunk_data.groupby(b+"_binned").sum(b)
chunk_data = chunk_data.rename(name_dict={b+"_binned":b})
# add chunk data to loaded data
for var in ['data', 'hist']:
module_data[var] = xr.concat([module_data[var],
chunk_data[var]],
dim='tmp').sum('tmp')
log.debug(f"Module {module}: "
f"processed trains {chunk}:{chunk + chunksize}")
if module == 15:
pbar.update(1)
log.debug(f"Module {module}: "
f"load trains {chunk}:{chunk + chunksize}")
sel = collection_DSSC.select_trains(
ed.by_index[chunk:chunk + chunksize])
chunk_data = load_chunk_data(sel, sourcename_dssc)
chunk_hist = xr.full_like(chunk_data[:,:,0,0], fill_value=1)
# ---------------------------------------------------------------------
# optional blocks -> ToDo: see merge request !89
# ---------------------------------------------------------------------
# option 1: prefiltering -> xgm pulse masking
if pulsemask is not None:
log.debug(f'Module {module}: drop out-of-bounds frames')
# relatively slow. ToDo -> wrap using np.ufunc
chunk_data = chunk_data.where(pulsemask)
chunk_hist = chunk_hist.where(pulsemask)
# option 2: subtraction of dark image(s)
if dark_image is not None:
log.debug(f'Module {module}: subtract dark')
chunk_data.values = chunk_data.values - dark_image.values
# slower: using xarray directly
#chunk_data = chunk_data - dark_image
# option 3: normalize dssc frames by their xgm values
if xgm_normalization:
log.debug(f'Module {module}: load and format xgm data')
sel_DA1 = collection_DA1.select_trains(
ed.by_index[chunk:chunk + chunksize])
chunk_xgm = _load_chunk_xgm(sel_DA1, xgm_mnemonic, normevery)
log.debug(f'Module {module}: normalize chunk_data using xgm')
# the following line uses numpy's fast vectorized array operations;
# there is some overhead from rewriting the xarray DataArray values
chunk_data.values[:,0::normevery,:,:] = \
np.divide(chunk_data[:,0::normevery,:,:], chunk_xgm)
# slow code using xarray directly
#chunk_data = chunk_data / chunk_xgm
# ---------------------------------------------------------------------
# end optional blocks: xarray operations from here on.
# ---------------------------------------------------------------------
# data reduction -> apply binners
log.debug(f'Module {module}: apply binning to chunk_data')
chunk_data = chunk_data.to_dataset(name='data')
chunk_data['hist'] = chunk_hist
for b in dssc_binners:
chunk_data[b+"_binned"] = dssc_binners[b]
chunk_data = chunk_data.groupby(b+"_binned").sum(b)
chunk_data = chunk_data.rename(name_dict={b+"_binned":b})
# add chunk data to loaded data
log.debug(f'Module {module}: merge chunk data')
for var in ['data', 'hist']:
module_data[var] = xr.concat([module_data[var],
chunk_data[var]],
dim='tmp').sum('tmp')
log.debug(f"Module {module}: "
f"processed trains {chunk}:{chunk + chunksize}")
log.debug(f'Module {module}: calculate mean')
module_data['data'] = module_data['data'] / module_data['hist']
module_data = module_data.transpose('trainId', 'pulse', 'x', 'y')
module_data.attrs['module'] = module
log.info(f'saving module {module}')
if not os.path.isdir(path):
os.mkdir(path)
fname = f'run_{run_nr}_module{module}.h5'
save_xarray(path+fname, module_data, mode='a')
log.info(f"processing module {module}: done")
return module_data
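For reference, this per-module function is dispatched via joblib in DSSCBinner.process_data; a sketch of that pattern (module_jobs is assumed to be a list of keyword dictionaries assembled as in process_data above).

import joblib
import xarray as xr
import toolbox_scs.detectors as tbdet

# one parallel job per module, each returning a reduced per-module dataset
data = joblib.Parallel(n_jobs=len(module_jobs), backend='multiprocessing')(
    joblib.delayed(tbdet.process_dssc_data)(**job) for job in module_jobs)
data = xr.concat(data, dim='module')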
......@@ -14,11 +14,13 @@ import matplotlib.pyplot as plt
from scipy.signal import find_peaks
import extra_data as ed
from ..constants import mnemonics as _mnemonics
log = logging.getLogger(__name__)
def load_xgm(run):
def load_xgm(run, xgm_mnemonic='SCS_SA3'):
"""
Loads XGM data from karabo_data.DataCollection
......@@ -38,25 +40,18 @@ def load_xgm(run):
>>> run = tb.run_by_proposal(2212, 235)
>>> xgm_data = tbdet.load_xgm(run)
"""
''''''
nbunches = run.get_array('SCS_RR_UTC/MDL/BUNCH_DECODER',
'sase3.nPulses.value')
nbunches = run.get_array(*_mnemonics['sase3'].values())
nbunches = np.unique(nbunches)
if len(nbunches) == 1:
nbunches = nbunches[0]
else:
log.warning(f'not all trains have same length DSSC data '
f'nbunches: {nbunches}')
log.info('change of pulse pattern in sase3 during the run.')
nbunches = max(nbunches)
log.info(f'SASE3 bunches per train: {nbunches}')
xgm = run.get_array('SCS_BLU_XGM/XGM/DOOCS:output',
'data.intensitySa3TD',
roi=ed.by_index[:nbunches],
extra_dims=['pulse'])
return xgm
log.info(f'maximum number of bunches per train in sase3: {nbunches}')
xgm = run.get_array(*_mnemonics[xgm_mnemonic].values(),
roi=ed.by_index[:nbunches])
return xgm
def cleanXGMdata(data, npulses=None, sase3First=True):
......
import unittest
import logging
import os
import sys
import argparse
import toolbox_scs as tb
import toolbox_scs.detectors as tbdet
from toolbox_scs.util.exceptions import *
logging.basicConfig(level=logging.DEBUG)
log_root = logging.getLogger(__name__)
suites = {"packaging": (
"test_init",),
"xgm": (
"test_loadxgm",
"test_cleanxgm",
"test_matchxgmtim",),
"tim": (
"test_loadtim",)
}
def list_suites():
print("""\nPossible test suites:\n-------------------------""")
for key in suites:
print(key)
print("-------------------------\n")
class TestDetectors(unittest.TestCase):
@classmethod
def setUpClass(cls):
log_root.info("Start global setup.")
cls._run = tb.load_run(2212, 235)
fields = ["sase1", "sase3", "npulses_sase3",
"npulses_sase1", "MCP2apd", "SCS_SA3", "nrj"]
cls._tb_data = tb.load(fields, 235, 2212)
log_root.info("Finished global setup, start tests.")
@classmethod
def tearDownClass(cls):
pass
def setUp(self):
pass
def tearDown(self):
pass
def test_init(self):
self.assertEqual(tbdet.__name__, "toolbox_scs.detectors")
def test_loadxgm(self):
cls = self.__class__
xgm_data = tbdet.load_xgm(cls._run)
self.assertTrue(xgm_data.values[0][-1])
def test_cleanxgm(self):
cls = self.__class__
data = tbdet.cleanXGMdata(cls._tb_data)
self.assertEqual(data['sa3_pId'].values[-1], 19)
def test_matchxgmtim(self):
cls = self.__class__
data = tbdet.matchXgmTimPulseId(cls._tb_data)
self.assertEqual(data['npulses_sase3'].values[0], 20)
def test_loadtim(self):
cls = self.__class__
data = tbdet.load_TIM(cls._run)
self.assertEqual(data.name, 'MCP2apd')
def suite(*tests):
suite = unittest.TestSuite()
for test in tests:
suite.addTest(TestDetectors(test))
return suite
def main(*cliargs):
try:
for test_suite in cliargs:
if test_suite in suites:
runner = unittest.TextTestRunner(verbosity=2)
runner.run(suite(*suites[test_suite]))
else:
log_root.warning(
"Unknown suite: '{}'".format(test_suite))
pass
except Exception as err:
log_root.error("Unecpected error: {}".format(err),
exc_info=True)
pass
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument('--list-suites',
action='store_true',
help='list possible test suites')
parser.add_argument('--run-suites', metavar='S',
nargs='+', action='store',
help='a list of valid test suites')
args = parser.parse_args()
if args.list_suites:
list_suites()
if args.run_suites:
main(*args.run_suites)
......@@ -7,6 +7,7 @@ from time import strftime
import numpy as np
import xarray as xr
import extra_data as ed
import toolbox_scs as tb
import toolbox_scs.detectors as tbdet
......@@ -18,11 +19,8 @@ log_root = logging.getLogger(__name__)
suites = {"no-processing": (
"test_create",
),
"processing-quick": (
"test_binning_quick",
),
"processing-full": (
"test_binning_xgm",
"processing": (
"test_normalization_all2",
)
}
......@@ -75,20 +73,18 @@ class TestDSSC(unittest.TestCase):
np.linspace(0,19,20, dtype=int),
bins_pulse)
binners = {'trainId': binner1, 'pulse': binner2}
params = {'binners': binners,
'use_xgm': True,
'xgm_threshold' : (0, np.inf),
'xgm_bins': bins_pulse}
params = {'binners': binners}
# normal
run235 = tbdet.DSSCBinner(proposal_nb, run_nb)
del(run235)
run235 = tbdet.DSSCBinner(2212, 235, use_xgm=True)
run235 = tbdet.DSSCBinner(2212, 235,
use_xgm=True,
xgm_bins=bins_pulse)
run235 = tbdet.DSSCBinner(proposal_nb, run_nb, **params)
print(run235.xgm)
run235 = tbdet.DSSCBinner(2212, 235)
run235.add_binner('trainId', binner1)
run235.add_binner('pulse', binner2)
xgm_threshold=(300.0, 8000.0)
run235.create_xgm_mask(xgm_threshold, 1)
self.assertEqual(run235.binners['trainId'].values[0],
np.float32(7585.52))
......@@ -100,68 +96,58 @@ class TestDSSC(unittest.TestCase):
self.assertEqual(str(cm.exception), err_msg)
def test_binning_quick(self):
def test_normalization_all2(self):
proposal_nb = 2530
# dark
proposal_nb = 2212
run_nb = 232
run = tb.load_run(proposal_nb, run_nb, include='*DA*')
run_nb = 49
run_info = tbdet.load_dssc_info(proposal_nb, run_nb)
bins_trainId = tb.get_array(run,
'PP800_PhaseShifter',
0.03)
bins_pulse = ['pumped', 'unpumped'] * 10
fpt = run_info['frames_per_train']
n_trains = run_info['number_of_trains']
trainIds = run_info['trainIds']
buckets_train = np.zeros(n_trains)
buckets_pulse = ['image', 'dark'] * 99 + ['image_last']
binner1 = tbdet.create_dssc_bins("trainId",
run_info['trainIds'],
bins_trainId.values)
trainIds,
buckets_train)
binner2 = tbdet.create_dssc_bins("pulse",
np.linspace(0,19,20, dtype=int),
bins_pulse)
np.linspace(0,fpt-1,fpt, dtype=int),
buckets_pulse)
binners = {'trainId': binner1, 'pulse': binner2}
run232 = tbdet.DSSCBinner(proposal_nb, run_nb, binners=binners)
mod_list = [2,15]
data = run232.bin_data(modules=mod_list)
self.assertIsNotNone(data.data)
data = run232.bin_data(backend='multiprocessing', modules=mod_list)
self.assertIsNotNone(data.data)
tbdet.save_xarray(cls._module_data, './tmp/run235.h5')
data = tbdet.load_xarray('./tmp/run232.h5')
self.assertIsNotNone(data.data)
bin_obj = tbdet.DSSCBinner(proposal_nb, run_nb, binners=binners)
dark = bin_obj.process_data(modules=[15], chunksize=248)
def test_binning_xgm(self):
proposal_nb = 2212
run_nb = 235
run = tb.load_run(proposal_nb, run_nb, include='*DA*')
# run to normalize
run_nb = 50
run_info = tbdet.load_dssc_info(proposal_nb, run_nb)
fpt = run_info['frames_per_train']
n_trains = run_info['number_of_trains']
trainIds = run_info['trainIds']
bins_trainId = tb.get_array(run,
'PP800_PhaseShifter',
0.03)
bins_pulse = ['pumped', 'unpumped'] * 10
buckets_train = np.zeros(n_trains)
buckets_pulse = ['image', 'dark'] * 99 + ['image_last']
binner1 = tbdet.create_dssc_bins("trainId",
run_info['trainIds'],
bins_trainId.values)
trainIds,
buckets_train)
binner2 = tbdet.create_dssc_bins("pulse",
np.linspace(0,19,20, dtype=int),
bins_pulse)
np.linspace(0,fpt-1,fpt, dtype=int),
buckets_pulse)
binners = {'trainId': binner1, 'pulse': binner2}
bin_obj = tbdet.DSSCBinner(proposal_nb, run_nb, binners=binners)
params = {'binners':binners,
'use_xgm':True,
'xgm_threshold':(0, np.inf),
'xgm_bins':bins_pulse
}
bin_params = {'modules':[15],
'chunksize':248,
'xgm_normalization':True,
'normevery':2,
'dark_image':dark['data'][:,0,0,:,:]
}
run235 = tbdet.DSSCBinner(proposal_nb, run_nb, **params)
data = run235.bin_data(backend='multiprocessing', modules=[3])
data = bin_obj.process_data(**bin_params)
self.assertIsNotNone(data.data)
xgm_binned = run235.get_xgm_binned()
self.assertIsNotNone(xgm_binned)
def list_suites():
......
import unittest
import logging
import os
import argparse
import shutil
import joblib
from time import strftime
import numpy as np
import xarray as xr
import extra_data as ed
import toolbox_scs as tb
import toolbox_scs.detectors as tbdet
from toolbox_scs.detectors.dssc_processing import (load_chunk_data,
create_empty_dataset)
from toolbox_scs.util.exceptions import ToolBoxFileError
logging.basicConfig(level=logging.DEBUG)
log_root = logging.getLogger(__name__)
suites = {"metafunctions": (
"test_info",
"test_binners",
"test_calcindices",
"test_createpulsemask",
"test_createempty",
"test_loadmergechunk",
"test_storage",
),
"binning-pulse-train": (
"test_info",
"test_binners",
"test_calcindices",
"test_createpulsemask",
"test_processmodule",
),
"binning-train": (
"test_info",
"test_binners",
"test_calcindices",
"test_createpulsemask",
"test_processmodule2",
)
}
_temp_dirs = ['tmp']
def setup_tmp_dir():
for d in _temp_dirs:
if not os.path.isdir(d):
os.mkdir(d)
def cleanup_tmp_dir():
for d in _temp_dirs:
shutil.rmtree(d, ignore_errors=True)
log_root.info(f'remove {d}')
class TestDSSC(unittest.TestCase):
@classmethod
def setUpClass(cls):
log_root.info("Start global setup")
# ---------------------------------------------------------------------
# global test settings
# ---------------------------------------------------------------------
setup_tmp_dir()
# ---------------------------------------------------------------------
log_root.info("Finished global setup, start tests")
@classmethod
def tearDownClass(cls):
log_root.info("Clean up test environment....")
cleanup_tmp_dir()
def test_info(self):
proposal = 2212
run_nr = 235
info = tbdet.load_dssc_info(proposal, run_nr)
self.assertEqual(info['frames_per_train'], 20)
def test_binners(self):
cls = self.__class__
proposal = 2212
run_nr = 235
cls._dssc_info_235 = tbdet.load_dssc_info(proposal, run_nr)
cls._run_235 = tb.load_run(proposal, run_nr, include='*DA*')
# create 3 different binners manually
bins_trainId_name = 'PP800_PhaseShifter'
stepsize = .04
bins_trainId = tb.get_array(cls._run_235,
'PP800_PhaseShifter',
stepsize)
bins_pulse = ['pumped', 'unpumped'] * 10
bin1 = tbdet.create_dssc_bins("trainId",
cls._dssc_info_235['trainIds'],
bins_trainId.values)
bin2 = tbdet.create_dssc_bins("pulse",
np.linspace(0,19,20, dtype=int),
bins_pulse)
bin3 = tbdet.create_dssc_bins("x",
np.linspace(1,128,128, dtype=int),
np.linspace(1,128,128, dtype=int))
# create binner with invalid name
with self.assertRaises(ValueError) as err:
bin2 = tbdet.create_dssc_bins("blabla",
np.linspace(0,19,20, dtype=int),
bins_pulse)
# format binner dictionary. The 4th binner is constructed automatically
binners = {"trainId": bin1,
"pulse": bin2,
"x": bin3}
self.assertIsNotNone(binners)
cls._binners_235 = {'trainId':bin1,'pulse':bin2}
def test_calcindices(self):
cls = self.__class__
# first test including darks
bins_pulse = ['pumped', 'unpumped', 'pumped_dark', 'unpumped_dark'] * 5
xgm_frame_coords = tbdet.calc_xgm_frame_indices(bins_pulse)
self.assertIsNotNone(xgm_frame_coords)
# another test without darks
cls._xgm = tbdet.load_xgm(cls._run_235)
bins_pulse = ['pumped', 'unpumped'] * 10
xgm_frame_coords = tbdet.calc_xgm_frame_indices(bins_pulse)
self.assertIsNotNone(xgm_frame_coords)
cls._xgm['pulse'] = xgm_frame_coords
def test_createpulsemask(self):
cls = self.__class__
xgm_min = 0
xgm_max = np.inf
data = np.ones([len(cls._run_235.train_ids),
cls._dssc_info_235['frames_per_train']], dtype=bool)
dimensions = ['trainId', 'pulse']
coordinates = {'trainId': cls._run_235.train_ids,
'pulse': range(cls._dssc_info_235['frames_per_train'])}
pulsemask = xr.DataArray(data, dims=dimensions, coords=coordinates)
valid = (cls._xgm > xgm_min) * (cls._xgm < xgm_max)
cls._pulsemask = valid.combine_first(pulsemask).astype(bool)
self.assertIsNotNone(cls._pulsemask)
def test_createempty(self):
cls = self.__class__
# standard dset
empty_data = create_empty_dataset(cls._dssc_info_235, cls._binners_235)
self.assertIsNotNone(empty_data.dims['trainId'])
# bin along pulse dimension only
bins_pulse = ['pumped', 'unpumped', 'pumped_dark', 'unpumped_dark'] * 5
binner_pulse = tbdet.create_dssc_bins("pulse",
np.linspace(0,19,20, dtype=int),
bins_pulse)
empty_data = create_empty_dataset(cls._dssc_info_235,
{'pulse':binner_pulse})
self.assertEqual(empty_data.pulse.values[1], 'pumped_dark')
def test_loadmergechunk(self):
cls = self.__class__
# load single chunk
proposal = 2212
run_nr = 235
module = 1
chunksize = 512
sourcename = f'SCS_DET_DSSC1M-1/DET/{module}CH0:xtdf'
collection = ed.open_run(proposal, run_nr,
include=f'*DSSC{module:02d}*')
binners = cls._binners_235
info = cls._dssc_info_235
pulsemask = cls._pulsemask
ntrains = len(collection.train_ids)
chunks = np.arange(ntrains, step=chunksize)
start_index = chunks[0]
module_data = create_empty_dataset(info, binners)
for chunk in chunks[0:2]:
sel = collection.select_trains(
ed.by_index[chunk:chunk + chunksize])
log_root.debug(f"Module {module}: "
f"loading trains {chunk}:{chunk + chunksize}")
chunk_data = load_chunk_data(sel, sourcename)
self.assertIsNotNone(chunk_data)
# pulse masking, and creation of related hist subset.
chunk_hist = xr.full_like(chunk_data[:,:,0,0], fill_value=1)
if pulsemask is not None:
chunk_data = chunk_data.where(pulsemask)
chunk_hist = chunk_hist.where(pulsemask)
chunk_data = chunk_data.to_dataset(name='data')
chunk_data['hist'] = chunk_hist
# apply predefined binning
log_root.debug(f'Module {module}: apply binning to chunk_data')
for b in binners:
chunk_data[b+"_binned"] = binners[b]
chunk_data = chunk_data.groupby(b+"_binned").sum(b)
chunk_data = chunk_data.rename(name_dict={b+"_binned":b})
# ToDo: Avoid creation of unnecessary data when binning along x,y
# merge dsets and format
log_root.debug(f'Module {module}: merge data into prepared dset')
for var in ['data', 'hist']:
module_data[var] = xr.concat([module_data[var],
chunk_data[var]],
dim='tmp').sum('tmp')
#module_data = module_data.transpose('trainId', 'pulse', 'x', 'y')
cls._module_data = module_data
self.assertIsNotNone(module_data)
def test_processmodule(self):
cls = self.__class__
chunksize = 512
backend = 'multiprocessing'
mod_list = [1, 15]
n_jobs = len(mod_list)
log_root.info(f'processing {chunksize} trains per chunk')
log_root.info(f'using parallelization backend {backend}')
module_jobs = []
for m in mod_list:
module_jobs.append(dict(
proposal=2212,
run_nr=235,
module=m,
chunksize=chunksize,
info=cls._dssc_info_235,
dssc_binners=cls._binners_235,
pulsemask=cls._pulsemask
))
print('start processing modules:', strftime('%X'))
data = joblib.Parallel(n_jobs=n_jobs, backend=backend) \
(joblib.delayed(tbdet.bin_data)(**module_jobs[i]) \
for i in range(len(mod_list)))
print('finished processing modules:', strftime('%X'))
data = xr.concat(data, dim='module')
print(data)
self.assertIsNotNone(data)
def test_processmodule2(self):
cls = self.__class__
chunksize = 512
backend = 'multiprocessing'
mod_list = [15, 3]
n_jobs = len(mod_list)
log_root.info(f'processing {chunksize} trains per chunk')
log_root.info(f'using parallelization backend {backend}')
info = tbdet.load_dssc_info(2212, 89)
bin1 = tbdet.create_dssc_bins("trainId",
info['trainIds'],
np.ones(1691))
binners = {'trainId': bin1}
module_jobs = []
for m in mod_list:
module_jobs.append(dict(
proposal=2212,
run_nr=89,
module=m,
chunksize=chunksize,
info=info,
dssc_binners=binners,
))
print('start processing modules:', strftime('%X'))
data = joblib.Parallel(n_jobs=n_jobs, backend=backend) \
(joblib.delayed(tbdet.bin_data)(**module_jobs[i]) \
for i in range(len(mod_list)))
print('finished processing modules:', strftime('%X'))
data = xr.concat(data, dim='module')
data = data.squeeze()
print(data)
self.assertIsNotNone(data)
def test_storage(self):
cls = self.__class__
tbdet.save_xarray('./tmp/run235.h5', cls._module_data)
tbdet.save_xarray('./tmp/run235.h5', cls._module_data,
mode = 'w')
run235 = tbdet.load_xarray('./tmp/run235.h5')
self.assertIsNotNone(run235)
#run235.close()
with self.assertRaises(ToolBoxFileError) as cm:
tbdet.save_xarray('./tmp/run235.h5',
cls._module_data.isel(pulse=0),
mode = 'a')
def list_suites():
print("\nPossible test suites:\n" + "-" * 79)
for key in suites:
print(key)
print("-" * 79 + "\n")
def suite(*tests):
suite = unittest.TestSuite()
for test in tests:
suite.addTest(TestDSSC(test))
return suite
def main(*cliargs):
try:
for test_suite in cliargs:
if test_suite in suites:
runner = unittest.TextTestRunner(verbosity=2)
runner.run(suite(*suites[test_suite]))
else:
log_root.warning(
"Unknown suite: '{}'".format(test_suite))
pass
except Exception as err:
log_root.error("Unecpected error: {}".format(err),
exc_info=True)
pass
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument('--list-suites',
action='store_true',
help='list possible test suites')
parser.add_argument('--run-suites', metavar='S',
nargs='+', action='store',
help='a list of valid test suites')
args = parser.parse_args()
if args.list_suites:
list_suites()
if args.run_suites:
main(*args.run_suites)