Newer
Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
# -*- coding: utf-8 -*-
""" Toolbox for XAS experiments.
Based on the LCLS LO59 experiment libraries.
Time-resolved XAS and XMCD with uncertainties.
Copyright (2019-) SCS Team
Copyright (2017-2019) Loïc Le Guyader <loic.le.guyader@xfel.eu>
"""
import numpy as np
def absorption(T, Io):
""" Compute the absorption A = -ln(T/Io)
Inputs:
T: 1-D transmission value array of length N
Io: 1-D Io monitor value array of length N
Output:
a structured array with:
muA: absorption mean
sigmaA: absorption standard deviation
weights: sum of Io values
muT: transmission mean
sigmaT: transmission standard deviation
muIo: Io mean
sigmaIo: Io standard deviation
p: correlation coefficient between T and Io
counts: length of T
"""
counts = len(T)
assert counts == len(Io), "T and Io must have the same length"
# return type of the structured array
fdtype = [('muA', 'f8'), ('sigmaA', 'f8'), ('weights', 'f8'),
('muT', 'f8'), ('sigmaT', 'f8'), ('muIo', 'f8'), ('sigmaIo', 'f8'),
('p', 'f8'), ('counts', 'i8')]
if counts == 0:
return np.array([(np.NaN, np.NaN, np.NaN, np.NaN, np.NaN, np.NaN,
np.NaN, np.NaN, 0)], dtype=fdtype)
muT = np.mean(T)
sigmaT = np.std(T)
muIo = np.mean(Io)
sigmaIo = np.std(Io)
weights = np.sum(Io)
p = np.corrcoef(T, Io)[0,1]
muA = -np.log(muT/muIo)
# from error propagation for correlated data
sigmaA = (np.sqrt((sigmaT/muT)**2 + (sigmaIo/muIo)**2
- 2*p*sigmaIo*sigmaT/(muIo*muT)))
# direct calculation
#mask = (Io != 0)
#sigmaA = np.nanstd(-np.log(T[mask]/Io[mask]))
return np.array([(muA, sigmaA, weights, muT, sigmaT, muIo, sigmaIo,
p, counts)], dtype=fdtype)
def binning(x, data, func, bins=100, bin_length=None):
""" General purpose 1-dimension data binning
Inputs:
x: input vector of len N
data: structured array of len N
func: a function handle that takes data from a bin an return
a structured array of statistics computed in that bin
bins: array of bin-edges or number of desired bins
bin_length: if not None, the bin width covering the whole range
Outputs:
bins: the bins edges
res: a structured array of binned data
"""
if bin_length is not None:
bin_start = np.amin(x)
bin_end = np.amax(x)
bins = np.arange(bin_start, bin_end+bin_length, bin_length)
elif np.size(bins) == 1:
bin_start = np.amin(x)
bin_end = np.amax(x)
bins = np.linspace(bin_start, bin_end, bins)
bin_centers = (bins[1:]+bins[:-1])/2
nb_bins = len(bin_centers)
bin_idx = np.digitize(x, bins)
dummy = func([])
res = np.empty((nb_bins), dtype=dummy.dtype)
for k in range(nb_bins):
res[k] = func(data[bin_idx == k])
return bins, res
Mercadier
committed
def xas(nrun, bins=None, Iokey='SCS_SA3', Itkey='MCP3apd', nrjkey='nrj', Iooffset=0):
""" Compute the XAS spectra from a xarray nrun.
Inputs:
nrun: xarray of SCS data
bins: an array of bin-edges or an integer number of
desired bins or a float for the desired bin width.
Iokey: string for the Io fields, typically 'SCS_XGM'
Itkey: string for the It fields, typically 'MCP3apd'
NRJkey: string for the nrj fields, typically 'nrj'
Iooffset: offset to apply on Io
Outputs:
a dictionnary containing:
nrj: the bins center
muA: the absorption
sigmaA: standard deviation on the absorption
sterrA: standard error on the absorption
muIo: the mean of the Io
counts: the number of events in each bin
"""
Mercadier
committed
stacked = nrun.stack(x=['trainId','sa3_pId'])
Io = stacked[Iokey].values + Iooffset
It = stacked[Itkey].values
nrj = stacked[nrjkey].values
names_list = ['nrj', 'Io', 'It']
rundata = np.vstack((nrj, Io, It))
rundata = np.rec.fromarrays(rundata, names=names_list)
def whichIo(data):
""" Select which fields to use as I0 and which to use as I1
"""
if len(data) == 0:
return absorption([], [])
else:
return absorption(-data['It'], data['Io'])
if bins is None:
num_bins = 80
energy_limits = [np.min(nrj), np.max(nrj)]
bins = np.linspace(energy_limits[0], energy_limits[1], num_bins+1)
elif type(bins) == int:
energy_limits = [np.min(nrj), np.max(nrj)]
bins = np.linspace(energy_limits[0], energy_limits[1], bins+1)
elif type(bins) == float:
energy_limits = [np.min(nrj), np.max(nrj)]
bins = np.arange(energy_limits[0], energy_limits[1], bins)
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
dummy, nosample = binning(rundata['nrj'], rundata, whichIo, bins)
muA = nosample['muA']
sterrA = nosample['sigmaA']/np.sqrt(nosample['counts'])
bins_c = 0.5*(bins[1:] + bins[:-1])
return {'nrj':bins_c, 'muA':muA, 'sterrA':sterrA, 'sigmaA':nosample['sigmaA'],
'muIo':nosample['muIo'], 'counts':nosample['counts']}
def xasxmcd(dataP, dataN):
""" Compute XAS and XMCD from data with both magnetic field direction
Inputs:
dataP: structured array for positive field
dataN: structured array for negative field
Outputs:
xas: structured array for the sum
xmcd: structured array for the difference
"""
assert len(dataP) == len(dataN), "binned datasets must be of same lengths"
assert not np.any(dataP['nrj'] - dataN['nrj']), "Energy points for dataP and dataN should be the same"
muXAS = dataP['muA'] + dataN['muA']
muXMCD = dataP['muA'] - dataN['muA']
# standard error is the same for XAS and XMCD
sigma = np.sqrt(dataP['sterrA']**2 + dataN['sterrA']**2)
res = np.empty(len(muXAS), dtype=[('nrj', 'f8'), ('muXAS', 'f8'), ('sigmaXAS', 'f8'),
('muXMCD', 'f8'), ('sigmaXMCD', 'f8')])
res['nrj'] = dataP['nrj']
res['muXAS'] = muXAS
res['muXMCD'] = muXMCD
res['sigmaXAS'] = sigma
res['sigmaXMCD'] = sigma
return res