Skip to content
Snippets Groups Projects

hRIXS functions

Merged Cammille Carinan requested to merge hrixs into master
4 unresolved threads
+ 371
0
from functools import lru_cache
import numpy as np
from scipy.optimize import curve_fit
from scipy.signal import fftconvolve
import toolbox_scs as tb
__all__ = [
'find_curvature',
'correct_curvature',
'get_spectrum',
'energy_calibration',
'calibrate',
'gaussian_fit',
'to_fwhm'
]
# -----------------------------------------------------------------------------
# Curvature
def find_curvature(image, frangex=None, frangey=None,
deg=2, axis=1, offset=100):
# Resolve arguments
x_range = (0, image.shape[1])
if frangex is not None:
x_range = (max(frangex[0], x_range[0]), min(frangex[1], x_range[1]))
y_range = (0, image.shape[0])
if frangex is not None:
y_range = (max(frangey[0], y_range[0]), min(frangey[1], y_range[1]))
axis_range = y_range if axis == 1 else x_range
axis_dim = image.shape[axis - 1]
# Get kernel
integral = image[slice(*y_range), slice(*x_range)].mean(axis=axis)
roi = np.ones([axis_range[1] - axis_range[0], axis_dim])
ref = roi * integral[:, np.newaxis]
# Get sliced image
slice_ = [slice(None), slice(None)]
slice_[axis - 1] = slice(max(axis_range[0] - offset, 0),
min(axis_range[1] + offset, axis_dim))
sliced = image[tuple(slice_)]
if axis == 0:
sliced = sliced.T
# Get curvature factor from cross correlation
crosscorr = fftconvolve(sliced,
ref[::-1, :],
axes=0, )
shifts = np.argmax(crosscorr, axis=0)
curv = np.polyfit(np.arange(axis_dim), shifts, deg=deg)
return curv[:-1][::-1]
def correct_curvature(image, factor=None, axis=1):
if factor is None:
return
if axis == 1:
image = image.T
ydim, xdim = image.shape
x = np.arange(xdim + 1)
y = np.arange(ydim + 1)
xx, yy = np.meshgrid(x[:-1] + 0.5, y[:-1] + 0.5)
xxn = xx - factor[0] * yy - factor[1] * yy ** 2
ret = np.histogramdd((xxn.flatten(), yy.flatten()),
bins=[x, y],
weights=image.flatten())[0]
return ret if axis == 1 else ret.T
def get_spectrum(image, factor=None, axis=0,
pixel_range=None, energy_range=None, ):
start, stop = (0, image.shape[axis - 1])
if pixel_range is not None:
start = max(pixel_range[0] or start, start)
stop = min(pixel_range[1] or stop, stop)
edge = image.sum(axis=axis)[start:stop]
bins = np.arange(start, stop + 1)
centers = (bins[1:] + bins[:-1]) * 0.5
if factor is not None:
centers, edge = calibrate(centers, edge,
factor=factor,
range_=energy_range)
return centers, edge
# -----------------------------------------------------------------------------
# Energy calibration
def energy_calibration(channels, energies):
return np.polyfit(channels, energies, deg=1)
def calibrate(x, y=None, factor=None, range_=None):
if factor is not None:
x = np.polyval(factor, x)
if y is not None and range_ is not None:
start = np.argmin(np.abs((x - range_[0])))
stop = np.argmin(np.abs((x - range_[1])))
# Calibrated energies have a different direction
x, y = x[stop:start], y[stop:start]
return x, y
# -----------------------------------------------------------------------------
# Gaussian-related functions
FWHM_COEFF = 2 * np.sqrt(2 * np.log(2))
def gaussian_fit(x_data, y_data, offset=0):
"""
Centre-of-mass and width. Lifted from image_processing.imageCentreofMass()
"""
x0 = np.average(x_data, weights=y_data)
sx = np.sqrt(np.average((x_data - x0) ** 2, weights=y_data))
# Gaussian fit
baseline = y_data.min()
p_0 = (y_data.max(), x0 + offset, sx, baseline)
try:
p_f, _ = curve_fit(gauss1d, x_data, y_data, p_0, maxfev=10000)
return p_f
except (RuntimeError, TypeError) as e:
print(e)
return None
def gauss1d(x, height, x0, sigma, offset):
return height * np.exp(-0.5 * ((x - x0) / sigma) ** 2) + offset
def to_fwhm(sigma):
return abs(sigma * FWHM_COEFF)
# -----------------------------------------------------------------------------
# Centroid
THRESHOLD = 510 # pixel counts above which a hit candidate is assumed
CURVE_A = 2.19042931e-02 # curvature parameters as determined elsewhere
CURVE_B = -3.02191568e-07
def _esrf_centroid(image, threshold=THRESHOLD, curvature=(CURVE_A, CURVE_B)):
gs = 2
base = image.mean()
cp = np.argwhere(image[gs // 2: -gs // 2, gs // 2: -gs // 2] > threshold) + np.array([gs // 2, gs // 2])
if len(cp) > 100000:
raise RuntimeError('Threshold too low or acquisition time too long')
res = []
for cy, cx in cp:
spot = image[cy - gs // 2: cy + gs // 2 + 1, cx - gs // 2: cx + gs // 2 + 1] - base
spot[spot < 0] = 0
if (spot > image[cy, cx]).sum() == 0:
mx = np.average(np.arange(cx - gs // 2, cx + gs // 2 + 1), weights=spot.sum(axis=0))
my = np.average(np.arange(cy - gs // 2, cy + gs // 2 + 1), weights=spot.sum(axis=1))
my -= (curvature[0] + curvature[1] * mx) * mx
res.append((my, mx))
return res
def _new_centroid(image, threshold=THRESHOLD, curvature=(CURVE_A, CURVE_B)):
"""find the position of photons with sub-pixel precision
A photon is supposed to have hit the detector if the intensity within a
2-by-2 square exceeds a threshold. In this case the position of the photon
is calculated as the center-of-mass in a 4-by-4 square.
Return the list of x,y coordinate pairs, corrected by the curvature.
"""
base = image.mean()
corners = image[1:, 1:] + image[:-1, 1:] + image[1:, :-1] + image[:-1, :-1]
threshold = corners.mean() + 3.5 * corners.std()
middle = corners[1:-1, 1:-1]
candidates = (
(middle > threshold)
* (middle >= corners[:-2, 1:-1]) * (middle > corners[2:, 1:-1])
* (middle >= corners[1:-1, :-2]) * (middle > corners[1:-1, 2:])
* (middle >= corners[:-2, :-2]) * (middle > corners[2:, :-2])
* (middle >= corners[:-2, 2:]) * (middle > corners[2:, 2:]))
cp = np.argwhere(candidates)
if len(cp) > 10000:
raise RuntimeError(
"too many peaks, threshold too low or acquisition time too high")
res = []
for cy, cx in cp:
spot = image[cy: cy + 4, cx: cx + 4] - base
mx = np.average(np.arange(cx, cx + 4), weights=spot.sum(axis=0))
my = np.average(np.arange(cy, cy + 4), weights=spot.sum(axis=1))
my -= (curvature[0] + curvature[1] * mx) * mx
res.append((my, mx))
return res
centroid = _new_centroid
def decentroid(res):
res = np.array(res)
ret = np.zeros(shape=(res.max(axis=0) + 1).astype(int))
for cy, cx in res:
if cx > 0 and cy > 0:
ret[int(cy), int(cx)] += 1
return ret
# -----------------------------------------------------------------------------
# Integral
FACTOR = 3
RANGE = [300, 400]
BINS = abs(np.subtract(*RANGE)) * FACTOR
def parabola(x, a, b, c=0):
return (a * x + b) * x + c
def integrate(image, factor=FACTOR, range=RANGE, curvature=(CURVE_A, CURVE_B), ):
image = image - image.mean()
x = np.arange(image.shape[1])[None, :]
y = np.arange(image.shape[0])[:, None]
ys = factor * (y - parabola(x, curvature[1], curvature[0]))
ysf = np.floor(ys)
rang = (factor * range[0], factor * range[1])
bins = rang[1] - rang[0]
lhy, lhx = np.histogram(ysf.ravel(), bins=bins, weights=((ys - ysf) * image).ravel(), range=rang)
rhy, rhx = np.histogram((ysf + 1).ravel(), bins=bins, weights=(((ysf + 1) - ys) * image).ravel(), range=rang)
lvy, lvx = np.histogram(ysf.ravel(), bins=bins, weights=(ys - ysf).ravel(), range=rang)
rvy, rvx = np.histogram((ysf + 1).ravel(), bins=bins, weights=((ysf + 1) - ys).ravel(), range=rang)
return (lhy + rhy) / (lvy + rvy)
# -----------------------------------------------------------------------------
# hRIXS class
class hRIXS:
# run
PROPOSAL = 2769
# image range
X_RANGE = np.s_[1300:-100]
Y_RANGE = np.s_[:]
# centroid
THRESHOLD = THRESHOLD # pixel counts above which a hit candidate is assumed
CURVE_A = CURVE_A # curvature parameters as determined elsewhere
CURVE_B = CURVE_B
# integral
FACTOR = FACTOR
RANGE = RANGE
BINS = abs(np.subtract(*RANGE)) * FACTOR
METHOD = 'centroid' # ['centroid', 'integral']
@classmethod
def set_params(cls, **params):
for key, value in params.items():
setattr(cls, key.upper(), value)
def __set_params(self, **params):
self.__class__.set_params(**params)
self.refresh()
@classmethod
def get_params(cls, *params):
if not params:
params = ('proposal', 'x_range', 'y_range',
'threshold', 'curve_a', 'curve_b',
'factor', 'range', 'bins',
'method')
return {param: getattr(cls, param.upper()) for param in params}
def refresh(self):
cls = self.__class__
for cached in ['_centroid', '_integral']:
getattr(cls, cached).fget.cache_clear()
def __init__(self, images, norm=None):
self.images = images
self.norm = norm
# class/instance method compatibility
self.set_params = self.__set_params
@classmethod
def from_run(cls, runNB, proposal=None, first_wrong=False):
if proposal is None:
proposal = cls.PROPOSAL
run, data = tb.load(proposal, runNB=runNB, fields=['hRIXS_det'])
# Get slow train data
mnemo = tb.mnemonics_for_run(run)['SCS_slowTrain']
slow_train = run[mnemo['source'], mnemo['key']].ndarray().sum()
return cls(images=data['hRIXS_det'][1 if first_wrong else 0:].data,
norm=slow_train)
@property
@lru_cache()
def _centroid(self):
return sum((centroid(image[self.Y_RANGE, self.X_RANGE].T,
threshold=self.THRESHOLD,
curvature=(self.CURVE_A, self.CURVE_B), )
for image in self.images), [])
def _centroid_spectrum(self, bins=None, range=None, normalize=True):
if bins is None:
bins = self.BINS
if range is None:
range = self.RANGE
r = np.array(self._centroid)
hy, hx = np.histogram(r[:, 0], bins=bins, range=range)
if normalize and self.norm is not None:
hy = hy / self.norm
return (hx[:-1] + hx[1:]) / 2, hy
@property
@lru_cache()
def _integral(self):
return sum((integrate(image[self.Y_RANGE, self.X_RANGE].T,
factor=self.FACTOR,
range=self.RANGE,
curvature=(self.CURVE_A, self.CURVE_B))
for image in self.images))
def _integral_spectrum(self, normalize=True):
values = self._integral
if normalize and self.norm is not None:
values = values / self.norm
return np.arange(values.size), values
@property
def corrected(self):
return decentroid(self._centroid)
def spectrum(self, normalize=True):
spec_func = (self._centroid_spectrum if self.METHOD.lower() == 'centroid'
else self._integral_spectrum)
return spec_func(normalize=normalize)
def __sub__(self, other):
px, py = self.spectrum()
mx, my = other.spectrum()
return (px + mx) / 2, py - my
def __add__(self, other):
    • I don't like that the result of this is directly a spectrum, because this prevents us from adding more than two runs. run3 + run5 already returns a spectrum, doing run3 + run5 + run7 won't fly. So it would be better to require (run3 + run5 + run7).spectrum().

Please register or sign in to reply
return self.__class__(images=list(self.images) + list(other.images),
norm=self.norm + other.norm)
Loading