Skip to content
Snippets Groups Projects

[AGIPD] [PC] FEAT: Optimisation of the PC characterisation notebook.

Merged Vratko Rovensky requested to merge feat/new_agipd_pc into master
Compare and
1 file
+ 834
739
Compare changes
  • Side-by-side
  • Inline
%% Cell type:markdown id: tags:
# Characterize AGIPD Pulse Capacitor Data #
Author: S. Hauf, Version 1.0
Author: Detector Operations Group, Version 1.0
The following code characterizes AGIPD gain via data take with the pulse capacitor source (PCS). The PCS allows scanning through the high and medium gains of AGIPD, by subsequently intecreasing the number of charge pulses from a on-ASIC capicitor, thus increasing the charge a pixel sees in a given integration time.
Because induced charge does not originate from X-rays on the sensor, the gains evaluated here will later need to be rescaled with gains deduced from X-ray data.
PCS data is organized into multiple runs, as the on-ASIC current source cannot supply all pixels of a given module with charge at the same time. Hence, only certain pixel rows will have seen charge for a given image. These rows then first need to be combined into single module images again.
PCS data is organized into multiple runs, as the on-ASIC current source cannot supply all pixels of a given module with charge at the same time. Hence, only certain pixel rows will have seen charge for a given image. These rows then first need to be combined into single module images again. This script uses new style of merging, which does not support old data format.
We then use a K-means clustering algorithm to identify components in the resulting per-pixel data series, matching to three general regions:
* a high gain slope
* a transition region, where gain switching occurs
* a medium gain slope.
The same regions are present in the gain-bit data and are used to deduce the switching threshold.
The resulting slopes are then fitted with a linear function and a combination of a linear and exponential decay function to determine the relative gains of the pixels with respect to the module. Additionally, we deduce masks for bad pixels form the data.
%% Cell type:code id: tags:
``` python
cluster_profile = "noDB" # The ipcluster profile to use
in_folder = '/gpfs/exfel/exp/SPB/202130/p900188/raw/' # path to input data, required
out_folder = "/gpfs/exfel/data/scratch/jsztuk/test/pc" # path to output to, required
metadata_folder = "" # Directory containing calibration_metadata.yml when run by xfel-calibrate
runs = [92, 93, 94, 95, 96, 97, 98, 99] # runs to use, required, range allowed
n_sequences = 5 # number of sequence files, starting for 0 to evaluate
modules = [-1] # modules to work on, required, range allowed
karabo_da = ["all"]
karabo_da_control = "AGIPD1MCTRL00" # karabo DA for control infromation
karabo_id_control = "SPB_IRU_AGIPD1M1"
karabo_id_control = "SPB_IRU_AGIPD1M1" # karabo-id for the control device e.g. "MID_EXP_AGIPD1M1", or "SPB_IRU_AGIPD1M1"
karabo_id = "SPB_DET_AGIPD1M-1"
h5path_ctrl = '/CONTROL/{}/MDL/FPGA_COMP' # path to control information
ctrl_source_template = '{}/MDL/FPGA_COMP' # path to control information
instrument_source_template = '{}/DET/{}:xtdf' # path in the HDF5 file to images
receiver_template = "{}CH0" # inset for receiver devices
use_dir_creation_date = True
delta_time = 0 # offset to the creation time (e.g. useful in case we want to force the system to use diff. dark constants)
cal_db_interface = "tcp://max-exfl016:8019" # the database interface to use
local_output = True # output constants locally
db_output = False # output constants to database
bias_voltage = 300 # detector bias voltage
mem_cells = 0. # number of memory cells used, use 0 to auto-derive
acq_rate = 0. # the detector acquisition rate, use 0 to try to auto-determine
gain_setting = 0.1 # gain setting can have value 0 or 1, Default=0.1 for no (None) gain-setting
integration_time = -1 # integration time, negative values for auto-detection.
interlaced = False # assume interlaced data format, for data prior to Dec. 2017
fit_hook = True # fit a hook function to medium gain slope
rawversion = 2 # RAW file format version
bias_voltage = -1 # detector bias voltage, negative values for auto-detection.
mem_cells = -1 # number of memory cells used, negative values for auto-detection.
acq_rate = -1. # the detector acquisition rate, negative values for auto-detection.
gain_setting = -1 # gain setting can have value 0 or 1, negative values for auto-detection.
integration_time = -1 # integration time, negative values for auto-detection.
FF_gain = 7.3 # ADU/keV, gain from FF to convert ADU to keV
fit_hook = False # fit a hook function to medium gain slope --> run without hook
high_res_badpix_3d = False # set this to True if you need high-resolution 3d bad pixel plots. Runtime: ~ 1h
hg_range = [30,210]#[0,-150] # range for linear fit. If upper edge is negative use clustering result (bound_hg - 20)
mg_range = [-277,600]#[-350,600] # range for linear fit. If lower edge is negative use clustering result (bound_mg + 20)
sigma_dev_cut = 5.0 # parameters outside the range median +- sigma_dev_cut*MAD are replaced with the median
```
%% Cell type:code id: tags:
``` python
# imports, usually no need to change anything here
import os
import warnings
from datetime import datetime, timedelta
from functools import partial
warnings.filterwarnings('ignore')
import dateutil.parser
import h5py
import matplotlib
import numpy as np
from ipyparallel import Client
import matplotlib.pyplot as plt
from matplotlib import gridspec
from matplotlib.colors import LogNorm, PowerNorm
import matplotlib.patches as patches
from mpl_toolkits.axes_grid1 import AxesGrid
%matplotlib inline
import XFELDetAna.xfelpyanatools as xana
from cal_tools.agipdlib import (
get_acq_rate, get_gain_setting, get_integration_time, get_num_cells
)
from extra_data import RunDirectory
from cal_tools.agipdlib import AgipdCtrl
from cal_tools.enums import BadPixels
from cal_tools.plotting import plot_badpix_3d, show_overview
from cal_tools.tools import (
gain_map_files,
get_constant_from_db_and_time,
get_dir_creation_date,
get_notebook_name,
get_pdu_from_db,
get_report,
module_index_to_qm,
parse_runs,
run_prop_seq_from_path,
send_to_db,
)
from iCalibrationDB import Conditions, ConstantMetaData, Constants, Detectors, Versions
# make sure a cluster is running with ipcluster start --n=32, give it a while to start
view = Client(profile=cluster_profile)[:]
view.use_dill()
```
%% Cell type:code id: tags:
IL_MODE = interlaced
maxcells = mem_cells if not interlaced else mem_cells*2
``` python
cells = mem_cells
path_temp = in_folder+"/r{:04d}/"
image_name_temp = 'RAW-R{:04d}-AGIPD{:02d}-S{:05d}.h5'
seqs = n_sequences
print("Parameters are:")
print("Memory cells: {}/{}".format(cells, maxcells))
if mem_cells < 0:
print("Memory cells: auto-detection on")
else:
print("Memory cells set by user: {}".format(mem_cells))
print("Runs: {}".format(runs))
print("Modules: {}".format(modules))
print("Sequences: {}".format(seqs))
print("Interlaced mode: {}".format(IL_MODE))
run, prop, seq = run_prop_seq_from_path(in_folder)
instrument = karabo_id.split("_")[0]
if instrument == "HED":
nmods = 8
else:
nmods = 16
print(f"Detector in use is {karabo_id}")
if karabo_da == ["all"]:
if modules[0] == -1:
modules = list(range(nmods))
karabo_da = ["AGIPD{:02d}".format(i) for i in modules]
else:
modules = [int(x[-2:]) for x in karabo_da]
```
%% Cell type:markdown id: tags:
## Read in data and merge ##
The number of bursts in each sequence file is determined from the sequence files of the first module.
%% Cell type:code id: tags:
``` python
run = runs[0]
bursts_per_file = []
first_run = runs[0]
channel = 0
control_fname = f'{in_folder}/r{run:04d}/RAW-R{run:04d}-{karabo_da_control}-S00000.h5'
slow_paths = (control_fname, karabo_id_control)
ctrl_src = ctrl_source_template.format(karabo_id_control)
instrument_src = instrument_source_template.format(karabo_id, receiver_template).format(channel)
for seq in range(seqs):
fname = os.path.join(path_temp.format(run),
image_name_temp.format(run, channel, seq))
print('Reading ',fname)
if acq_rate == 0.:
acq_rate = get_acq_rate(
fast_paths=(fname, karabo_id, channel), slow_paths=slow_paths)
print("Acquisition rate set from file: {} MHz".format(acq_rate))
if mem_cells == 0:
cells = get_num_cells(fname, karabo_id, channel)
maxcells = cells
mem_cells = cells # avoid setting twice
print("Memory cells set from file: {}".format(cells))
f = h5py.File(fname, 'r')
if rawversion == 2:
count = np.squeeze(f["/INDEX/{}/DET/{}CH0:xtdf/image/count".format(karabo_id, channel)])
bursts_per_file.append(np.count_nonzero(count))
else:
status = np.squeeze(f["/INDEX/{}/DET/{}CH0:xtdf/image/status".format(karabo_id, channel)])
bursts_per_file.append(np.count_nonzero(status != 0))
f.close()
bursts_per_file = np.array(bursts_per_file)
print("Bursts per sequence file are: {}".format(bursts_per_file))
agipd_cond = AgipdCtrl(
run_dc=RunDirectory(f'{in_folder}/r{first_run:04d}'),
image_src=instrument_src,
ctrl_src=ctrl_src,
raise_error=False, # to be able to process very old data without gain_setting value
)
```
%% Cell type:code id: tags:
``` python
# Define creation time
creation_time=None
creation_time = None
if use_dir_creation_date:
creation_time = get_dir_creation_date(in_folder, run)
creation_time = get_dir_creation_date(in_folder, first_run)
creation_time = creation_time + timedelta(hours=delta_time)
print(f"Using {creation_time} as creation time of constant.")
if not creation_time and use_dir_creation_date:
creation_time = get_dir_creation_date(in_folder, run)
# Read AGIPD parameter conditions.
if acq_rate < 0.:
acq_rate = agipd_cond.get_acq_rate()
if mem_cells < 0:
mem_cells = agipd_cond.get_num_cells()
# TODO: look for alternative for passing creation_time
if gain_setting < 0:
gain_setting = agipd_cond.get_gain_setting(creation_time)
if bias_voltage < 0:
bias_voltage = agipd_cond.get_bias_voltage(karabo_id_control)
if integration_time < 0:
integration_time = agipd_cond.get_integration_time()
if creation_time:
print("Using {} as creation time".format(creation_time.isoformat()))
print(f"Acquisition rate: {acq_rate} MHz")
print(f"Memory cells: {mem_cells}")
print(f"Gain setting: {gain_setting}")
print(f"Integration time: {integration_time}")
print(f"Using {creation_time} as creation time of constant.")
```
%% Cell type:markdown id: tags:
## Read in data and merge ##
The new merging cannot handle the old data formats before Dec. 2017.
%% Cell type:code id: tags:
``` python
if "{" in h5path_ctrl:
h5path_ctrl = h5path_ctrl.format(karabo_id_control)
def merge_data(runs, in_folder, karabo_id, channel):
if gain_setting == 0.1:
if creation_time.replace(tzinfo=None) < dateutil.parser.parse('2020-01-31'):
print("Set gain-setting to None for runs taken before 2020-01-31")
gain_setting = None
else:
try:
gain_setting = get_gain_setting(control_fname, h5path_ctrl)
except Exception as e:
print(f'Error while reading gain setting from: \n{control_fname}')
print(e)
print("Gain setting is not found in the control information")
print("Data will not be processed")
sequences = []
print(f"Gain setting: {gain_setting}")
in_folder = in_folder+"/r{:04d}/"
def count_min_bursts(in_folder, runs, channel):
bursts = np.zeros(len(runs))
for i, run in enumerate(runs):
run_folder_path = in_folder.format(run)
r = RunDirectory(run_folder_path)
instrument_source = karabo_id+'/DET/{}CH0:xtdf'.format(channel)
c = r.get_array(instrument_source, 'image.length')
total_frame = np.count_nonzero(c)
cells = r.detector_info(instrument_source)['frames_per_train']
bursts[i] = total_frame // cells
bursts_total = np.min(bursts).astype(int)
return bursts_total, cells
bursts_total, cells = count_min_bursts(in_folder, runs, channel)
pc_data = {'analog': np.zeros((bursts_total, cells, 128, 512)),
'digital': np.zeros((bursts_total, cells, 128, 512)),
'cellID': np.zeros(((bursts_total) * cells))
}
for counter, run in enumerate(runs):
run_folder_path = in_folder.format(run)
print('Run: ', run_folder_path)
r = RunDirectory(run_folder_path)
instrument_source = karabo_id+'/DET/{}CH0:xtdf'.format(channel)
print('Module: ', instrument_source)
d = r.get_array(instrument_source, 'image.data')
d = d.values.reshape((d.shape[0] // cells, cells, 2, 512, 128))
d = np.moveaxis(d, 4, 3)
c = r.get_array(instrument_source, 'image.cellId')
print('Bursts: {}, Mem cells: {}\n'.format(c.shape[0] // cells, cells))
for i in range(7 - counter, 64, 8): # first run starts in row 7 and then decreases in each run by 1
pc_data['analog'][..., i, :] = d[:bursts_total, :, 0, i, :]
pc_data['digital'][..., i, :] = d[:bursts_total, :, 1, i, :]
for i in range(64 + counter, 128, 8): # separated module to 2 halves to catch the injection pattern correctly
pc_data['analog'][..., i, :] = d[:bursts_total, :, 0, i, :]
pc_data['digital'][..., i, :] = d[:bursts_total, :, 1, i, :]
if counter == 0:
pc_data['cellID'][...] = c[:bursts_total * cells, 0]
del d
del c
return pc_data['analog'], pc_data['digital'], pc_data['cellID']
if integration_time < 0:
integration_time = get_integration_time(control_fname, h5path_ctrl)
print(f"Integration time: {integration_time}")
```
%% Cell type:code id: tags:
``` python
def read_and_merge_module_data(cells, path_temp, image_name_temp,
runs, seqs, il_mode, rawversion, instrument, channel):
import os
start = datetime.now()
p = partial(merge_data, runs, in_folder, karabo_id)
# chunk this a bit, so that we don't overuse available memory
res = list(map(p, modules))
end = datetime.now() - start
print('Duration of merging: ', end)
```
import h5py
import numpy as np
%% Cell type:markdown id: tags:
## Plotting of merged PC signal
def cal_bursts_per_file(run, dseq=0):
%% Cell type:markdown id: tags:
bursts_per_file = []
channel = 0
The merged image of the injected pulse capacitor signal is plotted here for selected trains and a memory cell.
The left side shows images of analog signal, while the right side shows digital signal image. Rows visualize the signal in a given train.
for seq in range(dseq, seqs+dseq):
#print(run, channel, seq)
fname = os.path.join(path_temp.format(run),
image_name_temp.format(run, channel, seq))
#print('Reading ',fname)
with h5py.File(fname, 'r') as f:
if rawversion == 2:
count = np.squeeze(f["/INDEX/{}/DET/{}CH0:xtdf/image/count".format(karabo_id, channel)][()])
bursts_per_file.append(np.count_nonzero(count))
del count
else:
status = np.squeeze(f["/INDEX/{}/DET/{}CH0:xtdf/image/status".format(karabo_id, channel)][()])
bursts_per_file.append(np.count_nonzero(status != 0))
del status
if bursts_per_file[0] == 0:
return cal_bursts_per_file(run, dseq=dseq+1) # late start of daq
return np.array(bursts_per_file), dseq
#bursts_per_file = np.hstack([0, bursts_per_file])
bursts_total = np.max([np.sum(cal_bursts_per_file(run)[0]) for run in runs])
cfac = 2 if il_mode else 1
def read_raw_data_file(fname, channel, cells = cells, cells_tot = cells, bursts = 250,
skip_first_burst = True, first_burst_length = cells):
data = None
cellID_all = None
with h5py.File(fname, 'r') as f:
#print('Reading ',fname)
image_path_temp = 'INSTRUMENT/{}/DET/{}CH0:xtdf/image/data'.format(karabo_id, channel)
cellID_path_temp = 'INSTRUMENT/{}/DET/{}CH0:xtdf/image/cellId'.format(karabo_id, channel)
if rawversion == 2:
count = np.squeeze(f["/INDEX/{}/DET/{}CH0:xtdf/image/count".format(karabo_id, channel)])
first = np.squeeze(f["/INDEX/{}/DET/{}CH0:xtdf/image/first".format(karabo_id, channel)])
last_index = int(first[count != 0][-1]+count[count != 0][-1])
first_index = int(first[count != 0][0])
else:
status = np.squeeze(f["/INDEX/{}/DET/{}CH0:xtdf/image/status".format(karabo_id, channel)])
if np.count_nonzero(status != 0) == 0:
return
last = np.squeeze(f["/INDEX/{}/DET/{}CH0:xtdf/image/last".format(karabo_id, channel)])
last_index = int(last[status != 0][-1])
first_index = int(last[status != 0][0])
#print(first_index, last_index)
data = f[image_path_temp][first_index:last_index,...][()]
cellID_all = np.squeeze(f[cellID_path_temp][first_index:last_index,...][()])
data = data[cellID_all<cells, ...]
#bursts = int(data.shape[0]/adcells)
#print('Bursts: ', bursts)
analog = np.zeros((bursts - skip_first_burst, cells//cfac, 128, 512))
digital = np.zeros((bursts - skip_first_burst, cells//cfac, 128, 512))
cellID = np.zeros(( (bursts - skip_first_burst) * cells))
offset = skip_first_burst * first_burst_length
%% Cell type:code id: tags:
for b in range(min(bursts, data.shape[0]//cells-1) - skip_first_burst-1):
try:
``` python
def PC_image_plotting(res, burst_of_interest, cell_of_interest):
analog[b, : cells//cfac, ...] = np.swapaxes(data[b * cells_tot + offset : b * cells_tot + cells + offset : cfac,
0, ...], -1, -2)
digital[b, : cells//cfac, ...] = np.swapaxes(data[b * cells_tot + cfac - 1 + skip_first_burst * first_burst_length :
b * cells_tot + cells + cfac - 1 + offset :cfac, cfac%2, ...], -1, -2)
cellID[ b * cells : (b + 1) * cells] = cellID_all[b * cells_tot + offset : b * cells_tot + cells + offset].flatten()
except:
#print(b * cells_tot + offset, b * cells_tot + cells + offset)
#print(b, offset, cells, data.shape[0]//cells)
raise AttributeError("Foo")
return {'analog': analog, 'digital': digital, 'cellID': cellID}
fig, axs = plt.subplots(len(burst_of_interest), 2, figsize=(10, 9))
fig.subplots_adjust(left=0.02, bottom=0.06, right=0.95, top=0.94, wspace=0.25)
yticks_major=np.arange(0,129,64)
xticks_major=np.arange(0,512,64)
pc_data = {'analog': np.zeros((bursts_total, cells//cfac, 128, 512)),
'digital': np.zeros((bursts_total, cells//cfac, 128, 512)),
'cellID': np.zeros(((bursts_total) * cells))
}
pc_data_merged = {'analog': np.zeros((bursts_total, cells//cfac, 128, 512)),
'digital': np.zeros((bursts_total, cells//cfac, 128, 512)),
'cellID': np.zeros(((bursts_total) * cells))
}
for img_type in range(0,2):
if img_type == 0:
title = 'PC signal (ADU)'
img_title = 'Analog'
scale_min = 5000
scale_max = 11000
else:
title = 'PC gain signal (ADU)'
img_title = 'Digital'
scale_min = 4500
scale_max = 8500
for x, b, in enumerate(burst_of_interest):
im1 = axs[x, img_type].imshow(res[0][img_type][b, cell_of_interest, ...], interpolation='nearest',
cmap='jet', origin='lower',vmin=scale_min, vmax=scale_max, aspect='equal')
cbar = fig.colorbar(im1, ax=axs[x, img_type], fraction=0.1, pad=0.2, orientation='horizontal')
cbar.set_label(title)
axs[x, img_type].set_title("{}, Burst: {}, Cell: {}".format(img_title, b, cell_of_interest))
axs[x,img_type].set_xticks(xticks_major)
axs[x,img_type].xaxis.grid(True, which='major', linewidth=0.5, color='grey')
axs[x,img_type].set_yticks(yticks_major)
axs[x,img_type].yaxis.grid(True, which='major', linewidth=0.5, color='grey')
axs[x,img_type].set_xlabel('Columns')
axs[x,img_type].set_ylabel('Rows')
plt.show()
return fig
```
for run_idx, run in enumerate(runs):
bursts_per_file, dseq = cal_bursts_per_file(run)
print("Run {}: bursts per file: {} -> {} total".format(run, bursts_per_file, np.sum(bursts_per_file)))
#Read files in
last_burst = 0
for seq in range(dseq, seqs+dseq):
fname = os.path.join(path_temp.format(run),
image_name_temp.format(run, channel, seq))
if seq-dseq == 0:
skip_first_burst = True
else:
skip_first_burst = False
bursts = bursts_per_file[seq-dseq]
%% Cell type:markdown id: tags:
try:
aa = read_raw_data_file(fname, channel, bursts = bursts,
skip_first_burst = skip_first_burst,
first_burst_length = cells)
pc_data['analog'][last_burst : last_burst+bursts_per_file[seq-dseq]-skip_first_burst, ...] = aa['analog']
pc_data['digital'][last_burst : last_burst+bursts_per_file[seq-dseq]-skip_first_burst, ...] = aa['digital']
pc_data['cellID'][last_burst * cells : (last_burst+bursts_per_file[seq-dseq]-skip_first_burst) * cells, ...] = aa['cellID']
except Exception as e:
print(e)
pc_data['analog'][last_burst : last_burst+bursts_per_file[seq-dseq]-skip_first_burst, ...] = 0
pc_data['digital'][last_burst : last_burst+bursts_per_file[seq-dseq]-skip_first_burst, ...] = 0
pc_data['cellID'][last_burst * cells : (last_burst+bursts_per_file[seq-dseq]-skip_first_burst) * cells, ...] = 0
finally:
last_burst += bursts_per_file[seq-dseq]-skip_first_burst
# Copy injected rows
for row_i in range(8):
try:
pc_data_merged['analog'][:,:,row_i * 8 + (7 - run_idx),:] = pc_data['analog'][:bursts_total,:cells//cfac,row_i * 8 + (7 - run_idx),:]
pc_data_merged['analog'][:,:,64 + row_i * 8 + run_idx ,:] = pc_data['analog'][:bursts_total,:cells//cfac, 64 + row_i * 8 + run_idx,:]
pc_data_merged['digital'][:,:,row_i * 8 + (7 - run_idx),:] = pc_data['digital'][:bursts_total,:cells//cfac,row_i * 8 + (7 - run_idx),:]
pc_data_merged['digital'][:,:,64 + row_i * 8 + run_idx ,:] = pc_data['digital'][:bursts_total,:cells//cfac, 64 + row_i * 8 + run_idx,:]
except Exception as e:
print(e)
#Check cellIDs
#Copy cellIDs of first run
if run_idx == 0:
pc_data_merged['cellID'][...] = pc_data['cellID'][...]
#Check cellIDs of all the other runs
#else:
# print('cellID difference:{}'.format(np.sum(pc_data_merged['cellID']-pc_data['cellID'])))
return pc_data_merged['analog'], pc_data_merged['digital'], pc_data_merged['cellID']
### PC signal of a selected cell and bursts
start = datetime.now()
p = partial(read_and_merge_module_data, maxcells, path_temp, image_name_temp,
runs, seqs, IL_MODE, rawversion, instrument)
# chunk this a bit, so that we don't overuse available memory
res = list(map(p, modules))
%% Cell type:code id: tags:
``` python
burst_of_interest = [100, 200, 550]
cell_of_interest = 4
for module in modules:
fig = PC_image_plotting(res, burst_of_interest, cell_of_interest)
```
%% Cell type:markdown id: tags:
## Slope clustering and Fitting ##
The following two cells contain the actual algorithm logic as well as a preview of a single pixel and memory cells visualizing the data and the concepts.
We start out with calculating an estimate of the slope in proximity of a given data value. This is done by calculating the slopes of a given value with 15 neighbours and averaging the result. Values are then clustered by these slopes into three regions via a K-means algorithm.
* for the first region a linear function is fitted to the data, determining the gain slope and offset for the high gain mode.
$$y = mx + b$$
* for the second and third region a composite function of the form:
* for the second and third region a composite function (so-called hook function) of the form:
$$y = A*e^{-(x-O)/C}+mx+b$$
is fitted, covering both the transition region and the medium gain slope.
If variable {fit_hook} is set to False (default) only the medium gain region is fitted with linear function and the transition region is omitted in the fitting procedure.
Clustering results are not used if {hg_range} or {mg_range} are defined as positive integers, thus setting borders for the high gain region (hg_range) or medium gain region (mg_region). Data inside each region are fitted with linear function. Hook function fit to the data is not allowed in this case.
%% Cell type:code id: tags:
``` python
from iminuit import Minuit
from iminuit.util import describe, make_func_code
from sklearn.cluster import KMeans
def calc_m_cluster(x, y):
scan_range = 15
ms = np.zeros((x.shape[0], scan_range))
for i in range(scan_range):
xdiffs = x - np.roll(x, i+1)
ydiffs = y - np.roll(y, i+1)
m = ydiffs/xdiffs
ms[:,i] = m
m = np.mean(ms, axis=1)
k = KMeans(n_clusters=3, n_jobs=-2)
k.fit(m.reshape(-1, 1))
ms = []
for lbl in np.unique(k.labels_):
xl = x[k.labels_ == lbl]
xd = np.reshape(xl, (len(xl), 1))
xdiff = xd - xd.transpose()
yl = y[k.labels_ == lbl]
yd = np.reshape(yl, (len(yl), 1))
ydiff = yd - yd.transpose()
ms.append(np.mean(np.nanmean(ydiff/xdiff, axis=0)))
return ms, k.labels_, k.cluster_centers_
def rolling_window(a, window):
shape = a.shape[:-1] + (a.shape[-1] - window + 1, window)
strides = a.strides + (a.strides[-1],)
return np.lib.stride_tricks.as_strided(a, shape=shape, strides=strides)
def calc_m_cluster2(x, y, r1=5, r2=0, r3=1.5):
scan_range = 15
ms = np.zeros((x.shape[0], scan_range))
for i in range(scan_range):
xdiffs = x - np.roll(x, i+1)
ydiffs = y - np.roll(y, i+1)
m = ydiffs/xdiffs
ms[:,i] = m
m = np.mean(ms, axis=1)
m[scan_range//2:-scan_range//2+1] = np.mean(rolling_window(m, scan_range),-1)
reg1 = m > r1
reg2 = m < r2
reg3 = (m > r2) & (m < r3)
reg4 = ~(reg1 | reg2 | reg3)
labels = [reg1, reg2, reg3, reg4]
regions = np.zeros_like(x, np.uint8)
for r, lbl in enumerate(labels):
regions[lbl] = r
scan_range = 30
mregions = np.round(np.mean(rolling_window(regions, scan_range),-1))
# change from np.nan to -1
regions[...] = -1
regions[scan_range//2:-scan_range//2+1] = mregions
labels = [regions == 0, regions == 1, regions == 2, regions == 3]
idx = np.arange(x.size)
maxlbl = x.size-1
for i in range(0, len(labels)-1):
nidx = labels[i+1]
if np.any(nidx):
maxlbl = np.max(idx[nidx])
cidx = idx > maxlbl
if np.any(cidx):
labels[i][cidx] = False
ms = []
for lbl in labels:
xl = x[lbl]
xd = np.reshape(xl, (len(xl), 1))
xdiff = xd - xd.transpose()
yl = y[lbl]
yd = np.reshape(yl, (len(yl), 1))
ydiff = yd - yd.transpose()
ms.append(np.mean(np.nanmean(ydiff/xdiff, axis=0)))
return ms, labels, None
def fit_data(fun, x, y, yerr, par_ests):
par_ests["throw_nan"] = False
par_ests["pedantic"] = False
par_ests["print_level"] = 0
f_sig = describe(fun)[1:]
class _Chi2Functor:
def __init__(self, f, x, y, err):
self.f = f
self.x = x[y != 0]
self.y = y[y != 0]
self.err = err[y != 0]
f_sig = describe(f)
# this is how you fake function
# signature dynamically
self.func_code = make_func_code(
f_sig[1:]) # docking off independent variable
self.func_defaults = None # this keeps numpy.vectorize happy
def __call__(self, *arg):
# notice that it accept variable length
# positional arguments
# chi2 = sum((y-self.f(x,*arg))**2 for x,y in zip(self.x,self.y))
return np.sum(((self.f(self.x, *arg) - self.y) ** 2) / self.err)
wrapped = _Chi2Functor(fun, x, y, yerr)
m = Minuit(wrapped, **par_ests)
fmin = m.migrad()
return m.values
def lin_fun(x, m, b):
return m*x+b
def hook_fun(x, a, c, o, m, b):
return a*np.exp(-(x-o)/c)+m*x+b
```
%% Cell type:code id: tags:
``` python
from cal_tools.tools import get_constant_from_db_and_time
offsets = {}
noises = {}
thresholds = {}
for k_da, mod in zip(karabo_da, modules):
offset, when = get_constant_from_db_and_time(karabo_id, k_da,
Constants.AGIPD.Offset(),
Conditions.Dark.AGIPD(
memory_cells=mem_cells,
bias_voltage=bias_voltage,
acquisition_rate=acq_rate,
gain_setting=gain_setting,
integration_time=integration_time),
np.zeros((128, 512, mem_cells, 3)),
cal_db_interface,
creation_time=creation_time)
offsets[mod] = np.array(offset.data)
noise, when = get_constant_from_db_and_time(karabo_id, k_da,
Constants.AGIPD.Noise(),
Conditions.Dark.AGIPD(
memory_cells=mem_cells,
bias_voltage=bias_voltage,
acquisition_rate=acq_rate,
gain_setting=gain_setting,
integration_time=integration_time),
np.zeros((128, 512, mem_cells, 3)),
cal_db_interface, creation_time=creation_time)
noises[mod] = np.array(noise.data)
threshold, when = get_constant_from_db_and_time(karabo_id, k_da,
Constants.AGIPD.ThresholdsDark(),
Conditions.Dark.AGIPD(
memory_cells=mem_cells,
bias_voltage=bias_voltage,
acquisition_rate=acq_rate,
gain_setting=gain_setting,
integration_time=integration_time),
np.zeros((128, 512, mem_cells, 3)),
cal_db_interface, creation_time=creation_time)
thresholds[mod] = np.array(threshold.data)
```
%% Cell type:code id: tags:
``` python
test_pixels = []
tpix_range1 = [(0,16), (0,64)]
for i in range(*tpix_range1[0]):
for j in range(*tpix_range1[1]):
test_pixels.append((j,i))
test_cells = [4, 38, 64, 128]#, 200, 249]
tcell = np.array(test_cells)
tcell = tcell[tcell < mem_cells]
if tcell.size == 0:
test_cells = [mem_cells-1]
else:
test_cells = tcell.tolist()
from mpl_toolkits.axes_grid1 import ImageGrid
for mod, r in zip(modules, res):
dig, ana, cellId = r
ana, dig, cellId = r
d = []
d2 = []
d3 = []
H = [0, 0, 0, 0]
ex, ey = None, None
offset = offsets[mod]
for pix in test_pixels:
for cell in test_cells:
color = np.random.rand(3,1)
x = np.arange(dig.shape[0])
y = dig[:,cell, pix[0], pix[1]]
x = np.arange(ana.shape[0])
y = ana[:,cell, pix[0], pix[1]]
yd = dig[:,cell, pix[0], pix[1]]
thresh_trans = thresholds[mod][pix[0], pix[1], cell,0]
vidx = (y > 1000) & np.isfinite(y)
x = x[vidx]
y = y[vidx]
yd = yd[vidx]
if x.shape[0] == 0:
continue
ms, labels, centers = calc_m_cluster2(x, y)
bound = None
bound_m = None
markers = ['o','.','x','v']
colors = ['b', 'r', 'g', 'k']
ymin = y.min()
#### Transition region gain is set based on dark thresholds. ####
msk = yd[labels[1]] < thresh_trans
label_filter = np.where(labels[1])[0]
labels[1][label_filter[msk]] = False
labels[0][label_filter[msk]] = True
################################################################
for i, lbl in enumerate(labels):
if np.any(lbl):
#ym = y[lbl]-y[lbl].min()
if i == 0:
gain = 0
else:
gain = 1
ym = y[lbl] - offset[pix[0], pix[1], cell, gain]
#if i != 0:
# ym += y[labels[0]].max()-y[labels[0]].min()
h, ex, ey = np.histogram2d(x[lbl], ym, range=((0, 600), (-500, 6000)), bins=(300, 650))
H[i] += h
fig = plt.figure(figsize=(10,10))
fig = plt.figure(figsize=(10,7))
ax = fig.add_subplot(111)
ax.grid(lw=1.5)
for i in range(3):
H[i][H[i]==0] = np.nan
ax.imshow(H[0].T, origin="lower", extent=[ex[0], ex[-1], ey[0], ey[-1]],
aspect='auto', cmap='summer', alpha=0.7, vmin=0, vmax=1000)
ax.imshow(H[1].T, origin="lower", extent=[ex[0], ex[-1], ey[0], ey[-1]],
aspect='auto', cmap='spring', alpha=0.7, vmin=0, vmax=100)
ax.imshow(H[2].T, origin="lower", extent=[ex[0], ex[-1], ey[0], ey[-1]],
aspect='auto', cmap='winter', alpha=0.7, vmin=0, vmax=1000)
ax.set_ylabel("AGIPD response (ADU)")
ax.set_xlabel("PC scan point (#)")
ax.set_ylabel("AGIPD response (ADU)", fontsize=12)
ax.set_xlabel("PC scan point (#)", fontsize=12)
plt.xticks(fontsize=12)
plt.yticks(fontsize=12)
plt.show()
```
%% Cell type:markdown id: tags:
### Examples from Pixel Subset ###
The follwing is an visualization of the clustering and fitting for a subset of pixels. If data significantly mismatches expectations, the clustering and fitting algorithms should fail for this subset:
* the first plot shows the clustering results for pixels which were sucessfully evaluated
* the second plot shows the clustering results for pixels which failed to evaluate
* the second plot shows the clustering results for pixels which failed to evaluate--> change this
* the third plot shows the fits and fit residuals for the pixel clusters shown in the first plot
Non-smooth behaviour is an indication that you are errorously processing interleaved data that is not, or vice versa, or have the wrong number of memory cells set.
We do this twice for different detector regions
%% Cell type:code id: tags:
``` python
test_pixels = []
tpix_range1 = [(250,254), (60,64)]
for i in range(*tpix_range1[0]):
for j in range(*tpix_range1[1]):
test_pixels.append((j,i))
test_cells = [4, 38]
tcell = np.array(test_cells)
tcell = tcell[tcell < mem_cells]
if tcell.size == 0:
test_cells = [mem_cells-1]
else:
test_cells = tcell.tolist()
def find_params(x, y, yd, thresh_trans):
for mod, r in zip(modules, res):
dig, ana, cellId = r
d = []
d2 = []
d3 = []
offset = offsets[mod]
noise = noises[mod]
for pix in test_pixels:
for cell in test_cells:
color = np.random.rand(3,1)
ms, labels, centers = calc_m_cluster2(x, y)
bound = None
bound_m = None
if labels[1].any():
bound = np.min(x[labels[1]])
bound_m = ms[1]
threshold = (np.mean(yd[labels[0]])+np.mean(yd[labels[2]])) / 2
#### Transition region gain is set based on dark thresholds. ####
msk = yd[labels[1]] < thresh_trans
label_filter = np.where(labels[1])[0]
labels[1][label_filter[msk]] = False
labels[0][label_filter[msk]] = True
try:
if np.max(x[labels[0]] > 220): # step value above which dark threholds are not considered
labels[0][label_filter[msk]] = False
except ValueError:
print('Could not determine maximum value.')
pass
###############################################################
if bound is None or bound < 20: #and False:
ya = dig[:,cell, pix[0], pix[1]][vidx]
msa, labels, centers = calc_m_cluster2(x, ya, 25, -10, 25)
if np.count_nonzero(labels[0]) > 0:
bound = np.min(x[labels[0]])
bound_m = ms[3]
else:
bound = 0
bound_m = 0
x = np.arange(dig.shape[0])
y = dig[:,cell, pix[0], pix[1]]
if hg_range[1] > 0 :
hg_bound_high = hg_range[1]
hg_bound_low = hg_range[0]
else :
hg_bound_high = bound - 20
hg_bound_low = 0
try:
if fit_hook:
mg_bound_high = len(x)
try:
mg_bound_low = np.max(x[labels[1]]) + 20
except ValueError: #raised if `y` is empty.
mg_bound_low = bound + 100
else :
if mg_range[0] > 0 :
mg_bound_low = mg_range[0]
mg_bound_high = mg_range[1]
else :
mg_bound_high = len(x)
try:
mg_bound_low = np.max(x[labels[1]]) + 20
except ValueError: #raised if `y` is empty.
mg_bound_low = bound + 100
except Exception as e:
if "zero-size array" in str(e):
pass
else:
print(e)
vidx = (y > 1000) & np.isfinite(y)
x = x[vidx]
y = y[vidx]
return hg_bound_low, hg_bound_high, mg_bound_low, mg_bound_high, bound, bound_m, labels, threshold
```
ms, labels, centers = calc_m_cluster2(x, y)
bound = None
bound_m = None
markers = ['o','.','x','v']
colors = ['b', 'r', 'g', 'k']
for i, lbl in enumerate(labels):
if i == 0:
gain = 0
else:
gain = 1
d.append({'x': x[lbl],
'y': y[lbl] - offset[pix[0], pix[1], cell, gain],
'marker': markers[i],
'color': colors[i],
'linewidth': 0
})
#if ms[i] < 0: # slope separating two regions
# bound = np.min(x[lbl])
# bound_m = ms[i]
if labels[1].any():
bound = np.min(x[labels[1]])
bound_m = ms[1]
if bound is None or bound < 20 and False:
ya = ana[:,cell, pix[0], pix[1]][vidx]
msa, labels, centers = calc_m_cluster2(x, ya, 25, -10, 25)
if np.count_nonzero(labels[0]) > 0:
%% Cell type:code id: tags:
bound = np.min(x[labels[0]])
bound_m = ms[3]
else:
avg_g = np.nanmean(ya)
bound = np.max(x[y < avg_g])
bound_m = ms[3]
``` python
def evaluate_fitting_ROI(modules, res, tpix_range, test_cells, roi):
markers = ['o', '.', 'x', 'v']
colors = ['tab:blue', 'tab:red', 'tab:green', 'k']
test_pixels = []
fig1, ax1 = plt.subplots(figsize=(9, 5))
ax1.grid(zorder=0, lw=1.5)
ax1.set_ylabel("PC pixel signal (ADU)", fontsize=11)
ax1.set_xlabel('step #', fontsize=11)
fig2, ax2 = plt.subplots(figsize=(9, 5))
ax2.grid(zorder=0, lw=1.5)
ax2.set_ylabel("PC gain signal (ADU)", fontsize=11)
ax2.set_xlabel('step #', fontsize=11)
fig3 = plt.figure(figsize=(9, 5))
gs = gridspec.GridSpec(2, 1, height_ratios=[3, 1])
ax3 = plt.subplot(gs[0])
ax4 = plt.subplot(gs[1])
ax3.grid(zorder=0, lw=1.5)
ax3.set_ylabel('PC signal (keV)', fontsize=11)
ax4.set_ylabel('Relative\ndeviation', fontsize=11)
ax4.set_xlabel('step #', fontsize=11)
for i in range(*tpix_range[0]):
for j in range(*tpix_range[1]):
test_pixels.append((j,i))
tcell = np.array(test_cells)
tcell = tcell[tcell < mem_cells]
if tcell.size == 0:
test_cells = [mem_cells-1]
else:
test_cells = tcell.tolist()
#print(bound)
# fit linear slope
if not np.isnan(bound_m):
xl = x[(x<bound-20)]
yl = y[(x<bound-20)] - offset[pix[0], pix[1], cell, 0]
if yl.shape[0] != 0:
parms = {'m': bound_m, 'b': np.min(yl)}
for mod, r in zip(modules, res):
ana, dig, cellId = r
d = []
d2 = []
d3 = []
offset = offsets[mod]
noise = noises[mod]
for pix in test_pixels:
for cell in test_cells:
color = np.random.rand(3, 1)
x = np.arange(ana.shape[0])
y = ana[:,cell, pix[0], pix[1]]
yd = dig[:,cell, pix[0], pix[1]]
thresh_trans = thresholds[mod][pix[0], pix[1], cell,0]
vidx = (y > 1000) & np.isfinite(y) & np.isfinite(x)
x = x[vidx]
y = y[vidx]
yd = yd[vidx]
errors = np.ones(xl.shape)*noise[pix[0], pix[1], cell, 0]
fitted = fit_data(lin_fun, xl, yl, errors , parms)
yf = lin_fun(xl, fitted['m'], fitted['b'])
max_devl = np.max(np.abs((yl-yf)/yl))
hg_bound_low, hg_bound_high, mg_bound_low, mg_bound_high, bound, bound_m, labels, threshold = find_params(x, y, yd, thresh_trans)
d3.append({'x': xl,
'y': yf,
'color': 'k',
'linewidth': 1,
'y2': (yf-yl)/errors
})
# fit hook slope
if fit_hook:
idx = (x >= bound) & (y > 0) & np.isfinite(x) & np.isfinite(y)
xh = x[idx]
yh = y[idx] - offset[pix[0], pix[1], cell, 1]
if len(yh[yh > 0]) == 0:
break
parms = {'m': bound_m/10 if bound_m/10>0.3 else 0.5, 'b': np.min(yh[yh > 0]), 'a': np.max(yh), 'c': 5, 'o': bound-1}
parms["limit_m"] = [0.3, 2.0]
parms["limit_c"] = [1., 1000]
errors = np.ones(xh.shape)*noise[pix[0], pix[1], cell, 1]
fitted = fit_data(hook_fun, xh, yh, errors, parms)
yf = hook_fun(xh, fitted['a'], fitted['c'], fitted['o'], fitted['m'], fitted['b'])
max_devh = np.max(np.abs((yh-yf)/yh))
#print(fitted)
d3.append({'x': xh,
'y': yf,
'color': 'red',
'linewidth': 1,
'y2': (yf-yh)/errors
})
for i, lbl in enumerate(labels):
if i == 0:
gain = 0
x = np.arange(ana.shape[0])
y = ana[:,cell, pix[0], pix[1]]
else:
gain = 1
vidx = (y > 1000) & np.isfinite(y)
x = x[vidx]
y = y[vidx]
ax1.plot(x[lbl], (y[lbl] - offset[pix[0], pix[1], cell, gain]), ls='None',
marker=markers[i], color=colors[i], alpha=0.3)
ax2.plot(x[lbl], yd[lbl], ls='None', marker=markers[i], color=colors[i], alpha=0.3)
ax2.plot(np.array([x[0], x[-1]]), np.ones(2) * threshold, lw=1, color='k', alpha=0.3)
#ms, labels, centers = calc_m_cluster2(x, y, 25, -10, 25)
if len(y[labels[0]]) != 0 and len(y[labels[2]]) != 0:
threshold = (np.mean(y[labels[0]])+np.mean(y[labels[2]]))/2
# fit linear slope
idx = (x > hg_bound_low) & (x < hg_bound_high)
xl = x[idx]
yl = y[idx] - y[0]
for i, lbl in enumerate(labels):
if yl.shape[0] != 0:
parms = {'m': bound_m, 'b': np.min(yl)}
parms["limit_m"] = [0., 50.]
errors = np.ones(xl.shape) * noise[pix[0], pix[1], cell, 0]
fitted = fit_data(lin_fun, xl, yl, errors , parms)
hg_slope = fitted['m']
pc_high_l = fitted['b']
d2.append({'x': x[lbl],
'y': y[lbl],
'marker': markers[i],
'color': colors[i],
'lw': None
yf = lin_fun(xl, hg_slope, pc_high_l)
})
ax3.plot(xl, (yl / FF_gain), color='tab:blue', ls='None',alpha=0.5, marker='o')
ax3.plot(xl, (yf / FF_gain), color='navy', lw=1, zorder=9)
ax4.plot(xl, ((yl-yf) / yl), lw=1, color='navy')
# fit hook slope
try:
if fit_hook:
idx = (x >= mg_bound_low) & (x < mg_bound_high)
xh = x[idx]
yh = y[idx] - offset[pix[0], pix[1], cell, 1]
parms = {'m': bound_m/10 if bound_m/10>0.3 else 0.5,
'b': np.min(yh[yh > -2000]),
'a': np.max(yh), 'c': 5, 'o': bound-1}
parms["limit_m"] = [0., 2.0]
parms["limit_c"] = [1., 1000]
errors = np.ones(xh.shape) * noise[pix[0], pix[1], cell, 1]
fitted = fit_data(hook_fun, xh, yh, errors, parms)
mg_slope = fitted['m']
pc_med_l = fitted['b']
slope_ratio = hg_slope / mg_slope
md_additional_offset = pc_high_l - pc_med_l * slope_ratio
yf = lin_fun(xh, mg_slope * slope_ratio, pc_med_l)
corr_yh = yh * slope_ratio + md_additional_offset
max_devh = np.median(np.abs((corr_yh - yf) / corr_yh))
d2.append({'x': np.array([x[0], x[-1]]),
'y': np.ones(2)*threshold,
else:
idx = (x >= mg_bound_low) & (x < mg_bound_high)
xh = x[idx]
yh = y[idx] - offset[pix[0], pix[1], cell, 1]
if len(yh[yh > -2000]) == 0:
break
parms = {'m': bound_m/10 if bound_m/10>0.3 else 0.5,
'b': np.min(yh[yh > -2000]),
}
parms["limit_m"] = [0., 2.0]
errors = np.ones(xh.shape) * noise[pix[0], pix[1], cell, 1]
fitted = fit_data(lin_fun, xh, yh, errors, parms)
mg_slope = fitted['m']
pc_med_l = fitted['b']
slope_ratio = hg_slope / mg_slope
md_additional_offset = pc_high_l - pc_med_l * slope_ratio
yf = lin_fun(xh, mg_slope * slope_ratio, pc_med_l)
corr_yh = yh * slope_ratio + md_additional_offset
max_devh = np.median(np.abs((corr_yh - yf) / corr_yh))
ax3.plot(xh, corr_yh/FF_gain, color='tab:green', ls='None', alpha=0.3, marker='x')
ax3.plot(xh, yf/FF_gain, color='green', lw=1, zorder=10)
ax4.plot(xh, ((corr_yh-yf)/corr_yh), lw=1, color='green')
except Exception as e:
if "zero-size array" in str(e):
pass
else:
print(e)
'color': 'k',
'lw': 1
fig1.show()
fig2.show()
fig3.show()
fig1.savefig("{}/module_{}_{}_pixel_plot.png".format(out_folder, mod, roi))
fig2.savefig("{}/module_{}_{}_pixel_plot_gain.png".format(out_folder, mod, roi))
fig3.savefig("{}/module_{}_{}_pixel_plot_fits.png".format(out_folder, mod, roi))
```
})
%% Cell type:markdown id: tags:
#threshold = (np.min(y[x<bound]) + np.max(y[x>=bound]))/2
## ROI 1
%% Cell type:code id: tags:
fig = xana.simplePlot(d, y_label="PC pixel signal (ADU)", figsize='2col', aspect=2,
x_label="step #")
fig.savefig("{}/module_{}_pixel_plot.png".format(out_folder, mod))
``` python
tpix_range1 = [(250,254), (60,63)]
test_cells = [4, 38]
roi = 'ROI1'
evaluate_fitting_ROI(modules, res, tpix_range1, test_cells, roi)
```
fig = xana.simplePlot(d2, y_label="PC gain signal (ADU)", figsize='2col', aspect=2,
x_label="step #")
fig.savefig("{}/module_{}_pixel_plot_gain.png".format(out_folder, mod))
%% Cell type:markdown id: tags:
fig = xana.simplePlot(d3, secondpanel=True, y_label="PC signal (ADU)", figsize='2col', aspect=2,
x_label="step #", y2_label="Residuals ($\sigma$)", y2_range=(-5,5))
fig.savefig("{}/module_{}_pixel_plot_fits.png".format(out_folder, mod))
```
## ROI 2
%% Cell type:code id: tags:
``` python
test_pixels = []
tpix_range2 = [(96,128), (32,64)]
for i in range(*tpix_range2[0]):
for j in range(*tpix_range2[1]):
test_pixels.append((j,i))
for mod, r in zip(modules, res):
dig, ana, cellId = r
d = []
d2 = []
d3 = []
offset = offsets[mod]
noise = noises[mod]
for pix in test_pixels:
for cell in test_cells:
color = np.random.rand(3,1)
x = np.arange(dig.shape[0])
y = dig[:,cell, pix[0], pix[1]]
vidx = (y > 1000) & np.isfinite(y)
x = x[vidx]
y = y[vidx]
ms, labels, centers = calc_m_cluster2(x, y)
bound = None
bound_m = None
markers = ['o','.','x','v']
colors = ['b', 'r', 'g', 'k']
for i, lbl in enumerate(labels):
if i == 0:
gain = 0
else:
gain = 1
d.append({'x': x[lbl],
'y': y[lbl] - offset[pix[0], pix[1], cell, gain],
'marker': markers[i],
'color': colors[i],
'linewidth': 0
})
#if ms[i] < 0: # slope separating two regions
# bound = np.min(x[lbl])
# bound_m = ms[i]
if len(x[labels[1]]):
bound = np.min(x[labels[1]])
bound_m = ms[1]
# fit linear slope
idx = (x >= bound) & (y > 0) & np.isfinite(x) & np.isfinite(y)
xl = x[(x<bound-20)]
yl = y[(x<bound-20)] - offset[pix[0], pix[1], cell, 0]
errors = np.ones(xl.shape)*noise[pix[0], pix[1], cell, 0]
if yl.shape[0] != 0:
parms = {'m': bound_m, 'b': np.min(yl)}
fitted = fit_data(lin_fun, xl, yl, errors, parms)
yf = lin_fun(xl, fitted['m'], fitted['b'])
max_devl = np.max(np.abs((yl-yf)/yl))
xtt = np.arange(ana.shape[0])
ytt = ana[:,cell, pix[0], pix[1]]
vidx = (ytt > 1000) & np.isfinite(ytt)
xtt = xtt[vidx]
ytt = ytt[vidx]
#ms, labels, centers = calc_m_cluster2(x, y, 25, -10, 25)
if len(y[labels[0]]) != 0 and len(y[labels[2]]) != 0:
threshold = (np.mean(ytt[labels[0]])+np.mean(ytt[labels[2]]))/2
if threshold > 10000 or threshold < 4000:
d3.append({
'x': xl,
'y': yf,
'color': 'k',
'linewidth': 1,
'y2': (yf-yl)/errors
})
if bound is None:
ya = ana[:,cell, pix[0], pix[1]][vidx]
msa, labels, centers = calc_m_cluster2(x, ya, 25, -10, 25)
if np.count_nonzero(labels[0]) > 0:
bound = np.min(x[labels[0]])
bound_m = ms[3]
else:
avg_g = np.nanmean(ya)
bound = np.max(x[y < avg_g])
bound_m = ms[3]
# fit hook slope
try:
if fit_hook and len(yh[yh > 0]) !=0:
idx = (x >= bound) & (y > 0) & np.isfinite(x) & np.isfinite(y)
xh = x[idx]
yh = y[idx] - offset[pix[0], pix[1], cell, 1]
errors = np.ones(xh.shape)*noise[pix[0], pix[1], cell, 1]
parms = {
'm': np.abs(bound_m/10),
'b': np.min(yh[yh > 0]),
'a': np.max(yh),
'c': 5.,
'o': bound-1
}
parms["limit_m"] = [0.3, 2.0]
parms["limit_c"] = [1., 1000]
fitted = fit_data(hook_fun, xh, yh, errors, parms)
yf = hook_fun(xh, fitted['a'], fitted['c'], fitted['o'], fitted['m'], fitted['b'])
max_devh = np.max(np.abs((yh-yf)/yh))
#print(fitted)
if threshold > 10000 or threshold < 4000 or fitted['m'] < 0.2:
d3.append({
'x': xh,
'y': yf,
'color': 'red',
'linewidth': 1,
'y2': (yf-yh)/errors
})
except Exception as e:
if "zero-size array" in str(e):
pass
else:
print(e)
if threshold > 10000 or threshold < 4000:
for i, lbl in enumerate(labels):
d2.append({
'x': xtt[lbl],
'y': ytt[lbl],
'marker': markers[i],
'color': colors[i],
'lw': None
})
d2.append({'x': np.array([xtt[0], xtt[-1]]),
'y': np.ones(2)*threshold,
'color': 'k',
'lw': 1
})
#threshold = (np.min(y[x<bound]) + np.max(y[x>=bound]))/2
fig = xana.simplePlot(d, y_label="PC pixel signal (ADU)", figsize='2col', aspect=2,
x_label="step #")
fig.savefig("{}/module_{}_pixel_plot_fail.png".format(out_folder, mod))
fig = xana.simplePlot(d2, y_label="PC gain signal (ADU)", figsize='2col', aspect=2,
x_label="step #")
fig.savefig("{}/module_{}_pixel_plot_gain_fail.png".format(out_folder, mod))
fig = xana.simplePlot(d3, secondpanel=True, y_label="PC signal (ADU)", figsize='2col', aspect=2,
x_label="step #", y2_label="Residuals ($\sigma$)", y2_range=(-5,5))
fig.savefig("{}/module_{}_pixel_plot_fits_fail.png".format(out_folder, mod))
roi2 = 'ROI2'
evaluate_fitting_ROI(modules, res, tpix_range2, test_cells, roi2)
```
%% Cell type:code id: tags:
``` python
# Here we perform the calculations in column-parallel for all modules
def calibrate_single_row(cells, fit_hook, inp):
def calibrate_single_row(cells, fit_hook, ranges, inp):
hg_range = ranges[0]
mg_range = ranges[1]
import numpy as np
from iminuit import Minuit
from iminuit.util import describe, make_func_code
from sklearn.cluster import KMeans
yrd, yra, offset, noise = inp
yra, yrd, offset, noise, thresh_trans = inp
def rolling_window(a, window):
shape = a.shape[:-1] + (a.shape[-1] - window + 1, window)
strides = a.strides + (a.strides[-1],)
return np.lib.stride_tricks.as_strided(a, shape=shape, strides=strides)
def calc_m_cluster2(x, y, r1=5, r2=0, r3=1.5):
scan_range = 15
ms = np.zeros((x.shape[0], scan_range))
for i in range(scan_range):
xdiffs = x - np.roll(x, i+1)
ydiffs = y - np.roll(y, i+1)
m = ydiffs/xdiffs
m = ydiffs / xdiffs
ms[:,i] = m
m = np.mean(ms, axis=1)
m[scan_range//2:-scan_range//2+1] = np.mean(rolling_window(m, scan_range),-1)
reg1 = m > r1
reg2 = m < r2
reg3 = (m > r2) & (m < r3)
reg4 = ~(reg1 | reg2 | reg3)
labels = [reg1, reg2, reg3, reg4]
regions = np.zeros_like(x, np.uint8)
for r, lbl in enumerate(labels):
regions[lbl] = r
scan_range = 30
mregions = np.round(np.mean(rolling_window(regions, scan_range),-1))
# chanage from np.nan to -1
regions[...] = -1
regions[scan_range//2:-scan_range//2+1] = mregions
labels = [regions == 0, regions == 1, regions == 2, regions == 3]
idx = np.arange(x.size)
maxlbl = x.size-1
for i in range(0, len(labels)-1):
nidx = labels[i+1]
if np.any(nidx):
maxlbl = np.max(idx[nidx])
cidx = idx > maxlbl
if np.any(cidx):
labels[i][cidx] = False
ms = []
for lbl in labels:
xl = x[lbl]
xd = np.reshape(xl, (len(xl), 1))
xdiff = xd - xd.transpose()
yl = y[lbl]
yd = np.reshape(yl, (len(yl), 1))
ydiff = yd - yd.transpose()
ms.append(np.mean(np.nanmean(ydiff/xdiff, axis=0)))
return ms, labels, None
def fit_data(fun, x, y, yerr, par_ests):
par_ests["throw_nan"] = False
par_ests["pedantic"] = False
par_ests["print_level"] = 0
f_sig = describe(fun)[1:]
class _Chi2Functor:
def __init__(self, f, x, y, err):
self.f = f
self.x = x
self.y = y
self.err = err
f_sig = describe(f)
# this is how you fake function
# signature dynamically
self.func_code = make_func_code(
f_sig[1:]) # docking off independent variable
self.func_defaults = None # this keeps numpy.vectorize happy
def __call__(self, *arg):
# notice that it accept variable length
# positional arguments
# chi2 = sum((y-self.f(x,*arg))**2 for x,y in zip(self.x,self.y))
return np.sum(((self.f(self.x, *arg) - self.y) ** 2) / self.err)
wrapped = _Chi2Functor(fun, x, y, yerr)
m = Minuit(wrapped, **par_ests)
fmin = m.migrad()
return m.values
def lin_fun(x, m, b):
return m*x+b
def hook_fun(x, a, c, o, m, b):
return a*np.exp(-(x-o)/c)+m*x+b
# linear slope
ml = np.zeros(yrd.shape[1:])
bl = np.zeros(yrd.shape[1:])
devl = np.zeros(yrd.shape[1:])
ml[...] = np.nan
bl[...] = np.nan
devl[...] = np.nan
ml = np.full(yra.shape[1:], np.nan)
bl = np.full(yra.shape[1:], np.nan)
devl = np.full(yra.shape[1:], np.nan)
#hook function
mh = np.zeros(yrd.shape[1:])
bh = np.zeros(yrd.shape[1:])
ch = np.zeros(yrd.shape[1:])
oh = np.zeros(yrd.shape[1:])
ah = np.zeros(yrd.shape[1:])
devh = np.zeros(yrd.shape[1:])
dhm = np.zeros(yrd.shape[1:])
mh[...] = np.nan
bh[...] = np.nan
ch[...] = np.nan
oh[...] = np.nan
ah[...] = np.nan
devh[...] = np.nan
dhm[...] = np.nan
mh = np.full(yra.shape[1:], np.nan)
bh = np.full(yra.shape[1:], np.nan)
ch = np.full(yra.shape[1:], np.nan)
oh = np.full(yra.shape[1:], np.nan)
ah = np.full(yra.shape[1:], np.nan)
devh = np.full(yra.shape[1:], np.nan)
dhm = np.full(yra.shape[1:], np.nan)
# threshold
thresh = np.zeros(list(yrd.shape[1:])+[3,])
thresh[...] = np.nan
thresh = np.full(list(yrd.shape[1:])+[3,], np.nan)
failures = []
for col in range(yrd.shape[-1]):
for col in range(yra.shape[-1]):
try:
y = yrd[:,col]
y = yra[:,col]
x = np.arange(y.shape[0])
yd = yrd[:,col]
vidx = (y > 1000) & np.isfinite(y)
vidx = (y > 1000) & np.isfinite(y) & np.isfinite(x)
x = x[vidx]
y = y[vidx]
yd = yd[vidx]
ms, labels, centers = calc_m_cluster2(x, y)
bound = np.min(x[labels[1]])
bound_m = ms[1]
threshold = (np.mean(yd[labels[0]])+np.mean(yd[labels[2]])) / 2
thresh[col,0] = threshold
thresh[col,1] = np.mean(y[labels[0]])
thresh[col,2] = np.mean(y[labels[2]])
# fit linear slope
xl = x[x<bound-20]
yl = y[x<bound-20] - offset[col, 0]
errors = np.ones(xl.shape)*noise[col, 0]
if hg_range[1] > 0 :
hg_bound_high = hg_range[1]
hg_bound_low = hg_range[0]
else :
hg_bound_high = bound - 20
hg_bound_low = 0
if fit_hook:
mg_bound_high = len(x)
try:
mg_bound_low = np.max(x[labels[1]]) + 20
except ValueError: #raised if `y` is empty.
mg_bound_low = bound + 100
else :
if mg_range[0] > 0 :
mg_bound_low = mg_range[0]
mg_bound_high = mg_range[1]
else :
mg_bound_high = len(x)
try:
mg_bound_low = np.max(x[labels[1]]) + 20
except ValueError: #raised if `y` is empty.
mg_bound_low = bound + 100
idx = (x > hg_bound_low) & (x < hg_bound_high)
xl = x[idx]
yl = y[idx] - y[0] # instead of offset subtraction is better to subtract first pc value here
errors = np.ones(xl.shape) * noise[col, 0]
if yl.shape[0] != 0:
parms = {'m': bound_m, 'b': np.min(yl)}
parms["limit_m"] = [0., 50.]
fitted = fit_data(lin_fun, xl, yl, errors, parms)
yf = lin_fun(xl, fitted['m'], fitted['b'])
max_devl = np.median(np.abs((yl-yf)/yl))
ml[col] = fitted['m']
bl[col] = fitted['b']
hg_slope = fitted['m']
pc_high_l = fitted['b']
yf = lin_fun(xl, hg_slope, pc_high_l)
max_devl = np.median(np.abs((yl-yf) / yl))
ml[col] = hg_slope #fitted['m']
bl[col] = pc_high_l #fitted['b']
devl[col] = max_devl
#if np.any(labels[0]) and np.any(labels[2]):
#dhm[col] = y[labels[0]].max()-y[labels[2]].min()
dhml = lin_fun(bound, fitted['m'], fitted['b'])
dhml = lin_fun(bound, hg_slope, pc_high_l)
# fit hook slope
if fit_hook:
idx = (x >= bound) & (y > 0) & np.isfinite(x) & np.isfinite(y)
idx = (x >= mg_bound_low) & (x < mg_bound_high)
xh = x[idx]
yh = y[idx] - offset[col, 1]
errors = np.ones(xh.shape)*noise[col, 1]
parms = {'m': bound_m/10 if bound_m/10 > 0.3 else 0.5, 'b': np.min(yh[yh > 0]), 'a': np.max(yh), 'c': 5., 'o': bound-1}
parms["limit_m"] = [0.3, 2.0]
errors = np.ones(xh.shape) * noise[col, 1]
parms = {'m': bound_m/10 if bound_m/10 > 0.3 else 0.5, 'b': np.min(yh[yh > -2000]), 'a': np.max(yh), 'c': 5., 'o': bound-1}
parms["limit_m"] = [0., 2.0]
parms["limit_c"] = [1., 1000]
fitted = fit_data(hook_fun, xh, yh, errors, parms)
yf = hook_fun(xh, fitted['a'], fitted['c'], fitted['o'], fitted['m'], fitted['b'])
max_devh = np.median(np.abs((yh-yf)/yh))
mg_slope = fitted['m']
pc_med_l = fitted['b']
slope_ratio = hg_slope / mg_slope
md_additional_offset = pc_high_l - pc_med_l * slope_ratio
yf = lin_fun(xh, mg_slope * slope_ratio, pc_med_l)
corr_yh = yh * slope_ratio + md_additional_offset
max_devh = np.median(np.abs((corr_yh - yf) / corr_yh))
mh[col] = fitted['m']
bh[col] = fitted['b']
mh[col] = mg_slope # fitted['m']
bh[col] = pc_med_l # fitted['b']
ah[col] = fitted['a']
oh[col] = fitted['o']
ch[col] = fitted['c']
devh[col] = max_devh
dhm[col] = bound #(dhml) - lin_fun(bound, fitted['m'], fitted['b'])
y = yra[:,col]
x = np.arange(y.shape[0])
vidx = (y > 1000) & np.isfinite(y)
x = x[vidx]
y = y[vidx]
dhm[col] = bound
else :
idx = (x >= mg_bound_low) & (x < mg_bound_high)
xh = x[idx]
yh = y[idx] - offset[col, 1]
errors = np.ones(xh.shape)*noise[col, 1]
parms = {'m': bound_m/10 if bound_m/10 > 0.3 else 0.5,
'b': np.min(yh[yh > -2000]),
}
parms["limit_m"] = [0., 2.0]
fitted = fit_data(lin_fun, xh, yh, errors, parms)
mg_slope = fitted['m']
pc_med_l = fitted['b']
slope_ratio = hg_slope / mg_slope
md_additional_offset = pc_high_l - pc_med_l * slope_ratio
yf = lin_fun(xh, mg_slope * slope_ratio, pc_med_l)
corr_yh = yh * slope_ratio + md_additional_offset
max_devh = np.median(np.abs((corr_yh - yf) / corr_yh))
mh[col] = mg_slope # fitted['m']
bh[col] = pc_med_l # fitted['b']
ah[col] = 0
oh[col] = 1
ch[col] = 1
devh[col] = max_devh
dhm[col] = bound
threshold = (np.mean(y[labels[0]])+np.mean(y[labels[2]]))/2
thresh[col,0] = threshold
thresh[col,1] = np.mean(y[labels[0]])
thresh[col,2] = np.mean(y[labels[2]])
except Exception as e:
print(e)
failures.append((col, str(e)))
del yrd
del yra
return thresh, (ml, bl, devl), (mh, bh, ah, oh, ch, devh), failures, dhm
```
%% Cell type:code id: tags:
``` python
start = datetime.now()
fres = {}
failures = []
for i, r in zip(modules, res):
offset = offsets[i]
noise = noises[i]
qm = module_index_to_qm(i)
dig, ana, cellId = r
thresh_trans = thresholds[i][..., 0]
# linear slope
ml = np.zeros(dig.shape[1:])
bl = np.zeros(dig.shape[1:])
devl = np.zeros(dig.shape[1:])
#hook function
mh = np.zeros(dig.shape[1:])
bh = np.zeros(dig.shape[1:])
ch = np.zeros(dig.shape[1:])
oh = np.zeros(dig.shape[1:])
ah = np.zeros(dig.shape[1:])
devh = np.zeros(dig.shape[1:])
dhma = np.zeros(dig.shape[1:])
# threshold
thresh = np.zeros(list(dig.shape[1:]))
thresh_bounds = np.zeros(list(dig.shape[1:])+[2,])
for cell in range(dig.shape[1]):
inp = []
for j in range(dig.shape[2]):
inp.append((dig[:,cell,j,:], ana[:,cell,j,:], offset[j,:,cell,:], noise[j,:,cell,:]))
inp.append((dig[:,cell,j,:], ana[:,cell,j,:], offset[j,:,cell,:], noise[j,:,cell,:], thresh_trans[j,:,cell]))
p = partial(calibrate_single_row, cells, fit_hook)
#print("Running {} tasks in parallel".format(len(inp)))
p = partial(calibrate_single_row, cells, fit_hook, [hg_range, mg_range])
frs = view.map_sync(p, inp)
#frs = list(map(p, inp))
for j, fr in enumerate(frs):
threshr, lin, hook, fails, dhm = fr
mlr, blr, devlr = lin
mhr, bhr, ahr, ohr, chro, devhr = hook
failures.append(fails)
ml[cell,j,:] = mlr
bl[cell,j,:] = blr
devl[cell,j,:] = devlr
mh[cell,j,:] = mhr
bh[cell,j,:] = bhr
oh[cell,j,:] = ohr
ml[cell, j, :] = mlr
bl[cell, j, :] = blr
devl[cell, j, :] = devlr
mh[cell, j, :] = mhr
bh[cell, j, :] = bhr
oh[cell, j, :] = ohr
ch[cell,j,:] = chro
ah[cell,j,:] = ahr
devh[cell,j,:] = devhr
dhma[cell,j,:] = dhm
ah[cell, j, :] = ahr
devh[cell, j, :] = devhr
dhma[cell, j, :] = dhm
thresh[cell,j,...] = threshr[...,0]
thresh_bounds[cell,j,...] = threshr[...,1:]
thresh[cell, j, ...] = threshr[..., 0]
thresh_bounds[cell, j, ...] = threshr[..., 1:]
fres[qm] = {'ml': ml,
'bl': bl,
'devl': devl,
'tresh': thresh,
'tresh_bounds': thresh_bounds,
'dhm': dhma}
if fit_hook:
fres[qm].update({
'mh': mh,
'bh': bh,
'oh': oh,
'ch': ch,
'ah': ah,
'devh': devh,
})
fres[qm].update({
'mh': mh,
'bh': bh,
'oh': oh,
'ch': ch,
'ah': ah,
'devh': devh,
})
end = datetime.now() - start
print('Duration of fitting: ',end)
```
%% Cell type:markdown id: tags:
Results of slope fitting from PC runs values are
distinguished on axis 0 by index:
0: linear slope - m value
1: linear slope - b value
2: linear slope - deviation
3: hook function - m value
4: hook function - b value
5: hook function - o value
6: hook function - c value
7: hook function - a value
8: hook function - deviation
%% Cell type:code id: tags:
``` python
from collections import OrderedDict
from scipy.stats import median_abs_deviation as mad
bad_pixels = OrderedDict()
for qm, data in fres.items():
mask = np.zeros(data['ml'].shape, np.uint32)
mask[(data['tresh'][...,0] < 50) | (data['tresh'][...,0] > 8500)] |= BadPixels.CI_GAIN_OF_OF_THRESHOLD.value
mask[(data['devl'] == 0)] |= BadPixels.CI_LINEAR_DEVIATION.value
mask[(np.abs(data['devl']) > 0.5)] |= BadPixels.CI_LINEAR_DEVIATION.value
mask[(~np.isfinite(data['devl']))] |= BadPixels.CI_EVAL_ERROR.value
bad_pixels[qm] = mask
# high gain slope from pulse capacitor data
pc_high_m = data['ml']
pc_high_b = data['bl']
# medium gain slope from pulse capacitor data
pc_med_m = data['mh']
pc_med_b = data['bh']
fshape = pc_high_m.shape
# calculate median for slopes
pc_high_med = np.nanmedian(pc_high_m, axis=(1, 2))
pc_med_med = np.nanmedian(pc_med_m, axis=(1, 2))
# calculate median for intercepts:
pc_high_b_med = np.nanmedian(pc_high_b, axis=(1, 2))
pc_med_b_med = np.nanmedian(pc_med_b, axis=(1, 2))
# mad can only iterate on 1 axis
tshape = (fshape[0],fshape[1]*fshape[2])
# calculate MAD for slopes
pc_high_rms = mad(pc_high_m.reshape(tshape), axis=1, scale='normal', nan_policy='omit')
pc_med_rms = mad(pc_med_m.reshape(tshape), axis=1, scale='normal', nan_policy='omit')
# calculate MAD for intercepts:
pc_high_b_rms = mad(pc_high_b.reshape(tshape), axis=1, scale='normal', nan_policy='omit')
pc_med_b_rms = mad(pc_med_b.reshape(tshape), axis=1, scale='normal', nan_policy='omit')
# expand the arrays to match the size of the originals
pc_high_med = np.repeat(pc_high_med, fshape[1]*fshape[2]).reshape(fshape)
pc_med_med = np.repeat(pc_med_med, fshape[1]*fshape[2]).reshape(fshape)
pc_high_b_med = np.repeat(pc_high_b_med, fshape[1]*fshape[2]).reshape(fshape)
pc_med_b_med = np.repeat(pc_med_b_med, fshape[1]*fshape[2]).reshape(fshape)
pc_high_rms = np.repeat(pc_high_rms, fshape[1]*fshape[2]).reshape(fshape)
pc_med_rms = np.repeat(pc_med_rms, fshape[1]*fshape[2]).reshape(fshape)
pc_high_b_rms = np.repeat(pc_high_b_rms, fshape[1]*fshape[2]).reshape(fshape)
pc_med_b_rms = np.repeat(pc_med_b_rms, fshape[1]*fshape[2]).reshape(fshape)
# sanitize PC data
# replace vals outside range and nans with median.
bad_fits_high = (np.abs((pc_high_m - pc_high_med)/pc_high_rms) > sigma_dev_cut) |\
(np.abs((pc_high_b - pc_high_b_med)/pc_high_b_rms) > sigma_dev_cut) |\
(~np.isfinite(pc_high_m)) | (~np.isfinite(pc_high_b))
bad_fits_med = (np.abs((pc_med_m - pc_med_med)/pc_med_rms) > sigma_dev_cut) |\
(np.abs((pc_med_b - pc_med_b_med)/pc_med_b_rms) > sigma_dev_cut) |\
(~np.isfinite(pc_med_m)) | (~np.isfinite(pc_med_b))
def plot_cuts(a,sel,name,units, ax = None) :
stats_cut = {}
stats = {}
if ax is None :
fig = plt.figure(figsize=(5,5))
ax = fig.add_subplot(111)
stats_cut["Mean"] = np.nanmean(a[sel])
stats_cut["STD"] = np.nanstd(a[sel])
stats_cut["Median"] = np.nanmedian(a[sel])
stats_cut["Min"] = np.nanmin(a[sel])
stats_cut["Max"] = np.nanmax(a[sel])
stats["Mean"] = np.nanmean(a)
stats["STD"] = np.nanstd(a)
stats["Median"] = np.nanmedian(a)
stats["Min"] = np.nanmin(a)
stats["Max"] = np.nanmax(a)
m=np.nanmean(a)
s=np.nanstd(a)
bins = np.linspace(m-10*s,m+10*s,100)
ax.hist(a.ravel(),
bins = bins,
color='tab:red',
label='all fits',
)
ax.hist(a[sel].ravel(),
bins = bins,
color='tab:green',
label='good fits'
)
ax.grid(zorder=0)
ax.set_xlabel(units)
ax.set_yscale('log')
ax.set_title(name)
ax.legend()
def statistic(stat, colour, shift):
textstr = ""
for key in stat:
try:
textstr += '{0: <6}: {1: 7.2f}\n'.format(key, stat[key])
except:
pass
props = dict(boxstyle='round', alpha=0.65, color=colour,)
ax.text(0.05, 0.95-shift, textstr, transform=ax.transAxes, fontsize=10,
family='monospace', weight='book', stretch='expanded',
verticalalignment='top', bbox=props, zorder=2)
statistic(stats_cut, 'tab:green', 0)
statistic(stats, 'tab:red', 0.25)
fig = plt.figure(figsize=(15,15))
plot_cuts(pc_high_m,~(bad_fits_high | (mask>0)),'pc_high_m',"ADU/DAC",fig.add_subplot(221))
plot_cuts(pc_high_b,~(bad_fits_high | (mask>0)),'pc_high_b',"ADU",fig.add_subplot(222))
plot_cuts(pc_med_m,~(bad_fits_med | (mask>0)),'pc_med_m',"ADU/DAC",fig.add_subplot(223))
plot_cuts(pc_med_b,~(bad_fits_med | (mask>0)),'pc_med_b',"ADU",fig.add_subplot(224))
pc_high_m[bad_fits_high] = pc_high_med[bad_fits_high]
pc_high_b[bad_fits_high] = pc_high_b_med[bad_fits_high]
pc_med_m[bad_fits_med] = pc_med_med[bad_fits_med]
pc_med_b[bad_fits_med] = pc_med_b_med[bad_fits_med]
mask[bad_fits_high] |= BadPixels.CI_EVAL_ERROR.value
mask[bad_fits_med] |= BadPixels.CI_EVAL_ERROR.value
```
%% Cell type:code id: tags:
``` python
def slope_dict_to_arr(d):
key_to_index = {
"ml": 0,
"bl": 1,
"devl": 2,
"mh": 3,
"bh": 4,
"oh": 5,
"ch": 6,
"ah": 7,
"devh": 8,
"tresh": 9,
}
arr = np.zeros([11]+list(d["ml"].shape), np.float32)
for key, item in d.items():
if key not in key_to_index:
continue
arr[key_to_index[key],...] = item
return arr
```
%% Cell type:code id: tags:
``` python
from collections import OrderedDict
bad_pixels = OrderedDict()
for qm, data in fres.items():
mask = np.zeros(data['ml'].shape, np.uint32)
mask[(data['tresh'][...,0] < 50) | (data['tresh'][...,0] > 8500)] |= BadPixels.CI_GAIN_OF_OF_THRESHOLD.value
mask[(data['devl'] == 0)] |= BadPixels.CI_LINEAR_DEVIATION.value
mask[(np.abs(data['devl']) > 0.5)] |= BadPixels.CI_LINEAR_DEVIATION.value
mask[(~np.isfinite(data['devl']))] |= BadPixels.CI_EVAL_ERROR.value
bad_pixels[qm] = mask
```
%% Cell type:code id: tags:
``` python
if local_output:
ofile = "{}/agipd_pc_store_{}_{}_{}.h5".format(out_folder, "_".join([str(run) for run in runs]), modules[0], modules[-1])
store_file = h5py.File(ofile, "w")
for qm, r in fres.items():
for key, item in r.items():
store_file["/{}/{}/0/data".format(qm, key)] = item
#arr = slope_dict_to_arr(r)
#store_file["/{}/SlopesPC/0/data".format(qm)] = arr
store_file["/{}/{}/0/data".format(qm, "BadPixelsPC")] = bad_pixels[qm]
store_file.close()
```
%% Cell type:code id: tags:
``` python
# Read report path and create file location tuple to add with the injection
proposal = list(filter(None, in_folder.strip('/').split('/')))[-2]
file_loc = proposal + ' ' + ' '.join(list(map(str,runs)))
report = get_report(metadata_folder)
```
%% Cell type:code id: tags:
``` python
md = None
# set the operating condition
condition = Conditions.Dark.AGIPD(memory_cells=maxcells, bias_voltage=bias_voltage,
acquisition_rate=acq_rate, gain_setting=gain_setting)
condition = Conditions.Dark.AGIPD(memory_cells=mem_cells,
bias_voltage=bias_voltage,
acquisition_rate=acq_rate,
gain_setting=gain_setting,
integration_time=integration_time)
db_modules = get_pdu_from_db(karabo_id, karabo_da, Constants.AGIPD.SlopesPC(),
condition, cal_db_interface,
snapshot_at=creation_time)
for pdu, (qm, r) in zip(db_modules, fres.items()):
for const in ["SlopesPC", "BadPixelsPC"]:
dbconst = getattr(Constants.AGIPD, const)()
if const == "SlopesPC":
dbconst.data = slope_dict_to_arr(r)
else:
dbconst.data = bad_pixels[qm]
if db_output:
md = send_to_db(pdu, karabo_id, dbconst, condition,
file_loc, report, cal_db_interface,
creation_time=creation_time)
creation_time=creation_time,
variant=1)
# TODO: check if this can replace other written function of this notebook.
#if local_output:
# md = save_const_to_h5(pdu, karabo_id, dconst, condition, dconst.data,
# file_loc, report, creation_time, out_folder)
print("Constants parameter conditions are:\n")
print(f"• memory_cells: {maxcells}\n• bias_voltage: {bias_voltage}\n"
print(f"• memory_cells: {mem_cells}\n• bias_voltage: {bias_voltage}\n"
f"• acquisition_rate: {acq_rate}\n• gain_setting: {gain_setting}\n"
f"• integration_time: {integration_time}\n"
f"• creation_time: {md.calibration_constant_version.begin_at if md is not None else creation_time}\n")
```
%% Cell type:markdown id: tags:
## Overview Plots ##
Each of the following plots represents one of the fit parameters of memory cell 4 on a module:
Each of the following plots represents one of the fit parameters of a defined memory cell on a module:
For the linear function of the high gain region
$$y = mx + b$$
* ml denotes the $m$ parameter
* bl denotes the $b$ parameter
* devl denotes the anbsolute relative deviation from linearity.
* devl denotes the absolute relative deviation from linearity.
For the linear function of the medium gain region
$$y = mx + b$$
* mh denotes the $m$ parameter
* bh denotes the $b$ parameter
* devh denotes the absolute relative deviation from linearity.
For the composite function of the medium gain and transition region
For the composite function (if selected) of the medium gain and transition region
$$y = A*e^{-(x-O)/C}+mx+b$$
* oh denotes the $O$ parameter
* ch denotes the $C$ parameter
* mh denotes the $m$ parameter
* bh denotes the $b$ parameter
* devh denotes the anbsolute relative deviation from the linear part of the function.
* devh denotes the absolute relative deviation from the linear part of the function.
Additionally, the thresholds and bad pixels (mask) are shown.
Finally, the red and white rectangles indicate the first and second pixel ranges
Finally, the red and white rectangles indicate the first and second pixel ranges.
%% Cell type:code id: tags:
``` python
import matplotlib.patches as patches
import matplotlib.pyplot as plt
from mpl_toolkits.axes_grid1 import AxesGrid
def preview_fitted_params(data, module, cell_to_preview, fit_hook):
plot_text= {"ml": 'HG slope (ml)',
"bl": 'HG intercept (bl)',
"devl": 'HG linearity deviation (devl)',
"tresh": 'Gain threshold (tresh)',
"tresh_bounds": 'Mean HG signal (tresh_bounds)',
"dhm": 'HG boundary (dhm)',
"mh": 'MG slope (mh)',
"bh": 'MG intercept (bh)',
"oh": 'Hook - o (oh)',
"ch": 'Hook - c (ch)',
"ah": 'Hook - a (ah)',
"devh": 'MG linearity deviation (devh)'
}
cell_to_preview = min(59, mem_cells-1)
for module, data in fres.items():
mask = bad_pixels[module]
fig = plt.figure(figsize=(20,20))
grid = AxesGrid(fig, 111,
nrows_ncols=(7 if fit_hook else 3, 2),
axes_pad=(0.9, 0.15),
label_mode="1",
if fit_hook:
grid = AxesGrid(fig, 111,
nrows_ncols=(7, 2),
axes_pad=(0.9, 0.55),
label_mode="0",
share_all=True,
cbar_location="right",
cbar_mode="each",
cbar_size="7%",
cbar_pad="2%",
cbar_pad="2%"
)
else:
grid = AxesGrid(fig, 111,
nrows_ncols=(5, 2),
axes_pad=(0.9, 0.55),
label_mode="0",
share_all=True,
cbar_location="right",
cbar_mode="each",
cbar_size="7%",
cbar_pad="2%"
)
mask = bad_pixels[module]
i = 0
for key, citem in data.items():
item = citem.copy()
item[~np.isfinite(item)] = 0
med = np.nanmedian(item)
bound = 0.1
maxcnt = 10
if med < 0:
bound = -bound
while(np.count_nonzero((item < med-bound*med) | (item > med+bound*med))/item.size > 0.01):
bound *=2
bound *= 2
maxcnt -= 1
if maxcnt < 0:
break
if "bounds" in key:
d = item[cell_to_preview,...,0]
im = grid[i].imshow(d, interpolation="nearest",
d = item[cell_to_preview, ..., 0]
im = grid[i].imshow(d, interpolation="nearest", origin='lower',
vmin=med-bound*med, vmax=med+bound*med)
grid[i].set_title(plot_text[key], fontsize=14)
else:
d = item[cell_to_preview,...]
im = grid[i].imshow(d, interpolation="nearest",
vmin=med-bound*med, vmax=med+bound*med)
cb = grid.cbar_axes[i].colorbar(im)
if fit_hook:
d = item[cell_to_preview, ...]
im = grid[i].imshow(d, interpolation="nearest", origin='lower',
vmin=med-bound*med, vmax=med+bound*med)
grid[i].set_title(plot_text[key], fontsize=14)
else:
if key == 'oh' or key == 'ch' or key == 'ah':
continue
else:
d = item[cell_to_preview, ...]
im = grid[i].imshow(d, interpolation="nearest", origin='lower',
vmin=med-bound*med, vmax=med+bound*med)
grid[i].set_title(plot_text[key], fontsize=14)
cb = grid.cbar_axes[i].colorbar(im)
# axes coordinates are 0,0 is bottom left and 1,1 is upper right
x0, x1 = tpix_range1[0][0], tpix_range1[0][1]
y0, y1 = tpix_range1[1][0], tpix_range1[1][1]
p = patches.Rectangle(
(x0, y0), x1-x0, y1-y0, fill=False, color="red")
p = patches.Rectangle((x0, y0), (x1 - x0), (y1 - y0), fill=False, color="red")
grid[i].add_patch(p)
x0, x1 = tpix_range2[0][0], tpix_range2[0][1]
y0, y1 = tpix_range2[1][0], tpix_range2[1][1]
p = patches.Rectangle(
(x0, y0), x1-x0, y1-y0, fill=False, color="white")
p = patches.Rectangle((x0, y0), (x1 - x0), (y1 - y0), fill=False, color="white")
grid[i].add_patch(p)
grid[i].text(20, 50, key, color="w", fontsize=50)
i += 1
im = grid[-1].imshow(mask[cell_to_preview,...], interpolation="nearest",
vmin=0, vmax=1)
im = grid[-1].imshow(mask[cell_to_preview, ...], interpolation="nearest", origin='lower', vmin=0, vmax=1)
cb = grid.cbar_axes[-1].colorbar(im)
grid[-1].set_title('Mask', fontsize=14)
```
%% Cell type:code id: tags:
grid[-1].text(20, 50, "mask", color="w", fontsize=50)
fig.savefig("{}/module_{}_PC.png".format(out_folder, module))
``` python
cell_to_preview=[1,mem_cells-5]
for module, data in fres.items():
print(' Memory cell {}:'.format(cell_to_preview[0]))
preview_fitted_params(data, module, cell_to_preview[0], fit_hook)
```
%% Cell type:code id: tags:
``` python
for module, data in fres.items():
print('Memory cell {}:'.format(cell_to_preview[1]))
preview_fitted_params(data, module, cell_to_preview[1], fit_hook)
```
%% Cell type:markdown id: tags:
### Memory Cell dependent behavior of thresholding ###
%% Cell type:code id: tags:
``` python
mltomh = ml/mh
fres[qm].update({'mltomh': mltomh})
toplot = {"tresh": "Gain theshold (ADU)",
"ml": "Slope (HG)",
"bl": "Offset (HG) (ADU)",
"mh": "Slope (MG)",
"bh": "Offset (MG) (ADU)",
"mltomh": "Ration slope_HG/slope_MG"}
from matplotlib.colors import LogNorm, PowerNorm
for module, data in fres.items():
fig, ax = plt.subplots(3, 2, figsize=(18, 15))
fig.subplots_adjust(wspace=0.1, hspace=0.15)
for module, data in fres.items():
mltomh = data['ml'] / data['mh']
fres[module].update({'mltomh': mltomh})
bins = 100
for typ, label in toplot.items():
for counter, (typ, label) in enumerate(toplot.items()):
r_hist = np.zeros((mem_cells, bins))
mask = bad_pixels[module]
thresh = data[typ]
hrange = [0.5*np.nanmedian(thresh), 1.5*np.nanmedian(thresh)]
hrange = [0.5 * np.nanmedian(thresh), 1.5 * np.nanmedian(thresh)]
if hrange[1] < hrange[0]:
hrange = hrange[::-1]
for c in range(mem_cells):
d = thresh[c,...]
d = thresh[c, ...]
h, e = np.histogram(d.flatten(), bins=bins, range=hrange)
r_hist[c, :] = h
fig = plt.figure(figsize=(5,5))
ax = fig.add_subplot(111)
im = ax.imshow(r_hist[:,:].T[::-1,:], interpolation="nearest",
aspect="auto", norm=LogNorm(vmin=1, vmax=np.max(r_hist)),
extent=[0, mem_cells, hrange[0], hrange[1]])
ax.set_xlabel("Memory cell")
ax.set_ylabel(label)
cb = fig.colorbar(im)
cb.set_label("Counts")
#fig.savefig("/gpfs/exfel/data/scratch/haufs/test/agipd_gain_threholds.pdf", bbox_inches="tight")
if counter < 3:
im = ax[counter, 0].imshow(r_hist[:, :].T[::-1, :], interpolation="nearest",
aspect="auto", norm=LogNorm(vmin=1, vmax=np.max(r_hist)),
extent=[0, mem_cells, hrange[0], hrange[1]])
ax[counter,0].set_xlabel("Memory cell", fontsize=12)
ax[counter,0].set_ylabel(label, fontsize=12)
cb = fig.colorbar(im, ax=ax[counter, 0], pad=0.01)
else:
im = ax[counter-3, 1].imshow(r_hist[:, :].T[::-1, :], interpolation="nearest",
aspect="auto", norm=LogNorm(vmin=1, vmax=np.max(r_hist)),
extent=[0, mem_cells, hrange[0], hrange[1]])
ax[counter-3, 1].set_xlabel("Memory cell", fontsize=12)
ax[counter-3, 1].set_ylabel(label, fontsize=12)
cb = fig.colorbar(im, ax=ax[counter-3, 1], pad=0.01)
cb.set_label("Counts", fontsize=12)
```
%% Cell type:markdown id: tags:
## Global Bad Pixel Behaviour ##
The following plots show the results of bad pixel evaluation for all evaluated memory cells. Cells are stacked in the Z-dimension, while pixels values in x/y are rebinned with a factor of 2. This excludes single bad pixels present only in disconnected pixels. Hence, any bad pixels spanning at least 2 pixels in the x/y-plane, or across at least two memory cells are indicated. Colors encode the bad pixel type, or mixed type.
%% Cell type:code id: tags:
``` python
cols = {BadPixels.CI_GAIN_OF_OF_THRESHOLD.value: (BadPixels.CI_GAIN_OF_OF_THRESHOLD.name, '#FF000080'),
BadPixels.CI_EVAL_ERROR.value: (BadPixels.CI_EVAL_ERROR.name, '#0000FF80'),
BadPixels.CI_GAIN_OF_OF_THRESHOLD.value | BadPixels.OFFSET_OUT_OF_THRESHOLD.value: ('MIXED', '#DD00DD80')}
rebin = 2 if not high_res_badpix_3d else 1
gain = 0
for mod, data in bad_pixels.items():
plot_badpix_3d(np.moveaxis(data, 0, 2), cols, title=mod, rebin_fac=rebin, azim=60.)
```
%% Cell type:code id: tags:
``` python
one_photon = 55 # ADU
one_photon = 73 # ADU (10 keV photon)
test_pixels = []
tpix_range1 = [(0,8), (0,8)]
for i in range(*tpix_range1[0]):
for j in range(*tpix_range1[1]):
test_pixels.append((j,i))
test_cells = [4, 38, 64, 128, 200, 249]
tcell = np.array(test_cells)
tcell = tcell[tcell < mem_cells]
if tcell.size == 0:
test_cells = [mem_cells-1]
else:
test_cells = tcell.tolist()
from mpl_toolkits.axes_grid1 import ImageGrid
for mod, r in zip(modules, res):
dig, ana, cellId = r
d = []
d2 = []
d3 = []
H = [0, 0, 0, 0]
H2 = [0, 0, 0, 0]
Ha = [0, 0, 0, 0]
qm = module_index_to_qm(mod)
cdata = fres[qm]
ex, ey, ea = None, None, None
medml = np.nanmean(cdata['ml'])
medmh = np.nanmean(cdata['mh'][cdata['mh']> 0.5])
offset = offsets[mod]
threshold = thresholds[mod]
medth = np.nanmean(threshold[...,0])
medth = np.nanmean(threshold[..., 0])
for pix in test_pixels:
for cell in test_cells:
color = np.random.rand(3,1)
color = np.random.rand(3, 1)
x = np.arange(dig.shape[0])
y = dig[:,cell, pix[0], pix[1]]
a = ana[:,cell, pix[0], pix[1]]
y = dig[:, cell, pix[0], pix[1]]
a = ana[:, cell, pix[0], pix[1]]
vidx = (y > 1000) & np.isfinite(y)
x = x[vidx]
y = y[vidx]
a = a[vidx]
ms, labels, centers = calc_m_cluster2(x, y)
bound = None
bound_m = None
markers = ['o','.','x','v']
colors = ['b', 'r', 'g', 'k']
ymin = y.min()
amin = a[labels[2]].min()
for i, lbl in enumerate(labels):
if np.any(lbl):
if i == 0:
cm = (cdata['ml'][cell, pix[0], pix[1]]/medml)
o = offset[pix[0], pix[1], cell, 0]
ym = (y[lbl]-o)/cm
elif i >= 1:
mh = cdata['mh'][cell, pix[0], pix[1]]
ml = cdata['ml'][cell, pix[0], pix[1]]
cml = ml/medml
cmh = mh/medmh
cm = medml/medmh
cml = ml / medml
cmh = mh / medmh
cm = medml / medmh
oh = cdata['bh'][cell, pix[0], pix[1]]
o = offset[pix[0], pix[1], cell, 1] + oh
ym = (y[lbl]-o)/cmh*cm
ym = (y[lbl]-o) / cmh * cm
if i == 1:
ah = cdata['ah'][cell, pix[0], pix[1]]
ch = cdata['ch'][cell, pix[0], pix[1]]
ohh = cdata['oh'][cell, pix[0], pix[1]]
tx = ch * np.log(ah/(y[lbl]-o))+ohh
chook = (ah*np.exp(-(tx-ohh)/ch) - mh*tx)/cmh*cm
ym -= chook
h, ex, ey = np.histogram2d(x[lbl], ym/one_photon, range=((0, 600), (0, 15000/one_photon)), bins=(300, 600))
H[i] += h
labels = [a < threshold[pix[0], pix[1], cell,0], a >= threshold[pix[0], pix[1], cell,0]]
for i, lbl in enumerate(labels):
if np.any(lbl):
if i == 0:
cm = (cdata['ml'][cell, pix[0], pix[1]]/medml)
o = offset[pix[0], pix[1], cell, 0]
ym = (y[lbl]-o)/cm
ym = (y[lbl]-o) / cm
elif i >= 1:
mh = cdata['mh'][cell, pix[0], pix[1]]
ml = cdata['ml'][cell, pix[0], pix[1]]
cml = ml/medml
cmh = mh/medmh
cm = medml/medmh
oh = cdata['bh'][cell, pix[0], pix[1]]
o = offset[pix[0], pix[1], cell, 1] + oh
ym = (y[lbl]-o)/cmh*cm
ym = (y[lbl]-o) / cmh * cm
if i == 1:
ah = cdata['ah'][cell, pix[0], pix[1]]
ch = cdata['ch'][cell, pix[0], pix[1]]
ohh = cdata['oh'][cell, pix[0], pix[1]]
tx = ch * np.log(ah/(y[lbl]-o))+ohh
tx = ch*np.log(ah/(y[lbl]-o)) + ohh
chook = (ah*np.exp(-(tx-ohh)/ch) - mh*tx)/cmh*cm
idx = (a[lbl]-amin) < 0
ym[idx] -= chook[idx]
#ym = a[lbl]-amin
h, ex, ey = np.histogram2d(x[lbl], ym/one_photon, range=((0, 600), (0, 15000/one_photon)), bins=(300, 600))
H2[i] += h
labels = [a < threshold[pix[0], pix[1], cell,0], a >= threshold[pix[0], pix[1], cell,0]]
labels = [a < threshold[pix[0], pix[1], cell, 0], a >= threshold[pix[0], pix[1], cell, 0]]
for i, lbl in enumerate(labels):
if np.any(lbl):
#if i == 0:
# amin = a[lbl].min()
#else:
# amin = a[labels[0]].min() #a[labels[1]].min()# /(threshold[pix[0], pix[1], cell,0]/medth)
am = a[lbl] - amin
h, ex, ea = np.histogram2d(x[lbl], am, range=((0, 600), (-100, 5000)), bins=(300, 400))
Ha[i] += h
fig = plt.figure(figsize=(10,15))
ax = fig.add_subplot(311)
for i in range(3):
H[i][H[i]==0] = np.nan
ax.imshow(H[0].T, origin="lower", extent=[ex[0], ex[-1], ey[0], ey[-1]],
aspect='auto', cmap='summer', alpha=0.7, vmin=0, vmax=1000)
ax.imshow(H[1].T, origin="lower", extent=[ex[0], ex[-1], ey[0], ey[-1]],
aspect='auto', cmap='spring', alpha=0.7, vmin=0, vmax=100)
ax.imshow(H[2].T, origin="lower", extent=[ex[0], ex[-1], ey[0], ey[-1]],
aspect='auto', cmap='winter', alpha=0.7, vmin=0, vmax=1000)
ax.set_ylabel("AGIPD response (ADU)")
ax.set_ylabel("AGIPD response (# photon)")
ax.set_xlabel("PC scan point (#)")
x = np.arange(0, 600)
ideal = medml*x/one_photon
ideal = medml * x / one_photon
ax.plot(x, ideal, color='red')
ax.plot(x, ideal + np.sqrt(ideal), color='red')
ax.plot(x, ideal - np.sqrt(ideal), color='red')
ax.grid(lw=1.5)
ax = fig.add_subplot(312)
for i in range(2):
H2[i][H2[i]==0] = np.nan
ax.imshow(H2[0].T, origin="lower", extent=[ex[0], ex[-1], ey[0], ey[-1]],
aspect='auto', cmap='summer', alpha=0.7, vmin=0, vmax=1000)
ax.imshow(H2[1].T, origin="lower", extent=[ex[0], ex[-1], ey[0], ey[-1]],
aspect='auto', cmap='winter', alpha=0.7, vmin=0, vmax=1000)
ax.set_ylabel("AGIPD response (ADU)")
ax.set_ylabel("AGIPD response (# photon)")
ax.set_xlabel("PC scan point (#)")
x = np.arange(0, 600)
ideal = medml*x/one_photon
ideal = medml * x / one_photon
ax.plot(x, ideal, color='red')
ax.plot(x, ideal + np.sqrt(ideal), color='red')
ax.plot(x, ideal - np.sqrt(ideal), color='red')
ax.grid(lw=1.5)
ax = fig.add_subplot(313)
for i in range(2):
Ha[i][Ha[i]==0] = np.nan
ax.imshow(Ha[0].T, origin="lower", extent=[ex[0], ex[-1], ea[0], ea[-1]],
aspect='auto', cmap='summer', alpha=0.7, vmin=0, vmax=1000)
#ax.imshow(Ha[1].T, origin="lower", extent=[ex[0], ex[-1], ea[0], ea[-1]],
# aspect='auto', cmap='spring', alpha=0.7, vmin=0, vmax=100)
ax.imshow(Ha[1].T, origin="lower", extent=[ex[0], ex[-1], ea[0], ea[-1]],
aspect='auto', cmap='winter', alpha=0.7, vmin=0, vmax=1000)
ax.set_ylabel("AGIPD gain (ADU)")
ax.set_xlabel("PC scan point (#)")
ax.grid(lw=1.5)
```
%% Cell type:code id: tags:
``` python
```
Loading