#  ePix100 Flat Field Characterization

Author: European XFEL Detector Group, Version 1.0

Generate gain maps from flat-field runs.

In [None]:
in_folder = '/gpfs/exfel/exp/MID/202231/p900310/raw' # input folder, required
out_folder = '' # output folder, required
metadata_folder = ''  # Directory containing calibration_metadata.yml when run by xfel-calibrate
run = 29 # which run to read data from, required

# Parameters for accessing the raw data.
karabo_id = "MID_EXP_EPIX-2"  # karabo ID
karabo_da = "EPIX02"  # data aggregator
receiver_template = "RECEIVER" # detector receiver template for accessing raw data files
instrument_source_template = '{}/DET/{}:daqOutput' # instrument detector data source in h5files

# Fit parameters
peak_fitting = 'gauss' # method to find the peak position per pixel: 'median' or 'gauss'
N_sigma_interval = 5   # sigma interval to find singles peak in each per pixel 
peak_energy = 8.048    # [keV] Cu Kα1

# ADU range
ADU_range = [-50,500] # expected range that encloses the raw signal from the FF run 

# Cluster calculators (given in N times sigma noise)
split_evt_primary_threshold = 7   # Split event primary threshold 
split_evt_secondary_threshold = 3  # Split event secondary threshold
split_evt_mip_threshold = 1000     # Threshold for rejection of MIP events (e.g, cosmic-rays)

# Parameters for the calibration database.
cal_db_interface = "tcp://max-exfl-cal001:8020" # calibration DB interface to use
cal_db_timeout = 300000 # timeout on caldb requests
creation_time = ""  # The timestamp to use with Calibration DB. Required Format: "YYYY-MM-DD hh:mm:ss" e.g. 2019-07-04 11:02:41
db_output = False # Output constants to the calibration database
local_output = True # Output constants locally

# Conditions used for injected calibration constants.
bias_voltage = 200 # Bias voltage
in_vacuum = False # Detector operated in vacuum
fix_integration_time = -1 # Integration time. Set to -1 to read from .h5 file
fix_temperature = -1 # Fixed temperature in Kelvin. Set to -1 to read from .h5 file
temp_limits = 5 # Limit for parameter Operational temperature

# Parameters used during selecting raw data trains.
min_trains = 1 # Minimum number of trains that should be available. Default 1.
max_trains = 0 # Maximum number of trains to use for processing. Set to 0 to use all available trains.

In [None]:
import warnings

import matplotlib.pyplot as plt
from matplotlib.colors import LogNorm
import numpy as np
import pasha as psh
from extra_data import RunDirectory
from pathlib import Path
from prettytable import PrettyTable
from scipy.optimize import curve_fit

import XFELDetAna.xfelprofiler as xprof
from XFELDetAna import xfelpyanatools as xana
from XFELDetAna import xfelpycaltools as xcal
from XFELDetAna.plotting.util import prettyPlotting

from cal_tools.enums import BadPixels
from cal_tools.step_timing import StepTimer
from cal_tools.epix100 import epix100lib
from cal_tools.tools import (
    calcat_creation_time,
    get_pdu_from_db,
    get_constant_from_db,
    get_report,
    save_const_to_h5,
    send_to_db,
)
from iCalibrationDB import Conditions, Constants

In [None]:
%matplotlib inline

warnings.filterwarnings('ignore')

prettyPlotting = True

profiler = xprof.Profiler()
profiler.disable()

step_timer = StepTimer()

## Load Data

In [None]:
instrument_src = instrument_source_template.format(karabo_id, receiver_template)

# Run directory
proposal = list(filter(None, in_folder.strip('/').split('/')))[-2]
file_loc = f'proposal:{proposal} runs:{run}'
report = get_report(metadata_folder)

ped_dir = Path(in_folder) / f'r{run:04d}'
run_dc = RunDirectory(ped_dir)

print(f"Run is: {run}")
print(f"Instrument H5File source: {instrument_src}")

creation_time = calcat_creation_time(in_folder, run, creation_time)
print(f"Using {creation_time.isoformat()} as creation time")

In [None]:
# Path to pixels ADC values
pixels_src = (instrument_src, "data.image.pixels")

# Specify the total number of images to process
n_trains = run_dc.get_data_counts(*pixels_src).shape[0]

# Modify n_trains to process based on the given maximum and minimum number of trains.
if max_trains:
    n_trains = min(max_trains, n_trains)
    
if n_trains < min_trains:
    raise ValueError(
        f"Less than {min_trains} trains are available in RAW data."
         " Not enough data to process flat fields.")

all_trains = len(run_dc.select(instrument_src).train_ids)
if n_trains != all_trains:
    print(f"Warning: {all_trains - n_trains} trains with empty data.")

print(f'Images to analyze: {n_trains}')

In [None]:
# Read sensor size
sensor_size = run_dc[instrument_src, 'data.image.dims'].as_single_value(reduce_by='first') # (x=768, y=708) expected
sensor_size = sensor_size[sensor_size != 1].tolist()  # data.image.dims for old data is [768, 708, 1]
assert sensor_size == [768,708], 'Unexpected sensor dimensions.' 

ctrl_data = epix100lib.epix100Ctrl(
    run_dc=run_dc,
    instrument_src=instrument_src,
    ctrl_src=f"{karabo_id}/DET/CONTROL",
    )
# Read integration time
if fix_integration_time == -1:
    integration_time = ctrl_data.get_integration_time()
    integration_time_str_add = ''
else:
    integration_time = fix_integration_time
    integration_time_str_add = '(manual input)'
    
# Read temperature    
if fix_temperature == -1:
    temperature = ctrl_data.get_temprature()
    temperature_k = temperature + 273.15
    temp_str_add = ''
else:
    temperature_k = fix_temperature
    temperature = fix_temperature - 273.15
    temp_str_add = '(manual input)'
    
# Print operating conditions
print(f"Bias voltage: {bias_voltage} V")
print(f"Detector integration time: {integration_time} \u03BCs {integration_time_str_add}")
print(f"Mean temperature: {temperature:0.2f}\u00B0C / {temperature_k:0.2f} K {temp_str_add}")
print(f"Operated in vacuum: {in_vacuum}")

In [None]:
step_timer = StepTimer()
step_timer.start()

# Read data
data_dc = run_dc.select(*pixels_src, require_all=True).select_trains(np.s_[:n_trains])
dshape = data_dc[pixels_src].shape

step_timer.done_step('Flat-fields loaded. Elapsed Time')

## Retrieve Necessary Calibration Constants

In [None]:
const_data = dict()
constants = ['Offset', 'Noise', 'BadPixelsDark']

condition =  Conditions.Dark.ePix100(bias_voltage=bias_voltage,
                                     integration_time=integration_time,
                                     temperature=temperature_k,
                                     in_vacuum=in_vacuum)

for cname in constants:        
    const_data[cname] = get_constant_from_db(
        karabo_id=karabo_id,
        karabo_da=karabo_da,
        constant=getattr(Constants.ePix100, cname)(),
        condition=condition,
        empty_constant=None,
        cal_db_interface=cal_db_interface,
        creation_time=creation_time,
        timeout=cal_db_timeout
    )

## Instantiate calculators

In [None]:
block_size = [sensor_size[0]//2, sensor_size[1]//2]
noiseSigma = 5

cmCorrection_block = xcal.CommonModeCorrection(
    sensor_size,
    block_size,
    'block',
    noiseMap=const_data['Noise'].swapaxes(0,1),
    noiseSigma=noiseSigma,
    parallel=False)
cmCorrection_col = xcal.CommonModeCorrection(
    sensor_size,
    block_size,
    'col',
    noiseMap=const_data['Noise'].swapaxes(0,1),
    noiseSigma=noiseSigma,
    parallel=False)
cmCorrection_row = xcal.CommonModeCorrection(
    sensor_size,
    block_size,
    'row',
    noiseMap=const_data['Noise'].swapaxes(0,1),
    noiseSigma=noiseSigma,
    parallel=False)
  
patternClassifier = xcal.PatternClassifier(
    shape=sensor_size,
    noisemap=const_data['Noise'].swapaxes(0,1),
    primaryThreshold=split_evt_primary_threshold,
    secondaryThreshold=split_evt_secondary_threshold,
    upperThreshold=split_evt_mip_threshold,
    blockSize=block_size,
    setPixelMask = const_data['BadPixelsDark'].flatten(),
    parallel=False
)

patternSelector = xcal.PatternSelector(
    sensor_size, 
    selectionList = [100, 101], # singles patterns
    blockSize=block_size, 
    parallel=False)

## Correct data

In [None]:
bin_min = ADU_range[0]
bin_max = ADU_range[1]
bin_width = 1

bins = np.arange(bin_min,bin_max,bin_width)
hist = {'O': 0,'CM': 0,'CS': 0, 'S': 0}

In [None]:
def correct_train(worker_id, index, train_id, dc):

    d = dc[pixels_src[0]][pixels_src[1]].astype(np.float32)

    # Offset correction
    d -= const_data['Offset'].squeeze()
    hist['O'] += np.histogram(d.flatten(),bins=bins)[0]
     
    # Common Mode correction
    d = d.swapaxes(0,-1)
    d = cmCorrection_block.correct(d)
    d = cmCorrection_col.correct(d)
    d = cmCorrection_row.correct(d)
    d = d.swapaxes(0,-1)
    hist['CM'] += np.histogram(d.flatten(),bins=bins)[0]
    
    # Charge Sharing correction
    d = d.swapaxes(0,-1)
    d, patterns = patternClassifier.classify(d)
    sing,fs = patternSelector.select(d,patterns)
    d = d.swapaxes(0,-1)
    hist['CS'] += np.histogram(d[d>0].flatten(),bins=bins)[0]
    hist['S'] += np.histogram(sing[sing>0].flatten(),bins=bins)[0]
    
    data_corr[index+prev_chunk] = d
    data_singles[index+prev_chunk] = sing.swapaxes(0,-1)

In [None]:
step_timer.start()

chunk_size = 1000

psh.set_default_context('threads', num_workers=35) # num_workers=35 was found to be optimal
data_corr = psh.alloc(shape=dshape, dtype=np.float32)
data_singles = psh.alloc(shape=dshape, dtype=int)

chunk = 0
while chunk < dshape[0]-1:
    
    prev_chunk = chunk
    chunk+=chunk_size
    if chunk > dshape[0]: # last chunk may have different size
        chunk = dshape[0]-1
        
    psh.map(correct_train, data_dc.select_trains(np.arange(prev_chunk,chunk)))
        
    print(f'Corrected trains: {chunk} ({round(chunk/dshape[0]*100)}%)',end='\r')

step_timer.done_step('Corrected data. Elapsed Time')

## Plot histograms

In [None]:
bins_c = bins[:-1]+np.diff(bins)[0]/2 # center of bins

plt.figure(figsize=(12,8))
plt.plot(bins_c,hist['O'], label='Offset corrected')
plt.plot(bins_c,hist['CM'], label='Common Mode corrected')
plt.plot(bins_c,hist['CS'], label='Charge Sharing corrected')
plt.plot(bins_c,hist['S'], label='Singles')
plt.xlim(ADU_range)
plt.yscale('log')
plt.xlabel('ADU',fontsize=12)
plt.title(f'{karabo_id} | {proposal} - r{run}', fontsize=14)
plt.legend(fontsize=12);
plt.grid(ls=':')

In [None]:
print(f'Primary threshold: {split_evt_primary_threshold}')
print(f'Secondary threshold: {split_evt_secondary_threshold}')

patternStats = patternClassifier.getPatternStats()

n_singles = np.sum(patternStats['singles'])
n_doubles = np.sum(patternStats['doubles'])
n_triples = np.sum(patternStats['triples'])
n_quads = np.sum(patternStats['quads'])
n_clusters = np.sum(patternStats['clusters'])
known_patterns = np.sum((n_singles, n_doubles, n_triples, n_quads))

t1,t2 = PrettyTable(),PrettyTable()
t1.field_names = ['Photon Hits', 'Frequency']
t1.add_row(['Big Clusters', f'{n_clusters/(known_patterns+n_clusters)*100: .2f} %'])
t1.add_row(['Listed Patterns', f'{known_patterns/(known_patterns+n_clusters)*100: .2f} %'])

print(t1)

t2.field_names = ['Listed Patterns', 'Frequency']
t2.add_row(['Singles', f'{n_singles/known_patterns*100: .2f} %'])
t2.add_row(['Doubles', f'{n_doubles/known_patterns*100: .2f} %'])
t2.add_row(['Triples', f'{n_triples/known_patterns*100: .2f} %'])
t2.add_row(['Quadruplets', f'{n_quads/known_patterns*100: .2f} %'])

print(t2)

## Flat-Field Statistics

In [None]:
# Definition of gaussian function for fitting
def gauss(x, *p):
    A, mu, sigma = p
    return A*np.exp(-(x-mu)**2/(2.*sigma**2))

# rough initial estimate of fit parameters
fit_estimates = [np.max(hist['S']),           # amplitude
                 bins[np.argmax(hist['S'])],  # centroid
                 10]                          # sigma

In [None]:
coeff, _ = curve_fit(gauss, bins_c, hist['S'], p0=fit_estimates)
singles_mu = coeff[1]
singles_sig = abs(coeff[2])
ROI = np.round([singles_mu-N_sigma_interval*singles_sig, # region of interest to find first photopeak per pixel
                singles_mu+N_sigma_interval*singles_sig]).astype(int)
y_fit = gauss(bins_c, *coeff)

plt.figure(figsize=(9,6))
plt.plot(bins_c,hist['S'],'k',label = 'singles')
plt.plot(bins_c,y_fit,'g--',label = 'gauss fit') 
plt.ylim(1,max(hist['S'])*1.5);
plt.xlim(ADU_range)
plt.vlines(coeff[1],0,plt.gca().get_ylim()[1],color='g',ls=':')

plt.axvspan(ROI[0],
            ROI[1],
            alpha = .2,
            color = 'green',
            label = f'\u03BC ± {N_sigma_interval}\u03c3')

plt.legend(fontsize=12);
plt.xlabel('ADU',fontsize=12)
plt.yscale('log')
plt.grid(ls=':')
plt.show()

print('--------------------')
print('Fit parameters:')
print(f'  centroid = {np.round(singles_mu,3)}')
print(f'     sigma = {np.round(singles_sig,3)}')
print('---------------------')

In [None]:
# Calculate singles per pixel
step_timer.start()

singles_per_pixel = np.empty(np.flip(sensor_size))

for py in range(0,int(sensor_size[1])):
    for px in range(0,int(sensor_size[0])):
        singles_per_pixel[py,px] = np.sum((data_singles[:,py,px]>=ROI[0]) & (data_singles[:,py,px]<ROI[1]))

step_timer.done_step('Calculated singles per pixel. Elapsed Time')

In [None]:
mask_bins = np.unique(singles_per_pixel,return_counts=True)[1] > np.max(np.unique(singles_per_pixel,return_counts=True)[1])*.01
last_bin = np.max(np.unique(singles_per_pixel)[mask_bins]) # xlim on bin that has less than 1% of max counts

# Plot singles distribution
fig = xana.heatmapPlot(
    singles_per_pixel,
    lut_label='# singles',
    x_label='Column',
    y_label='Row',
    vmax = last_bin
)
fig.suptitle(f'Singles Distribution', x=.48, y=.9, fontsize=14)
fig.set_size_inches(h=10, w=10);

plt.figure(figsize=(7,5))
plt.hist(singles_per_pixel.flatten(),bins=np.arange(0,last_bin,1),
         align = 'left',
         histtype = 'bar',
         edgecolor='black', 
         linewidth=1.2)
plt.xlabel('Singles per pixel',fontsize=12)
plt.grid(ls='--',axis='y',color='b',alpha=.5)
plt.show()

print(f'Average number of singles per pixel: {np.round(np.sum(data_singles>0)/np.prod(sensor_size),2)}')

## Plot random sample pixels 

In [None]:
N_sample_pixels = 16

# Plot some random pixels, avoiding bad ones
np.random.seed(0)
sample_pixels = np.transpose([np.random.randint(0, sensor_size[0], N_sample_pixels),
                              np.random.randint(0, sensor_size[1], N_sample_pixels)])
while np.sum(const_data['BadPixelsDark'][sample_pixels[:,1],sample_pixels[:,0]]):
    sample_pixels = np.transpose([np.random.randint(0, sensor_size[0], N_sample_pixels),
                                  np.random.randint(0, sensor_size[1], N_sample_pixels)])

fig = plt.figure(figsize=(20,20))
roi_bins = np.arange(ROI[0], ROI[1])
it_counter = 0
for px,py in sample_pixels:
    it_counter+=1    
    
    plt.subplot(int(np.sqrt(N_sample_pixels)),int(np.sqrt(N_sample_pixels)),it_counter)
    
    h,ADU = np.histogram(data_singles[:,py,px],bins=roi_bins)
    ADU_c = ADU[:-1] + np.diff(ADU)[0]/2 # center of bins
    
    p1 = plt.plot([],[],' ',label = f'({px},{py})')
    p2 = plt.scatter(ADU_c[h>0], h[h>0],marker = 'x',c = 'k', label = 'singles')

    mdn = np.median(ADU_c[h>0])
    if ~np.isnan(mdn):
        p3 = plt.plot([mdn, mdn],[0,plt.gca().get_ylim()[1]],color='g', label = f'median={int(mdn)}')
    else:
        p3 = plt.plot([],[],' ', label = 'empty')
        
    try:
        coeff, _ = curve_fit(gauss, ADU_c, h, p0=[0, np.median(ADU_c[h>0]), singles_sig]) 
        y_fit = gauss(ADU_c, *coeff)
        p4 = plt.plot(ADU_c, y_fit, label = f'fit: \u03BC={int(np.round(coeff[1]))}')

    except (RuntimeError, ValueError):
        p4 = plt.plot([],[],' ', label = 'fit error')
    
    plt.grid(ls=':')
    plt.xlabel('ADU')
    plt.xlim(ROI)
    plt.ylim(bottom=0)
    plt.legend()

## Fit single photon peaks per pixel

In [None]:
step_timer.start()
peak_map = np.zeros(np.flip(sensor_size))[...,np.newaxis]

for py in range(0,int(sensor_size[1])):
    for px in range(0,int(sensor_size[0])):            
        h,ADU = np.histogram(data_singles[:,py,px],bins=np.arange(ROI[0],ROI[1]))
        ADU_c = ADU[:-1] + np.diff(ADU)[0]/2 # center of bins
        
        if np.sum(h):
            if peak_fitting=='median':
                peak_map[py,px] = np.median(ADU_c[h>0])
            elif peak_fitting=='gauss':
                try:
                    coeff, _ = curve_fit(gauss, ADU_c, h, p0=[0, np.median(ADU_c[h>0]), singles_sig]) 
                    peak_map[py,px] = coeff[1]
                except RuntimeError:
                    pass         # Failed fits remain 0 
        else:
            peak_map[py,px] = -1 # Assign -1 to empty pixels

peak_map[np.isnan(peak_map)] = 0 # Failed fits can throw no expection but return nan coeffs
step_timer.done_step(f'Calculated relative gain map using {peak_fitting} fit. Elapsed Time')

In [None]:
plt.figure(figsize=(7,5))
plt.hist(peak_map.flatten(),bins=np.arange(ROI[0],ROI[1]),
         histtype = 'bar',
         edgecolor='black',
         alpha = .5,
         linewidth=1.2);

h,ADU = np.histogram(peak_map.flatten(),bins=np.arange(ROI[0],ROI[1]))
ADU_c = ADU[:-1] + np.diff(ADU)[0]/2 # center of bins

coeff, _ = curve_fit(gauss, ADU_c, h, p0=[h.max()/2, singles_mu, singles_sig])
BP_fit_threshold = [coeff[1]-N_sigma_interval*abs(coeff[2]),
                    coeff[1]+N_sigma_interval*abs(coeff[2])]
y_fit = gauss(ADU_c, *coeff)
plt.plot(ADU_c,y_fit, label = f'fit: \u03BC={int(np.round(coeff[1]))}')
plt.vlines(coeff[1],0,plt.gca().get_ylim()[1],color='orange',ls=':')
plt.axvspan(BP_fit_threshold[0],
            BP_fit_threshold[1],
            alpha = .3,
            color = 'orange',
            label = f'\u03BC ± {N_sigma_interval}\u03c3')

plt.grid(ls=':')
plt.xlim(np.array(BP_fit_threshold)*[.9,1.1])
plt.xlabel('Peak position [ADU]',fontsize=12);
plt.legend(fontsize=12)
plt.title(f'{karabo_id} | {proposal} - r{run}', fontsize=12)
plt.ylim((1, coeff[0]*1.2))
plt.show()

print('--------------------')
print('Fit parameters:')
print(f'  centroid = {np.round(coeff[1],3)}')
print(f'     sigma = {np.round(abs(coeff[2]),3)}')
print('---------------------')

## Flat-Field Bad Pixels

In [None]:
const_data['BadPixelsFF'] = np.zeros(np.flip(sensor_size))[...,np.newaxis]

# Empty Pixels
const_data['BadPixelsFF'][peak_map==-1] = BadPixels.FF_NO_ENTRIES.value

# Failed Fits
const_data['BadPixelsFF'][peak_map==0] = BadPixels.FF_GAIN_EVAL_ERROR.value

# Gain out of range
const_data['BadPixelsFF'][(peak_map!=0) & (peak_map!=-1) & ((peak_map<BP_fit_threshold[0]) | (peak_map>BP_fit_threshold[1]))] = BadPixels.FF_GAIN_DEVIATION.value

# Plot Bad Pixels Map
fig = xana.heatmapPlot(
    np.nan_to_num(np.log2(const_data['BadPixelsFF'].squeeze())+1, neginf=np.nan),
    cb_label='Bad pixel bit',
    x_label='Column',
    y_label='Row',
)
fig.suptitle(f'FF Bad Pixels Map({karabo_id} | {proposal} - r{run})', x=.5, y=.9, fontsize=16)
fig.set_size_inches(h=12, w=12)

t = PrettyTable()
t.title = 'Flat-Field Bad Pixel Analysis'
t.field_names = ['Bit', 'Value', 'Type       ', 'Counts', '%']
t.align['Type       '] = 'r'

for BP_type in [BadPixels.FF_GAIN_DEVIATION, BadPixels.FF_GAIN_EVAL_ERROR, BadPixels.FF_NO_ENTRIES]:
    t.add_row([BP_type.bit_length(),
               BP_type.value,
               BP_type.name,
               np.sum(const_data['BadPixelsFF']==BP_type.value),
               np.round(100*np.sum(const_data['BadPixelsFF']==BP_type.value)/np.prod(sensor_size),2)
              ])
t.add_row(['-','-',
           'Total',
           np.sum(const_data['BadPixelsFF']>0),
           np.round(100*np.sum(const_data['BadPixelsFF']>0)/np.prod(sensor_size),2)
          ])
print(t)

## Relative Gain Map

In [None]:
# Replace FF bad pixels with mean peak value
peak_map[const_data['BadPixelsFF']>0] = np.nanmean(peak_map[const_data['BadPixelsFF']==0])

# Calculate relative gain
rel_gain_map = 1/(peak_map.squeeze()/np.mean(peak_map))

fig = xana.heatmapPlot(
    rel_gain_map,
    cb_label='Relative gain',
    x_label='Column',
    y_label='Row',
    vmin=np.floor(np.min(rel_gain_map)/.2)*.2, # force cb limits to be multiples of 0.2 
    vmax=np.ceil(np.max(rel_gain_map)/.2)*.2
)
fig.suptitle(f'Relative Gain Map ({karabo_id} | {proposal} - r{run})', x=.48, y=.9, fontsize=16)
fig.set_size_inches(h=12, w=12)

## Absolute Gain Conversion Constant

In [None]:
step_timer.start()

# Correct data with calculated gain map
data_gain_corrected = data_corr*rel_gain_map

h,ADU = np.histogram(data_gain_corrected.flatten(),
                     bins=np.arange(BP_fit_threshold[0],BP_fit_threshold[1]).astype(int))
ADU_c = ADU[:-1] + np.diff(ADU)[0]/2 # center of bins

coeff, _ = curve_fit(gauss, ADU_c, h, p0=[h.max()/2, singles_mu, singles_sig])
y_fit = gauss(ADU_c, *coeff)

gain_conv_const = coeff[1] / peak_energy

abs_gain_map = rel_gain_map / gain_conv_const

step_timer.done_step('Calculated Gain Conversion Constant. Elapsed Time')

In [None]:
plt.figure(figsize=(7,5))

plt.scatter(ADU_c/gain_conv_const, h, color='k', marker='x', label='Gain Corrected')
plt.plot(ADU_c/gain_conv_const, y_fit, color='orange', label = f'fit: \u03BC={(np.round(coeff[1],2))} ADU');

plt.ylim(bottom=0)
plt.legend()
plt.grid(ls=':')

plt.plot([peak_energy, peak_energy],[0,plt.gca().get_ylim()[1]],color='orange', ls = '--')

ax1 = plt.gca()
ax2 = ax1.twiny()
ax2.set_xticks(ax1.get_xticks())
ax2.set_xbound(ax1.get_xbound())
ax2.set_xticklabels((ax1.get_xticks()*gain_conv_const).astype(int))
ax2.set_xlabel('ADU',fontsize=12)
ax1.set_xlabel('keV',fontsize=12)

ax1.xaxis.label.set_color('red')
ax1.tick_params(axis='x', colors='red')
ax2.xaxis.label.set_color('blue')
ax2.tick_params(axis='x', colors='blue')

plt.suptitle(f'Absolute Gain Conversion ({karabo_id} | {proposal} - r{run})',y =1.02,fontsize = 12)
plt.show()

print(f'Gain conversion constant: {np.round(gain_conv_const,4)} ADU/keV')

## Gain Map Validation

Validation tests:
1. Inspect correlation between calculated gain map and gain map loaded from DB
2. Perform gain correction of current FF with calculated gain map and DB gain map and compare energy resolution and linearity

In [None]:
# Retrieve DB RelativeGain Map
illum_condition_db = Conditions.Illuminated.ePix100(
    bias_voltage=bias_voltage,
    integration_time=integration_time,
    temperature=temperature_k,
    in_vacuum=in_vacuum,
    photon_energy=peak_energy
)

db_gain_map = get_constant_from_db(
    karabo_id=karabo_id,
    karabo_da=karabo_da,
    constant=getattr(Constants.ePix100, 'RelativeGain')(),
    condition=illum_condition_db,
    empty_constant=None,
    cal_db_interface=cal_db_interface,
    creation_time=creation_time,
    timeout=cal_db_timeout
)

if db_gain_map is None:
    print('Waring: No previous RelativeGain map was found for this detector conditions.')

In [None]:
if db_gain_map is not None:
    
    # Calculate gain conversion constant of DB gain map
    gain_conv_const_db = 1/np.median(db_gain_map[const_data['BadPixelsDark'].squeeze()>0])
    
    # Correlate new and DB gain maps
    plt.figure(figsize=(7,7))

    plt.hist2d(db_gain_map.flatten(),
               abs_gain_map.flatten(),
               bins = 200,
               norm=LogNorm(),
              );
    plt.xlabel('DB noise map',fontsize=12)
    plt.ylabel('New noise map',fontsize=12)

    plt.xlim(np.min([db_gain_map,abs_gain_map]),np.max([db_gain_map,abs_gain_map]))
    plt.ylim(np.min([db_gain_map,abs_gain_map]),np.max([db_gain_map,abs_gain_map]))
    plt.grid(ls=':')

    rel_change = np.mean(abs(abs_gain_map-db_gain_map)/abs_gain_map)
    print(f'Average relative change of new gain map: {np.round(rel_change*100,3)} %')

In [None]:
def correct_validation_train(worker_id, index, train_id, dc):

    d = dc[pixels_src[0]][pixels_src[1]].astype(np.float32)

    # Offset correction
    d -= const_data['Offset'].squeeze()

    # Common Mode correction
    d = d.swapaxes(0,-1)
    d = cmCorrection_block.correct(d)
    d = cmCorrection_col.correct(d)
    d = cmCorrection_row.correct(d)
    d = d.swapaxes(0,-1)

    # Relative Gain correction
    d_new_map = d*rel_gain_map
    if db_gain_map is not None:
        d_db_map  = d*db_gain_map*gain_conv_const_db

    # Charge Sharing correction
    d, patterns = patternClassifier.classify(d.swapaxes(0,-1))
    FF_data[index] = d.swapaxes(0,-1) # no gain correction
    
    d_new_map, patterns = patternClassifier.classify(d_new_map.swapaxes(0,-1))
    FF_data_new_map[index] = d_new_map.swapaxes(0,-1) # gain correction with new gain map
    
    if db_gain_map is not None:
        d_db_map, patterns = patternClassifier.classify(d_db_map.swapaxes(0,-1))
        FF_data_db_map[index] = d_db_map.swapaxes(0,-1) # gain correction with DB gain map

In [None]:
# Correct validation trains
step_timer.start()

N_validation_trains = 1000

FF_data = psh.alloc(shape=(N_validation_trains,dshape[1],dshape[2]), dtype=np.float32)
FF_data_new_map = psh.alloc(shape=(N_validation_trains,dshape[1],dshape[2]), dtype=np.float32)
if db_gain_map is not None:
    FF_data_db_map = psh.alloc(shape=(N_validation_trains,dshape[1],dshape[2]), dtype=np.float32)

psh.map(correct_validation_train, data_dc.select_trains(np.s_[:N_validation_trains]))

step_timer.done_step('Corrected evaluation data. Elapsed Time')

In [None]:
# Calculate histograms
bins_FF = np.arange(-50,800)
FF_hist_CS = np.histogram(FF_data[FF_data>0].flatten(),bins=bins_FF)[0]

bins_keV_new = bins_FF/gain_conv_const
FF_hist_GC_new_map = np.histogram(FF_data_new_map/gain_conv_const,bins=bins_keV_new)[0]

plt.figure(figsize=(12,8))
bins_ADU = bins_FF[:-1] + np.diff(bins_FF)[0]/2 # center of bins
bins_keV_new = bins_keV_new[:-1] + np.diff(bins_keV_new)[0]/2 # center of bins
plt.plot(bins_ADU,FF_hist_CS, color='black', label='Before gain correction')
plt.plot(bins_keV_new*gain_conv_const, FF_hist_GC_new_map, color='b', label='Gain correction with new map')

if db_gain_map is not None:
    bins_keV_db = bins_FF/gain_conv_const_db
    FF_hist_GC_db_map = np.histogram(FF_data_db_map/gain_conv_const_db,bins=bins_keV_db)[0]
    bins_keV_db = bins_keV_db[:-1] + np.diff(bins_keV_db)[0]/2 # center of bins
    plt.plot(bins_keV_db*gain_conv_const_db, FF_hist_GC_db_map, color='r', label='Gain correction with DB map')

plt.yscale('log')
plt.xlim(1,bins_FF[-1]+1)

plt.xlabel('ADU',fontsize=12)
plt.legend(fontsize=12)
plt.title(f'{karabo_id} | {proposal} - r{run}', fontsize=14)
plt.grid(ls=':')

In [None]:
N_peaks = 4
sigma_tol = 2 # sigma tolerance to show in gauss fit

# Ignore split events below primary energy threshold
E_cut = np.mean(const_data['Noise'])*split_evt_primary_threshold/gain_conv_const

too_many_peaks = True
while too_many_peaks: # Iterate backwards on number of peaks until no exception is thrown
    try:
        FF_hist_AGC_new = FF_hist_GC_new_map/gain_conv_const
        if db_gain_map is not None:
            FF_hist_AGC_db = FF_hist_GC_db_map/gain_conv_const_db
        else:
            FF_hist_AGC_db=None
            bins_keV_db=None

        E_res,rel_dev,abs_dev = [],[],[]
        colors = ['blue','red']

        for FF_hist_AGC,bins_keV,leg in zip([FF_hist_AGC_new,FF_hist_AGC_db],[bins_keV_new,bins_keV_db],['new','DB']):

            if FF_hist_AGC is None:
                continue

            FF_hist_AGC = FF_hist_AGC[bins_keV>E_cut]
            FF_hist_AGC[0]=0

            bins_keV = bins_keV[bins_keV>E_cut]
            c = colors[0]
            colors.pop(0)

            fig = plt.figure(figsize=(12,6))
            plt.suptitle('Correction with '+leg+' gain map',fontsize = 15)

            plt.fill(bins_keV,FF_hist_AGC, color='k',alpha=.2,label='Corrected data')
            plt.title(f'{karabo_id} | {proposal} - r{run}', fontsize=14)

            ylim_top = plt.gca().get_ylim()[1]

            ROI_shift = 0
            for p in range(1,N_peaks+1):

                peak_ROI = np.array([p*peak_energy-peak_energy/2, p*peak_energy+peak_energy/2]) + ROI_shift
                xx = (bins_keV>peak_ROI[0]) & (bins_keV<peak_ROI[1])

                coeff, _ = curve_fit(gauss, bins_keV[xx], FF_hist_AGC[xx], p0=[FF_hist_AGC[xx].max(), p*peak_energy, 1])
                y_fit = gauss(bins_keV[xx], *coeff)

                xx_sigma_lim = (bins_keV>coeff[1]-abs(coeff[2])*sigma_tol) & (bins_keV<coeff[1]+abs(coeff[2])*sigma_tol)

                plt.vlines(p*peak_energy,0,ylim_top,ls='-',color='grey',label=f'expected peaks')
                plt.fill_between(bins_keV[xx_sigma_lim],
                                 FF_hist_AGC[xx_sigma_lim],
                                 color='orange',
                                 alpha=.5,
                                 label=f'\u03BC ± {sigma_tol}\u03c3')
                plt.plot(bins_keV[xx],y_fit,color=c)
                plt.vlines(coeff[1],0,ylim_top,ls='--',color=c,label=f'peak {p}: {coeff[1]:,.2f} keV')

                ROI_shift = coeff[1] - p*peak_energy   

                E_res.append(abs(2*np.sqrt(2*np.log(2))*coeff[2]/coeff[1])*100)
                abs_dev.append(coeff[1]-peak_energy*p)
                rel_dev.append(abs(abs_dev[-1])/(peak_energy*p)*100)

            plt.yscale('log')    
            plt.xlabel('keV',fontsize=12)
            plt.xlim(left=0)
            plt.ylim(.1,ylim_top)

            # Remove repeated entries from legend
            handles, labels = plt.gca().get_legend_handles_labels()
            by_label = dict(zip(labels, handles))
            plt.legend(by_label.values(), by_label.keys())
            plt.grid(ls=':')

            t = PrettyTable()
            t.field_names = ['Peak','Energy Resolution','Rel. Deviation','Abs. Deviation']
            t.title = f'{leg} gain map'
            for p in range(-N_peaks,0):
                t.add_row([f'#{p+N_peaks+1}: {peak_energy*(p+N_peaks+1):,.3f} keV',
                            f'{E_res[p]:,.2f} %', 
                            f'{rel_dev[p]:,.2f} %',
                            f'{abs_dev[p]:,.2f} keV'])        
            print(t)
            
            too_many_peaks = False
            plt.show()

    # throw exception if fit fails due to wrong estimate of number of peaks
    except RuntimeError: 
        N_peaks -= 1
        plt.close(fig) # delete plots if exception was found due to wrong number of peaks

## Linearity Analysis

In [None]:
peaks = np.arange(1,N_peaks+1)
plt.figure(figsize=(15,6))
plt.subplot(1,2,1)
plt.plot(peaks,[peak_energy*p for p in peaks], '-', c='k', label='expected')
plt.plot(peaks,[peak_energy*p for p in peaks]+np.array(abs_dev[:N_peaks]), 'o', c='b')
fit_coeffs= np.polyfit(peaks,[peak_energy*p for p in peaks]+np.array(abs_dev[:N_peaks]),1)

plt.plot(peaks,fit_coeffs[0]*peaks+fit_coeffs[1], '--', c='b', label='New gain map')
str_theo  = f'$a_1$={peak_energy :,.4f}, $a_0$=0'
str_new = f'$a_1$={fit_coeffs[0]:,.4f}, $a_0$={fit_coeffs[1]:,.4f}'
plt.annotate(s=str_theo,xy=(.36,.94),xycoords='axes fraction',fontsize=11,bbox=dict(facecolor='k',alpha=.2,pad=1))
plt.annotate(s=str_new ,xy=(.36,.88),xycoords='axes fraction',fontsize=11,bbox=dict(facecolor='b',alpha=.2,pad=1))

xx = np.arange(1,100,.1) # in photons
y_fit_new = fit_coeffs[0]*xx+fit_coeffs[1] # extrapolation for 100 photons

plt.xticks(peaks)
plt.title(f'Linearity ({karabo_id} | {proposal} - r{run})')
plt.xlabel('# Photons')
plt.ylabel('Energy (keV)')
plt.legend(fontsize=12)
plt.grid(ls=':')

plt.subplot(1,2,2)
dev_new = (y_fit_new-(peak_energy*xx))/(peak_energy*xx)*100
plt.plot(xx*peak_energy,dev_new,c='b', label='New gain map')
plt.xscale('log')
plt.xlim(right=100)
plt.xlabel('Energy (keV)')
plt.ylabel('Linearity Deviation (%)')
plt.title(f'Linearity extrapolation ({karabo_id} | {proposal} - r{run})')
plt.grid(ls=':',which='both')

if db_gain_map is not None:
    plt.subplot(1,2,1)
    
    db_fit = np.polyfit(peaks,[peak_energy*p for p in peaks]+np.array(abs_dev[N_peaks:]),1)
    plt.plot(peaks,[peak_energy*p for p in peaks]+np.array(abs_dev[N_peaks:]), 'o', c='r')
    plt.plot(peaks,db_fit[0]*peaks+db_fit[1], '--', c='r', label='DB gain map')
    
    str_db  = f'$a_1$={db_fit[0] :,.4f}, $a_0$={db_fit[1] :,.4f}'
    y_fit_db = db_fit[0]*xx+db_fit[1] # extrapolation for 100 photons
    plt.annotate(s=str_db  ,xy=(.36,.82),xycoords='axes fraction',fontsize=11,bbox=dict(facecolor='r',alpha=.2,pad=1))

    plt.subplot(1,2,2)
    dev_db = (y_fit_db-(peak_energy*xx))/(peak_energy*xx)*100
    plt.plot(xx*peak_energy,dev_db,c='r', label='DB gain map')
    plt.legend(fontsize=12)

## Energy Resolution Analysis

In [None]:
def power_function(x,*p):
    a,b,c = p
    return a*x**b + c
# rough initial estimate of fit parameters
fit_estimates = [20,-.5,0]

In [None]:
# Linearity of the visualized peaks
plt.figure(figsize=(8,6))

xx = np.arange(0,50,.1)
if db_gain_map is not None:
    plt.plot(peaks*peak_energy,E_res[N_peaks:], 'o', c='r', label='DB gain map')
    coeff,_ = curve_fit(power_function,peaks*peak_energy,E_res[N_peaks:],p0=fit_estimates)
    power_fit = power_function(xx,*coeff)
    plt.plot(xx,power_fit, '--', c='r')

plt.plot(peaks*peak_energy,E_res[:N_peaks], 'o', c='b', label='New gain map')
coeff,_ = curve_fit(power_function,peaks*peak_energy,E_res[:N_peaks],p0=fit_estimates)
power_fit = power_function(xx,*coeff)
plt.plot(xx,power_fit, '--', c='b')

plt.title(f'Energy Resolution ({karabo_id} | {proposal} - r{run})')
plt.xlabel('Energy (keV)')
plt.ylabel('Energy Resolution (%)')
plt.legend(fontsize=12)
plt.xlim(1,np.ceil(xx[-1]))
plt.ylim(0,30)
plt.grid(ls=':')

## Calibration Constants DB
Send the flat-field constants (RelativeGain and BadPixelsIlluminated) to the database and/or save them locally.

In [None]:
# Save constants to DB

md = None

constant_maps = {'RelativeGain': abs_gain_map,
                 'BadPixelsIlluminated': const_data['BadPixelsFF']
                } 

for const_name in constant_maps.keys():
    const = getattr(Constants.ePix100, const_name)()
    const.data = constant_maps[const_name].data

    for parm in illum_condition_db.parameters:
        if parm.name == "Sensor Temperature":
            parm.lower_deviation = temp_limits
            parm.upper_deviation = temp_limits

    # Get physical detector unit
    db_module = get_pdu_from_db(
        karabo_id=karabo_id,
        karabo_da=karabo_da,
        constant=const,
        condition=illum_condition_db,
        cal_db_interface=cal_db_interface,
        snapshot_at=creation_time)[0]

    # Inject or save calibration constants
    if db_output:
        md = send_to_db(
            db_module=db_module,
            karabo_id=karabo_id,
            constant=const,
            condition=illum_condition_db,
            file_loc=file_loc,
            report_path=report,
            cal_db_interface=cal_db_interface,
            creation_time=creation_time,
            timeout=cal_db_timeout
        )

    if local_output:
        md = save_const_to_h5(
            db_module=db_module,
            karabo_id=karabo_id,
            constant=const,
            condition=illum_condition_db,
            data=const.data,
            file_loc=file_loc,
            report=report,
            creation_time=creation_time,
            out_folder=out_folder
        )
        print(f"Calibration constant {const_name} is stored locally at {out_folder} \n")

print("Constants parameter conditions are:\n"
      f"• Bias voltage: {bias_voltage}\n"
      f"• Integration time: {integration_time}\n"
      f"• Temperature: {temperature_k}\n"
      f"• Source Energy: {peak_energy}\n"      
      f"• In Vacuum: {in_vacuum}\n"
      f"• Creation time: {md.calibration_constant_version.begin_at if md is not None else creation_time}\n")