# agipdutils.py
import copy
from typing import Tuple
import numpy as np
from scipy.signal import cwt, find_peaks_cwt, ricker
from sklearn.mixture import GaussianMixture
from sklearn.preprocessing import StandardScaler
from cal_tools.enums import AgipdGainMode, BadPixels, SnowResolution
def assemble_constant_dict(
corr_bools,
pc_bools,
memory_cells,
bias_voltage,
gain_setting,
acquisition_rate,
photon_energy,
beam_energy=None,
only_dark=False,
gain_mode=AgipdGainMode.ADAPTIVE_GAIN,
integration_time=None
):
"""
Assemble a dictionary with the iCalibrationDB constant names and
the operating conditions for retrieving the required constants
for correction.
:param corr_bools: (Dict) A dict of booleans for applying
specific corrections
:param pc_bools: (List) A list of booleans to enable SlopesPC retrieval
:param memory_cells: (Int) Number of memory cells
:param bias_voltage: (Int) Bias Voltage
:param gain_setting: (Float) Gain setting
:param acquisition_rate: (Float) Acquisition rate
:param photon_energy: (Float) Photon energy
:param integration_time: (Float) Integration time
:param beam_energy: (Float) Beam Energy
    :param only_dark: (Bool) If True, retrieve only dark constants from the database
    :param gain_mode: Operation mode of the detector (defaults to adaptive gain)
:return: const_dict: (Dict) An assembled dictionary that can be used
to retrieve the required constants
"""
darkcond = [
"Dark",
{
"memory_cells": memory_cells,
"bias_voltage": bias_voltage,
"acquisition_rate": acquisition_rate,
"gain_setting": gain_setting,
"gain_mode": gain_mode,
"integration_time": integration_time,
"pixels_x": 512,
"pixels_y": 128,
},
]
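    # Each entry maps a constant name to [default fill ("zeros"/"ones"),
    # expected array shape, operating conditions]; the fill and shape are
    # presumably used to build a placeholder if retrieval fails.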
const_dict = {
"Offset": ["zeros", (128, 512, memory_cells, 3), darkcond],
"Noise": ["zeros", (128, 512, memory_cells, 3), darkcond],
"ThresholdsDark": ["ones", (128, 512, memory_cells, 5), darkcond],
"BadPixelsDark": ["zeros", (128, 512, memory_cells, 3), darkcond],
}
if not (corr_bools.get("only_offset") or only_dark):
if any(pc_bools):
const_dict["BadPixelsPC"] = ["zeros", (memory_cells, 128, 512), darkcond]
const_dict["SlopesPC"] = ["ones", (128, 512, memory_cells, 10), darkcond]
if corr_bools.get("xray_corr"):
# Add illuminated conditions
illumcond = [
"Illuminated",
{"beam_energy": beam_energy, "photon_energy": photon_energy},
]
illumcond[1].update(darkcond[1])
const_dict["BadPixelsFF"] = ["zeros", (128, 512, memory_cells), illumcond]
const_dict["SlopesFF"] = ["ones", (128, 512, memory_cells, 2), illumcond]
return const_dict
# contiguous_regions() by Joe Kington on Stackoverflow
# https://stackoverflow.com/a/4495197/434217
# Used here under Stackoverflow's default CC-BY-SA 3.0 license.
def contiguous_regions(condition):
"""Finds contiguous True regions of the boolean array "condition". Returns
a 2D array where the first column is the start index of the region and the
    second column is the end index (exclusive)."""
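    # For example, contiguous_regions(np.array([False, True, True, False, True]))
    # returns [[1, 3], [4, 5]], i.e. half-open [start, end) index pairs.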
# Find the indices of changes in "condition"
d = np.diff(condition)
idx, = d.nonzero()
# We need to start things after the change in "condition". Therefore,
# we'll shift the index by 1 to the right.
idx += 1
if condition[0]:
# If the start of condition is True prepend a 0
idx = np.r_[0, idx]
if condition[-1]:
# If the end of condition is True, append the length of the array
idx = np.r_[idx, condition.size] # Edit
# Reshape the result into two columns
idx.shape = (-1,2)
return idx
def get_shadowed_stripe(data, threshold, fraction):
"""
    Return a 1D boolean array marking which pixel rows are shadowed.
    The shadowed region is given as a set of rows of pixels (lines along
    numpy axis 1), selected according to `threshold` and `fraction`; a
    margin of one row is applied. Rows of double-width pixels are
    excluded, so only regular square pixels are used.
    :param data: (numpy.ndarray, rank=2) offset corrected single image
    :param threshold: (float) value below which a pixel is considered dark
    :param fraction: (float) fraction of pixels in a row (along axis 1)
        that must be below `threshold` for the row to be considered dark
"""
# Identify rows with > fraction of their pixels < threshold
npx_all = np.count_nonzero(~np.isnan(data), axis=1)
npx_sh = np.count_nonzero(data < threshold, axis=1)
rows_to_use = npx_sh / npx_all > fraction
# TODO: is this necessary?
# The previous code excludes the first and last rows that would otherwise
# be considered shadowed, with no explanation of why.
# For now, I'll preserve the behaviour. -TK
row_idxs = np.nonzero(rows_to_use)[0]
if len(row_idxs) > 0:
rows_to_use[row_idxs[0]] = False
rows_to_use[row_idxs[-1]] = False
# Ignore rows with double-width pixels
rows_to_use[64:512:64] = False
rows_to_use[63:511:64] = False
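    # These are the rows at the internal ASIC boundaries (every 64 rows
    # along axis 0), where the double-width pixels sit.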
# TODO: is this necessary?
# The previous code here and in baseline_correct_via_stripe ignored any
# stripes <= 2 pixels width. Is this by design, or just an accident of
# ignoring the double-width pixels (which are in stripes of 2 together)?
# For now, I'll preserve the behaviour. -TK
shadow_bounds = contiguous_regions(rows_to_use)
stripe_widths = shadow_bounds[:, 1] - shadow_bounds[:, 0]
stripes_too_narrow = shadow_bounds[stripe_widths <= 2]
for start, end in stripes_too_narrow:
rows_to_use[start:end] = False
return rows_to_use
def baseline_correct_via_stripe(d, g, m, frac_high_med):
""" Correct baseline shifts using shadowed stripes
    :param d: the data to correct, should be an offset-corrected single image
    :param g: gain array matching `d` array
    :param m: bad pixel mask
    :param frac_high_med: ratio of high to medium PC slopes
"""
    # Restrict the mask to those bad pixels obtained from darks.
    # This has been introduced since the BadPixelsFF constants can add
    # additional masking that causes the baseline correction to fail due
    # to the abort condition (fewer than 3 shadowed rows) below.
    # (see calibration/planning#96)
m = m & (BadPixels.OFFSET_OUT_OF_THRESHOLD |
BadPixels.NOISE_OUT_OF_THRESHOLD |
BadPixels.OFFSET_NOISE_EVAL_ERROR |
BadPixels.NO_DARK_DATA)
dd = copy.copy(d)
dd[g != 0] = np.nan # only high gain data
dd[m != 0] = np.nan # only good pixels
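    # a row counts as shadowed if at least 95% of its (good, high-gain)
    # pixels are below 30 ADU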
sh_rows = get_shadowed_stripe(dd, 30, 0.95)
if sh_rows.sum() < 3:
return d, 0
shift = np.nanmedian(dd[sh_rows, :])
d[g == 0] -= shift
d[g == 1] -= shift / frac_high_med
return d, shift
def baseline_correct_via_noise(d, noise, g, threshold):
""" Correct baseline shifts by evaluating position of the noise peak
    :param d: the data to correct, should be a single image
    :param noise: the mean noise for the cell id of `d`
    :param g: gain array matching `d` array
    :param threshold: threshold below which the correction is not applied
    Correction is done by identifying the left-most (significant) peak
    in the histogram of `d` and shifting it to be centered on 0.
    This is done via a continuous wavelet transform (CWT), using a Ricker
    wavelet.
    Only high gain data is evaluated, and data above 50 ADU, roughly
    the equivalent of a 9 keV photon, is ignored.
    Two passes are executed: the first shift is accurate to 10 ADU and
    will catch baseline shifts down to -2000 ADU. The CWT is evaluated
    for peaks of widths one, three and five times the noise.
    The baseline is then shifted by the position of the summed CWT maximum.
    In a second pass the histogram is evaluated within a range of
    +- 5*noise around the initial shift value, for a peak of width equal
    to the noise.
    Shifts more negative than `threshold` or positive shifts larger than
    10 ADU are discarded and the original data is returned; otherwise
    the shift-corrected data is returned.
"""
seln = (g == 0) & (d <= 50)
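    # first pass: coarse histogram, 10 ADU per bin, over (-2000, 100) ADU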
h, e = np.histogram(d[seln], bins=210, range=(-2000, 100))
c = (e[1:] + e[:-1]) / 2
    try:
        cwtmatr = cwt(h, ricker, [noise, 3. * noise, 5. * noise])
    except Exception:
        return d, 0
cwtall = np.sum(cwtmatr, axis=0)
marg = np.argmax(cwtall)
pc = c[marg]
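    # second pass: refine around the coarse peak with a finer histogram
    # covering +- 5*noise around it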
high_res_range = (d > pc - 5 * noise) & (d < pc + 5 * noise)
h, e = np.histogram(d[seln & high_res_range], bins=200,
range=(pc - 5 * noise, pc + 5 * noise))
c = (e[1:] + e[:-1]) / 2
    try:
        cwtmatr = cwt(h, ricker, [noise, ])
    except Exception:
        return d, 0
marg = np.argmax(cwtmatr)
pc = c[marg]
    # shift too large to be easily discernible via the noise peak
if pc > 10 or pc < threshold:
return d, 0
return d - pc, pc
def correct_baseline_via_hist(d, pcm, g):
""" Correct baseline shifts by matching edges of high and medium
gain histogram
:param d: single image to correct
:param pcm: relative gain slope for medium gain of each pixel in `d`
:param g: gain values for `d`
As a preparation the median value of medium gain pixels is shifted in
increments of 50 ADU as long as it is negative.
Correction is then performed by evaluating histograms for high gain
and medium gain pixels in `d`. The right-most significant bin of the
high gain histogram is then shifted such that it coincides with the
left-most significant bin of the medium gain histogram. Significant
    here means that bin counts are at least 10% of the average bin count
    of the histogram over all bins with more than 10 counts. This initial evaluation
uses histograms in range between -10000 and +10000 ADU with a
resolution of 100 ADU per bin. The shift required to match both
histogram borders is an initial estimate for the baseline shift.
Finally, the mean bin count of the five outermost bins of the high and
medium gain histograms is compared. The baseline is shifted by
increments of 1 ADU until they are within 20% of each other.
From this point on additional shifts are performed while the
deviation stays within 30% of each other. The final shift is then
evaluated as the point of minimal deviation of these values.
A maximum iteration count of 200 is imposed on the procedure. If no
convergence is reached within this limit, the original data is
returned, otherwise the baseline corrected data is returned.
"""
dd = copy.copy(d)
# shift while the medium gain produces negative values on average
pc = 0
it = 0
max_it = 100
while np.nanmedian(dd[g == 1] - pc) < 0:
pc -= 50
if it > max_it:
return d, 0
it += 1
def min_hist_distance(pc: int,
bins: int = 100,
ran: Tuple[int, int] = (-10000, 10000),
minbin: int = 10) -> float:
hh, e = np.histogram(dd[g == 0] - pc, bins=bins, range=ran)
hm, e = np.histogram((dd[g == 1] - pc) * pcm[g == 1], bins=bins,
range=ran)
# right most significant value of high gain histogram
hhm = np.nanmean(hh[hh > minbin])
rmh = hh.size - np.argmax((hh > 0.1 * hhm)[::-1])
        # left most significant value of medium gain histogram
hmm = np.nanmean(hm[hm > minbin])
lmm = np.argmax((hm > 0.1 * hmm))
med_pcm = np.nanmedian(pcm[g == 1])
eh = e[rmh]
el = e[lmm]
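        # increasing pc by delta moves the high-gain edge eh by -delta and
        # the scaled medium-gain edge el by -med_pcm*delta, so this step
        # closes the gap (el - eh) in a single move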
pc += (el - eh) / (med_pcm - 1)
return pc
    # set pc one step (50 ADU) above the initial estimate
pc += 50
pc = min_hist_distance(pc)
# now start rolling back until we have similar signal levels
def comp_signal_level(pc, minbin=1):
hh, e = np.histogram(dd[g == 0] - pc, bins=100, range=(5000, 7000))
hm, e = np.histogram((dd[g == 1] - pc) * pcm[g == 1], bins=100,
range=(5000, 7000))
# right most significant value of high gain histogram
hhm = np.nanmean(hh[hh > minbin])
rmh = hh.size - np.argmax((hh > 0.1 * hhm)[::-1])
        # left most significant value of medium gain histogram
hmm = np.nanmean(hm[hm > minbin])
lmm = np.argmax((hm > 0.1 * hmm))
reg = 5
ret = np.abs(np.sum(hh[rmh - reg:rmh]) - np.sum(hm[lmm:lmm + reg]))
ret /= np.sum(hh[rmh - reg:rmh])
return ret
it = 0
max_it = 200
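    # keep the initial estimate so it can be restored if the iteration
    # limit is reached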
opc = pc
m_it_break = False
while comp_signal_level(pc) > 0.2:
pc += 1
if it > max_it:
pc = opc
m_it_break = True
break
it += 1
it = 0
if not m_it_break:
pcs = [pc]
slevs = []
slev = comp_signal_level(pc)
slevs.append(slev)
while slev < 0.3:
pc += 1
slev = comp_signal_level(pc)
slevs.append(slev)
pcs.append(pc)
it += 1
pc = pcs[np.argmin(slevs)]
return d - pc, pc
def correct_baseline_via_hist_asic(d, g):
""" Correct diagonal falloff on ASICs by matching corner histograms
:param d: single image data
:param g: gain values for `d`
    Corrections are performed for the top and bottom row of ASICs
    separately.
    In each row, starting from the ASIC closest to the beam hole, the
    histogram of a square region of 8x8 pixels is evaluated for the
    location of its left-most significant bin (only medium gain values),
    and compared to a histogram produced from a neighbouring region on
    the next ASIC.
Double sized pixels are avoided.
The reasoning is that the histograms should not dramatically differ
for continuously distributed photon events. Each ASIC is then shifted
such that histograms match and the corrected data is returned.
Warning: this feature is very experimental!
"""
rsize = 8
def hist_ends(dm, bins=5000, ran=(-5000, 5000),
minbin=4 / (16 / rsize)):
hm, e = np.histogram(dm, bins=bins, range=ran)
# left most significant value of medium gain histogram
# hmm = np.nanmean(hm[hm > minbin])
lmm = np.argmax((hm > minbin))
return e[lmm], np.sum(hm)
for i in range(1, 8):
# upper asics
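        # `casic`: pixels from the neighbouring ASICs adjoining this corner;
        # `asic`: the matching corner of the ASIC being shifted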
casic = np.concatenate(
(d[i * 64 - rsize:i * 64 - 1, 64:64 + rsize].flatten(),
d[i * 64 + 1:i * 64 + rsize, 64 - rsize:64].flatten()))
casic_g = np.concatenate(
(g[i * 64 - rsize:i * 64 - 1, 64:64 + rsize].flatten(),
g[i * 64 + 1:i * 64 + rsize, 64 - rsize:64].flatten()))
idxm = casic_g == 1
cme, cms = hist_ends(casic[idxm])
asic = d[i * 64 + 1:i * 64 + rsize, 64:64 + rsize].flatten()
asic_g = g[i * 64 + 1:i * 64 + rsize, 64:64 + rsize].flatten()
idxm = asic_g == 1
me, ms = hist_ends(asic[idxm])
if cms > rsize * rsize / 10 and ms > rsize * rsize / 10:
pc = cme - me
else:
pc = 0
if np.abs(pc) > 500:
            # something went wrong
pc = 0
d[i * 64:(i + 1) * 64, 64:] += pc
for i in range(0, 7):
# lower asics
casic = np.concatenate((d[(i + 1) * 64 - rsize:(i + 1) * 64 - 1,
64:64 + rsize].flatten(),
d[(i + 1) * 64 + 1:(i + 1) * 64 + rsize,
64 - rsize:64].flatten()))
casic_g = np.concatenate((g[(i + 1) * 64 - rsize:(i + 1) * 64 - 1,
64:64 + rsize].flatten(),
g[(i + 1) * 64 + 1:(i + 1) * 64 + rsize,
64 - rsize:64].flatten()))
idxm = casic_g == 1
cme, cms = hist_ends(casic[idxm])
asic = d[(i + 1) * 64 - rsize:(i + 1) * 64 - 1,
64 - rsize:64].flatten()
asic_g = g[(i + 1) * 64 - rsize:(i + 1) * 64 - 1,
64 - rsize:64].flatten()
idxm = asic_g == 1
me, ms = hist_ends(asic[idxm])
if cms > rsize * rsize / 10 and ms > rsize * rsize / 10:
pc = cme - me
else:
pc = 0
if np.abs(pc) > 500:
            # something went wrong
pc = 0
d[i * 64:(i + 1) * 64, :64] += pc
return d, np.nan
def make_noisy_adc_mask(bmask, noise_threshold):
""" Mask entire ADC if they are noise above a relative threshold
"""
    msk = np.count_nonzero(
        ((bmask & BadPixels.NOISE_OUT_OF_THRESHOLD != 0)
         | (bmask & BadPixels.OFFSET_NOISE_EVAL_ERROR != 0)).astype(int),
        axis=0)
fmask = np.zeros_like(msk).astype(np.uint32)
adc_i = bmask.shape[1] // 8
adc_j = bmask.shape[2] // 8
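    # the pixel plane is divided into an 8 x 8 grid of ADC regions,
    # each of adc_i x adc_j pixels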
    for i in range(8):
        for j in range(8):
adc_cnt = np.count_nonzero(
msk[i * adc_i:(i + 1) * adc_i, j * adc_j:(j + 1) * adc_j] >
bmask.shape[0] // 32)
if adc_cnt / (adc_i * adc_j) >= noise_threshold:
fmask[i * adc_i:(i + 1) * adc_i,
j * adc_j:(j + 1) * adc_j] = BadPixels.NOISY_ADC
return fmask
def match_asic_borders(d, chunk=8, channel=0):
""" Match border pixels of the two ASIC rows to the same median value
:param d: single image data
    :param chunk: number of pixels on each ASIC boundary to generate
        median values for
    :param channel: Module index of given data
    Each ASIC has `n=64/chunk` median values calculated along its two border
    pixel rows. The deviation of the chunk pairs is then evaluated and the
    upper/lower ASICs are shifted by the mean deviation (half applied to
    each side) for the right/left hemisphere of the detector.
The corrected data is returned.
"""
for i in range(8):
these_asics = d[:, i * 64:(i + 1) * 64, :]
diffs = np.zeros((d.shape[0], 8))
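        # medians of `chunk`-pixel strips on either side of the axis-2
        # ASIC border at column 64 (columns 62:64 vs 64:66)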
for k in range(64 // chunk):
ldat = these_asics[:, k * chunk:(k + 1) * chunk, 62:64]
lowerasic = np.nanmedian(ldat, axis=(1, 2))
udat = these_asics[:, k * chunk:(k + 1) * chunk, 64:66]
upperasic = np.nanmedian(udat, axis=(1, 2))
low_up = lowerasic - upperasic
up_low = upperasic - lowerasic
diff = low_up if channel < 8 else up_low
diffs[:, k] = diff
mn = np.nanmean(diffs, axis=1)[:, None, None] / 2
if channel < 8:
d[:, i * 64:(i + 1) * 64, 64:] += mn
d[:, i * 64:(i + 1) * 64, :64] -= mn
else:
d[:, i * 64:(i + 1) * 64, :64] += mn
d[:, i * 64:(i + 1) * 64, 64:] -= mn
return d
def melt_snowy_pixels(raw, im, gain, rgain, resolution=None):
""" Identify (and optionally interpolate) 'snowy' pixels
:param raw: raw data
:param im: offset and relative gain corrected data:
:param gain: gain values for `raw`
:param rgain: raw gain data, scaled by the threshold for
high-to-medium gain switching
:param resolution: resolution strategy, should be of enum-type
`SnowResolution`
Snowy pixels are pixels which are already identified as pixels in
the medium gain stage by their gain values, but which have
transitional image values between the largest high gain value and
    the smallest medium gain value. As these values may be encountered again
in the context of medium gain, they are ambiguous and cannot
readily be identified by simple thresholding alone.
It is attempted to identify these snowy pixels by using a Gaussian
mixture clustering algorithm acting on multispectral input.
Positions in the cluster are identified as follows:
x: abs(p_r-p_bg)*p_rg**2
y: p_r
where p_r is a given pixel raw value, p_bg is the mean value of the
8 direct neighbor pixels, and p_rg is the raw gain value of the pixel.
Note that these cluster params are purely empirically derived,
and do not have any connection to ASIC design etc.
Only pixels in medium gain are evaluated, and evaluation is
image-wise, but subdivided further into ASICs.
The so-created graph [[x_0, x_1, ..., x_n], [y_0, y_1, ..., y_n]] is
scaled using a `sklearn.StandardScaler` and input into a two-component
`GaussianMixture` clustering algorithm. This results in a set of
labels, identifying pixels as likely snowy, or not. However,
for a given image we do not know which label is which from the
    output. Hence, labels are differentiated under the assumption that
    snowy pixels occur out of context, i.e. their surrounding pixels
    are more likely to be at lower signal values than if the pixel were in
    a region where gain switching led to a large value.
Depending on the resolution strategy so-identified pixels are either
set to `np.nan` or to the interpolated value of the direct (high-gain)
neighbors.
    The corrected data is returned alongside an updated gain mask and a
    bad pixel mask.
"""
snow_mask = np.zeros(im.shape, np.uint32)
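    # iterate over images (k) and the 8 x 2 grid of 64 x 64 pixel ASICs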
for k in range(im.shape[0]):
for i in range(8):
for j in range(2):
fidx = gain[k, i * 64:(i + 1) * 64,
j * 64:(j + 1) * 64] == 1
fidx_h = gain[k, i * 64:(i + 1) * 64,
j * 64:(j + 1) * 64] == 0
# need at least two pixels in medium gain to be able to
# cluster in two groups
if np.count_nonzero(fidx) < 2:
continue
# data for a given asic
asic = raw[k, i * 64:(i + 1) * 64, j * 64:(j + 1) * 64]
asic_g = gain[k, i * 64:(i + 1) * 64, j * 64:(j + 1) * 64]
asic_r = rgain[k, i * 64:(i + 1) * 64, j * 64:(j + 1) * 64]
asic_h = copy.copy(asic)
asic_h[~fidx_h] = np.nan
                # calculate the background by averaging the 8 direct
                # neighbours of each pixel
rl = np.roll(asic, 1, axis=0)
rr = np.roll(asic, -1, axis=0)
ru = np.roll(asic, 1, axis=1)
rd = np.roll(asic, -1, axis=1)
rlu = np.roll(rl, 1, axis=1)
rld = np.roll(rl, -1, axis=1)
rru = np.roll(rr, 1, axis=1)
rrd = np.roll(rr, -1, axis=1)
bg = (rl + rr + ru + rd + rlu + rld + rru + rrd) / 8
                # calculate the background by averaging the 8 direct
                # neighbours of each pixel
                # here only for high gain pixels
                rl = np.roll(asic_h, 1, axis=0)
                rr = np.roll(asic_h, -1, axis=0)
                ru = np.roll(asic_h, 1, axis=1)
                rd = np.roll(asic_h, -1, axis=1)
                rlu = np.roll(rl, 1, axis=1)
                rld = np.roll(rl, -1, axis=1)
                rru = np.roll(rr, 1, axis=1)
                rrd = np.roll(rr, -1, axis=1)
bg_h = np.nanmean(
np.array([rl, rr, ru, rd, rlu, rld, rru, rrd]), axis=0)
# construct a graph
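                # features per medium-gain pixel: |raw - background| * raw_gain**2
                # and the raw value itself (see docstring)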
graph = np.array(
[np.abs(asic[fidx] - bg[fidx]) * asic_r[fidx] ** 2,
asic[fidx]]).T
# scale it
graph = StandardScaler().fit_transform(graph)
# perform clustering
spc = GaussianMixture(n_components=2, random_state=0)
spc.fit(graph)
# and get labels
labels = spc.predict(graph)
asic = im[k, i * 64:(i + 1) * 64, j * 64:(j + 1) * 64]
asic_msk = snow_mask[k, i * 64:(i + 1) * 64,
j * 64:(j + 1) * 64]
mim = asic[fidx]
asicb = bg_h
mimb = asicb[fidx]
mimg = asic_g[fidx]
mmsk = asic_msk[fidx]
# identify which labels are which
mn1 = np.nanmean(bg[fidx][labels == 0])
mn2 = np.nanmean(bg[fidx][labels == 1])
implabel = 1
if mn1 > mn2:
implabel = 0
                # if we've misidentified, then we will have too many
# snowy pixels
# happens if the ASIC is generally on a high signal level
if np.count_nonzero([labels == implabel]) > 64 * 64 / 3:
continue
                # or a large portion of the misidentified label covers the ASIC
if np.count_nonzero([labels != implabel]) > 0.8 * 64 * 64:
continue
# set to NaN if requested
if resolution is SnowResolution.NONE:
mim[labels == implabel] = np.nan
# or use the interpolated value
elif resolution is SnowResolution.INTERPOLATE:
mim[labels == implabel] = mimb[labels == implabel]
mimg[labels == implabel] = 0
# identify these pixels in a bad pixel mask
mmsk[labels == implabel] = BadPixels.INTERPOLATED
# now set back to data
asic[fidx] = mim
asic_g[fidx] = mimg
asic_msk[fidx] = mmsk
im[k, i * 64:(i + 1) * 64, j * 64:(j + 1) * 64] = asic
gain[k, i * 64:(i + 1) * 64, j * 64:(j + 1) * 64] = asic_g
snow_mask[k, i * 64:(i + 1) * 64,
j * 64:(j + 1) * 64] = asic_msk
return im, gain, snow_mask