Skip to content
Snippets Groups Projects
Commit b5a27892 authored by Karim Ahmed's avatar Karim Ahmed
Browse files

RemFeat: Remove pydetlib dependency and optimize fitting performance

parent 1e64d429
No related branches found
No related tags found
1 merge request!1055[Jungfrau][FF] Improve Fitting performance and stop using pydetlib + many refactors.
%% Cell type:markdown id: tags:
# Jungfrau Detector Flatfield Gain Calibration - Part 1: Histogram Creation and Fitting
Author: European XFEL Detector Group, Version: 1.0
This notebook performs the first part of the Jungfrau detector flatfield gain calibration process.
#### Overview
1. Load Raw Data: Read detector data for specified run. Currently works for only one run.
2. Initialize Histograms: Create empty histograms for each module and memory cell
3. Offset Correction: Apply offset subtraction to raw data
4. Histogram Population: Fill histograms with offset-corrected data
5. Histogram Fitting: Apply specified function (Charge Sharing, Double Charge Sharing, or Gaussian)
6. Save Intermediate Results:
- Store histogram data as HDF5 files
- Save fit results (gain map, sigma map, chi-square map, alpha map) as HDF5 files
Note: Results from this notebook are used later for Gain Map and BadPixelsFF Generation.
%% Cell type:code id: tags:
``` python
in_folder = '/gpfs/exfel/exp/SPB/202330/p900343/raw' # RAW data path, required
out_folder = '/gpfs/exfel/data/scratch/ahmedk/test/remove/JF4M_SPB_gain/r66/no_offset' # Output path for gain data, required
metadata_folder = '.' # Directory containing calibration_metadata.yml when run by xfel-calibrate
runs = [66] # can be a list of runs
sensor_size = [512, 1024] # size of the array in the 'row' and 'col' dimensions
chunked_trains = 1000 # Number of trains per chunk.
gains = [0, 1, 2] # gain bit values
karabo_id = "SPB_IRDA_JF4M" # karabo prefix of Jungfrau devices
karabo_da = [""] # Detector's data aggregators. Leave empty to derive from CALCAT.
creation_time = '' # To overwrite the measured creation_time. Required Format: YYYY-MM-DD HR:MN:SC e.g. "2022-06-28 13:00:00"
# Parameter conditions
bias_voltage = -1 # bias voltage - Set to -1 to derive from CONTROL file.
integration_time = -1. # integration time - Set to -1 to derive from CONTROL file.
gain_mode = -1 # gain mode - Set to -1 to derive from CONTROL file.
gain_setting = -1 # gain setting parameter conditions (High or low CDS) - Set to -1 to derive from CONTROL file.
memory_cells = -1 # number of memory cells - Set to -1 to derive from CONTROL file.
sc_start = -1 # storage cell start value - should be derived from CONTROL file.
h_bins_s = 2 # bin width in ADC units of the histogram
h_range = [-50, 450.] # histogram range in ADC units
block_size = [256, 64] # dimension of the chunks in 'row' and 'col'
control_src_template = '{}/DET/CONTROL' # Karabo control source; '{}' is filled with karabo_id
extra_dims = ['cell', 'row', 'col'] # labels for the DataArray dims after the first
fit_func = 'CHARGE_SHARING' # function used to fit the single photon peak. The 3 available options are: CHARGE_SHARING, CHARGE_SHARING_2, and GAUSS
fit_h_range = [150, 350.] # range of the histogram in x-axis units used in fitting
# parameters for the peak finder
n_sigma = 5. # n of sigma above pedestal threshold
ratio = 0.99 # ratio of the next peak amplitude in the peak_finder
rebin = 1 # NOTE(review): undocumented; appears superseded by rebin_factor below — confirm before removing.
rebin_factor = 1 # Number of adjacent bins to combine before fitting the histogram.
def find_das(in_folder, runs, karabo_da):
    """Return the data-aggregator names for the first run.

    If ``karabo_da`` already holds a non-empty selection it is returned
    unchanged; otherwise the run directory is scanned and every distinct
    'JNGFRxx' token found in a file name is collected.
    """
    run = runs[0]
    if (karabo_da is not None) and karabo_da != [""]:
        return karabo_da
    import re
    from pathlib import Path

    jngfr_token = re.compile(r'JNGFR\d{2}')
    found = set()
    for entry in Path(in_folder, f'r{run:04d}').iterdir():
        match = jngfr_token.search(entry.name)
        if match is not None:
            found.add(match.group())
    return sorted(found)
```
%% Cell type:code id: tags:
``` python
import multiprocessing
import time
from functools import partial
from logging import warning
from pathlib import Path
import numpy as np
import pasha as psh
from extra_data import RunDirectory
from h5py import File as h5file
from tqdm import tqdm
import cal_tools.restful_config as rest_cfg
from cal_tools.calcat_interface import JUNGFRAU_CalibrationData
from cal_tools.jungfrau import jungfrau_ff
from cal_tools.jungfrau.jungfraulib import JungfrauCtrl
from cal_tools.step_timing import StepTimer
from cal_tools.tools import calcat_creation_time
```
%% Cell type:code id: tags:
``` python
# Derive the proposal identifier (e.g. 'p900343') from the raw-data path:
# it is the second-to-last non-empty path component.
proposal = list(filter(None, in_folder.strip('/').split('/')))[-2]
in_folder = Path(in_folder)
out_folder = Path(out_folder)
# Create the output directory (including parents) if it does not exist yet.
out_folder.mkdir(exist_ok=True, parents=True)
print(f'Creating directory {out_folder}')
```
%% Cell type:code id: tags:
``` python
# Wall-clock start time of this processing, kept for bookkeeping.
begin_stuff = time.localtime()
# NOTE(review): the template has a single '{}' placeholder, so the extra
# karabo_da[0] positional argument is ignored by str.format.
control_src = control_src_template.format(karabo_id, karabo_da[0])
first_run_folder = in_folder / f'r{runs[0]:04d}'
ctrl_data = JungfrauCtrl(RunDirectory(first_run_folder), control_src)
# Run's creation time (user-supplied value overrides the measured one):
creation_time = calcat_creation_time(in_folder, runs[0], creation_time)
print(f"Creation time: {creation_time}")
print("Chunked trains: ", chunked_trains)
```
%% Cell type:code id: tags:
``` python
def read_detector_conditions(
    run_dc,
    run_number,
    bias_voltage,
    memory_cells,
    integration_time,
    gain_mode,
    gain_setting,
):
    """Read the operating conditions of a run, filling in values set to -1.

    Args:
        run_dc (extra_data.DataCollection): Data collection of the run.
        run_number (int): Run number, returned unchanged for bookkeeping.
        bias_voltage (float): Bias voltage; -1 to read from CONTROL.
        memory_cells (int): Number of memory cells; -1 to read from CONTROL.
        integration_time (float): Integration time; -1 to read from CONTROL.
        gain_mode (int): Gain mode; -1 to read from CONTROL.
        gain_setting (int): Gain setting; -1 to read from CONTROL.

    Returns:
        tuple: (bias_voltage, memory_cells, sc_start, integration_time,
            gain_mode, gain_setting, run_number)
    """
    ## Retrieve DB constants
    ctrl_data = JungfrauCtrl(run_dc, control_src)
    # Always read the storage-cell start so `sc_start` is defined even when
    # `memory_cells` is passed explicitly. Previously sc_start was only
    # assigned inside the `memory_cells < 0` branch, so an explicit
    # memory_cells value raised UnboundLocalError at the return below.
    ctrl_cells, sc_start = ctrl_data.get_memory_cells()
    if memory_cells < 0:
        memory_cells = ctrl_cells
    if integration_time < 0:
        integration_time = ctrl_data.get_integration_time()
    if bias_voltage < 0:
        bias_voltage = ctrl_data.get_bias_voltage()
    if gain_setting < 0:
        gain_setting = ctrl_data.get_gain_setting()
    if gain_mode < 0:
        gain_mode = ctrl_data.get_gain_mode()

    # Burst-mode data taken with dynamic gain is treated as fixed gain.
    if memory_cells == 16 and gain_mode == 0:
        warning(
            "Data is in burst mode and dynamic gain. Forcing gain mode to fixed gain.")
        gain_mode = 1

    return (
        bias_voltage,
        memory_cells,
        sc_start,
        integration_time,
        gain_mode,
        gain_setting,
        run_number,
    )
# Collect the conditions of every run so they can be cross-checked.
condition_lists = {
    "integration_time": [],
    "memory_cells": [],
    "sc_start": [],
    "bias_voltage": [],
    "gain_setting": [],
    "gain_mode": [],
    "runs": [],
}

for run in runs:
    voltage, cells, sc_start, integration, mode, setting, run_number = read_detector_conditions(  # noqa
        RunDirectory(in_folder / f"r{run:04d}"),
        run,
        bias_voltage=bias_voltage,
        memory_cells=memory_cells,
        integration_time=integration_time,
        gain_mode=gain_mode,
        gain_setting=gain_setting)

    condition_lists["bias_voltage"].append(voltage)
    condition_lists["memory_cells"].append(cells)
    condition_lists["integration_time"].append(integration)
    condition_lists["gain_mode"].append(mode)
    condition_lists["gain_setting"].append(setting)
    condition_lists["sc_start"].append(sc_start)
    # Record the run number so the mismatch warning below can report which
    # runs the values belong to (this list was previously never filled,
    # making the warning always print an empty run list).
    condition_lists["runs"].append(run_number)

# Validate run conditions: warn if any condition differs between runs.
runs_results = condition_lists["runs"]
for c, lst in condition_lists.items():
    if c == "runs":
        continue
    if not all(val == lst[0] for val in lst):
        warning(
            f"{c} is not the same for all runs: {lst} "
            f"for runs {runs_results}, respectively"
        )

# Use the first run's conditions for the rest of the notebook.
memory_cells = condition_lists["memory_cells"][0]
sc_start = condition_lists["sc_start"][0]
print(f"Memory cells: {memory_cells:d}")
bias_voltage = condition_lists["bias_voltage"][0]
print(f"Bias Voltage: {bias_voltage:3.1f} V")
integration_time = condition_lists["integration_time"][0]
print(f"Exposure Time: {integration_time:1.2f} us")
gain_mode = condition_lists["gain_mode"][0]
print(f"Gain mode: {gain_mode}")
gain_setting = condition_lists["gain_setting"][0]
print(f"Gain setting: {gain_setting}")
```
%% Cell type:code id: tags:
``` python
# Retrieve calibration constants (Offset and Noise) from CALCAT.
jf_cal = JUNGFRAU_CalibrationData(
    detector_name=karabo_id,
    sensor_bias_voltage=bias_voltage,
    event_at=creation_time,
    modules=karabo_da,
    memory_cells=memory_cells,
    integration_time=integration_time,
    gain_setting=gain_setting,
    # Retrieve fixed gain constants for burst mode.
    # Dark MDL pushes fixed gain constants for burst mode.
    gain_mode=gain_mode,
    client=rest_cfg.calibration_client(),
)
jf_metadata = jf_cal.metadata(
    calibrations=["Offset10Hz", "Noise10Hz"])
# TODO: display CCV timestamp
const_data = jf_cal.ndarray_map(metadata=jf_metadata)
jf_cal.display_markdown_retrieved_constants(metadata=jf_metadata)

# Reorder the constants to (gain, cell, row, col) as used downstream.
# The Offset constant is mandatory; a missing Noise constant is tolerated
# and replaced by None (downstream code must handle this).
for mod in karabo_da:
    constants_data = const_data[mod]
    if "Offset10Hz" in constants_data:
        # From (512, 1024, 1, 3) to (3, 1, 512, 1024)
        constants_data["Offset10Hz"] = np.transpose(
            np.swapaxes(constants_data["Offset10Hz"], 0, 1)).astype(np.float32)
    else:
        raise ValueError(f"Offset constant is not found for {mod}.")
    if "Noise10Hz" in constants_data:
        # From (512, 1024, 1, 3) to (3, 1, 512, 1024)
        constants_data["Noise10Hz"] = np.transpose(
            np.swapaxes(constants_data["Noise10Hz"], 0, 1)).astype(np.float32)
    else:
        constants_data["Noise10Hz"] = None
        warning(f"Noise constant is not found for {mod}")
```
%% Cell type:markdown id: tags:
## Filling the histogram
* first it will retrieve DB offsets for a given run
* on a file-by-file basis it will perform offset subtraction and fill the histogram
%% Cell type:markdown id: tags:
### Creating empty histogram
%% Cell type:code id: tags:
``` python
# Build the shared bin edges and allocate one empty per-pixel histogram
# (and an edges placeholder) for every module.
h_n_bins = int((h_range[1] - h_range[0]) / h_bins_s)
h_bins = np.linspace(h_range[0], h_range[1], h_n_bins, dtype=np.float32)

histo_shape = (h_n_bins - 1, memory_cells, sensor_size[0], sensor_size[1])
h_spectra = {da: np.zeros(histo_shape, dtype=np.int32) for da in karabo_da}
edges = {da: None for da in karabo_da}
```
%% Cell type:code id: tags:
``` python
def correct_train(wid, index, d):
    """Offset-correct one train and store it into the shared output array.

    Reads the chunk-level `gain`/`memcells` arrays and the `offset_map`
    constant (module-level state set up by the processing loop) and writes
    the corrected image for this train into `data_corr`.

    Args:
        wid: pasha worker id (unused).
        index (int): Train index within the current chunk.
        d (np.ndarray): ADC data of this train; modified in place.
    """
    g = gain[index].values
    # Jungfrau gains 0[00], 1[01], 3[11]
    # Change low gain to 2 for indexing purposes.
    g[g == 3] = 2

    # Select memory cells
    if memory_cells > 1:
        # Even though it is correct to assume that the memory-cell pattern
        # is the same across all trains of one acquisition, it is read per
        # train to account for exceptions that can happen.
        m = memcells[index].copy()
        # 255 is a cell value pointing to no cell image data (image of 0 pixels).
        # Corresponding image will be corrected with constant of cell 0. To avoid values of 0.
        # This line is depending on not storing the modified memory cells in the corrected data.
        m[m == 255] = 0
        offset_map_cell = offset_map[:, m, ...]  # [16 + empty cell, x, y]
    else:
        m = None
        offset_map_cell = offset_map[:, 0, ...]

    # Offset correction: per pixel, pick the offset of the recorded gain.
    offset = np.choose(g, offset_map_cell)
    d -= offset

    if m is not None:
        # Sort corrected data based on memory-cell order.
        data_corr[index, ...] = d[np.argsort(m), ...]
    else:
        # Single-cell data needs no reordering. Previously this path raised
        # UnboundLocalError because `m` was only defined for memory_cells > 1.
        data_corr[index, ...] = d
```
%% Cell type:code id: tags:
``` python
step_timer = StepTimer()
# One pasha worker process per CPU core for the offset correction.
n_cpus = multiprocessing.cpu_count()
context = psh.context.ProcessContext(num_workers=n_cpus)
print(f"Using {n_cpus} workers for correction.")
```
%% Cell type:code id: tags:
``` python
for run in runs:
    for da in karabo_da:
        det_src = f'{karabo_id}/DET/{da}:daqOutput'
        offset_map = const_data[da]["Offset10Hz"]
        run_folder = in_folder / f'r{run:04d}'
        ## Offset subtraction & Histogram filling
        ### performs offset subtraction and chunk_trains fills the histogram
        ### looping on individual files because single photon runs can be very large
        chunks_list = list(RunDirectory(
            run_folder, include=f"*{da}*").split_trains(trains_per_part=chunked_trains))
        print(f"Processing raw data and filling histogram in {len(chunks_list)} chunks")

        for dc_chunk in chunks_list:
            memcells = dc_chunk.get_array(
                det_src,
                'data.memoryCell',
                extra_dims=extra_dims[0:1]).astype(np.int16)

            adc = dc_chunk.get_array(det_src, 'data.adc', extra_dims=extra_dims).astype(np.float32)
            gain = dc_chunk.get_array(det_src, 'data.gain', extra_dims=extra_dims)
            step_timer.start()
            # gain = gain.where(gain < 2, other = 2).astype(np.int16)

            # Allocate shared arrays for corrected data. Used in `correct_train()`
            data_corr = context.alloc(shape=adc.shape, dtype=np.float32)
            context.map(correct_train, adc)
            step_timer.done_step("correct_train")

            step_timer.start()
            # Split the corrected data into pixel blocks and fill the
            # per-block histograms in parallel.
            chunks = jungfrau_ff.chunk_multi(data_corr, block_size)
            ch_inp = [(c, h_bins) for c in chunks]
            with multiprocessing.Pool(processes=min(n_cpus, len(chunks))) as pool:
                results = pool.starmap(jungfrau_ff.fill_histogram, ch_inp)

            # Accumulate each block's histogram into the module-wide array;
            # blocks are laid out row-major over the sensor.
            for i, (h_chk, e_chk) in enumerate(results):
                if edges[da] is None:
                    edges[da] = e_chk

                n_blocks_col = int(h_spectra[da].shape[-1]/block_size[1])
                irow = int(np.floor(i/n_blocks_col)) * block_size[0]
                icol = i%n_blocks_col * block_size[1]
                h_spectra[da][..., irow:irow+block_size[0], icol:icol+block_size[1]] += h_chk
            step_timer.done_step("Histogram created")
```
%% Cell type:code id: tags:
``` python
# Report total wall time and a per-step breakdown of the timed sections.
print(f"Total processing time {step_timer.timespan():.01f} s")
step_timer.print_summary()
```
%% Cell type:markdown id: tags:
### Saving histogram
%% Cell type:code id: tags:
``` python
# Save the accumulated histograms (plus the run conditions as attributes)
# per module. The previous no-op line `h_spectra[da] = h_spectra[da]`,
# whose comment falsely claimed a transpose, has been removed.
for da in karabo_da:
    fout_h_path = out_folder/ f"R{runs[0]:04d}_{proposal.upper()}_Gain_Spectra_{da}_Histo.h5"
    hists = h_spectra[da]
    with h5file(fout_h_path, 'w') as fout_h:
        print(f"Saving histograms at {fout_h_path}.")
        fout_h.create_dataset("histos", data=hists)
        fout_h.create_dataset("edges", data=edges[da])
        # The Noise constant may be missing (set to None above); skip the
        # dataset in that case instead of failing on data=None.
        if const_data[da]["Noise10Hz"] is not None:
            fout_h.create_dataset(
                "noise_map",
                data=const_data[da]["Noise10Hz"])
        fout_h.attrs["memory_cells"] = memory_cells
        fout_h.attrs["gain_setting"] = gain_setting
        fout_h.attrs["gain_mode"] = gain_mode
        fout_h.attrs["integration_time"] = integration_time
        fout_h.attrs["bias_voltage"] = bias_voltage
        fout_h.attrs["creation_time"] = str(creation_time)
```
%% Cell type:code id: tags:
``` python
def create_and_fill_map(r_maps, shape, dtype=np.float32, blocks=None):
    """Assemble per-chunk fit results into full-size detector maps.

    Args:
        r_maps (list): Per-chunk tuples (gain, sigma, chi2/ndf, alpha),
            each array covering one (blocks[0] x blocks[1]) pixel block;
            chunks are laid out row-major over the sensor.
        shape (tuple): Shape of each output map.
        dtype (np.dtype): Output dtype.
        blocks (list, optional): [rows, cols] of one chunk. Defaults to the
            module-level `block_size`.

    Returns:
        tuple: (g0_map, sigma_map, chi2ndf_map, alpha_map)
    """
    if blocks is None:
        blocks = block_size
    g0_map = np.zeros(shape, dtype=dtype)
    sigma_map = np.zeros(shape, dtype=dtype)
    chi2ndf_map = np.zeros(shape, dtype=dtype)
    alpha_map = np.zeros(shape, dtype=dtype)
    n_blocks_col = int(shape[-1] / blocks[1])
    # Single pass over the chunks. A duplicated nested loop (a leftover
    # tqdm-wrapped copy of this loop) previously repeated the whole fill
    # len(r_maps) times, making this O(n^2) for no benefit.
    for i, (g0_chk, sigma_chk, chi2ndf_chk, alpha_chk) in enumerate(r_maps):
        irow = int(np.floor(i / n_blocks_col)) * blocks[0]
        icol = i % n_blocks_col * blocks[1]
        slice_obj = (
            ...,
            slice(irow, irow + blocks[0]),
            slice(icol, icol + blocks[1]),
        )
        g0_map[slice_obj] = g0_chk
        sigma_map[slice_obj] = sigma_chk
        chi2ndf_map[slice_obj] = chi2ndf_chk
        alpha_map[slice_obj] = alpha_chk
    return g0_map, sigma_map, chi2ndf_map, alpha_map
```
%% Cell type:markdown id: tags:
## Fitting histograms
%% Cell type:code id: tags:
``` python
# Fit every pixel's spectrum and save the resulting maps per module.
# This cell previously contained merged duplicates (x/bin_center computed
# twice, set_histo_range called twice, two "starting fit" prints) and a
# bare `multiprocessing.Pool()` that was created but never closed (leak).
fout_temp = f"R{runs[0]:04d}_{proposal.upper()}_Gain_Spectra_{{}}_{fit_func}_Fit.h5"

for da in karabo_da:
    _edges = np.array(edges[da])
    # Histogram bin centers from the stored bin edges.
    bin_center = (_edges[1:] + _edges[:-1]) / 2.0
    # Restrict the histogram and its axis to the fitting range.
    bin_center, _h_spectra = jungfrau_ff.set_histo_range(bin_center, h_spectra[da], fit_h_range)
    print(f'adjusting histogram range: {bin_center[0]} - {bin_center[-1]}')
    print(f'Histo shape updated from {h_spectra[da].shape} to {_h_spectra.shape}')
    print(f'x-axis: {bin_center.shape}')

    chunks = jungfrau_ff.chunk_multi(_h_spectra, block_size)
    partial_fit = partial(
        jungfrau_ff.fit_histogram,
        bin_center,
        fit_func,
        n_sigma,
        rebin_factor,
        ratio,
        const_data[da]["Noise10Hz"],
    )
    print(f"Starting spectra fit for {len(chunks)} chunks")

    step_timer.start()
    # Context manager guarantees the worker processes are cleaned up.
    with multiprocessing.Pool() as pool:
        r_maps = pool.map(partial_fit, chunks)
    step_timer.done_step("r_maps calculation")

    step_timer.start()
    map_shape = (memory_cells, sensor_size[0], sensor_size[1])
    g0_map, sigma_map, chi2ndf_map, alpha_map = create_and_fill_map(r_maps, map_shape)
    step_timer.done_step("Finished fitting")

    step_timer.start()
    fout_path = out_folder / fout_temp.format(da)
    with h5file(fout_path, "w") as fout:
        fout.create_dataset("chi2map", data=np.transpose(chi2ndf_map))
        fout.create_dataset("gainMap_fit", data=np.transpose(g0_map))
        fout.create_dataset("sigmamap", data=np.transpose(sigma_map))
        fout.create_dataset("alphamap", data=np.transpose(alpha_map))
        # The Noise constant may be None (missing); skip it in that case.
        if const_data[da]["Noise10Hz"] is not None:
            fout.create_dataset("noise_map", data=const_data[da]["Noise10Hz"])
        fout.attrs["memory_cells"] = memory_cells
        fout.attrs["integration_time"] = integration_time
        fout.attrs["bias_voltage"] = bias_voltage
        fout.attrs["gain_setting"] = gain_setting
        fout.attrs["gain_mode"] = gain_mode
        fout.attrs["creation_time"] = str(creation_time)
        fout.attrs["karabo_id"] = karabo_id
    step_timer.done_step("Saved fit results")
```
......
This diff is collapsed.
import time
import numpy as np
import pytest
from numpy.testing import assert_array_equal, assert_array_almost_equal
from scipy.stats import norm
from cal_tools.jungfrau.jungfrau_ff import (
chunk_multi,
fill_histogram,
histogram_errors,
rebin_histo,
set_histo_range,
_peak_position,
)
......@@ -196,3 +199,89 @@ def test_chunk_multi_large_array():
chunks = chunk_multi(data, block_size)
assert len(chunks) == 32
assert chunks[0].shape == (1000, 1, 256, 64)
def test_histogram_errors():
    """Check histogram_errors: sqrt(|value|) per bin, with 1 for zero bins."""
    # Exact cases: mixed signs, all positive, all negative, zeros, empty input.
    exact_cases = [
        (np.array([4, -9, 0, 16, -25, 1]), np.array([2, 3, 1, 4, 5, 1])),
        (np.array([1, 4, 9, 16]), np.array([1, 2, 3, 4])),
        (np.array([-1, -4, -9, -16]), np.array([1, 2, 3, 4])),
        (np.zeros(5), np.ones(5)),
        (np.array([]), np.array([])),
    ]
    for values, expected in exact_cases:
        assert_array_equal(histogram_errors(values), expected)

    # Approximate cases: very large and very small magnitudes.
    approx_cases = [
        (np.array([1e6, -1e6]), np.array([1e3, 1e3])),
        (np.array([1e-6, -1e-6]), np.array([1e-3, 1e-3])),
    ]
    for values, expected in approx_cases:
        assert_array_almost_equal(histogram_errors(values), expected)
def test_rebin_histo():
    """Check rebin_histo: adjacent bins are summed, edges are decimated."""
    # (factor, hist, edges, expected hist, expected edges)
    good_cases = [
        (2,
         np.array([1, 2, 3, 4, 5, 6]),
         np.array([0, 1, 2, 3, 4, 5, 6]),
         np.array([3, 7, 11]),
         np.array([0, 2, 4, 6])),
        (3,
         np.array([1, 2, 3, 4, 5, 6, 7, 8, 9]),
         np.array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9]),
         np.array([6, 15, 24]),
         np.array([0, 3, 6, 9])),
    ]
    for factor, hist, bin_edges, want_hist, want_edges in good_cases:
        h_out, x_out = rebin_histo(hist, bin_edges, factor)
        assert_array_equal(h_out, want_hist)
        assert_array_equal(x_out, want_edges)

    # Error case: rebin_factor is not an exact divisor of the bin count.
    with pytest.raises(ValueError):
        rebin_histo(np.array([1, 2, 3, 4, 5]), np.array([0, 1, 2, 3, 4, 5]), 2)
def test_set_histo_range_gaussian():
    """set_histo_range must clip a histogram to the requested x-range
    while leaving the peak position intact."""
    bin_pos = np.linspace(-5, 5, 1000)
    heights = norm.pdf(bin_pos, loc=0, scale=1)
    clip_range = (-2, 2)

    new_x, new_h = set_histo_range(bin_pos, heights, clip_range)

    # Returned axis lies fully inside the requested range.
    assert new_x[0] >= clip_range[0]
    assert new_x[-1] <= clip_range[1]
    # The Gaussian peak (near 0) survives the clipping.
    assert abs(bin_pos[np.argmax(heights)] - new_x[np.argmax(new_h)]) < 0.1
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment