Skip to content
Snippets Groups Projects
Commit 4a63c004 authored by Ivana Klackova's avatar Ivana Klackova
Browse files

Removed integration time from conditions, acq_rate, mem_cells injection with deviations.

parent 58b182f4
Branches fix/add_JUNGF_webservice_3runs
No related tags found
1 merge request!874[AGIPD] [CS] new current source calibration constant
%% Cell type:markdown id: tags:
# Characterize AGIPD Current Source Data
Author: Detector Group, Version 1.0
The following notebook characterises the AGIPD data taken with the current source (CS) and is used to produce constants to allow for relative gain correction.
The current source allows to scan through all the three gain stages of the AGIPD: high, medium, and low. The amount of injected charge during the scan increases by applying current to the pixel wih increasing integration time, which is proportional to the amount of charge collected by a pixel.
The current source data can be taken with constant step to increase integration time (clk) throughout the whole scan or with several so-called loops, covering the dynamic range, with different integration time steps in each loop.
Due to differences in ASICs, to characterise all pixels, datasets taken with seven different settings of ITEST current would be needed. This is not feasible. Hence, we use looping strategy, in each loop having different steps between integration times and different number of increments within the loop to cover the whole dynamic range of the AGIPD pixels.
The signal is injected in a column-wise fashion, having signal in every 4th column, hence 4 files have to be merged to create a full image. Merging is done in separate notebook: [CS_parallelMerging_NBC.ipynb](./CS_parallelMerging_NBC.ipynb) and saved as .h5 files. This notebook loads the merged files. Raw data is only used to retrieve operation conditions.
First, the algorithm labels the individual gain stages which are then fitted with a linear function, leading to three sets of slopes (m) and intercepts (b). Prior to fitting, selection of the fit range is performed to exclude non-linear regions of the injected signal. Dark data are collected in the same fashion as current source data, i.e. for each integration time step there is one dark signal data point. No averaging is performed. Offset correction is done only for the high gain as for medium and low gain this led to worse fitting results.
The fitted slopes and intercepts are sanitised in the next steps; problematic fits and pixels are marked as bad and median values are used to replace these values.
Constants are saved wit the following notation:
- Slopes:
- mH: high gain slope
- mM: medium gain slope
- mL: low gain slope
- Intercepts:
- bH: high gain intercept
- bM: medium gain intercept
- bL: low gain intercept
- Ratios:
- H-M: ratio of high gain and medium gain slope
- M-L: ratio of medium gain and low gain slope
%% Cell type:code id: tags:
``` python
in_folder = '/gpfs/exfel/exp/SPB/202330/p900340/scratch/CSmergedFiles/19012023/' # path to input data, required
out_folder = "/gpfs/exfel/exp/SPB/202330/p900340/scratch/CS_Processing/test" # path to output to, required
raw_folder = '/gpfs/exfel/exp/SPB/202330/p900340/raw/' # path to raw folder, required
metadata_folder = "" # Directory containing calibration_metadata.yml when run by xfel-calibrate
dark_run = 5 # run containning CS specific darks, required
modules = [7] # modules to work on, required, range allowed
karabo_da = ["all"]
karabo_id_control = "SPB_IRU_AGIPD1M1" # karabo-id for the control device e.g. "MID_EXP_AGIPD1M1", or "SPB_IRU_AGIPD1M1"
karabo_id = "SPB_DET_AGIPD1M-1"
ctrl_source_template = '{}/MDL/FPGA_COMP' # path to control information
instrument_source_template = '{}/DET/{}:xtdf' # path in the HDF5 file to images
creation_time = "" # To overwrite the measured creation_time. Required Format: YYYY-MM-DD HR:MN:SC e.g. "2022-06-28 13:00:00"
creation_date_offset = "00:00:00" # add an offset to creation date, e.g. to get different constants
cal_db_interface = "tcp://max-exfl-cal002:8015#8045" # the database interface to use
local_output = True # output constants locally
db_output = False # output constants to database
bias_voltage = -1 # detector bias voltage, use -1 to use value stored in slow data.
mem_cells = -1 # number of memory cells used, use -1 to use value stored in slow data.
acq_rate = -1. # the detector acquisition rate, use -1. to use value stored in slow data.
gain_setting = -1 # the gain setting, use -1 to use value stored in slow data.
integration_time = -1 # integration time, use -1 to use value stored in slow data.
sigma_dev_cut = 5 # parameters outside the range median +- sigma_dev_cut*MAD are replaced with the median
chi2_lim = 7 # limit of chi2 of the fit
relgain_lim = [0.7, 1.3] # limit for the relative gain
steps = [1, 10, 75] # spacing between integration time steps for each loop
increments = [300, 400, 200] # number of steps within a loop
```
%% Cell type:code id: tags:
``` python
import os
import warnings
from datetime import datetime, timedelta
from dateutil import parser
warnings.filterwarnings('ignore')
import h5py
import matplotlib
import numpy as np
from scipy.stats import median_abs_deviation as mad
import matplotlib.pyplot as plt
from matplotlib import gridspec
from matplotlib.colors import LogNorm
import matplotlib.patches as patches
from mpl_toolkits.axes_grid1 import AxesGrid
import XFELDetAna.xfelpyanatools as xana
from extra_data import RunDirectory
import pasha as psh
import multiprocessing
from cal_tools.agipdlib import AgipdCtrl
from cal_tools.ana_tools import get_range
from cal_tools.enums import BadPixels
from cal_tools.step_timing import StepTimer
from cal_tools.tools import (
get_constant_from_db_and_time,
calcat_creation_time,
)
from iCalibrationDB import Conditions, Constants
%matplotlib inline
```
%% Cell type:code id: tags:
``` python
for module in modules:
base = 'agipd_CH{:02d}'.format(module)
for fname in os.listdir(in_folder):
if base in fname:
print(f'Processing file: {fname}')
merged_h5file = fname
```
%% Cell type:code id: tags:
``` python
cells = mem_cells
image_name_temp = 'RAW-R{:04d}-AGIPD{:02d}-S{:05d}.h5'
print("Parameters are:")
if mem_cells == -1:
print("Memory cells: auto-detection on")
else:
print("Memory cells set by user: {}".format(mem_cells))
print("Run: {}".format(dark_run))
print("Modules: {}".format(modules))
instrument = karabo_id.split("_")[0]
if instrument == "HED":
nmods = 8
else:
nmods = 16
print(f"Detector in use is {karabo_id}")
if karabo_da == ["all"]:
if modules[0] == -1:
modules = list(range(nmods))
karabo_da = ["AGIPD{:02d}".format(i) for i in modules]
else:
modules = [int(x[-2:]) for x in karabo_da]
```
%% Cell type:code id: tags:
``` python
# conditions are extracted from 1st channel only
ctrl_src = ctrl_source_template.format(karabo_id_control)
instrument_src = instrument_source_template.format(karabo_id, "0CH0")
agipd_cond = AgipdCtrl(
run_dc = RunDirectory(f'{raw_folder}/r{dark_run:04d}'),
image_src = instrument_src,
ctrl_src = ctrl_src,
raise_error = False, # to be able to process very old data without gain_setting value
)
```
%% Cell type:code id: tags:
``` python
# Evaluate creation time
creation_time = calcat_creation_time(raw_folder, dark_run, creation_time)
offset = parser.parse(creation_date_offset)
delta = timedelta(hours=offset.hour, minutes=offset.minute, seconds=offset.second)
creation_time += delta
print(f"Creation time: {creation_time}")
# Read AGIPD parameter conditions.
if acq_rate == -1.:
acq_rate = agipd_cond.get_acq_rate()
if mem_cells == -1:
mem_cells = agipd_cond.get_num_cells()
# TODO: look for alternative for passing creation_time
if gain_setting == -1:
gain_setting = agipd_cond.get_gain_setting(creation_time)
if bias_voltage == -1:
bias_voltage = agipd_cond.get_bias_voltage(karabo_id_control)
if integration_time == -1:
integration_time = agipd_cond.get_integration_time()
print(f"Bias voltage: {bias_voltage} V")
print(f"Acquisition rate: {acq_rate} MHz")
print(f"Memory cells: {mem_cells}")
print(f"Gain setting: {gain_setting}")
print(f"Integration time: {integration_time}")
print(f"Using {creation_time} as creation time of constant.")
```
%% Cell type:code id: tags:
``` python
# Load analog part of merged data, digital is not used in this notebook
cs_data = []
with h5py.File(in_folder+merged_h5file, 'r') as f:
cs_data.append(np.array(f[f'/{modules[0]}/Analog/data']))
trains = cs_data[0].shape[0]
```
%% Cell type:code id: tags:
``` python
# Create new x-axis to distribute the points equally according to looping settings
a = np.arange(0, increments[0], steps[0])
b = np.arange(a[-1]+1, a[-1]+1+increments[1]*steps[1], steps[1])
c = np.arange(b[-1]+1, b[-1]+1+increments[2]*steps[2], steps[2])
x_eq = np.concatenate((a, b, c))
```
%% Cell type:markdown id: tags:
## Inspection of looping for one pixel
%% Cell type:code id: tags:
``` python
pix=[50, 51]
mc = 300
x = x_eq[:cs_data[0].shape[0]]
y = cs_data[0][:,mc, pix[0], pix[1]]
fig = plt.figure(figsize=(9, 5))
plt.plot(a, y[0:increments[0]], ls="None", marker='.', label='loop 1')
plt.plot(b, y[increments[0]:np.sum(increments[:2])], ls="None", marker='.', label='loop 2')
plt.plot(c[:increments[2]-(np.sum(increments)-y.shape[0])], y[np.sum(increments[:2]):], ls="None",
marker='.', label='loop 3')
plt.grid()
plt.legend(loc='lower right', fontsize=11)
plt.xscale('log')
plt.title('M{}, MC: {}, [{}, {}]'.format(modules[0], mc, pix[0], pix[1]))
plt.xlabel('Integration time (clk)', fontsize=11)
plt.ylabel('Signal (ADU)', fontsize=11)
plt.show()
```
%% Cell type:code id: tags:
``` python
from iminuit import Minuit
from iminuit.util import describe, make_func_code
def fit_data(fun, x, y, par_ests, minos_errs=False):
"""Wrapper to fit data with provided function."""
par_ests["throw_nan"] = False
par_ests["pedantic"] = False
# par_ests["print_level"] = 0
f_sig = describe(fun)[1:]
class _Chi2Functor:
def __init__(self, f, x, y, err):
self.f = f
self.x = x[y != 0]
self.y = y[y != 0]
self.err = err[y != 0]
f_sig = describe(f)
# this is how you fake function
# signature dynamically
self.func_code = make_func_code(
f_sig[1:]) # docking off independent variable
self.func_defaults = None # this keeps numpy.vectorize happy
def __call__(self, *arg):
# notice that it accept variable length
# positional arguments
# chi2 = sum((y-self.f(x,*arg))**2 for x,y in zip(self.x,self.y))
return np.sum(((self.f(self.x, *arg) - self.y) ** 2) / self.err)
yerr = np.copy(y) #errors --> this has to be changed to use noise values instead
wrapped = _Chi2Functor(fun, x, y, yerr)
m = Minuit(wrapped, **par_ests)
m.migrad()
if m.get_fmin().is_valid:
if minos_errs:
m.minos()
print('{}\n'.format(m.get_param_states()))
redchi2 = m.fval / (len(x) - len(m.args))
else:
redchi2 = -1
# m.print_matrix()
fit_res = m.fitarg
fit_res['red_chi2'] = redchi2
return fit_res
def lin_fun(x, m, b):
"""Linear function."""
return m*x+b
```
%% Cell type:markdown id: tags:
## Mark gain stage ##
%% Cell type:code id: tags:
``` python
def moving_average(vals, w):
"""Calculate moving average of provided data."""
return np.convolve(vals, np.ones(w), 'same') / w
def label_gain_stage(x, y):
"""Gain stage marking of CS data.
This is done to separate
different gain stages to be able
to fit data in each gain stage separately.
"""
labels = []
bounds = []
gain_marker = [0, 1, 2]
index = np.arange(y.shape[0])
if np.max(y) < 8e3: # to exclude pixels with corrupted CS injection signal
pass
else:
diff = np.diff(y, prepend=y[0])
mins = []
boundaries = []
# Searching for switching points
try:
mins.append(np.min(diff[(index<400) & (diff < -500.)]))
mins.append(np.min(diff[(index>400) & (diff < -500.)]))
except:
mins.append(np.nan)
diff_idx = np.where(np.isin(diff, mins))[0]
# This is needed as for some pixels both gain switching points appear very soon
if np.all(diff_idx < 400):
mins = []
mins.append(np.min(diff[index < 300]))
mins.append(np.min(diff[index > 300]))
mins = np.array(mins)
if np.any(mins) > -200.:
mins = mins[mins < -200.]
# Define boundaries of gain stages
for m in mins:
boundaries.append(np.where(diff == m)[0][0])
if (boundaries[-1] > 890) | (np.max(y) < 9e3):
pass
elif len(mins) != 2:
pass
else:
hg_b = boundaries[0]
mg_b = boundaries[-1]
labels = np.zeros(y.shape[0])
labels[:hg_b+1] = 0
labels[hg_b+1:mg_b+1] = 1
labels[mg_b+1:] = 2
# Excluding boundary values shifted by 5 from each side
# due to CS instability around gain switching region
boundaries = np.array([np.arange(hg_b-5, hg_b+10),
np.arange(mg_b-5, mg_b+10),
])
labels[boundaries] = -1
maxval_idx = np.where(y[labels==2] == np.max(y[labels==2]))[0][-1]
#######################################################
# to exclude corrupted CS injections
if (len(np.unique(labels)) < 4) | (np.min(y[labels==2]) > 13.5e3):
pass
# This is used to find saturation region of medium and low gain
else:
grad2 = np.gradient(y[labels==2])
mvavg = moving_average(grad2, 5)
grad = np.gradient(y[labels==1])
if len(mvavg) < 100:
msk2 = np.ones_like(mvavg, dtype=bool)
else:
msk2 = 50+np.where(mvavg[50:maxval_idx] == np.max(mvavg[50:maxval_idx]))[0][-1]
msk = np.where(grad==np.max(grad[y[labels==1] >
0.95*np.max(y[labels==1])]))[0][-1]
bounds.append(hg_b-6)
bounds.append(np.where(y[labels==1] == y[labels==1][msk])[0][0])
bounds.append(np.where(y[labels==2] == y[labels==2][msk2])[0][-1])
return labels, gain_marker, bounds
```
%% Cell type:code id: tags:
``` python
run = RunDirectory(raw_folder+"/r{:04d}/".format(dark_run))
instrument_source = karabo_id+'/DET/{}CH0:xtdf'.format(modules[0])
offset = run.get_array(instrument_source, 'image.data')[:trains*mem_cells, 0, :, :]
offset = offset.values.reshape((offset.shape[0] // mem_cells, mem_cells, 1, 512, 128))
offset = np.moveaxis(offset, 4, 3)
offset = np.squeeze(offset).astype(np.float32)
```
%% Cell type:code id: tags:
``` python
# from cal_tools.tools import get_constant_from_db_and_time
# noises = {}
# for k_da, mod in zip(karabo_da, modules):
# noise, when = get_constant_from_db_and_time(karabo_id, k_da,
# Constants.AGIPD.Noise(),
# Conditions.Dark.AGIPD(
# memory_cells=mem_cells,
# bias_voltage=bias_voltage,
# acquisition_rate=acq_rate,
# gain_setting=gain_setting,
# integration_time=integration_time),
# np.zeros((128, 512, mem_cells, 3)),
# cal_db_interface, creation_time=creation_time)
# noises[mod] = np.array(noise.data)
```
%% Cell type:markdown id: tags:
## Inspection of gain stage marking
%% Cell type:code id: tags:
``` python
test_pixels = []
tpix_range1 = [(200,210), (60,73)]
for i in range(*tpix_range1[0]):
for j in range(*tpix_range1[1]):
test_pixels.append((j,i))
test_cells = [1, 35]
tcell = np.array(test_cells)
tcell = tcell[tcell < mem_cells]
if tcell.size == 0:
test_cells = [mem_cells-1]
else:
test_cells = tcell.tolist()
bins = (np.linspace(-100, 6000, 100),
np.linspace(5000, 13000, 1000),
)
from mpl_toolkits.axes_grid1 import ImageGrid
ana = cs_data[0]
H = [0, 0, 0, 0, 0]
gains = [0, 1, 1, 2, 2]
ex, ey = None, None
for pix in test_pixels:
for cell in test_cells:
color = np.random.rand(3,1)
x = x_eq[:ana.shape[0]]
y = ana[:,cell, pix[0], pix[1]]
if np.any(y):
labels, gain_marker, bounds = label_gain_stage(x, y)
if np.any(labels):
for i, lbl in enumerate(gain_marker):
ym = y[labels==lbl]
h, ex, ey = np.histogram2d(x[labels==lbl], ym, range=((0,6e3), (5e3, 13e3)), bins=bins)
H[i] += h
fig = plt.figure(figsize=(10,7))
ax = fig.add_subplot(111)
ax.grid(lw=1.5)
for i in range(3):
H[i][H[i]==0] = np.nan
ax.imshow(H[0].T, origin="lower", extent=[ex[0], ex[-1], ey[0], ey[-1]],
aspect='auto', cmap='summer', alpha=0.5, vmin=0, vmax=1000)
ax.imshow(H[1].T, origin="lower", extent=[ex[0], ex[-1], ey[0], ey[-1]],
aspect='auto', cmap='spring', alpha=0.5, vmin=0, vmax=100)
ax.imshow(H[2].T, origin="lower", extent=[ex[0], ex[-1], ey[0], ey[-1]],
aspect='auto', cmap='winter', alpha=0.5, vmin=0, vmax=1000)
ax.set_ylabel("AGIPD response (ADU)", fontsize=12)
ax.set_xlabel("CS scan point (#)", fontsize=12)
plt.xticks(fontsize=12)
plt.yticks(fontsize=12)
plt.show()
```
%% Cell type:markdown id: tags:
## Example from Pixel Subset
The following is to verify the labeling and fitting procedure for a small sample of pixels from two regions of interest.
Plots visualize:
1. Raw CS signal with marked gain stages with the following color coding:
- blue: high gain
- orane: medium gain
- green: low gain
2. Offset corrected CS signal (high gain only) with cuts to fit only linear part of signal
3. Fits for each gain stage and their relative deviation from linearity
4. Gain correction based on the extracted fit parameters
%% Cell type:code id: tags:
``` python
def evaluate_fitting_ROI(data, tpix_range, test_cells, roi):
"""Evaluate gain stage labeling and fitting of CS data for specified ROI."""
lines1=0
no_entry = 0
fit_errors = {}
fits = {}
reldiff = []
markers = ['o', '.', 'x', 'v']
colors = ['tab:blue', 'tab:orange', 'tab:green']
test_pixels = []
fig1, ax1 = plt.subplots(figsize=(9, 5))
ax1.grid(zorder=0, lw=1.5)
ax1.set_ylabel("CS signal (ADU)", fontsize=11)
ax1.set_xlabel('Integration time (clk)', fontsize=11)
ax1.title.set_text('Raw CS signal')
fig2, ax2 = plt.subplots(figsize=(9, 5))
ax2.grid(zorder=0, lw=1.5)
ax2.set_ylabel("CS signal (ADU) fit cut", fontsize=11)
ax2.set_xlabel('Integration time (clk)', fontsize=11)
ax2.title.set_text('Offset corrected CS signal with fit cuts')
fig3 = plt.figure(figsize=(9, 4))
gs = gridspec.GridSpec(2, 1, height_ratios=[3, 1])
ax3 = plt.subplot(gs[0])
ax4 = plt.subplot(gs[1])
ax3.grid(zorder=0, lw=1.5)
ax3.set_ylabel('CS signal (ADU)', fontsize=11)
ax4.set_ylabel('Relative\ndeviation', fontsize=11)
ax4.set_xlabel('Integration time (clk)', fontsize=11)
ax3.title.set_text('CS signal fits')
fig5 = plt.figure(figsize=(9, 4))
gs = gridspec.GridSpec(2, 1, height_ratios=[3, 1])
ax5 = plt.subplot(gs[0])
ax6 = plt.subplot(gs[1])
ax5.grid(zorder=0, lw=1.5)
ax6.set_xlabel("Pixel counter", fontsize=11)
ax6.set_ylabel("Relative\ndifference", fontsize=11)
ax5.set_ylabel('Fitted slope ratio', fontsize=11)
ax5.title.set_text('Ratios of fitted gains')
ax5.set_ylim(0,60)
ax5.minorticks_on()
fig7, ax7 = plt.subplots(figsize=(9, 5))
ax7.grid(zorder=0, lw=1.5)
ax7.set_xlabel('Integration time (clk)', fontsize=11)
ax7.set_ylabel('CS signal (keV)', fontsize=11)
ax7.title.set_text('Corrected CS signal (keV)')
for i in range(*tpix_range[0]):
for j in range(*tpix_range[1]):
test_pixels.append((j,i))
tcell = np.array(test_cells)
tcell = tcell[tcell < mem_cells]
if tcell.size == 0:
test_cells = [mem_cells-1]
else:
test_cells = tcell.tolist()
ana = data
# noise = noises[mod]
counter = 0
label_added = False
for cell in test_cells:
for pix in test_pixels:
color = np.random.rand(3, 1)
x = x_eq[:ana.shape[0]]
y = ana[:trains,cell, pix[0], pix[1]]
y_corr = (y - offset[:, cell, pix[0], pix[1]]).astype(np.float32)
labels, gain_marker, bounds = label_gain_stage(x, y)
if np.any(bounds):
for g in gain_marker:
ax1.plot(x[labels==g][::4], y[labels==g][::4],
ls='None', marker=markers[g], color=colors[g], alpha=0.3)
nonzero = np.where(y_corr[labels==g] > 0.)
x_sg = x[labels==g][nonzero][:bounds[g]]
if g == 0:
y_sg = y_corr[labels==g][nonzero][:bounds[g]]
else:
y_sg = y[labels==g][nonzero][:bounds[g]]
ax2.plot(x_sg, y_sg, ls='None', marker=markers[g],
color=colors[g], alpha=0.3)
if y_sg.shape[0] != 0:
parms = {'m': 1, 'b': np.min(y_sg)}
# errors = np.ones(x_sg.shape) * noise[pix[0], pix[1], cell, g]
fitted = fit_data(lin_fun, x_sg, y_sg, parms)
fits[g] = fitted
yf = lin_fun(x_sg, fitted['m'], fitted['b'])
ax3.plot(x_sg, y_sg, color=colors[g], ls='None',alpha=0.3, marker='o')
ax3.plot(x_sg, yf, color='navy', lw=1, zorder=9, alpha=0.2)
ax4.plot(x_sg, ((y_sg-yf) / y_sg), lw=1, color='navy')
hm = fits[0]['m'] / fits[1]['m']
hl = fits[0]['m'] / fits[2]['m']
ml = fits[1]['m'] / fits[2]['m']
M_offset = fits[1]['b']*hm-fits[0]['b']
L_offset = fits[2]['b']*hl-fits[0]['b']
y_mgcorr = y[labels==1] * hm - M_offset
y_lgcorr = y[labels==2] * hl - L_offset
err_hm = hm * np.sqrt((fits[0]['error_m']/fits[0]['m'])**2 + (fits[1]['error_m']/fits[1]['m'])**2)
err_ml = ml * np.sqrt((fits[1]['error_m']/fits[1]['m'])**2 + (fits[2]['error_m']/fits[2]['m'])**2)
if not label_added:
ax5.errorbar(counter, hm, err_hm, label='HG/MG', ls="None", marker='x', color='tab:blue')
ax5.errorbar(counter, ml, err_ml, label='MG/LG', ls="None", marker='x', color='tab:orange')
label_added = True
else:
ax5.errorbar(counter, hm, err_hm, ls="None", marker='x', color='tab:blue')
ax5.errorbar(counter, ml, err_ml, ls="None", marker='x', color='tab:orange')
counter += 1
ax7.scatter(x[labels==0], y_corr[labels==0]/7.3,color='tab:blue')
ax7.scatter(x[labels==1], y_mgcorr/7.3,color='tab:orange')
ax7.scatter(x[labels==2], y_lgcorr/7.3,color='tab:green')
ax7.set_xscale('log')
else:
no_entry += 1
ax6.hlines(np.mean(reldiff), 0, counter, color='k', zorder=10)
leg = ax5.legend()
fig1.show()
fig2.show()
fig3.show()
fig7.show()
print(f"Pixels with signal not being injected in all gain stages: {no_entry//len(test_cells)}")
```
%% Cell type:code id: tags:
``` python
tpix_range1 = [(200,210), (63,73)]
test_cells = [10,300]
roi = 'ROI1'
evaluate_fitting_ROI(cs_data[0], tpix_range1, test_cells, roi)
```
%% Cell type:code id: tags:
``` python
tpix_range2 = [(35,38), (38,44)]
roi2 = 'ROI2'
evaluate_fitting_ROI(cs_data[0], tpix_range2, test_cells, roi2)
```
%% Cell type:markdown id: tags:
## Parallel fitting of the whole module
%% Cell type:code id: tags:
``` python
def parallel_fit(yca, cor, variable):
"""Perform per-pixel parallel fitting for the module."""
gain_marker = [0, 1, 2]
all_cores = multiprocessing.cpu_count()
num_workers = cor.shape[1] if cor.shape[1] < 50 else all_cores - 3
context= psh.context.ProcessContext(num_workers=num_workers)
all_fres = {g: {var: context.alloc(shape=yca.shape[1:],
dtype=np.float32) for var in variable} for g in gain_marker}
failures = context.alloc(shape=yca.shape[1:], dtype=np.int32)
def fit_row_parallel(worker_id, array_index, row):
for cell in range(yca.shape[1]):
y = yca[:, cell, row]
x = x_eq[:y.shape[0]]
y_cor = cor[:, cell, row]
try:
failures[cell, row] = 0
labels, gain_marker, bounds = label_gain_stage(x, y)
if (np.any(bounds)):
for g in gain_marker:
nonzero = np.where(y_cor[labels==g] > 0.)
x_sg = x[labels==g][nonzero][:bounds[g]]
if g == 0:
y_sg = y_cor[labels==g][nonzero][:bounds[g]]
else:
y_sg = y[labels==g][nonzero][:bounds[g]]
if y_sg.shape[0] != 0:
parms = {'m': 1, 'b': np.min(y_sg)}
fitted = fit_data(lin_fun, x_sg, y_sg, parms)
yf = lin_fun(x_sg, fitted['m'], fitted['b'])
all_fres[g]['m'][cell, row] = fitted['m']
all_fres[g]['b'][cell, row] = fitted['b']
all_fres[g]['error_m'][cell, row] = fitted['error_m']
all_fres[g]['red_chi2'][cell, row] = fitted['red_chi2']
all_fres[g]['fit_dev'][cell, row] = np.median(np.abs((y_sg - yf) / y_sg))
all_fres[g]['no_entry'][cell, row] = 0
else:
all_fres[g]['no_entry'][cell, row] = 1
except Exception as e:
failures[cell, row] = 1
context.map(fit_row_parallel, range(yca.shape[-1]))
return all_fres, failures
```
%% Cell type:code id: tags:
``` python
step_timer = StepTimer()
```
%% Cell type:code id: tags:
``` python
def execute_parallel_fit(data, corr_data):
"""Execute prallelel fitting function column-wise."""
gains = [0, 1, 2]
variable = ['m', 'b', 'error_m', 'red_chi2', 'fit_dev', 'no_entry']
fres = {g: {var: np.full(data.shape[1:], np.nan) for var in variable} for g in gains}
failures_fit = np.full(data.shape[1:], np.nan)
step_timer.start()
for col in range(data.shape[2]):
yca = data[..., col, :]
cor = corr_data[..., col, :]
frs = parallel_fit(yca, cor, variable)
for gain in gains:
fres[gain]['m'][:, col, :] = frs[0][gain]['m']
fres[gain]['b'][:, col, :]= frs[0][gain]['b']
fres[gain]['error_m'][:, col, :] = frs[0][gain]['error_m']
fres[gain]['red_chi2'][:, col, :] = frs[0][gain]['red_chi2']
fres[gain]['fit_dev'][:, col, :] = frs[0][gain]['fit_dev']
fres[gain]['no_entry'][:, col, :] = frs[0][gain]['no_entry']
failures_fit[:, col, :] = frs[1]
step_timer.done_step(f"Processed row {col}")
return fres, failures_fit
```
%% Cell type:code id: tags:
``` python
cor = cs_data[0] - offset
```
%% Cell type:code id: tags:
``` python
start = datetime.now()
fres, failures_fit = execute_parallel_fit(cs_data[0], cor)
end = datetime.now() - start
print('Duration of fitting: ', end)
```
%% Cell type:code id: tags:
``` python
import copy
fres_copy = copy.deepcopy(fres)
```
%% Cell type:code id: tags:
``` python
failure_percentage = np.count_nonzero(failures_fit)/352/(128*512)*100
print('Percentage of fitting failure: {:.2f}'.format(failure_percentage))
```
%% Cell type:code id: tags:
``` python
slopes = {}
intercepts = {}
red_chi2 = {}
for i, (gain, key) in enumerate(fres.items()):
slopes[i] = key['m']
intercepts[i] = key['b']
red_chi2[i] = key['red_chi2']
```
%% Cell type:code id: tags:
``` python
def calc_median(roi):
"""Calculate median for a specified block of pixels."""
index = []
for idx in range(0, 128,roi[0]):
for j in range(0,(512//roi[1])):
index.append([[j*roi[1], j*roi[1]+roi[1]], [idx, idx+roi[0]]])
index = np.asarray(index)
fshape = fres[0]['m'].shape
median_m = np.zeros((3,fshape[0], fshape[1], fshape[2]))
median_b = np.zeros((3,fshape[0], fshape[1], fshape[2]))
for g in range(0,3):
means = np.zeros((fshape[0], fshape[1], fshape[2]))
means_b = np.zeros((fshape[0], fshape[1], fshape[2]))
for r_ac in range(index.shape[0]):
idx = index[r_ac]
val = np.nanmedian(slopes[g][:, idx[1][0]:idx[1][1],
idx[0][0]:idx[0][1]], axis=(1,2))
means[:, idx[1][0]:idx[1][1], idx[0][0]:idx[0][1]]=np.repeat(val, roi[0]*roi[1]).reshape(fshape[0],roi[0],roi[1])
val = np.nanmedian(intercepts[g][:, idx[1][0]:idx[1][1],
idx[0][0]:idx[0][1]], axis=(1,2))
means_b[:, idx[1][0]:idx[1][1], idx[0][0]:idx[0][1]]=np.repeat(val, roi[0]*roi[1]).reshape(fshape[0],roi[0],roi[1])
median_m[g, ...] = means
median_b[g, ...] = means_b
return median_m, median_b
```
%% Cell type:code id: tags:
``` python
# First we calculate median with higher granularity to cope with inhomogeneity of CS injection over module
# The median values are used to replace bad pixel values.
roi = [4,8]
median_m, median_b = calc_median(roi)
```
%% Cell type:markdown id: tags:
### Example of high gain median values of cell 1
%% Cell type:code id: tags:
``` python
fig = xana.heatmapPlot(median_m[0,1,...], cmap='jet', add_panels=False, aspect=1,
lut_label='Median slope', cb_loc='bottom', x_label='Rows', y_label='Columns')
fig = xana.heatmapPlot(median_b[0,1,...], cmap='jet', add_panels=False, aspect=1,
lut_label='Median intercept', cb_loc='bottom', x_label='Rows', y_label='Columns')
```
%% Cell type:code id: tags:
``` python
# Defining bad pixels based on fit reliability
bad_pixels = {}
for g in range(0,3):
mask = np.zeros(slopes[g].shape, np.uint32)
error_m_thr = np.nanmean(fres[g]['error_m'])+5*np.nanstd(fres[g]['error_m'])
# mask[(np.abs(fres[g]['error_m']) > error_m_thr) & (mask==0)] |= BadPixels.CI_EVAL_ERROR.value
# mask[(np.abs(fres[g]['error_m']) == 0) & (mask==0)] |= BadPixels.CI_EVAL_ERROR.value
mask[(~np.isfinite(fres[g]['error_m'])) & (mask==0)] |= BadPixels.CI_EVAL_ERROR.value
mask[(fres[g]['red_chi2'] > chi2_lim) & (mask==0)] |= BadPixels.CI_EVAL_ERROR.value
mask[(~np.isfinite(fres[g]['red_chi2'])) & (mask==0)] |= BadPixels.CI_EVAL_ERROR.value
mask[(~np.isfinite(fres[g]['fit_dev'])) & (mask==0)] |= BadPixels.CI_LINEAR_DEVIATION.value
fit_dev_thr = np.nanmean(fres[g]['fit_dev'][np.isfinite(fres[g]['fit_dev'])])+5*np.nanstd(fres[g]['fit_dev'][np.isfinite(fres[g]['fit_dev'])])
mask[(fres[g]['fit_dev'] > fit_dev_thr) & (mask==0)] |= BadPixels.CI_LINEAR_DEVIATION.value
# mask[((slopes[g] <= 0.1) | (intercepts[g] <= 1.)) & (mask==0)] |= BadPixels.CI_EVAL_ERROR.value #this causes most pixels mask in A0
mask[((~np.isfinite(slopes[g])) | (~np.isfinite(intercepts[g]))) & (mask==0)] |= BadPixels.CI_EVAL_ERROR.value
bad_pixels[g] = mask
```
%% Cell type:code id: tags:
``` python
# Merging bad pixels from each gain stage to one bad pixel map
fshape = fres[0]['m'].shape
BPmap = np.zeros(fshape, np.uint32)
for g in range(0,3):
BPmap[(bad_pixels[g] > 0) & (BPmap==0)] = bad_pixels[g][(bad_pixels[g] > 0) & (BPmap==0)]
```
%% Cell type:code id: tags:
``` python
# Defining bad pixels based on performance
tshape = (fshape[0], fshape[1]*fshape[2])
ratio_HM = slopes[0] / slopes [1]
ratio_ML = slopes[1] / slopes [2]
ratio_HM_med = median_m[0] / median_m[1]
ratio_ML_med = median_m[1] / median_m[2]
rms_ratio_HM = mad(ratio_HM.reshape(tshape), axis=1, scale='normal', nan_policy='omit')
rms_ratio_ML = mad(ratio_ML.reshape(tshape), axis=1, scale='normal', nan_policy='omit')
rms_ratio_HM = np.repeat(rms_ratio_HM, fshape[1]*fshape[2]).reshape(fshape)
rms_ratio_ML = np.repeat(rms_ratio_ML, fshape[1]*fshape[2]).reshape(fshape)
thr_mean_HM = [relgain_lim[0]*np.median(ratio_HM_med[np.isfinite(ratio_HM_med) & (ratio_HM_med>0.)]),
relgain_lim[1]*np.median(ratio_HM_med[np.isfinite(ratio_HM_med) & (ratio_HM_med>0.)])]
thr_mean_ML = [relgain_lim[0]*np.median(ratio_ML_med[np.isfinite(ratio_ML_med) & (ratio_ML_med>0.)]),
relgain_lim[1]*np.median(ratio_ML_med[np.isfinite(ratio_ML_med) & (ratio_ML_med>0.)])]
print(f'Thresholds HM ratio: {thr_mean_HM[0]} ; {thr_mean_HM[1]}')
print(f'Thresholds ML ratio: {thr_mean_ML[0]} ; {thr_mean_ML[1]}')
bad_fits = (np.abs((ratio_HM - ratio_HM_med)/rms_ratio_HM) > sigma_dev_cut) |\
(np.abs((ratio_ML - ratio_ML_med)/rms_ratio_ML) > sigma_dev_cut) |\
(~np.isfinite(ratio_HM)) | (~np.isfinite(ratio_ML)) |\
(ratio_HM < thr_mean_HM[0]) | (ratio_HM > thr_mean_HM[1]) |\
(ratio_ML < thr_mean_ML[0]) | (ratio_ML > thr_mean_ML[1])
BPmap[(bad_fits) & (BPmap==0)] |= BadPixels.CI_GAIN_OF_OF_THRESHOLD.value
bp_vals = [BadPixels.CI_GAIN_OF_OF_THRESHOLD.value,
BadPixels.CI_LINEAR_DEVIATION.value,
BadPixels.CI_EVAL_ERROR.value
]
```
%% Cell type:markdown id: tags:
## Bad pixels inspection
It is a known behaviour that cells around cell Id higher than 300 show peculiar CS signal, hence fitting procedure fails, leading to almost 100% of bad pixels. These cells are later on filled with median values calculated over all memory cells.
%% Cell type:code id: tags:
``` python
nonzeros = []
for i in range(BPmap.shape[0]):
nonzeros.append(np.count_nonzero(BPmap[i, ...]))
worst_cell = np.where(nonzeros == np.max(nonzeros))[0][0]
print('Percentage of bad pixels in cell 1: {:.2f}%'.format(np.asarray(nonzeros[1])/(512*128)*100))
print('Percentage of bad pixels: {:.2f}%'.format(np.sum(np.asarray(nonzeros))/(512*128*352)*100))
fig=xana.heatmapPlot(BPmap[1, ...], cmap='magma', add_panels=False, aspect=1,
lut_label='Bad pixels, cell: 1', cb_loc='bottom', vmin=0)
fig=xana.heatmapPlot(BPmap[worst_cell, ...], cmap='magma', add_panels=False, aspect=1,
lut_label=f'Bad pixels (the worst cell), cell: {worst_cell}', cb_loc='bottom', vmin=0)
```
%% Cell type:code id: tags:
``` python
nonzeros = np.array(nonzeros)
BP_rtio = nonzeros/(128*512)*100
```
%% Cell type:code id: tags:
``` python
fig=plt.figure(figsize=(6,4))
plt.plot(BP_rtio, ls='--', marker='o')
plt.ylabel('# Bad pixels %')
plt.xlabel('Memory cell Id')
plt.grid(axis='y', ls='dotted')
# plt.ylim(0,30)
plt.show()
```
%% Cell type:code id: tags:
``` python
print('Number of cells with more than 20% of bad pixels:',np.count_nonzero((nonzeros/(128*512)*100) > 20))
```
%% Cell type:code id: tags:
``` python
total = np.count_nonzero(BPmap)
lbl = ['Gain off', 'Linear deviation', 'Evaluation error']
color = plt.cm.YlOrBr(np.linspace(0.2, 0.6, 3))
cut_flow = [np.count_nonzero(BPmap == 16), # CI_GAIN_OF_OF_THRESHOLD
np.count_nonzero((BPmap == 16) | (BPmap == 32)), #previous+CI_LINEAR_DEVIATION
np.count_nonzero(BPmap)] #previous + CI_EVAL_ERROR
fig, ax = plt.subplots(figsize=(8,4))
ax.yaxis.grid(zorder=0, ls='--')
lines = []
for i, bp_vl in enumerate(bp_vals):
ctr = np.count_nonzero(BPmap == bp_vl)
y = (1-ctr/np.prod(BPmap.shape))*100
y1 = (1-cut_flow[i]/np.prod(BPmap.shape))*100
lines += plt.bar(lbl[i], y, color='tab:blue', label='this cut', zorder=5, )
plt.bar(lbl[i], y1, color='tab:orange', alpha=1, zorder=5)
ax.set_xticklabels(lbl, rotation=45, ha='right', fontsize=12)
plt.ylim(y1-1, 100)
plt.ylabel('Percentage of good pixels (%)', fontsize=12)
labels = ['Single contribution', 'Cut flow']
plt.legend(labels)
plt.show()
```
%% Cell type:code id: tags:
``` python
def plot_cuts(a,sel,name, ax = None):
"""Plot histograms of fitted parameters."""
stats_cut = {}
stats = {}
if ax is None :
fig = plt.figure(figsize=(3,5))
ax = fig.add_subplot(111)
stats_cut["Mean"] = np.nanmean(a[sel])
stats_cut["STD"] = np.nanstd(a[sel])
stats_cut["Median"] = np.nanmedian(a[sel])
stats_cut["Min"] = np.nanmin(a[sel])
stats_cut["Max"] = np.nanmax(a[sel])
msk = (np.isfinite(a)) & (a > 0) & (a<14e3)
stats["Mean"] = np.nanmean(a[msk])
stats["STD"] = np.nanstd(a[msk])
stats["Median"] = np.nanmedian(a[msk])
stats["Min"] = np.nanmin(a[msk])
stats["Max"] = np.nanmax(a[msk])
m = np.nanmean(a[msk])
s = np.nanstd(a[msk])
bins = np.linspace(m-10*s,m+10*s,100)
ax.hist(a.ravel(),
bins = bins,
color='tab:red',
label='all fits',
alpha=0.7
)
ax.hist(a[sel].ravel(),
bins = bins,
color='tab:green',
label='good fits'
)
ax.grid(zorder=0)
ax.set_xlabel(name)
ax.set_yscale('log')
# ax.set_title(name)
ax.legend()
def statistic(stat, colour, shift):
textstr = ""
for key in stat:
try:
textstr += '{0: <6}: {1: 7.2f}\n'.format(key, stat[key])
except:
pass
props = dict(boxstyle='round', alpha=0.5, color=colour,)
ax.text(0.05, 0.95-shift, textstr, transform=ax.transAxes, fontsize=10,
family='monospace', weight='book', stretch='expanded',
verticalalignment='top', bbox=props, zorder=2)
statistic(stats_cut, 'tab:green', 0)
statistic(stats, 'tab:red', 0.3)
```
%% Cell type:code id: tags:
``` python
fig = plt.figure(figsize=(15,15))
plot_cuts(slopes[0],~(BPmap > 0), 'Slopes HG (ADU/DAC)', fig.add_subplot(321))
plot_cuts(slopes[1],~(BPmap > 0), 'Slopes MG (ADU/DAC)', fig.add_subplot(323))
plot_cuts(slopes[2],~(BPmap > 0), 'Slopes LG (ADU/DAC)', fig.add_subplot(325))
plot_cuts(intercepts[0],~(BPmap > 0), 'Intercepts HG (ADU)', fig.add_subplot(322))
plot_cuts(intercepts[1],~(BPmap > 0), 'Intercepts MG (ADU)', fig.add_subplot(324))
plot_cuts(intercepts[2],~(BPmap > 0), 'Intercepts LG (ADU)', fig.add_subplot(326))
fig = plt.figure(figsize=(15,15))
plot_cuts(ratio_HM,~(BPmap > 0), 'Ratio HG/MG', fig.add_subplot(321))
plot_cuts(ratio_ML,~(BPmap > 0), 'Ratio MG/LG', fig.add_subplot(322))
```
%% Cell type:code id: tags:
``` python
# Sanitisation of bad pixels
for g in range(0,3):
slopes[g][BPmap > 0] = median_m[g, ...][BPmap > 0]
intercepts[g][BPmap > 0] = median_b[g, ...][BPmap > 0]
ratio_HM = slopes[0] / slopes [1]
ratio_ML = slopes[1] / slopes [2]
ratio_HM[np.isnan(ratio_HM) | np.isinf(ratio_HM)] = 0.
ratio_ML[np.isnan(ratio_ML) | np.isinf(ratio_ML)] = 0.
```
%% Cell type:markdown id: tags:
## Overview plots after sanitisation
%% Cell type:markdown id: tags:
### Historgrams of gain ratios (good fits only)
%% Cell type:code id: tags:
``` python
ratios = {'H-M': ratio_HM,
'M-L': ratio_ML}
thr = {'H-M': thr_mean_HM,
'M-L': thr_mean_ML}
def statistic(stat, colour, shift, ax):
"""Plot statistical parameters."""
textstr = ""
for key in stat:
try:
textstr += '{0: <6}: {1: 7.2f}\n'.format(key, stat[key])
except:
pass
props = dict(boxstyle='round', alpha=0.5, color=colour,)
ax.text(0.05, 0.95-shift, textstr, transform=ax.transAxes, fontsize=10,
family='monospace', weight='book', stretch='expanded',
verticalalignment='top', bbox=props, zorder=2)
fig, ax = plt.subplots(1,2, figsize=(10,5))
stats = {}
for i, key in enumerate(ratios.keys()):
stats["Mean"] = np.nanmean(ratios[key])
stats["STD"] = np.nanstd(ratios[key])
stats["Median"] = np.nanmedian(ratios[key])
ax[i].hist(ratios[key].flatten(), bins=100, range=thr[key], color='tab:green',zorder=5)
ax[i].set_xlabel(f'{key} ratio')
ax[i].set_ylabel('Counts')
ax[i].grid()
statistic(stats, 'tab:green', 0, ax[i])
plt.show()
```
%% Cell type:markdown id: tags:
### Preview maps of fits and their quality
%% Cell type:code id: tags:
``` python
def preview_fitted_params(data, gain, cell_to_preview):
"""Plot maps of fitted parameters."""
g_label = ['High gain', 'Medium gain', 'Low gain']
plot_text = {"m": 'Fitted slope (m)',
"b": 'Fitted intercept (b)',
"error_m": 'Slope error',
"red_chi2": 'Reduced Chi2',
"fit_dev": 'Relative difference fit vs. data',
"no_entry": 'No entry for fitting'
}
fig = plt.figure(figsize=(20,7))
plt.suptitle(f'{g_label[g]}', fontsize=16)
grid = AxesGrid(fig, 111,
nrows_ncols=(3, 2),
axes_pad=(0.9, 0.55),
label_mode="0",
share_all=True,
cbar_location="right",
cbar_mode="each",
cbar_size="7%",
cbar_pad="2%"
)
i = 0
for key, citem in data.items():
item = citem.copy()
item[~np.isfinite(item)] = 0
med = np.nanmedian(item)
bound = 0.1
maxcnt = 10
if med < 0:
bound = -bound
while(np.count_nonzero((item < med-bound*med) | (item > med+bound*med))/item.size > 0.01):
bound *= 2
maxcnt -= 1
if maxcnt < 0:
break
vmin = med-bound*med
vmax = med+bound*med
if "no_entry" in key:
vmin = -1
vmax = 1
d = item[cell_to_preview, ...]
im = grid[i].imshow(d, interpolation="nearest", origin='lower', cmap='rainbow',
vmin=vmin, vmax=vmax)
grid[i].set_title(plot_text[key], fontsize=14)
cb = grid.cbar_axes[i].colorbar(im)
# axes coordinates are 0,0 is bottom left and 1,1 is upper right
x0, x1 = tpix_range1[0][0], tpix_range1[0][1]
y0, y1 = tpix_range1[1][0], tpix_range1[1][1]
p = patches.Rectangle((x0, y0), (x1 - x0), (y1 - y0), fill=False, color="red")
grid[i].add_patch(p)
x0, x1 = tpix_range2[0][0], tpix_range2[0][1]
y0, y1 = tpix_range2[1][0], tpix_range2[1][1]
p = patches.Rectangle((x0, y0), (x1 - x0), (y1 - y0), fill=False, color="white")
grid[i].add_patch(p)
i += 1
```
%% Cell type:code id: tags:
``` python
print('Results from cell: 1')
for g, (gain, data) in enumerate(fres.items()):
preview_fitted_params(data, g, 1)
```
%% Cell type:code id: tags:
``` python
print('Results from cell: 310')
for g, (gain, data) in enumerate(fres.items()):
preview_fitted_params(data, g, 310)
```
%% Cell type:code id: tags:
``` python
# Here we recalculate median values with lower granularity over already sanitized data.
# This is to minise effect of the horizontal streaks over the rows. New medians are then applied to bad pixels.
roi = [32,64]
median_m, median_b = calc_median(roi)
for g in range(0,3):
slopes[g][BPmap > 0] = median_m[g, ...][BPmap > 0]
intercepts[g][BPmap > 0] = median_b[g, ...][BPmap > 0]
ratio_HM = slopes[0] / slopes [1]
ratio_ML = slopes[1] / slopes [2]
ratio_HM[np.isnan(ratio_HM) | np.isinf(ratio_HM)] = 0.
ratio_ML[np.isnan(ratio_ML) | np.isinf(ratio_ML)] = 0.
```
%% Cell type:code id: tags:
``` python
HGMG_mean = np.nanmedian(ratio_HM, axis=0)
MGLG_mean = np.nanmedian(ratio_ML, axis=0)
slopes_median = {}
intercepts_median = {}
for g in range(0,3):
slopes_median[g] = np.nanmedian(slopes[g], axis=0)
intercepts_median[g] = np.nanmedian(intercepts[g], axis=0)
```
%% Cell type:code id: tags:
``` python
# Sanitisation of cells having more than 20% of bad pixels.
for cell in range(0,352):
if BP_rtio[cell] > 20:
ratio_HM[cell, ...] = HGMG_mean
ratio_ML[cell, ...] = MGLG_mean
for g in range(0,3):
for cell in range(0,352):
if BP_rtio[cell] > 20:
slopes[g][cell, ...] = slopes_median[g]
intercepts[g ][cell, ...] = intercepts_median[g]
```
%% Cell type:code id: tags:
``` python
fres = copy.deepcopy(fres_copy) # this is needed to have raw fits without sanitization
```
%% Cell type:markdown id: tags:
### Preview maps of ratios
%% Cell type:code id: tags:
``` python
cells = [1, 310]
for i, cell in enumerate(cells):
vmin, vmax = get_range(ratio_HM, 8)
fig = xana.heatmapPlot(ratio_HM[cell, ...], cmap='jet', add_panels=False, aspect=1,
lut_label=f'H-M ratio, Cell: {cell}',
cb_loc='bottom', vmin=vmin, vmax=vmax)
vmin, vmax = get_range(ratio_ML, 8)
fig = xana.heatmapPlot(ratio_ML[cell, ...], cmap='jet', add_panels=False, aspect=1,
lut_label=f'M-L ratio, Cell: {cell}',
cb_loc='bottom', vmin=vmin, vmax=vmax)
```
%% Cell type:markdown id: tags:
### Median values of gain ratios
%% Cell type:code id: tags:
``` python
vmin, vmax = get_range(HGMG_mean, 7)
fig = xana.heatmapPlot(HGMG_mean, cmap='jet', add_panels=False, aspect=1,
cb_loc='bottom', vmin=vmin, vmax=vmax, lut_label='H-M ratio',
x_label='Columns', y_label='Rows')
vmin, vmax = get_range(MGLG_mean, 7)
fig = xana.heatmapPlot(MGLG_mean, cmap='jet', add_panels=False, aspect=1,
cb_loc='bottom', vmin=vmin, vmax=vmax, lut_label='M-L ratio',
x_label='Columns', y_label='Rows')
```
%% Cell type:markdown id: tags:
### Standard deviation of gain ratios
%% Cell type:code id: tags:
``` python
std_ratios = {'H-M': np.nanstd(ratio_HM, axis=0),
'M-L': np.nanstd(ratio_ML, axis=0)}
for key in std_ratios:
vmin, vmax = get_range(std_ratios[key], 10)
fig = xana.heatmapPlot(std_ratios[key], cmap='jet', add_panels=False, aspect=1,
cb_loc='bottom', vmin=vmin, vmax=vmax,
lut_label=f'Standard deviation of {key} ratio',
x_label='Columns', y_label='Rows')
```
%% Cell type:markdown id: tags:
## Inspecting cell dependent behaviour
%% Cell type:code id: tags:
``` python
fig, ax = plt.subplots(3, 2, figsize=(16, 8))
fig.subplots_adjust(wspace=0.1, hspace=0.5)
lbl = ['HG', 'MG', 'LG']
for i in range(3):
hrange = [0.1 * np.median(slopes[i]), 2.5 * np.median(slopes[i])]
r_hist = np.zeros((slopes[i].shape[0], 100))
for c in range(slopes[i].shape[0]):
d = slopes[i][c, ...]
h, e = np.histogram(d.flatten(), bins=100, range=hrange)
r_hist[c, :] = h
im = ax[i, 0].imshow(r_hist[:, :].T[::-1, :], interpolation="nearest",
aspect="auto", norm=LogNorm(vmin=1, vmax=np.max(r_hist)),
extent=[0, slopes[i].shape[0], hrange[0], hrange[1]])
ax[i, 0].set_xlabel("Memory cell", fontsize=12)
ax[i, 0].set_ylabel('Slope {}'.format(lbl[i]), fontsize=12)
cb = fig.colorbar(im, ax=ax[i, 0], pad=0.01)
hrange = [0.5 * np.median(intercepts[i]), 1.8 * np.median(intercepts[i])]
r_hist = np.zeros((intercepts[i].shape[0], 100))
for c in range(intercepts[i].shape[0]):
d = intercepts[i][c, ...]
h, e = np.histogram(d.flatten(), bins=100, range=hrange)
r_hist[c, :] = h
im = ax[i, 1].imshow(r_hist[:, :].T[::-1, :], interpolation="nearest",
aspect="auto", norm=LogNorm(vmin=1, vmax=np.max(r_hist)),
extent=[0, intercepts[i].shape[0], hrange[0], hrange[1]])
ax[i, 1].set_xlabel("Memory cell", fontsize=12)
ax[i, 1].set_ylabel('Intercept {}'.format(lbl[i]), fontsize=12)
cb = fig.colorbar(im, ax=ax[i, 1], pad=0.01, label='Frequency')
```
%% Cell type:code id: tags:
``` python
ratios = [ratio_HM, ratio_ML]
fig, ax = plt.subplots(1, 2, figsize=(16, 4))
lbl = ['High-Medium', 'Medium-Low']
for i, ratio in enumerate(ratios):
hrange = [0.6 * np.median(ratio), 1.4 * np.median(ratio)]
r_hist = np.zeros((ratio.shape[0], 100))
for c in range(ratio.shape[0]):
d = ratio[c, ...]
h, e = np.histogram(d.flatten(), bins=100, range=hrange)
r_hist[c, :] = h
im = ax[i].imshow(r_hist[:, :].T[::-1, :], interpolation="nearest",
aspect="auto", norm=LogNorm(vmin=1, vmax=np.max(r_hist)),
extent=[0, ratio_HM.shape[0], hrange[0], hrange[1]])
ax[i].set_xlabel("Memory cell", fontsize=12)
ax[i].set_ylabel('Ratio {}'.format(lbl[i]), fontsize=12)
cb = fig.colorbar(im, ax=ax[i], pad=0.01, label='Frequency')
```
%% Cell type:code id: tags:
``` python
avg_m = {}
for g in range(0,3):
avg_m[g] = np.nanmedian(slopes[g])
```
%% Cell type:markdown id: tags:
## Validation plot
%% Cell type:code id: tags:
``` python
one_photon = 73 #assuming 10keV ph
tpix_range2 = [(35,40), (38,44)]
test_pixels = []
for i in range(*tpix_range2[0]):
for j in range(*tpix_range2[1]):
test_pixels.append((j,i))
test_cells = [1, 35]
tcell = np.array(test_cells)
tcell = tcell[tcell < mem_cells]
if tcell.size == 0:
test_cells = [mem_cells-1]
else:
test_cells = tcell.tolist()
bins = (np.linspace(-100, 6000, 100),
np.linspace(5000, 13000, 1000),
)
markers = ['o', '.', 'x', 'v']
colors = ['tab:blue', 'tab:orange', 'tab:green']
fig1, ax1 = plt.subplots(figsize=(9, 5))
ax1.grid(zorder=0, lw=1.5)
ax1.set_ylabel("CS signal (ADU)", fontsize=11)
ax1.set_xlabel('Integration time (clk)', fontsize=11)
ax1.title.set_text('Raw CS signal')
fig2, ax2 = plt.subplots(figsize=(9, 5))
ax2.grid(zorder=0, lw=1.5)
ax2.set_ylabel("AGIPD response (#photon)", fontsize=11)
ax2.set_xlabel('Integration time (clk)', fontsize=11)
ax2.title.set_text('Corrected CS signal response')
ax2.set_xscale('log')
ana = cs_data[0]
H = [0, 0, 0, 0, 0]
gains = [0, 1, 1, 2, 2]
ex, ey = None, None
for pix in test_pixels:
for cell in test_cells:
color = np.random.rand(3, 1)
x = x_eq[:ana.shape[0]]
y = ana[:trains,cell, pix[0], pix[1]]
y_corr = (y - offset[:, cell, pix[0], pix[1]]).astype(np.float32)
labels, gain_marker, bounds = label_gain_stage(x, y)
if np.any(bounds):
for g in gain_marker:
ax1.plot(x[labels==g], y[labels==g],
ls='None', marker=markers[g], color=colors[g], alpha=0.3)
x_sg = x[labels==g][:bounds[g]]
if g == 0:
cm = (slopes[0][cell, pix[0], pix[1]]/avg_m[0])
y_sg = y_corr[labels==g][:bounds[g]]/cm
else:
y_sg = y[labels==g][:bounds[g]]
hg = slopes[0][cell, pix[0], pix[1]]
og = slopes[g][cell, pix[0], pix[1]]
chg = hg / avg_m[0]
cog = og / avg_m[g]
cm = avg_m[0] / avg_m[g]
A_offset = intercepts[g][cell, pix[0], pix[1]]-intercepts[0][cell, pix[0], pix[1]]
y_corr = (y[labels==g] - A_offset) / cog * cm
y_sg = y_corr[:bounds[g]]
ax2.plot(x_sg[::5], (y_sg/one_photon)[::5], ls='None', marker=markers[g],
color=colors[g], alpha=0.3)
x = x_eq[:cs_data[0].shape[0]]
ideal = avg_m[1] * x / one_photon
fig1.show()
fig2.show()
```
%% Cell type:code id: tags:
``` python
# Saving data locally
if local_output:
for module in modules:
constants_file = '{}/CSconst_{}_M{}.h5'.format(out_folder, karabo_id, module)
with h5py.File(constants_file, 'w') as f:
for gain in fres:
for key, item in fres[gain].items():
f[f'/GainFits/{gain}/{key}/data'] = item
f['/BadPixels/data'] = BPmap
for g in range(0,3):
f[f'/SanitizedConsts/{g}/m/data'] = slopes[g]
f[f'/SanitizedConsts/{g}/b/data'] = intercepts[g]
f['/SanitizedConsts/Ratios/H-M/data'] = ratio_HM
f['/SanitizedConsts/Ratios/M-L/data'] = ratio_ML
```
......
%% Cell type:markdown id: tags:
# Current Source Characterisation Summary
This notebook is used as a dependency notebook for current source characterisation to provide summary for all modules of the AGIPD.
%% Cell type:code id: tags:
``` python
in_folder = "" # in this notebook, in_folder is not used as the data source is in the destination folder
out_folder = "/gpfs/exfel/exp/SPB/202330/p900340/scratch/CS_Processing/test/230914/" # the folder to output to, required
metadata_folder = "" # Directory containing calibration_metadata.yml when run by xfel-calibrate
proc_folder = "" # Path to corrected image data used to create histograms and validation plots
raw_folder = '/gpfs/exfel/exp/SPB/202330/p900340/raw/' # folder of raw data. This is used to save information of source data of generated constants, required
dark_run = 5
karabo_id_control = "SPB_IRU_AGIPD1M1" # karabo-id for the control device e.g. "MID_EXP_AGIPD1M1", or "SPB_IRU_AGIPD1M1"
karabo_id = "SPB_DET_AGIPD1M-1"
ctrl_source_template = '{}/MDL/FPGA_COMP' # path to control information
creation_time = "" # To overwrite the measured creation_time. Required Format: YYYY-MM-DD HR:MN:SC e.g. "2022-06-28 13:00:00"
creation_date_offset = "00:00:00" # add an offset to creation date, e.g. to get different constants
cal_db_interface = "tcp://max-exfl-cal002:8015#8045"
db_output = False
# Detector conditions
bias_voltage = -1 # detector bias voltage, use -1 to use value stored in slow data.
mem_cells = -1 # number of memory cells used, use -1 to use value stored in slow data.
acq_rate = -1. # the detector acquisition rate, use -1. to use value stored in slow data.
gain_setting = -1 # the gain setting, use -1 to use value stored in slow data.
integration_time = -1 # integration time, use -1 to use value stored in slow data.
```
%% Cell type:code id: tags:
``` python
import warnings
import h5py
import matplotlib.pyplot as plt
import matplotlib.gridspec as gridspec
import matplotlib.cm as cm
import numpy as np
import tabulate
from datetime import timedelta
from dateutil import parser
import os
from cal_tools.agipdlib import AgipdCtrl
from cal_tools.ana_tools import get_range
from cal_tools.enums import BadPixels
from cal_tools.tools import (
module_index_to_qm,
calcat_creation_time,
get_report,
get_pdu_from_db,
send_to_db
)
from iCalibrationDB import Conditions, Constants
from extra_data import RunDirectory
from extra_geom import AGIPD_1MGeometry, AGIPD_500K2GGeometry
from IPython.display import Latex, display
from XFELDetAna.plotting.simpleplot import simplePlot
%matplotlib inline
warnings.filterwarnings('ignore')
```
%% Cell type:code id: tags:
``` python
# Get operation conditions
ctrl_source = ctrl_source_template.format(karabo_id_control)
run_folder = f'{raw_folder}/r{dark_run:04d}/'
raw_dc = RunDirectory(run_folder)
# Read operating conditions from AGIPD00 files
instrument_src_mod = [
s for s in list(raw_dc.all_sources) if "0CH" in s][0]
ctrl_src = [
s for s in list(raw_dc.all_sources) if ctrl_source in s][0]
```
%% Cell type:code id: tags:
``` python
# Evaluate creation time
creation_time = calcat_creation_time(raw_folder, dark_run, creation_time)
offset = parser.parse(creation_date_offset)
delta = timedelta(hours=offset.hour, minutes=offset.minute, seconds=offset.second)
creation_time += delta
print(f"Creation time: {creation_time}")
agipd_cond = AgipdCtrl(
run_dc=raw_dc,
image_src=instrument_src_mod,
ctrl_src=ctrl_src,
raise_error=False, # to be able to process very old data without mosetting value
)
if mem_cells == -1:
mem_cells = agipd_cond.get_num_cells()
if mem_cells is None:
raise ValueError(f"No raw images found in {run_folder}")
if acq_rate == -1.:
acq_rate = agipd_cond.get_acq_rate()
if gain_setting == -1:
gain_setting = agipd_cond.get_gain_setting(creation_time)
if bias_voltage == -1:
bias_voltage = agipd_cond.get_bias_voltage(karabo_id_control)
if integration_time -1:
integration_time = agipd_cond.get_integration_time()
# Evaluate detector instance for mapping
instrument = karabo_id.split("_")[0]
nmods = 8 if instrument == 'HED' else 16
print(f"Using {creation_time} as creation time")
print(f"Operating conditions are:\n• Bias voltage: {bias_voltage}\n• Memory cells: {mem_cells}\n"
f"• Acquisition rate: {acq_rate}\n• Gain setting: {gain_setting}\n• Integration time: {integration_time}\n"
f"• Acquisition rate: {acq_rate}\n• Gain setting: {gain_setting}"
)
```
%% Cell type:code id: tags:
``` python
# Sanitised constants keys:
# mH: high gain slope
# mM: medium gain slope
# mL: low gain slope
#
# bH: high gain intercept
# bM: medium gain intercept
# bL: low gain intercept
#
# H-M: ratio of high gain and medium gain slope
# M-L: ratio of medium gain and low gain slope
# Consts saved to DB as (8,mem_cells,128,512)
# order: ['mH', 'mM', 'mL', 'bH', 'bM', 'bL']
keys_fit = ['m', 'b']
gs = ['H', 'M', 'L']
keys_ratio = ['H-M', 'M-L']
labels = {'m': 'Slope (m)',
'b': 'Intercept (b)',
'H-M': "High-to-Medium Ratio",
'M-L': 'Medium-to-Low Ratio',
'mask': 'Fraction of bad pixels over cells' }
fit_data = {}
ratios = {}
BPmap = {}
sanitised_const = {}
modules = []
karabo_da = []
for mod in range(nmods):
qm = module_index_to_qm(mod)
ratios[mod] = {}
fit_data[mod] = {}
sanitised_const[mod] = {}
constants_file = f'{out_folder}/CSconst_{karabo_id}_M{mod}.h5'
if os.path.exists(constants_file):
print(f'Data available for module {qm}')
with h5py.File(constants_file, 'r') as hf:
BPmap[mod] = hf['/BadPixels/data'][()].swapaxes(1,2)
for key in keys_fit:
fit_data[mod][key] = {}
for g in range(0,3):
# loop 1st through keys, then through gains
# to have right order of constants for DB injection
fit_data[mod][key][g] = hf[f'/SanitizedConsts/{g}/{key}/data'][()].swapaxes(1,2)
sanitised_const[mod][f'{key}{gs[g]}'] = hf[f'/SanitizedConsts/{g}/{key}/data'][()]
for key in keys_ratio:
ratios[mod][key]= hf[f'/SanitizedConsts/Ratios/{key}/data'][()].swapaxes(1,2)
sanitised_const[mod][key] = hf[f'/SanitizedConsts/Ratios/{key}/data'][()]
modules.append(mod)
karabo_da.append(f"AGIPD{mod:02d}")
else:
print(f"No fit data available for module {qm}")
# Change the order of dict keys to maintain compatibility for the rest of code
fit_data = {mod: {g: {f: fit_data[mod][f][g] for f in keys_fit
}
for g in range(0,3)
}
for mod in modules
}
```
%% Cell type:code id: tags:
``` python
def slope_dict_to_arr(d):
"""Convert dictionary to numpy array."""
arr = np.zeros((8,mem_cells,128,512), np.float32)
for i, key in enumerate(d):
arr[i,...] = d[key]
return arr
```
%% Cell type:code id: tags:
``` python
proposal = list(filter(None, out_folder.strip('/').split('/')))[-2]
file_loc = f'Proposal: {proposal}, Run: {dark_run}'
report = get_report(metadata_folder)
```
%% Cell type:markdown id: tags:
## Injection to DB
%% Cell type:code id: tags:
``` python
if not db_output:
print('Injection to DB not requested.')
```
%% Cell type:code id: tags:
``` python
md = None
# set the operating condition
condition = Conditions.Dark.AGIPD(memory_cells=mem_cells,
bias_voltage=bias_voltage,
acquisition_rate=acq_rate,
gain_setting=gain_setting,
integration_time=integration_time)
gain_setting=gain_setting
)
db_modules = get_pdu_from_db(karabo_id, karabo_da, Constants.AGIPD.SlopesCS(),
condition, cal_db_interface,
snapshot_at=creation_time)
cond_dev = {"Memory cells": [352, 352],
"Acquisition rate": [4.5, 4.5]
}
if db_output:
for parm in condition.parameters:
if parm.name in cond_dev:
parm.lower_deviation = cond_dev[parm.name][0]
parm.upper_deviation = cond_dev[parm.name][1]
print(f'Condition for {parm.name} deviation updated to {cond_dev[parm.name]}.')
for mod, pdu in zip(modules, db_modules):
for const in ["SlopesCS", "BadPixelsCS"]:
dbconst = getattr(Constants.AGIPD, const)()
if const == "SlopesCS":
dbconst.data = slope_dict_to_arr(sanitised_const[mod])
else:
dbconst.data = BPmap[mod]
md = send_to_db(pdu, karabo_id, dbconst, condition,
file_loc, report, cal_db_interface,
creation_time=creation_time)
print("Constants parameter conditions are:\n")
print(f"• memory_cells: {mem_cells}\n• bias_voltage: {bias_voltage}\n"
f"• acquisition_rate: {acq_rate}\n• gain_setting: {gain_setting}\n"
f"• integration_time: {integration_time}\n"
f"• creation_time: {md.calibration_constant_version.begin_at if md is not None else creation_time}\n")
print(f"• memory_cells: {np.diff(cond_dev['Memory cells'])[0]}-{cond_dev['Memory cells'][0]}\n"
f"• bias_voltage: {bias_voltage}\n"
f"• acquisition_rate: {np.diff(cond_dev['Acquisition rate'])[0]}-{cond_dev['Acquisition rate'][0]}\n"
f"• gain_setting: {gain_setting}\n"
f"• creation_time: {md.calibration_constant_version.begin_at if md is not None else creation_time}\n")
```
%% Cell type:code id: tags:
``` python
#Define AGIPD geometry
if instrument == "HED":
geom = AGIPD_500K2GGeometry.from_origin()
else:
geom = AGIPD_1MGeometry.from_quad_positions(quad_pos=[
(-525, 625),
(-550, -10),
(520, -160),
(542.5, 475),
])
```
%% Cell type:code id: tags:
``` python
# Create the arrays that will be used for figures.
# Each array correponds to the data for all processed modules.
pixel_range = [0,0,512,128]
# const_data contains ratios of slopes and BP
const_data = {}
for key in keys_ratio:
const_data[key] = np.full((nmods, mem_cells, 512, 128), np.nan)
for mod in modules:
if key in ratios[mod]:
const_data[key][mod,:,pixel_range[0]:pixel_range[2],
pixel_range[1]:pixel_range[3]] = ratios[mod][key]
const_data['mask'] = np.full((nmods, mem_cells, 512, 128), np.nan)
for mod in modules:
const_data['mask'][mod,:,pixel_range[0]:pixel_range[2],
pixel_range[1]:pixel_range[3]] = BPmap[mod]
# fit_const_data contains fitted slopes and intercepts for each gain stage
fit_const_data = {}
for g in range(0,3):
fit_const_data[g] = {}
for key in keys_fit:
fit_const_data[g][key] = np.full((nmods, mem_cells, 512, 128), np.nan)
for mod in modules:
if key in fit_data[mod][g].keys():
fit_const_data[g][key][mod,:,pixel_range[0]:pixel_range[2],
pixel_range[1]:pixel_range[3]] = fit_data[mod][g][key]
```
%% Cell type:markdown id: tags:
## Summary across pixels ##
%% Cell type:code id: tags:
``` python
g_label = ['High gain', 'Medium gain', 'Low gain']
for g in range(0,3):
gs = gridspec.GridSpec(1, 2)
fig = plt.figure(figsize=(15, 7))
plt.suptitle(f'{g_label[g]}', fontsize=16)
for i, key in enumerate(fit_const_data[g].keys()):
data = np.nanmean(fit_const_data[g][key], axis=1)
vmin, vmax = get_range(data, 5)
ax = fig.add_subplot(gs[0, i])
geom.plot_data_fast(data,
vmin=vmin, vmax=vmax, ax=ax, cmap="jet", figsize=(13,7),
colorbar={'shrink': 1,
'pad': 0.04,
'fraction': 0.1
})
colorbar = ax.images[0].colorbar
ax.set_title(labels[key], fontsize=14)
ax.set_xlabel('Columns', fontsize=12)
ax.set_ylabel('Rows', fontsize=12)
plt.show()
```
%% Cell type:code id: tags:
``` python
gs = gridspec.GridSpec(2, 2)
fig = plt.figure(figsize=(15, 15), tight_layout=True)
for i, key in enumerate(const_data.keys()):
if key=='mask':
ax = fig.add_subplot(gs[0, :])
data = np.nanmean(const_data[key]>0, axis=1)
vmin, vmax = (0,1)
else:
ax = fig.add_subplot(gs[1, i])
data = np.nanmean(const_data[key], axis=1)
vmin, vmax = get_range(data, 5)
geom.plot_data_fast(data,
vmin=vmin, vmax=vmax, ax=ax, cmap="jet", figsize=(13,7),
colorbar={'shrink': 1,
'pad': 0.04,
'fraction': 0.1
})
colorbar = ax.images[0].colorbar
ax.set_title(labels[key], fontsize=14)
ax.set_xlabel('Columns', fontsize=12)
ax.set_ylabel('Rows', fontsize=12)
plt.show()
```
%% Cell type:markdown id: tags:
## Summary across cells ##
Good pixels only.
%% Cell type:code id: tags:
``` python
for key in const_data.keys():
data = np.copy(const_data[key])
if key=='mask':
data = data>0
else:
data[const_data['mask']>0] = np.nan
d = []
for i in range(nmods):
d.append({'x': np.arange(data[i].shape[0]),
'y': np.nanmean(data[i], axis=(1,2)),
'drawstyle': 'steps-pre',
'label': f'{i}',
'linewidth': 2,
'linestyle': '--' if i>7 else '-'
})
fig = plt.figure(figsize=(15, 6))
ax = fig.add_subplot(111)
_ = simplePlot(d, xrange=(-12, 510),
x_label='Memory Cell ID',
y_label=labels[key],
use_axis=ax,
legend='top-left-frame-ncol8',)
ylim = ax.get_ylim()
ax.set_ylim(ylim[0], ylim[1] + np.abs(ylim[1]-ylim[0])*0.2)
ax.grid()
```
%% Cell type:code id: tags:
``` python
table = []
for mod in modules:
table.append((mod,
f"{np.nanmean(ratios[mod]['H-M']):0.1f} +- {np.nanstd(ratios[mod]['H-M']):0.2f}",
f"{np.nanmean(ratios[mod]['M-L']):0.1f} +- {np.nanstd(ratios[mod]['M-L']):0.2f}",
f"{np.nanmean(BPmap[mod]>0)*100:0.1f} ({np.nansum(BPmap[mod]>0)})"
))
all_HM = []
all_ML = []
for mod in modules:
all_HM.extend(ratios[mod]['H-M'])
all_ML.extend(ratios[mod]['M-L'])
all_HM = np.array(all_HM)
all_ML = np.array(all_ML)
all_MSK = np.array([list(msk) for msk in BPmap.values()])
table.append(('overall',
f"{np.nanmean(all_HM):0.1f} +- {np.nanstd(all_HM):0.2f}",
f"{np.nanmean(all_ML):0.1f} +- {np.nanstd(all_ML):0.2f}",
f"{np.nanmean(all_MSK>0)*100:0.1f} ({np.nansum(all_MSK>0)})"
))
md = display(Latex(tabulate.tabulate(table, tablefmt='latex',
headers=["Module", "Ratio High-Medium", "Ratio Medium-Low","Bad pixels [%(Count)]"])))
```
%% Cell type:markdown id: tags:
## Summary gain ratios (good pixels only) + histograms
%% Cell type:code id: tags:
``` python
gs = gridspec.GridSpec(2, 2)
fig = plt.figure(figsize=(15, 15))
colors = cm.rainbow(np.linspace(0, 1, nmods))
for i, key in enumerate(const_data.keys()):
if key != 'mask':
const_data[key][const_data['mask']>0] = np.nan
data = np.nanmean(const_data[key], axis=1)
vmin, vmax = get_range(data, 5)
ax = fig.add_subplot(gs[0, i])
geom.plot_data_fast(data,
vmin=vmin, vmax=vmax, ax=ax, cmap="jet", figsize=(13,7),
colorbar={'shrink': 1,
'pad': 0.04,
'fraction': 0.1
})
colorbar = ax.images[0].colorbar
ax.set_title(labels[key], fontsize=14)
ax.set_xlabel('Columns', fontsize=12)
ax.set_ylabel('Rows', fontsize=12)
ax = fig.add_subplot(gs[1,i])
for nmod in range(const_data[key].shape[0]):
h, e = np.histogram(const_data[key][nmod,...].flatten(), bins=100, range=(vmin, vmax))
ax.plot(e[:-1], h, color=colors[nmod],linewidth=2, label=f'{nmod}', alpha=0.8)
ax.set_xlabel(f'{labels[key]}')
ax.set_ylabel('Counts')
ax.grid()
ax.legend()
plt.show()
```
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment