diff --git a/knife_edge.py b/knife_edge.py index 224b8a88bda810ad6371e613e89a7130b4efc017..e7b906bab1fd8bd28ec5683f5bbcf12bcb18e21d 100644 --- a/knife_edge.py +++ b/knife_edge.py @@ -8,10 +8,12 @@ import matplotlib.pyplot as plt import numpy as np from scipy.special import erfc from scipy.optimize import curve_fit +import bisect -def knife_edge(nrun, axisKey='scannerX', signalKey='FastADC4peaks', p0=None, full=False, plot=False): +def knife_edge(nrun, axisKey='scannerX', signalKey='FastADC4peaks', + axisRange=[None,None], p0=None, full=False, plot=False): ''' Calculates the beam radius at 1/e^2 from a knife-edge scan by fitting with - erfc function: f(a, u) = a*erfc(u) or f(a, u) = a*erfc(-u) where + erfc function: f(a,b,u) = a*erfc(u)+b or f(a,b,u) = a*erfc(-u)+b where u = sqrt(2)*(x-x0)/w0 with w0 the beam radius at 1/e^2 and x0 the beam center. Inputs: nrun: xarray Dataset containing the detector signal and the motor @@ -19,22 +21,24 @@ def knife_edge(nrun, axisKey='scannerX', signalKey='FastADC4peaks', p0=None, ful axisKey: string, key of the axis against which the knife-edge is performed. signalKey: string, key of the detector signal. - p0: list, initial parameters used for the fit: x0, w0, a. If None, a beam + axisRange: list of length 2, minimum and maximum values between which to apply + the fit. + p0: list, initial parameters used for the fit: x0, w0, a, b. If None, a beam radius of 100 um is assumed. full: bool: If False, returns the beam radius and standard error. If True, returns the popt, pcov list of parameters and covariance matrix from - curve_fit. + curve_fit as well as the fitting function. plot: bool: If True, plots the data and the result of the fit. Outputs: If full is False, ndarray with beam radius at 1/e^2 in mm and standard error from the fit in mm. If full is True, returns popt and pcov from curve_fit function. ''' - def integPowerUp(x, x0, w0, a): - return a*erfc(-np.sqrt(2)*(x-x0)/w0) + def integPowerUp(x, x0, w0, a, b): + return a*erfc(-np.sqrt(2)*(x-x0)/w0) + b - def integPowerDown(x, x0, w0, a): - return a*erfc(np.sqrt(2)*(x-x0)/w0) + def integPowerDown(x, x0, w0, a, b): + return a*erfc(np.sqrt(2)*(x-x0)/w0) + b #get the number of pulses per train from the signal source: dim = nrun[signalKey].dims[1] @@ -46,22 +50,38 @@ def knife_edge(nrun, axisKey='scannerX', signalKey='FastADC4peaks', p0=None, ful sortIdx = np.argsort(positions) positions = positions[sortIdx] intensities = nrun[signalKey].values.flatten()[sortIdx] - + + if axisRange[0] is None or axisRange[0] < positions[0]: + idxMin = 0 + else: + if axisRange[0] >= positions[-1]: + raise ValueError('The minimum value of axisRange is too large') + idxMin = bisect.bisect(positions, axisRange[0]) + if axisRange[1] is None or axisRange[1] > positions[-1]: + idxMax = None + else: + if axisRange[1] <= positions[0]: + raise ValueError('The maximum value of axisRange is too small') + idxMax = bisect.bisect(positions, axisRange[1]) + 1 + positions = positions[idxMin:idxMax] + intensities = intensities[idxMin:idxMax] + # estimate a linear slope fitting the data to determine which function to fit slope = np.cov(positions, intensities)[0][1]/np.var(positions) if slope < 0: func = integPowerDown - funcStr = 'a*erfc(np.sqrt(2)*(x-x0)/w0)' + funcStr = 'a*erfc(np.sqrt(2)*(x-x0)/w0) + b' else: func = integPowerUp - funcStr = 'a*erfc(-np.sqrt(2)*(x-x0)/w0)' + funcStr = 'a*erfc(-np.sqrt(2)*(x-x0)/w0) + b' if p0 is None: - p0 = [np.mean(positions), 0.1, np.max(intensities)/2] + p0 = [np.mean(positions), 0.1, np.max(intensities)/2, 0] popt, pcov = curve_fit(func, positions, intensities, p0=p0) print('fitting function:', funcStr) print('w0 = (%.1f +/- %.1f) um'%(popt[1]*1e3, pcov[1,1]**0.5*1e3)) - print('x0 = (%.3f +/- %.3f) mm'%(popt[0], pcov[0,0]**0.5*1e3)) - print('a = %e +/- %e '%(popt[2], pcov[2,2]**0.5*1e3)) + print('x0 = (%.3f +/- %.3f) mm'%(popt[0], pcov[0,0]**0.5)) + print('a = %e +/- %e '%(popt[2], pcov[2,2]**0.5)) + print('b = %e +/- %e '%(popt[3], pcov[3,3]**0.5)) if plot: xfit = np.linspace(positions.min(), positions.max(), 1000) @@ -78,6 +98,6 @@ def knife_edge(nrun, axisKey='scannerX', signalKey='FastADC4peaks', p0=None, ful plt.title(nrun.attrs['runFolder']) plt.tight_layout() if full: - return popt, pcov + return popt, pcov, func else: return np.array([popt[1], pcov[1,1]**0.5]) diff --git a/xgm.py b/xgm.py index 59f2dac2d359cac236effff394feca38656e7dcd..dbc47a40921ffb49aabe0c9d392ea15a75807b5d 100644 --- a/xgm.py +++ b/xgm.py @@ -8,6 +8,7 @@ import matplotlib.pyplot as plt import numpy as np import xarray as xr +from scipy.signal import find_peaks # Machine def pulsePatternInfo(data, plot=False): @@ -967,6 +968,51 @@ def fastAdcPeaks(data, channel, intstart, intstop, bkgstart, bkgstop, period=Non results[:,i] = integ return results +def autoFindFastAdcPeaks(data, channel=5, threshold=35000, display=False, plot=False): + ''' Automatically finds positive peaks in channel of Fast ADC trace, assuming + a minimum absolute height of 'threshold' counts and a minimum width of 4 + samples. The find_peaks function and determination of the peak region and + baseline subtraction is optimized for typical photodiode signals of the + SCS instrument (ILH, FFT reflectometer, FFT diag stage). + Inputs: + data: xarray Dataset containing Fast ADC traces + key: data key of the array of traces + threshold: minimum height of the peaks + display: bool, displays info on the pulses found + plot: plots regions of integration of the first pulse in the trace + Output: + peaks: DataArray of the integrated peaks + ''' + + key = f'FastADC{channel}raw' + if key not in data: + raise ValueError(f'{key} not found in data set') + trace = data[key].where(data['npulses_sase3']>0, drop=True).isel(trainId=0).values + centers, peaks = find_peaks(trace, height=threshold, width=(4, None)) + c = centers[0] + w = np.average(peaks['widths']).astype(int) + period = np.median(np.diff(centers)).astype(int) + npulses = centers.shape[0] + intstart = int(c - w/4) + 1 + intstop = int(c + w/4) + 1 + bkgstop = int(peaks['left_ips'][0])-5 + bkgstart = bkgstop - 10 + if display: + print(f'Found {npulses} pulses, avg. width={w}, period={period} samples, ' + + f'rep. rate={1e6/(9.230769*period):.3f} kHz') + fAdcPeaks = fastAdcPeaks(data, channel=channel, intstart=intstart, intstop=intstop, + bkgstart=bkgstart, bkgstop=bkgstop, period=period, npulses=npulses) + if plot: + plt.figure() + plt.plot(trace, 'o-', ms=3) + for i in range(npulses): + plt.axvline(intstart+i*period, ls='--', color='g') + plt.axvline(intstop+i*period, ls='--', color='r') + plt.axvline(bkgstart+i*period, ls='--', color='lightgrey') + plt.axvline(bkgstop+i*period, ls='--', color='grey') + plt.title(f'Fast ADC {channel} trace') + plt.xlim(bkgstart-10, intstop + 50) + return fAdcPeaks def mergeFastAdcPeaks(data, channel, intstart, intstop, bkgstart, bkgstop, period=None, npulses=None, dim='lasPulseId'):