diff --git a/src/toolbox_scs/constants.py b/src/toolbox_scs/constants.py index bc412c2782baeed560d39ab939ce8b06cb0b6b30..8c70dd577821c0a25f732ed60945550a5e1db8c1 100644 --- a/src/toolbox_scs/constants.py +++ b/src/toolbox_scs/constants.py @@ -66,6 +66,18 @@ mnemonics = { "M2BEND": ({'source': 'SA3_XTD10_MIRR-2/MOTOR/BENDER', 'key': 'actualPosition.value', 'dim': None},), + "hRIXS_det": ({'source': 'SCS_HRIXS_DET/CAM/CAMERA:daqOutput', + 'key': 'data.image.pixels', + 'dim': ['x', 'y']},), + "chem_X": ({'source': 'SCS_CHEM_JET/MOTOR/MANA_X', + 'key': 'actualPosition.value', + 'dim': None},), + "chem_Y": ({'source': 'SCS_CHEM_JET/MOTOR/MANA_Y', + 'key': 'actualPosition.value', + 'dim': None},), + "chem_Z": ({'source': 'SCS_CHEM_JET/MOTOR/MANA_Z', + 'key': 'actualPosition.value', + 'dim': None},), "VSLIT": ({'source': 'SA3_XTD10_VSLIT/MDL/BLADE', 'key': 'actualGap.value', 'dim': None},), @@ -75,6 +87,12 @@ mnemonics = { "HSLIT": ({'source': 'SCS_XTD10_HSLIT/MDL/BLADE', 'key': 'actualGap.value', 'dim': None},), + "H_BLADE_LEFT": ({'source': 'SCS_XTD10_HSLIT/MOTOR/BLADE_LEFT', + 'key': 'actualPosition.value', + 'dim': None},), + "H_BLADE_RIGHT": ({'source': 'SCS_XTD10_HSLIT/MOTOR/BLADE_RIGHT', + 'key': 'actualPosition.value', + 'dim': None},), "transmission": ({'source': 'SA3_XTD10_VAC/MDL/GATT_TRANSMISSION_MONITOR', 'key': 'Estimated_Tr.value', 'dim': None}, diff --git a/src/toolbox_scs/detectors/__init__.py b/src/toolbox_scs/detectors/__init__.py index da11c8f515d266f771e05d9e76a52ae87d47e51f..b6c4ac5e0801a730e28860694b4a49f74e9aa6fc 100644 --- a/src/toolbox_scs/detectors/__init__.py +++ b/src/toolbox_scs/detectors/__init__.py @@ -5,6 +5,7 @@ from .dssc import * from .dssc_data import * from .dssc_misc import * from .dssc_processing import * +from .hrixs import * from .pes import * from .xgm import * @@ -16,6 +17,7 @@ __all__ = ( + dssc_data.__all__ + dssc_misc.__all__ + dssc_processing.__all__ + + hrixs.__all__ + pes.__all__ + xgm.__all__ ) diff --git a/src/toolbox_scs/detectors/hrixs.py b/src/toolbox_scs/detectors/hrixs.py new file mode 100644 index 0000000000000000000000000000000000000000..59d51e237a87a9a0b3d3d42cb22b2fd75ae7e2c6 --- /dev/null +++ b/src/toolbox_scs/detectors/hrixs.py @@ -0,0 +1,371 @@ +from functools import lru_cache + +import numpy as np +from scipy.optimize import curve_fit +from scipy.signal import fftconvolve + +import toolbox_scs as tb + + +__all__ = [ + 'find_curvature', + 'correct_curvature', + 'get_spectrum', + 'energy_calibration', + 'calibrate', + 'gaussian_fit', + 'to_fwhm' +] + + +# ----------------------------------------------------------------------------- +# Curvature + +def find_curvature(image, frangex=None, frangey=None, + deg=2, axis=1, offset=100): + # Resolve arguments + x_range = (0, image.shape[1]) + if frangex is not None: + x_range = (max(frangex[0], x_range[0]), min(frangex[1], x_range[1])) + y_range = (0, image.shape[0]) + if frangex is not None: + y_range = (max(frangey[0], y_range[0]), min(frangey[1], y_range[1])) + + axis_range = y_range if axis == 1 else x_range + axis_dim = image.shape[axis - 1] + + # Get kernel + integral = image[slice(*y_range), slice(*x_range)].mean(axis=axis) + roi = np.ones([axis_range[1] - axis_range[0], axis_dim]) + ref = roi * integral[:, np.newaxis] + + # Get sliced image + slice_ = [slice(None), slice(None)] + slice_[axis - 1] = slice(max(axis_range[0] - offset, 0), + min(axis_range[1] + offset, axis_dim)) + sliced = image[tuple(slice_)] + if axis == 0: + sliced = sliced.T + + # Get curvature factor from cross correlation + crosscorr = fftconvolve(sliced, + ref[::-1, :], + axes=0, ) + shifts = np.argmax(crosscorr, axis=0) + curv = np.polyfit(np.arange(axis_dim), shifts, deg=deg) + return curv[:-1][::-1] + + +def correct_curvature(image, factor=None, axis=1): + if factor is None: + return + + if axis == 1: + image = image.T + + ydim, xdim = image.shape + x = np.arange(xdim + 1) + y = np.arange(ydim + 1) + xx, yy = np.meshgrid(x[:-1] + 0.5, y[:-1] + 0.5) + xxn = xx - factor[0] * yy - factor[1] * yy ** 2 + ret = np.histogramdd((xxn.flatten(), yy.flatten()), + bins=[x, y], + weights=image.flatten())[0] + + return ret if axis == 1 else ret.T + + +def get_spectrum(image, factor=None, axis=0, + pixel_range=None, energy_range=None, ): + start, stop = (0, image.shape[axis - 1]) + if pixel_range is not None: + start = max(pixel_range[0] or start, start) + stop = min(pixel_range[1] or stop, stop) + + edge = image.sum(axis=axis)[start:stop] + bins = np.arange(start, stop + 1) + centers = (bins[1:] + bins[:-1]) * 0.5 + if factor is not None: + centers, edge = calibrate(centers, edge, + factor=factor, + range_=energy_range) + + return centers, edge + + +# ----------------------------------------------------------------------------- +# Energy calibration + + +def energy_calibration(channels, energies): + return np.polyfit(channels, energies, deg=1) + + +def calibrate(x, y=None, factor=None, range_=None): + if factor is not None: + x = np.polyval(factor, x) + + if y is not None and range_ is not None: + start = np.argmin(np.abs((x - range_[0]))) + stop = np.argmin(np.abs((x - range_[1]))) + # Calibrated energies have a different direction + x, y = x[stop:start], y[stop:start] + + return x, y + + +# ----------------------------------------------------------------------------- +# Gaussian-related functions + + +FWHM_COEFF = 2 * np.sqrt(2 * np.log(2)) + + +def gaussian_fit(x_data, y_data, offset=0): + """ + Centre-of-mass and width. Lifted from image_processing.imageCentreofMass() + """ + + x0 = np.average(x_data, weights=y_data) + sx = np.sqrt(np.average((x_data - x0) ** 2, weights=y_data)) + + # Gaussian fit + baseline = y_data.min() + p_0 = (y_data.max(), x0 + offset, sx, baseline) + try: + p_f, _ = curve_fit(gauss1d, x_data, y_data, p_0, maxfev=10000) + return p_f + except (RuntimeError, TypeError) as e: + print(e) + return None + + +def gauss1d(x, height, x0, sigma, offset): + return height * np.exp(-0.5 * ((x - x0) / sigma) ** 2) + offset + + +def to_fwhm(sigma): + return abs(sigma * FWHM_COEFF) + + +# ----------------------------------------------------------------------------- +# Centroid + +THRESHOLD = 510 # pixel counts above which a hit candidate is assumed +CURVE_A = 2.19042931e-02 # curvature parameters as determined elsewhere +CURVE_B = -3.02191568e-07 + + +def _esrf_centroid(image, threshold=THRESHOLD, curvature=(CURVE_A, CURVE_B)): + gs = 2 + base = image.mean() + cp = np.argwhere(image[gs // 2: -gs // 2, gs // 2: -gs // 2] > threshold) + np.array([gs // 2, gs // 2]) + if len(cp) > 100000: + raise RuntimeError('Threshold too low or acquisition time too long') + + res = [] + for cy, cx in cp: + spot = image[cy - gs // 2: cy + gs // 2 + 1, cx - gs // 2: cx + gs // 2 + 1] - base + spot[spot < 0] = 0 + if (spot > image[cy, cx]).sum() == 0: + mx = np.average(np.arange(cx - gs // 2, cx + gs // 2 + 1), weights=spot.sum(axis=0)) + my = np.average(np.arange(cy - gs // 2, cy + gs // 2 + 1), weights=spot.sum(axis=1)) + my -= (curvature[0] + curvature[1] * mx) * mx + res.append((my, mx)) + return res + + +def _new_centroid(image, threshold=THRESHOLD, curvature=(CURVE_A, CURVE_B)): + """find the position of photons with sub-pixel precision + + A photon is supposed to have hit the detector if the intensity within a + 2-by-2 square exceeds a threshold. In this case the position of the photon + is calculated as the center-of-mass in a 4-by-4 square. + + Return the list of x,y coordinate pairs, corrected by the curvature. + """ + base = image.mean() + corners = image[1:, 1:] + image[:-1, 1:] + image[1:, :-1] + image[:-1, :-1] + threshold = corners.mean() + 3.5 * corners.std() + middle = corners[1:-1, 1:-1] + candidates = ( + (middle > threshold) + * (middle >= corners[:-2, 1:-1]) * (middle > corners[2:, 1:-1]) + * (middle >= corners[1:-1, :-2]) * (middle > corners[1:-1, 2:]) + * (middle >= corners[:-2, :-2]) * (middle > corners[2:, :-2]) + * (middle >= corners[:-2, 2:]) * (middle > corners[2:, 2:])) + cp = np.argwhere(candidates) + if len(cp) > 10000: + raise RuntimeError( + "too many peaks, threshold too low or acquisition time too high") + + res = [] + for cy, cx in cp: + spot = image[cy: cy + 4, cx: cx + 4] - base + mx = np.average(np.arange(cx, cx + 4), weights=spot.sum(axis=0)) + my = np.average(np.arange(cy, cy + 4), weights=spot.sum(axis=1)) + my -= (curvature[0] + curvature[1] * mx) * mx + res.append((my, mx)) + return res + + +centroid = _new_centroid + + +def decentroid(res): + res = np.array(res) + ret = np.zeros(shape=(res.max(axis=0) + 1).astype(int)) + for cy, cx in res: + if cx > 0 and cy > 0: + ret[int(cy), int(cx)] += 1 + return ret + + +# ----------------------------------------------------------------------------- +# Integral + +FACTOR = 3 +RANGE = [300, 400] +BINS = abs(np.subtract(*RANGE)) * FACTOR + + +def parabola(x, a, b, c=0): + return (a * x + b) * x + c + + +def integrate(image, factor=FACTOR, range=RANGE, curvature=(CURVE_A, CURVE_B), ): + image = image - image.mean() + x = np.arange(image.shape[1])[None, :] + y = np.arange(image.shape[0])[:, None] + ys = factor * (y - parabola(x, curvature[1], curvature[0])) + ysf = np.floor(ys) + rang = (factor * range[0], factor * range[1]) + bins = rang[1] - rang[0] + lhy, lhx = np.histogram(ysf.ravel(), bins=bins, weights=((ys - ysf) * image).ravel(), range=rang) + rhy, rhx = np.histogram((ysf + 1).ravel(), bins=bins, weights=(((ysf + 1) - ys) * image).ravel(), range=rang) + lvy, lvx = np.histogram(ysf.ravel(), bins=bins, weights=(ys - ysf).ravel(), range=rang) + rvy, rvx = np.histogram((ysf + 1).ravel(), bins=bins, weights=((ysf + 1) - ys).ravel(), range=rang) + return (lhy + rhy) / (lvy + rvy) + + +# ----------------------------------------------------------------------------- +# hRIXS class + + +class hRIXS: + # run + PROPOSAL = 2769 + + # image range + X_RANGE = np.s_[1300:-100] + Y_RANGE = np.s_[:] + + # centroid + THRESHOLD = THRESHOLD # pixel counts above which a hit candidate is assumed + CURVE_A = CURVE_A # curvature parameters as determined elsewhere + CURVE_B = CURVE_B + + # integral + FACTOR = FACTOR + RANGE = RANGE + BINS = abs(np.subtract(*RANGE)) * FACTOR + + METHOD = 'centroid' # ['centroid', 'integral'] + + @classmethod + def set_params(cls, **params): + for key, value in params.items(): + setattr(cls, key.upper(), value) + + def __set_params(self, **params): + self.__class__.set_params(**params) + self.refresh() + + @classmethod + def get_params(cls, *params): + if not params: + params = ('proposal', 'x_range', 'y_range', + 'threshold', 'curve_a', 'curve_b', + 'factor', 'range', 'bins', + 'method') + return {param: getattr(cls, param.upper()) for param in params} + + def refresh(self): + cls = self.__class__ + for cached in ['_centroid', '_integral']: + getattr(cls, cached).fget.cache_clear() + + def __init__(self, images, norm=None): + self.images = images + self.norm = norm + + # class/instance method compatibility + self.set_params = self.__set_params + + @classmethod + def from_run(cls, runNB, proposal=None, first_wrong=False): + if proposal is None: + proposal = cls.PROPOSAL + + run, data = tb.load(proposal, runNB=runNB, fields=['hRIXS_det']) + + # Get slow train data + mnemo = tb.mnemonics_for_run(run)['SCS_slowTrain'] + slow_train = run[mnemo['source'], mnemo['key']].ndarray().sum() + + return cls(images=data['hRIXS_det'][1 if first_wrong else 0:].data, + norm=slow_train) + + @property + @lru_cache() + def _centroid(self): + return sum((centroid(image[self.Y_RANGE, self.X_RANGE].T, + threshold=self.THRESHOLD, + curvature=(self.CURVE_A, self.CURVE_B), ) + for image in self.images), []) + + def _centroid_spectrum(self, bins=None, range=None, normalize=True): + if bins is None: + bins = self.BINS + if range is None: + range = self.RANGE + + r = np.array(self._centroid) + hy, hx = np.histogram(r[:, 0], bins=bins, range=range) + if normalize and self.norm is not None: + hy = hy / self.norm + + return (hx[:-1] + hx[1:]) / 2, hy + + @property + @lru_cache() + def _integral(self): + return sum((integrate(image[self.Y_RANGE, self.X_RANGE].T, + factor=self.FACTOR, + range=self.RANGE, + curvature=(self.CURVE_A, self.CURVE_B)) + for image in self.images)) + + def _integral_spectrum(self, normalize=True): + values = self._integral + if normalize and self.norm is not None: + values = values / self.norm + return np.arange(values.size), values + + @property + def corrected(self): + return decentroid(self._centroid) + + def spectrum(self, normalize=True): + spec_func = (self._centroid_spectrum if self.METHOD.lower() == 'centroid' + else self._integral_spectrum) + return spec_func(normalize=normalize) + + def __sub__(self, other): + px, py = self.spectrum() + mx, my = other.spectrum() + return (px + mx) / 2, py - my + + def __add__(self, other): + return self.__class__(images=list(self.images) + list(other.images), + norm=self.norm + other.norm)