Skip to content
Snippets Groups Projects
Commit 649fa4e7 authored by Loïc Le Guyader's avatar Loïc Le Guyader
Browse files

adds multiprocessing binning of DSSC data

parent fa56dcb6
No related branches found
No related tags found
No related merge requests found
DSSC.py 0 → 100644
import multiprocessing
from time import strftime
import tempfile
import shutil
from tqdm.auto import tqdm
import os
import warnings
import karabo_data as kd
from karabo_data.geometry2 import DSSC_1MGeometry
import ToolBox as tb
import numpy as np
import xarray as xr
import matplotlib.pyplot as plt
import h5py
class DSSC:
def __init__(self, semester, proposal, topic='SCS'):
self.semester = semester
self.proposal = proposal
self.topic = topic
self.tempdir = None
self.save_folder = f'/gpfs/exfel/exp/{self.topic}/{self.semester}/{self.proposal}/usr/condensed_runs/'
print('DSSC configuration')
print(f'Topic: {self.topic}')
print(f'Semester: {self.semester}')
print(f'Proposal: {self.proposal}')
print(f'Default save folder: {self.save_folder}')
if not os.path.exists(self.save_folder):
warnings.warn(f'Default save folder does not exist: {self.save_folder}')
def __del__(self):
# deleting temporay folder
if self.tempdir:
shutil.rmtree(self.tempdir)
def open_run(self, run_nr, isDark=False):
print('Opening run data with karabo-data')
self.run_nr = run_nr
self.xgm = None
self.scan_vname = None
self.run = kd.open_run(self.proposal, self.run_nr)
self.isDark = isDark
self.plot_title = f'{self.semester} run: {self.run_nr}'
self.fpt = self.run.detector_info('SCS_DET_DSSC1M-1/DET/0CH0:xtdf')['frames_per_train']
self.nbunches = self.run.get_array('SCS_RR_UTC/MDL/BUNCH_DECODER', 'sase3.nPulses.value')
self.nbunches = np.unique(self.nbunches)
if len(self.nbunches) == 1:
self.nbunches = self.nbunches[0]
else:
warnings.warn('not all trains have same length DSSC data')
print(f'nbunches: {self.nbunches}')
self.nbunches = self.nbunches[-1]
print(f'DSSC frames per train: {self.fpt}')
print(f'SA3 bunches per train: {self.nbunches}')
if self.tempdir is not None:
shutil.rmtree(self.tempdir)
self.tempdir = tempfile.mkdtemp()
print(f'Temporary directory: {self.tempdir}')
print('Creating virtual dataset')
self.vdslist = self.create_virtual_dssc_datasets(self.run, path=self.tempdir)
# create a dummy scan variable for dark run
# for other type or run, use DSSC.define_run function to overwrite it
self.scan = xr.DataArray(np.ones_like(self.run.train_ids), dims=['trainId'],
coords={'trainId': self.run.train_ids})
self.scan_vname = 'dummy'
self.vds_scan = None
def define_scan(self, vname, bins):
"""
vname: variable name for the scan, can be a mnemonic string from ToolBox
or a dictionnary with ['source', 'key'] fields
bins: step size or bins
"""
if type(vname) is dict:
self.scan = self.run.get_array(vname['source'], vname['key'])
elif type(vname) is str:
if vname not in tb.mnemonics:
raise ValueError(f'{vname} not found in the ToolBox mnemonics table')
self.scan = self.run.get_array(tb.mnemonics[vname]['source'], tb.mnemonics[vname]['key'])
else:
raise ValueError(f'vname should be a string or a dict. We got {type(vname)}')
if (type(bins) is int) or (type(bins) is float):
self.scan = bins * np.round(self.scan / bins)
else:
# TODO: digitize the data
raise ValueError(f'To be implemented')
self.scan_vname = vname
self.vds_scan = os.path.join(self.tempdir, 'scan_variable.h5')
if os.path.isfile(self.vds_scan):
os.remove(self.vds_scan)
self.scan = self.scan.to_dataset(name='scan_variable')
self.scan['xgm_pumped'] = self.xgm[:, :self.nbunches:2].mean('dim_0')
self.scan['xgm_unpumped'] = self.xgm[:, 1:self.nbunches:2].mean('dim_0')
self.scan.to_netcdf(self.vds_scan, group='data')
self.scan_counts = xr.DataArray(np.ones(len(self.scan['scan_variable'])),
dims=['scan_variable'],
coords={'scan_variable': self.scan['scan_variable'].values},
name='counts')
fig, (ax1, ax2) = plt.subplots(nrows=2, figsize=[5, 5])
ax1.plot(self.scan.groupby('scan_variable').mean('trainId').coords['scan_variable'].values,
self.scan_counts.groupby('scan_variable').sum(),
'o-', ms=2)
ax1.set_xlabel('scan variable')
ax1.set_ylabel('# trains')
ax1.set_title(self.plot_title)
ax2.plot(self.scan['scan_variable'])
ax2.set_xlabel('train #')
ax2.set_ylabel(f'{vname}')
plt.tight_layout()
def load_xgm(self):
if self.xgm is None:
self.xgm = self.run.get_array(tb.mnemonics['SCS_SA3']['source'],
tb.mnemonics['SCS_SA3']['key'], roi=kd.by_index[:self.nbunches])
def plot_xgm_hist(self, nbins):
if self.xgm is None:
self.load_xgm()
hist, bins_edges = np.histogram(self.xgm, nbins, density=True)
width = 1.0 * (bins_edges[1] - bins_edges[0])
bins_center = 0.5*(bins_edges[:-1] + bins_edges[1:])
plt.figure()
plt.bar(bins_center, hist, align='center', width=width)
plt.xlabel(f"{tb.mnemonics['SCS_SA3']['source']}{tb.mnemonics['SCS_SA3']['key']}")
plt.ylabel('density')
plt.title(self.plot_title)
plt.tight_layout()
def xgm_filter(self, xgm_low=-np.inf, xgm_high=np.inf):
if self.xgm is None:
self.load_xgm()
if self.isDark:
warnings.warn(f'This run was loaded as dark. Filtering on xgm makes no sense. Aborting')
return
self.xgm_low = xgm_low
self.xgm_high = xgm_high
valid = ((self.xgm > self.xgm_low) * (self.xgm < self.xgm_high)).prod('dim_0').astype(bool)
xgm_valid = self.xgm.where(valid)
xgm_valid = xgm_valid.dropna('trainId')
self.scan = self.scan.sel({'trainId': xgm_valid.trainId})
nrejected = len(self.run.train_ids) - len(self.scan)
print((f'Rejecting {nrejected} out of {len(self.run.train_ids)} trains due to xgm '
f'thresholds: [{self.xgm_low}, {self.xgm_high}]'))
def load_geom(self):
quad_pos = [
(-124.100, 3.112), # TR
(-133.068, -110.604), # BR
( 0.988, -125.236), # BL
( 4.528, -4.912) # TL
]
path = '/gpfs/exfel/sw/software/exfel_environments/misc/git/karabo_data/docs/dssc_geo_june19.h5'
geom = DSSC_1MGeometry.from_h5_file_and_quad_positions(path, quad_pos)
return geom
def create_virtual_dssc_datasets(self, run, path=''):
vds_list = []
for m in tqdm(range(16)):
vds_filename = os.path.join(path, f'dssc{m}_vds.h5')
if os.path.isfile(vds_filename):
os.remove(vds_filename)
module_vds = run.get_virtual_dataset(f'SCS_DET_DSSC1M-1/DET/{m}CH0:xtdf',
'image.data', filename=vds_filename)
vds_list.append([vds_filename, module_vds])
return vds_list
def crunch(self):
if self.vds_scan is None:
# probably a dark run with a dummy scan variable
self.vds_scan = os.path.join(self.tempdir, 'scan_variable.h5')
if os.path.isfile(self.vds_scan):
os.remove(self.vds_scan)
self.scan = self.scan.to_dataset(name='scan_variable')
self.scan.to_netcdf(self.vds_scan, group='data')
max_GB = 400
# max_GB / (8byte * 16modules * 128px * 512px * N_pulses)
self.chunksize = int(max_GB * 1024**3 // (8 * 16 * 128 * 512 * self.fpt))
print('processing', self.chunksize, 'trains per chunk')
jobs = []
for m in range(16):
jobs.append(dict(
module=m,
fpt=self.fpt,
vdf_module=os.path.join(self.tempdir, f'dssc{m}_vds.h5'),
chunksize=self.chunksize,
vdf_scan=self.vds_scan,
nbunches=self.nbunches,
run_nr=self.run_nr,
))
timestamp = strftime('%X')
print(f'start time: {timestamp}')
with multiprocessing.Pool(16) as pool:
module_data = pool.map(process_one_module, jobs)
print('finished:', strftime('%X'))
# rearange the multiprocessed data
self.module_data = xr.concat(module_data, dim='module')
self.module_data['run'] = self.run_nr
self.module_data = self.module_data.transpose('scan_variable', 'module', 'x', 'y')
self.module_data = xr.merge([self.module_data, self.scan.groupby('scan_variable').mean('trainId')])
self.module_data = self.module_data.squeeze()
def save(self, save_folder=None, overwrite=False):
if save_folder is None:
save_folder = this.save_folder
if self.isDark:
fname = f'run{self.run_nr}_el.h5' # no scan
else:
fname = f'run{self.run_nr}_by-delay_el.h5' # run with delay scan (change for other scan types!)
save_path = os.path.join(save_folder, fname)
file_exists = os.path.isfile(save_path)
if not file_exists or (file_exists and overwrite):
if file_exists:
warnings.warn(f'Overwriting file: {save_path}')
os.remove(save_path)
self.module_data.to_netcdf(save_path, group='data')
os.chmod(save_path, 0o664)
print('saving: ', save_path)
else:
print('file', save_path, 'exists and overwrite is False')
# since 'self' is not pickable, this function has to be outside the DSSC class so that it can be used
# by the multiprocessing pool.map function
def process_one_module(job):
module = job['module']
fpt = job['fpt']
data_vdf = job['vdf_module']
scan_vdf = job['vdf_scan']
chunksize = job['chunksize']
nbunches = job['nbunches']
image_path = f'INSTRUMENT/SCS_DET_DSSC1M-1/DET/{module}CH0:xtdf/image/data'
npulse_path = f'INDEX/SCS_DET_DSSC1M-1/DET/{module}CH0:xtdf/image/count'
with h5py.File(data_vdf, 'r') as m:
all_trainIds = m['INDEX/trainId'][()]
n_trains = len(all_trainIds)
chunk_start = np.arange(n_trains, step=chunksize, dtype=int)
# load scan variable
scan = xr.open_dataset(scan_vdf, group='data')['scan_variable']
scan.name = 'scan'
len_scan = len(scan.groupby(scan))
# create empty dataset to add actual data to
module_data = xr.DataArray(np.empty([len_scan, 128, 512], dtype=np.float64),
dims=['scan_variable', 'x', 'y'],
coords={'scan_variable': np.unique(scan)})
module_data = module_data.to_dataset(name='pumped')
module_data['unpumped'] = xr.full_like(module_data['pumped'], 0)
module_data['sum_count'] = xr.DataArray(np.zeros_like(np.unique(scan)), dims=['scan_variable'])
module_data['module'] = module
# crunching
with h5py.File(data_vdf, 'r') as m:
#fpt_calc = int(len(m[image_path]) / n_trains)
#assert fpt_calc == fpt, f'data length does not match expected value (module {module})'
all_trainIds = m['INDEX/trainId'][()]
frames_per_train = m[npulse_path][()]
trains_with_data = all_trainIds[frames_per_train == fpt]
#print(np.unique(pulses_per_train), '/', fpt)
#print(len(trains_with_data))
chunk_start = np.arange(len(all_trainIds), step=chunksize, dtype=int)
trains_start = 0
# This line is the strange hack from https://github.com/tqdm/tqdm/issues/485
print(' ', end='', flush=True)
for c0 in tqdm(chunk_start, desc=f'pool.map#{module:02d}', position=module):
chunk_dssc = np.s_[int(c0 * fpt):int((c0 + chunksize) * fpt)] # for dssc data
data = m[image_path][chunk_dssc].squeeze()
data = data.astype(np.float64)
n_trains = int(data.shape[0] // fpt)
trainIds_chunk = np.unique(trains_with_data[trains_start:trains_start + n_trains])
trains_start += n_trains
n_trains_actual = len(trainIds_chunk)
coords = {'trainId': trainIds_chunk}
data = np.reshape(data, [n_trains_actual, fpt, 128, 512])[:, :int(2 * nbunches)]
data = xr.DataArray(data, dims=['trainId', 'pulse', 'x', 'y'], coords=coords)
data_pumped = (data[:, ::4]).mean('pulse')
data_unpumped = (data[:, 2::4]).mean('pulse')
data = data_pumped.to_dataset(name='pumped')
data['unpumped'] = data_unpumped
data['sum_count'] = xr.DataArray(np.ones(n_trains_actual), dims=['trainId'], coords=coords)
# grouping and summing
data['scan_variable'] = scan # this only adds scan data for matching trainIds
data = data.dropna('trainId')
data = data.groupby('scan_variable').sum('trainId')
where = {'scan_variable': data.scan_variable}
for var in ['pumped', 'unpumped', 'sum_count']:
module_data[var].loc[where] = module_data[var].loc[where] + data[var]
for var in ['pumped', 'unpumped']:
module_data[var] = module_data[var] / module_data.sum_count
#module_data = module_data.drop('sum_count')
return module_data
...@@ -3,3 +3,5 @@ from ToolBox.xgm import * ...@@ -3,3 +3,5 @@ from ToolBox.xgm import *
from ToolBox.XAS import * from ToolBox.XAS import *
from ToolBox.knife_edge import * from ToolBox.knife_edge import *
from ToolBox.Laser_utils import * from ToolBox.Laser_utils import *
from ToolBox.DSSC import DSSC
from ToolBox.azimuthal_integrator import azimuthal_integrator
class azimuthal_integrator(object):
def __init__(self, imageshape, center, polar_range, dr=2):
'''
Create a reusable integrator for repeated azimuthal integration of similar
images. Calculates array indices for a given parameter set that allows
fast recalculation.
Parameters
==========
imageshape : tuple of ints
The shape of the images to be integrated over.
center : tuple of ints
center coordinates in pixels
polar_range : tuple of ints
start and stop polar angle (in degrees) to restrict integration to wedges
dr : int, default 2
radial width of the integration slices. Takes non-square DSSC pixels into account.
Returns
=======
ai : azimuthal_integrator instance
Instance can directly be called with image data:
> az_intensity = ai(image)
radial distances and the polar mask are accessible as attributes:
> ai.distance
> ai.polar_mask
'''
self.shape = imageshape
cx, cy = center
sx, sy = imageshape
xcoord, ycoord = np.ogrid[:sx, :sy]
xcoord -= cx
ycoord -= cy
# distance from center, hexagonal pixel shape taken into account
dist_array = np.hypot(xcoord * 204 / 236, ycoord)
# array of polar angles
tmin, tmax = np.deg2rad(np.sort(polar_range)) % np.pi
polar_array = np.arctan2(xcoord, ycoord)
polar_array = np.mod(polar_array, np.pi)
self.polar_mask = (polar_array > tmin) * (polar_array < tmax)
self.maxdist = min(sx - cx, sy - cy)
ix, iy = np.indices(dimensions=(sx, sy))
self.index_array = np.ravel_multi_index((ix, iy), (sx, sy))
self.distance = np.array([])
self.flat_indices = []
for dist in range(dr, self.maxdist, dr):
ring_mask = self.polar_mask * (dist_array >= (dist - dr)) * (dist_array < dist)
self.flat_indices.append(self.index_array[ring_mask])
self.distance = np.append(self.distance, dist)
def __call__(self, image):
assert self.shape == image.shape, 'image shape does not match'
image_flat = image.flatten()
return np.array([np.nansum(image_flat[indices]) for indices in self.flat_indices])
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment