Skip to content
Snippets Groups Projects

Commented centroid

Closed Johannes Niskanen requested to merge p2866_updates_jhns_centroid into master
1 file
+ 302
92
Compare changes
  • Side-by-side
  • Inline
from functools import lru_cache
import numpy as np
import matplotlib.pyplot as plt
from scipy.optimize import leastsq
from scipy.optimize import curve_fit
from scipy.signal import fftconvolve
import xarray as xr
import toolbox_scs as tb
@@ -56,6 +58,26 @@ def find_curvature(image, frangex=None, frangey=None,
return curv[:-1][::-1]
def find_curvature(img, args, plot=False, **kwargs):
def parabola(x, a, b, c, s=0, h=0, o=0):
return (a*x + b)*x + c
def gauss(y, x, a, b, c, s, h, o=0):
return h * np.exp(-((y - parabola(x, a, b, c)) / (2 * s))**2) + o
x = np.arange(img.shape[1])[None, :]
y = np.arange(img.shape[0])[:, None]
if plot:
plt.figure(figsize=(10,10))
plt.imshow(img, cmap='gray', aspect='auto', interpolation='nearest', **kwargs)
plt.plot(x[0, :], parabola(x[0, :], *args))
args, _ = leastsq(lambda args: (gauss(y, x, *args) - img).ravel(), args)
if plot:
plt.plot(x[0, :], parabola(x[0, :], *args))
return args
def correct_curvature(image, factor=None, axis=1):
if factor is None:
return
@@ -175,7 +197,7 @@ def _esrf_centroid(image, threshold=THRESHOLD, curvature=(CURVE_A, CURVE_B)):
return res
def _new_centroid(image, threshold=THRESHOLD, curvature=(CURVE_A, CURVE_B)):
def _new_centroid(image, threshold=THRESHOLD, std_threshold=3.5, curvature=(CURVE_A, CURVE_B)):
"""find the position of photons with sub-pixel precision
A photon is supposed to have hit the detector if the intensity within a
@@ -186,7 +208,8 @@ def _new_centroid(image, threshold=THRESHOLD, curvature=(CURVE_A, CURVE_B)):
"""
base = image.mean()
corners = image[1:, 1:] + image[:-1, 1:] + image[1:, :-1] + image[:-1, :-1]
threshold = corners.mean() + 3.5 * corners.std()
if threshold is None:
threshold = corners.mean() + std_threshold * corners.std()
middle = corners[1:-1, 1:-1]
candidates = (
(middle > threshold)
@@ -257,11 +280,12 @@ class hRIXS:
PROPOSAL = 2769
# image range
X_RANGE = np.s_[1300:-100]
X_RANGE = np.s_[:]
Y_RANGE = np.s_[:]
# centroid
THRESHOLD = THRESHOLD # pixel counts above which a hit candidate is assumed
THRESHOLD = None # pixel counts above which a hit candidate is assumed
STD_THRESHOLD = 3.5 # same as THRESHOLD, in standard deviations
CURVE_A = CURVE_A # curvature parameters as determined elsewhere
CURVE_B = CURVE_B
@@ -271,101 +295,287 @@ class hRIXS:
BINS = abs(np.subtract(*RANGE)) * FACTOR
METHOD = 'centroid' # ['centroid', 'integral']
# Dark image and mask treatment
USE_DARK = False
USE_DARK_MASK = False
DARK_MASK_THRESHOLD = 100
MASK_AVG_X = np.s_[1850:2000]
MASK_AVG_Y = np.s_[500:1500]
@classmethod
def set_params(cls, **params):
for key, value in params.items():
setattr(cls, key.upper(), value)
ENERGY_INTERCEPT = 0
ENERGY_SLOPE = 1
def __set_params(self, **params):
self.__class__.set_params(**params)
self.refresh()
FIELDS = ['hRIXS_det', 'hRIXS_index', 'hRIXS_delay', 'hRIXS_norm']
@classmethod
def get_params(cls, *params):
def set_params(self, **params):
for key, value in params.items():
setattr(self, key.upper(), value)
def get_params(self, *params):
if not params:
params = ('proposal', 'x_range', 'y_range',
'threshold', 'curve_a', 'curve_b',
'factor', 'range', 'bins',
'method')
return {param: getattr(cls, param.upper()) for param in params}
def refresh(self):
cls = self.__class__
for cached in ['_centroid', '_integral']:
getattr(cls, cached).fget.cache_clear()
'method', 'fields')
return {param: getattr(self, param.upper()) for param in params}
def __init__(self, images, norm=None):
self.images = images
self.norm = norm
# class/instance method compatibility
self.set_params = self.__set_params
@classmethod
def from_run(cls, runNB, proposal=None, first_wrong=False):
def from_run(self, runNB, proposal=None, extra_fields=()):
if proposal is None:
proposal = cls.PROPOSAL
run, data = tb.load(proposal, runNB=runNB, fields=['hRIXS_det'])
# Get slow train data
mnemo = tb.mnemonics_for_run(run)['SCS_slowTrain']
slow_train = run[mnemo['source'], mnemo['key']].ndarray().sum()
return cls(images=data['hRIXS_det'][1 if first_wrong else 0:].data,
norm=slow_train)
@property
@lru_cache()
def _centroid(self):
return sum((centroid(image[self.Y_RANGE, self.X_RANGE].T,
threshold=self.THRESHOLD,
curvature=(self.CURVE_A, self.CURVE_B), )
for image in self.images), [])
def _centroid_spectrum(self, bins=None, range=None, normalize=True):
proposal = self.PROPOSAL
run, data = tb.load(proposal, runNB=runNB,
fields=self.FIELDS + list(extra_fields))
return data
def load_dark(self, runNB, proposal=None, use_dark=True, mask=True,
mask_threshold=None):
#*************************************************************
# Loads a dark image and assigns it to the hRIXS instance
# In addition sets attributes whether or not
# - hot pixels are identified and masked out
# - the dark image is to be used in background subtraction
# In addition a threshold value for hot pixel mask generation
# can be given.
#*************************************************************
if mask_threshold == None:
mask_threshold = self.DARK_MASK_THRESHOLD
#*************************************************************
# If given a list of runs, iterate over them.
# Otherwise read one. Give an exception if neither is the case.
#*************************************************************
if type(runNB) == type([]):
data_list = []
for run in runNB:
data_list.append(self.from_run(run, proposal))
data = xr.concat(data_list, dim='trainId')
elif type(runNB) == type(1):
data = self.from_run(runNB, proposal)
else:
raise Exception('load_dark() expects a list of indeces or an integer.')
#*************************************************************
# Store the dark image (mean over aqs.) in two formats
#*************************************************************
self.dark_image = data['hRIXS_det'].mean(dim='trainId')
self.dark_im_array = self.dark_image.to_numpy()
#*************************************************************
# Set a flag whether the dark image is to be used later
#*************************************************************
if use_dark:
self.USE_DARK = True
#*************************************************************
# If hot/dead pixel masking is requested, find the mask and
# set a flag in the instance. Set the masked dark values to
# mean intensity.
#*************************************************************
if mask:
dark_avg = np.mean(self.dark_im_array[self.MASK_AVG_Y,
self.MASK_AVG_X], (0, 1))
self.dark_mask = self.dark_im_array > dark_avg + mask_threshold
self.dark_im_array_m = np.array(self.dark_im_array)
self.dark_im_array_m[self.dark_mask] = dark_avg
self.USE_DARK_MASK = True
return
def find_curvature(self, runNB, proposal=None, plot=True, args=None, **kwargs):
data = self.from_run(runNB, proposal)
image = data['hRIXS_det'].sum(dim='trainId') \
.to_numpy()[self.X_RANGE, self.Y_RANGE].T
if args is None:
spec = (image - image[:10, :].mean()).mean(axis=1)
mean = np.average(np.arange(len(spec)), weights=spec)
args = (-2e-7, 0.02, mean - 0.02 * image.shape[1] / 2, 3,
spec.max(), image.mean())
args = find_curvature(image, args, plot=plot, **kwargs)
self.CURVE_B, self.CURVE_A, *_ = args
return self.CURVE_A, self.CURVE_B
def centroid(self, data, bins=None, return_hits=False):
if bins is None:
bins = self.BINS
hit_x = []
hit_y = []
hits = []
ret = np.zeros((len(data["hRIXS_det"]), bins))
for image, r in zip(data["hRIXS_det"], ret):
use_image = image.to_numpy()
if self.USE_DARK_MASK:
use_image[self.dark_mask] = np.mean(use_image[self.MASK_AVG_Y,
self.MASK_AVG_X], (0, 1))
if self.USE_DARK:
use_image = use_image - self.dark_im_array_m
else:
if self.USE_DARK:
use_image = use_image - self.dark_im_array
c = centroid(
use_image[self.X_RANGE, self.Y_RANGE].T,
threshold=self.THRESHOLD,
std_threshold=self.STD_THRESHOLD,
curvature=(self.CURVE_A, self.CURVE_B))
if not len(c):
continue
rc = np.array(c)
if return_hits:
hit_x.append(rc[:, 0])
hit_y.append(rc[:, 1])
hits.append(rc)
hy, hx = np.histogram(
rc[:, 0], bins=bins,
range=(0, self.Y_RANGE.stop - self.Y_RANGE.start))
r[:] = hy
data = data.assign_coords(
energy=np.linspace(self.Y_RANGE.start, self.Y_RANGE.stop, bins)
* self.ENERGY_SLOPE + self.ENERGY_INTERCEPT)
#**********************************************
# If hits were requested, assign them to data
#**********************************************
if return_hits:
data = data.assign(hits=(("trainId"), hits),
xhits=(("trainId"), hit_x),
yhits=(("trainId"), hit_y))
#**********************************************
# Always assign the spectrum to data
#**********************************************
data = data.assign(spectrum=(("trainId", "energy"), ret))
return data
def centroid_jhns(self, data, bins=None, return_hits=False):
#*************************************************************
# Carry out hit finding on data and bin them in grid
# Allows for
# - redifining the bins
# - extraction of the hit positions from each image
# The function will use dark images and hot pixel mask as
# given by
# self.USE_DARK
# self.USE_DARK_MASK
#*************************************************************
#*************************************************************
# If new bins are given, use them
#*************************************************************
if bins is None:
bins = self.BINS
if range is None:
range = self.RANGE
r = np.array(self._centroid)
hy, hx = np.histogram(r[:, 0], bins=bins, range=range)
if normalize and self.norm is not None:
hy = hy / self.norm
return (hx[:-1] + hx[1:]) / 2, hy
@property
@lru_cache()
def _integral(self):
return sum((integrate(image[self.Y_RANGE, self.X_RANGE].T,
factor=self.FACTOR,
range=self.RANGE,
curvature=(self.CURVE_A, self.CURVE_B))
for image in self.images))
def _integral_spectrum(self, normalize=True):
values = self._integral
if normalize and self.norm is not None:
values = values / self.norm
return np.arange(values.size), values
@property
def corrected(self):
return decentroid(self._centroid)
def spectrum(self, normalize=True):
spec_func = (self._centroid_spectrum if self.METHOD.lower() == 'centroid'
else self._integral_spectrum)
return spec_func(normalize=normalize)
def __sub__(self, other):
px, py = self.spectrum()
mx, my = other.spectrum()
return (px + mx) / 2, py - my
def __add__(self, other):
return self.__class__(images=list(self.images) + list(other.images),
norm=self.norm + other.norm)
#*************************************************************
# Define empty arrays and matrix for the output data
#*************************************************************
hit_x = []
hit_y = []
hits = []
ret = np.zeros((len(data["hRIXS_det"]), bins))
#*************************************************************
# Handle each Aq image separately
#*************************************************************
for image, r in zip(data["hRIXS_det"], ret):
use_image = image.to_numpy()
#*************************************************************
# Treat background by optionally
# -subtracting dark image (self.USE_DARK)
# -masking the hot pixels (self.USE_DARK_MASK)
# Diffrent combinations of the two flags result
# in different actions
#*************************************************************
if self.USE_DARK:
#***************************************
# subtract dark image
#***************************************
use_image = use_image - self.dark_im_array
if self.USE_DARK_MASK:
#***************************************
# set masked pixels 0
#***************************************
use_image[self.dark_mask] = 0
else:
#***************************************
# dont subtract dark image, but set hot
# pixels to a dark baseline value
#***************************************
if self.USE_DARK_MASK:
use_image[self.dark_mask] = np.mean(use_image[self.MASK_AVG_Y,
self.MASK_AVG_X], (0, 1))
#*************************************************************
# Run centroiding on the preprocessed image
#*************************************************************
c = centroid(
use_image[self.X_RANGE, self.Y_RANGE].T,
threshold=self.THRESHOLD,
std_threshold=self.STD_THRESHOLD,
curvature=(self.CURVE_A, self.CURVE_B))
if not len(c):
continue
rc = np.array(c)
#*************************************************************
# If hits have been requested, append the hit data of the
# image to the lists of hit lists
#*************************************************************
if return_hits:
hit_x.append(rc[:, 0])
hit_y.append(rc[:, 1])
hits.append(rc)
#*************************************************************
# Assign the spectrum to the spectrum matrix ret. Iteration
# variable r points to the proper location of ret.
#*************************************************************
hy, hx = np.histogram(
rc[:, 0], bins=bins,
range=(0, self.Y_RANGE.stop - self.Y_RANGE.start))
r[:] = hy
#*************************************************************
# Setup and assing a linear energy grid
#*************************************************************
data = data.assign_coords(
energy=np.linspace(self.Y_RANGE.start, self.Y_RANGE.stop, bins)
* self.ENERGY_SLOPE + self.ENERGY_INTERCEPT)
#**********************************************
# If hits were requested, assign them to data
#**********************************************
if return_hits:
data = data.assign(hits=(("trainId"), hits),
xhits=(("trainId"), hit_x),
yhits=(("trainId"), hit_y))
#**********************************************
# Always assign the spectrum to data
#**********************************************
data = data.assign(spectrum=(("trainId", "energy"), ret))
return data
def integrate(self, data):
bins = self.Y_RANGE.stop - self.Y_RANGE.start
ret = np.zeros((len(data["hRIXS_det"]), bins - 20))
for image, r in zip(data["hRIXS_det"], ret):
if self.USE_DARK:
image = image - self.dark_image
r[:] = integrate(image.to_numpy()[self.X_RANGE, self.Y_RANGE].T, factor=1,
range=(10, bins - 10),
curvature=(self.CURVE_A, self.CURVE_B))
data = data.assign_coords(
energy=np.arange(self.Y_RANGE.start + 10, self.Y_RANGE.stop - 10)
* self.ENERGY_SLOPE + self.ENERGY_INTERCEPT)
return data.assign(spectrum=(("trainId", "energy"), ret))
aggregators = dict(
hRIXS_det=lambda x, dim: x.sum(dim=dim),
Delay=lambda x, dim: x.mean(dim=dim),
hRIXS_delay=lambda x, dim: x.mean(dim=dim),
hRIXS_norm=lambda x, dim: x.sum(dim=dim),
spectrum=lambda x, dim: x.sum(dim=dim),
)
def aggregator(self, da, dim):
agg = self.aggregators.get(da.name)
if agg is None:
return None
return agg(da, dim=dim)
def aggregate(self, ds, dim="trainId"):
ret = ds.map(self.aggregator, dim=dim)
ret = ret.drop_vars([n for n in ret if n not in self.aggregators])
return ret
def normalize(self, data):
return data.assign(normalized=data["spectrum"] / data["hRIXS_norm"])
Loading