From 6740ecebefbd625f805ba83048a0710ef42c1dbf Mon Sep 17 00:00:00 2001
From: Cammille Carinan <cammille.carinan@xfel.eu>
Date: Fri, 4 Jun 2021 14:04:20 +0200
Subject: [PATCH] Base knife-edge scan analysis implementation

---
 src/toolbox_scs/base/__init__.py              |   5 +
 src/toolbox_scs/base/knife_edge.py            | 167 ++++++++++++++++++
 src/toolbox_scs/base/tests/__init__.py        |   0
 src/toolbox_scs/base/tests/test_knife_edge.py | 133 ++++++++++++++
 src/toolbox_scs/routines/knife_edge.py        | 140 +++++----------
 5 files changed, 353 insertions(+), 92 deletions(-)
 create mode 100644 src/toolbox_scs/base/__init__.py
 create mode 100644 src/toolbox_scs/base/knife_edge.py
 create mode 100644 src/toolbox_scs/base/tests/__init__.py
 create mode 100644 src/toolbox_scs/base/tests/test_knife_edge.py

diff --git a/src/toolbox_scs/base/__init__.py b/src/toolbox_scs/base/__init__.py
new file mode 100644
index 0000000..367a07d
--- /dev/null
+++ b/src/toolbox_scs/base/__init__.py
@@ -0,0 +1,5 @@
+from . import knife_edge as knife_edge_module
+from .knife_edge import *
+
+
+__all__ = knife_edge_module.__all__
diff --git a/src/toolbox_scs/base/knife_edge.py b/src/toolbox_scs/base/knife_edge.py
new file mode 100644
index 0000000..26b8ae5
--- /dev/null
+++ b/src/toolbox_scs/base/knife_edge.py
@@ -0,0 +1,167 @@
+import numpy as np
+from scipy import special
+from scipy.optimize import curve_fit
+
+
+__all__ = ['knife_edge', 'knife_edge_base']
+
+
+def knife_edge(positions, intensities, axisRange=None, p0=None):
+    """
+    Calculates the beam radius at 1/e^2 from a knife-edge scan by
+    fitting with erfc function: f(a,b,u) = a*erfc(u) + b or
+    where u = sqrt(2)*(x-x0)/w0 with w0 the beam radius at 1/e^2
+    and x0 the beam center.
+
+    Parameters
+    ----------
+    positions : np.ndarray
+        Motor position values, typically 1D
+    intensities : np.ndarray
+        Intensity values, could be either 1D or 2D, with the number or rows
+        equivalent with the position size
+    axisRange : sequence of two floats or None
+        Edges of the scanning axis between which to apply the fit.
+    p0 : list of floats, numpy 1D array
+        Initial parameters used for the fit: x0, w0, a, b. If None, a beam
+        radius of 100 um is assumed.
+
+    Returns
+    -------
+    width : float
+        The beam radius at 1/e^2
+    std : float
+        The standard deviation of the width
+    """
+    popt, pcov = knife_edge_base(positions, intensities,
+                                 axisRange=axisRange, p0=p0)
+    width, std = 0, 0
+    if popt is not None and pcov is not None:
+        width, std = np.abs(popt[1]), pcov[1, 1]**0.5
+    return width, std
+
+
+def knife_edge_base(positions, intensities, axisRange=None, p0=None):
+    """
+    The base implementation of the knife-edge scan analysis.
+
+    Calculates the beam radius at 1/e^2 from a knife-edge scan by
+    fitting with erfc function: f(a,b,u) = a*erfc(u) + b or
+    where u = sqrt(2)*(x-x0)/w0 with w0 the beam radius at 1/e^2
+    and x0 the beam center.
+
+    Parameters
+    ----------
+    positions : np.ndarray
+        Motor position values, typically 1D
+    intensities : np.ndarray
+        Intensity values, could be either 1D or 2D, with the number or rows
+        equivalent with the position size
+    axisRange : sequence of two floats or None
+        Edges of the scanning axis between which to apply the fit.
+    p0 : list of floats, numpy 1D array
+        Initial parameters used for the fit: x0, w0, a, b. If None, a beam
+        radius of 100 um is assumed.
+
+    Returns
+    -------
+    popt : sequence of floats or None
+        The parameters of the resulting fit.
+    pcov : sequence of floats
+        The covariance matrix of the resulting fit.
+    """
+    # Prepare arrays
+    positions, intensities = prepare_arrays(positions, intensities,
+                                            xRange=axisRange)
+
+    # Estimate initial fitting params
+    if p0 is None:
+        p0 = [np.mean(positions), 0.1, np.max(intensities) / 2, 0]
+
+    # Fit
+    popt, pcov = function_fit(erfc, positions, intensities, p0=p0)
+
+    return popt, pcov
+
+
+def function_fit(func, x, y, **kwargs):
+    """A wrapper for scipy.optimize curve_fit()
+    """
+    # Fit
+    try:
+        popt, pcov = curve_fit(func, x, y, **kwargs)
+    except (TypeError, RuntimeError) as err:
+        print("Fit did not converge:", err)
+        popt, pcov = (None, None)
+    return popt, pcov
+
+
+def prepare_arrays(arrX: np.ndarray, arrY: np.ndarray,
+                   xRange=None, yRange=None):
+    """
+    Preprocessing of the input x and y arrays.
+
+    This involves the following steps.
+    1. Converting the arrays to 1D of the same size
+    2. Select the ranges from the input x- and y-ranges
+    3. Retrieve finite values.
+    """
+    # Convert both arrays to 1D of the same size
+    arrX, arrY = arrays_to1d(arrX, arrY)
+
+    # Select ranges
+    if xRange is not None:
+        low, high = xRange
+        if low == high:
+            raise ValueError('The supplied xRange is not a valid range.')
+        mask_ = range_mask(arrX, low, high)
+        arrX = arrX[mask_]
+        arrY = arrY[mask_]
+    if yRange is not None:
+        low, high = yRange
+        if low == high:
+            raise ValueError('The supplied xRange is not a valid range.')
+        mask_ = range_mask(arrY, low, high)
+        arrX = arrX[mask_]
+        arrY = arrY[mask_]
+
+    # Clean both arrays by only getting finite values
+    finite_idx = np.isfinite(arrX) & np.isfinite(arrY)
+    arrX = arrX[finite_idx]
+    arrY = arrY[finite_idx]
+
+    return arrX, arrY
+
+
+def arrays_to1d(arrX: np.ndarray, arrY: np.ndarray):
+    """Flatten two arrays and matches their sizes
+    """
+    assert arrX.shape[0] == arrY.shape[0]
+    arrX, arrY = arrX.flatten(), arrY.flatten()
+    if len(arrX) > len(arrY):
+        arrY = np.repeat(arrY, len(arrX) // len(arrY))
+    else:
+        arrX = np.repeat(arrX, len(arrY) // len(arrX))
+    return arrX, arrY
+
+
+def range_mask(array, minimum=None, maximum=None):
+    """Retrieve the resulting array from the given minimum and maximum
+    """
+    default = np.ones(array.shape, dtype=np.bool)
+    min_slice, max_slice = default, default
+    if minimum is not None:
+        if minimum > np.nanmax(array):
+            raise ValueError('The range minimum is too large.')
+        min_slice = array >= minimum
+
+    if maximum is not None:
+        if maximum < np.nanmin(array):
+            raise ValueError('The range maximum is too small.')
+        max_slice = array <= maximum
+
+    return min_slice & max_slice
+
+
+def erfc(x, x0, w0, a, b):
+    return a * special.erfc(np.sqrt(2) * (x - x0) / w0) + b
diff --git a/src/toolbox_scs/base/tests/__init__.py b/src/toolbox_scs/base/tests/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/src/toolbox_scs/base/tests/test_knife_edge.py b/src/toolbox_scs/base/tests/test_knife_edge.py
new file mode 100644
index 0000000..8b8d0a5
--- /dev/null
+++ b/src/toolbox_scs/base/tests/test_knife_edge.py
@@ -0,0 +1,133 @@
+import numpy as np
+import pytest
+
+from ..knife_edge import erfc, knife_edge_base, prepare_arrays, range_mask
+
+
+def test_range_mask():
+    arr = np.array([1, 2, 3, 4, 5])
+
+    # Exact
+    slice_ = range_mask(arr, minimum=2, maximum=4)
+    np.testing.assert_array_equal(slice_, [False, True, True, True, False])
+
+    # Range exceeds the closest values
+    slice_ = range_mask(arr, minimum=1.75, maximum=4.25)
+    np.testing.assert_array_equal(slice_, [False, True, True, True, False])
+
+    # Range misses the closest values
+    slice_ = range_mask(arr, minimum=2.25, maximum=3.75)
+    np.testing.assert_array_equal(slice_, [False, False, True, False, False])
+
+    # Equidistant
+    slice_ = range_mask(arr, minimum=2.5, maximum=4.5)
+    np.testing.assert_array_equal(slice_, [False, False, True, True, False])
+
+    # Out of bounds, valid minimum
+    slice_ = range_mask(arr, minimum=0)
+    np.testing.assert_array_equal(slice_, [True, True, True, True, True])
+
+    # Out of bounds, invalid minimum
+    with pytest.raises(ValueError):
+        range_mask(arr, minimum=6)
+
+    # Out of bounds, valid maximum
+    slice_ = range_mask(arr, maximum=6)
+    np.testing.assert_array_equal(slice_, [True, True, True, True, True])
+
+    # Out of bounds, invalid minimum
+    with pytest.raises(ValueError):
+        range_mask(arr, maximum=0)
+
+    # with NaNs
+    arr = np.array([1, np.nan, 3, np.nan, 5])
+    slice_ = range_mask(arr, minimum=3)
+    np.testing.assert_array_equal(slice_, [False, False, True, False, True])
+
+
+def test_prepare_arrays_nans():
+    # Setup test values
+    trains, pulses = 5, 10
+    size = trains * pulses
+    motor = np.arange(trains)
+    signal = np.random.randint(100, size=(trains, pulses))
+
+    # Test finite motor and signal values
+    positions, intensities = prepare_arrays(motor, signal)
+    assert positions.shape == (size,)
+    assert intensities.shape == (size,)
+
+    # Test finite motors and signals with NaNs
+    signal_nan = _with_values(signal, value=np.nan, num=20)
+    positions, intensities = prepare_arrays(motor, signal_nan)
+    assert positions.shape == (size-20,)
+    assert np.isfinite(positions).all()
+    assert intensities.shape == (size-20,)
+    assert np.isfinite(intensities).all()
+
+    # Test finite signals and motors with NaNs
+    motor_nan = _with_values(motor, value=np.nan, num=3)
+    positions, intensities = prepare_arrays(motor_nan, signal)
+    assert positions.shape == ((trains-3) * pulses,)
+    assert np.isfinite(positions).all()
+    assert intensities.shape == ((trains-3) * pulses,)
+    assert np.isfinite(intensities).all()
+
+
+def test_prepare_arrays_size():
+    trains, pulses = 5, 10
+    size = trains * pulses
+    motor = np.arange(trains)
+    signal = np.random.randint(100, size=(trains, pulses))
+
+    # Test finite motor and 2D signals
+    positions, intensities = prepare_arrays(motor, signal)
+    assert positions.shape == (size,)
+    assert intensities.shape == (size,)
+
+    # Test finite motor and 1D signals
+    positions, intensities = prepare_arrays(motor, signal[:, 0])
+    assert positions.shape == (trains,)
+    assert intensities.shape == (trains,)
+
+
+def test_prepare_arrays_range():
+    trains, pulses = 5, 10
+    motor = np.arange(trains)
+    signal = np.random.randint(100, size=(trains, pulses))
+
+    # Test valid range, inside bounds
+    positions, intensities = prepare_arrays(motor, signal, xRange=[2, 4])
+    assert (positions.min(), positions.max()) == (2, 4)
+    unique = np.unique(positions)
+    np.testing.assert_array_equal(unique, [2, 3, 4])
+    assert intensities.shape == (unique.size * pulses,)
+
+    # Test invalid ranges
+    with pytest.raises(ValueError):
+        prepare_arrays(motor, signal, xRange=[5, 3])
+    with pytest.raises(ValueError):
+        prepare_arrays(motor, signal, xRange=[3, 3])
+
+
+def test_knife_edge_base():
+    p0 = [0, -1.5, 1, 0]
+    x = np.linspace(-3, 3)
+    y = erfc(x, *p0)
+    noise = y * np.random.normal(0, .02, y.shape)  # 2% error
+    eff_y = y + noise
+
+    # Test noisy data
+    popt, _ = knife_edge_base(x, eff_y)
+    np.testing.assert_allclose(p0, popt, atol=1e-1)
+
+    # Test flipped data
+    popt, _ = knife_edge_base(x, eff_y[::-1])
+    p0[1] = abs(p0[1])  # Absolute value when flipped
+    np.testing.assert_allclose(p0, popt, atol=1e-1)
+
+
+def _with_values(array, value, num=5):
+    copy = array.astype(np.float)
+    copy.ravel()[np.random.choice(copy.size, num, replace=False)] = value
+    return copy
diff --git a/src/toolbox_scs/routines/knife_edge.py b/src/toolbox_scs/routines/knife_edge.py
index fccc153..1b22dc8 100644
--- a/src/toolbox_scs/routines/knife_edge.py
+++ b/src/toolbox_scs/routines/knife_edge.py
@@ -1,30 +1,27 @@
 """ Toolbox for SCS.
 
     Various utilities function to quickly process data measured
-    at the SCS instruments.
+    at the SCS instrument.
 
     Copyright (2019-) SCS Team.
 """
 import matplotlib.pyplot as plt
 import numpy as np
-from scipy.special import erfc
-from scipy.optimize import curve_fit
-import bisect
+from toolbox_scs.base.knife_edge import knife_edge_base, erfc, arrays_to1d
 
 __all__ = [
     'knife_edge'
 ]
 
 
-def knife_edge(ds, axisKey='scannerX',
-               signalKey='FastADC4peaks',
-               axisRange=[None, None], p0=None,
-               full=False, plot=False):
+def knife_edge(ds, axisKey='scannerX', signalKey='FastADC4peaks',
+               axisRange=[None, None], p0=None, full=False, plot=False,
+               display=False):
     """
     Calculates the beam radius at 1/e^2 from a knife-edge scan by
-    fitting with erfc function: f(a,b,u) = a*erfc(u) + b or
-    f(a,b,u) = a*erfc(-u) + b where u = sqrt(2)*(x-x0)/w0 with w0
-    the beam radius at 1/e^2 and x0 the beam center.
+    fitting with erfc function:
+    f(x, x0, w0, a, b) = a*erfc(np.sqrt(2)*(x-x0)/w0) + b
+    with w0 the beam radius at 1/e^2 and x0 the beam center.
 
     Parameters
     ----------
@@ -38,103 +35,62 @@ def knife_edge(ds, axisKey='scannerX',
         edges of the scanning axis between which to apply the fit.
     p0: list of floats, numpy 1D array
         initial parameters used for the fit: x0, w0, a, b. If None, a beam
-        radius of 100 um is assumed.
+        radius of 100 micrometers is assumed.
     full: bool
         If False, returns the beam radius and standard error.
         If True, returns the popt, pcov list of parameters and covariance
-        matrix from scipy.optimize.curve_fit as well as the fitting function.
+        matrix from scipy.optimize.curve_fit.
     plot: bool
-        If True, plots the data and the result of the fit.
+        If True, plots the data and the result of the fit. Default is False.
+    display: bool
+        If True, displays info on the fit. True when plot is True, default is
+        False.
 
     Returns
     -------
-    If full is False, ndarray with beam radius at 1/e^2 in mm and standard
+    If full is False, tuple with beam radius at 1/e^2 in mm and standard
         error from the fit in mm. If full is True, returns parameters and
         covariance matrix from scipy.optimize.curve_fit function.
     """
-    def stepUp(x, x0, w0, a, b):
-        return a*erfc(-np.sqrt(2)*(x-x0)/w0) + b
-
-    def stepDown(x, x0, w0, a, b):
-        return a*erfc(np.sqrt(2)*(x-x0)/w0) + b
-
-    # get the number of pulses per train from the signal source:
-    dim = [k for k in ds[signalKey].dims if k != 'trainId'][0]
-    # duplicate motor position values to match signal shape
-    # this is faster than using ds.stack()
-    positions = np.repeat(ds[axisKey].values,
-                          len(ds[dim])).astype(ds[signalKey].dtype)
-    # sort the data to decide which fitting function to use
-    sortIdx = np.argsort(positions)
-    positions = positions[sortIdx]
-    intensities = ds[signalKey].values.flatten()[sortIdx]
-
-    if axisRange[0] is None or axisRange[0] < positions[0]:
-        idxMin = 0
-    else:
-        if axisRange[0] >= positions[-1]:
-            raise ValueError('The minimum value of axisRange is too large')
-        idxMin = bisect.bisect(positions, axisRange[0])
-    if axisRange[1] is None or axisRange[1] > positions[-1]:
-        idxMax = None
-    else:
-        if axisRange[1] <= positions[0]:
-            raise ValueError('The maximum value of axisRange is too small')
-        idxMax = bisect.bisect(positions, axisRange[1]) + 1
-    pos_sel = positions[idxMin:idxMax]
-    int_sel = intensities[idxMin:idxMax]
-    no_nan = ~np.isnan(int_sel)
-    pos_sel = pos_sel[no_nan]
-    int_sel = int_sel[no_nan]
+    popt, pcov = knife_edge_base(ds[axisKey].values, ds[signalKey].values,
+                                 axisRange=axisRange, p0=p0)
+    if plot:
+        positions, intensities = arrays_to1d(ds[axisKey].values,
+                                             ds[signalKey].values)
+        plot_knife_edge(positions, intensities, popt, pcov[1, 1]**0.5,
+                        ds.attrs['runFolder'], axisKey, signalKey)
+        display = True
 
-    # estimate a linear slope fitting the data to determine which function
-    # to fit
-    slope = np.cov(pos_sel, int_sel)[0][1]/np.var(pos_sel)
-    if slope < 0:
-        func = stepDown
+    if display:
         funcStr = 'a*erfc(np.sqrt(2)*(x-x0)/w0) + b'
-    else:
-        func = stepUp
-        funcStr = 'a*erfc(-np.sqrt(2)*(x-x0)/w0) + b'
-    if p0 is None:
-        p0 = [np.mean(pos_sel), 0.1, np.max(int_sel)/2, 0]
-    try:
-        popt, pcov = curve_fit(func, pos_sel, int_sel, p0=p0)
         print('fitting function:', funcStr)
-        print('w0 = (%.1f +/- %.1f) um' % (popt[1]*1e3, pcov[1, 1]**0.5*1e3))
+        print('w0 = (%.1f +/- %.1f) um' % (np.abs(popt[1])*1e3,
+                                           pcov[1, 1]**0.5*1e3))
         print('x0 = (%.3f +/- %.3f) mm' % (popt[0], pcov[0, 0]**0.5))
         print('a = %e +/- %e ' % (popt[2], pcov[2, 2]**0.5))
         print('b = %e +/- %e ' % (popt[3], pcov[3, 3]**0.5))
-        fitSuccess = True
-    except Exception as e:
-        print(f'Could not fit the data with erfc function: {e}.' +
-              ' Try adjusting the axisRange and the initial parameters p0')
-        fitSuccess = False
 
-    if plot:
-        plt.figure(figsize=(7, 4))
-        plt.scatter(positions, intensities, color='C1',
-                    label='exp', s=2, alpha=0.1)
-        if fitSuccess:
-            xfit = np.linspace(positions.min(), positions.max(), 1000)
-            yfit = func(xfit, *popt)
-            plt.plot(xfit, yfit, color='C4',
-                 label=r'fit $\rightarrow$ $w_0=$(%.1f $\pm$ %.1f) $\mu$m' % (
-                                            popt[1]*1e3, pcov[1, 1]**0.5*1e3))
-        leg = plt.legend()
-        for lh in leg.legendHandles:
-            lh.set_alpha(1)
-        plt.ylabel(signalKey)
-        plt.xlabel(axisKey + ' position [mm]')
-        plt.title(ds.attrs['runFolder'])
-        plt.tight_layout()
     if full:
-        if fitSuccess:
-            return popt, pcov, func
-        else:
-            return np.zeros(4), np.zeros(2), None
+        return popt, pcov
     else:
-        if fitSuccess:
-            return np.array([popt[1], pcov[1, 1]**0.5])
-        else:
-            return np.zeros(2)
+        return np.abs(popt[1]), pcov[1, 1]**0.5
+
+
+def plot_knife_edge(positions, intensities, fit_params, rel_err, title,
+                    axisKey, signalKey):
+
+    plt.figure(figsize=(7, 4))
+    plt.scatter(positions, intensities, color='C1',
+                label='measured', s=2, alpha=0.1)
+    xfit = np.linspace(positions.min(), positions.max(), 1000)
+    yfit = erfc(xfit, *fit_params)
+    plt.plot(xfit, yfit, color='C4',
+             label=r'fit $\rightarrow$ $w_0=$(%.1f $\pm$ %.1f) $\mu$m' % (
+                                    np.abs(fit_params[1])*1e3, rel_err*1e3))
+    leg = plt.legend()
+    for lh in leg.legendHandles:
+        lh.set_alpha(1)
+    plt.ylabel(signalKey)
+    plt.xlabel(axisKey + ' position [mm]')
+    plt.title(title)
+    plt.tight_layout()
-- 
GitLab