%% Cell type:markdown id: tags:
# Gain Characterization (Flat Fields) #
The following code characterizes the gain of the AGIPD detector from flat-field data, i.e. data taken with X-rays of defined intensity. This data should fulfil the following requirements:

* the intensity should be such that single-photon peaks are visible
* data for all modules should be present
* no shadowing should occur on any of the modules
* each pixel should have at minimum around 100 events per photon peak per memory cell
* if central beam edges are visible, they should not be significantly more intense

Characterization is done by a weighted average algorithm, which evaluates the peak locations for all pixels and memory cells of a given module. These locations are then fitted to a linear function of the average peak location in each module, such that the fit yields a relative gain correction.
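As a minimal, illustrative sketch of this fit (hypothetical toy arrays, not the pyDetLib implementation used below): each pixel's peak locations are fitted against the module-average locations to obtain a relative gain slope and offset.

``` python
# Illustration only: fit per-pixel peak locations (y) against the
# module-average peak locations (x) to get a slope m and offset b per pixel.
import numpy as np

peak_pos = np.array([[0.1, 54.0, 103.9],    # pixel 0: peak locations in ADU
                     [-0.2, 56.1, 108.0]])  # pixel 1
avg_pos = np.nanmean(peak_pos, axis=0)      # average peak locations (x values)

for px in peak_pos:
    m, b = np.polyfit(avg_pos, px, 1)       # linear fit y = m*x + b
    print(f"slope m = {m:.3f}, offset b = {b:.2f}")
```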
%% Cell type:code id: tags:
``` python
# the following lines should be adjusted depending on data
in_folder = '/gpfs/exfel/exp/MID/201931/p900091/raw/' # path to input data, required
modules = [3] # modules to work on, required, range allowed
out_folder = "/gpfs/exfel/exp/MID/201931/p900091/usr/FF/2.2" # path to output to, required
runs = [484, 485,] # runs to use, required, range allowed
sequences = [0, 1, 2, 3] # sequence files to use, range allowed
cluster_profile = "noDB" # The ipcluster profile to use
local_output = True # output constants locally
db_output = False # output constants to database
bias_voltage = 300 # detector bias voltage
cal_db_interface = "tcp://max-exfl016:8026#8035" # the database interface to use
mem_cells = 0 # number of memory cells used, 0 means it is read from file
interlaced = False # assume interlaced data format, for data prior to Dec. 2017
fit_hook = True # fit a hook function to medium gain slope
rawversion = 2 # RAW file format version
instrument = "MID"
photon_energy = 9.2 # the photon energy in keV
offset_store = "" # for file-baed access
high_res_badpix_3d = False # set this to True if you need high-resolution 3d bad pixel plots. Runtime: ~ 1h
db_input = True # retrieve data from calibration database, setting offset_store will overwrite this
deviation_threshold = 0.75 # peaks with an absolute location deviation larger than the median are considered bad
acqrate = 0. # acquisition rate, 0. means it is read from file
use_dir_creation_date = True
creation_time = "" # To overwrite the measured creation_time. Required Format: YYYY-MM-DD HR:MN:SC.ms e.g. 2019-07-04 11:02:41.00
gain_setting = 0.1 # gain setting can have value 0 or 1; the default 0.1 means it is derived from the control file (None for runs before 2020-01-31)
karabo_da_control = "AGIPD1MCTRL00" # karabo DA for control information
```
%% Cell type:code id: tags:
``` python
# std library imports
from datetime import datetime
import dateutil.parser
from functools import partial
import warnings
warnings.filterwarnings('ignore')
import os
import h5py
# numpy and matplot lib specific
import numpy as np
import matplotlib
matplotlib.use("Agg")
import matplotlib.pyplot as plt
%matplotlib inline
# parallel processing via ipcluster
# make sure a cluster is running with ipcluster start --n=32, give it a while to start
from ipyparallel import Client
client = Client(profile=cluster_profile)
view = client[:]
view.use_dill()
# pyDetLib imports
import XFELDetAna.xfelpycaltools as xcal
import XFELDetAna.xfelpyanatools as xana
from XFELDetAna.util import env
env.iprofile = cluster_profile
profile = cluster_profile
from iCalibrationDB import ConstantMetaData, Constants, Conditions, Detectors, Versions
from cal_tools.agipdlib import get_num_cells, get_acq_rate, get_gain_setting
from cal_tools.enums import BadPixels
from cal_tools.influx import InfluxLogger
from cal_tools.plotting import show_overview, plot_badpix_3d
from cal_tools.tools import gain_map_files, parse_runs, run_prop_seq_from_path, get_notebook_name, get_dir_creation_date, get_random_db_interface
```
%% Cell type:code id: tags:
``` python
# usually no need to change these lines
sensor_size = [128, 512]
block_size = [128, 512]
QUADRANTS = 4
MODULES_PER_QUAD = 4
DET_FILE_INSET = "AGIPD"
IL_MODE = interlaced
memory_cells = mem_cells

# Define constant creation time.
if creation_time:
    try:
        creation_time = datetime.strptime(creation_time, '%Y-%m-%d %H:%M:%S.%f')
    except Exception as e:
        print(f"creation_time value error: {e}. "
              "Use the format YYYY-MM-DD HR:MN:SC.ms, e.g. 2019-07-04 11:02:41.00\n")
        creation_time = None
        print("The given creation time will not be used.")
else:
    creation_time = None

if not creation_time and use_dir_creation_date:
    ntries = 100
    while ntries > 0:
        try:
            creation_time = get_dir_creation_date(in_folder, runs[0])
            break
        except OSError:
            pass
        ntries -= 1
print("Using {} as creation time".format(creation_time))

runs = parse_runs(runs)

if offset_store != "":
    db_input = False
else:
    db_input = True

limit_trains = 20
limit_trains_eval = None

if instrument == "SPB":
    loc = "SPB_DET_AGIPD1M-1"
    dinstance = "AGIPD1M1"
else:
    loc = "MID_DET_AGIPD1M-1"
    dinstance = "AGIPD1M2"

cal_db_interface = get_random_db_interface(cal_db_interface)

# these lines can usually stay as is
fbase = "{}/{{}}/RAW-{{}}-AGIPD{{:02d}}-S{{:05d}}.h5".format(in_folder)
gains = np.arange(3)

run, prop, seq = run_prop_seq_from_path(in_folder)
logger = InfluxLogger(detector="AGIPD", instrument=instrument, mem_cells=mem_cells,
                      notebook=get_notebook_name(), proposal=prop)

# read the acquisition rate and number of memory cells from the first data file,
# if they were not given as parameters
channel = 0
fname = fbase.format(runs[0], runs[0].upper(), channel, 0)
if acqrate == 0.:
    acqrate = get_acq_rate(fname, loc, channel)
    if acqrate == 0:
        acqrate = None
if mem_cells == 0:
    mem_cells = get_num_cells(fname, loc, channel)
    memory_cells = mem_cells

max_cells = mem_cells if not interlaced else mem_cells*2
cells = np.arange(max_cells)

print("Parameters are:")
print("Memory cells: {}/{}".format(memory_cells, max_cells))
print("Runs: {}".format(runs))
print("Modules: {}".format(modules))
print("Sequences: {}".format(sequences))
print("Interlaced mode: {}".format(IL_MODE))
print("Using DB: {}".format(db_output))
print(f"Acquisition rate and memory cells set from file {fname} are "
      f"{acqrate} MHz and {max_cells}, respectively")
```
%% Cell type:code id: tags:
``` python
control_fname = f'{in_folder}/{runs[0]}/RAW-{runs[0].upper()}-{karabo_da_control}-S00000.h5'

if gain_setting == 0.1:
    if creation_time.replace(tzinfo=None) < dateutil.parser.parse('2020-01-31'):
        print("Set gain-setting to None for runs taken before 2020-01-31")
        gain_setting = None
    else:
        try:
            # h5path_ctrl (the control device path within the file) is
            # expected to be defined as a notebook parameter
            gain_setting = get_gain_setting(control_fname, h5path_ctrl)
        except Exception as e:
            print(f'Error while reading gain setting from: \n{control_fname}')
            print(e)
            print("Gain setting is not found in the control information")
            print("Data will not be processed")
            sequences = []
print(f"Gain setting: {gain_setting}")
```
%% Cell type:markdown id: tags:
For the characterization, offset maps for each module are needed. They are read in either locally or from the XFEL calibration database.
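For the file-based path, the store is assumed to contain one group per module with the dataset layout read in the next cell; a minimal sketch of such a file (hypothetical name and shapes):

``` python
# Sketch of the expected layout of a local offset store; the dataset paths
# match those read below. File name and array shapes are assumptions.
import h5py
import numpy as np

with h5py.File("offset_store_example.h5", "w") as f:
    f["Q1M4/Offset/0/data"] = np.zeros((128, 512, 250, 3))     # per-gain offsets
    f["Q1M4/Noise/0/data"] = np.ones((128, 512, 250, 3))       # per-gain noise
    f["Q1M4/Threshold/0/data"] = np.zeros((128, 512, 250, 2))  # gain thresholds
```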
%% Cell type:code id: tags:
``` python
from dateutil import parser

offset_g = {}
noise_g = {}
thresholds_g = {}
pc_g = {}
if not db_input:
    print("Offset, noise and thresholds have been read in from: {}".format(offset_store))
    store_file = h5py.File(offset_store, "r")
    for i in modules:
        qm = "Q{}M{}".format(i//4+1, i%4+1)
        offset_g[qm] = np.array(store_file["{}/Offset/0/data".format(qm)])
        noise_g[qm] = np.array(store_file["{}/Noise/0/data".format(qm)])
        thresholds_g[qm] = np.array(store_file["{}/Threshold/0/data".format(qm)])
    store_file.close()
else:
    print("Offset, noise and thresholds have been read in from the calibration database:")
    first_ex = True
    for i in modules:
        qm = "Q{}M{}".format(i//4+1, i%4+1)
        det = getattr(Detectors, dinstance)

        metadata = ConstantMetaData()
        offset = Constants.AGIPD.Offset()
        metadata.calibration_constant = offset
        # set the operating condition
        condition = Conditions.Dark.AGIPD(memory_cells=max_cells, bias_voltage=bias_voltage,
                                          acquisition_rate=acqrate, gain_setting=gain_setting)
        metadata.detector_condition = condition
        # specify a version for this constant
        if creation_time is None:
            metadata.calibration_constant_version = Versions.Now(device=getattr(det, qm))
        else:
            metadata.calibration_constant_version = Versions.Timespan(device=getattr(det, qm),
                                                                      start=creation_time)
        metadata.retrieve(cal_db_interface, when=creation_time.isoformat(), timeout=300000)
        offset_g[qm] = offset.data
        if first_ex:
            print("Offset map was injected on: {}".format(metadata.calibration_constant_version.begin_at))

        metadata = ConstantMetaData()
        noise = Constants.AGIPD.Noise()
        metadata.calibration_constant = noise
        # set the operating condition
        condition = Conditions.Dark.AGIPD(memory_cells=max_cells, bias_voltage=bias_voltage,
                                          acquisition_rate=acqrate, gain_setting=gain_setting)
        metadata.detector_condition = condition
        # specify a version for this constant
        if creation_time is None:
            metadata.calibration_constant_version = Versions.Now(device=getattr(det, qm))
        else:
            metadata.calibration_constant_version = Versions.Timespan(device=getattr(det, qm),
                                                                      start=creation_time)
        metadata.retrieve(cal_db_interface, when=creation_time.isoformat(), timeout=300000)
        noise_g[qm] = noise.data
        if first_ex:
            print("Noise map was injected on: {}".format(metadata.calibration_constant_version.begin_at))

        metadata = ConstantMetaData()
        thresholds = Constants.AGIPD.ThresholdsDark()
        metadata.calibration_constant = thresholds
        # set the operating condition
        condition = Conditions.Dark.AGIPD(memory_cells=max_cells, bias_voltage=bias_voltage,
                                          acquisition_rate=acqrate, gain_setting=gain_setting)
        metadata.detector_condition = condition
        # specify a version for this constant
        if creation_time is None:
            metadata.calibration_constant_version = Versions.Now(device=getattr(det, qm))
        else:
            metadata.calibration_constant_version = Versions.Timespan(device=getattr(det, qm),
                                                                      start=creation_time)
        metadata.retrieve(cal_db_interface, when=creation_time.isoformat(), timeout=300000)
        thresholds_g[qm] = thresholds.data
        if first_ex:
            print("Threshold map was injected on: {}".format(metadata.calibration_constant_version.begin_at))

        metadata = ConstantMetaData()
        pc = Constants.AGIPD.SlopesPC()
        metadata.calibration_constant = pc
        # set the operating condition
        condition = Conditions.Dark.AGIPD(memory_cells=max_cells, bias_voltage=bias_voltage,
                                          acquisition_rate=acqrate, gain_setting=gain_setting)
        metadata.detector_condition = condition
        # specify a version for this constant
        if creation_time is None:
            metadata.calibration_constant_version = Versions.Now(device=getattr(det, qm))
        else:
            metadata.calibration_constant_version = Versions.Timespan(device=getattr(det, qm),
                                                                      start=creation_time)
        metadata.retrieve(cal_db_interface, when=creation_time.isoformat(), timeout=300000)
        pc_g[qm] = np.nanmedian(pc.data[0, ...])/pc.data
        if first_ex:
            print("PC map was injected on: {}".format(metadata.calibration_constant_version.begin_at))

        first_ex = False
```
%% Cell type:markdown id: tags:
## Initial peak estimates ##
First, initial peak estimates need to be defined. This is done by inspecting histograms created from (a subset of) the offset-corrected data for peak locations and peak heights.
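As an aside, a minimal sketch of how such estimates could be derived programmatically (toy histogram; `scipy.signal.find_peaks` is an illustration, not what the notebook uses — the estimates further below are set by hand):

``` python
# Illustration only: locate candidate peaks in a histogram by height and spacing.
import numpy as np
from scipy.signal import find_peaks

c = np.arange(-50., 500.)  # bin centers (ADU)
h = 5e7*np.exp(-0.5*(c/5)**2) + 5e6*np.exp(-0.5*((c-55)/5)**2)  # toy two-peak histogram

idx, props = find_peaks(h, height=1e5, distance=30)
print("peak estimates:", c[idx], "heights:", props["peak_heights"])
```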
%% Cell type:code id: tags:
``` python
def hist_single_module(fbase, runs, sequences, sensor_size, memory_cells, block_size,
                       il_mode, limit_trains, profile, rawversion, instrument, inp):
    """ Calculate a per-pixel histogram for a single module.

    Runs and sequences give the data to calculate the histogram from.
    """
    channel, offset, thresholds, pc, noise = inp

    import XFELDetAna.xfelpycaltools as xcal
    import numpy as np
    import h5py
    from XFELDetAna.util import env
    env.iprofile = profile

    def baseline_correct_via_noise(d, noise, g, baseline_corr_noise_threshold=1000):
        """ Correct baseline shifts by evaluating the position of the noise peak.

        :param d: the data to correct, should be a single image
        :param noise: the mean noise for the cell id of `d`
        :param g: gain array matching the `d` array

        Correction is done by identifying the left-most (significant) peak
        in the histogram of `d` and shifting it to be centered on 0.
        This is done via a continuous wavelet transform (CWT), using a Ricker
        wavelet.

        Only high-gain data is evaluated, and data larger than 50 ADU,
        equivalent to roughly a 9 keV photon, is ignored.

        Two passes are executed: the first shift is accurate to 10 ADU and
        will catch baseline shifts smaller than -2000 ADU. CWT is evaluated
        for peaks of widths one, three and five times the noise.
        The baseline is then shifted by the position of the summed CWT maximum.

        In a second pass the histogram is evaluated within a range of
        +- 5*noise of the initial shift value, for a peak of width noise.

        Baseline shifts larger than the maximum threshold or positive shifts
        larger than 10 are discarded and the original data is returned;
        otherwise the shift-corrected data is returned.
        """
        import copy
        from scipy.signal import cwt, ricker
        # we work on a copy to be able to filter values by setting them to
        # np.nan
        dd = copy.copy(d)
        dd[g != 0] = np.nan  # only high gain data
        dd[dd > 50] = np.nan  # only noise data
        h, e = np.histogram(dd, bins=210, range=(-2000, 100))
        c = (e[1:] + e[:-1]) / 2
        try:
            cwtmatr = cwt(h, ricker, [noise, 3. * noise, 5. * noise])
        except Exception:
            return d
        cwtall = np.sum(cwtmatr, axis=0)
        marg = np.argmax(cwtall)
        pc = c[marg]

        high_res_range = (dd > pc - 5 * noise) & (dd < pc + 5 * noise)
        dd[~high_res_range] = np.nan
        h, e = np.histogram(dd, bins=200,
                            range=(pc - 5 * noise, pc + 5 * noise))
        c = (e[1:] + e[:-1]) / 2
        try:
            cwtmatr = cwt(h, ricker, [noise, ])
        except Exception:
            return d
        marg = np.argmax(cwtmatr)
        pc = c[marg]
        # too large a shift to be easily discernible via the noise peak
        if pc > 10 or pc < -baseline_corr_noise_threshold:
            return d
        return d - pc

    # function needs to be inline for parallel processing
    def read_fun(filename, channel):
        """ A reader function used by pyDetLib.
        """
        infile = h5py.File(filename, "r", driver="core")
        if rawversion == 2:
            count = np.squeeze(infile["/INDEX/{}_DET_AGIPD1M-1/DET/{}CH0:xtdf/image/count".format(instrument, channel)])
            first = np.squeeze(infile["/INDEX/{}_DET_AGIPD1M-1/DET/{}CH0:xtdf/image/first".format(instrument, channel)])
            last_index = int(first[count != 0][-1]+count[count != 0][-1])
            first_index = int(first[count != 0][0])
        else:
            status = np.squeeze(infile["/INDEX/{}_DET_AGIPD1M-1/DET/{}CH0:xtdf/image/status".format(instrument, channel)])
            if np.count_nonzero(status != 0) == 0:
                return
            last = np.squeeze(infile["/INDEX/{}_DET_AGIPD1M-1/DET/{}CH0:xtdf/image/last".format(instrument, channel)])
            last_index = int(last[status != 0][-1])
            first_index = int(last[status != 0][0])
        if limit_trains is not None:
            last_index = min(limit_trains*memory_cells+first_index, last_index)
        im = np.array(infile["/INSTRUMENT/{}_DET_AGIPD1M-1/DET/{}CH0:xtdf/image/data".format(instrument, channel)][first_index:last_index, ...])
        carr = infile["/INSTRUMENT/{}_DET_AGIPD1M-1/DET/{}CH0:xtdf/image/cellId".format(instrument, channel)][first_index:last_index]
        cells = np.squeeze(np.array(carr))
        infile.close()

        if il_mode:
            ga = im[1::2, 0, ...]
            im = im[0::2, 0, ...].astype(np.float32)
        else:
            ga = im[:, 1, ...]
            im = im[:, 0, ...].astype(np.float32)
        im = np.rollaxis(im, 2)
        im = np.rollaxis(im, 2, 1)
        ga = np.rollaxis(ga, 2)
        ga = np.rollaxis(ga, 2, 1)
        return im, ga, cells

    offset_cor = xcal.OffsetCorrection(sensor_size,
                                       offset,
                                       nCells=memory_cells,
                                       blockSize=block_size,
                                       gains=[0, 1, 2])
    offset_cor.mapper = offset_cor._view.map_sync
    offset_cor.debug()  # force non-parallel processing since the outer function runs concurrently
    hist_calc = xcal.HistogramCalculator(sensor_size,
                                         bins=4000,
                                         range=(-4000, 8000),
                                         blockSize=block_size)
    hist_calc.mapper = hist_calc._view.map_sync
    hist_calc.debug()  # force non-parallel processing since the outer function runs concurrently

    for run in runs:
        for seq in sequences:
            fname = fbase.format(run, run.upper(), channel, seq)
            d, ga, c = read_fun(fname, channel)
            vidx = (c < offset.shape[2]) & (c < thresholds.shape[2])
            c = c[vidx]
            d = d[:, :, vidx]
            ga = ga[:, :, vidx]
            # we need to do proper gain thresholding
            g = np.zeros(ga.shape, np.uint8)
            g[...] = 2
            g[ga < thresholds[..., c, 1]] = 1
            g[ga < thresholds[..., c, 0]] = 0
            d = offset_cor.correct(d, cellTable=c, gainMap=g)
            for i in range(d.shape[2]):
                mn_noise = np.nanmean(noise[..., c[i]])
                d[..., i] = baseline_correct_via_noise(d[..., i],
                                                       mn_noise,
                                                       g[..., i])
            d *= np.moveaxis(pc[c, ...], 0, 2)
            hist_calc.fill(d)
    h, e, c, _ = hist_calc.get()
    return h, e, c


inp = []
for i in modules:
    qm = "Q{}M{}".format(i//4+1, i%4+1)
    inp.append((i, offset_g[qm], thresholds_g[qm], pc_g[qm][0, ...], noise_g[qm][..., 0]))

p = partial(hist_single_module, fbase, runs, sequences,
            sensor_size, memory_cells, block_size, IL_MODE, limit_trains,
            profile, rawversion, instrument)
res_uncorr = list(map(p, inp))
```
%% Cell type:code id: tags:
``` python
d = []
qms = []
for i, r in enumerate(res_uncorr):
    ii = list(modules)[i]
    qm = "Q{}M{}".format(ii//4+1, ii%4+1)
    qms.append(qm)
    h, e, c = r
    d.append({
        'x': c,
        'y': h,
        'drawstyle': 'steps-mid'
    })

fig = xana.simplePlot(d, y_log=False,
                      figsize="2col",
                      aspect=2,
                      x_range=(-50, 500),
                      x_label="Intensity (ADU)",
                      y_label="Counts")

fig.savefig("{}/FF_module_{}_peak_pos.png".format(out_folder, ",".join(qms)))
```
%% Cell type:code id: tags:
``` python
# these should be quite stable
peak_estimates = [0, 55, 105, 155]
peak_heights = [5e7, 5e6, 1e6, 5e5]
peak_sigma = [5., 5., 5., 5.]
print("Using the following peak estimates: {}".format(peak_estimates))
```
%% Cell type:markdown id: tags:
## Calculate relative gain per module ##
Using the estimates so obtained, we calculate the relative gain per module. For this we use the weighted average method implemented in pyDetLib.
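The core idea of the weighted average, in a minimal sketch (toy numbers; the pyDetLib `GainMapCalculator` internals are more involved): within a window around each peak estimate, the peak location is taken as the count-weighted mean of the bin centers.

``` python
# Conceptual sketch of a weighted average peak position.
import numpy as np

counts = np.array([2., 10., 40., 55., 30., 8.])   # toy histogram slice around a peak
centers = np.array([45., 47., 49., 51., 53., 55.])

mu = np.sum(centers*counts)/np.sum(counts)        # count-weighted mean location
print(f"peak location ~ {mu:.2f} ADU")
```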
%% Cell type:code id: tags:
``` python
block_size = [64, 64]


def relgain_single_module(fbase, runs, sequences, peak_estimates,
                          peak_heights, peak_sigma, memory_cells, sensor_size,
                          block_size, il_mode, profile, limit_trains_eval,
                          rawversion, instrument, inp):
    """ Calculate the relative gain of a single AGIPD module.
    """
    # imports need to be inline for parallel processing
    import XFELDetAna.xfelpycaltools as xcal
    import numpy as np
    import h5py
    from XFELDetAna.util import env
    env.iprofile = profile
    channel, offset, thresholds, noise, pc = inp

    def baseline_correct_via_noise(d, noise, g, baseline_corr_noise_threshold=1000):
        """ Correct baseline shifts by evaluating the position of the noise peak.

        :param d: the data to correct, should be a single image
        :param noise: the mean noise for the cell id of `d`
        :param g: gain array matching the `d` array

        Correction is done by identifying the left-most (significant) peak
        in the histogram of `d` and shifting it to be centered on 0.
        This is done via a continuous wavelet transform (CWT), using a Ricker
        wavelet.

        Only high-gain data is evaluated, and data larger than 50 ADU,
        equivalent to roughly a 9 keV photon, is ignored.

        Two passes are executed: the first shift is accurate to 10 ADU and
        will catch baseline shifts smaller than -2000 ADU. CWT is evaluated
        for peaks of widths one, three and five times the noise.
        The baseline is then shifted by the position of the summed CWT maximum.

        In a second pass the histogram is evaluated within a range of
        +- 5*noise of the initial shift value, for a peak of width noise.

        Baseline shifts larger than the maximum threshold or positive shifts
        larger than 10 are discarded and the original data is returned;
        otherwise the shift-corrected data is returned.
        """
        import copy
        from scipy.signal import cwt, ricker
        # we work on a copy to be able to filter values by setting them to
        # np.nan
        dd = copy.copy(d)
        dd[g != 0] = np.nan  # only high gain data
        dd[dd > 50] = np.nan  # only noise data
        h, e = np.histogram(dd, bins=210, range=(-2000, 100))
        c = (e[1:] + e[:-1]) / 2
        try:
            cwtmatr = cwt(h, ricker, [noise, 3. * noise, 5. * noise])
        except Exception:
            return d
        cwtall = np.sum(cwtmatr, axis=0)
        marg = np.argmax(cwtall)
        pc = c[marg]

        high_res_range = (dd > pc - 5 * noise) & (dd < pc + 5 * noise)
        dd[~high_res_range] = np.nan
        h, e = np.histogram(dd, bins=200,
                            range=(pc - 5 * noise, pc + 5 * noise))
        c = (e[1:] + e[:-1]) / 2
        try:
            cwtmatr = cwt(h, ricker, [noise, ])
        except Exception:
            return d
        marg = np.argmax(cwtmatr)
        pc = c[marg]
        # too large a shift to be easily discernible via the noise peak
        if pc > 10 or pc < -baseline_corr_noise_threshold:
            return d
        return d - pc

    # function needs to be inline for parallel processing
    def read_fun(filename, channel):
        """ A reader function used by pyDetLib.
        """
        infile = h5py.File(filename, "r", driver="core")
        if rawversion == 2:
            count = np.squeeze(infile["/INDEX/{}_DET_AGIPD1M-1/DET/{}CH0:xtdf/image/count".format(instrument, channel)])
            first = np.squeeze(infile["/INDEX/{}_DET_AGIPD1M-1/DET/{}CH0:xtdf/image/first".format(instrument, channel)])
            last_index = int(first[count != 0][-1]+count[count != 0][-1])
            first_index = int(first[count != 0][0])
        else:
            status = np.squeeze(infile["/INDEX/{}_DET_AGIPD1M-1/DET/{}CH0:xtdf/image/status".format(instrument, channel)])
            if np.count_nonzero(status != 0) == 0:
                return
            last = np.squeeze(infile["/INDEX/{}_DET_AGIPD1M-1/DET/{}CH0:xtdf/image/last".format(instrument, channel)])
            last_index = int(last[status != 0][-1])
            first_index = int(last[status != 0][0])
        # note: limit_trains is taken from the notebook's global scope here
        if limit_trains is not None:
            last_index = min(limit_trains*memory_cells+first_index, last_index)
        im = np.array(infile["/INSTRUMENT/{}_DET_AGIPD1M-1/DET/{}CH0:xtdf/image/data".format(instrument, channel)][first_index:last_index, ...])
        carr = infile["/INSTRUMENT/{}_DET_AGIPD1M-1/DET/{}CH0:xtdf/image/cellId".format(instrument, channel)][first_index:last_index]
        cells = np.squeeze(np.array(carr))
        infile.close()

        if il_mode:
            ga = im[1::2, 0, ...]
            im = im[0::2, 0, ...].astype(np.float32)
        else:
            ga = im[:, 1, ...]
            im = im[:, 0, ...].astype(np.float32)
        im = np.rollaxis(im, 2)
        im = np.rollaxis(im, 2, 1)
        ga = np.rollaxis(ga, 2)
        ga = np.rollaxis(ga, 2, 1)
        return im, ga, cells

    offset_cor = xcal.OffsetCorrection(sensor_size, offset, nCells=memory_cells,
                                       blockSize=block_size, gains=[0, 1, 2])
    offset_cor.mapper = offset_cor._view.map_sync
    rel_gain = xcal.GainMapCalculator(sensor_size,
                                      peak_estimates,
                                      peak_sigma,
                                      nCells=memory_cells,
                                      peakHeights=peak_heights,
                                      noiseMap=noise,
                                      deviationType="relative")
    rel_gain.mapper = rel_gain._view.map_sync
    for run in runs:
        for seq in sequences:
            fname = fbase.format(run, run.upper(), channel, seq)
            d, ga, c = read_fun(fname, channel)
            vidx = (c < offset.shape[2]) & (c < thresholds.shape[2])
            c = c[vidx]
            d = d[:, :, vidx]
            ga = ga[:, :, vidx]
            # we need to do proper gain thresholding
            g = np.zeros(ga.shape, np.uint8)
            g[...] = 2
            g[ga < thresholds[..., c, 1]] = 1
            g[ga < thresholds[..., c, 0]] = 0
            d = offset_cor.correct(d, cellTable=c, gainMap=g)
            for i in range(d.shape[2]):
                mn_noise = np.nanmean(noise[..., c[i]])
                d[..., i] = baseline_correct_via_noise(d[..., i],
                                                       mn_noise,
                                                       g[..., i])
            d *= np.moveaxis(pc[c, ...], 0, 2)
            rel_gain.fill(d, cellTable=c)
    gain_map = rel_gain.get()
    gain_map_func = rel_gain.getUsingFunc(inverse=False)
    pks, stds = rel_gain.getRawPeaks()
    return gain_map, pks, stds, gain_map_func


inp = []
for i in modules:
    qm = "Q{}M{}".format(i//4+1, i%4+1)
    inp.append((i, offset_g[qm], thresholds_g[qm], noise_g[qm][..., 0], pc_g[qm][0, ...]))
start = datetime.now()
p = partial(relgain_single_module, fbase, runs, sequences,
            peak_estimates, peak_heights, peak_sigma, memory_cells,
            sensor_size, block_size, IL_MODE, profile, limit_trains_eval,
            rawversion, instrument)
res_gain = list(map(p, inp))  # don't run concurrently as the inner functions are parallelized
duration = (datetime.now()-start).total_seconds()
logger.runtime_summary_entry(success=True, runtime=duration)
logger.send()
```
%% Cell type:markdown id: tags:
Finally, we inspect the results by plotting the number of entries, peak locations and resulting gain maps for each peak. In the course of doing so, we identify bad pixels as those having zero entries for a peak or `nan` values for the peak location.
%% Cell type:code id: tags:
``` python
gain_m = {}
flatsff = {}
gainoff_g = {}
entries_g = {}
peaks_g = {}
mask_g = {}
cell_to_preview = 4
masks_eval_peaks = [1, 2]
global_eval_peaks = [1]
global_meds = {}

for i, r in enumerate(res_gain):
    ii = list(modules)[i]
    qm = "Q{}M{}".format(ii//4+1, ii%4+1)
    gain, pks, std, gfunc = r
    gains, errors, chisq, valid, max_dev, stats = gfunc
    _, entries, stds, sow = gain
    gain_db = np.zeros((gains.shape[0], gains.shape[1], memory_cells))
    gain_mdb = np.zeros((gains.shape[0], gains.shape[1], memory_cells))
    entries_db = np.zeros((gains.shape[0], gains.shape[1], memory_cells, 5))
    gainoff_db = np.zeros((gains.shape[0], gains.shape[1], memory_cells))
    mask_db = np.zeros((gains.shape[0], gains.shape[1], memory_cells), np.uint32)

    gainoff_g[qm] = gainoff_db
    gain_m[qm] = gain_mdb
    entries_g[qm] = entries_db
    peaks_g[qm] = pks

    # create a mask for irregular pixels
    # first bit set if first peak has nan entries
    for pk in masks_eval_peaks:
        mask_db[~(np.isfinite(gain_mdb) & np.isfinite(gainoff_db))] |= BadPixels.FF_GAIN_EVAL_ERROR.value
        mask_db[np.abs(1-gain_mdb/np.nanmedian(gain_mdb)) > deviation_threshold] |= BadPixels.FF_GAIN_DEVIATION.value
    # second bit set if entries are 0 for first peak
    mask_db[entries[..., 1] == 0] |= BadPixels.FF_NO_ENTRIES.value
    zero_oc = np.count_nonzero((entries > 0).astype(int), axis=3)
    mask_db[zero_oc <= len(peak_estimates)-2] |= BadPixels.FF_NO_ENTRIES.value
    # third bit set if entries of a given ADC show significant noise
    stds = []
    for ii in range(8):
        for jj in range(8):
            stds.append(np.std(entries_db[ii*16:(ii+1)*16, jj*64+2:(jj+1)*64-2, :, 1], axis=(0, 1)))
    avg_stds = np.median(np.array(stds), axis=0)
    for ii in range(8):
        for jj in range(8):
            std = np.std(entries_db[ii*16:(ii+1)*16, jj*64+2:(jj+1)*64-2, :, 1], axis=(0, 1))
            if np.any(std > 5*avg_stds):
                mask_db[ii*16:(ii+1)*16, jj*64:(jj+1)*64, std > 5*avg_stds] |= BadPixels.FF_GAIN_DEVIATION.value

    mask_g[qm] = mask_db

    flat = np.zeros((gains.shape[0], gains.shape[1], memory_cells, 3))
    for j in range(2, len(peak_estimates)):
        flat[..., j-2] = np.mean(entries[..., j]/np.mean(entries[..., j]))
    flat = np.mean(flat, axis=3)
    flatsff[qm] = flat

    global_meds[qm] = np.nanmedian(pks[..., global_eval_peaks][np.max(mask_db, axis=2) != 0])
```
%% Cell type:markdown id: tags:
## Evaluated peak locations ##
The following plot shows the evaluated peak locations for each peak. Peak IDs increase downward:
%% Cell type:code id: tags:
``` python
from mpl_toolkits.axes_grid1 import AxesGrid

cell_to_preview = 4
for qm, data in peaks_g.items():
    print("The module shown is: {}".format(qm))
    print("The cell shown is: {}".format(cell_to_preview))
    print("Evaluated peaks: {}".format(data.shape[-1]))
    fig = plt.figure(figsize=(15, 15))
    grid = AxesGrid(fig, 111,
                    nrows_ncols=(data.shape[-1], 1),
                    axes_pad=0.1,
                    share_all=True,
                    label_mode="L",
                    cbar_location="right",
                    cbar_mode="each",
                    cbar_size="7%",
                    cbar_pad="2%")

    for j in range(data.shape[-1]):
        d = data[..., cell_to_preview, j]
        d[~np.isfinite(d)] = 0
        im = grid[j].imshow(d, interpolation="nearest", vmin=0.9*np.nanmedian(d),
                            vmax=max(1.1*np.nanmedian(d), 50))
        cb = grid.cbar_axes[j].colorbar(im)
        cb.set_label_text("Peak location (ADU)")
```
%% Cell type:markdown id: tags:
## Gain Slopes And Offsets ##
The gain slopes and offsets are deduced by fitting a linear function $y = mx + b$ to the evaluated peaks. Gains are normalized across all pixels and all memory cells of a module by using the average peak locations as $x$ values; thus slopes centered around 1 are expected.
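The fitted slope and offset act as a relative gain correction, applied in the sanity check further below as $(\mathrm{signal} - b)/m$; a toy example (hypothetical numbers):

``` python
# Toy example: a pixel with 5% higher gain than the module average.
import numpy as np

m, b = 1.05, 0.3                       # hypothetical fit results for one pixel
signal = np.array([0.3, 54.9, 109.5])  # measured peak positions (ADU)
print((signal - b)/m)                  # ~ the module-average positions [0, 52, 104]
```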
%% Cell type:code id: tags:
``` python
for i, r in enumerate(res_gain):
    ii = list(modules)[i]
    qm = "Q{}M{}".format(ii//4+1, ii%4+1)
    gain, pks, std, gfunc = r
    gains, errors, chisq, valid, max_dev, stats = gfunc
    _, entries, stds, sow = gain

    fig = plt.figure(figsize=(15, 8))
    ax = fig.add_subplot(211)
    im = ax.imshow(gains[..., 0], interpolation="nearest", vmin=0.85, vmax=1.15)
    cb = fig.colorbar(im)
    cb.set_label("Gain slope m")
    fig.savefig("{}/gain_m_mod{}.png".format(out_folder, qm))

    ax = fig.add_subplot(212)
    ax.hist(gains[..., 0].flatten(), range=(0.5, 1.5), bins=100)
    ax.set_ylabel("Occurrences")
    ax.set_xlabel("Gain slope m")
    ax.semilogy()
```
%% Cell type:markdown id: tags:
The gain offsets $b$ are expected to be centered around 0 and minimal, as the data is already offset-subtracted.
%% Cell type:code id: tags:
``` python
for i, r in enumerate(res_gain):
    ii = list(modules)[i]
    qm = "Q{}M{}".format(ii//4+1, ii%4+1)
    gain, pks, std, gfunc = r
    gains, errors, chisq, valid, max_dev, stats = gfunc
    _, entries, stds, sow = gain

    fig = plt.figure(figsize=(15, 8))
    ax = fig.add_subplot(211)
    im = ax.imshow(gains[..., 1], interpolation="nearest", vmin=-25, vmax=25)
    cb = fig.colorbar(im)
    cb.set_label("Gain offset b")
    fig.savefig("{}/gain_b_mod{}.png".format(out_folder, qm))

    ax = fig.add_subplot(212)
    ax.hist(gains[..., 1].flatten(), range=(-25, 25), bins=100)
    ax.set_ylabel("Occurrences")
    ax.set_xlabel("Gain offset b")
    ax.semilogy()
```
%% Cell type:markdown id: tags:
## Bad Pixels ##
Bad pixels are defined by any of the following criteria:

* the gain evaluation did not converge, or the location of the noise peak deviates by more than the deviation threshold from the median location,
* the number of entries for the noise peak is 0,
* the standard deviation of the number of entries is larger than 5 times the standard deviation for the ASIC the pixel is on.
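A minimal sketch of how such criteria combine into a bit mask (toy arrays and bit values; the real flags come from `cal_tools.enums.BadPixels`):

``` python
# Toy example of OR-ing bad pixel criteria into a per-pixel bit mask.
import numpy as np

FF_GAIN_EVAL_ERROR = 1 << 0  # hypothetical bit values, for illustration only
FF_NO_ENTRIES = 1 << 1

peak_loc = np.array([50.1, np.nan, 49.8])  # a nan marks a failed evaluation
entries = np.array([120, 80, 0])

mask = np.zeros(3, np.uint32)
mask[~np.isfinite(peak_loc)] |= FF_GAIN_EVAL_ERROR
mask[entries == 0] |= FF_NO_ENTRIES
print(mask)  # [0 1 2]
```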
%% Cell type:code id: tags:
``` python
print("The deviation threshold is: {}".format(deviation_threshold))
```
%% Cell type:code id: tags:
``` python
for i, r in enumerate(res_gain):
    ii = list(modules)[i]
    qm = "Q{}M{}".format(ii//4+1, ii%4+1)
    mask_db = mask_g[qm]

    fig = plt.figure(figsize=(15, 8))
    ax = fig.add_subplot(211)
    im = ax.imshow(np.log2(mask_db[..., cell_to_preview]), interpolation="nearest", vmin=0, vmax=32)
    cb = fig.colorbar(im)
    cb.set_label("Mask value")
    fig.savefig("{}/mask_mod{}.png".format(out_folder, qm))

    ax = fig.add_subplot(212)
    ax.hist(np.log2(mask_db.flatten()), range=(0, 32), bins=32, density=True)
    ax.set_ylabel("Occurrences")
    ax.set_xlabel("Mask value")
    ax.semilogy()
```
%% Cell type:code id: tags:
``` python
cols = {BadPixels.FF_GAIN_EVAL_ERROR.value: (BadPixels.FF_GAIN_EVAL_ERROR.name, '#FF000080'),
        BadPixels.FF_GAIN_DEVIATION.value: (BadPixels.FF_GAIN_DEVIATION.name, '#0000FF80'),
        BadPixels.FF_NO_ENTRIES.value: (BadPixels.FF_NO_ENTRIES.name, '#00FF0080'),
        BadPixels.FF_GAIN_EVAL_ERROR.value |
        BadPixels.FF_GAIN_DEVIATION.value: ('EVAL+DEV', '#DD00DD80'),
        BadPixels.FF_GAIN_EVAL_ERROR.value |
        BadPixels.FF_NO_ENTRIES.value: ('EVAL+NO_ENTRY', '#00DDDD80'),
        BadPixels.FF_GAIN_DEVIATION.value |
        BadPixels.FF_NO_ENTRIES.value: ('DEV+NO_ENTRY', '#DDDD0080'),
        }

rebin = 32 if not high_res_badpix_3d else 2

gain = 0
for mod, data in mask_g.items():
    plot_badpix_3d(data, cols, title=mod, rebin_fac=rebin)
```
%% Cell type:code id: tags:
``` python
if local_output:
    ofile = "{}/agipd_gain_store_{}_modules_{}.h5".format(out_folder,
                                                          "_".join([str(r) for r in runs]),
                                                          "_".join([str(m) for m in modules]))
    store_file = h5py.File(ofile, "w")
    for i, r in enumerate(res_gain):
        ii = list(modules)[i]
        qm = "Q{}M{}".format(ii//4+1, ii%4+1)
        gain, pks, std, gfunc = r
        gains, errors, chisq, valid, max_dev, stats = gfunc
        gainmap, entries, stds, sow = gain
        store_file["/{}/Gain/0/data".format(qm)] = gains[..., 0]
        store_file["/{}/GainOffset/0/data".format(qm)] = gains[..., 1]
        store_file["/{}/Flat/0/data".format(qm)] = flatsff[qm]
        store_file["/{}/Entries/0/data".format(qm)] = entries
        store_file["/{}/BadPixels/0/data".format(qm)] = mask_g[qm]
    store_file.close()
```
%% Cell type:code id: tags:
``` python
proposal = list(filter(None, in_folder.strip('/').split('/')))[-2]
file_loc = proposal + ' ' + ' '.join(list(map(str,runs)))
```
%% Cell type:code id: tags:
``` python
if db_output:
    for i, r in enumerate(res_gain):
        ii = list(modules)[i]
        qm = "Q{}M{}".format(ii//4+1, ii%4+1)
        gain, pks, std, gfunc = r
        gains, errors, chisq, valid, max_dev, stats = gfunc
        gainmap, entries, stds, sow = gain

        det = getattr(Detectors, dinstance)
        device = getattr(det, qm)

        # gains related
        metadata = ConstantMetaData()
        cgain = Constants.AGIPD.SlopesFF()
        cgain.data = gains
        metadata.calibration_constant = cgain
        # set the operating condition
        condition = Conditions.Illuminated.AGIPD(memory_cells, bias_voltage, photon_energy,
                                                 pixels_x=512, pixels_y=128, beam_energy=None,
                                                 acquisition_rate=acqrate, gain_setting=gain_setting)
        metadata.detector_condition = condition
        # specify a version for this constant
        if creation_time is None:
            metadata.calibration_constant_version = Versions.Now(device=device)
        else:
            metadata.calibration_constant_version = Versions.Timespan(device=device, start=creation_time)
        metadata.calibration_constant_version.raw_data_location = file_loc
        metadata.send(cal_db_interface, timeout=300000)

        # bad pixels
        metadata = ConstantMetaData()
        bp = Constants.AGIPD.BadPixelsFF()
        bp.data = mask_g[qm]
        metadata.calibration_constant = bp
        # set the operating condition
        condition = Conditions.Illuminated.AGIPD(memory_cells, bias_voltage, photon_energy,
                                                 pixels_x=512, pixels_y=128, beam_energy=None,
                                                 acquisition_rate=acqrate, gain_setting=gain_setting)
        metadata.detector_condition = condition
        # specify a version for this constant
        if creation_time is None:
            metadata.calibration_constant_version = Versions.Now(device=device)
        else:
            metadata.calibration_constant_version = Versions.Timespan(device=device, start=creation_time)
        metadata.calibration_constant_version.raw_data_location = file_loc
        metadata.send(cal_db_interface, timeout=300000)
```
%% Cell type:markdown id: tags:
## Sanity check ##
Finally, we correct the data used to derive the gain constants with those constants. We expect an improvement in peak FWHM if the constants have been deduced correctly.
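For a Gaussian peak the FWHM relates to the fitted width as $\mathrm{FWHM} = 2\sqrt{2\ln 2}\,\sigma \approx 2.355\,\sigma$; the fits below use the rounded factor 2.35.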
%% Cell type:code id: tags:
``` python
def hist_single_module(fbase, runs, sequences, il_mode, profile, limit_trains,
                       memory_cells, rawversion, instrument, inp):
    channel, offset, thresholds, relgain, noise, pc = inp
    gain, pks, std, gfunc = relgain
    gains, errors, chisq, valid, max_dev, stats = gfunc

    import XFELDetAna.xfelpycaltools as xcal
    import numpy as np
    import h5py
    import copy
    from XFELDetAna.util import env
    env.iprofile = profile
    sensor_size = [128, 512]
    block_size = sensor_size

    def baseline_correct_via_noise(d, noise, g, baseline_corr_noise_threshold=1000):
        """ Correct baseline shifts by evaluating the position of the noise peak.

        :param d: the data to correct, should be a single image
        :param noise: the mean noise for the cell id of `d`
        :param g: gain array matching the `d` array

        Correction is done by identifying the left-most (significant) peak
        in the histogram of `d` and shifting it to be centered on 0.
        This is done via a continuous wavelet transform (CWT), using a Ricker
        wavelet.

        Only high-gain data is evaluated, and data larger than 50 ADU,
        equivalent to roughly a 9 keV photon, is ignored.

        Two passes are executed: the first shift is accurate to 10 ADU and
        will catch baseline shifts smaller than -2000 ADU. CWT is evaluated
        for peaks of widths one, three and five times the noise.
        The baseline is then shifted by the position of the summed CWT maximum.

        In a second pass the histogram is evaluated within a range of
        +- 5*noise of the initial shift value, for a peak of width noise.

        Baseline shifts larger than the maximum threshold or positive shifts
        larger than 10 are discarded and the original data is returned;
        otherwise the shift-corrected data is returned.
        """
        import copy
        from scipy.signal import cwt, ricker
        # we work on a copy to be able to filter values by setting them to
        # np.nan
        dd = copy.copy(d)
        dd[g != 0] = np.nan  # only high gain data
        dd[dd > 50] = np.nan  # only noise data
        h, e = np.histogram(dd, bins=210, range=(-2000, 100))
        c = (e[1:] + e[:-1]) / 2
        try:
            cwtmatr = cwt(h, ricker, [noise, 3. * noise, 5. * noise])
        except Exception:
            return d
        cwtall = np.sum(cwtmatr, axis=0)
        marg = np.argmax(cwtall)
        pc = c[marg]

        high_res_range = (dd > pc - 5 * noise) & (dd < pc + 5 * noise)
        dd[~high_res_range] = np.nan
        h, e = np.histogram(dd, bins=200,
                            range=(pc - 5 * noise, pc + 5 * noise))
        c = (e[1:] + e[:-1]) / 2
        try:
            cwtmatr = cwt(h, ricker, [noise, ])
        except Exception:
            return d
        marg = np.argmax(cwtmatr)
        pc = c[marg]
        # too large a shift to be easily discernible via the noise peak
        if pc > 10 or pc < -baseline_corr_noise_threshold:
            return d
        return d - pc

    def read_fun(filename, channel):
        infile = h5py.File(filename, "r", driver="core")
        if rawversion == 2:
            count = np.squeeze(infile["/INDEX/{}_DET_AGIPD1M-1/DET/{}CH0:xtdf/image/count".format(instrument, channel)])
            first = np.squeeze(infile["/INDEX/{}_DET_AGIPD1M-1/DET/{}CH0:xtdf/image/first".format(instrument, channel)])
            last_index = int(first[count != 0][-1]+count[count != 0][-1])
            first_index = int(first[count != 0][0])
        else:
            status = np.squeeze(infile["/INDEX/{}_DET_AGIPD1M-1/DET/{}CH0:xtdf/image/status".format(instrument, channel)])
            if np.count_nonzero(status != 0) == 0:
                return
            last = np.squeeze(infile["/INDEX/{}_DET_AGIPD1M-1/DET/{}CH0:xtdf/image/last".format(instrument, channel)])
            last_index = int(last[status != 0][-1])
            first_index = int(last[status != 0][0])
        if limit_trains is not None:
            last_index = min(limit_trains*memory_cells+first_index, last_index)
        im = np.array(infile["/INSTRUMENT/{}_DET_AGIPD1M-1/DET/{}CH0:xtdf/image/data".format(instrument, channel)][first_index:last_index, ...])
        carr = infile["/INSTRUMENT/{}_DET_AGIPD1M-1/DET/{}CH0:xtdf/image/cellId".format(instrument, channel)][first_index:last_index]
        cells = np.squeeze(np.array(carr))
        infile.close()

        if il_mode:
            ga = im[1::2, 0, ...]
            im = im[0::2, 0, ...].astype(np.float32)
        else:
            ga = im[:, 1, ...]
            im = im[:, 0, ...].astype(np.float32)
        im = np.rollaxis(im, 2)
        im = np.rollaxis(im, 2, 1)
        ga = np.rollaxis(ga, 2)
        ga = np.rollaxis(ga, 2, 1)
        return im, ga, cells

    offset_cor = xcal.OffsetCorrection(sensor_size, offset, nCells=memory_cells,
                                       blockSize=block_size, gains=[0, 1, 2])
    offset_cor.debug()
    hist_calc = xcal.HistogramCalculator(sensor_size, bins=2000,
                                         range=(0, 2000), blockSize=block_size)
    hist_calc.debug()
    hist_calc_uncorr = xcal.HistogramCalculator(sensor_size, bins=2000,
                                                range=(0, 2000), blockSize=block_size)
    hist_calc_uncorr.debug()

    for run in runs:
        for seq in sequences[:2]:
            fname = fbase.format(run, run.upper(), channel, seq)
            d, ga, c = read_fun(fname, channel)
            vidx = (c < offset.shape[2]) & (c < thresholds.shape[2])
            c = c[vidx]
            d = d[:, :, vidx]
            ga = ga[:, :, vidx]
            # we need to do proper gain thresholding
            g = np.zeros(ga.shape, np.uint8)
            g[...] = 2
            g[ga < thresholds[..., c, 1]] = 1
            g[ga < thresholds[..., c, 0]] = 0
            d = offset_cor.correct(d, cellTable=c, gainMap=g)
            for i in range(d.shape[2]):
                mn_noise = np.nanmean(noise[..., c[i]])
                d[..., i] = baseline_correct_via_noise(d[..., i],
                                                       mn_noise,
                                                       g[..., i])
            d *= np.moveaxis(pc[c, ...], 0, 2)
            hist_calc_uncorr.fill(d)
            # apply the fitted relative gain: corrected = (signal - b) / m
            d = (d - gains[..., 1][..., None]) / gains[..., 0][..., None]
            hist_calc.fill(d)
    h, e, c, _ = hist_calc.get()
    hu = hist_calc_uncorr.get()
    return h, e, c, hu[0]


inp = []
idx = 0
for i in modules:
    qm = "Q{}M{}".format(i//4+1, i%4+1)
    inp.append((i, offset_g[qm], thresholds_g[qm], res_gain[idx],
                noise_g[qm][..., 0], pc_g[qm][0, ...]))
    idx += 1

p = partial(hist_single_module, fbase, runs, sequences, IL_MODE, profile,
            limit_trains, memory_cells, rawversion, instrument)
res = list(map(p, inp))
```
%% Cell type:code id: tags:
``` python
from iminuit import Minuit
from iminuit.util import make_func_code, describe
from IPython.display import HTML, display
import tabulate

# fitting
par_ests = {}
par_ests["mu0"] = 0
par_ests["mu1"] = 50
par_ests["mu2"] = 100
par_ests["limit_mu0"] = [-5, 5]
par_ests["limit_mu1"] = [35, 65]
par_ests["limit_mu2"] = [100, 150]
par_ests["s0"] = 10
par_ests["s1"] = 10
par_ests["s2"] = 10
par_ests["limit_A0"] = [1e5, 1e12]
par_ests["limit_A1"] = [1e4, 1e10]
par_ests["limit_A2"] = [1e3, 1e10]
par_ests["throw_nan"] = False
par_ests["pedantic"] = False
par_ests["print_level"] = 1


def gaussian3(x, mu0, s0, A0, mu1, s1, A1, mu2, s2, A2):
    return (A0/np.sqrt(2*np.pi*s0**2)*np.exp(-0.5*((x-mu0)/s0)**2) +
            A1/np.sqrt(2*np.pi*s1**2)*np.exp(-0.5*((x-mu1)/s1)**2) +
            A2/np.sqrt(2*np.pi*s2**2)*np.exp(-0.5*((x-mu2)/s2)**2))


f_sig = describe(gaussian3)[1:]


class _Chi2Functor:
    def __init__(self, f, x, y, err):
        self.f = f
        self.x = x
        self.y = y
        self.err = err
        f_sig = describe(f)
        # this is how you fake a function
        # signature dynamically
        self.func_code = make_func_code(
            f_sig[1:])  # docking off the independent variable
        self.func_defaults = None  # this keeps numpy.vectorize happy

    def __call__(self, *arg):
        # note that this accepts variable-length
        # positional arguments
        return np.sum(((self.f(self.x, *arg) - self.y) ** 2) / self.err)


d = []
y_range_max = 0
table = []
headers = ['Module',
           r'FWHM (cor.) [ADU]', r'Separation (cor.) [$\sigma$]',
           r'FWHM (uncor.) [ADU]', r'Separation (uncor.) [$\sigma$]',
           'Improvement'
           ]
for i, r in enumerate(res):
    ii = list(modules)[i]
    qm = "Q{}M{}".format(ii//4+1, ii%4+1)
    row = []
    row.append(qm)
    h, e, c, hu = r
    d.append({
        'x': c,
        'y': h,
        'drawstyle': 'steps-mid',
        'label': '{}: corrected'.format(qm),
        'marker': '.',
        'color': 'blue',
    })
    idx = (h > 0) & np.isfinite(h)
    h_non_zero = h[idx]
    c_non_zero = c[idx]
    par_ests["A0"] = float(h[np.argmin(abs(c-0))])
    par_ests["A1"] = float(h[np.argmin(abs(c-50))])
    par_ests["A2"] = float(h[np.argmin(abs(c-100))])
    wrapped = _Chi2Functor(gaussian3, c_non_zero, h_non_zero,
                           np.sqrt(h_non_zero))
    m = Minuit(wrapped, **par_ests)
    fmin = m.migrad()

    xt = np.arange(0, 200)
    yt = gaussian3(xt, m.values['mu0'], m.values['s0'], m.values['A0'],
                   m.values['mu1'], m.values['s1'], m.values['A1'],
                   m.values['mu2'], m.values['s2'], m.values['A2'])
    d.append({
        'x': xt,
        'y': yt,
        'label': '{}: corrected (fit)'.format(qm),
        'color': 'green',
        'drawstyle': 'line',
        'linewidth': 2
    })
    d.append({
        'x': c,
        'y': hu,
        'drawstyle': 'steps-mid',
        'label': '{}: uncorrected'.format(qm),
        'marker': '.',
        'color': 'grey',
        'alpha': 0.5
    })
    row += [m.values['s1']*2.35, (m.values['mu1']-m.values['mu0'])/m.values['s1']]

    idx = (hu > 0) & np.isfinite(hu)
    h_non_zero = hu[idx]
    c_non_zero = c[idx]
    wrapped = _Chi2Functor(gaussian3, c_non_zero, h_non_zero,
                           np.sqrt(h_non_zero))
    m = Minuit(wrapped, **par_ests)
    fmin = m.migrad()

    xt = np.arange(0, 200)
    yt = gaussian3(xt, m.values['mu0'], m.values['s0'], m.values['A0'],
                   m.values['mu1'], m.values['s1'], m.values['A1'],
                   m.values['mu2'], m.values['s2'], m.values['A2'])
    d.append({
        'x': xt,
        'y': yt,
        'label': '{}: uncorrected (fit)'.format(qm),
        'color': 'red',
        'linewidth': 2
    })
    row += [m.values['s1']*2.35, (m.values['mu1']-m.values['mu0'])/m.values['s1']]
    row.append("{:0.2f} %".format(100*(row[3]/row[1]-1)))
    y_range_max = max(y_range_max, np.max(h[c > 25])*1.5)

    # output table
    table.append(row)

fig = xana.simplePlot(d, y_log=False, figsize="2col",
                      aspect=2,
                      x_range=(0, 200),
                      legend='top-right-frame',
                      y_range=(0, y_range_max),
                      x_label='Energy (ADU)',
                      y_label='Counts')

display(HTML(tabulate.tabulate(table, tablefmt='html', headers=headers,
                               numalign="right", floatfmt="0.2f")))
```
%% Cell type:code id: tags:
``` python
```