diff --git a/setup.py b/setup.py index 5bd5110aef41384518e56f2673a0a2ac28a2b927..21108ad425decdac5c20dcd959da768a19261a6d 100644 --- a/setup.py +++ b/setup.py @@ -19,7 +19,7 @@ setup(name='toolbox_scs', package_data={}, install_requires=[ 'xarray>=0.13.0', 'numpy', 'matplotlib', - 'pandas', 'scipy', 'h5py', + 'pandas', 'scipy', 'h5py', 'h5netcdf', 'extra_data', 'euxfel_bunch_pattern', ], ) diff --git a/src/toolbox_scs/detectors/DSSC_bkp.py b/src/toolbox_scs/detectors/DSSC_cls.py similarity index 100% rename from src/toolbox_scs/detectors/DSSC_bkp.py rename to src/toolbox_scs/detectors/DSSC_cls.py diff --git a/src/toolbox_scs/detectors/__init__.py b/src/toolbox_scs/detectors/__init__.py index 6b985d8552170fee0e6f241a9d7473941640ff89..42007e11da35738419945ca70f055912c76b1d4a 100644 --- a/src/toolbox_scs/detectors/__init__.py +++ b/src/toolbox_scs/detectors/__init__.py @@ -3,7 +3,7 @@ from .xgm import ( from .tim import ( load_TIM,) from .dssc import ( - load_dssc_info, prepare_module_empty) + load_dssc_info, calc_xgm_frame_indices, process_dssc_module, split_frames) __all__ = ( # Functions @@ -11,7 +11,9 @@ __all__ = ( "load_TIM", "matchXgmTimPulseId", "load_dssc_info", - "prepare_module_empty", + "calc_xgm_frame_indices", + "process_dssc_module", + "split_frames", # Classes # Variables ) diff --git a/src/toolbox_scs/detectors/dssc.py b/src/toolbox_scs/detectors/dssc.py index e6de1aa718b948dcc79c8afe0422d3227f94e709..601f01ca1f446a720d29d066b271398f143266d9 100644 --- a/src/toolbox_scs/detectors/dssc.py +++ b/src/toolbox_scs/detectors/dssc.py @@ -20,6 +20,7 @@ import numpy as np import xarray as xr import pandas as pd +import extra_data as ed from ..load import run_by_proposal as _open_run @@ -50,6 +51,39 @@ def load_dssc_info(proposal, run_nr): return info +def calc_xgm_frame_indices(nbunches, framepattern): + """ + Returns a coordinate array for XGM data. The coordinates correspond to + DSSC frame numbers and depend on the number of FEL pulses per train + ("nbunches") and the framepattern. In framepattern, dark DSSC frame + names (i.e., without FEL pulse) _must_ include "dark" as a substring. + + Parameters + ---------- + nbunches: int + number of bunches per train + framepattern: list + experimental pattern + + Returns + ------- + frame_indices: numpy.ndarray + coordinate array corresponding to DSSC frame numbers + + """ + + n_frames = len(framepattern) + n_data_frames = np.sum(['dark' not in p for p in framepattern]) + frame_max = nbunches * n_frames // n_data_frames + + frame_indices = [] + for i, p in enumerate(framepattern): + if 'dark' not in p: + frame_indices.append(np.arange(i, frame_max, n_frames)) + + return np.sort(np.concatenate(frame_indices)) + + def prepare_module_empty(scan_variable, framepattern): """ Create empty (zero-valued) DataArray for a single DSSC module @@ -57,11 +91,16 @@ def prepare_module_empty(scan_variable, framepattern): Parameters ---------- - + scan_variable : xarray.DataArray + xarray DataArray containing the specified scan variable using + the trainId as coordinate. + framepattern: list of strings + example: ['pumped', 'unpumped'] Returns ------- - + module_data: xarray.Dataset + empty DataArray """ len_scan = len(np.unique(scan_variable)) @@ -84,19 +123,24 @@ def prepare_module_empty(scan_variable, framepattern): def load_chunk_data(sel, sourcename, maxframes=None): '''Load DSSC data (sel is a DataCollection or a subset of a DataCollection - obtained by its select_trains() method). The flattened multi-index (trains+pulses) - is unraveled before returning the data. + obtained by its select_trains() method). The flattened multi-index + (trains+pulses) is unraveled before returning the data. ''' info = sel.detector_info(sourcename) fpt = info['frames_per_train'] frames_total = info['total_frames'] - data = sel.get_array(sourcename, 'image.data', extra_dims=['_empty_', 'x', 'y']).squeeze() + data = sel.get_array(sourcename, 'image.data', + extra_dims=['_empty_', 'x', 'y'] + ).squeeze() tids = np.unique(data.trainId) data = data.rename(dict(trainId='trainId_pulse')) - midx = pd.MultiIndex.from_product([sorted(tids), range(fpt)], names=('trainId', 'pulse')) - data = xr.DataArray(data, dict(trainId_pulse=midx)).unstack('trainId_pulse') + midx = pd.MultiIndex.from_product([sorted(tids), range(fpt)], + names=('trainId', 'pulse')) + data = xr.DataArray(data, + dict(trainId_pulse=midx) + ).unstack('trainId_pulse') data = data.transpose('trainId', 'pulse', 'x', 'y') return data.loc[{'pulse': np.s_[:maxframes]}] @@ -117,13 +161,16 @@ def merge_chunk_data(module_data, chunk_data, framepattern): def split_frames(data, pattern, prefix=''): - '''Split frames according to "pattern" (possibly repeating) and average over resulting splits. + """ + Split frames according to "pattern" (possibly repeating) and average over + resulting splits. "pattern" is a list of frame names (order matters!). Examples: - pattern = ['pumped', 'pumped_dark', 'unpumped', 'unpumped_dark'] # 4 DSSC frames, 2 FEL pulses + pattern = ['pumped', 'pumped_dark', 'unpumped', 'unpumped_dark'] # 4 + DSSC frames, 2 FEL pulses pattern = ['pumped', 'unpumped'] # 2 FEL frames, no intermediate darks pattern = ['image'] # no splitting, average over all frames Returns a dataset with data variables named prefix + framename - ''' + """ n = len(pattern) dataset = xr.Dataset() for i, name in enumerate(pattern): @@ -131,29 +178,11 @@ def split_frames(data, pattern, prefix=''): return dataset -def calc_xgm_frame_indices(nbunches, framepattern): - ''' - Returns a coordinate array for XGM data. The coordinates correspond to DSSC - frame numbers and depend on the number of FEL pulses per train ("nbunches") - and the framepattern. In framepattern, dark DSSC frame names (i.e., without - FEL pulse) _must_ include "dark" as a substring. - ''' - n_frames = len(framepattern) - n_data_frames = np.sum(['dark' not in p for p in framepattern]) - frame_max = nbunches * n_frames // n_data_frames - - frame_indices = [] - for i, p in enumerate(framepattern): - if 'dark' not in p: - frame_indices.append(np.arange(i, frame_max, n_frames)) - - return np.sort(np.concatenate(frame_indices)) - - def process_intra_train(job): '''Aggregate DSSC data (chunked, to fit into memory) for a single module. Averages over all trains, but keeps all pulses. - Designed for the multiprocessing module - expects a job dictionary with the following keys: + Designed for the multiprocessing module - expects a job dictionary with + the following keys: proposal : (int) proposal number run : (int) run number module : (int) DSSC module to process @@ -201,18 +230,42 @@ def process_intra_train(job): def process_dssc_module(job): - '''Aggregate DSSC data (chunked, to fit into memory) for a single module. - Groups by "scan_variable" in given scanfile - use dummy scan_variable to average - over all trains. This implies, that only trains found in the scanfile are considered. - Designed for the multiprocessing module - expects a job dictionary with the following keys: - proposal : (int) proposal number - run : (int) run number - module : (int) DSSC module to process - chunksize : (int) number of trains to process simultaneously - scanfile : (str) name of hdf5 file with xarray.DataArray containing the scan variable and trainIds - framepattern : (list of str) names for the (possibly repeating) intra-train pulses. See split_dssc_data - pulsemask : (str) name of hdf5 file with boolean xarray.DataArray to select/reject trains and pulses - ''' + """ + Aggregate DSSC data (chunked, to fit into memory) for a single module. + Groups by "scan_variable" in given scanfile - use dummy scan_variable to + average over all trains. This implies, that only trains found in the + scanfile are considered. + + Parameters + ---------- + job:Â dictionary + Designed for the multiprocessing module - expects a job dictionary with + the following keys: + proposal : int + proposal number + run : int + run number + module : int + DSSC module to process + chunksize : int + number of trains to process simultaneously + scanfile : str + name of hdf5 file with xarray.DataArray containing the + scan variable and trainIds + framepattern : list of str + names for the (possibly repeating) intra-train pulses. See + split_dssc_data + pulsemask : str + name of hdf5 file with boolean xarray.DataArray to + select/reject trains and pulses + + Returns + ------- + module_data: xarray.Dataset + + + """ + proposal = job['proposal'] run_nr = job['run_nr'] module = job['module'] @@ -220,29 +273,31 @@ def process_dssc_module(job): scanfile = job['scanfile'] framepattern = job.get('framepattern', ['image']) maskfile = job.get('maskfile', None) - + sourcename = f'SCS_DET_DSSC1M-1/DET/{module}CH0:xtdf' - - collection = kd.open_run(proposal, run_nr, include=f'*DSSC{module:02d}*') - + collection = ed.open_run(proposal, run_nr, + include=f'*DSSC{module:02d}*') ntrains = len(collection.train_ids) - - # read preprocessed scan variable from file - selection and (possibly) rounding already done. - scan = xr.open_dataarray(scanfile, 'data', autoclose=True) + + # read preprocessed scan variable from file - selection and (possibly) + # rounding already done. + scan = xr.open_dataarray(scanfile, 'data', engine='h5netcdf') # read binary pulse/train mask - e.g. from XGM thresholding if maskfile is not None: - pulsemask = xr.open_dataarray(maskfile, 'data', autoclose=True) + pulsemask = xr.open_dataarray(maskfile, 'data', engine='h5netcdf') else: pulsemask = None - + module_data = prepare_module_empty(scan, framepattern) chunks = np.arange(ntrains, step=chunksize) if module == 15: # quick and dirty progress bar pbar = tqdm(total=len(chunks)) + for start_index in chunks: - sel = collection.select_trains(kd.by_index[start_index:start_index + chunksize]) + sel = collection.select_trains( + ed.by_index[start_index:start_index + chunksize]) nframes = sel.detector_info(sourcename)['total_frames'] if nframes > 0: # some chunks have no DSSC data at all data = load_chunk_data(sel, sourcename) @@ -250,18 +305,19 @@ def process_dssc_module(job): if pulsemask is not None: data = data.where(pulsemask) sum_count = sum_count.where(pulsemask) - + data = split_frames(data, framepattern) - sum_count = split_frames(sum_count, framepattern, prefix='sum_count_') + sum_count = split_frames(sum_count, framepattern, + prefix='sum_count_') data = xr.merge([data, sum_count]) - - data['scan_variable'] = scan # aligns on trainId, drops non-matching trains + + # aligns on trainId, drops non-matching trains + data['scan_variable'] = scan data = data.groupby('scan_variable').sum('trainId') module_data = merge_chunk_data(module_data, data, framepattern) if module == 15: pbar.update(1) - + for name in framepattern: module_data[name] = module_data[name] / module_data['sum_count_' + name] - return module_data - + return module_data \ No newline at end of file diff --git a/src/toolbox_scs/load.py b/src/toolbox_scs/load.py index ff8604261d0852c11b516398bbddaa353f629b09..947dea088b734ddeb776e99334dcc61c35be25ad 100644 --- a/src/toolbox_scs/load.py +++ b/src/toolbox_scs/load.py @@ -1,7 +1,8 @@ # -*- coding: utf-8 -*- """ Toolbox for SCS. - Various utilities function to quickly process data measured at the SCS instruments. + Various utilities function to quickly process data measured at the SCS + instruments. Copyright (2019) SCS Team. """ @@ -240,8 +241,7 @@ def load_scan_variable(run, mnemonic, stepsize=None): dims=['trainId'], coords={'trainId': run.train_ids}) elif mnemonic in _mnemonics_ld: mnem = _mnemonics_ld[mnemonic] - data = run.get_array(mnem['source'], - mnem['key'], mnem['dim']) + data = run.get_array(*mnem.values()) else: raise ToolBoxValueError("Invalid mnemonic given", mnemonic) diff --git a/src/toolbox_scs/plot/plot_dssc.py b/src/toolbox_scs/plot/plot_dssc.py index e69de29bb2d1d6434b8b29ae775ad8c2e48c5391..7475bc411dd4dab1ab1cc0a27415960469274b25 100644 --- a/src/toolbox_scs/plot/plot_dssc.py +++ b/src/toolbox_scs/plot/plot_dssc.py @@ -0,0 +1,64 @@ +""" DSSC visualization routines + + Plotting sub-routines frequently done in combination with dssc analysis. + The initial code is based on: https://github.com/dscran/dssc_process + /blob/master/example_image_process_pulsemask.ipynb + + Todo: For visualization of statistical information we could eventually + switch to seaborn: https://seaborn.pydata.org/ +""" + +from time import strftime + +import matplotlib.pyplot as plt +import numpy as np +import xarray as xr + + + +def xgm_threshold(xgm, scan, + xgm_min = None, xgm_max = None, + run_nr = '', + safe_fig = False): + + fig, [ax1, ax2] = plt.subplots(nrows=2, sharex=True) + + ax1.plot(xgm.trainId, xgm, 'o', c='C0', ms=1) + ax1.set_ylabel('xgm') + if xgm_min: + ax1.axhline(xgm_min, c='r') + if xgm_max: + ax1.axhline(xgm_max, c='r') + + ax2.plot(scan.trainId, scan) + ax2.set_ylabel('scan variable') + ax2.set_xlabel('trainId') + + ax1.set_title(f'run: {run_nr}') + + if safe_fig == True: + tstamp = strftime('%y%m%d_%H%M') + fig.savefig(f'images/run{run_nr}_scan_{tstamp}.png', dpi=200) + + +def trains_per_step(scan, + run_nr = '', + safe_fig = False): + + counts = xr.DataArray(np.ones(len(scan)), + dims=['scan_variable'], + coords={'scan_variable': scan.values}, + name='counts') + + counts = counts.groupby('scan_variable').sum() + + fig, ax = plt.subplots() + ax.plot(counts.scan_variable, counts, 'o', ms=4) + ax.set_xlabel('scan variable') + ax.set_ylabel('number of trains') + ax.set_title(f'run {run_nr}') + ax.grid(True) + + if safe_fig == True: + tstamp = strftime('%y%m%d_%H%M') + fig.savefig(f'images/run{run_nr}_scan_{tstamp}.png', dpi=200) \ No newline at end of file diff --git a/src/toolbox_scs/test/test_detectors_dssc.py b/src/toolbox_scs/test/test_detectors_dssc.py index df974b6bd261aebac854946591eedc622b3dac0d..bd4a38df96aed6e128aeffb7f42d3cc14f8901e6 100644 --- a/src/toolbox_scs/test/test_detectors_dssc.py +++ b/src/toolbox_scs/test/test_detectors_dssc.py @@ -4,20 +4,26 @@ import os import sys import argparse import shutil +import multiprocessing +from time import strftime +import numpy as np +import xarray as xr import toolbox_scs as tb import toolbox_scs.detectors as tbdet +from toolbox_scs.detectors.dssc import (load_chunk_data, prepare_module_empty) from toolbox_scs.util.exceptions import * +import extra_data as ed logging.basicConfig(level=logging.DEBUG) log_root = logging.getLogger(__name__) -# --------------------------------------------------------------------------------- -# global test settings (based on https://github.com/dscran/dssc_process/blob/master -# /example_image_process_pulsemask.ipynb) -# --------------------------------------------------------------------------------- +#------------------------------------------------------------------------------- +# global test settings (based on https://github.com/dscran/dssc_process/blob/ +# master/example_image_process_pulsemask.ipynb) +#------------------------------------------------------------------------------- proposal = 2212 run_nr = 235 is_dark = False @@ -25,62 +31,165 @@ framepattern = ['pumped', 'unpumped'] maxframes = None stepsize = .03 scan_variable = 'PP800_PhaseShifter' -# --------------------------------------------------------------------------------- +xgm_min = 0 +xgm_max = np.inf +# ------------------------------------------------------------------------------ -suites = {"preparation": ( + +suites = {"no-processing": ( + "test_info", + "test_calcindices", + "test_maskpulses", + "test_prepareempty", + "test_loadchunkdata", + "test_splitframes", + ), + "full": ( "test_info", + "test_calcindices", + "test_maskpulses", "test_prepareempty", + "test_loadchunkdata", + "test_splitframes", + "test_processmodule", ) } -def list_suites(): - print("""\nPossible test suites:\n-------------------------""") - for key in suites: - print(key) - print("-------------------------\n") - +_temp_dirs = ['tmp', 'images', 'processed_runs'] def setup_tmp_dir(): - for d in ['tmp', 'images', 'processed_runs']: + for d in _temp_dirs: if not os.path.isdir(d): os.mkdir(d) def cleanup_tmp_dir(): - for d in ['tmp', 'images', 'processed_runs']: + for d in _temp_dirs: shutil.rmtree(d, ignore_errors=True) + log_root.info(f'remove {d}') class TestDSSC(unittest.TestCase): @classmethod def setUpClass(cls): log_root.info("Start global setup") - setup_tmp_dir() - - - cls._run = tb.run_by_proposal(proposal, run_nr, include='*DA*') cls._scanfile = './tmp/scan.h5' cls._maskfile = './tmp/mask.h5' - cls._scan_variable = tb.load_scan_variable( - cls._run, scan_variable, stepsize) - #cls._scan_variable.to_netcdf(cls._scanfile, group='data', mode='w') + cls._run = tb.run_by_proposal(proposal, run_nr, include='*DA*') + cls._scan_variable = tb.load_scan_variable(cls._run, + scan_variable, stepsize) + cls._scan_variable.to_netcdf(cls._scanfile, group='data', mode='w', + engine='h5netcdf') + cls._xgm = tbdet.load_xgm(cls._run) log_root.info("Finished global setup, start tests") @classmethod def tearDownClass(cls): + log_root.info("Clean up test environment....") cleanup_tmp_dir() def test_info(self): + cls = self.__class__ info = tbdet.load_dssc_info(proposal, run_nr) - self.assertEqual(info['frames_per_train'], 20) + fpt = info['frames_per_train'] + self.assertEqual(fpt, 20) + cls._fpt = fpt + + + @unittest.skipIf(is_dark == True, "Dark file given, skip xgm data") + def test_calcindices(self): + cls = self.__class__ + xgm_frame_coords = tbdet.calc_xgm_frame_indices( + cls._xgm.shape[1], framepattern) + self.assertEqual(xgm_frame_coords[-1], 19) + cls._xgm['pulse'] = xgm_frame_coords + + + @unittest.skipIf(is_dark == True, "No xgm data") + def test_maskpulses(self): + cls = self.__class__ + + data = np.ones([len(cls._run.train_ids), cls._fpt], dtype=bool) + dimensions = ['trainId', 'pulse'] + coordinates = {'trainId': cls._run.train_ids, 'pulse': range(cls._fpt)} + pulsemask = xr.DataArray(data, dims=dimensions, coords=coordinates) + + n_frames_dark = len([p for p in framepattern if 'dark' in p]) + valid = (cls._xgm > xgm_min) * (cls._xgm < xgm_max) + pulsemask = valid.combine_first(pulsemask).astype(bool) + nrejected = int(valid.size - valid.sum()) + percent_rejected = 100 * nrejected / valid.size + + log_root.info(f'rejecting {nrejected} out of {valid.size} pulses' + f'({percent_rejected:.1f}%) due to xgm threshold') + + pulsemask.to_netcdf(cls._maskfile, group='data', mode='w', + engine='h5netcdf') + def test_prepareempty(self): + cls = self.__class__ + module_data = prepare_module_empty(cls._scan_variable, + framepattern) + self.assertIsNotNone(module_data.dims['scan_variable']) + + + def test_loadchunkdata(self): + module = 1 + chunksize = 512 + sourcename = f'SCS_DET_DSSC1M-1/DET/{module}CH0:xtdf' + + collection = ed.open_run(proposal, run_nr, + include=f'*DSSC{module:02d}*') + + ntrains = len(collection.train_ids) + chunks = np.arange(ntrains, step=chunksize) + start_index = chunks[0] + + sel = collection.select_trains( + ed.by_index[start_index:start_index + chunksize]) + data = load_chunk_data(sel, sourcename) + self.assertIsNotNone(data) + + + def test_processmodule(self): + cls = self.__class__ + max_GB = 400 + chunksize = int(max_GB * 128 // cls._fpt) + chunksize = min(512, chunksize) + print('processing', chunksize, 'trains per chunk') + + jobs = [] + for m in range(16): + jobs.append(dict( + proposal=proposal, + run_nr=run_nr, + module=m, + chunksize=chunksize, + scanfile=cls._scanfile, + framepattern=framepattern, + maskfile=None if is_dark else cls._maskfile, + maxframes=maxframes,)) + + print(f'start processing modules:', strftime('%X')) + + with multiprocessing.Pool(16) as pool: + module_data = pool.map(tbdet.process_dssc_module, jobs) + + print('finished processing modules:', strftime('%X')) + + def test_splitframes(self): pass +def list_suites(): + print("""\nPossible test suites:\n-------------------------""") + for key in suites: + print(key) + print("-------------------------\n") def suite(*tests): @@ -107,9 +216,6 @@ def main(*cliargs): - - - if __name__ == '__main__': parser = argparse.ArgumentParser() parser.add_argument('--list-suites', @@ -124,4 +230,4 @@ if __name__ == '__main__': list_suites() if args.run_suites: - main(*args.run_suites) + main(*args.run_suites) \ No newline at end of file diff --git a/src/toolbox_scs/util/data_access.py b/src/toolbox_scs/util/data_access.py index b088bac354f18ba9af69af09c93f126113661974..66ff996428faa0ef5d5d0be5860c057147dfea45 100644 --- a/src/toolbox_scs/util/data_access.py +++ b/src/toolbox_scs/util/data_access.py @@ -18,8 +18,8 @@ def find_run_dir(proposal, run): """ Get run directory for given run. - This method is an extension to the extra_data method - 'find_proposal' and should eventually be transferred over. + This method is an extension to 'find_proposal' in the extra_data + package and should eventually be transferred to the latter. Parameters ---------- diff --git a/src/toolbox_scs/util/pkg_info.py b/src/toolbox_scs/util/pkg_info.py new file mode 100644 index 0000000000000000000000000000000000000000..e679441d3df71e7817189c448324cbcde50d331c --- /dev/null +++ b/src/toolbox_scs/util/pkg_info.py @@ -0,0 +1,2 @@ +def get_version(): + pass \ No newline at end of file