Newer
Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
# -*- coding: utf-8 -*-
""" Toolbox for XAS experiments.
Based on the LCLS LO59 experiment libraries.
Time-resolved XAS and XMCD with uncertainties.
Copyright (2019-) SCS Team
Copyright (2017-2019) Loïc Le Guyader <loic.le.guyader@xfel.eu>
"""
import numpy as np
def absorption(T, Io):
""" Compute the absorption A = -ln(T/Io)
Inputs:
T: 1-D transmission value array of length N
Io: 1-D Io monitor value array of length N
Output:
a structured array with:
muA: absorption mean
sigmaA: absorption standard deviation
weights: sum of Io values
muT: transmission mean
sigmaT: transmission standard deviation
muIo: Io mean
sigmaIo: Io standard deviation
p: correlation coefficient between T and Io
counts: length of T
"""
counts = len(T)
assert counts == len(Io), "T and Io must have the same length"
# return type of the structured array
fdtype = [('muA', 'f8'), ('sigmaA', 'f8'), ('weights', 'f8'),
('muT', 'f8'), ('sigmaT', 'f8'), ('muIo', 'f8'), ('sigmaIo', 'f8'),
('p', 'f8'), ('counts', 'i8')]
if counts == 0:
return np.array([(np.NaN, np.NaN, np.NaN, np.NaN, np.NaN, np.NaN,
np.NaN, np.NaN, 0)], dtype=fdtype)
muT = np.mean(T)
sigmaT = np.std(T)
muIo = np.mean(Io)
sigmaIo = np.std(Io)
weights = np.sum(Io)
p = np.corrcoef(T, Io)[0,1]
muA = -np.log(muT/muIo)
# from error propagation for correlated data
sigmaA = (np.sqrt((sigmaT/muT)**2 + (sigmaIo/muIo)**2
- 2*p*sigmaIo*sigmaT/(muIo*muT)))
# direct calculation
#mask = (Io != 0)
#sigmaA = np.nanstd(-np.log(T[mask]/Io[mask]))
return np.array([(muA, sigmaA, weights, muT, sigmaT, muIo, sigmaIo,
p, counts)], dtype=fdtype)
def binning(x, data, func, bins=100, bin_length=None):
""" General purpose 1-dimension data binning
Inputs:
x: input vector of len N
data: structured array of len N
func: a function handle that takes data from a bin an return
a structured array of statistics computed in that bin
bins: array of bin-edges or number of desired bins
bin_length: if not None, the bin width covering the whole range
Outputs:
bins: the bins edges
res: a structured array of binned data
"""
if bin_length is not None:
bin_start = np.amin(x)
bin_end = np.amax(x)
bins = np.arange(bin_start, bin_end+bin_length, bin_length)
elif np.size(bins) == 1:
bin_start = np.amin(x)
bin_end = np.amax(x)
bins = np.linspace(bin_start, bin_end, bins)
bin_centers = (bins[1:]+bins[:-1])/2
nb_bins = len(bin_centers)
bin_idx = np.digitize(x, bins)
dummy = func([])
res = np.empty((nb_bins), dtype=dummy.dtype)
for k in range(nb_bins):
res[k] = func(data[bin_idx == k])
return bins, res
def xas(nrun, bins=None, Iokey='SCS_XGM', Itkey='MCP3apd', nrjkey='nrj', Iooffset=0):
""" Compute the XAS spectra from a xarray nrun.
Inputs:
nrun: xarray of SCS data
bins: array of bin-edges or number of desired bins
Iokey: string for the Io fields, typically 'SCS_XGM'
Itkey: string for the It fields, typically 'MCP3apd'
NRJkey: string for the nrj fields, typically 'nrj'
Iooffset: offset to apply on Io
Outputs:
a dictionnary containing:
nrj: the bins center
muA: the absorption
sigmaA: standard deviation on the absorption
sterrA: standard error on the absorption
muIo: the mean of the Io
counts: the number of events in each bin
"""
stacked = nrun.stack(x=['trainId','pId'])
Io = stacked[Iokey].values + Iooffset
It = stacked[Itkey].values
nrj = stacked[nrjkey].values
names_list = ['nrj', 'Io', 'It']
rundata = np.vstack((nrj, Io, It))
rundata = np.rec.fromarrays(rundata, names=names_list)
def whichIo(data):
""" Select which fields to use as I0 and which to use as I1
"""
if len(data) == 0:
return absorption([], [])
else:
return absorption(-data['It'], data['Io'])
if bins is None:
num_bins = 80
energy_limits = [np.min(nrj), np.max(nrj)]
bins = np.linspace(energy_limits[0], energy_limits[1], num_bins+1)
dummy, nosample = binning(rundata['nrj'], rundata, whichIo, bins)
muA = nosample['muA']
sterrA = nosample['sigmaA']/np.sqrt(nosample['counts'])
bins_c = 0.5*(bins[1:] + bins[:-1])
return {'nrj':bins_c, 'muA':muA, 'sterrA':sterrA, 'sigmaA':nosample['sigmaA'],
'muIo':nosample['muIo'], 'counts':nosample['counts']}
def xasxmcd(dataP, dataN):
""" Compute XAS and XMCD from data with both magnetic field direction
Inputs:
dataP: structured array for positive field
dataN: structured array for negative field
Outputs:
xas: structured array for the sum
xmcd: structured array for the difference
"""
assert len(dataP) == len(dataN), "binned datasets must be of same lengths"
assert not np.any(dataP['nrj'] - dataN['nrj']), "Energy points for dataP and dataN should be the same"
muXAS = dataP['muA'] + dataN['muA']
muXMCD = dataP['muA'] - dataN['muA']
# standard error is the same for XAS and XMCD
sigma = np.sqrt(dataP['sterrA']**2 + dataN['sterrA']**2)
res = np.empty(len(muXAS), dtype=[('nrj', 'f8'), ('muXAS', 'f8'), ('sigmaXAS', 'f8'),
('muXMCD', 'f8'), ('sigmaXMCD', 'f8')])
res['nrj'] = dataP['nrj']
res['muXAS'] = muXAS
res['muXMCD'] = muXMCD
res['sigmaXAS'] = sigma
res['sigmaXMCD'] = sigma
return res