Skip to content
GitLab
Explore
Sign in
Primary navigation
Search or go to…
Project
T
ToolBox
Manage
Activity
Members
Labels
Plan
Issues
Issue boards
Milestones
Wiki
Code
Merge requests
Repository
Branches
Commits
Tags
Repository graph
Compare revisions
Snippets
Build
Pipelines
Jobs
Pipeline schedules
Artifacts
Deploy
Releases
Model registry
Operate
Environments
Monitor
Incidents
Analyze
Value stream analytics
Contributor analytics
CI/CD analytics
Repository analytics
Model experiments
Help
Help
Support
GitLab documentation
Compare GitLab plans
Community forum
Contribute to GitLab
Provide feedback
Keyboard shortcuts
?
Snippets
Groups
Projects
Show more breadcrumbs
SCS
ToolBox
Commits
d2ce5429
Commit
d2ce5429
authored
2 years ago
by
Cammille Carinan
Browse files
Options
Downloads
Plain Diff
Cumulative updates from beamtime 2776 (van Kuiken)
See merge request
!182
parents
6e126fa8
59abb97a
No related branches found
No related tags found
1 merge request
!182
Cumulative updates from beamtime 2776 (van Kuiken)
Changes
1
Hide whitespace changes
Inline
Side-by-side
Showing
1 changed file
src/toolbox_scs/detectors/hrixs.py
+121
-91
121 additions, 91 deletions
src/toolbox_scs/detectors/hrixs.py
with
121 additions
and
91 deletions
src/toolbox_scs/detectors/hrixs.py
+
121
−
91
View file @
d2ce5429
from
functools
import
lru_cache
from
functools
import
lru_cache
import
numpy
as
np
import
numpy
as
np
import
matplotlib.pyplot
as
plt
from
scipy.optimize
import
leastsq
from
scipy.optimize
import
curve_fit
from
scipy.optimize
import
curve_fit
from
scipy.signal
import
fftconvolve
from
scipy.signal
import
fftconvolve
...
@@ -56,6 +58,26 @@ def find_curvature(image, frangex=None, frangey=None,
...
@@ -56,6 +58,26 @@ def find_curvature(image, frangex=None, frangey=None,
return
curv
[:
-
1
][::
-
1
]
return
curv
[:
-
1
][::
-
1
]
def
find_curvature
(
img
,
args
,
plot
=
False
,
**
kwargs
):
def
parabola
(
x
,
a
,
b
,
c
,
s
=
0
,
h
=
0
,
o
=
0
):
return
(
a
*
x
+
b
)
*
x
+
c
def
gauss
(
y
,
x
,
a
,
b
,
c
,
s
,
h
,
o
=
0
):
return
h
*
np
.
exp
(
-
((
y
-
parabola
(
x
,
a
,
b
,
c
))
/
(
2
*
s
))
**
2
)
+
o
x
=
np
.
arange
(
img
.
shape
[
1
])[
None
,
:]
y
=
np
.
arange
(
img
.
shape
[
0
])[:,
None
]
if
plot
:
plt
.
figure
(
figsize
=
(
10
,
10
))
plt
.
imshow
(
img
,
cmap
=
'
gray
'
,
aspect
=
'
auto
'
,
interpolation
=
'
nearest
'
,
**
kwargs
)
plt
.
plot
(
x
[
0
,
:],
parabola
(
x
[
0
,
:],
*
args
))
args
,
_
=
leastsq
(
lambda
args
:
(
gauss
(
y
,
x
,
*
args
)
-
img
).
ravel
(),
args
)
if
plot
:
plt
.
plot
(
x
[
0
,
:],
parabola
(
x
[
0
,
:],
*
args
))
return
args
def
correct_curvature
(
image
,
factor
=
None
,
axis
=
1
):
def
correct_curvature
(
image
,
factor
=
None
,
axis
=
1
):
if
factor
is
None
:
if
factor
is
None
:
return
return
...
@@ -175,7 +197,7 @@ def _esrf_centroid(image, threshold=THRESHOLD, curvature=(CURVE_A, CURVE_B)):
...
@@ -175,7 +197,7 @@ def _esrf_centroid(image, threshold=THRESHOLD, curvature=(CURVE_A, CURVE_B)):
return
res
return
res
def
_new_centroid
(
image
,
threshold
=
THRESHOLD
,
curvature
=
(
CURVE_A
,
CURVE_B
)):
def
_new_centroid
(
image
,
threshold
=
THRESHOLD
,
std_threshold
=
3.5
,
curvature
=
(
CURVE_A
,
CURVE_B
)):
"""
find the position of photons with sub-pixel precision
"""
find the position of photons with sub-pixel precision
A photon is supposed to have hit the detector if the intensity within a
A photon is supposed to have hit the detector if the intensity within a
...
@@ -186,7 +208,8 @@ def _new_centroid(image, threshold=THRESHOLD, curvature=(CURVE_A, CURVE_B)):
...
@@ -186,7 +208,8 @@ def _new_centroid(image, threshold=THRESHOLD, curvature=(CURVE_A, CURVE_B)):
"""
"""
base
=
image
.
mean
()
base
=
image
.
mean
()
corners
=
image
[
1
:,
1
:]
+
image
[:
-
1
,
1
:]
+
image
[
1
:,
:
-
1
]
+
image
[:
-
1
,
:
-
1
]
corners
=
image
[
1
:,
1
:]
+
image
[:
-
1
,
1
:]
+
image
[
1
:,
:
-
1
]
+
image
[:
-
1
,
:
-
1
]
threshold
=
corners
.
mean
()
+
3.5
*
corners
.
std
()
if
threshold
is
None
:
threshold
=
corners
.
mean
()
+
std_threshold
*
corners
.
std
()
middle
=
corners
[
1
:
-
1
,
1
:
-
1
]
middle
=
corners
[
1
:
-
1
,
1
:
-
1
]
candidates
=
(
candidates
=
(
(
middle
>
threshold
)
(
middle
>
threshold
)
...
@@ -257,11 +280,12 @@ class hRIXS:
...
@@ -257,11 +280,12 @@ class hRIXS:
PROPOSAL
=
2769
PROPOSAL
=
2769
# image range
# image range
X_RANGE
=
np
.
s_
[
1300
:
-
100
]
X_RANGE
=
np
.
s_
[
:
]
Y_RANGE
=
np
.
s_
[:]
Y_RANGE
=
np
.
s_
[:]
# centroid
# centroid
THRESHOLD
=
THRESHOLD
# pixel counts above which a hit candidate is assumed
THRESHOLD
=
None
# pixel counts above which a hit candidate is assumed
STD_THRESHOLD
=
3.5
# same as THRESHOLD, in standard deviations
CURVE_A
=
CURVE_A
# curvature parameters as determined elsewhere
CURVE_A
=
CURVE_A
# curvature parameters as determined elsewhere
CURVE_B
=
CURVE_B
CURVE_B
=
CURVE_B
...
@@ -271,101 +295,107 @@ class hRIXS:
...
@@ -271,101 +295,107 @@ class hRIXS:
BINS
=
abs
(
np
.
subtract
(
*
RANGE
))
*
FACTOR
BINS
=
abs
(
np
.
subtract
(
*
RANGE
))
*
FACTOR
METHOD
=
'
centroid
'
# ['centroid', 'integral']
METHOD
=
'
centroid
'
# ['centroid', 'integral']
USE_DARK
=
False
@classmethod
ENERGY_INTERCEPT
=
0
def
set_params
(
cls
,
**
params
):
ENERGY_SLOPE
=
1
for
key
,
value
in
params
.
items
():
setattr
(
cls
,
key
.
upper
(),
value
)
def
__set_params
(
self
,
**
params
):
FIELDS
=
[
'
hRIXS_det
'
,
'
hRIXS_index
'
,
'
hRIXS_delay
'
,
'
hRIXS_norm
'
]
self
.
__class__
.
set_params
(
**
params
)
self
.
refresh
()
def
set_params
(
self
,
**
params
):
for
key
,
value
in
params
.
items
():
setattr
(
self
,
key
.
upper
(),
value
)
@classmethod
def
get_params
(
self
,
*
params
):
def
get_params
(
cls
,
*
params
):
if
not
params
:
if
not
params
:
params
=
(
'
proposal
'
,
'
x_range
'
,
'
y_range
'
,
params
=
(
'
proposal
'
,
'
x_range
'
,
'
y_range
'
,
'
threshold
'
,
'
curve_a
'
,
'
curve_b
'
,
'
threshold
'
,
'
curve_a
'
,
'
curve_b
'
,
'
factor
'
,
'
range
'
,
'
bins
'
,
'
factor
'
,
'
range
'
,
'
bins
'
,
'
method
'
)
'
method
'
,
'
fields
'
)
return
{
param
:
getattr
(
cls
,
param
.
upper
())
for
param
in
params
}
return
{
param
:
getattr
(
self
,
param
.
upper
())
for
param
in
params
}
def
refresh
(
self
):
def
from_run
(
self
,
runNB
,
proposal
=
None
,
extra_fields
=
()):
cls
=
self
.
__class__
for
cached
in
[
'
_centroid
'
,
'
_integral
'
]:
getattr
(
cls
,
cached
).
fget
.
cache_clear
()
def
__init__
(
self
,
images
,
norm
=
None
):
self
.
images
=
images
self
.
norm
=
norm
# class/instance method compatibility
self
.
set_params
=
self
.
__set_params
@classmethod
def
from_run
(
cls
,
runNB
,
proposal
=
None
,
first_wrong
=
False
):
if
proposal
is
None
:
if
proposal
is
None
:
proposal
=
cls
.
PROPOSAL
proposal
=
self
.
PROPOSAL
run
,
data
=
tb
.
load
(
proposal
,
runNB
=
runNB
,
run
,
data
=
tb
.
load
(
proposal
,
runNB
=
runNB
,
fields
=
[
'
hRIXS_det
'
])
fields
=
self
.
FIELDS
+
list
(
extra_fields
))
# Get slow train data
return
data
mnemo
=
tb
.
mnemonics_for_run
(
run
)[
'
SCS_slowTrain
'
]
slow_train
=
run
[
mnemo
[
'
source
'
],
mnemo
[
'
key
'
]].
ndarray
().
sum
()
def
load_dark
(
self
,
runNB
,
proposal
=
None
):
data
=
self
.
from_run
(
runNB
,
proposal
)
return
cls
(
images
=
data
[
'
hRIXS_det
'
][
1
if
first_wrong
else
0
:].
data
,
self
.
dark_image
=
data
[
'
hRIXS_det
'
].
mean
(
dim
=
'
trainId
'
)
norm
=
slow_train
)
self
.
USE_DARK
=
True
@property
def
find_curvature
(
self
,
runNB
,
proposal
=
None
,
plot
=
True
,
args
=
None
,
**
kwargs
):
@lru_cache
()
data
=
self
.
from_run
(
runNB
,
proposal
)
def
_centroid
(
self
):
return
sum
((
centroid
(
image
[
self
.
Y_RANGE
,
self
.
X_RANGE
].
T
,
image
=
data
[
'
hRIXS_det
'
].
sum
(
dim
=
'
trainId
'
)
\
threshold
=
self
.
THRESHOLD
,
.
to_numpy
()[
self
.
X_RANGE
,
self
.
Y_RANGE
].
T
curvature
=
(
self
.
CURVE_A
,
self
.
CURVE_B
),
)
if
args
is
None
:
for
image
in
self
.
images
),
[])
spec
=
(
image
-
image
[:
10
,
:].
mean
()).
mean
(
axis
=
1
)
mean
=
np
.
average
(
np
.
arange
(
len
(
spec
)),
weights
=
spec
)
def
_centroid_spectrum
(
self
,
bins
=
None
,
range
=
None
,
normalize
=
True
):
args
=
(
-
2e-7
,
0.02
,
mean
-
0.02
*
image
.
shape
[
1
]
/
2
,
3
,
spec
.
max
(),
image
.
mean
())
args
=
find_curvature
(
image
,
args
,
plot
=
plot
,
**
kwargs
)
self
.
CURVE_B
,
self
.
CURVE_A
,
*
_
=
args
return
self
.
CURVE_A
,
self
.
CURVE_B
def
centroid
(
self
,
data
,
bins
=
None
):
if
bins
is
None
:
if
bins
is
None
:
bins
=
self
.
BINS
bins
=
self
.
BINS
if
range
is
None
:
ret
=
np
.
zeros
((
len
(
data
[
"
hRIXS_det
"
]),
bins
))
range
=
self
.
RANGE
for
image
,
r
in
zip
(
data
[
"
hRIXS_det
"
],
ret
):
c
=
centroid
(
r
=
np
.
array
(
self
.
_centroid
)
image
.
to_numpy
()[
self
.
X_RANGE
,
self
.
Y_RANGE
].
T
,
hy
,
hx
=
np
.
histogram
(
r
[:,
0
],
bins
=
bins
,
range
=
range
)
threshold
=
self
.
THRESHOLD
,
if
normalize
and
self
.
norm
is
not
None
:
std_threshold
=
self
.
STD_THRESHOLD
,
hy
=
hy
/
self
.
norm
curvature
=
(
self
.
CURVE_A
,
self
.
CURVE_B
))
if
not
len
(
c
):
return
(
hx
[:
-
1
]
+
hx
[
1
:])
/
2
,
hy
continue
rc
=
np
.
array
(
c
)
@property
hy
,
hx
=
np
.
histogram
(
@lru_cache
()
rc
[:,
0
],
bins
=
bins
,
def
_integral
(
self
):
range
=
(
0
,
self
.
Y_RANGE
.
stop
-
self
.
Y_RANGE
.
start
))
return
sum
((
integrate
(
image
[
self
.
Y_RANGE
,
self
.
X_RANGE
].
T
,
r
[:]
=
hy
factor
=
self
.
FACTOR
,
range
=
self
.
RANGE
,
data
=
data
.
assign_coords
(
curvature
=
(
self
.
CURVE_A
,
self
.
CURVE_B
))
energy
=
np
.
linspace
(
self
.
Y_RANGE
.
start
,
self
.
Y_RANGE
.
stop
,
bins
)
for
image
in
self
.
images
))
*
self
.
ENERGY_SLOPE
+
self
.
ENERGY_INTERCEPT
)
return
data
.
assign
(
spectrum
=
((
"
trainId
"
,
"
energy
"
),
ret
))
def
_integral_spectrum
(
self
,
normalize
=
True
):
values
=
self
.
_integral
def
integrate
(
self
,
data
):
if
normalize
and
self
.
norm
is
not
None
:
bins
=
self
.
Y_RANGE
.
stop
-
self
.
Y_RANGE
.
start
values
=
values
/
self
.
norm
ret
=
np
.
zeros
((
len
(
data
[
"
hRIXS_det
"
]),
bins
-
20
))
return
np
.
arange
(
values
.
size
),
values
for
image
,
r
in
zip
(
data
[
"
hRIXS_det
"
],
ret
):
if
self
.
USE_DARK
:
@property
image
=
image
-
self
.
dark_image
def
corrected
(
self
):
r
[:]
=
integrate
(
image
.
to_numpy
()[
self
.
X_RANGE
,
self
.
Y_RANGE
].
T
,
factor
=
1
,
return
decentroid
(
self
.
_centroid
)
range
=
(
10
,
bins
-
10
),
curvature
=
(
self
.
CURVE_A
,
self
.
CURVE_B
))
def
spectrum
(
self
,
normalize
=
True
):
data
=
data
.
assign_coords
(
spec_func
=
(
self
.
_centroid_spectrum
if
self
.
METHOD
.
lower
()
==
'
centroid
'
energy
=
np
.
arange
(
self
.
Y_RANGE
.
start
+
10
,
self
.
Y_RANGE
.
stop
-
10
)
else
self
.
_integral_spectrum
)
*
self
.
ENERGY_SLOPE
+
self
.
ENERGY_INTERCEPT
)
return
spec_func
(
normalize
=
normalize
)
return
data
.
assign
(
spectrum
=
((
"
trainId
"
,
"
energy
"
),
ret
))
def
__sub__
(
self
,
other
):
aggregators
=
dict
(
px
,
py
=
self
.
spectrum
()
hRIXS_det
=
lambda
x
,
dim
:
x
.
sum
(
dim
=
dim
),
mx
,
my
=
other
.
spectrum
()
Delay
=
lambda
x
,
dim
:
x
.
mean
(
dim
=
dim
),
return
(
px
+
mx
)
/
2
,
py
-
my
hRIXS_delay
=
lambda
x
,
dim
:
x
.
mean
(
dim
=
dim
),
hRIXS_norm
=
lambda
x
,
dim
:
x
.
sum
(
dim
=
dim
),
def
__add__
(
self
,
other
):
spectrum
=
lambda
x
,
dim
:
x
.
sum
(
dim
=
dim
),
return
self
.
__class__
(
images
=
list
(
self
.
images
)
+
list
(
other
.
images
),
)
norm
=
self
.
norm
+
other
.
norm
)
def
aggregator
(
self
,
da
,
dim
):
agg
=
self
.
aggregators
.
get
(
da
.
name
)
if
agg
is
None
:
return
None
return
agg
(
da
,
dim
=
dim
)
def
aggregate
(
self
,
ds
,
dim
=
"
trainId
"
):
ret
=
ds
.
map
(
self
.
aggregator
,
dim
=
dim
)
ret
=
ret
.
drop_vars
([
n
for
n
in
ret
if
n
not
in
self
.
aggregators
])
return
ret
def
normalize
(
self
,
data
):
return
data
.
assign
(
normalized
=
data
[
"
spectrum
"
]
/
data
[
"
hRIXS_norm
"
])
This diff is collapsed.
Click to expand it.
Preview
0%
Loading
Try again
or
attach a new file
.
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Save comment
Cancel
Please
register
or
sign in
to comment