Skip to content
GitLab
Explore
Sign in
Primary navigation
Search or go to…
Project
T
ToolBox
Manage
Activity
Members
Labels
Plan
Issues
Issue boards
Milestones
Wiki
Code
Merge requests
Repository
Branches
Commits
Tags
Repository graph
Compare revisions
Snippets
Build
Pipelines
Jobs
Pipeline schedules
Artifacts
Deploy
Releases
Model registry
Operate
Environments
Monitor
Incidents
Analyze
Value stream analytics
Contributor analytics
CI/CD analytics
Repository analytics
Model experiments
Help
Help
Support
GitLab documentation
Compare GitLab plans
Community forum
Contribute to GitLab
Provide feedback
Keyboard shortcuts
?
Snippets
Groups
Projects
Show more breadcrumbs
SCS
ToolBox
Merge requests
!170
hRIXS functions
Code
Review changes
Check out branch
Download
Patches
Plain diff
Merged
hRIXS functions
hrixs
into
master
Overview
14
Commits
7
Pipelines
0
Changes
1
Merged
Cammille Carinan
requested to merge
hrixs
into
master
3 years ago
Overview
14
Commits
7
Pipelines
0
Changes
1
Expand
0
0
Merge request reports
Viewing commit
eb1841a8
Prev
Next
Show latest version
1 file
+
2
−
4
Inline
Compare changes
Side-by-side
Inline
Show whitespace changes
Show one file at a time
eb1841a8
Improve SCS_slowTrain call
· eb1841a8
Cammille Carinan
authored
3 years ago
src/toolbox_scs/detectors/hrixs.py
0 → 100644
+
371
−
0
Options
from
functools
import
lru_cache
import
numpy
as
np
from
scipy.optimize
import
curve_fit
from
scipy.signal
import
fftconvolve
import
toolbox_scs
as
tb
__all__
=
[
'
find_curvature
'
,
'
correct_curvature
'
,
'
get_spectrum
'
,
'
energy_calibration
'
,
'
calibrate
'
,
'
gaussian_fit
'
,
'
to_fwhm
'
]
# -----------------------------------------------------------------------------
# Curvature
def
find_curvature
(
image
,
frangex
=
None
,
frangey
=
None
,
deg
=
2
,
axis
=
1
,
offset
=
100
):
# Resolve arguments
x_range
=
(
0
,
image
.
shape
[
1
])
if
frangex
is
not
None
:
x_range
=
(
max
(
frangex
[
0
],
x_range
[
0
]),
min
(
frangex
[
1
],
x_range
[
1
]))
y_range
=
(
0
,
image
.
shape
[
0
])
if
frangex
is
not
None
:
y_range
=
(
max
(
frangey
[
0
],
y_range
[
0
]),
min
(
frangey
[
1
],
y_range
[
1
]))
axis_range
=
y_range
if
axis
==
1
else
x_range
axis_dim
=
image
.
shape
[
axis
-
1
]
# Get kernel
integral
=
image
[
slice
(
*
y_range
),
slice
(
*
x_range
)].
mean
(
axis
=
axis
)
roi
=
np
.
ones
([
axis_range
[
1
]
-
axis_range
[
0
],
axis_dim
])
ref
=
roi
*
integral
[:,
np
.
newaxis
]
# Get sliced image
slice_
=
[
slice
(
None
),
slice
(
None
)]
slice_
[
axis
-
1
]
=
slice
(
max
(
axis_range
[
0
]
-
offset
,
0
),
min
(
axis_range
[
1
]
+
offset
,
axis_dim
))
sliced
=
image
[
tuple
(
slice_
)]
if
axis
==
0
:
sliced
=
sliced
.
T
# Get curvature factor from cross correlation
crosscorr
=
fftconvolve
(
sliced
,
ref
[::
-
1
,
:],
axes
=
0
,
)
shifts
=
np
.
argmax
(
crosscorr
,
axis
=
0
)
curv
=
np
.
polyfit
(
np
.
arange
(
axis_dim
),
shifts
,
deg
=
deg
)
return
curv
[:
-
1
][::
-
1
]
def
correct_curvature
(
image
,
factor
=
None
,
axis
=
1
):
if
factor
is
None
:
return
if
axis
==
1
:
image
=
image
.
T
ydim
,
xdim
=
image
.
shape
x
=
np
.
arange
(
xdim
+
1
)
y
=
np
.
arange
(
ydim
+
1
)
xx
,
yy
=
np
.
meshgrid
(
x
[:
-
1
]
+
0.5
,
y
[:
-
1
]
+
0.5
)
xxn
=
xx
-
factor
[
0
]
*
yy
-
factor
[
1
]
*
yy
**
2
ret
=
np
.
histogramdd
((
xxn
.
flatten
(),
yy
.
flatten
()),
bins
=
[
x
,
y
],
weights
=
image
.
flatten
())[
0
]
return
ret
if
axis
==
1
else
ret
.
T
def
get_spectrum
(
image
,
factor
=
None
,
axis
=
0
,
pixel_range
=
None
,
energy_range
=
None
,
):
start
,
stop
=
(
0
,
image
.
shape
[
axis
-
1
])
if
pixel_range
is
not
None
:
start
=
max
(
pixel_range
[
0
]
or
start
,
start
)
stop
=
min
(
pixel_range
[
1
]
or
stop
,
stop
)
edge
=
image
.
sum
(
axis
=
axis
)[
start
:
stop
]
bins
=
np
.
arange
(
start
,
stop
+
1
)
centers
=
(
bins
[
1
:]
+
bins
[:
-
1
])
*
0.5
if
factor
is
not
None
:
centers
,
edge
=
calibrate
(
centers
,
edge
,
factor
=
factor
,
range_
=
energy_range
)
return
centers
,
edge
# -----------------------------------------------------------------------------
# Energy calibration
def
energy_calibration
(
channels
,
energies
):
return
np
.
polyfit
(
channels
,
energies
,
deg
=
1
)
def
calibrate
(
x
,
y
=
None
,
factor
=
None
,
range_
=
None
):
if
factor
is
not
None
:
x
=
np
.
polyval
(
factor
,
x
)
if
y
is
not
None
and
range_
is
not
None
:
start
=
np
.
argmin
(
np
.
abs
((
x
-
range_
[
0
])))
stop
=
np
.
argmin
(
np
.
abs
((
x
-
range_
[
1
])))
# Calibrated energies have a different direction
x
,
y
=
x
[
stop
:
start
],
y
[
stop
:
start
]
return
x
,
y
# -----------------------------------------------------------------------------
# Gaussian-related functions
FWHM_COEFF
=
2
*
np
.
sqrt
(
2
*
np
.
log
(
2
))
def
gaussian_fit
(
x_data
,
y_data
,
offset
=
0
):
"""
Centre-of-mass and width. Lifted from image_processing.imageCentreofMass()
"""
x0
=
np
.
average
(
x_data
,
weights
=
y_data
)
sx
=
np
.
sqrt
(
np
.
average
((
x_data
-
x0
)
**
2
,
weights
=
y_data
))
# Gaussian fit
baseline
=
y_data
.
min
()
p_0
=
(
y_data
.
max
(),
x0
+
offset
,
sx
,
baseline
)
try
:
p_f
,
_
=
curve_fit
(
gauss1d
,
x_data
,
y_data
,
p_0
,
maxfev
=
10000
)
return
p_f
except
(
RuntimeError
,
TypeError
)
as
e
:
print
(
e
)
return
None
def
gauss1d
(
x
,
height
,
x0
,
sigma
,
offset
):
return
height
*
np
.
exp
(
-
0.5
*
((
x
-
x0
)
/
sigma
)
**
2
)
+
offset
def
to_fwhm
(
sigma
):
return
abs
(
sigma
*
FWHM_COEFF
)
# -----------------------------------------------------------------------------
# Centroid
THRESHOLD
=
510
# pixel counts above which a hit candidate is assumed
CURVE_A
=
2.19042931e-02
# curvature parameters as determined elsewhere
CURVE_B
=
-
3.02191568e-07
def
_esrf_centroid
(
image
,
threshold
=
THRESHOLD
,
curvature
=
(
CURVE_A
,
CURVE_B
)):
gs
=
2
base
=
image
.
mean
()
cp
=
np
.
argwhere
(
image
[
gs
//
2
:
-
gs
//
2
,
gs
//
2
:
-
gs
//
2
]
>
threshold
)
+
np
.
array
([
gs
//
2
,
gs
//
2
])
if
len
(
cp
)
>
100000
:
raise
RuntimeError
(
'
Threshold too low or acquisition time too long
'
)
res
=
[]
for
cy
,
cx
in
cp
:
spot
=
image
[
cy
-
gs
//
2
:
cy
+
gs
//
2
+
1
,
cx
-
gs
//
2
:
cx
+
gs
//
2
+
1
]
-
base
spot
[
spot
<
0
]
=
0
if
(
spot
>
image
[
cy
,
cx
]).
sum
()
==
0
:
mx
=
np
.
average
(
np
.
arange
(
cx
-
gs
//
2
,
cx
+
gs
//
2
+
1
),
weights
=
spot
.
sum
(
axis
=
0
))
my
=
np
.
average
(
np
.
arange
(
cy
-
gs
//
2
,
cy
+
gs
//
2
+
1
),
weights
=
spot
.
sum
(
axis
=
1
))
my
-=
(
curvature
[
0
]
+
curvature
[
1
]
*
mx
)
*
mx
res
.
append
((
my
,
mx
))
return
res
def
_new_centroid
(
image
,
threshold
=
THRESHOLD
,
curvature
=
(
CURVE_A
,
CURVE_B
)):
"""
find the position of photons with sub-pixel precision
A photon is supposed to have hit the detector if the intensity within a
2-by-2 square exceeds a threshold. In this case the position of the photon
is calculated as the center-of-mass in a 4-by-4 square.
Return the list of x,y coordinate pairs, corrected by the curvature.
"""
base
=
image
.
mean
()
corners
=
image
[
1
:,
1
:]
+
image
[:
-
1
,
1
:]
+
image
[
1
:,
:
-
1
]
+
image
[:
-
1
,
:
-
1
]
threshold
=
corners
.
mean
()
+
3.5
*
corners
.
std
()
middle
=
corners
[
1
:
-
1
,
1
:
-
1
]
candidates
=
(
(
middle
>
threshold
)
*
(
middle
>=
corners
[:
-
2
,
1
:
-
1
])
*
(
middle
>
corners
[
2
:,
1
:
-
1
])
*
(
middle
>=
corners
[
1
:
-
1
,
:
-
2
])
*
(
middle
>
corners
[
1
:
-
1
,
2
:])
*
(
middle
>=
corners
[:
-
2
,
:
-
2
])
*
(
middle
>
corners
[
2
:,
:
-
2
])
*
(
middle
>=
corners
[:
-
2
,
2
:])
*
(
middle
>
corners
[
2
:,
2
:]))
cp
=
np
.
argwhere
(
candidates
)
if
len
(
cp
)
>
10000
:
raise
RuntimeError
(
"
too many peaks, threshold too low or acquisition time too high
"
)
res
=
[]
for
cy
,
cx
in
cp
:
spot
=
image
[
cy
:
cy
+
4
,
cx
:
cx
+
4
]
-
base
mx
=
np
.
average
(
np
.
arange
(
cx
,
cx
+
4
),
weights
=
spot
.
sum
(
axis
=
0
))
my
=
np
.
average
(
np
.
arange
(
cy
,
cy
+
4
),
weights
=
spot
.
sum
(
axis
=
1
))
my
-=
(
curvature
[
0
]
+
curvature
[
1
]
*
mx
)
*
mx
res
.
append
((
my
,
mx
))
return
res
centroid
=
_new_centroid
def
decentroid
(
res
):
res
=
np
.
array
(
res
)
ret
=
np
.
zeros
(
shape
=
(
res
.
max
(
axis
=
0
)
+
1
).
astype
(
int
))
for
cy
,
cx
in
res
:
if
cx
>
0
and
cy
>
0
:
ret
[
int
(
cy
),
int
(
cx
)]
+=
1
return
ret
# -----------------------------------------------------------------------------
# Integral
FACTOR
=
3
RANGE
=
[
300
,
400
]
BINS
=
abs
(
np
.
subtract
(
*
RANGE
))
*
FACTOR
def
parabola
(
x
,
a
,
b
,
c
=
0
):
return
(
a
*
x
+
b
)
*
x
+
c
def
integrate
(
image
,
factor
=
FACTOR
,
range
=
RANGE
,
curvature
=
(
CURVE_A
,
CURVE_B
),
):
image
=
image
-
image
.
mean
()
x
=
np
.
arange
(
image
.
shape
[
1
])[
None
,
:]
y
=
np
.
arange
(
image
.
shape
[
0
])[:,
None
]
ys
=
factor
*
(
y
-
parabola
(
x
,
curvature
[
1
],
curvature
[
0
]))
ysf
=
np
.
floor
(
ys
)
rang
=
(
factor
*
range
[
0
],
factor
*
range
[
1
])
bins
=
rang
[
1
]
-
rang
[
0
]
lhy
,
lhx
=
np
.
histogram
(
ysf
.
ravel
(),
bins
=
bins
,
weights
=
((
ys
-
ysf
)
*
image
).
ravel
(),
range
=
rang
)
rhy
,
rhx
=
np
.
histogram
((
ysf
+
1
).
ravel
(),
bins
=
bins
,
weights
=
(((
ysf
+
1
)
-
ys
)
*
image
).
ravel
(),
range
=
rang
)
lvy
,
lvx
=
np
.
histogram
(
ysf
.
ravel
(),
bins
=
bins
,
weights
=
(
ys
-
ysf
).
ravel
(),
range
=
rang
)
rvy
,
rvx
=
np
.
histogram
((
ysf
+
1
).
ravel
(),
bins
=
bins
,
weights
=
((
ysf
+
1
)
-
ys
).
ravel
(),
range
=
rang
)
return
(
lhy
+
rhy
)
/
(
lvy
+
rvy
)
# -----------------------------------------------------------------------------
# hRIXS class
class
hRIXS
:
# run
PROPOSAL
=
2769
# image range
X_RANGE
=
np
.
s_
[
1300
:
-
100
]
Y_RANGE
=
np
.
s_
[:]
# centroid
THRESHOLD
=
THRESHOLD
# pixel counts above which a hit candidate is assumed
CURVE_A
=
CURVE_A
# curvature parameters as determined elsewhere
CURVE_B
=
CURVE_B
# integral
FACTOR
=
FACTOR
RANGE
=
RANGE
BINS
=
abs
(
np
.
subtract
(
*
RANGE
))
*
FACTOR
METHOD
=
'
centroid
'
# ['centroid', 'integral']
@classmethod
def
set_params
(
cls
,
**
params
):
for
key
,
value
in
params
.
items
():
setattr
(
cls
,
key
.
upper
(),
value
)
def
__set_params
(
self
,
**
params
):
self
.
__class__
.
set_params
(
**
params
)
self
.
refresh
()
@classmethod
def
get_params
(
cls
,
*
params
):
if
not
params
:
params
=
(
'
proposal
'
,
'
x_range
'
,
'
y_range
'
,
'
threshold
'
,
'
curve_a
'
,
'
curve_b
'
,
'
factor
'
,
'
range
'
,
'
bins
'
,
'
method
'
)
return
{
param
:
getattr
(
cls
,
param
.
upper
())
for
param
in
params
}
def
refresh
(
self
):
cls
=
self
.
__class__
for
cached
in
[
'
_centroid
'
,
'
_integral
'
]:
getattr
(
cls
,
cached
).
fget
.
cache_clear
()
def
__init__
(
self
,
images
,
norm
=
None
):
self
.
images
=
images
self
.
norm
=
norm
# class/instance method compatibility
self
.
set_params
=
self
.
__set_params
@classmethod
def
from_run
(
cls
,
runNB
,
proposal
=
None
,
first_wrong
=
False
):
if
proposal
is
None
:
proposal
=
cls
.
PROPOSAL
run
,
data
=
tb
.
load
(
proposal
,
runNB
=
runNB
,
fields
=
[
'
hRIXS_det
'
])
# Get slow train data
mnemo
=
tb
.
mnemonics_for_run
(
run
)[
'
SCS_slowTrain
'
]
slow_train
=
run
[
mnemo
[
'
source
'
],
mnemo
[
'
key
'
]].
ndarray
().
sum
()
return
cls
(
images
=
data
[
'
hRIXS_det
'
][
1
if
first_wrong
else
0
:].
data
,
norm
=
slow_train
)
@property
@lru_cache
()
def
_centroid
(
self
):
return
sum
((
centroid
(
image
[
self
.
Y_RANGE
,
self
.
X_RANGE
].
T
,
threshold
=
self
.
THRESHOLD
,
curvature
=
(
self
.
CURVE_A
,
self
.
CURVE_B
),
)
for
image
in
self
.
images
),
[])
def
_centroid_spectrum
(
self
,
bins
=
None
,
range
=
None
,
normalize
=
True
):
if
bins
is
None
:
bins
=
self
.
BINS
if
range
is
None
:
range
=
self
.
RANGE
r
=
np
.
array
(
self
.
_centroid
)
hy
,
hx
=
np
.
histogram
(
r
[:,
0
],
bins
=
bins
,
range
=
range
)
if
normalize
and
self
.
norm
is
not
None
:
hy
=
hy
/
self
.
norm
return
(
hx
[:
-
1
]
+
hx
[
1
:])
/
2
,
hy
@property
@lru_cache
()
def
_integral
(
self
):
return
sum
((
integrate
(
image
[
self
.
Y_RANGE
,
self
.
X_RANGE
].
T
,
factor
=
self
.
FACTOR
,
range
=
self
.
RANGE
,
curvature
=
(
self
.
CURVE_A
,
self
.
CURVE_B
))
for
image
in
self
.
images
))
def
_integral_spectrum
(
self
,
normalize
=
True
):
values
=
self
.
_integral
if
normalize
and
self
.
norm
is
not
None
:
values
=
values
/
self
.
norm
return
np
.
arange
(
values
.
size
),
values
@property
def
corrected
(
self
):
return
decentroid
(
self
.
_centroid
)
def
spectrum
(
self
,
normalize
=
True
):
spec_func
=
(
self
.
_centroid_spectrum
if
self
.
METHOD
.
lower
()
==
'
centroid
'
else
self
.
_integral_spectrum
)
return
spec_func
(
normalize
=
normalize
)
def
__sub__
(
self
,
other
):
px
,
py
=
self
.
spectrum
()
mx
,
my
=
other
.
spectrum
()
return
(
px
+
mx
)
/
2
,
py
-
my
def
__add__
(
self
,
other
):
return
self
.
__class__
(
images
=
list
(
self
.
images
)
+
list
(
other
.
images
),
norm
=
self
.
norm
+
other
.
norm
)
Loading