DSSC multiprocessing analysis

Merged Loïc Le Guyader requested to merge crunchDSSC into master
Files: 3

DSSC.py (new file, 0 → 100644, +623 −0)
import multiprocessing
from time import strftime
import tempfile
import shutil
from tqdm.auto import tqdm
import os
import warnings
import psutil
import karabo_data as kd
from karabo_data.geometry2 import DSSC_1MGeometry
import ToolBox as tb
import matplotlib.pyplot as plt
from mpl_toolkits.axes_grid1 import ImageGrid
import numpy as np
import xarray as xr
import h5py
from imageio import imread
class DSSC:
def __init__(self, semester, proposal, topic='SCS', distance=1):
""" Create a DSSC object to process DSSC data.
inputs:
semester: semester string
proposal: proposal number string
topic: topic, by default SCS
                distance: distance, in meters, between the sample and the DSSC detector
"""
self.semester = semester
self.proposal = proposal
self.topic = topic
self.tempdir = None
self.save_folder = f'/gpfs/exfel/exp/{self.topic}/{self.semester}/{self.proposal}/usr/condensed_runs/'
self.distance = distance
self.px_pitch_h = 236 # horizontal pitch in microns
self.px_pitch_v = 204 # vertical pitch in microns
self.aspect = self.px_pitch_v/self.px_pitch_h # aspect ratio of the DSSC images
self.geom = None
self.mask = None
print('DSSC configuration')
print(f'Topic: {self.topic}')
print(f'Semester: {self.semester}')
print(f'Proposal: {self.proposal}')
print(f'Default save folder: {self.save_folder}')
print(f'Sample to DSSC distance: {self.distance} m')
if not os.path.exists(self.save_folder):
warnings.warn(f'Default save folder does not exist: {self.save_folder}')
def __del__(self):
        # deleting temporary folder
if self.tempdir:
shutil.rmtree(self.tempdir)
def open_run(self, run_nr, isDark=False):
""" Open a run with karabo-data and prepare the virtual dataset for multiprocessing
inputs:
run_nr: the run number
isDark: a boolean to specify if the run is a dark run or not
"""
print('Opening run data with karabo-data')
self.run_nr = run_nr
self.xgm = None
self.scan_vname = None
self.run = kd.open_run(self.proposal, self.run_nr)
self.isDark = isDark
self.plot_title = f'{self.proposal} run: {self.run_nr}'
self.fpt = self.run.detector_info('SCS_DET_DSSC1M-1/DET/0CH0:xtdf')['frames_per_train']
self.nbunches = self.run.get_array('SCS_RR_UTC/MDL/BUNCH_DECODER', 'sase3.nPulses.value')
self.nbunches = np.unique(self.nbunches)
if len(self.nbunches) == 1:
self.nbunches = self.nbunches[0]
else:
warnings.warn('not all trains have same length DSSC data')
print(f'nbunches: {self.nbunches}')
self.nbunches = self.nbunches[-1]
print(f'DSSC frames per train: {self.fpt}')
print(f'SA3 bunches per train: {self.nbunches}')
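        # note: only the first 2*nbunches DSSC frames per train are used later (see process_one_module)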
if self.tempdir is not None:
shutil.rmtree(self.tempdir)
self.tempdir = tempfile.mkdtemp()
print(f'Temporary directory: {self.tempdir}')
print('Creating virtual dataset')
self.vdslist = self.create_virtual_dssc_datasets(self.run, path=self.tempdir)
        # create a dummy scan variable for dark runs
        # for other types of runs, use the DSSC.define_scan function to overwrite it
self.scan = xr.DataArray(np.ones_like(self.run.train_ids), dims=['trainId'],
coords={'trainId': self.run.train_ids})
self.scan_vname = 'dummy'
self.vds_scan = None
def define_scan(self, vname, bins):
"""
Prepare the binning of the DSSC data.
inputs:
                vname: variable name for the scan; can be a mnemonic string from ToolBox
                    or a dictionary with 'source' and 'key' fields
                bins: bin step size (bin edges are not yet implemented)
"""
if type(vname) is dict:
self.scan = self.run.get_array(vname['source'], vname['key'])
elif type(vname) is str:
if vname not in tb.mnemonics:
raise ValueError(f'{vname} not found in the ToolBox mnemonics table')
self.scan = self.run.get_array(tb.mnemonics[vname]['source'], tb.mnemonics[vname]['key'])
else:
raise ValueError(f'vname should be a string or a dict. We got {type(vname)}')
if (type(bins) is int) or (type(bins) is float):
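            # round each scan value to the nearest multiple of the bin step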
self.scan = bins * np.round(self.scan / bins)
else:
# TODO: digitize the data
            raise ValueError('Binning with explicit bin edges is not yet implemented')
self.scan_vname = vname
self.vds_scan = os.path.join(self.tempdir, 'scan_variable.h5')
if os.path.isfile(self.vds_scan):
os.remove(self.vds_scan)
self.scan = self.scan.to_dataset(name='scan_variable')
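        # assumes alternating pumped/unpumped X-ray pulses: even-numbered SASE3 pulses are pumped, odd-numbered ones are unpumped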
self.scan['xgm_pumped'] = self.xgm[:, :self.nbunches:2].mean('dim_0')
self.scan['xgm_unpumped'] = self.xgm[:, 1:self.nbunches:2].mean('dim_0')
self.scan.to_netcdf(self.vds_scan, group='data')
self.scan_counts = xr.DataArray(np.ones(len(self.scan['scan_variable'])),
dims=['scan_variable'],
coords={'scan_variable': self.scan['scan_variable'].values},
name='counts')
self.scan_points = self.scan.groupby('scan_variable').mean('trainId').coords['scan_variable'].values
self.scan_points_counts = self.scan_counts.groupby('scan_variable').sum()
self.plot_scan()
def plot_scan(self):
""" Plot a previously defined scan to see the scan range and the statistics.
"""
if self.scan:
fig, (ax1, ax2) = plt.subplots(nrows=2, figsize=[5, 5])
else:
fig, ax1 = plt.subplots(nrows=1, figsize=[5, 2.5])
ax1.plot(self.scan_points, self.scan_points_counts, 'o-', ms=2)
ax1.set_xlabel(f'{self.scan_vname}')
ax1.set_ylabel('# trains')
ax1.set_title(self.plot_title)
if self.scan:
ax2.plot(self.scan['scan_variable'])
ax2.set_xlabel('train #')
ax2.set_ylabel(f'{self.scan_vname}')
def load_xgm(self):
""" Loads pulse resolved dedicated SAS3 data from the SCS XGM.
"""
if self.xgm is None:
self.xgm = self.run.get_array(tb.mnemonics['SCS_SA3']['source'],
tb.mnemonics['SCS_SA3']['key'], roi=kd.by_index[:self.nbunches])
def plot_xgm_hist(self, nbins=100):
""" Plots an histogram of the SCS XGM dedicated SAS3 data.
inputs:
nbins: number of the bins for the histogram.
"""
if self.xgm is None:
self.load_xgm()
hist, bins_edges = np.histogram(self.xgm, nbins, density=True)
width = 1.0 * (bins_edges[1] - bins_edges[0])
bins_center = 0.5*(bins_edges[:-1] + bins_edges[1:])
plt.figure(figsize=(5,3))
plt.bar(bins_center, hist, align='center', width=width)
plt.xlabel(f"{tb.mnemonics['SCS_SA3']['source']}{tb.mnemonics['SCS_SA3']['key']}")
plt.ylabel('density')
plt.title(self.plot_title)
def xgm_filter(self, xgm_low=-np.inf, xgm_high=np.inf):
""" Filters the data by train. If one pulse within a train has an SASE3 SCS XGM value below
xgm_low or above xgm_high, that train will be dropped from the dataset.
inputs:
xgm_low: low threshold value
xgm_high: high threshold value
"""
if self.xgm is None:
self.load_xgm()
if self.isDark:
            warnings.warn('This run was loaded as a dark run; filtering on the XGM makes no sense. Aborting.')
return
self.xgm_low = xgm_low
self.xgm_high = xgm_high
valid = ((self.xgm > self.xgm_low) * (self.xgm < self.xgm_high)).prod('dim_0').astype(bool)
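        # a train is kept only if all of its pulses lie within [xgm_low, xgm_high]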
xgm_valid = self.xgm.where(valid)
xgm_valid = xgm_valid.dropna('trainId')
self.scan = self.scan.sel({'trainId': xgm_valid.trainId})
nrejected = len(self.run.train_ids) - len(self.scan)
print((f'Rejecting {nrejected} out of {len(self.run.train_ids)} trains due to xgm '
f'thresholds: [{self.xgm_low}, {self.xgm_high}]'))
def load_geom(self):
""" Loads and return the DSSC geometry.
"""
quad_pos = [
(-124.100, 3.112), # TR
(-133.068, -110.604), # BR
( 0.988, -125.236), # BL
( 4.528, -4.912) # TL
]
path = '/gpfs/exfel/sw/software/exfel_environments/misc/git/karabo_data/docs/dssc_geo_june19.h5'
self.geom = DSSC_1MGeometry.from_h5_file_and_quad_positions(path, quad_pos)
return self.geom
def load_mask(self, fname, plot=True):
""" Load a DSSC mask file.
input:
fname: string of the filename of the mask file
plot: if True, the loaded mask is plotted
"""
dssc_mask = imread(fname)
dssc_mask = dssc_mask.astype(float)[..., 0] // 255
dssc_mask[dssc_mask==0] = np.nan
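        # valid pixels are 1 and masked pixels are NaN, so multiplying an image by the mask blanks the masked pixels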
self.mask = dssc_mask
if plot:
plt.figure()
plt.imshow(self.mask)
def create_virtual_dssc_datasets(self, run, path=''):
""" Create virtual datasets for each 16 DSSC modules used for the multiprocessing.
input:
run: karabo-data run
path: string where the virtual files are created
"""
vds_list = []
for m in tqdm(range(16)):
vds_filename = os.path.join(path, f'dssc{m}_vds.h5')
if os.path.isfile(vds_filename):
os.remove(vds_filename)
module_vds = run.get_virtual_dataset(f'SCS_DET_DSSC1M-1/DET/{m}CH0:xtdf',
'image.data', filename=vds_filename)
vds_list.append([vds_filename, module_vds])
return vds_list
def binning(self):
""" Bin the DSSC data by the predifined scan type (DSSC.define()) using multiprocessing
"""
if self.vds_scan is None:
# probably a dark run with a dummy scan variable
self.vds_scan = os.path.join(self.tempdir, 'scan_variable.h5')
if os.path.isfile(self.vds_scan):
os.remove(self.vds_scan)
self.scan = self.scan.to_dataset(name='scan_variable')
self.scan.to_netcdf(self.vds_scan, group='data')
# get available memory in GB, we will try to use 80 % of it
max_GB = psutil.virtual_memory().available/1024**3
print(f'max available memory: {max_GB} GB')
# max_GB / (8byte * 16modules * 128px * 512px * N_pulses)
self.chunksize = int(0.8*max_GB * 1024**3 // (8 * 16 * 128 * 512 * self.fpt))
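        # for example, with 256 GB available and fpt = 75, this gives roughly 349 trains per chunk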
print('processing', self.chunksize, 'trains per chunk')
jobs = []
for m in range(16):
jobs.append(dict(
module=m,
fpt=self.fpt,
vdf_module=os.path.join(self.tempdir, f'dssc{m}_vds.h5'),
chunksize=self.chunksize,
vdf_scan=self.vds_scan,
nbunches=self.nbunches,
run_nr=self.run_nr,
))
timestamp = strftime('%X')
print(f'start time: {timestamp}')
with multiprocessing.Pool(16) as pool:
module_data = pool.map(process_one_module, jobs)
print('finished:', strftime('%X'))
        # rearrange the multiprocessed data
self.module_data = xr.concat(module_data, dim='module')
self.module_data['run'] = self.run_nr
self.module_data = self.module_data.transpose('scan_variable', 'module', 'x', 'y')
self.module_data = xr.merge([self.module_data, self.scan.groupby('scan_variable').mean('trainId')])
self.module_data = self.module_data.squeeze()
self.module_data.attrs['scan_variable'] = self.scan_vname
def save(self, save_folder=None, overwrite=False):
""" Save the crunched data.
inputs:
                save_folder: string of the folder where to save the data.
overwrite: boolean whether or not to overwrite existing files.
"""
if save_folder is None:
            save_folder = self.save_folder
if self.isDark:
fname = f'run{self.run_nr}_dark.h5' # no scan
else:
fname = f'run{self.run_nr}.h5' # run with delay scan (change for other scan types!)
save_path = os.path.join(save_folder, fname)
file_exists = os.path.isfile(save_path)
if not file_exists or (file_exists and overwrite):
if file_exists:
warnings.warn(f'Overwriting file: {save_path}')
os.remove(save_path)
self.module_data.to_netcdf(save_path, group='data')
os.chmod(save_path, 0o664)
print('saving: ', save_path)
else:
print('file', save_path, 'exists and overwrite is False')
def load_binned(self, runNB, dark_runNB, xgm_norm = True, save_folder=None):
""" load previously binned (crunched) DSSC data by DSSC.crunch() and DSSC.save()
inputs:
runNB: run number to load
dark_runNB: run number of the corresponding dark
                xgm_norm: normalize by XGM data if True
save_folder: path string where the crunched data are saved
"""
if save_folder is None:
save_folder = self.save_folder
self.plot_title = f'{self.proposal} run: {runNB} dark: {dark_runNB}'
dark = xr.open_dataset(os.path.join(save_folder, f'run{dark_runNB}_dark.h5'), group='data')
binned = xr.open_dataset(os.path.join(save_folder, f'run{runNB}.h5'), group='data')
binned['pumped'] = (binned['pumped'] - dark['pumped'].values)
binned['unpumped'] = (binned['unpumped'] - dark['unpumped'].values)
if xgm_norm:
binned['pumped'] = binned['pumped'] / binned['xgm_pumped']
binned['unpumped'] = binned['unpumped'] / binned['xgm_unpumped']
self.scan_points = binned['scan_variable']
self.scan_points_counts = binned['sum_count'][:, 0]
self.scan_vname = binned.attrs['scan_variable']
self.scan = None
self.binned = binned
def plot_DSSC(self, use_mask = True, p_low = 1, p_high = 98, vmin = None, vmax = None):
""" Plot pumped and unpumped DSSC images.
inputs:
use_mask: if True, a mask is applied on the DSSC.
p_low: low percentile value to adjust the contrast scale on the unpumped and pumped image
p_high: high percentile value to adjust the contrast scale on the unpumped and pumped image
vmin: low value of the image scale
vmax: high value of the image scale
"""
if use_mask:
if self.mask is None:
raise ValueError('No mask was loaded !')
mask = self.mask
mask_txt = ' masked'
else:
mask = 1
mask_txt = ''
if self.geom is None:
self.load_geom()
im_pump_mean, _ = self.geom.position_modules_fast(self.binned['pumped'].mean('scan_variable'))
im_unpump_mean, _ = self.geom.position_modules_fast(self.binned['unpumped'].mean('scan_variable'))
im_pump_mean = mask*im_pump_mean
im_unpump_mean = mask*im_unpump_mean
fig = plt.figure(figsize=(9, 4))
grid = ImageGrid(fig, 111,
nrows_ncols=(1,2),
axes_pad=0.15,
share_all=True,
cbar_location="right",
cbar_mode="single",
cbar_size="7%",
cbar_pad=0.15,
)
_vmin, _vmax = np.percentile(im_pump_mean[~np.isnan(im_pump_mean)], [p_low, p_high])
if vmin is None:
vmin = _vmin
if vmax is None:
vmax = _vmax
im = grid[0].imshow(im_pump_mean, vmin=vmin, vmax=vmax, aspect=self.aspect)
grid[0].set_title('pumped' + mask_txt)
im = grid[1].imshow(im_unpump_mean, vmin=vmin, vmax=vmax, aspect=self.aspect)
grid[1].set_title('unpumped' + mask_txt)
grid[-1].cax.colorbar(im)
grid[-1].cax.toggle_label(True)
fig.suptitle(self.plot_title)
def azimuthal_int(self, wl, center=None, angle_range=[0, 360], dr=1, use_mask=True):
""" Perform azimuthal integration of 1D binned DSSC run.
inputs:
                wl: photon wavelength (in meters)
                center: center of the azimuthal integration; defaults to the geometry center if None
                angle_range: angular range of the integration, in degrees
                dr: width of the radial integration bins, in pixels
                use_mask: if True, use the loaded mask
"""
if self.geom is None:
self.load_geom()
if use_mask:
if self.mask is None:
raise ValueError('No mask was loaded !')
mask = self.mask
mask_txt = ' masked'
else:
mask = 1
mask_txt = ''
im_pumped_arranged, c_geom = self.geom.position_modules_fast(self.binned['pumped'].values)
im_unpumped_arranged, c_geom = self.geom.position_modules_fast(self.binned['unpumped'].values)
im_pumped_arranged *= mask
im_unpumped_arranged *= mask
im_pumped_mean = im_pumped_arranged.mean(axis=0)
im_unpumped_mean = im_unpumped_arranged.mean(axis=0)
if center is None:
center = c_geom
ai = tb.azimuthal_integrator(im_pumped_mean.shape, center, angle_range, dr=dr)
norm = ai(~np.isnan(im_pumped_mean))
az_pump = []
az_unpump = []
for i in tqdm(range(len(self.binned['scan_variable']))):
az_pump.append(ai(im_pumped_arranged[i]) / norm)
az_unpump.append(ai(im_unpumped_arranged[i]) / norm)
az_pump = np.stack(az_pump)
az_unpump = np.stack(az_unpump)
coords = {'scan_variable': self.binned['scan_variable'], 'distance': ai.distance}
azimuthal = xr.DataArray(az_pump, dims=['scan_variable', 'distance'], coords=coords)
azimuthal = azimuthal.to_dataset(name='pumped')
azimuthal['unpumped'] = xr.DataArray(az_unpump, dims=['scan_variable', 'distance'], coords=coords)
azimuthal = azimuthal.transpose('distance', 'scan_variable')
#t0 = 225.5
#azimuthal['delay'] = (t0 - azimuthal.delay)*6.6
#azimuthal['delay'] = azimuthal.delay
azimuthal['delta_q (1/nm)'] = 2e-9 * np.pi * np.sin(
np.arctan(azimuthal.distance * self.px_pitch_v*1e-6 / self.distance)) / wl
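        # q is computed as (2*pi/wl) * sin(2*theta), with 2*theta = arctan(r * pixel_pitch / sample_distance)
        # (r in pixels, pitch converted from microns to meters); this is the small-angle form of
        # q = (4*pi/wl) * sin(theta), and the 1e-9 factor converts q to 1/nm, assuming wl is given in meters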
azimuthal.attrs = self.binned.attrs
self.azimuthal = azimuthal.swap_dims({'distance': 'delta_q (1/nm)'})
def plot_azimuthal_int(self):
""" Plot a computed azimuthal integration.
"""
fig, [ax1, ax2] = plt.subplots(nrows=2, sharex=True, sharey=True)
xr.plot.imshow(self.azimuthal.pumped, ax=ax1, robust=True)
        ax1.set_title('pumped')
ax1.set_xlabel(self.scan_vname)
xr.plot.imshow(self.azimuthal.pumped - self.azimuthal.unpumped, ax=ax2, robust=True)
ax2.set_title('pumped - unpumped')
ax2.set_xlabel(self.scan_vname)
fig.suptitle(f'{self.plot_title}')
def plot_azimuthal_line_cut(self, data, qranges, qwidths):
""" Plot line scans on top of the data.
inputs:
                data: an azimuthally integrated xarray DataArray with 'delta_q (1/nm)' as one of its dimensions
                qranges: a list of q values at which to take line cuts
                qwidths: a list of q widths, same length as qranges
"""
fig, [ax1, ax2] = plt.subplots(nrows=2, sharex=True, figsize=[8, 7])
xr.plot.imshow(data, ax=ax1, robust=True)
        # attributes are not propagated during xarray mathematical operations (https://github.com/pydata/xarray/issues/988),
        # so data might no longer carry the scan variable name
ax1.set_xlabel(self.scan_vname)
fig.suptitle(f'{self.plot_title}')
for i, (qr, qw) in enumerate(zip(qranges, qwidths)):
sel = (data['delta_q (1/nm)'] > (qr - qw/2)) * (data['delta_q (1/nm)'] < (qr + qw/2))
val = data.where(sel).mean('delta_q (1/nm)')
ax2.plot(data.scan_variable, val, c=f'C{i}', label=f'q = {qr:.2f}')
ax1.axhline(qr - qw/2, c=f'C{i}', lw=1)
ax1.axhline(qr + qw/2, c=f'C{i}', lw=1)
ax2.legend()
ax2.set_xlabel(self.scan_vname)
# since 'self' is not picklable, this function has to be outside the DSSC class so that it can be used
# by the multiprocessing pool.map function
def process_one_module(job):
module = job['module']
fpt = job['fpt']
data_vdf = job['vdf_module']
scan_vdf = job['vdf_scan']
chunksize = job['chunksize']
nbunches = job['nbunches']
image_path = f'INSTRUMENT/SCS_DET_DSSC1M-1/DET/{module}CH0:xtdf/image/data'
npulse_path = f'INDEX/SCS_DET_DSSC1M-1/DET/{module}CH0:xtdf/image/count'
with h5py.File(data_vdf, 'r') as m:
all_trainIds = m['INDEX/trainId'][()]
n_trains = len(all_trainIds)
chunk_start = np.arange(n_trains, step=chunksize, dtype=int)
# load scan variable
scan = xr.open_dataset(scan_vdf, group='data')['scan_variable']
scan.name = 'scan'
len_scan = len(scan.groupby(scan))
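    # number of distinct scan points (bins) found in the scan variable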
# create empty dataset to add actual data to
module_data = xr.DataArray(np.empty([len_scan, 128, 512], dtype=np.float64),
dims=['scan_variable', 'x', 'y'],
coords={'scan_variable': np.unique(scan)})
module_data = module_data.to_dataset(name='pumped')
module_data['unpumped'] = xr.full_like(module_data['pumped'], 0)
module_data['sum_count'] = xr.DataArray(np.zeros_like(np.unique(scan)), dims=['scan_variable'])
module_data['module'] = module
# crunching
with h5py.File(data_vdf, 'r') as m:
#fpt_calc = int(len(m[image_path]) / n_trains)
#assert fpt_calc == fpt, f'data length does not match expected value (module {module})'
all_trainIds = m['INDEX/trainId'][()]
frames_per_train = m[npulse_path][()]
trains_with_data = all_trainIds[frames_per_train == fpt]
#print(np.unique(pulses_per_train), '/', fpt)
#print(len(trains_with_data))
chunk_start = np.arange(len(all_trainIds), step=chunksize, dtype=int)
trains_start = 0
# This line is the strange hack from https://github.com/tqdm/tqdm/issues/485
print(' ', end='', flush=True)
for c0 in tqdm(chunk_start, desc=f'pool.map#{module:02d}', position=module):
chunk_dssc = np.s_[int(c0 * fpt):int((c0 + chunksize) * fpt)] # for dssc data
data = m[image_path][chunk_dssc].squeeze()
data = data.astype(np.float64)
n_trains = int(data.shape[0] // fpt)
trainIds_chunk = np.unique(trains_with_data[trains_start:trains_start + n_trains])
trains_start += n_trains
n_trains_actual = len(trainIds_chunk)
coords = {'trainId': trainIds_chunk}
data = np.reshape(data, [n_trains_actual, fpt, 128, 512])[:, :int(2 * nbunches)]
data = xr.DataArray(data, dims=['trainId', 'pulse', 'x', 'y'], coords=coords)
data_pumped = (data[:, ::4]).mean('pulse')
data_unpumped = (data[:, 2::4]).mean('pulse')
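            # frame selection assumes two DSSC frames per SASE3 pulse with the pump laser at half the
            # X-ray repetition rate: frames 0, 4, 8, ... are pumped and frames 2, 6, 10, ... are unpumped;
            # the intermediate frames are not used here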
data = data_pumped.to_dataset(name='pumped')
data['unpumped'] = data_unpumped
data['sum_count'] = xr.DataArray(np.ones(n_trains_actual), dims=['trainId'], coords=coords)
# grouping and summing
data['scan_variable'] = scan # this only adds scan data for matching trainIds
data = data.dropna('trainId')
data = data.groupby('scan_variable').sum('trainId')
where = {'scan_variable': data.scan_variable}
for var in ['pumped', 'unpumped', 'sum_count']:
module_data[var].loc[where] = module_data[var].loc[where] + data[var]
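    # normalise the accumulated per-scan-point sums by the number of contributing trains to get averages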
for var in ['pumped', 'unpumped']:
module_data[var] = module_data[var] / module_data.sum_count
#module_data = module_data.drop('sum_count')
return module_data
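
# Illustrative usage sketch; the proposal, run numbers, scan mnemonic and mask file below are placeholders:
#
#     dssc = DSSC('201901', 'p002222')
#
#     # dark run
#     dssc.open_run(180, isDark=True)
#     dssc.binning()
#     dssc.save()
#
#     # scan run
#     dssc.open_run(179)
#     dssc.load_xgm()
#     dssc.plot_xgm_hist()
#     dssc.define_scan('PP800_DelayLine', 0.05)
#     dssc.binning()
#     dssc.save()
#
#     # load the binned data back, dark-subtract, normalise by the XGM and plot
#     dssc.load_mask('dssc_mask.png')
#     dssc.load_binned(179, 180)
#     dssc.plot_DSSC()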