Skip to content
GitLab
Explore
Sign in
Primary navigation
Search or go to…
Project
P
pes_to_spec
Manage
Activity
Members
Labels
Plan
Issues
Issue boards
Milestones
Wiki
Code
Merge requests
Repository
Branches
Commits
Tags
Repository graph
Compare revisions
Snippets
Build
Pipelines
Jobs
Pipeline schedules
Artifacts
Deploy
Releases
Package Registry
Model registry
Operate
Environments
Terraform modules
Monitor
Incidents
Analyze
Value stream analytics
Contributor analytics
CI/CD analytics
Repository analytics
Model experiments
Help
Help
Support
GitLab documentation
Compare GitLab plans
Community forum
Contribute to GitLab
Provide feedback
Keyboard shortcuts
?
Snippets
Groups
Projects
Show more breadcrumbs
Machine Learning projects.
pes_to_spec
Commits
be3fad54
Commit
be3fad54
authored
2 years ago
by
Danilo Enoque Ferreira de Lima
Browse files
Options
Downloads
Plain Diff
Merge branch 'meeting-jan-31' into 'main'
Check consistency per channel See merge request
!5
parents
a7323718
704d3f0b
No related branches found
Branches containing commit
No related tags found
Tags containing commit
1 merge request
!5
Check consistency per channel
Changes
1
Hide whitespace changes
Inline
Side-by-side
Showing
1 changed file
pes_to_spec/model.py
+78
-12
78 additions, 12 deletions
pes_to_spec/model.py
with
78 additions
and
12 deletions
pes_to_spec/model.py
+
78
−
12
View file @
be3fad54
...
...
@@ -20,6 +20,8 @@ from sklearn.model_selection import train_test_split
from
sklearn.base
import
clone
,
MetaEstimatorMixin
from
joblib
import
Parallel
,
delayed
from
functools
import
partial
import
multiprocessing
as
mp
from
typing
import
Any
,
Dict
,
List
,
Optional
,
Union
,
Tuple
...
...
@@ -343,7 +345,7 @@ class SelectRelevantLowResolution(TransformerMixin, BaseEstimator):
self
.
tof_start
=
tof_start
self
.
delta_tof
=
delta_tof
def
transform
(
self
,
X
:
Dict
[
str
,
np
.
ndarray
])
->
np
.
ndarray
:
def
transform
(
self
,
X
:
Dict
[
str
,
np
.
ndarray
]
,
keep_dictionary_structure
:
bool
=
False
)
->
np
.
ndarray
:
"""
Get a dictionary with the channel names for the inut low resolution data and output
only the relevant input data in an array.
...
...
@@ -351,18 +353,20 @@ class SelectRelevantLowResolution(TransformerMixin, BaseEstimator):
Args:
X: Dictionary with keys named channel_{i}_{k},
where i is a number between 1 and 4 and k is a letter between A and D.
keep_dictionary_structure: Whether to concatenate all channels, or keep them as a dictionary.
Returns: Concatenated and pre-processed low-resolution data of shape (train_id, features).
"""
if
self
.
tof_start
is
None
:
raise
NotImplementedError
(
"
The low-resolution data cannot be transformed before the prompt has been identified. Call the fit function first.
"
)
items
=
[
X
[
k
]
for
k
in
self
.
channels
]
y
=
X
if
self
.
delta_tof
is
not
None
:
items
=
[
item
[:,
self
.
tof_start
:(
self
.
tof_start
+
self
.
delta_tof
)]
for
item
in
items
]
else
:
items
=
[
item
[:,
self
.
tof_start
:]
for
item
in
items
]
cat
=
np
.
concatenate
(
items
,
axis
=
1
)
return
cat
first
=
max
(
0
,
self
.
tof_start
-
self
.
delta_tof
)
last
=
min
(
X
[
self
.
channels
[
0
]].
shape
[
1
],
self
.
tof_start
+
self
.
delta_tof
)
y
=
{
channel
:
item
[:,
first
:
last
]
for
channel
,
item
in
X
.
items
()}
if
not
keep_dictionary_structure
:
return
np
.
concatenate
(
list
(
y
.
values
()),
axis
=
1
)
return
y
def
estimate_prompt_peak
(
self
,
X
:
Dict
[
str
,
np
.
ndarray
])
->
int
:
"""
...
...
@@ -416,8 +420,8 @@ class SelectRelevantLowResolution(TransformerMixin, BaseEstimator):
import
matplotlib.pyplot
as
plt
fig
=
plt
.
figure
(
figsize
=
(
8
,
16
))
ax
=
plt
.
gca
()
ax
.
plot
(
np
.
arange
(
peak_idx
-
1
00
,
peak_idx
+
300
),
sum_low_res
[
peak_idx
-
1
00
:
peak_idx
+
300
],
ax
.
plot
(
np
.
arange
(
peak_idx
-
3
00
,
peak_idx
+
300
),
sum_low_res
[
peak_idx
-
3
00
:
peak_idx
+
300
],
c
=
"
b
"
,
label
=
"
Data
"
)
ax
.
set
(
title
=
""
,
...
...
@@ -541,6 +545,11 @@ class Model(TransformerMixin, BaseEstimator):
#self.fit_model = FitModel()
self
.
fit_model
=
MultiOutputWithStd
(
ARDRegression
(
n_iter
=
30
,
tol
=
1e-4
,
verbose
=
True
))
self
.
channel_pca_model
=
{
channel
:
Pipeline
([(
'
pca
'
,
PCA
(
n_pca_lr
,
whiten
=
True
)),
(
'
unc
'
,
UncertaintyHolder
()),
])
for
channel
in
channels
}
# size of the test subset
self
.
validation_size
=
validation_size
...
...
@@ -593,18 +602,23 @@ class Model(TransformerMixin, BaseEstimator):
Returns: Smoothened high resolution spectrum.
"""
print
(
"
Fitting PCA on low-resolution data.
"
)
x_t
=
self
.
x_model
.
fit_transform
(
low_res_data
)
print
(
"
Fitting PCA on high-resolution data.
"
)
y_t
=
self
.
y_model
.
fit_transform
(
high_res_data
,
smoothen__energy
=
high_res_photon_energy
)
#self.fit_model.set_params(fex__gamma=1.0/float(x_t.shape[0]))
print
(
"
Fitting model.
"
)
self
.
fit_model
.
fit
(
x_t
,
y_t
)
# calculate the effect of the PCA
print
(
"
Calculate PCA unc. on high-resolution data.
"
)
high_res
=
self
.
y_model
[
'
smoothen
'
].
transform
(
high_res_data
)
high_pca
=
self
.
y_model
[
'
pca
'
].
transform
(
high_res
)
high_pca_rec
=
self
.
y_model
[
'
pca
'
].
inverse_transform
(
high_pca
)
high_pca_unc
=
np
.
sqrt
(
np
.
mean
((
high_res
-
high_pca_rec
)
**
2
,
axis
=
0
,
keepdims
=
True
))
self
.
y_model
[
'
unc
'
].
set_uncertainty
(
high_pca_unc
)
print
(
"
Calculate PCA unc. on low-resolution data.
"
)
low_res
=
self
.
x_model
[
'
select
'
].
transform
(
low_res_data
)
pca_model
=
self
.
x_model
[
'
pca
'
]
if
'
fex
'
in
self
.
x_model
.
named_steps
:
...
...
@@ -614,8 +628,58 @@ class Model(TransformerMixin, BaseEstimator):
low_pca_unc
=
np
.
mean
(
np
.
sqrt
(
np
.
mean
((
low_res
-
low_pca_rec
)
**
2
,
axis
=
1
,
keepdims
=
True
)),
axis
=
0
,
keepdims
=
True
)
self
.
x_model
[
'
unc
'
].
set_uncertainty
(
low_pca_unc
)
# for consistency check per channel
print
(
"
Calculate PCA per channel on low-resolution data.
"
)
selection_model
=
self
.
x_model
[
'
select
'
]
low_res
=
selection_model
.
transform
(
low_res_data
,
keep_dictionary_structure
=
True
)
for
channel
in
self
.
get_channels
():
print
(
f
"
Calculate PCA on
{
channel
}
"
)
low_pca
=
self
.
channel_pca_model
[
channel
].
named_steps
[
"
pca
"
].
fit_transform
(
low_res
[
channel
])
low_pca_rec
=
self
.
channel_pca_model
[
channel
].
named_steps
[
"
pca
"
].
inverse_transform
(
low_pca
)
low_pca_unc
=
np
.
mean
(
np
.
sqrt
(
np
.
mean
((
low_res
[
channel
]
-
low_pca_rec
)
**
2
,
axis
=
1
,
keepdims
=
True
)),
axis
=
0
,
keepdims
=
True
)
self
.
channel_pca_model
[
channel
][
'
unc
'
].
set_uncertainty
(
low_pca_unc
)
print
(
"
End of fit.
"
)
return
high_res
def
get_channel_quality
(
self
,
channel
:
str
,
low_res
:
Dict
[
str
,
np
.
ndarray
],
channel_pca_model
:
Dict
[
str
,
Pipeline
])
->
float
:
"""
Get the compatibility for a single channel.
Args:
channel: The channel.
low_res: The data in a dictionary.
pca_model: The PCA model.
Returns: the compatibility factor.
"""
pca_model
=
channel_pca_model
[
channel
].
named_steps
[
'
pca
'
]
low_pca
=
pca_model
.
transform
(
low_res
[
channel
])
low_pca_rec
=
pca_model
.
inverse_transform
(
low_pca
)
low_pca_unc
=
channel_pca_model
[
channel
].
named_steps
[
'
unc
'
].
uncertainty
()
low_pca_dev
=
np
.
sqrt
(
np
.
mean
((
low_res
[
channel
]
-
low_pca_rec
)
**
2
,
axis
=
1
,
keepdims
=
True
))
return
low_pca_dev
/
low_pca_unc
def
check_compatibility_per_channel
(
self
,
low_res_data
:
Dict
[
str
,
np
.
ndarray
])
->
Dict
[
str
,
np
.
ndarray
]:
"""
Check if a new low-resolution data source is compatible with the one used in training, by
comparing the effect of the trained PCA model on it, but do it per channel.
Args:
low_res_data: Low resolution data as in the fit step with shape (train_id, channel, ToF channel).
Returns: Ratio of root-mean-squared-error of the data reconstruction using the existing PCA model and the one from the original model per channel.
"""
selection_model
=
self
.
x_model
[
'
select
'
]
low_res
=
selection_model
.
transform
(
low_res_data
,
keep_dictionary_structure
=
True
)
quality
=
{
channel
:
0.0
for
channel
in
low_res
.
keys
()}
channels
=
list
(
low_res
.
keys
())
#with mp.Pool(len(low_res.keys())) as p:
# values = p.map(partial(self.get_channel_quality, low_res=low_res, channel_pca_model=self.channel_pca_model), channels)
values
=
map
(
partial
(
self
.
get_channel_quality
,
low_res
=
low_res
,
channel_pca_model
=
self
.
channel_pca_model
),
channels
)
quality
=
dict
(
zip
(
channels
,
values
))
return
quality
def
check_compatibility
(
self
,
low_res_data
:
Dict
[
str
,
np
.
ndarray
])
->
np
.
ndarray
:
"""
Check if a new low-resolution data source is compatible with the one used in training, by
...
...
@@ -689,8 +753,9 @@ class Model(TransformerMixin, BaseEstimator):
"""
joblib
.
dump
([
self
.
x_model
,
self
.
y_model
,
self
.
fit_model
],
filename
,
compress
=
'
zlib
'
)
self
.
fit_model
,
self
.
channel_pca_model
],
filename
,
compress
=
'
zlib
'
)
@staticmethod
def
load
(
filename
:
str
)
->
Model
:
...
...
@@ -702,10 +767,11 @@ class Model(TransformerMixin, BaseEstimator):
Returns: A new model object.
"""
x_model
,
y_model
,
fit_model
=
joblib
.
load
(
filename
)
x_model
,
y_model
,
fit_model
,
channel_pca_model
=
joblib
.
load
(
filename
)
obj
=
Model
()
obj
.
x_model
=
x_model
obj
.
y_model
=
y_model
obj
.
fit_model
=
fit_model
obj
.
channel_pca_model
=
channel_pca_model
return
obj
This diff is collapsed.
Click to expand it.
Preview
0%
Loading
Try again
or
attach a new file
.
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Save comment
Cancel
Please
register
or
sign in
to comment