Commit 8e806941 authored by Loïc Le Guyader

Merge branch 'extra_data' into 'master'

Extra data and xfel kernel

See merge request !76
parents 3da6adda 1ad0b6c1
import multiprocessing
from joblib import Parallel, delayed, parallel_backend
from time import strftime
import tempfile
import shutil
@@ -7,9 +7,9 @@ import os
import warnings
import psutil
import karabo_data as kd
from karabo_data.read_machinery import find_proposal
from karabo_data.geometry2 import DSSC_1MGeometry
import extra_data as ed
from extra_data.read_machinery import find_proposal
from extra_geom import DSSC_1MGeometry
import ToolBox as tb
import matplotlib.pyplot as plt
from mpl_toolkits.axes_grid1 import ImageGrid
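
The karabo_data to extra_data rename is almost one-to-one. A minimal sketch of the renamed calls, with a placeholder proposal/run and an illustrative source/key pair (neither taken from this diff):

import extra_data as ed

run = ed.open_run(proposal=2212, run=235)             # was kd.open_run
xgm = run.get_array('SCS_BLU_XGM/XGM/DOOCS:output',   # assumed XGM source name
                    'data.intensitySa3TD',            # assumed key
                    roi=ed.by_index[:20])             # was kd.by_index
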
@@ -43,8 +43,9 @@ class DSSC:
self.aspect = self.px_pitch_v/self.px_pitch_h # aspect ratio of the DSSC images
self.geom = None
self.mask = None
self.max_fraction_memory = 0.8
self.max_fraction_memory = 0.4
self.filter_mask = None
self.Nworker = 16
print('DSSC configuration')
print(f'Topic: {self.topic}')
@@ -62,7 +63,7 @@ class DSSC:
shutil.rmtree(self.tempdir)
def open_run(self, run_nr, isDark=False):
""" Open a run with karabo-data and prepare the virtual dataset for multiprocessing
""" Open a run with extra-data and prepare the virtual dataset for multiprocessing
inputs:
run_nr: the run number
@@ -70,13 +71,12 @@ class DSSC:
"""
print('Opening run data with karabo-data')
print('Opening run data with extra-data')
self.run_nr = run_nr
self.xgm = None
self.filter_mask = None
self.scan_vname = None
self.run = kd.open_run(self.proposal, self.run_nr)
self.run = ed.open_run(self.proposal, self.run_nr)
self.isDark = isDark
self.plot_title = f'{self.proposal} run: {self.run_nr}'
@@ -100,14 +100,14 @@ class DSSC:
print(f'Temporary directory: {self.tempdir}')
print('Creating virtual dataset')
self.vdslist = self.create_virtual_dssc_datasets(self.run, path=self.tempdir)
self.vds_filenames = self.create_virtual_dssc_datasets(self.run, path=self.tempdir)
# create a dummy scan variable for dark runs
# for other types of run, use the DSSC.define_scan function to overwrite it
self.scan = xr.DataArray(np.ones_like(self.run.train_ids), dims=['trainId'],
coords={'trainId': self.run.train_ids})
coords={'trainId': self.run.train_ids}).to_dataset(
name='scan_variable')
self.scan_vname = 'dummy'
self.vds_scan = None
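
The dummy scan is now stored as an xarray Dataset instead of a bare DataArray, so dark and non-dark runs can both be indexed as scan['scan_variable']. A rough standalone illustration, with placeholder train IDs:

import numpy as np
import xarray as xr

train_ids = np.arange(1000, 1010)                # placeholder train IDs
scan = xr.DataArray(np.ones_like(train_ids), dims=['trainId'],
                    coords={'trainId': train_ids}).to_dataset(name='scan_variable')
print(scan['scan_variable'].sizes)               # -> Frozen({'trainId': 10})
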
def define_scan(self, vname, bins):
"""
@@ -120,29 +120,24 @@ class DSSC:
"""
if type(vname) is dict:
self.scan = self.run.get_array(vname['source'], vname['key'])
scan = self.run.get_array(vname['source'], vname['key'])
elif type(vname) is str:
if vname not in tb.mnemonics:
raise ValueError(f'{vname} not found in the ToolBox mnemonics table')
self.scan = self.run.get_array(tb.mnemonics[vname]['source'], tb.mnemonics[vname]['key'])
scan = self.run.get_array(tb.mnemonics[vname]['source'], tb.mnemonics[vname]['key'])
else:
raise ValueError(f'vname should be a string or a dict. We got {type(vname)}')
if (type(bins) is int) or (type(bins) is float):
self.scan = bins * np.round(self.scan / bins)
scan = bins * np.round(scan / bins)
else:
# TODO: digitize the data (see the sketch at the end of this hunk)
raise ValueError('To be implemented')
self.scan_vname = vname
self.vds_scan = os.path.join(self.tempdir, 'scan_variable.h5')
if os.path.isfile(self.vds_scan):
os.remove(self.vds_scan)
self.scan = self.scan.to_dataset(name='scan_variable')
self.scan = scan.to_dataset(name='scan_variable')
self.scan['xgm_pumped'] = self.xgm[:, :self.nbunches:2].mean('dim_0')
self.scan['xgm_unpumped'] = self.xgm[:, 1:self.nbunches:2].mean('dim_0')
self.scan.to_netcdf(self.vds_scan, group='data')
self.scan_counts = xr.DataArray(np.ones(len(self.scan['scan_variable'])),
dims=['scan_variable'],
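
The scalar-bins branch above rounds the scan variable to the nearest multiple of bins; the TODO covers the case where bins is an array of edges. A hedged sketch of how np.digitize could back that case (not part of this merge request):

import numpy as np
import xarray as xr

values = xr.DataArray(np.random.uniform(0, 10, 100), dims=['trainId'])

# scalar bin width, as implemented above
width = 0.5
rounded = width * np.round(values / width)

# array of bin edges, mapping each value to its bin centre (sketch only)
edges = np.linspace(0, 10, 21)
centres = 0.5 * (edges[:-1] + edges[1:])
idx = np.clip(np.digitize(values, edges) - 1, 0, len(centres) - 1)
digitized = xr.DataArray(centres[idx], dims=['trainId'])
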
@@ -176,7 +171,7 @@ class DSSC:
"""
if self.xgm is None:
self.xgm = self.run.get_array(tb.mnemonics['SCS_SA3']['source'],
tb.mnemonics['SCS_SA3']['key'], roi=kd.by_index[:self.nbunches])
tb.mnemonics['SCS_SA3']['key'], roi=ed.by_index[:self.nbunches])
def plot_xgm_hist(self, nbins=100):
""" Plots an histogram of the SCS XGM dedicated SAS3 data.
@@ -231,17 +226,28 @@ class DSSC:
print((f'Rejecting {nrejected} out of {len(self.run.train_ids)} trains due to xgm '
f'thresholds: [{self.xgm_low}, {self.xgm_high}]'))
def load_geom(self):
def load_geom(self, geopath=None, quad_pos=None):
""" Loads and return the DSSC geometry.
inputs:
geopath: path to the h5 geometry file. If None uses a default file.
quad_pos: list of quadrants tuple position. If None uses a default position.
output:
return the loaded geometry
"""
quad_pos = [
(-124.100, 3.112), # TR
(-133.068, -110.604), # BR
( 0.988, -125.236), # BL
( 4.528, -4.912) # TL
]
path = '/gpfs/exfel/sw/software/exfel_environments/misc/git/karabo_data/docs/dssc_geo_june19.h5'
self.geom = DSSC_1MGeometry.from_h5_file_and_quad_positions(path, quad_pos)
if quad_pos is None:
quad_pos = [(-124.100, 3.112), # TR
(-133.068, -110.604), # BR
( 0.988, -125.236), # BL
( 4.528, -4.912) # TL
]
if geopath is None:
geopath = '/gpfs/exfel/sw/software/git/EXtra-geom/docs/dssc_geo_june19.h5'
self.geom = DSSC_1MGeometry.from_h5_file_and_quad_positions(geopath, quad_pos)
return self.geom
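
A rough usage sketch of the loaded geometry, assuming a configured DSSC instance named dssc and using extra_geom's module assembly:

import numpy as np

geom = dssc.load_geom()                     # or load_geom(geopath=..., quad_pos=...)
modules = np.zeros((16, 128, 512))          # one frame: 16 modules of 128 x 512 px
frame, centre = geom.position_modules_fast(modules)
print(frame.shape)                          # assembled detector image
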
def load_mask(self, fname, plot=True):
@@ -266,39 +272,39 @@ class DSSC:
""" Create virtual datasets for each 16 DSSC modules used for the multiprocessing.
input:
run: karabo-data run
run: extra-data run
path: string where the virtual files are created
output:
dictionary of key: module number, value: virtual dataset filename
"""
vds_list = []
for m in tqdm(range(16)):
vds_filename = os.path.join(path, f'dssc{m}_vds.h5')
if os.path.isfile(vds_filename):
os.remove(vds_filename)
module_vds = run.get_virtual_dataset(f'SCS_DET_DSSC1M-1/DET/{m}CH0:xtdf',
'image.data', filename=vds_filename)
vds_list.append([vds_filename, module_vds])
return vds_list
vds_filenames = {}
for module in tqdm(range(16)):
fname = os.path.join(path, f'dssc{module}_vds.h5')
if os.path.isfile(fname):
os.remove(fname)
vds = run.get_virtual_dataset(f'SCS_DET_DSSC1M-1/DET/{module}CH0:xtdf',
'image.data', filename=fname)
vds.file.close() # keep h5 file closed outside 'with' context
vds_filenames[module] = fname
return vds_filenames
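
Returning plain filenames keeps only closed HDF5 files in the parent process; each worker can then reopen its module's virtual dataset independently. A minimal sketch of reading from one of them, following the paths used in process_one_module below (vds_filenames as returned above):

import h5py

module = 0
image_path = f'INSTRUMENT/SCS_DET_DSSC1M-1/DET/{module}CH0:xtdf/image/data'
with h5py.File(vds_filenames[module], 'r') as f:
    train_ids = f['INDEX/trainId'][()]
    first_frames = f[image_path][:10]       # read only the first 10 frames
print(len(train_ids), first_frames.shape)
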
def binning(self, do_pulse_mean=True):
""" Bin the DSSC data by the predifined scan type (DSSC.define()) using multiprocessing
"""
if self.vds_scan is None:
# probably a dark run with a dummy scan variable
self.vds_scan = os.path.join(self.tempdir, 'scan_variable.h5')
if os.path.isfile(self.vds_scan):
os.remove(self.vds_scan)
self.scan = self.scan.to_dataset(name='scan_variable')
self.scan.to_netcdf(self.vds_scan, group='data')
# get available memory in GB; we will use at most a fraction max_fraction_memory of it
max_GB = psutil.virtual_memory().available/1024**3
print(f'max available memory: {max_GB} GB')
# max_GB / (8byte * 16modules * 128px * 512px * N_pulses)
self.chunksize = int(self.max_fraction_memory*max_GB * 1024**3 // (8 * 16 * 128 * 512 * self.fpt))
# max_GB / (8byte * Nworker * 128px * 512px * N_pulses)
self.chunksize = int(self.max_fraction_memory*max_GB * 1024**3 // (8 * self.Nworker * 128 * 512 * self.fpt))
print('processing', self.chunksize, 'trains per chunk')
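
As a worked example of the chunk-size formula, assume 100 GB of available memory, the new default max_fraction_memory of 0.4, Nworker = 16 and 70 frames per train (all illustrative values):

max_GB = 100                                 # hypothetical available memory
max_fraction_memory = 0.4
Nworker, fpt = 16, 70
chunksize = int(max_fraction_memory * max_GB * 1024**3
                // (8 * Nworker * 128 * 512 * fpt))
print(chunksize)                             # -> 73 trains per chunk
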
@@ -307,20 +313,29 @@ class DSSC:
jobs.append(dict(
module=m,
fpt=self.fpt,
vdf_module=os.path.join(self.tempdir, f'dssc{m}_vds.h5'),
vds=self.vds_filenames[m],
chunksize=self.chunksize,
vdf_scan=self.vds_scan,
scan=self.scan['scan_variable'],
nbunches=self.nbunches,
run_nr=self.run_nr,
do_pulse_mean=do_pulse_mean
))
if self.Nworker != 16:
with warnings.catch_warnings():
warnings.simplefilter("default")
warnings.warn(('Nworker other than 16 is known to cause issues ' +
'(https://in.xfel.eu/gitlab/SCS/ToolBox/merge_requests/76)'),
RuntimeWarning)
timestamp = strftime('%X')
print(f'start time: {timestamp}')
with multiprocessing.Pool(16) as pool:
module_data = pool.map(process_one_module, jobs)
with parallel_backend('loky', n_jobs=self.Nworker):
module_data = Parallel(verbose=20)(
delayed(process_one_module)(job) for job in tqdm(jobs)
)
print('finished:', strftime('%X'))
# rearrange the multiprocessed data
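
Stripped of the DSSC specifics, the switch from multiprocessing.Pool to joblib with the loky backend follows this pattern (work is a stand-in for process_one_module):

from joblib import Parallel, delayed, parallel_backend

def work(job):
    return job * job                         # stand-in for process_one_module

jobs = list(range(16))
with parallel_backend('loky', n_jobs=16):
    results = Parallel(verbose=20)(delayed(work)(j) for j in jobs)
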
@@ -352,9 +367,9 @@ class DSSC:
save_folder = self.save_folder
if self.isDark:
fname = f'run{self.run_nr}_dark.h5' # no scan
fname = f'run{self.run_nr}_dark.nc' # no scan
else:
fname = f'run{self.run_nr}.h5' # run with delay scan (change for other scan types!)
fname = f'run{self.run_nr}.nc' # run with delay scan (change for other scan types!)
save_path = os.path.join(save_folder, fname)
@@ -365,6 +380,7 @@ class DSSC:
warnings.warn(f'Overwriting file: {save_path}')
os.remove(save_path)
self.module_data.to_netcdf(save_path, group='data')
self.module_data.close()
os.chmod(save_path, 0o664)
print('saving: ', save_path)
else:
@@ -385,8 +401,10 @@ class DSSC:
self.plot_title = f'{self.proposal} run: {runNB} dark: {dark_runNB}'
dark = xr.open_dataset(os.path.join(save_folder, f'run{dark_runNB}_dark.h5'), group='data')
binned = xr.open_dataset(os.path.join(save_folder, f'run{runNB}.h5'), group='data')
dark = xr.load_dataset(os.path.join(save_folder, f'run{dark_runNB}_dark.nc'), group='data',
engine='netcdf4')
binned = xr.load_dataset(os.path.join(save_folder, f'run{runNB}.nc'), group='data',
engine='netcdf4')
binned['pumped'] = (binned['pumped'] - dark['pumped'].values)
binned['unpumped'] = (binned['unpumped'] - dark['unpumped'].values)
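
The move from .h5 to .nc files, and from xr.open_dataset to xr.load_dataset, reads the data fully into memory and releases the file handle immediately. A small round-trip sketch with a hypothetical run number:

import numpy as np
import xarray as xr

ds = xr.Dataset({'pumped': ('scan_variable', np.zeros(5))})
ds.to_netcdf('run9999.nc', group='data')     # hypothetical filename
ds.close()
binned = xr.load_dataset('run9999.nc', group='data', engine='netcdf4')
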
@@ -592,22 +610,19 @@ class DSSC:
def process_one_module(job):
module = job['module']
fpt = job['fpt']
data_vdf = job['vdf_module']
scan_vdf = job['vdf_scan']
vds = job['vds']
scan = job['scan']
chunksize = job['chunksize']
nbunches = job['nbunches']
do_pulse_mean = job['do_pulse_mean']
image_path = f'INSTRUMENT/SCS_DET_DSSC1M-1/DET/{module}CH0:xtdf/image/data'
npulse_path = f'INDEX/SCS_DET_DSSC1M-1/DET/{module}CH0:xtdf/image/count'
with h5py.File(data_vdf, 'r') as m:
with h5py.File(vds, 'r') as m:
all_trainIds = m['INDEX/trainId'][()]
n_trains = len(all_trainIds)
chunk_start = np.arange(n_trains, step=chunksize, dtype=int)
frames_per_train = m[npulse_path][()]
trains_with_data = all_trainIds[frames_per_train == fpt]
# load scan variable
scan = xr.open_dataset(scan_vdf, group='data')['scan_variable']
scan.name = 'scan'
len_scan = len(scan.groupby(scan))
if do_pulse_mean:
@@ -631,14 +646,7 @@ def process_one_module(job):
module_data['module'] = module
# crunching
with h5py.File(data_vdf, 'r') as m:
#fpt_calc = int(len(m[image_path]) / n_trains)
#assert fpt_calc == fpt, f'data length does not match expected value (module {module})'
all_trainIds = m['INDEX/trainId'][()]
frames_per_train = m[npulse_path][()]
trains_with_data = all_trainIds[frames_per_train == fpt]
#print(np.unique(pulses_per_train), '/', fpt)
#print(len(trains_with_data))
with h5py.File(vds, 'r') as m:
chunk_start = np.arange(len(all_trainIds), step=chunksize, dtype=int)
trains_start = 0
@@ -5,8 +5,8 @@ import os
import warnings
import psutil
import karabo_data as kd
from karabo_data.read_machinery import find_proposal
import extra_data as ed
from extra_data.read_machinery import find_proposal
import ToolBox as tb
import matplotlib.pyplot as plt
from mpl_toolkits.axes_grid1 import ImageGrid
@@ -58,18 +58,18 @@ class DSSC1module:
self.maxSaturatedPixel = 1
def open_run(self, run_nr, t0=0.0):
""" Open a run with karabo-data and prepare the virtual dataset for multiprocessing
""" Open a run with extra-data and prepare the virtual dataset for multiprocessing
inputs:
run_nr: the run number
t0: optional t0 in mm
"""
print('Opening run data with karabo-data')
print('Opening run data with extra-data')
self.run_nr = run_nr
self.xgm = None
self.run = kd.open_run(self.proposal, self.run_nr)
self.run = ed.open_run(self.proposal, self.run_nr)
self.plot_title = f'{self.proposal} run: {self.run_nr}'
self.fpt = self.run.detector_info(f'SCS_DET_DSSC1M-1/DET/{self.module}CH0:xtdf')['frames_per_train']
@@ -92,7 +92,7 @@ class DSSC1module:
print(f'Loading XGM data')
self.xgm = self.run.get_array(tb.mnemonics['SCS_SA3']['source'],
tb.mnemonics['SCS_SA3']['key'],
roi=kd.by_index[:self.nbunches])
roi=ed.by_index[:self.nbunches])
self.xgm = self.xgm.rename({'dim_0':'pulseId'})
self.xgm['pulseId'] = np.arange(0, 2*self.nbunches, 2)
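
Relabelling the pulse axis with np.arange(0, 2*nbunches, 2) aligns each XGM value with its DSSC frame ID, apparently because only every other DSSC frame carries an X-ray pulse here. A standalone illustration with placeholder values:

import numpy as np
import xarray as xr

nbunches = 4
xgm = xr.DataArray(np.random.rand(3, nbunches), dims=['trainId', 'dim_0'])
xgm = xgm.rename({'dim_0': 'pulseId'})
xgm['pulseId'] = np.arange(0, 2 * nbunches, 2)   # even DSSC frame IDs
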
@@ -290,6 +290,7 @@ class DSSC1module:
warnings.warn(f'Overwriting file: {save_path}')
os.remove(save_path)
data.to_netcdf(save_path, group='data')
data.close()
os.chmod(save_path, 0o664)
print('saving: ', save_path)
else:
@@ -306,7 +307,7 @@ class DSSC1module:
save_folder = self.save_folder
self.run_nr = dark_runNB
self.dark_data = xr.open_dataset(os.path.join(save_folder, f'run{dark_runNB}_dark.h5'), group='data')
self.dark_data = xr.load_dataset(os.path.join(save_folder, f'run{dark_runNB}_dark.h5'), group='data')
self.plot_title = f"{self.proposal} dark: {self.dark_data['run'].values}"
def show_rois(self):
@@ -422,4 +423,4 @@ def process_one_module(job):
module_data['std_data'] += (temp**2).sum(dim='trainId')
module_data['counts'] += n_trains
return module_data
\ No newline at end of file
return module_data
@@ -7,8 +7,8 @@ import os
import warnings
import psutil
import karabo_data as kd
from karabo_data.read_machinery import find_proposal
import extra_data as ed
from extra_data.read_machinery import find_proposal
import ToolBox as tb
import matplotlib.pyplot as plt
from mpl_toolkits.axes_grid1 import ImageGrid
@@ -72,7 +72,7 @@ class FastCCD:
shutil.rmtree(self.tempdir)
def open_run(self, run_nr, isDark=False, t0=0.0):
""" Open a run with karabo-data and prepare the virtual dataset for multiprocessing
""" Open a run with extra-data and prepare the virtual dataset for multiprocessing
inputs:
run_nr: the run number
@@ -80,11 +80,11 @@ class FastCCD:
t0: optional t0 in mm
"""
print('Opening run data with karabo-data')
print('Opening run data with extra-data')
self.run_nr = run_nr
self.xgm = None
self.run = kd.open_run(self.proposal, self.run_nr)
self.run = ed.open_run(self.proposal, self.run_nr)
self.plot_title = f'{self.proposal} run: {self.run_nr}'
self.isDark = isDark
self.fpt = 1
@@ -108,7 +108,7 @@ class FastCCD:
try:
self.xgm = self.run.get_array(tb.mnemonics['SCS_SA3']['source'],
tb.mnemonics['SCS_SA3']['key'],
roi=kd.by_index[:self.nbunches])
roi=ed.by_index[:self.nbunches])
self.xgm = self.xgm.squeeze() # remove the pulseId dimension since XGM should have only 1 value per train
except Exception:
self.xgm = xr.DataArray(np.zeros_like(self.run.train_ids), dims='trainId', coords={'trainId': self.run.train_ids})
@@ -369,6 +369,7 @@ class FastCCD:
warnings.warn(f'Overwriting file: {save_path}')
os.remove(save_path)
self.module_data.to_netcdf(save_path, group='data')
self.module_data.close()
os.chmod(save_path, 0o664)
print('saving: ', save_path)
else:
@@ -389,10 +390,10 @@ class FastCCD:
self.plot_title = f'{self.proposal} run: {runNB} dark: {dark_runNB}'
binned = xr.open_dataset(os.path.join(save_folder, f'run{runNB}.h5'), group='data', cache=False)
binned = xr.load_dataset(os.path.join(save_folder, f'run{runNB}.h5'), group='data')
if dark_runNB is not None:
dark = xr.open_dataset(os.path.join(save_folder, f'run{dark_runNB}_dark.h5'), group='data', cache=False)
dark = xr.load_dataset(os.path.join(save_folder, f'run{dark_runNB}_dark.h5'), group='data')
binned['pumped'] = self.gain*(binned['pumped'] - dark['pumped'].squeeze(drop=True))
binned['unpumped'] = self.gain*(binned['unpumped'] - dark['unpumped'].squeeze(drop=True))
@@ -630,7 +631,7 @@ def process_one_module(job):
data_pumped = ds.where(ds['FastADC5'] > 0, drop=True).groupby('scan_variable').sum('trainId')
data_unpumped = ds.where(ds['FastADC5'] < 1, drop=True).groupby('scan_variable').sum('trainId')
module_data = data_pumped['fastccd'].to_dataset('pumped')
module_data = data_pumped['fastccd'].to_dataset(name='pumped')
module_data['unpumped'] = data_unpumped['fastccd']
module_data['sum_count_pumped'] = data_pumped['sum_count']
module_data['sum_count_unpumped'] = data_unpumped['sum_count']
@@ -638,4 +639,4 @@ def process_one_module(job):
module_data['xgm_unpumped'] = data_unpumped['xgm']
module_data['workerId'] = workerId
return module_data
\ No newline at end of file
return module_data
@@ -6,8 +6,8 @@
Copyright (2019) SCS Team.
"""
import numpy as np
from karabo_data import by_index, RunDirectory
from karabo_data.read_machinery import find_proposal
from extra_data import by_index, RunDirectory
from extra_data.read_machinery import find_proposal
import xarray as xr
import os
from ToolBox.bunch_pattern import extractBunchPattern
@@ -421,7 +421,7 @@ def load(fields, runNB, proposalNB, subFolder='raw', display=False, validate=Fal
subFolder: (str) sub-folder from which to load the data. Use 'raw' for raw
data or 'proc' for processed data.
display: (bool) whether to show the run.info or not
validate: (bool) whether to run karabo-data-validate or not
validate: (bool) whether to run extra-data-validate or not
subset: a subset of trains that can be loaded with by_index[:5] for the
first 5 trains
rois: a dictionary of mnemonics with a list of ROI definitions and the desired
@@ -444,7 +444,7 @@ def load(fields, runNB, proposalNB, subFolder='raw', display=False, validate=Fal
run = RunDirectory(runFolder).select_trains(subset)
if validate:
get_ipython().system('karabo-data-validate ' + runFolder)
get_ipython().system('extra-data-validate ' + runFolder)
if display:
print('Loading data from {}'.format(runFolder))
run.info()
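
A brief sketch of the subset and validate options in action, with a placeholder run folder:

from extra_data import RunDirectory, by_index

runFolder = '/gpfs/exfel/exp/SCS/201901/p002212/raw/r0235'   # placeholder path
run = RunDirectory(runFolder).select_trains(by_index[:5])    # first 5 trains only
run.info()
# validate=True shells out to the command-line tool:
#   extra-data-validate <runFolder>
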
@@ -22,7 +22,7 @@ def extractBunchPattern(bp_table=None, key='sase3', runDir=None):
bp_table: DataArray corresponding to the mnemonics "bunchPatternTable".
If None, the bunch pattern table is loaded using runDir.
key: str, ['sase1', 'sase2', 'sase3', 'scs_ppl']
runDir: karabo_data run directory. Required only if bp_table is None.
runDir: extra-data DataCollection. Required only if bp_table is None.
Outputs:
bunchPattern: DataArray containing indices of the sase/laser pulses for
@@ -255,4 +255,4 @@ def sortBAMdata(data, key='scs_ppl', sa3Offset=0):
ndata = xr.merge(mergeList, join='inner')
for k in data.attrs.keys():
ndata.attrs[k] = data.attrs[k]
return ndata
\ No newline at end of file
return ndata