Skip to content
Snippets Groups Projects
Commit 6740eceb authored by Cammille Carinan's avatar Cammille Carinan Committed by Laurent Mercadier
Browse files

Base knife-edge scan analysis implementation

parent 023a300b
No related branches found
No related tags found
No related merge requests found
from . import knife_edge as knife_edge_module
from .knife_edge import *
__all__ = knife_edge_module.__all__
import numpy as np
from scipy import special
from scipy.optimize import curve_fit
__all__ = ['knife_edge', 'knife_edge_base']
def knife_edge(positions, intensities, axisRange=None, p0=None):
"""
Calculates the beam radius at 1/e^2 from a knife-edge scan by
fitting with erfc function: f(a,b,u) = a*erfc(u) + b or
where u = sqrt(2)*(x-x0)/w0 with w0 the beam radius at 1/e^2
and x0 the beam center.
Parameters
----------
positions : np.ndarray
Motor position values, typically 1D
intensities : np.ndarray
Intensity values, could be either 1D or 2D, with the number or rows
equivalent with the position size
axisRange : sequence of two floats or None
Edges of the scanning axis between which to apply the fit.
p0 : list of floats, numpy 1D array
Initial parameters used for the fit: x0, w0, a, b. If None, a beam
radius of 100 um is assumed.
Returns
-------
width : float
The beam radius at 1/e^2
std : float
The standard deviation of the width
"""
popt, pcov = knife_edge_base(positions, intensities,
axisRange=axisRange, p0=p0)
width, std = 0, 0
if popt is not None and pcov is not None:
width, std = np.abs(popt[1]), pcov[1, 1]**0.5
return width, std
def knife_edge_base(positions, intensities, axisRange=None, p0=None):
"""
The base implementation of the knife-edge scan analysis.
Calculates the beam radius at 1/e^2 from a knife-edge scan by
fitting with erfc function: f(a,b,u) = a*erfc(u) + b or
where u = sqrt(2)*(x-x0)/w0 with w0 the beam radius at 1/e^2
and x0 the beam center.
Parameters
----------
positions : np.ndarray
Motor position values, typically 1D
intensities : np.ndarray
Intensity values, could be either 1D or 2D, with the number or rows
equivalent with the position size
axisRange : sequence of two floats or None
Edges of the scanning axis between which to apply the fit.
p0 : list of floats, numpy 1D array
Initial parameters used for the fit: x0, w0, a, b. If None, a beam
radius of 100 um is assumed.
Returns
-------
popt : sequence of floats or None
The parameters of the resulting fit.
pcov : sequence of floats
The covariance matrix of the resulting fit.
"""
# Prepare arrays
positions, intensities = prepare_arrays(positions, intensities,
xRange=axisRange)
# Estimate initial fitting params
if p0 is None:
p0 = [np.mean(positions), 0.1, np.max(intensities) / 2, 0]
# Fit
popt, pcov = function_fit(erfc, positions, intensities, p0=p0)
return popt, pcov
def function_fit(func, x, y, **kwargs):
"""A wrapper for scipy.optimize curve_fit()
"""
# Fit
try:
popt, pcov = curve_fit(func, x, y, **kwargs)
except (TypeError, RuntimeError) as err:
print("Fit did not converge:", err)
popt, pcov = (None, None)
return popt, pcov
def prepare_arrays(arrX: np.ndarray, arrY: np.ndarray,
xRange=None, yRange=None):
"""
Preprocessing of the input x and y arrays.
This involves the following steps.
1. Converting the arrays to 1D of the same size
2. Select the ranges from the input x- and y-ranges
3. Retrieve finite values.
"""
# Convert both arrays to 1D of the same size
arrX, arrY = arrays_to1d(arrX, arrY)
# Select ranges
if xRange is not None:
low, high = xRange
if low == high:
raise ValueError('The supplied xRange is not a valid range.')
mask_ = range_mask(arrX, low, high)
arrX = arrX[mask_]
arrY = arrY[mask_]
if yRange is not None:
low, high = yRange
if low == high:
raise ValueError('The supplied xRange is not a valid range.')
mask_ = range_mask(arrY, low, high)
arrX = arrX[mask_]
arrY = arrY[mask_]
# Clean both arrays by only getting finite values
finite_idx = np.isfinite(arrX) & np.isfinite(arrY)
arrX = arrX[finite_idx]
arrY = arrY[finite_idx]
return arrX, arrY
def arrays_to1d(arrX: np.ndarray, arrY: np.ndarray):
"""Flatten two arrays and matches their sizes
"""
assert arrX.shape[0] == arrY.shape[0]
arrX, arrY = arrX.flatten(), arrY.flatten()
if len(arrX) > len(arrY):
arrY = np.repeat(arrY, len(arrX) // len(arrY))
else:
arrX = np.repeat(arrX, len(arrY) // len(arrX))
return arrX, arrY
def range_mask(array, minimum=None, maximum=None):
"""Retrieve the resulting array from the given minimum and maximum
"""
default = np.ones(array.shape, dtype=np.bool)
min_slice, max_slice = default, default
if minimum is not None:
if minimum > np.nanmax(array):
raise ValueError('The range minimum is too large.')
min_slice = array >= minimum
if maximum is not None:
if maximum < np.nanmin(array):
raise ValueError('The range maximum is too small.')
max_slice = array <= maximum
return min_slice & max_slice
def erfc(x, x0, w0, a, b):
return a * special.erfc(np.sqrt(2) * (x - x0) / w0) + b
import numpy as np
import pytest
from ..knife_edge import erfc, knife_edge_base, prepare_arrays, range_mask
def test_range_mask():
arr = np.array([1, 2, 3, 4, 5])
# Exact
slice_ = range_mask(arr, minimum=2, maximum=4)
np.testing.assert_array_equal(slice_, [False, True, True, True, False])
# Range exceeds the closest values
slice_ = range_mask(arr, minimum=1.75, maximum=4.25)
np.testing.assert_array_equal(slice_, [False, True, True, True, False])
# Range misses the closest values
slice_ = range_mask(arr, minimum=2.25, maximum=3.75)
np.testing.assert_array_equal(slice_, [False, False, True, False, False])
# Equidistant
slice_ = range_mask(arr, minimum=2.5, maximum=4.5)
np.testing.assert_array_equal(slice_, [False, False, True, True, False])
# Out of bounds, valid minimum
slice_ = range_mask(arr, minimum=0)
np.testing.assert_array_equal(slice_, [True, True, True, True, True])
# Out of bounds, invalid minimum
with pytest.raises(ValueError):
range_mask(arr, minimum=6)
# Out of bounds, valid maximum
slice_ = range_mask(arr, maximum=6)
np.testing.assert_array_equal(slice_, [True, True, True, True, True])
# Out of bounds, invalid minimum
with pytest.raises(ValueError):
range_mask(arr, maximum=0)
# with NaNs
arr = np.array([1, np.nan, 3, np.nan, 5])
slice_ = range_mask(arr, minimum=3)
np.testing.assert_array_equal(slice_, [False, False, True, False, True])
def test_prepare_arrays_nans():
# Setup test values
trains, pulses = 5, 10
size = trains * pulses
motor = np.arange(trains)
signal = np.random.randint(100, size=(trains, pulses))
# Test finite motor and signal values
positions, intensities = prepare_arrays(motor, signal)
assert positions.shape == (size,)
assert intensities.shape == (size,)
# Test finite motors and signals with NaNs
signal_nan = _with_values(signal, value=np.nan, num=20)
positions, intensities = prepare_arrays(motor, signal_nan)
assert positions.shape == (size-20,)
assert np.isfinite(positions).all()
assert intensities.shape == (size-20,)
assert np.isfinite(intensities).all()
# Test finite signals and motors with NaNs
motor_nan = _with_values(motor, value=np.nan, num=3)
positions, intensities = prepare_arrays(motor_nan, signal)
assert positions.shape == ((trains-3) * pulses,)
assert np.isfinite(positions).all()
assert intensities.shape == ((trains-3) * pulses,)
assert np.isfinite(intensities).all()
def test_prepare_arrays_size():
trains, pulses = 5, 10
size = trains * pulses
motor = np.arange(trains)
signal = np.random.randint(100, size=(trains, pulses))
# Test finite motor and 2D signals
positions, intensities = prepare_arrays(motor, signal)
assert positions.shape == (size,)
assert intensities.shape == (size,)
# Test finite motor and 1D signals
positions, intensities = prepare_arrays(motor, signal[:, 0])
assert positions.shape == (trains,)
assert intensities.shape == (trains,)
def test_prepare_arrays_range():
trains, pulses = 5, 10
motor = np.arange(trains)
signal = np.random.randint(100, size=(trains, pulses))
# Test valid range, inside bounds
positions, intensities = prepare_arrays(motor, signal, xRange=[2, 4])
assert (positions.min(), positions.max()) == (2, 4)
unique = np.unique(positions)
np.testing.assert_array_equal(unique, [2, 3, 4])
assert intensities.shape == (unique.size * pulses,)
# Test invalid ranges
with pytest.raises(ValueError):
prepare_arrays(motor, signal, xRange=[5, 3])
with pytest.raises(ValueError):
prepare_arrays(motor, signal, xRange=[3, 3])
def test_knife_edge_base():
p0 = [0, -1.5, 1, 0]
x = np.linspace(-3, 3)
y = erfc(x, *p0)
noise = y * np.random.normal(0, .02, y.shape) # 2% error
eff_y = y + noise
# Test noisy data
popt, _ = knife_edge_base(x, eff_y)
np.testing.assert_allclose(p0, popt, atol=1e-1)
# Test flipped data
popt, _ = knife_edge_base(x, eff_y[::-1])
p0[1] = abs(p0[1]) # Absolute value when flipped
np.testing.assert_allclose(p0, popt, atol=1e-1)
def _with_values(array, value, num=5):
copy = array.astype(np.float)
copy.ravel()[np.random.choice(copy.size, num, replace=False)] = value
return copy
""" Toolbox for SCS. """ Toolbox for SCS.
Various utilities function to quickly process data measured Various utilities function to quickly process data measured
at the SCS instruments. at the SCS instrument.
Copyright (2019-) SCS Team. Copyright (2019-) SCS Team.
""" """
import matplotlib.pyplot as plt import matplotlib.pyplot as plt
import numpy as np import numpy as np
from scipy.special import erfc from toolbox_scs.base.knife_edge import knife_edge_base, erfc, arrays_to1d
from scipy.optimize import curve_fit
import bisect
__all__ = [ __all__ = [
'knife_edge' 'knife_edge'
] ]
def knife_edge(ds, axisKey='scannerX', def knife_edge(ds, axisKey='scannerX', signalKey='FastADC4peaks',
signalKey='FastADC4peaks', axisRange=[None, None], p0=None, full=False, plot=False,
axisRange=[None, None], p0=None, display=False):
full=False, plot=False):
""" """
Calculates the beam radius at 1/e^2 from a knife-edge scan by Calculates the beam radius at 1/e^2 from a knife-edge scan by
fitting with erfc function: f(a,b,u) = a*erfc(u) + b or fitting with erfc function:
f(a,b,u) = a*erfc(-u) + b where u = sqrt(2)*(x-x0)/w0 with w0 f(x, x0, w0, a, b) = a*erfc(np.sqrt(2)*(x-x0)/w0) + b
the beam radius at 1/e^2 and x0 the beam center. with w0 the beam radius at 1/e^2 and x0 the beam center.
Parameters Parameters
---------- ----------
...@@ -38,103 +35,62 @@ def knife_edge(ds, axisKey='scannerX', ...@@ -38,103 +35,62 @@ def knife_edge(ds, axisKey='scannerX',
edges of the scanning axis between which to apply the fit. edges of the scanning axis between which to apply the fit.
p0: list of floats, numpy 1D array p0: list of floats, numpy 1D array
initial parameters used for the fit: x0, w0, a, b. If None, a beam initial parameters used for the fit: x0, w0, a, b. If None, a beam
radius of 100 um is assumed. radius of 100 micrometers is assumed.
full: bool full: bool
If False, returns the beam radius and standard error. If False, returns the beam radius and standard error.
If True, returns the popt, pcov list of parameters and covariance If True, returns the popt, pcov list of parameters and covariance
matrix from scipy.optimize.curve_fit as well as the fitting function. matrix from scipy.optimize.curve_fit.
plot: bool plot: bool
If True, plots the data and the result of the fit. If True, plots the data and the result of the fit. Default is False.
display: bool
If True, displays info on the fit. True when plot is True, default is
False.
Returns Returns
------- -------
If full is False, ndarray with beam radius at 1/e^2 in mm and standard If full is False, tuple with beam radius at 1/e^2 in mm and standard
error from the fit in mm. If full is True, returns parameters and error from the fit in mm. If full is True, returns parameters and
covariance matrix from scipy.optimize.curve_fit function. covariance matrix from scipy.optimize.curve_fit function.
""" """
def stepUp(x, x0, w0, a, b): popt, pcov = knife_edge_base(ds[axisKey].values, ds[signalKey].values,
return a*erfc(-np.sqrt(2)*(x-x0)/w0) + b axisRange=axisRange, p0=p0)
if plot:
def stepDown(x, x0, w0, a, b): positions, intensities = arrays_to1d(ds[axisKey].values,
return a*erfc(np.sqrt(2)*(x-x0)/w0) + b ds[signalKey].values)
plot_knife_edge(positions, intensities, popt, pcov[1, 1]**0.5,
# get the number of pulses per train from the signal source: ds.attrs['runFolder'], axisKey, signalKey)
dim = [k for k in ds[signalKey].dims if k != 'trainId'][0] display = True
# duplicate motor position values to match signal shape
# this is faster than using ds.stack()
positions = np.repeat(ds[axisKey].values,
len(ds[dim])).astype(ds[signalKey].dtype)
# sort the data to decide which fitting function to use
sortIdx = np.argsort(positions)
positions = positions[sortIdx]
intensities = ds[signalKey].values.flatten()[sortIdx]
if axisRange[0] is None or axisRange[0] < positions[0]:
idxMin = 0
else:
if axisRange[0] >= positions[-1]:
raise ValueError('The minimum value of axisRange is too large')
idxMin = bisect.bisect(positions, axisRange[0])
if axisRange[1] is None or axisRange[1] > positions[-1]:
idxMax = None
else:
if axisRange[1] <= positions[0]:
raise ValueError('The maximum value of axisRange is too small')
idxMax = bisect.bisect(positions, axisRange[1]) + 1
pos_sel = positions[idxMin:idxMax]
int_sel = intensities[idxMin:idxMax]
no_nan = ~np.isnan(int_sel)
pos_sel = pos_sel[no_nan]
int_sel = int_sel[no_nan]
# estimate a linear slope fitting the data to determine which function if display:
# to fit
slope = np.cov(pos_sel, int_sel)[0][1]/np.var(pos_sel)
if slope < 0:
func = stepDown
funcStr = 'a*erfc(np.sqrt(2)*(x-x0)/w0) + b' funcStr = 'a*erfc(np.sqrt(2)*(x-x0)/w0) + b'
else:
func = stepUp
funcStr = 'a*erfc(-np.sqrt(2)*(x-x0)/w0) + b'
if p0 is None:
p0 = [np.mean(pos_sel), 0.1, np.max(int_sel)/2, 0]
try:
popt, pcov = curve_fit(func, pos_sel, int_sel, p0=p0)
print('fitting function:', funcStr) print('fitting function:', funcStr)
print('w0 = (%.1f +/- %.1f) um' % (popt[1]*1e3, pcov[1, 1]**0.5*1e3)) print('w0 = (%.1f +/- %.1f) um' % (np.abs(popt[1])*1e3,
pcov[1, 1]**0.5*1e3))
print('x0 = (%.3f +/- %.3f) mm' % (popt[0], pcov[0, 0]**0.5)) print('x0 = (%.3f +/- %.3f) mm' % (popt[0], pcov[0, 0]**0.5))
print('a = %e +/- %e ' % (popt[2], pcov[2, 2]**0.5)) print('a = %e +/- %e ' % (popt[2], pcov[2, 2]**0.5))
print('b = %e +/- %e ' % (popt[3], pcov[3, 3]**0.5)) print('b = %e +/- %e ' % (popt[3], pcov[3, 3]**0.5))
fitSuccess = True
except Exception as e:
print(f'Could not fit the data with erfc function: {e}.' +
' Try adjusting the axisRange and the initial parameters p0')
fitSuccess = False
if plot:
plt.figure(figsize=(7, 4))
plt.scatter(positions, intensities, color='C1',
label='exp', s=2, alpha=0.1)
if fitSuccess:
xfit = np.linspace(positions.min(), positions.max(), 1000)
yfit = func(xfit, *popt)
plt.plot(xfit, yfit, color='C4',
label=r'fit $\rightarrow$ $w_0=$(%.1f $\pm$ %.1f) $\mu$m' % (
popt[1]*1e3, pcov[1, 1]**0.5*1e3))
leg = plt.legend()
for lh in leg.legendHandles:
lh.set_alpha(1)
plt.ylabel(signalKey)
plt.xlabel(axisKey + ' position [mm]')
plt.title(ds.attrs['runFolder'])
plt.tight_layout()
if full: if full:
if fitSuccess: return popt, pcov
return popt, pcov, func
else:
return np.zeros(4), np.zeros(2), None
else: else:
if fitSuccess: return np.abs(popt[1]), pcov[1, 1]**0.5
return np.array([popt[1], pcov[1, 1]**0.5])
else:
return np.zeros(2) def plot_knife_edge(positions, intensities, fit_params, rel_err, title,
axisKey, signalKey):
plt.figure(figsize=(7, 4))
plt.scatter(positions, intensities, color='C1',
label='measured', s=2, alpha=0.1)
xfit = np.linspace(positions.min(), positions.max(), 1000)
yfit = erfc(xfit, *fit_params)
plt.plot(xfit, yfit, color='C4',
label=r'fit $\rightarrow$ $w_0=$(%.1f $\pm$ %.1f) $\mu$m' % (
np.abs(fit_params[1])*1e3, rel_err*1e3))
leg = plt.legend()
for lh in leg.legendHandles:
lh.set_alpha(1)
plt.ylabel(signalKey)
plt.xlabel(axisKey + ' position [mm]')
plt.title(title)
plt.tight_layout()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment