Skip to content
Snippets Groups Projects
Commit ad0907d3 authored by Martin Teichmann's avatar Martin Teichmann
Browse files

Merge branch 'hrixs' into 'master'

hRIXS functions

See merge request !170
parents dc77837d e9b7b74b
No related branches found
No related tags found
1 merge request!170hRIXS functions
...@@ -66,6 +66,18 @@ mnemonics = { ...@@ -66,6 +66,18 @@ mnemonics = {
"M2BEND": ({'source': 'SA3_XTD10_MIRR-2/MOTOR/BENDER', "M2BEND": ({'source': 'SA3_XTD10_MIRR-2/MOTOR/BENDER',
'key': 'actualPosition.value', 'key': 'actualPosition.value',
'dim': None},), 'dim': None},),
"hRIXS_det": ({'source': 'SCS_HRIXS_DET/CAM/CAMERA:daqOutput',
'key': 'data.image.pixels',
'dim': ['x', 'y']},),
"chem_X": ({'source': 'SCS_CHEM_JET/MOTOR/MANA_X',
'key': 'actualPosition.value',
'dim': None},),
"chem_Y": ({'source': 'SCS_CHEM_JET/MOTOR/MANA_Y',
'key': 'actualPosition.value',
'dim': None},),
"chem_Z": ({'source': 'SCS_CHEM_JET/MOTOR/MANA_Z',
'key': 'actualPosition.value',
'dim': None},),
"VSLIT": ({'source': 'SA3_XTD10_VSLIT/MDL/BLADE', "VSLIT": ({'source': 'SA3_XTD10_VSLIT/MDL/BLADE',
'key': 'actualGap.value', 'key': 'actualGap.value',
'dim': None},), 'dim': None},),
...@@ -75,6 +87,12 @@ mnemonics = { ...@@ -75,6 +87,12 @@ mnemonics = {
"HSLIT": ({'source': 'SCS_XTD10_HSLIT/MDL/BLADE', "HSLIT": ({'source': 'SCS_XTD10_HSLIT/MDL/BLADE',
'key': 'actualGap.value', 'key': 'actualGap.value',
'dim': None},), 'dim': None},),
'key': 'actualPosition.value',
'dim': None},),
'key': 'actualPosition.value',
'dim': None},),
"transmission": ({'source': 'SA3_XTD10_VAC/MDL/GATT_TRANSMISSION_MONITOR', "transmission": ({'source': 'SA3_XTD10_VAC/MDL/GATT_TRANSMISSION_MONITOR',
'key': 'Estimated_Tr.value', 'key': 'Estimated_Tr.value',
'dim': None}, 'dim': None},
...@@ -5,6 +5,7 @@ from .dssc import * ...@@ -5,6 +5,7 @@ from .dssc import *
from .dssc_data import * from .dssc_data import *
from .dssc_misc import * from .dssc_misc import *
from .dssc_processing import * from .dssc_processing import *
from .hrixs import *
from .pes import * from .pes import *
from .xgm import * from .xgm import *
...@@ -16,6 +17,7 @@ __all__ = ( ...@@ -16,6 +17,7 @@ __all__ = (
+ dssc_data.__all__ + dssc_data.__all__
+ dssc_misc.__all__ + dssc_misc.__all__
+ dssc_processing.__all__ + dssc_processing.__all__
+ hrixs.__all__
+ pes.__all__ + pes.__all__
+ xgm.__all__ + xgm.__all__
) )
from functools import lru_cache
import numpy as np
from scipy.optimize import curve_fit
from scipy.signal import fftconvolve
import toolbox_scs as tb
__all__ = [
# -----------------------------------------------------------------------------
# Curvature
def find_curvature(image, frangex=None, frangey=None,
deg=2, axis=1, offset=100):
# Resolve arguments
x_range = (0, image.shape[1])
if frangex is not None:
x_range = (max(frangex[0], x_range[0]), min(frangex[1], x_range[1]))
y_range = (0, image.shape[0])
if frangex is not None:
y_range = (max(frangey[0], y_range[0]), min(frangey[1], y_range[1]))
axis_range = y_range if axis == 1 else x_range
axis_dim = image.shape[axis - 1]
# Get kernel
integral = image[slice(*y_range), slice(*x_range)].mean(axis=axis)
roi = np.ones([axis_range[1] - axis_range[0], axis_dim])
ref = roi * integral[:, np.newaxis]
# Get sliced image
slice_ = [slice(None), slice(None)]
slice_[axis - 1] = slice(max(axis_range[0] - offset, 0),
min(axis_range[1] + offset, axis_dim))
sliced = image[tuple(slice_)]
if axis == 0:
sliced = sliced.T
# Get curvature factor from cross correlation
crosscorr = fftconvolve(sliced,
ref[::-1, :],
axes=0, )
shifts = np.argmax(crosscorr, axis=0)
curv = np.polyfit(np.arange(axis_dim), shifts, deg=deg)
return curv[:-1][::-1]
def correct_curvature(image, factor=None, axis=1):
if factor is None:
if axis == 1:
image = image.T
ydim, xdim = image.shape
x = np.arange(xdim + 1)
y = np.arange(ydim + 1)
xx, yy = np.meshgrid(x[:-1] + 0.5, y[:-1] + 0.5)
xxn = xx - factor[0] * yy - factor[1] * yy ** 2
ret = np.histogramdd((xxn.flatten(), yy.flatten()),
bins=[x, y],
return ret if axis == 1 else ret.T
def get_spectrum(image, factor=None, axis=0,
pixel_range=None, energy_range=None, ):
start, stop = (0, image.shape[axis - 1])
if pixel_range is not None:
start = max(pixel_range[0] or start, start)
stop = min(pixel_range[1] or stop, stop)
edge = image.sum(axis=axis)[start:stop]
bins = np.arange(start, stop + 1)
centers = (bins[1:] + bins[:-1]) * 0.5
if factor is not None:
centers, edge = calibrate(centers, edge,
return centers, edge
# -----------------------------------------------------------------------------
# Energy calibration
def energy_calibration(channels, energies):
return np.polyfit(channels, energies, deg=1)
def calibrate(x, y=None, factor=None, range_=None):
if factor is not None:
x = np.polyval(factor, x)
if y is not None and range_ is not None:
start = np.argmin(np.abs((x - range_[0])))
stop = np.argmin(np.abs((x - range_[1])))
# Calibrated energies have a different direction
x, y = x[stop:start], y[stop:start]
return x, y
# -----------------------------------------------------------------------------
# Gaussian-related functions
FWHM_COEFF = 2 * np.sqrt(2 * np.log(2))
def gaussian_fit(x_data, y_data, offset=0):
Centre-of-mass and width. Lifted from image_processing.imageCentreofMass()
x0 = np.average(x_data, weights=y_data)
sx = np.sqrt(np.average((x_data - x0) ** 2, weights=y_data))
# Gaussian fit
baseline = y_data.min()
p_0 = (y_data.max(), x0 + offset, sx, baseline)
p_f, _ = curve_fit(gauss1d, x_data, y_data, p_0, maxfev=10000)
return p_f
except (RuntimeError, TypeError) as e:
return None
def gauss1d(x, height, x0, sigma, offset):
return height * np.exp(-0.5 * ((x - x0) / sigma) ** 2) + offset
def to_fwhm(sigma):
return abs(sigma * FWHM_COEFF)
# -----------------------------------------------------------------------------
# Centroid
THRESHOLD = 510 # pixel counts above which a hit candidate is assumed
CURVE_A = 2.19042931e-02 # curvature parameters as determined elsewhere
CURVE_B = -3.02191568e-07
def _esrf_centroid(image, threshold=THRESHOLD, curvature=(CURVE_A, CURVE_B)):
gs = 2
base = image.mean()
cp = np.argwhere(image[gs // 2: -gs // 2, gs // 2: -gs // 2] > threshold) + np.array([gs // 2, gs // 2])
if len(cp) > 100000:
raise RuntimeError('Threshold too low or acquisition time too long')
res = []
for cy, cx in cp:
spot = image[cy - gs // 2: cy + gs // 2 + 1, cx - gs // 2: cx + gs // 2 + 1] - base
spot[spot < 0] = 0
if (spot > image[cy, cx]).sum() == 0:
mx = np.average(np.arange(cx - gs // 2, cx + gs // 2 + 1), weights=spot.sum(axis=0))
my = np.average(np.arange(cy - gs // 2, cy + gs // 2 + 1), weights=spot.sum(axis=1))
my -= (curvature[0] + curvature[1] * mx) * mx
res.append((my, mx))
return res
def _new_centroid(image, threshold=THRESHOLD, curvature=(CURVE_A, CURVE_B)):
"""find the position of photons with sub-pixel precision
A photon is supposed to have hit the detector if the intensity within a
2-by-2 square exceeds a threshold. In this case the position of the photon
is calculated as the center-of-mass in a 4-by-4 square.
Return the list of x,y coordinate pairs, corrected by the curvature.
base = image.mean()
corners = image[1:, 1:] + image[:-1, 1:] + image[1:, :-1] + image[:-1, :-1]
threshold = corners.mean() + 3.5 * corners.std()
middle = corners[1:-1, 1:-1]
candidates = (
(middle > threshold)
* (middle >= corners[:-2, 1:-1]) * (middle > corners[2:, 1:-1])
* (middle >= corners[1:-1, :-2]) * (middle > corners[1:-1, 2:])
* (middle >= corners[:-2, :-2]) * (middle > corners[2:, :-2])
* (middle >= corners[:-2, 2:]) * (middle > corners[2:, 2:]))
cp = np.argwhere(candidates)
if len(cp) > 10000:
raise RuntimeError(
"too many peaks, threshold too low or acquisition time too high")
res = []
for cy, cx in cp:
spot = image[cy: cy + 4, cx: cx + 4] - base
mx = np.average(np.arange(cx, cx + 4), weights=spot.sum(axis=0))
my = np.average(np.arange(cy, cy + 4), weights=spot.sum(axis=1))
my -= (curvature[0] + curvature[1] * mx) * mx
res.append((my, mx))
return res
centroid = _new_centroid
def decentroid(res):
res = np.array(res)
ret = np.zeros(shape=(res.max(axis=0) + 1).astype(int))
for cy, cx in res:
if cx > 0 and cy > 0:
ret[int(cy), int(cx)] += 1
return ret
# -----------------------------------------------------------------------------
# Integral
RANGE = [300, 400]
BINS = abs(np.subtract(*RANGE)) * FACTOR
def parabola(x, a, b, c=0):
return (a * x + b) * x + c
def integrate(image, factor=FACTOR, range=RANGE, curvature=(CURVE_A, CURVE_B), ):
image = image - image.mean()
x = np.arange(image.shape[1])[None, :]
y = np.arange(image.shape[0])[:, None]
ys = factor * (y - parabola(x, curvature[1], curvature[0]))
ysf = np.floor(ys)
rang = (factor * range[0], factor * range[1])
bins = rang[1] - rang[0]
lhy, lhx = np.histogram(ysf.ravel(), bins=bins, weights=((ys - ysf) * image).ravel(), range=rang)
rhy, rhx = np.histogram((ysf + 1).ravel(), bins=bins, weights=(((ysf + 1) - ys) * image).ravel(), range=rang)
lvy, lvx = np.histogram(ysf.ravel(), bins=bins, weights=(ys - ysf).ravel(), range=rang)
rvy, rvx = np.histogram((ysf + 1).ravel(), bins=bins, weights=((ysf + 1) - ys).ravel(), range=rang)
return (lhy + rhy) / (lvy + rvy)
# -----------------------------------------------------------------------------
# hRIXS class
class hRIXS:
# run
# image range
X_RANGE = np.s_[1300:-100]
Y_RANGE = np.s_[:]
# centroid
THRESHOLD = THRESHOLD # pixel counts above which a hit candidate is assumed
CURVE_A = CURVE_A # curvature parameters as determined elsewhere
# integral
BINS = abs(np.subtract(*RANGE)) * FACTOR
METHOD = 'centroid' # ['centroid', 'integral']
def set_params(cls, **params):
for key, value in params.items():
setattr(cls, key.upper(), value)
def __set_params(self, **params):
def get_params(cls, *params):
if not params:
params = ('proposal', 'x_range', 'y_range',
'threshold', 'curve_a', 'curve_b',
'factor', 'range', 'bins',
return {param: getattr(cls, param.upper()) for param in params}
def refresh(self):
cls = self.__class__
for cached in ['_centroid', '_integral']:
getattr(cls, cached).fget.cache_clear()
def __init__(self, images, norm=None):
self.images = images
self.norm = norm
# class/instance method compatibility
self.set_params = self.__set_params
def from_run(cls, runNB, proposal=None, first_wrong=False):
if proposal is None:
proposal = cls.PROPOSAL
run, data = tb.load(proposal, runNB=runNB, fields=['hRIXS_det'])
# Get slow train data
mnemo = tb.mnemonics_for_run(run)['SCS_slowTrain']
slow_train = run[mnemo['source'], mnemo['key']].ndarray().sum()
return cls(images=data['hRIXS_det'][1 if first_wrong else 0:].data,
def _centroid(self):
return sum((centroid(image[self.Y_RANGE, self.X_RANGE].T,
curvature=(self.CURVE_A, self.CURVE_B), )
for image in self.images), [])
def _centroid_spectrum(self, bins=None, range=None, normalize=True):
if bins is None:
bins = self.BINS
if range is None:
range = self.RANGE
r = np.array(self._centroid)
hy, hx = np.histogram(r[:, 0], bins=bins, range=range)
if normalize and self.norm is not None:
hy = hy / self.norm
return (hx[:-1] + hx[1:]) / 2, hy
def _integral(self):
return sum((integrate(image[self.Y_RANGE, self.X_RANGE].T,
curvature=(self.CURVE_A, self.CURVE_B))
for image in self.images))
def _integral_spectrum(self, normalize=True):
values = self._integral
if normalize and self.norm is not None:
values = values / self.norm
return np.arange(values.size), values
def corrected(self):
return decentroid(self._centroid)
def spectrum(self, normalize=True):
spec_func = (self._centroid_spectrum if self.METHOD.lower() == 'centroid'
else self._integral_spectrum)
return spec_func(normalize=normalize)
def __sub__(self, other):
px, py = self.spectrum()
mx, my = other.spectrum()
return (px + mx) / 2, py - my
def __add__(self, other):
return self.__class__(images=list(self.images) + list(other.images),
norm=self.norm + other.norm)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment