Commit a75704d7 authored by Karim Ahmed

Update docstrings and variable names. Avoid many prints for each histogram chunk

parent 2f15817c
1 merge request: !1055 [Jungfrau][FF] Improve Fitting performance and stop using pydetlib + many refactors.
%% Cell type:markdown id: tags:
# Jungfrau Detector Flatfield Gain Calibration - Part 1: Histogram Creation and Fitting
Author: European XFEL Detector Group, Version: 1.0
This notebook performs the first part of the Jungfrau detector flatfield gain calibration process.
#### Overview
1. Load Raw Data: Read detector data for the specified run. Currently only a single run is supported.
2. Initialize Histograms: Create empty histograms for each module and memory cell.
3. Offset Correction: Apply offset subtraction to the raw data.
4. Histogram Population: Fill the histograms with the offset-corrected data.
5. Histogram Fitting: Fit each histogram with the specified function (Charge Sharing, Double Charge Sharing, or Gaussian).
6. Save Intermediate Results:
- Store histogram data as HDF5 files
- Save fit results (gain map, sigma map, chi-square map, alpha map) as HDF5 files
Note: Results from this notebook are used later for Gain Map and BadPixelsFF Generation.
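For reference, the CHARGE_SHARING model fitted below has the form $f(x) = \mathrm{norm} \cdot (\mathrm{gaussian\_peak} + \mathrm{charge\_sharing\_tail}) / (1 + \alpha)^2$, following Journal of Synchrotron Radiation 23, 1462-1473 (2016); see `charge_sharing()` in `cal_tools.jungfrau.jungfrau_ff` in the diff below.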
%% Cell type:code id: tags:
``` python
in_folder = '/gpfs/exfel/exp/SPB/202330/p900343/raw' # RAW data path, required
out_folder = '/gpfs/exfel/data/scratch/ahmedk/test/remove/JF4M_SPB_gain/r66/no_offset' # Output path for gain data, required
metadata_folder = '.' # Directory containing calibration_metadata.yml when run by xfel-calibrate
runs = [66] # can be a list of runs
sensor_size = [512, 1024] # size of the array in the 'row' and 'col' dimensions
chunked_trains = 1000 # Number of trains per chunk.
gains = [0, 1, 2] # gain bit values
karabo_id = "SPB_IRDA_JF4M" # karabo prefix of Jungfrau devices
karabo_da = [""] # Detector's data aggregators. Leave empty to derive from CALCAT.
creation_time = '' # To overwrite the measured creation_time. Required format: YYYY-MM-DD HH:MM:SS, e.g. "2022-06-28 13:00:00"
# Parameter conditions
bias_voltage = -1 # bias voltage - Set to -1 to derive from CONTROL file.
integration_time = -1. # integration time - Set to -1 to derive from CONTROL file.
gain_mode = -1 # gain mode - Set to -1 to derive from CONTROL file.
gain_setting = -1 # gain setting parameter conditions (High or low CDS) - Set to -1 to derive from CONTROL file.
memory_cells = -1 # number of memory cells - Set to -1 to derive from CONTROL file.
sc_start = -1 # storage cell start value - should be derived from CONTROL file.
h_bins_s = 2 # bin width in ADC units of the histogram
h_range = [-50, 450.] # histogram range in ADC units
block_size = [256, 64] # dimension of the chunks in 'row' and 'col'
control_src_template = '{}/DET/CONTROL' # template for the Karabo control source name, filled with karabo_id
extra_dims = ['cell', 'row', 'col'] # labels for the DataArray dims after the first
fit_func = 'CHARGE_SHARING' # function used to fit the single photon peak. The 3 available options are: CHARGE_SHARING, CHARGE_SHARING_2, and GAUSS
fit_h_range = [150, 350.] # range of the histogram in x-axis units used in fitting
# parameters for the peak finder
n_sigma = 5. # number of sigmas above pedestal used as the peak finder threshold
ratio = 0.99 # ratio of the next peak amplitude in the peak_finder
rebin_factor = 1 # Number of adjacent bins to combine before fitting the histogram.
def find_das(in_folder, runs, karabo_da):
    run = runs[0]
    if (karabo_da is not None) and karabo_da != [""]:
        return karabo_da
    from pathlib import Path
    import re
    karabo_da = set()
    for file in Path(in_folder, f'r{run:04d}').iterdir():
        da = re.search(r'JNGFR\d{2}', file.name)
        if da:
            karabo_da.add(da.group())
    return sorted(karabo_da)
```
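%% Cell type:code id: tags:
``` python
# Hedged sketch (not part of the original notebook): when karabo_da is left
# as [""], the data aggregators can be resolved from the RAW file names.
# EuXFEL RAW files are typically named like RAW-R0066-JNGFR01-S00000.h5
# (an assumption here), so the JNGFR\d{2} search in find_das() yields
# e.g. ['JNGFR01', ..., 'JNGFR08'].
karabo_da = find_das(in_folder, runs, karabo_da)
print(f"Data aggregators: {karabo_da}")
```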
%% Cell type:code id: tags:
``` python
import multiprocessing
import time
from functools import partial
from logging import warning
from pathlib import Path
import numpy as np
import pasha as psh
from extra_data import RunDirectory
from h5py import File as h5file
import cal_tools.restful_config as rest_cfg
from cal_tools.calcat_interface import JUNGFRAU_CalibrationData
from cal_tools.jungfrau import jungfrau_ff
from cal_tools.jungfrau.jungfraulib import JungfrauCtrl
from cal_tools.step_timing import StepTimer
from cal_tools.tools import calcat_creation_time
```
%% Cell type:code id: tags:
``` python
proposal = list(filter(None, in_folder.strip('/').split('/')))[-2]
in_folder = Path(in_folder)
out_folder = Path(out_folder)
out_folder.mkdir(exist_ok=True, parents=True)
print(f'Creating directory {out_folder}')
```
%% Cell type:code id: tags:
``` python
begin_stuff = time.localtime()
control_src = control_src_template.format(karabo_id, karabo_da[0])
first_run_folder = in_folder / f'r{runs[0]:04d}'
ctrl_data = JungfrauCtrl(RunDirectory(first_run_folder), control_src)
# Run's creation time:
creation_time = calcat_creation_time(in_folder, runs[0], creation_time)
print(f"Creation time: {creation_time}")
print("Chunked trains: ", chunked_trains)
```
%% Cell type:code id: tags:
``` python
def read_detector_conditions(
    run_dc,
    run_number,
    bias_voltage,
    memory_cells,
    integration_time,
    gain_mode,
    gain_setting,
):
    """Get the parameter conditions for a run, deriving any value given
    as -1 from the run's CONTROL data.
    Args:
        run_dc (extra_data.DataCollection): Data collection of the run.
        run_number (int): Run number.
        bias_voltage (float): Bias voltage, or -1 to derive it.
        memory_cells (int): Number of memory cells, or -1 to derive it.
        integration_time (float): Integration time, or -1 to derive it.
        gain_mode (int): Gain mode, or -1 to derive it.
        gain_setting (int): Gain setting, or -1 to derive it.
    Returns:
        (float, int, int, float, int, int, int):
        bias_voltage, memory_cells, sc_start, integration_time,
        gain_mode, gain_setting, run_number
    """
    # Read the parameter conditions from the CONTROL data.
    ctrl_data = JungfrauCtrl(run_dc, control_src)
    # Always read sc_start, so it is defined even when memory_cells
    # is given explicitly.
    derived_cells, sc_start = ctrl_data.get_memory_cells()
    if memory_cells < 0:
        memory_cells = derived_cells
    if integration_time < 0:
        integration_time = ctrl_data.get_integration_time()
    if bias_voltage < 0:
        bias_voltage = ctrl_data.get_bias_voltage()
    if gain_setting < 0:
        gain_setting = ctrl_data.get_gain_setting()
    if gain_mode < 0:
        gain_mode = ctrl_data.get_gain_mode()
    if memory_cells == 16 and gain_mode == 0:
        warning(
            "Data is in burst mode and dynamic gain. Forcing gain mode to fixed gain.")
        gain_mode = 1
    return (
        bias_voltage,
        memory_cells,
        sc_start,
        integration_time,
        gain_mode,
        gain_setting,
        run_number,
    )
condition_lists = {
    "integration_time": [],
    "memory_cells": [],
    "sc_start": [],
    "bias_voltage": [],
    "gain_setting": [],
    "gain_mode": [],
    "runs": [],
}
for run in runs:
    voltage, cells, sc_start, integration, mode, setting, run_number = read_detector_conditions(  # noqa
        RunDirectory(in_folder / f"r{run:04d}"),
        run,
        bias_voltage=bias_voltage,
        memory_cells=memory_cells,
        integration_time=integration_time,
        gain_mode=gain_mode,
        gain_setting=gain_setting)
    condition_lists["bias_voltage"].append(voltage)
    condition_lists["memory_cells"].append(cells)
    condition_lists["integration_time"].append(integration)
    condition_lists["gain_mode"].append(mode)
    condition_lists["gain_setting"].append(setting)
    condition_lists["sc_start"].append(sc_start)
    condition_lists["runs"].append(run_number)
# Validate run conditions
runs_results = condition_lists["runs"]
for c, lst in condition_lists.items():
    if c == "runs":
        continue
    if not all(val == lst[0] for val in lst):
        warning(
            f"{c} is not the same for all runs: {lst} "
            f"for runs {runs_results}, respectively"
        )
memory_cells = condition_lists["memory_cells"][0]
sc_start = condition_lists["sc_start"][0]
print(f"Memory cells: {memory_cells:d}")
bias_voltage = condition_lists["bias_voltage"][0]
print(f"Bias Voltage: {bias_voltage:3.1f} V")
integration_time = condition_lists["integration_time"][0]
print(f"Exposure Time: {integration_time:1.2f} us")
gain_mode = condition_lists["gain_mode"][0]
print(f"Gain mode: {gain_mode}")
gain_setting = condition_lists["gain_setting"][0]
print(f"Gain setting: {gain_setting}")
```
%% Cell type:code id: tags:
``` python
# Retrieve calibration constants
jf_cal = JUNGFRAU_CalibrationData(
    detector_name=karabo_id,
    sensor_bias_voltage=bias_voltage,
    event_at=creation_time,
    modules=karabo_da,
    memory_cells=memory_cells,
    integration_time=integration_time,
    gain_setting=gain_setting,
    # Retrieve fixed gain constants for burst mode.
    # Dark MDL pushes fixed gain constants for burst mode.
    gain_mode=gain_mode,
    client=rest_cfg.calibration_client(),
)
jf_metadata = jf_cal.metadata(
    calibrations=["Offset10Hz", "Noise10Hz"])
# TODO: display CCV timestamp
const_data = jf_cal.ndarray_map(metadata=jf_metadata)
jf_cal.display_markdown_retrieved_constants(metadata=jf_metadata)
for mod in karabo_da:
    constants_data = const_data[mod]
    if "Offset10Hz" in constants_data:
        # From (512, 1024, memory_cells, 3) to (3, memory_cells, 512, 1024)
        constants_data["Offset10Hz"] = np.transpose(
            np.swapaxes(constants_data["Offset10Hz"], 0, 1)).astype(np.float32)
    else:
        raise ValueError(f"Offset constant is not found for {mod}.")
    if "Noise10Hz" in constants_data:
        # From (512, 1024, memory_cells, 3) to (3, memory_cells, 512, 1024)
        constants_data["Noise10Hz"] = np.transpose(
            np.swapaxes(constants_data["Noise10Hz"], 0, 1)).astype(np.float32)
    else:
        constants_data["Noise10Hz"] = None
        warning(f"Noise constant is not found for {mod}")
```
%% Cell type:markdown id: tags:
## Filling the histogram
* First it retrieves the DB offset constants for the given run
* Then, chunk by chunk (groups of trains), it performs the offset subtraction and fills the histograms
%% Cell type:markdown id: tags:
### Creating empty histogram
%% Cell type:code id: tags:
``` python
h_n_bins = int((h_range[1] - h_range[0])/h_bins_s)
h_bins = np.linspace(h_range[0], h_range[1], h_n_bins, dtype=np.float32)
h_spectra = dict()
edges = dict()
for da in karabo_da:
    h_spectra[da] = np.zeros(
        (h_n_bins-1, memory_cells, sensor_size[0], sensor_size[1]),
        dtype=np.int32,
    )
    edges[da] = None
```
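%% Cell type:code id: tags:
``` python
# Worked example (illustrative only) of the binning above: np.linspace
# returns h_n_bins *edges*, i.e. h_n_bins - 1 bins, hence the h_n_bins-1
# in the histogram allocation. With h_range = [-50, 450] and h_bins_s = 2
# this gives 249 bins of width ~2.008 ADU rather than exactly 2.
example_edges = np.linspace(-50, 450, int((450 - (-50)) / 2))
print(len(example_edges) - 1, example_edges[1] - example_edges[0])
```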
%% Cell type:code id: tags:
``` python
def correct_train(wid, index, d):
    """Offset-correct the data of one train."""
    g = gain[index].values
    # Jungfrau gain bit values: 0 [00], 1 [01], 3 [11].
    # Remap low gain (3) to 2 for indexing the constants.
    g[g == 3] = 2
    # Select memory cells.
    if memory_cells > 1:
        # Even though the memory cell pattern can be assumed to be the same
        # across all trains (for one run taken with one acquisition), it is
        # preferred not to assume this, to account for exceptions.
        m = memcells[index].copy()
        # 255 is a cell value pointing to no cell image data (image of 0 pixels).
        # The corresponding image is corrected with the constants of cell 0,
        # to avoid values of 0. This relies on not storing the modified
        # memory cells in the corrected data.
        m[m == 255] = 0
        offset_map_cell = offset_map[:, m, ...]  # [gain, memory cells, row, col]
    else:
        m = np.zeros(1, dtype=np.int16)
        offset_map_cell = offset_map[:, 0, ...]
    # Offset correction.
    offset = np.choose(g, offset_map_cell)
    d -= offset
    # Sort corrected data based on memory cell order.
    sort_indices = np.argsort(m)
    data_corr[index, ...] = d[sort_indices, ...]
```
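%% Cell type:code id: tags:
``` python
# Minimal sketch (hypothetical small shapes) of the gain-indexed lookup that
# correct_train() performs with np.choose: for every pixel, the gain index
# g[c, i, j] selects which of the three offset maps is subtracted, i.e.
# picked[c, i, j] == offsets[g[c, i, j], c, i, j].
example_offsets = np.arange(3 * 2 * 4 * 4, dtype=np.float32).reshape(3, 2, 4, 4)
example_gain = np.random.randint(0, 3, size=(2, 4, 4))
example_pick = np.choose(example_gain, example_offsets)
assert example_pick.shape == (2, 4, 4)
```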
%% Cell type:code id: tags:
``` python
step_timer = StepTimer()
n_cpus = multiprocessing.cpu_count()
context = psh.context.ProcessContext(num_workers=n_cpus)
print(f"Using {n_cpus} workers for correction.")
```
%% Cell type:code id: tags:
``` python
for run in runs:
    for da in karabo_da:
        det_src = f'{karabo_id}/DET/{da}:daqOutput'
        offset_map = const_data[da]["Offset10Hz"]
        run_folder = in_folder / f'r{run:04d}'
        ## Offset subtraction & histogram filling
        ### Performs offset subtraction and fills the histograms chunk by chunk.
        ### Looping over chunks of trains because single photon runs can be very large.
        chunks_list = list(RunDirectory(
            run_folder, include=f"*{da}*").split_trains(trains_per_part=chunked_trains))
        print(f"Processing raw data and filling histogram in {len(chunks_list)} chunks")
        for dc_chunk in chunks_list:
            memcells = dc_chunk.get_array(
                det_src,
                'data.memoryCell',
                extra_dims=extra_dims[0:1]).astype(np.int16)
            adc = dc_chunk.get_array(det_src, 'data.adc', extra_dims=extra_dims).astype(np.float32)
            gain = dc_chunk.get_array(det_src, 'data.gain', extra_dims=extra_dims)
            step_timer.start()
            # gain = gain.where(gain < 2, other = 2).astype(np.int16)
            # Allocate shared arrays for corrected data. Used in `correct_train()`
            data_corr = context.alloc(shape=adc.shape, dtype=np.float32)
            context.map(correct_train, adc)
            # Record the timing silently; per-chunk prints are avoided on purpose.
            step_timer.done_step("correcting trains per chunk", print_step=False)
            step_timer.start()
            chunks = jungfrau_ff.chunk_multi(data_corr, block_size)
            ch_inp = [(c, h_bins) for c in chunks]
            with multiprocessing.Pool(processes=min(n_cpus, len(chunks))) as pool:
                results = pool.starmap(jungfrau_ff.fill_histogram, ch_inp)
            for i, (h_chk, e_chk) in enumerate(results):
                if edges[da] is None:
                    edges[da] = e_chk
                n_blocks_col = int(h_spectra[da].shape[-1]/block_size[1])
                irow = int(np.floor(i/n_blocks_col)) * block_size[0]
                icol = i%n_blocks_col * block_size[1]
                h_spectra[da][..., irow:irow+block_size[0], icol:icol+block_size[1]] += h_chk
            step_timer.done_step("Histogram created per chunk", print_step=False)
```
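%% Cell type:code id: tags:
``` python
# Illustrative only: how the flat chunk index i maps back to the (row, col)
# origin of its block in h_spectra, assuming chunk_multi() enumerates the
# (256, 64) blocks row-major (an assumption about its ordering).
n_blocks_col = sensor_size[1] // block_size[1]  # 1024 // 64 = 16
for i in (0, 1, 15, 16):
    irow = (i // n_blocks_col) * block_size[0]
    icol = (i % n_blocks_col) * block_size[1]
    print(f"chunk {i:2d} -> rows {irow}:{irow + block_size[0]}, cols {icol}:{icol + block_size[1]}")
```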
%% Cell type:code id: tags:
``` python
print(f"Total processing time {step_timer.timespan():.01f} s")
step_timer.print_summary()
```
%% Cell type:markdown id: tags:
### Saving histogram
%% Cell type:code id: tags:
``` python
for da in karabo_da:
    fout_h_path = out_folder / f"R{runs[0]:04d}_{proposal.upper()}_Gain_Spectra_{da}_Histo.h5"
    hists = h_spectra[da]
    with h5file(fout_h_path, 'w') as fout_h:
        print(f"Saving histograms at {fout_h_path}.")
        dset_h = fout_h.create_dataset("histos", data=hists)
        dset_e = fout_h.create_dataset("edges", data=edges[da])
        dset_noi = fout_h.create_dataset(
            "noise_map",
            data=const_data[da]["Noise10Hz"])
        fout_h.attrs["memory_cells"] = memory_cells
        fout_h.attrs["gain_setting"] = gain_setting
        fout_h.attrs["gain_mode"] = gain_mode
        fout_h.attrs["integration_time"] = integration_time
        fout_h.attrs["bias_voltage"] = bias_voltage
        fout_h.attrs["creation_time"] = str(creation_time)
```
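%% Cell type:code id: tags:
``` python
# Hedged sketch: reading back one of the histogram files written above
# (file name pattern, dataset and attribute names as used in this notebook).
example_path = out_folder / f"R{runs[0]:04d}_{proposal.upper()}_Gain_Spectra_{karabo_da[0]}_Histo.h5"
with h5file(example_path, "r") as f:
    print(f["histos"].shape)  # (n_bins-1, memory_cells, 512, 1024)
    print(f["edges"].shape)   # histogram bin edges
    print(dict(f.attrs))      # run conditions, e.g. memory_cells
```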
%% Cell type:code id: tags:
``` python
def create_and_fill_map(r_maps, shape, dtype=np.float32):
    """Assemble the per-chunk fit results into full-size maps of the
    given shape (memory_cells, rows, cols)."""
    g0_map = np.zeros(shape, dtype=dtype)
    sigma_map = np.zeros(shape, dtype=dtype)
    chi2ndf_map = np.zeros(shape, dtype=dtype)
    alpha_map = np.zeros(shape, dtype=dtype)
    n_blocks_col = int(shape[-1] / block_size[1])
    for i, (g0_chk, sigma_chk, chi2ndf_chk, alpha_chk) in enumerate(r_maps):
        irow = int(np.floor(i / n_blocks_col)) * block_size[0]
        icol = i % n_blocks_col * block_size[1]
        slice_obj = (
            ...,
            slice(irow, irow + block_size[0]),
            slice(icol, icol + block_size[1])
        )
        g0_map[slice_obj] = g0_chk
        sigma_map[slice_obj] = sigma_chk
        chi2ndf_map[slice_obj] = chi2ndf_chk
        alpha_map[slice_obj] = alpha_chk
    return g0_map, sigma_map, chi2ndf_map, alpha_map
```
%% Cell type:markdown id: tags:
## Fitting histograms
%% Cell type:code id: tags:
``` python
fout_temp = f"R{runs[0]:04d}_{proposal.upper()}_Gain_Spectra_{{}}_{fit_func}_Fit.h5"
for da in karabo_da:
    _edges = np.array(edges[da])
    bin_center = (_edges[1:] + _edges[:-1]) / 2.0
    bin_center, _h_spectra = jungfrau_ff.set_histo_range(bin_center, h_spectra[da], fit_h_range)
    print(f'Adjusting histogram range: {bin_center[0]} - {bin_center[-1]}')
    print(f'Histo shape updated from {h_spectra[da].shape} to {_h_spectra.shape}')
    print(f'x-axis: {bin_center.shape}')
    chunks = jungfrau_ff.chunk_multi(_h_spectra, block_size)
    partial_fit = partial(
        jungfrau_ff.fit_histogram,
        bin_center,
        fit_func,
        n_sigma,
        rebin_factor,
        ratio,
        const_data[da]["Noise10Hz"],
    )
    print(f"Starting spectra fit for {len(chunks)} chunks")
    step_timer.start()
    with multiprocessing.Pool() as pool:
        r_maps = pool.map(partial_fit, chunks)
    step_timer.done_step("r_maps calculation")
    step_timer.start()
    map_shape = (memory_cells, sensor_size[0], sensor_size[1])
    g0_map, sigma_map, chi2ndf_map, alpha_map = create_and_fill_map(r_maps, map_shape)
    step_timer.done_step("Finished fitting")
    step_timer.start()
    fout_path = out_folder / fout_temp.format(da)
    with h5file(fout_path, "w") as fout:
        dset_chi2 = fout.create_dataset("chi2map", data=np.transpose(chi2ndf_map))
        dset_gmap_fit = fout.create_dataset("gainMap_fit", data=np.transpose(g0_map))
        dset_std = fout.create_dataset("sigmamap", data=np.transpose(sigma_map))
        dset_alpha = fout.create_dataset("alphamap", data=np.transpose(alpha_map))
        dset_noi = fout.create_dataset("noise_map", data=const_data[da]["Noise10Hz"])
        fout.attrs["memory_cells"] = memory_cells
        fout.attrs["integration_time"] = integration_time
        fout.attrs["bias_voltage"] = bias_voltage
        fout.attrs["gain_setting"] = gain_setting
        fout.attrs["gain_mode"] = gain_mode
        fout.attrs["creation_time"] = str(creation_time)
        fout.attrs["karabo_id"] = karabo_id
    step_timer.done_step("Saved fit results")
```
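%% Cell type:code id: tags:
``` python
# Hedged sketch: the fit maps are saved transposed, so gainMap_fit comes
# back as (col, row, memory_cells), the reverse of map_shape. Dataset
# names as used above.
with h5file(out_folder / fout_temp.format(karabo_da[0]), "r") as f:
    print(f["gainMap_fit"].shape)  # (1024, 512, memory_cells)
```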
@@ -3,25 +3,28 @@ from scipy.special import erf
 from iminuit import Minuit
-def histogram_errors(x):
+def histogram_errors(bin_counts):
     """
     Calculate the errors for histogram bins.
     This function computes the error for each bin in a histogram,
     treating positive and negative bin counts differently.
     Args:
         x (np.ndarray): Input array of histogram bin counts.
     Returns:
         np.ndarray: Array of calculated errors.
     """
-    out = np.ones_like(x)
+    out = np.ones_like(bin_counts)
     # Handle positive values
-    positive_mask = x > 0
-    out[positive_mask] = np.sqrt(x[positive_mask])
+    positive_mask = bin_counts > 0
+    out[positive_mask] = np.sqrt(bin_counts[positive_mask])
     # Handle negative values
-    negative_mask = x < 0
-    out[negative_mask] = np.sqrt(-x[negative_mask])
+    negative_mask = bin_counts < 0
+    out[negative_mask] = np.sqrt(-bin_counts[negative_mask])
     return out
@@ -73,37 +76,41 @@ def chunk_multi(data, block_size):
     return chunks
-def fill_histogram(imgs, h_bins):
+def fill_histogram(image_data, histogram_bins):
     """
-    Fills an histogram with shape
-    (n_bins-1, memory_cells, n_rows, n_cols)
-    from input images.
+    Fill a histogram from input images.
+    This function creates a histogram with shape
+    (n_bins-1, memory_cells, n_rows, n_cols) from the input image data.
     Args:
-        h_bins (list, float): the binning of the x-axis
-        imgs (np.ndarray): image data to histogram
+        histogram_bins (list, float): the binning of the x-axis
+        image_data (np.ndarray): image data to histogram
             (trains, memory_cells, row, col).
     Returns: histogram bin counts, bin edges.
     Raises:
         TypeError: If imgs is not a numpy ndarray.
         ValueError: If imgs is not a 4D array.
     """
-    if not isinstance(imgs, np.ndarray):
+    if not isinstance(image_data, np.ndarray):
         raise TypeError("Expected imgs numpy ndarray type.")
-    if imgs.ndim < 4:
+    if image_data.ndim < 4:
         raise ValueError("Expected 4D imgs array.")
-    n_cells, n_rows, n_cols = imgs.shape[1:]
+    n_cells, n_rows, n_cols = image_data.shape[1:]
     h_chk = np.zeros(
-        (len(h_bins)-1, n_cells, n_rows, n_cols),
+        (len(histogram_bins)-1, n_cells, n_rows, n_cols),
         dtype=np.int32)
     e_chk = None
     for cell in range(n_cells):
         for row in range(n_rows):
             for col in range(n_cols):
-                this_pix = imgs[:, cell, row, col]
-                _h, _e = np.histogram(this_pix, bins=h_bins)
+                this_pix = image_data[:, cell, row, col]
+                _h, _e = np.histogram(this_pix, bins=histogram_bins)
                 h_chk[..., cell, row, col] += _h
                 if e_chk is None:
                     e_chk = np.array(_e)
@@ -237,7 +244,7 @@ def fit_histogram(
             - CHARGE_SHARING_2: sum of two CHARGE_SHARING
             - GAUSS: gaussian function
         n_sigma (int): to calculate threshold of the peak finder as
-            thr = n_sigma * sigma0
+            thr = n_sigma * initial_sigma
         ratio (float): ratio parameter of the peak finder
         histo (ndarray): histogram bin counts with shape
             (n_bins, memory_cells, row, col)
@@ -253,12 +260,12 @@ def fit_histogram(
         CHARGE_SHARING_2=fit_double_charge_sharing,
         GAUSS=fit_gauss,
     )
-    sigma0 = 15.
+    initial_sigma = 15.
     fit_func = _funcs[_fit_func]
     n_cells, n_rows, n_cols = histo.shape[1:]
     _init_array = np.zeros((n_cells, n_rows, n_cols), dtype=np.float32)
-    g0_chk = _init_array.copy()
+    peak_positions = _init_array.copy()
     sigma_chk = _init_array.copy()
     chi2ndf_chk = _init_array.copy()
     alpha_chk = _init_array.copy()
@@ -271,19 +278,19 @@ def fit_histogram(
                 yerr = histogram_errors(y)
                 if noise[0, cell, row, col] > 0.:
-                    sigma0 = noise[0, cell, row, col]
+                    initial_sigma = noise[0, cell, row, col]
                 q, sigma, chi2ndf, alpha = fit_func(
-                    x_rebinned, y, yerr, sigma0, n_sigma, ratio)
-                g0_chk[cell, row, col] = q
+                    x_rebinned, y, yerr, initial_sigma, n_sigma, ratio)
+                peak_positions[cell, row, col] = q
                 sigma_chk[cell, row, col] = sigma
                 chi2ndf_chk[cell, row, col] = chi2ndf
                 alpha_chk[cell, row, col] = alpha
-    return g0_chk, sigma_chk, chi2ndf_chk, alpha_chk
+    return peak_positions, sigma_chk, chi2ndf_chk, alpha_chk
-def erfbox(x, ped, q0, s0):
+def erfbox(x, ped, q, sigma):
     """
     Compute the error function box used in charge sharing calculations.
@@ -291,34 +298,34 @@ def erfbox(x, ped, q0, s0):
     erfbox is mostly flat between ped and q0 and quickly goes to zero
     outside the range [ped, q0]. The function integral is normalized to 1.
-    The slope of the box sides is given by erf(x)/s0; if s0 == 0 then the
+    The slope of the box sides is given by erf(x)/sigma; if sigma == 0 then the
     function goes abruptly to 0 at the edges.
     Args:
         x (array-like): Input values
         ped (float): Pedestal value
         q0 (float): Charge value
-        s0 (float): Width parameter
+        sigma (float): Width parameter
     Returns:
         array-like: Computed erfbox values
     """
-    if ped == q0:
+    if ped == q:
         return np.zeros_like(x)
-    if ped > q0:
-        q0, ped = ped, q0
-    slope = lambda x: erf(x / abs(s0)) if s0 != 0 else 2 * np.heaviside(x, 1)  # noqa
-    return (slope(x - ped) - slope(x - q0)) * 0.5 / (q0 - ped)
-def double_peak(x, ped, q_a, sigma_a, alpha, norm_a, q_b, sigma_b, norm_b):
-    k_a = charge_sharing(x, ped, q_a, sigma_a, alpha, norm_a)
-    k_b = charge_sharing(x, ped, q_b, sigma_b, alpha, norm_b)
-    return k_a + k_b
+    if ped > q:
+        q, ped = ped, q
+    slope = lambda x: erf(x / abs(sigma)) if sigma != 0 else 2 * np.heaviside(x, 1)  # noqa
+    return (slope(x - ped) - slope(x - q)) * 0.5 / (q - ped)
 def gauss(x, amp, mean, sigma):
     """
     Compute a Gaussian function.
     This function calculates the values of a Gaussian distribution
     for given input parameters.
     """
     z = (x - mean) / sigma
     return amp * np.exp(-0.5 * z**2)
@@ -328,11 +335,7 @@ def charge_sharing(x, ped, q, sigma, alpha, norm):
     Model the detector response to a single photon with charge sharing.
     This function combines a Gaussian peak
-    (representing full charge collection)
-    with a charge sharing tail.
-    More details on JOURNAL OF SYNCHROTRON RADIATION 23, 1462-1473 (2016)
-    - f(x) = norm * (gaussian_peak + charge_sharing_tail) / ((1 + α)²)
+    (representing full charge collection) with a charge sharing tail.
     Args:
         x (array-like): Input charge values
@@ -344,22 +347,42 @@ def charge_sharing(x, ped, q, sigma, alpha, norm):
     Returns:
         array-like: Modeled detector response
-    """
-    # A small threshold to avoid division by zero
-    small_value_threshold = 1e-10
-    log_term = (x - ped) / (q - ped)
-    # Avoid log of zero or negative numbers
-    log_term = np.maximum(log_term, small_value_threshold)
+    Notes: More details on JOURNAL OF SYNCHROTRON RADIATION 23, 1462-1473 (2016)
+    - f(x) = norm * (gaussian_peak + charge_sharing_tail) / ((1 + α)²)
+    """
     A = ((1. - alpha)**2) / (sigma * np.sqrt(2. * np.pi))
     B = 4. * alpha * (1. - alpha)
     C = 4. * alpha**2
-    charge_sh_tail = (B - C * np.log1p(log_term)) * erfbox(x, ped, q, sigma)
+    charge_sh_tail = (
+        B - C * np.log1p((x - ped) / (q - ped))) * erfbox(x, ped, q, sigma)
     return norm * (charge_sh_tail + gauss(x, A, q, sigma)) / ((1. + alpha)**2)
+def double_peak(x, ped, q_a, sigma_a, alpha, norm_a, q_b, sigma_b, norm_b):
+    """
+    Model the detector response for two overlapping peaks with charge sharing.
+    This function combines two charge sharing models to represent two
+    overlapping peaks in the detector response.
+    """
+    k_a = charge_sharing(x, ped, q_a, sigma_a, alpha, norm_a)
+    k_b = charge_sharing(x, ped, q_b, sigma_b, alpha, norm_b)
+    return k_a + k_b
 def set_fit_range(x, x1, x2):
     """
     Set the range of x values to be used in fitting.
+    This function determines the indices of x that fall within the
+    specified range [x1, x2].
+    Notes: If x2 <= x1, the function ensures at least two points are included
+        in the range. The returned x values are cast to np.float64 for
+        higher precision in fitting.
     """
     i1 = 0
     i2 = len(x)
     i = 0
@@ -389,7 +412,7 @@ def set_fit_range(x, x1, x2):
 def fit_charge_sharing(
-    x, y, yerr, sigma0, n_sigma, ratio, alpha0=0.3
+    x, y, yerr, initial_sigma, n_sigma, ratio, alpha0=0.3
 ):
     """
     Perform charge sharing fit using iminuit optimization.
@@ -401,12 +424,13 @@ def fit_charge_sharing(
         x (array-like): Input charge values
         y (array-like): Observed counts or intensities
         yerr (array-like): Errors on y values
-        sigma0 (float): Initial guess for peak width
+        initial_sigma (float): Initial guess for peak width
         n_sigma (float): Number of sigmas for the peak-finder threshold
             (thr = n_sigma * initial_sigma)
         ratio (float): Ratio parameter of the peak finder
         alpha0: Initial guess for alpha
-    Returns:AC
+    Returns:
         tuple: (q, sigma, chi2ndf, alpha)
         q (float): Fitted peak position
         sigma (float): Fitted peak width
@@ -422,15 +446,14 @@ def fit_charge_sharing(
     norm = np.sum(y) * (x[1] - x[0])
     # Use _peak_position function to find initial q0
-    _peaks, _ = _peak_position(x, y, thr=n_sigma*sigma0, ratio=ratio)
+    _peaks, _ = _peak_position(x, y, thr=n_sigma*initial_sigma, ratio=ratio)
     if len(_peaks) > 0:
         q0 = np.min(x[_peaks])
     else:
         return -1, -1, -1, -1
-    x_fit, i1, i2 = set_fit_range(x, 0.5 * q0, q0 + 2. * sigma0)
-    x_fit = x_fit
+    x_fit, i1, i2 = set_fit_range(x, 0.5 * q0, q0 + 2. * initial_sigma)
     y_fit = y[i1:i2]
     yerr_fit = yerr[i1:i2]
@@ -441,12 +464,12 @@ def fit_charge_sharing(
     m = Minuit(
         cost_function,
-        ped=0., q=q0, sigma=sigma0, alpha=alpha0, norm=norm,
-        error_ped=0.1, error_q=0.1*q0, error_sigma=0.1*sigma0,
+        ped=0., q=q0, sigma=initial_sigma, alpha=alpha0, norm=norm,
+        error_ped=0.1, error_q=0.1*q0, error_sigma=0.1*initial_sigma,
         error_alpha=0.1, error_norm=0.1*norm,
         fix_ped=True,
-        limit_q=(q0 - sigma0, q0 + sigma0),
-        limit_sigma=(0.1 * sigma0, 2. * sigma0),
+        limit_q=(q0 - initial_sigma, q0 + initial_sigma),
+        limit_sigma=(0.1 * initial_sigma, 2. * initial_sigma),
         limit_alpha=(0.01 * alpha0, 1.),
         limit_norm=(np.sqrt(0.1 * norm), 3. * norm),
         errordef=1,  # for least squares cost function
@@ -454,19 +477,18 @@ def fit_charge_sharing(
     )
     m.migrad()
-    m.hesse()
     if m.get_fmin().is_valid:
         q = m.values['q']
         sigma = m.values['sigma']
         alpha = m.values['alpha']
-        chi2ndf = m.fval / (len(x) - len(m.values))
+        chi2ndf = m.fval / (len(x_fit) - len(m.values))
     else:
         q, sigma, chi2ndf, alpha = -1000, -1000, -1000, -1000
     return q, sigma, chi2ndf, alpha
-def fit_double_charge_sharing(x, y, yerr, sigma0, n_sigma, ratio):
+def fit_double_charge_sharing(x, y, yerr, initial_sigma, n_sigma, ratio):
     """
     Fits histogram with sum of two single photon functions with
     charge sharing tail.
@@ -475,9 +497,9 @@ def fit_double_charge_sharing(x, y, yerr, sigma0, n_sigma, ratio):
         x (array, float): x values
         y (array, float): y values
         yerr (array, float): errors of the y values
-        sigma0 (float): rough estimate of peak variance
+        initial_sigma (float): rough estimate of peak variance
         n_sigma (int): to calculate threshold of the peak finder as
-            thr = n_sigma * sigma0
+            thr = n_sigma * initial_sigma
         ratio (float): ratio parameter of the peak finder
     Returns:
@@ -487,14 +509,14 @@ def fit_double_charge_sharing(x, y, yerr, sigma0, n_sigma, ratio):
     alpha0 = 0.3
     norm = np.sum(y) * (x[1] - x[0])
-    _peaks, _ = _peak_position(x, y, thr=n_sigma*sigma0, ratio=ratio)
+    _peaks, _ = _peak_position(x, y, thr=n_sigma*initial_sigma, ratio=ratio)
     if len(_peaks) > 0:
         q0 = np.min(x[_peaks])
     else:
         return -1, -1, -1, -1
     q_b = 1.1 * q0
-    x_fit, i1, i2 = set_fit_range(x, 0.5 * q0, q_b + sigma0)
+    x_fit, i1, i2 = set_fit_range(x, 0.5 * q0, q_b + initial_sigma)
     y_fit = y[i1:i2]
     yerr_fit = yerr[i1:i2]
@@ -509,25 +531,25 @@ def fit_double_charge_sharing(x, y, yerr, sigma0, n_sigma, ratio):
     limit_alpha = (0.01, 1.)
     limit_norm_a = (np.sqrt(0.1 * norm), 2. * norm)
-    limit_q_b = (q_b - sigma0, q_b + sigma0)
-    limit_sigma = (0.1 * sigma0, 2. * sigma0)
+    limit_q_b = (q_b - initial_sigma, q_b + initial_sigma)
+    limit_sigma = (0.1 * initial_sigma, 2. * initial_sigma)
     limit_norm_b = (np.sqrt(0.1 * 0.2 * norm), 2 * 0.2 * norm)
     m = Minuit(
         cost_function,
         ped=0.,
         q_a=q0,
-        sigma_a=sigma0,
+        sigma_a=initial_sigma,
         alpha=alpha0,
         norm_a=norm,
         q_b=q_b,
-        sigma_b=sigma0,
+        sigma_b=initial_sigma,
         norm_b=norm,
-        error_ped=0.1, error_q_a=0.1*q0, error_sigma_a=0.1*sigma0,
+        error_ped=0.1, error_q_a=0.1*q0, error_sigma_a=0.1*initial_sigma,
         error_alpha=0.1, error_norm_a=0.1*norm,
-        error_q_b=0.1*q_b, error_sigma_b=0.1*sigma0,
+        error_q_b=0.1*q_b, error_sigma_b=0.1*initial_sigma,
         error_norm_b=0.1*norm,
-        limit_q_a=(q0 - sigma0, q0 + sigma0),
+        limit_q_a=(q0 - initial_sigma, q0 + initial_sigma),
         limit_sigma_a=limit_sigma,
         limit_alpha=limit_alpha,
         limit_norm_a=limit_norm_a,
@@ -540,19 +562,18 @@ def fit_double_charge_sharing(x, y, yerr, sigma0, n_sigma, ratio):
     )
     m.migrad()
-    m.hesse()
     if m.get_fmin().is_valid:
         q = m.values['q_a']
         sigma = m.values['sigma_a']
         alpha = m.values['alpha']
-        chi2ndf = m.fval / (len(x) - len(m.values))
+        chi2ndf = m.fval / (len(x_fit) - len(m.values))
     else:
         q, sigma, chi2ndf, alpha = -1000, -1000, -1000, -1000
     return q, sigma, chi2ndf, alpha
-def fit_gauss(x, y, yerr, sigma0, n_sigma, ratio):
+def fit_gauss(x, y, yerr, initial_sigma, n_sigma, ratio):
     """
     Fits histogram with a gaussian function
@@ -560,9 +581,9 @@ def fit_gauss(x, y, yerr, sigma0, n_sigma, ratio):
         x (array, float): x values
         y (array, float): y values
         yerr (array, float): errors of the y values
-        sigma0 (float): rough estimate of peak variance
+        initial_sigma (float): rough estimate of peak variance
         n_sigma (int): to calculate threshold of the peak finder as
-            thr = n_sigma * sigma0
+            thr = n_sigma * initial_sigma
         ratio (float): ratio parameter of the peak finder
     Returns:
@@ -570,15 +591,15 @@ def fit_gauss(x, y, yerr, sigma0, n_sigma, ratio):
         (last one always == 0)
         all of them are kept for compatibility with the wrap around function
     """
-    norm = np.sum(y) * (x[1] - x[0])/np.sqrt(2.*np.pi*sigma0**2)
+    norm = np.sum(y) * (x[1] - x[0])/np.sqrt(2.*np.pi*initial_sigma**2)
-    _peaks, _ = _peak_position(x, y, thr=n_sigma*sigma0, ratio=ratio)
+    _peaks, _ = _peak_position(x, y, thr=n_sigma*initial_sigma, ratio=ratio)
     if len(_peaks) > 0:
         q0 = np.min(x[_peaks])
     else:
         return -1, -1, -1, -1
-    x_fit, i1, i2 = set_fit_range(x, q0 - sigma0, q0 + 2.*sigma0)
+    x_fit, i1, i2 = set_fit_range(x, q0 - initial_sigma, q0 + 2.*initial_sigma)
     y_fit = y[i1:i2]
     yerr_fit = yerr[i1:i2]
@@ -589,23 +610,22 @@ def fit_gauss(x, y, yerr, sigma0, n_sigma, ratio):
     m = Minuit(
         cost_function,
-        amp=norm, mean=q0, sigma=sigma0,
-        error_amp=0.1*norm, error_mean=0.1*sigma0, error_sigma=0.1*sigma0,
+        amp=norm, mean=q0, sigma=initial_sigma,
+        error_amp=0.1*norm, error_mean=0.1*initial_sigma, error_sigma=0.1*initial_sigma,
         limit_amp=(0.01*norm, 2.*norm),
-        limit_mean=(q0-sigma0, q0+sigma0),
-        limit_sigma=(0.1*sigma0, 3.*sigma0),
+        limit_mean=(q0-initial_sigma, q0+initial_sigma),
+        limit_sigma=(0.1*initial_sigma, 3.*initial_sigma),
         errordef=1,  # for least squares cost function
         print_level=0,
     )
     m.migrad()
-    m.hesse()
     if m.get_fmin().is_valid:
         q = m.values['mean']
         sigma = m.values['sigma']
         alpha = 0.
-        chi2ndf = m.fval / (len(x) - len(m.values))
+        chi2ndf = m.fval / (len(x_fit) - len(m.values))
     else:
         q, sigma, chi2ndf, alpha = -1000, -1000, -1000, -1000
     return q, sigma, chi2ndf, alpha
@@ -17,12 +17,13 @@ class StepTimer:
         self.t0 = t
         self.t_latest = t
-    def done_step(self, step_name):
+    def done_step(self, step_name, print_step=True):
         """Record a step timing, since the last call to done_step() or start()
         """
         t = perf_counter()
         self.steps[step_name].append(t - self.t_latest)
-        print(f"{step_name}: {t - self.t_latest:.01f} s")
+        if print_step:
+            print(f"{step_name}: {t - self.t_latest:.01f} s")
         self.t_latest = t
     def timespan(self):