Skip to content
GitLab
Explore
Sign in
Primary navigation
Search or go to…
Project
T
ToolBox
Manage
Activity
Members
Labels
Plan
Issues
Issue boards
Milestones
Wiki
Code
Merge requests
Repository
Branches
Commits
Tags
Repository graph
Compare revisions
Snippets
Build
Pipelines
Jobs
Pipeline schedules
Artifacts
Deploy
Releases
Model registry
Operate
Environments
Monitor
Incidents
Analyze
Value stream analytics
Contributor analytics
CI/CD analytics
Repository analytics
Model experiments
Help
Help
Support
GitLab documentation
Compare GitLab plans
Community forum
Contribute to GitLab
Provide feedback
Keyboard shortcuts
?
Snippets
Groups
Projects
Show more breadcrumbs
SCS
ToolBox
Merge requests
!170
hRIXS functions
Code
Review changes
Check out branch
Download
Patches
Plain diff
Merged
hRIXS functions
hrixs
into
master
Overview
14
Commits
7
Pipelines
0
Changes
3
4 unresolved threads
Hide all comments
Merged
Cammille Carinan
requested to merge
hrixs
into
master
3 years ago
Overview
14
Commits
7
Pipelines
0
Changes
1
4 unresolved threads
Hide all comments
Expand
0
0
Merge request reports
Compare
version 3
version 6
8b40f020
3 years ago
version 5
eb1841a8
3 years ago
version 4
b64c9318
3 years ago
version 3
a8629280
3 years ago
version 2
a9829e62
3 years ago
version 1
0ed885aa
3 years ago
master (base)
and
latest version
latest version
e9b7b74b
7 commits,
3 years ago
version 6
8b40f020
6 commits,
3 years ago
version 5
eb1841a8
5 commits,
3 years ago
version 4
b64c9318
3 commits,
3 years ago
version 3
a8629280
2 commits,
3 years ago
version 2
a9829e62
3 commits,
3 years ago
version 1
0ed885aa
2 commits,
3 years ago
Show latest version
1 file
+
227
−
0
Inline
Compare changes
Side-by-side
Inline
Show whitespace changes
Show one file at a time
src/toolbox_scs/detectors/hrixs.py
+
227
−
0
Options
from
functools
import
lru_cache
import
numpy
as
np
from
scipy.optimize
import
curve_fit
from
scipy.signal
import
fftconvolve
import
toolbox_scs
as
tb
__all__
=
[
'
find_curvature
'
,
@@ -142,3 +146,226 @@ def gauss1d(x, height, x0, sigma, offset):
def
to_fwhm
(
sigma
):
return
abs
(
sigma
*
FWHM_COEFF
)
# -----------------------------------------------------------------------------
# Centroid
THRESHOLD
=
510
# pixel counts above which a hit candidate is assumed
CURVE_A
=
2.19042931e-02
# curvature parameters as determined elsewhere
CURVE_B
=
-
3.02191568e-07
def
_esrf_centroid
(
image
,
threshold
=
THRESHOLD
,
curvature
=
(
CURVE_A
,
CURVE_B
)):
gs
=
2
base
=
image
.
mean
()
cp
=
np
.
argwhere
(
image
[
gs
//
2
:
-
gs
//
2
,
gs
//
2
:
-
gs
//
2
]
>
threshold
)
+
np
.
array
([
gs
//
2
,
gs
//
2
])
if
len
(
cp
)
>
100000
:
raise
RuntimeError
(
'
Threshold too low or acquisition time too long
'
)
res
=
[]
for
cy
,
cx
in
cp
:
spot
=
image
[
cy
-
gs
//
2
:
cy
+
gs
//
2
+
1
,
cx
-
gs
//
2
:
cx
+
gs
//
2
+
1
]
-
base
spot
[
spot
<
0
]
=
0
if
(
spot
>
image
[
cy
,
cx
]).
sum
()
==
0
:
mx
=
np
.
average
(
np
.
arange
(
cx
-
gs
//
2
,
cx
+
gs
//
2
+
1
),
weights
=
spot
.
sum
(
axis
=
0
))
my
=
np
.
average
(
np
.
arange
(
cy
-
gs
//
2
,
cy
+
gs
//
2
+
1
),
weights
=
spot
.
sum
(
axis
=
1
))
my
-=
(
curvature
[
0
]
+
curvature
[
1
]
*
mx
)
*
mx
res
.
append
((
my
,
mx
))
return
res
def
_new_centroid
(
image
,
threshold
=
THRESHOLD
,
curvature
=
(
CURVE_A
,
CURVE_B
)):
"""
find the position of photons with sub-pixel precision
A photon is supposed to have hit the detector if the intensity within a
2-by-2 square exceeds a threshold. In this case the position of the photon
is calculated as the center-of-mass in a 4-by-4 square.
Return the list of x,y coordinate pairs, corrected by the curvature.
"""
base
=
image
.
mean
()
corners
=
image
[
1
:,
1
:]
+
image
[:
-
1
,
1
:]
+
image
[
1
:,
:
-
1
]
+
image
[:
-
1
,
:
-
1
]
threshold
=
corners
.
mean
()
+
3.5
*
corners
.
std
()
middle
=
corners
[
1
:
-
1
,
1
:
-
1
]
candidates
=
(
(
middle
>
threshold
)
*
(
middle
>=
corners
[:
-
2
,
1
:
-
1
])
*
(
middle
>
corners
[
2
:,
1
:
-
1
])
*
(
middle
>=
corners
[
1
:
-
1
,
:
-
2
])
*
(
middle
>
corners
[
1
:
-
1
,
2
:])
*
(
middle
>=
corners
[:
-
2
,
:
-
2
])
*
(
middle
>
corners
[
2
:,
:
-
2
])
*
(
middle
>=
corners
[:
-
2
,
2
:])
*
(
middle
>
corners
[
2
:,
2
:]))
cp
=
np
.
argwhere
(
candidates
)
if
len
(
cp
)
>
10000
:
raise
RuntimeError
(
"
too many peaks, threshold too low or acquisition time too high
"
)
res
=
[]
for
cy
,
cx
in
cp
:
spot
=
image
[
cy
:
cy
+
4
,
cx
:
cx
+
4
]
-
base
mx
=
np
.
average
(
np
.
arange
(
cx
,
cx
+
4
),
weights
=
spot
.
sum
(
axis
=
0
))
my
=
np
.
average
(
np
.
arange
(
cy
,
cy
+
4
),
weights
=
spot
.
sum
(
axis
=
1
))
my
-=
(
curvature
[
0
]
+
curvature
[
1
]
*
mx
)
*
mx
res
.
append
((
my
,
mx
))
return
res
centroid
=
_new_centroid
def
decentroid
(
res
):
res
=
np
.
array
(
res
)
ret
=
np
.
zeros
(
shape
=
(
res
.
max
(
axis
=
0
)
+
1
).
astype
(
int
))
for
cy
,
cx
in
res
:
if
cx
>
0
and
cy
>
0
:
ret
[
int
(
cy
),
int
(
cx
)]
+=
1
return
ret
# -----------------------------------------------------------------------------
# Integral
FACTOR
=
3
RANGE
=
[
300
,
400
]
BINS
=
abs
(
np
.
subtract
(
*
RANGE
))
*
FACTOR
def
parabola
(
x
,
a
,
b
,
c
=
0
):
return
(
a
*
x
+
b
)
*
x
+
c
def
integrate
(
image
,
factor
=
FACTOR
,
range
=
RANGE
,
curvature
=
(
CURVE_A
,
CURVE_B
),
):
image
=
image
-
image
.
mean
()
x
=
np
.
arange
(
image
.
shape
[
1
])[
None
,
:]
y
=
np
.
arange
(
image
.
shape
[
0
])[:,
None
]
ys
=
factor
*
(
y
-
parabola
(
x
,
curvature
[
1
],
curvature
[
0
]))
ysf
=
np
.
floor
(
ys
)
rang
=
(
factor
*
range
[
0
],
factor
*
range
[
1
])
bins
=
rang
[
1
]
-
rang
[
0
]
lhy
,
lhx
=
np
.
histogram
(
ysf
.
ravel
(),
bins
=
bins
,
weights
=
((
ys
-
ysf
)
*
image
).
ravel
(),
range
=
rang
)
rhy
,
rhx
=
np
.
histogram
((
ysf
+
1
).
ravel
(),
bins
=
bins
,
weights
=
(((
ysf
+
1
)
-
ys
)
*
image
).
ravel
(),
range
=
rang
)
lvy
,
lvx
=
np
.
histogram
(
ysf
.
ravel
(),
bins
=
bins
,
weights
=
(
ys
-
ysf
).
ravel
(),
range
=
rang
)
rvy
,
rvx
=
np
.
histogram
((
ysf
+
1
).
ravel
(),
bins
=
bins
,
weights
=
((
ysf
+
1
)
-
ys
).
ravel
(),
range
=
rang
)
return
(
lhy
+
rhy
)
/
(
lvy
+
rvy
)
# -----------------------------------------------------------------------------
# hRIXS class
class
hRIXS
:
# run
PROPOSAL
=
2769
# image range
X_RANGE
=
np
.
s_
[
1300
:
-
100
]
Y_RANGE
=
np
.
s_
[:]
# centroid
THRESHOLD
=
THRESHOLD
# pixel counts above which a hit candidate is assumed
CURVE_A
=
CURVE_A
# curvature parameters as determined elsewhere
CURVE_B
=
CURVE_B
# integral
FACTOR
=
FACTOR
RANGE
=
RANGE
BINS
=
abs
(
np
.
subtract
(
*
RANGE
))
*
FACTOR
METHOD
=
'
centroid
'
# ['centroid', 'integral']
@classmethod
def
set_params
(
cls
,
**
params
):
for
key
,
value
in
params
.
items
():
setattr
(
cls
,
key
.
upper
(),
value
)
def
__set_params
(
self
,
**
params
):
self
.
__class__
.
set_params
(
**
params
)
self
.
refresh
()
@classmethod
def
get_params
(
cls
,
*
params
):
if
not
params
:
params
=
(
'
proposal
'
,
'
x_range
'
,
'
y_range
'
,
'
threshold
'
,
'
curve_a
'
,
'
curve_b
'
,
'
factor
'
,
'
range
'
,
'
bins
'
,
'
method
'
)
return
{
param
:
getattr
(
cls
,
param
.
upper
())
for
param
in
params
}
def
refresh
(
self
):
cls
=
self
.
__class__
for
cached
in
[
'
_centroid
'
,
'
_integral
'
]:
getattr
(
cls
,
cached
).
fget
.
cache_clear
()
def
__init__
(
self
,
images
,
norm
=
None
):
self
.
images
=
images
self
.
norm
=
norm
# class/instance method compatibility
self
.
set_params
=
self
.
__set_params
@classmethod
def
from_run
(
cls
,
runNB
,
proposal
=
None
,
first_wrong
=
False
):
if
proposal
is
None
:
proposal
=
cls
.
PROPOSAL
run
,
data
=
tb
.
load
(
proposal
,
runNB
=
runNB
,
fields
=
[
'
hRIXS_det
'
])
# Get slow train data
mnemo
=
tb
.
mnemonics_for_run
(
run
)[
'
SCS_slowTrain
'
]
slow_train
=
run
[
mnemo
[
'
source
'
],
mnemo
[
'
key
'
]].
ndarray
().
sum
()
return
cls
(
images
=
data
[
'
hRIXS_det
'
][
1
if
first_wrong
else
0
:].
data
,
norm
=
slow_train
)
@property
@lru_cache
()
def
_centroid
(
self
):
return
sum
((
centroid
(
image
[
self
.
Y_RANGE
,
self
.
X_RANGE
].
T
,
threshold
=
self
.
THRESHOLD
,
curvature
=
(
self
.
CURVE_A
,
self
.
CURVE_B
),
)
for
image
in
self
.
images
),
[])
def
_centroid_spectrum
(
self
,
bins
=
None
,
range
=
None
,
normalize
=
True
):
if
bins
is
None
:
bins
=
self
.
BINS
if
range
is
None
:
range
=
self
.
RANGE
r
=
np
.
array
(
self
.
_centroid
)
hy
,
hx
=
np
.
histogram
(
r
[:,
0
],
bins
=
bins
,
range
=
range
)
if
normalize
and
self
.
norm
is
not
None
:
hy
=
hy
/
self
.
norm
return
(
hx
[:
-
1
]
+
hx
[
1
:])
/
2
,
hy
@property
@lru_cache
()
def
_integral
(
self
):
return
sum
((
integrate
(
image
[
self
.
Y_RANGE
,
self
.
X_RANGE
].
T
,
factor
=
self
.
FACTOR
,
range
=
self
.
RANGE
,
curvature
=
(
self
.
CURVE_A
,
self
.
CURVE_B
))
for
image
in
self
.
images
))
def
_integral_spectrum
(
self
,
normalize
=
True
):
values
=
self
.
_integral
if
normalize
and
self
.
norm
is
not
None
:
values
=
values
/
self
.
norm
return
np
.
arange
(
values
.
size
),
values
@property
def
corrected
(
self
):
return
decentroid
(
self
.
_centroid
)
def
spectrum
(
self
,
normalize
=
True
):
spec_func
=
(
self
.
_centroid_spectrum
if
self
.
METHOD
.
lower
()
==
'
centroid
'
else
self
.
_integral_spectrum
)
return
spec_func
(
normalize
=
normalize
)
def
__sub__
(
self
,
other
):
px
,
py
=
self
.
spectrum
()
mx
,
my
=
other
.
spectrum
()
return
(
px
+
mx
)
/
2
,
py
-
my
def
__add__
(
self
,
other
):
return
self
.
__class__
(
images
=
list
(
self
.
images
)
+
list
(
other
.
images
),
norm
=
self
.
norm
+
other
.
norm
)
Loading