DSSC multiprocessing analysis

Merged: Loïc Le Guyader requested to merge crunchDSSC into master
Files changed: 2 (+248 −22)
@@ -5,11 +5,13 @@ import shutil
from tqdm.auto import tqdm
import os
import warnings
import psutil
import karabo_data as kd
from karabo_data.geometry2 import DSSC_1MGeometry
import ToolBox as tb
import matplotlib.pyplot as plt
from mpl_toolkits.axes_grid1 import ImageGrid
import numpy as np
import xarray as xr
import h5py
@@ -18,13 +20,14 @@ from imageio import imread
class DSSC:
def __init__(self, semester, proposal, topic='SCS'):
def __init__(self, semester, proposal, topic='SCS', distance=1):
""" Create a DSSC object to process DSSC data.
inputs:
semester: semester string
proposal: proposal number string
topic: topic, by default SCS
distance: distance from sample to DSSC detector in meters
"""
self.semester = semester
@@ -32,12 +35,19 @@ class DSSC:
self.topic = topic
self.tempdir = None
self.save_folder = f'/gpfs/exfel/exp/{self.topic}/{self.semester}/{self.proposal}/usr/condensed_runs/'
self.distance = distance
self.px_pitch_h = 236 # horizontal pitch in microns
self.px_pitch_v = 204 # vertical pitch in microns
self.aspect = self.px_pitch_v/self.px_pitch_h # aspect ratio of the DSSC images
self.geom = None
self.mask = None
print('DSSC configuration')
print(f'Topic: {self.topic}')
print(f'Semester: {self.semester}')
print(f'Proposal: {self.proposal}')
print(f'Default save folder: {self.save_folder}')
print(f'Sample to DSSC distance: {self.distance} m')
if not os.path.exists(self.save_folder):
warnings.warn(f'Default save folder does not exist: {self.save_folder}')
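# Example usage (sketch; the semester/proposal strings and the distance are
# placeholders, not values from this merge request, and the import assumes
# the class is exposed by the ToolBox package):
from ToolBox import DSSC

dssc = DSSC('201901', 'p002212', topic='SCS', distance=3.0)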
@@ -63,7 +73,7 @@ class DSSC:
self.run = kd.open_run(self.proposal, self.run_nr)
self.isDark = isDark
self.plot_title = f'{self.semester} run: {self.run_nr}'
self.plot_title = f'{self.proposal} run: {self.run_nr}'
self.fpt = self.run.detector_info('SCS_DET_DSSC1M-1/DET/0CH0:xtdf')['frames_per_train']
self.nbunches = self.run.get_array('SCS_RR_UTC/MDL/BUNCH_DECODER', 'sase3.nPulses.value')
@@ -133,20 +143,27 @@ class DSSC:
dims=['scan_variable'],
coords={'scan_variable': self.scan['scan_variable'].values},
name='counts')
self.scan_points = self.scan.groupby('scan_variable').mean('trainId').coords['scan_variable'].values
self.scan_points_counts = self.scan_counts.groupby('scan_variable').sum()
self.plot_scan()
fig, (ax1, ax2) = plt.subplots(nrows=2, figsize=[5, 5])
ax1.plot(self.scan.groupby('scan_variable').mean('trainId').coords['scan_variable'].values,
self.scan_counts.groupby('scan_variable').sum(),
'o-', ms=2)
ax1.set_xlabel('scan variable')
def plot_scan(self):
""" Plot a previously defined scan to see the scan range and the statistics.
"""
if self.scan:
fig, (ax1, ax2) = plt.subplots(nrows=2, figsize=[5, 5])
else:
fig, ax1 = plt.subplots(nrows=1, figsize=[5, 2.5])
ax1.plot(self.scan_points, self.scan_points_counts, 'o-', ms=2)
ax1.set_xlabel(f'{self.scan_vname}')
ax1.set_ylabel('# trains')
ax1.set_title(self.plot_title)
ax2.plot(self.scan['scan_variable'])
ax2.set_xlabel('train #')
ax2.set_ylabel(f'{vname}')
plt.tight_layout()
if self.scan:
ax2.plot(self.scan['scan_variable'])
ax2.set_xlabel('train #')
ax2.set_ylabel(f'{self.scan_vname}')
def load_xgm(self):
""" Loads pulse resolved dedicated SAS3 data from the SCS XGM.
@@ -174,7 +191,6 @@ class DSSC:
plt.xlabel(f"{tb.mnemonics['SCS_SA3']['source']}{tb.mnemonics['SCS_SA3']['key']}")
plt.ylabel('density')
plt.title(self.plot_title)
plt.tight_layout()
def xgm_filter(self, xgm_low=-np.inf, xgm_high=np.inf):
""" Filters the data by train. If one pulse within a train has an SASE3 SCS XGM value below
@@ -213,14 +229,15 @@ class DSSC:
( 4.528, -4.912) # TL
]
path = '/gpfs/exfel/sw/software/exfel_environments/misc/git/karabo_data/docs/dssc_geo_june19.h5'
geom = DSSC_1MGeometry.from_h5_file_and_quad_positions(path, quad_pos)
return geom
self.geom = DSSC_1MGeometry.from_h5_file_and_quad_positions(path, quad_pos)
return self.geom
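# Minimal sketch of reusing the cached geometry: position_modules_fast
# assembles a (16, 128, 512) module stack into a full detector image
# (dssc is a DSSC instance as in the example above; the data is a dummy).
import numpy as np

geom = dssc.load_geom()
dummy = np.zeros((16, 128, 512))
image, center = geom.position_modules_fast(dummy)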
def load_mask(self, fname):
def load_mask(self, fname, plot=True):
""" Load a DSSC mask file.
input:
fname: string of the filename of the mask file
plot: if True, the loaded mask is plotted
"""
@@ -228,6 +245,10 @@ class DSSC:
dssc_mask = dssc_mask.astype(float)[..., 0] // 255
dssc_mask[dssc_mask==0] = np.nan
self.mask = dssc_mask
if plot:
plt.figure()
plt.imshow(self.mask)
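# Example (sketch; 'mask.png' is a placeholder filename): given the `// 255`
# and NaN handling above, the mask file is expected to be an image whose
# white (255) pixels are kept and whose black pixels become NaN.
dssc.load_mask('mask.png', plot=False)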
def create_virtual_dssc_datasets(self, run, path=''):
""" Create virtual datasets for each 16 DSSC modules used for the multiprocessing.
@@ -247,8 +268,8 @@ class DSSC:
vds_list.append([vds_filename, module_vds])
return vds_list
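# Hedged sketch of the virtual-dataset idea used above (shapes and file
# names are placeholders, not the actual implementation): h5py virtual
# datasets expose one module's frames as a single contiguous dataset that
# each worker can open on its own.
import h5py

layout = h5py.VirtualLayout(shape=(100, 128, 512), dtype='u2')
layout[:] = h5py.VirtualSource('module00_raw.h5', 'data', shape=(100, 128, 512))
with h5py.File('module00_vds.h5', 'w') as f:
    f.create_virtual_dataset('data', layout)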
def crunch(self):
""" Crunch through the DSSC data using multiprocessing.
def binning(self):
""" Bin the DSSC data by the predifined scan type (DSSC.define()) using multiprocessing
"""
if self.vds_scan is None:
@@ -260,9 +281,12 @@ class DSSC:
self.scan = self.scan.to_dataset(name='scan_variable')
self.scan.to_netcdf(self.vds_scan, group='data')
max_GB = 400
# get available memory in GB, we will try to use 80 % of it
max_GB = psutil.virtual_memory().available/1024**3
print(f'max available memory: {max_GB} GB')
# chunksize = 0.8 * max_GB / (8 bytes * 16 modules * 128 px * 512 px * frames_per_train)
self.chunksize = int(max_GB * 1024**3 // (8 * 16 * 128 * 512 * self.fpt))
self.chunksize = int(0.8*max_GB * 1024**3 // (8 * 16 * 128 * 512 * self.fpt))
print('processing', self.chunksize, 'trains per chunk')
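# Worked example of the chunk arithmetic above (hypothetical numbers): one
# train of 16 modules of 128 x 512 float64 pixels with fpt frames costs
# 8 * 16 * 128 * 512 * fpt bytes.
fpt = 75                                   # hypothetical frames per train
avail_GB = 256                             # hypothetical available memory
train_bytes = 8 * 16 * 128 * 512 * fpt     # ~0.59 GiB per train
print(int(0.8 * avail_GB * 1024**3 // train_bytes))  # -> 349 trains per chunk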
@@ -293,6 +317,8 @@ class DSSC:
self.module_data = xr.merge([self.module_data, self.scan.groupby('scan_variable').mean('trainId')])
self.module_data = self.module_data.squeeze()
self.module_data.attrs['scan_variable'] = self.scan_vname
def save(self, save_folder=None, overwrite=False):
""" Save the crunched data.
@@ -305,9 +331,9 @@ class DSSC:
save_folder = self.save_folder
if self.isDark:
fname = f'run{self.run_nr}_el.h5' # no scan
fname = f'run{self.run_nr}_dark.h5' # no scan
else:
fname = f'run{self.run_nr}_by-delay_el.h5' # run with delay scan (change for other scan types!)
fname = f'run{self.run_nr}.h5' # scan run
save_path = os.path.join(save_folder, fname)
@@ -322,7 +348,207 @@ class DSSC:
print('saving: ', save_path)
else:
print('file', save_path, 'exists and overwrite is False')
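# Sketch of the binning/saving step, assuming a run has already been opened
# on this DSSC object (the run-opening call sits outside this hunk):
dssc.binning()
dssc.save(overwrite=True)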
def load_binned(self, runNB, dark_runNB, xgm_norm = True, save_folder=None):
""" load previously binned (crunched) DSSC data by DSSC.crunch() and DSSC.save()
inputs:
runNB: run number to load
dark_runNB: run number of the corresponding dark
xgm_norm: normalize by XGM data if True
save_folder: path string where the crunched data are saved
"""
if save_folder is None:
save_folder = self.save_folder
self.plot_title = f'{self.proposal} run: {runNB} dark: {dark_runNB}'
dark = xr.open_dataset(os.path.join(save_folder, f'run{dark_runNB}_dark.h5'), group='data')
binned = xr.open_dataset(os.path.join(save_folder, f'run{runNB}.h5'), group='data')
binned['pumped'] = (binned['pumped'] - dark['pumped'].values)
binned['unpumped'] = (binned['unpumped'] - dark['unpumped'].values)
if xgm_norm:
binned['pumped'] = binned['pumped'] / binned['xgm_pumped']
binned['unpumped'] = binned['unpumped'] / binned['xgm_unpumped']
self.scan_points = binned['scan_variable']
self.scan_points_counts = binned['sum_count'][:, 0]
self.scan_vname = binned.attrs['scan_variable']
self.scan = None
self.binned = binned
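# Example (run numbers are placeholders): subtract the dark run from the
# scan run, normalize by the XGM, and inspect the result.
dssc.load_binned(615, 612, xgm_norm=True)
dssc.plot_DSSC()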
def plot_DSSC(self, use_mask = True, p_low = 1, p_high = 98, vmin = None, vmax = None):
""" Plot pumped and unpumped DSSC images.
inputs:
use_mask: if True, the loaded mask is applied to the DSSC images.
p_low: low percentile value to adjust the contrast scale on the unpumped and pumped image
p_high: high percentile value to adjust the contrast scale on the unpumped and pumped image
vmin: low value of the image scale
vmax: high value of the image scale
"""
if use_mask:
if self.mask is None:
raise ValueError('No mask was loaded!')
mask = self.mask
mask_txt = ' masked'
else:
mask = 1
mask_txt = ''
if self.geom is None:
self.load_geom()
im_pump_mean, _ = self.geom.position_modules_fast(self.binned['pumped'].mean('scan_variable'))
im_unpump_mean, _ = self.geom.position_modules_fast(self.binned['unpumped'].mean('scan_variable'))
im_pump_mean = mask*im_pump_mean
im_unpump_mean = mask*im_unpump_mean
fig = plt.figure(figsize=(9, 4))
grid = ImageGrid(fig, 111,
nrows_ncols=(1,2),
axes_pad=0.15,
share_all=True,
cbar_location="right",
cbar_mode="single",
cbar_size="7%",
cbar_pad=0.15,
)
_vmin, _vmax = np.percentile(im_pump_mean[~np.isnan(im_pump_mean)], [p_low, p_high])
if vmin is None:
vmin = _vmin
if vmax is None:
vmax = _vmax
im = grid[0].imshow(im_pump_mean, vmin=vmin, vmax=vmax, aspect=self.aspect)
grid[0].set_title('pumped' + mask_txt)
im = grid[1].imshow(im_unpump_mean, vmin=vmin, vmax=vmax, aspect=self.aspect)
grid[1].set_title('unpumped' + mask_txt)
grid[-1].cax.colorbar(im)
grid[-1].cax.toggle_label(True)
fig.suptitle(self.plot_title)
def azimuthal_int(self, wl, center=None, angle_range=[0, 360], dr=1, use_mask=True):
""" Perform azimuthal integration of 1D binned DSSC run.
inputs:
wl: photon wavelength
center: center of integration
angle_range: angles of integration
dr: dr
use_mask: if True, use the loaded mask
"""
if self.geom is None:
self.load_geom()
if use_mask:
if self.mask is None:
raise ValueError('No mask was loaded!')
mask = self.mask
mask_txt = ' masked'
else:
mask = 1
mask_txt = ''
im_pumped_arranged, c_geom = self.geom.position_modules_fast(self.binned['pumped'].values)
im_unpumped_arranged, c_geom = self.geom.position_modules_fast(self.binned['unpumped'].values)
im_pumped_arranged *= mask
im_unpumped_arranged *= mask
im_pumped_mean = im_pumped_arranged.mean(axis=0)
im_unpumped_mean = im_unpumped_arranged.mean(axis=0)
if center is None:
center = c_geom
ai = tb.azimuthal_integrator(im_pumped_mean.shape, center, angle_range, dr=dr)
norm = ai(~np.isnan(im_pumped_mean))
az_pump = []
az_unpump = []
for i in tqdm(range(len(self.binned['scan_variable']))):
az_pump.append(ai(im_pumped_arranged[i]) / norm)
az_unpump.append(ai(im_unpumped_arranged[i]) / norm)
az_pump = np.stack(az_pump)
az_unpump = np.stack(az_unpump)
coords = {'scan_variable': self.binned['scan_variable'], 'distance': ai.distance}
azimuthal = xr.DataArray(az_pump, dims=['scan_variable', 'distance'], coords=coords)
azimuthal = azimuthal.to_dataset(name='pumped')
azimuthal['unpumped'] = xr.DataArray(az_unpump, dims=['scan_variable', 'distance'], coords=coords)
azimuthal = azimuthal.transpose('distance', 'scan_variable')
#t0 = 225.5
#azimuthal['delay'] = (t0 - azimuthal.delay)*6.6
#azimuthal['delay'] = azimuthal.delay
azimuthal['delta_q (1/nm)'] = 2e-9 * np.pi * np.sin(
np.arctan(azimuthal.distance * self.px_pitch_v*1e-6 / self.distance)) / wl
azimuthal.attrs = self.binned.attrs
self.azimuthal = azimuthal.swap_dims({'distance': 'delta_q (1/nm)'})
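# Worked check of the q conversion above with hypothetical values: a ring
# 100 px from center, 204 um vertical pitch, 3 m sample-detector distance
# and 13.3 nm light give
import numpy as np

r, d, wl = 100 * 204e-6, 3.0, 13.3e-9  # ring radius [m], distance [m], wavelength [m]
q = 2e-9 * np.pi * np.sin(np.arctan(r / d)) / wl
print(q)  # ~0.0032 1/nm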
def plot_azimuthal_int(self):
""" Plot a computed azimuthal integration.
"""
fig, [ax1, ax2] = plt.subplots(nrows=2, sharex=True, sharey=True)
xr.plot.imshow(self.azimuthal.pumped, ax=ax1, robust=True)
ax1.set_title('pumped')
ax1.set_xlabel(self.scan_vname)
xr.plot.imshow(self.azimuthal.pumped - self.azimuthal.unpumped, ax=ax2, robust=True)
ax2.set_title('pumped - unpumped')
ax2.set_xlabel(self.scan_vname)
fig.suptitle(f'{self.plot_title}')
def plot_azimuthal_line_cut(self, data, qranges, qwidths):
""" Plot line scans on top of the data.
inputs:
data: an azimuthal integrated xarray DataArray with 'delta_q (1/nm)' as one of its dimension.
qranges: a list of q-range center values
qwidths: a list of q-widths, same length as qranges
"""
fig, [ax1, ax2] = plt.subplots(nrows=2, sharex=True, figsize=[8, 7])
xr.plot.imshow(data, ax=ax1, robust=True)
# attributes are not propagated during xarray mathematical operations (https://github.com/pydata/xarray/issues/988),
# so 'data' might not carry the scan variable name anymore
ax1.set_xlabel(self.scan_vname)
fig.suptitle(f'{self.plot_title}')
for i, (qr, qw) in enumerate(zip(qranges, qwidths)):
sel = (data['delta_q (1/nm)'] > (qr - qw/2)) * (data['delta_q (1/nm)'] < (qr + qw/2))
val = data.where(sel).mean('delta_q (1/nm)')
ax2.plot(data.scan_variable, val, c=f'C{i}', label=f'q = {qr:.2f}')
ax1.axhline(qr - qw/2, c=f'C{i}', lw=1)
ax1.axhline(qr + qw/2, c=f'C{i}', lw=1)
ax2.legend()
ax2.set_xlabel(self.scan_vname)
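# Example call (q centers and widths in 1/nm are placeholders): cut three
# q-bands out of the pumped - unpumped azimuthal signal.
diff = dssc.azimuthal.pumped - dssc.azimuthal.unpumped
dssc.plot_azimuthal_line_cut(diff, [0.05, 0.10, 0.20], [0.01, 0.01, 0.01])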
# since 'self' is not picklable, this function has to live outside the DSSC class so that it can be used
# by the multiprocessing pool.map function
def process_one_module(job):
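# Hedged sketch of the dispatch (the job construction is elided in this
# diff; 'jobs' would carry the per-module parameters, one entry per module):
import multiprocessing

def run_all_modules(jobs):
    with multiprocessing.Pool(16) as pool:
        return pool.map(process_one_module, jobs)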