diff --git a/notebooks/Jungfrau/Jungfrau_Create_Fit_Spectra_Histos_NBC.ipynb b/notebooks/Jungfrau/Jungfrau_Create_Fit_Spectra_Histos_NBC.ipynb index 65826a29913aa24de5e67038eac0659960047b6e..11765a8d7102e7cbd2c795fbbd31f3cb99312dce 100644 --- a/notebooks/Jungfrau/Jungfrau_Create_Fit_Spectra_Histos_NBC.ipynb +++ b/notebooks/Jungfrau/Jungfrau_Create_Fit_Spectra_Histos_NBC.ipynb @@ -60,7 +60,7 @@ "# parameters for the peak finder\n", "n_sigma = 5. # n of sigma above pedestal threshold\n", "ratio = 0.99 # ratio of the next peak amplitude in the peak_finder\n", - "rebin = 1\n", + "rebin_factor = 1 # Number of adjacent bins to combine before fitting the histogram.\n", "\n", "\n", "def find_das(in_folder, runs, karabo_da):\n", @@ -93,7 +93,6 @@ "import pasha as psh\n", "from extra_data import RunDirectory\n", "from h5py import File as h5file\n", - "from tqdm import tqdm\n", "\n", "import cal_tools.restful_config as rest_cfg\n", "from cal_tools.calcat_interface import JUNGFRAU_CalibrationData\n", @@ -486,7 +485,7 @@ " alpha_map = np.zeros(shape, dtype=dtype)\n", " n_blocks_col = int(shape[-1] / block_size[1])\n", "\n", - " for i, (g0_chk, sigma_chk, chi2ndf_chk, alpha_chk) in tqdm(enumerate(r_maps)):\n", + " for i, (g0_chk, sigma_chk, chi2ndf_chk, alpha_chk) in enumerate(r_maps):\n", "\n", " irow = int(np.floor(i / n_blocks_col)) * block_size[0]\n", " icol = i % n_blocks_col * block_size[1]\n", @@ -522,25 +521,24 @@ "\n", "for da in karabo_da:\n", " _edges = np.array(edges[da])\n", - " x = (_edges[1:] + _edges[:-1]) / 2.0\n", + " bin_center = (_edges[1:] + _edges[:-1]) / 2.0\n", "\n", - " x, _h_spectra = jungfrau_ff.set_histo_range(x, h_spectra[da], fit_h_range)\n", - " print(f'adjusting histogram range: {x[0]} - {x[-1]}')\n", + " bin_center, _h_spectra = jungfrau_ff.set_histo_range(bin_center, h_spectra[da], fit_h_range)\n", + " print(f'adjusting histogram range: {bin_center[0]} - {bin_center[-1]}')\n", " print(f'Histo shape updated from {h_spectra[da].shape} to {_h_spectra.shape}')\n", - " print(f'x-axis: {x.shape}')\n", + " print(f'x-axis: {bin_center.shape}')\n", " chunks = jungfrau_ff.chunk_multi(_h_spectra, block_size)\n", - " pool = multiprocessing.Pool()\n", "\n", " partial_fit = partial(\n", " jungfrau_ff.fit_histogram,\n", - " x,\n", + " bin_center,\n", " fit_func,\n", " n_sigma,\n", - " rebin,\n", + " rebin_factor,\n", " ratio,\n", " const_data[da][\"Noise10Hz\"],\n", " )\n", - " print(\"starting spectra fit\")\n", + " print(f\"Starting spectra fit for {len(chunks)} chunks\")\n", "\n", " step_timer.start()\n", " with multiprocessing.Pool() as pool:\n", @@ -588,7 +586,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.8.11" + "version": "3.11.9" } }, "nbformat": 4, diff --git a/src/cal_tools/jungfrau/jungfrau_ff.py b/src/cal_tools/jungfrau/jungfrau_ff.py index e43b1272d0757aa93bf1f02f0f6aec2863ad12ed..f1c421efe2e55f2baf43572d6b3c968e7fceefc5 100644 --- a/src/cal_tools/jungfrau/jungfrau_ff.py +++ b/src/cal_tools/jungfrau/jungfrau_ff.py @@ -1,24 +1,41 @@ import numpy as np -from XFELDetAna.detectors.jungfrau.fitFuncs import ( - charge_sharing, - chi_square_fit, - double_peak, - gauss, - histogram_errors, -) +from scipy.special import erf +from iminuit import Minuit + + +def histogram_errors(x): + """ + Calculate the errors for histogram bins. + + Args: + x (np.ndarray): Input array of histogram bin counts. + + Returns: + np.ndarray: Array of calculated errors. + """ + out = np.ones_like(x) + + # Handle positive values + positive_mask = x > 0 + out[positive_mask] = np.sqrt(x[positive_mask]) + + # Handle negative values + negative_mask = x < 0 + out[negative_mask] = np.sqrt(-x[negative_mask]) + return out def chunk_multi(data, block_size): """ Chunks input array along the 'row' and 'col' dimensions. - + Args: data (ndarray): input data array, at least 2D. block_size (list or tuple of int): [chunk_row, chunk_col], dimension of the 2-D chunk. - + Returns: list of chunked data. - + Raises: ValueError: If input data is not at least 2D or block_size is invalid. """ @@ -100,15 +117,16 @@ def _peak_position(x, h, thr=75, dist=30., ratio=0.4): """ finds peaks in the histogram very rough, self-made function, should be replaced by better one - + arguments: * x (list, float): histogram bin centers * h (list, float): histogram bin counts * thr (int): lower threshold along x-axis in ADC units to look for a peak * dist (float): minimal distance between the last found peak and the next * ratio (float): minimal ratio between the last found peak and the next - - returns: list of sorted peak positions along the x-axis, index of the peak with lowes value along x-axis + + returns: list of sorted peak positions along the x-axis, index + of the peak with lowes value along x-axis """ imin = 0 peak_bins = [] @@ -122,7 +140,7 @@ def _peak_position(x, h, thr=75, dist=30., ratio=0.4): peak_bins.append(sorted_i[-1] + imin) low_h = ratio * h[sorted_i[-1] + imin] for j in range(1, len(sorted_i)): - ibin = sorted_i[-1 -j] + imin + ibin = sorted_i[-1 - j] + imin if h[ibin] > low_h: min_dist = (x[peak_bins] - x[ibin])**2. > dist**2. if np.all(min_dist): @@ -135,318 +153,459 @@ def _peak_position(x, h, thr=75, dist=30., ratio=0.4): return peak_bins, imin -def fit_charge_sharing(x, y, yerr, sigma0, n_sigma, ratio): +def set_histo_range(bin_centers, histogram, h_range): """ - fits histogram with single photon function with charge sharing tail + Reduces the histogram bin axis to the range specified + + Args: + bin_centers (array, float): the bin centers array + histogram (array, integers): the histogram with shape + (bins, cells, columns, row) + h_range (tuple, float): the (liminf, limsup) of the desired range + + Returns: the new bin centers array and the new histogram - arguments: - * x (array, float): x values - * y (array, float): y values - * yerr (array, float): errors of the y values - * sigma0 (float): rough estimate of peak variance - * n_sigma (int): to calculate threshold of the peak finder as thr = n_sigma * sigma0 - * ratio (float): ratio parameter of the peak finder - - returns: peak position, peak variance, chi2/ndf, charge sharing parameter """ + if len(h_range) != 2: + raise ValueError("h_ranges should have two values.") + min_val, max_val = sorted(h_range) - norm = np.sum(y) * (x[1] - x[0]) - alpha0 = 0.3 - thr = n_sigma * sigma0 - _peaks, imin = _peak_position(x, y, thr, ratio=ratio) - - q = -1. - sigma = -1. - alpha = -1. - chi2ndf = -1. - q0 = -1. + i_min = np.abs(bin_centers - min_val).argmin() + i_max = np.abs(bin_centers - max_val).argmin() - if len(_peaks) > 0: - q0 = np.ma.min(x[_peaks]) - - if q0 > -1.: - limit_q = (q0 - sigma0, q0 + sigma0) - limit_sigma = (0.1 * sigma0, 2. * sigma0) - limit_alpha = (0.01 * alpha0, 1.) - limit_norm = (np.sqrt(0.1 * norm), 3. *norm) - - res = chi_square_fit( - charge_sharing, - x, - y, - yerr, - fit_range=(0.5 * q0, q0 + 2. * sigma0), - ped=0., - q=q0, - sigma=sigma0, - alpha=alpha0, - norm=norm, - limit_q=limit_q, - limit_sigma=limit_sigma, - limit_alpha=limit_alpha, - limit_norm=limit_norm, - fix_ped=True, - print_level=0, - pedantic=False, - ) - values = res[0] - errors = res[1] - chi2 = res[2] - ndf = res[3] - is_valid = res[4] - - if is_valid: - q = values['q'] - sigma = values['sigma'] - alpha = values['alpha'] - chi2ndf = chi2/ndf - else: - q = -1000. - sigma = -1000. - alpha = -1000. - chi2ndf = -1000. + # Ensure i_max is greater than i_min + if i_max <= i_min: + i_max = i_min + 1 - return q, sigma, chi2ndf, alpha + return bin_centers[i_min:i_max], histogram[i_min:i_max, ...] -def fit_double_charge_sharing(x, y, yerr, sigma0, n_sigma, ratio): +def rebin_histo(hist, bin_centers, rebin_factor): """ - fits histogram with sum of two single photon functions with charge sharing tail - - arguments: - * x (array, float): x values - * y (array, float): y values - * yerr (array, float): errors of the y values - * sigma0 (float): rough estimate of peak variance - * n_sigma (int): to calculate threshold of the peak finder as thr = n_sigma * sigma0 - * ratio (float): ratio parameter of the peak finder - - returns: peak position, peak variance, chi2/ndf, charge sharing parameter (all of them of the first peak) + Rebin a histogram by combining adjacent bins. - """ - norm = np.sum(y) * (x[1] - x[0]) - alpha0 = 0.3 - thr = n_sigma * sigma0 - _peaks, imin = _peak_position(x, y, thr, ratio=ratio) - - qa = -1. - if len(_peaks) > 0: - qa = np.ma.min(x[_peaks]) - - q = -1. - sigma = -1. - alpha = -1. - chi2ndf = -1. - - if qa > -1.: - - qb = 1.1 * qa - limit_q_a = (qa - sigma0, qa + sigma0) - limit_sigma_a = (0.1 * sigma0, 2. * sigma0) - limit_alpha = (0.01, 1.) - limit_norm_a = (np.ma.sqrt(0.1 *norm), 2. * norm) - limit_q_b = (qb - sigma0, qb + sigma0) - limit_sigma_b = (0.1 * sigma0, 2. * sigma0) - limit_norm_b = (np.ma.sqrt(0.1 * 0.2 * norm), 2 * 0.2 *norm) - - res = chi_square_fit( - double_peak, x, y, yerr, - fit_range=(0.5*qa, qb + sigma0), - ped=0., - q_a=qa, - sigma_a=sigma0, - alpha=alpha0, - norm_a=norm, - q_b=qb, - sigma_b=sigma0, - norm_b=norm, - limit_q_a=limit_q_a, - limit_sigma_a=limit_sigma_a, - limit_alpha=limit_alpha, - limit_norm_a=limit_norm_a, - limit_q_b=limit_q_b, - limit_sigma_b=limit_sigma_b, - limit_norm_b=limit_norm_b, - fix_ped=True, - print_level=0, - pedantic=False, - ) - values = res[0] - errors = res[1] - chi2 = res[2] - ndf = res[3] - is_valid = res[4] - - if is_valid: - q = values['q_a'] - sigma = values['sigma_a'] - alpha = values['alpha'] - chi2ndf = chi2/ndf - else: - q = -1000. - sigma = -1000. - alpha = -1000. - chi2ndf = -1000. - - return q, sigma, chi2ndf, alpha + Args: + hist (np.ndarray): Input histogram values. + bin_edges (np.ndarray): Bin edges or centers. + rebin_factor (int): Rebinning factor. Number of adjacent + bins to combine. + Returns: + Tuple[np.ndarray, np.ndarray]: Rebinned histogram values and + corresponding bin edges/centers. -def fit_gauss(x, y, yerr, sigma0, n_sigma, ratio): - """ - fits histogram with a gaussian function - - arguments: - * x (array, float): x values - * y (array, float): y values - * yerr (array, float): errors of the y values - * sigma0 (float): rough estimate of peak variance - * n_sigma (int): to calculate threshold of the peak finder as thr = n_sigma * sigma0 - * ratio (float): ratio parameter of the peak finder - - returns: peak position, peak variance, chi2/ndf, charge sharing parameter (last one alway == 0) - all of them are kept for compatibility with the wrap around function - + Raises: + ValueError: If the rebinning factor is not an exact divisor + of the number of bins. """ - norm = np.sum(y) * (x[1] - x[0])/np.sqrt(2.*np.pi*sigma0**2) - alpha0 = 0.3 - thr = n_sigma * sigma0 - _peaks, imin = _peak_position(x, y, thr, ratio=ratio) - - q0 = -1000. - if len(_peaks) > 0: - q0 = np.ma.min(x[_peaks]) - - q = -1. - sigma = -1. - alpha = -1. - chi2ndf = -1. - - if q0 > -1000.: - limit_amp = (0.01*norm, 2.*norm) - limit_mean = (q0-sigma0, q0+sigma0) - limit_sigma = (0.1*sigma0, 3.*sigma0) - - res = chi_square_fit( - gauss, x, y, yerr, - fit_range=[q0 - sigma0, q0 + 2.*sigma0], - amp=norm, - mean=q0, - sigma=sigma0, - limit_amp=limit_amp, - limit_mean=limit_mean, - limit_sigma=limit_sigma, - print_level=0, - pedantic=False, - ) - values = res[0] - errors = res[1] - chi2 = res[2] - ndf = res[3] - is_valid = res[4] - - if is_valid: - q = values['mean'] - sigma = values['sigma'] - alpha = 0. - chi2ndf = chi2/ndf - else: - q = -1000. - sigma = -1000. - alpha = -1000. - chi2ndf = -1000. - - return q, sigma, chi2ndf, alpha + if rebin_factor == 1: + return hist, bin_centers + # Check if rebinning is possible + if len(hist) % rebin_factor != 0: + raise ValueError( + "Rebin factor is not an exact divisor " + "of the number of bins.") -def set_histo_range(_x, _h, _h_range): - """ - reduces the histogram bin axis to the range specified - - args: - * _x (array, float): the bin centers array - * _h (array, integers): the histogram with shape (bins, cells, columns, row) - * _h_range (tuple, float): the (liminf, limsup) of the desired range - - returns the new bin centers array and the new histogram - - """ + h_out = None + x_out = bin_centers[::rebin_factor] - i_min = (np.abs(_x - _h_range[0])).argmin() - i_max = (np.abs(_x - _h_range[1])).argmin() - - return _x[i_min:i_max], _h[i_min:i_max] - - -def rebin_histo(h, x, r): - if len(x)%r == 0: - h_out = None - x_out = x[::r] - - for i in range(0, len(h), r): - if h_out is not None: - h_out = np.append(h_out, np.sum(h[i:i+r], axis=0)) - else: - h_out = np.array(np.sum(h[i:i+r], axis=0)) - - else: - raise AttributeError('Rebin factor is not exact divisor of bins!') - + h_out = np.sum( + hist[:len(hist) - (len(hist) % rebin_factor)].reshape(-1, rebin_factor), # noqa + axis=1, + ) + x_out = bin_centers[::rebin_factor] return h_out, x_out -def fit_histogram(x, _fit_func, n_sigma, rebin, ratio, noise, histo): +def fit_histogram( + bin_centers, + _fit_func, + n_sigma, + rebin, + ratio, + noise, + histo, +): """ - wrap around function for fitting of histogram - + Wrap around function for fitting of histogram + Args: x (list, float): - bin centers along x _fit_func (string): which function to use for fit - CHARGE_SHARING: single peak with charge sharing tail - CHARGE_SHARING_2: sum of two CHARGE_SHARING - GAUSS: gaussian function - n_sigma (int): to calculate threshold of the peak finder as thr = n_sigma * sigma0 + n_sigma (int): to calculate threshold of the peak finder as + thr = n_sigma * sigma0 ratio (float): ratio parameter of the peak finder - histo (ndarray): histogram bin counts with shape (n_bins, memory_cells, row, col) + histo (ndarray): histogram bin counts with shape + (n_bins, memory_cells, row, col) Returns: - map of peak values - map of peak variances - - map of chi2/ndf - map of charge sharing parameter values + - map of chi2/ndf """ _funcs = dict( - CHARGE_SHARING=fit_charge_sharing, - GAUSS=fit_gauss, + CHARGE_SHARING=fit_charge_sharing, CHARGE_SHARING_2=fit_double_charge_sharing, + GAUSS=fit_gauss, ) + sigma0 = 15. fit_func = _funcs[_fit_func] - n_cells, n_rows, n_cols = histo.shape[1:] - sigma0 = 15. - _init_array = np.zeros((n_cells, n_rows, n_cols), dtype=np.float32) g0_chk = _init_array.copy() sigma_chk = _init_array.copy() chi2ndf_chk = _init_array.copy() alpha_chk = _init_array.copy() - + for cell in range(n_cells): for row in range(n_rows): for col in range(n_cols): - - y = histo[:, cell, row, col] - y, _x = rebin_histo(y, x, rebin) + y = histo[:, cell, row, col] + y, x_rebinned = rebin_histo(y, bin_centers, rebin) yerr = histogram_errors(y) - + if noise[0, cell, row, col] > 0.: sigma0 = noise[0, cell, row, col] - q, sigma, chi2ndf, alpha = fit_func(_x, y, yerr, sigma0, n_sigma, ratio) + q, sigma, chi2ndf, alpha = fit_func( + x_rebinned, y, yerr, sigma0, n_sigma, ratio) g0_chk[cell, row, col] = q sigma_chk[cell, row, col] = sigma chi2ndf_chk[cell, row, col] = chi2ndf alpha_chk[cell, row, col] = alpha - return g0_chk, sigma_chk, chi2ndf_chk, alpha_chk + + +def erfbox(x, ped, q0, s0): + """ + Compute the error function box used in charge sharing calculations. + + This function models the box-like shape of charge collection efficiency. + + erfbox is mostly flat between ped and q0 and quickly goes to zero + outside the range [ped, q0]. The function integral is normalized to 1. + The slope of the box sides is given by erf(x)/s0; if s0 == 0 then the + function goes abruptly to 0 at the edges. + + Args: + x (array-like): Input values + ped (float): Pedestal value + q0 (float): Charge value + s0 (float): Width parameter + + Returns: + array-like: Computed erfbox values + """ + if ped == q0: + return np.zeros_like(x) + if ped > q0: + q0, ped = ped, q0 + + slope = lambda x: erf(x / abs(s0)) if s0 != 0 else 2 * np.heaviside(x, 1) # noqa + return (slope(x - ped) - slope(x - q0)) * 0.5 / (q0 - ped) + + +def double_peak(x, ped, q_a, sigma_a, alpha, norm_a, q_b, sigma_b, norm_b): + k_a = charge_sharing(x, ped, q_a, sigma_a, alpha, norm_a) + k_b = charge_sharing(x, ped, q_b, sigma_b, alpha, norm_b) + return k_a + k_b + + +def gauss(x, amp, mean, sigma): + z = (x - mean) / sigma + return amp * np.exp(-0.5 * z**2) + + +def charge_sharing(x, ped, q, sigma, alpha, norm): + """ + Model the detector response to a single photon with charge sharing. + + This function combines a Gaussian peak + (representing full charge collection) + with a charge sharing tail. + + More details on JOURNAL OF SYNCHROTRON RADIATION 23, 1462-1473 (2016) + - f(x) = norm * (gaussian_peak + charge_sharing_tail) / ((1 + α)²) + + Args: + x (array-like): Input charge values + ped (float): Pedestal value (usually 0 for pedestal-subtracted data) + q (float): Peak position (full charge collection) + sigma (float): Peak width + alpha (float): Charge sharing probability + norm (float): Normalization factor + + Returns: + array-like: Modeled detector response + """ + # A small threshold to avoid division by zero + small_value_threshold = 1e-10 + log_term = (x - ped) / (q - ped) + # Avoid log of zero or negative numbers + log_term = np.maximum(log_term, small_value_threshold) + + A = ((1. - alpha)**2) / (sigma * np.sqrt(2. * np.pi)) + B = 4. * alpha * (1. - alpha) + C = 4. * alpha**2 + + charge_sh_tail = (B - C * np.log1p(log_term)) * erfbox(x, ped, q, sigma) + return norm * (charge_sh_tail + gauss(x, A, q, sigma)) / ((1. + alpha)**2) + + +def set_fit_range(x, x1, x2): + i1 = 0 + i2 = len(x) + i = 0 + + if x2 > x1: + if x1 > x[0]: + while i < len(x) and x[i] < x1: + i += 1 + i1 = i + else: + i1 = 0 + if x2 < x[-1]: + while i < len(x) and x[i] < x2: + i += 1 + i2 = i + else: + i2 = len(x) + else: + i2 = i1 + + if i2 <= i1: + i2 = i1 + 2 + # Use np.float64 for higher precision when fitting. + # With this higher precision we avoid failed fitting + # on very small difference. + return x[i1:i2].astype(np.float64), i1, i2 + + +def fit_charge_sharing( + x, y, yerr, sigma0, n_sigma, ratio, alpha0=0.3 +): + """ + Perform charge sharing fit using iminuit optimization. + + This function fits the charge sharing model to the provided data using + chi-square minimization via iminuit. + + Args: + x (array-like): Input charge values + y (array-like): Observed counts or intensities + yerr (array-like): Errors on y values + sigma0 (float): Initial guess for peak width + n_sigma (float): Number of sigmas for threshold calculation + (not used in this implementation) + ratio (float): Ratio parameter (not used in this implementation) + alpha0: Initial guess for alpha + Returns:AC + tuple: (q, sigma, chi2ndf, alpha) + q (float): Fitted peak position + sigma (float): Fitted peak width + chi2ndf (float): Chi-square per degree of freedom + alpha (float): Fitted charge sharing probability + + Notes: + - The pedestal (ped) is fixed to 0 in this implementation. + - n_sigma and ratio parameters are included for compatibility + with the original function signature + but are not used in the current implementation. + """ + norm = np.sum(y) * (x[1] - x[0]) + + # Use _peak_position function to find initial q0 + _peaks, _ = _peak_position(x, y, thr=n_sigma*sigma0, ratio=ratio) + if len(_peaks) > 0: + q0 = np.min(x[_peaks]) + else: + return -1, -1, -1, -1 + + x_fit, i1, i2 = set_fit_range(x, 0.5 * q0, q0 + 2. * sigma0) + + x_fit = x_fit + y_fit = y[i1:i2] + yerr_fit = yerr[i1:i2] + + def cost_function(ped, q, sigma, alpha, norm): + return np.sum( + ((y_fit - charge_sharing( + x_fit, ped, q, sigma, alpha, norm)) / yerr_fit)**2) + + m = Minuit( + cost_function, + ped=0., q=q0, sigma=sigma0, alpha=alpha0, norm=norm, + error_ped=0.1, error_q=0.1*q0, error_sigma=0.1*sigma0, + error_alpha=0.1, error_norm=0.1*norm, + fix_ped=True, + limit_q=(q0 - sigma0, q0 + sigma0), + limit_sigma=(0.1 * sigma0, 2. * sigma0), + limit_alpha=(0.01 * alpha0, 1.), + limit_norm=(np.sqrt(0.1 * norm), 3. * norm), + errordef=1, # for least squares cost function + print_level=0, + ) + + m.migrad() + m.hesse() + + if m.get_fmin().is_valid: + q = m.values['q'] + sigma = m.values['sigma'] + alpha = m.values['alpha'] + chi2ndf = m.fval / (len(x) - len(m.values)) + else: + q, sigma, chi2ndf, alpha = -1000, -1000, -1000, -1000 + return q, sigma, chi2ndf, alpha + + +def fit_double_charge_sharing(x, y, yerr, sigma0, n_sigma, ratio): + """ + Fits histogram with sum of two single photon functions with + charge sharing tail. + + Args: + x (array, float): x values + y (array, float): y values + yerr (array, float): errors of the y values + sigma0 (float): rough estimate of peak variance + n_sigma (int): to calculate threshold of the peak finder as + thr = n_sigma * sigma0 + ratio (float): ratio parameter of the peak finder + + Returns: + peak position, peak variance, chi2/ndf, charge sharing parameter + (all of them of the first peak) + """ + alpha0 = 0.3 + norm = np.sum(y) * (x[1] - x[0]) + + _peaks, _ = _peak_position(x, y, thr=n_sigma*sigma0, ratio=ratio) + if len(_peaks) > 0: + q0 = np.min(x[_peaks]) + else: + return -1, -1, -1, -1 + + q_b = 1.1 * q0 + x_fit, i1, i2 = set_fit_range(x, 0.5 * q0, q_b + sigma0) + y_fit = y[i1:i2] + yerr_fit = yerr[i1:i2] + + def cost_function( + ped, q_a, sigma_a, alpha, norm_a, q_b, sigma_b, norm_b + ): + return np.sum(( + (y_fit - double_peak( + x_fit, ped, q_a, sigma_a, alpha, + norm_a, q_b, sigma_b, norm_b + )) / yerr_fit)**2) + + limit_alpha = (0.01, 1.) + limit_norm_a = (np.sqrt(0.1 * norm), 2. * norm) + limit_q_b = (q_b - sigma0, q_b + sigma0) + limit_sigma = (0.1 * sigma0, 2. * sigma0) + limit_norm_b = (np.sqrt(0.1 * 0.2 * norm), 2 * 0.2 * norm) + + m = Minuit( + cost_function, + ped=0., + q_a=q0, + sigma_a=sigma0, + alpha=alpha0, + norm_a=norm, + q_b=q_b, + sigma_b=sigma0, + norm_b=norm, + error_ped=0.1, error_q_a=0.1*q0, error_sigma_a=0.1*sigma0, + error_alpha=0.1, error_norm_a=0.1*norm, + error_q_b=0.1*q_b, error_sigma_b=0.1*sigma0, + error_norm_b=0.1*norm, + limit_q_a=(q0 - sigma0, q0 + sigma0), + limit_sigma_a=limit_sigma, + limit_alpha=limit_alpha, + limit_norm_a=limit_norm_a, + limit_q_b=limit_q_b, + limit_sigma_b=limit_sigma, + limit_norm_b=limit_norm_b, + fix_ped=True, + print_level=0, + errordef=1, # for least squares cost function + ) + + m.migrad() + m.hesse() + + if m.get_fmin().is_valid: + q = m.values['q_a'] + sigma = m.values['sigma_a'] + alpha = m.values['alpha'] + chi2ndf = m.fval / (len(x) - len(m.values)) + else: + q, sigma, chi2ndf, alpha = -1000, -1000, -1000, -1000 + return q, sigma, chi2ndf, alpha + + +def fit_gauss(x, y, yerr, sigma0, n_sigma, ratio): + """ + Fits histogram with a gaussian function + + Args: + x (array, float): x values + y (array, float): y values + yerr (array, float): errors of the y values + sigma0 (float): rough estimate of peak variance + n_sigma (int): to calculate threshold of the peak finder as + thr = n_sigma * sigma0 + ratio (float): ratio parameter of the peak finder + + Returns: + peak position, peak variance, chi2/ndf, charge sharing parameter + (last one alway == 0) + all of them are kept for compatibility with the wrap around function + """ + norm = np.sum(y) * (x[1] - x[0])/np.sqrt(2.*np.pi*sigma0**2) + + _peaks, _ = _peak_position(x, y, thr=n_sigma*sigma0, ratio=ratio) + if len(_peaks) > 0: + q0 = np.min(x[_peaks]) + else: + return -1, -1, -1, -1 + + x_fit, i1, i2 = set_fit_range(x, q0 - sigma0, q0 + 2.*sigma0) + y_fit = y[i1:i2] + yerr_fit = yerr[i1:i2] + + def cost_function(amp, mean, sigma): + return np.sum((( + y_fit - gauss( + x_fit, amp, mean, sigma)) / yerr_fit)**2) + + m = Minuit( + cost_function, + amp=norm, mean=q0, sigma=sigma0, + error_amp=0.1*norm, error_mean=0.1*sigma0, error_sigma=0.1*sigma0, + limit_amp=(0.01*norm, 2.*norm), + limit_mean=(q0-sigma0, q0+sigma0), + limit_sigma=(0.1*sigma0, 3.*sigma0), + errordef=1, # for least squares cost function + print_level=0, + ) + + m.migrad() + m.hesse() + + if m.get_fmin().is_valid: + q = m.values['mean'] + sigma = m.values['sigma'] + alpha = 0. + chi2ndf = m.fval / (len(x) - len(m.values)) + else: + q, sigma, chi2ndf, alpha = -1000, -1000, -1000, -1000 + return q, sigma, chi2ndf, alpha diff --git a/tests/test_jungfrau_ff.py b/tests/test_jungfrau_ff.py index b464e9e993665d3aa84126026ffec3e93cad5cb1..b15db475166a211506c0a936070d16bb48256ad1 100644 --- a/tests/test_jungfrau_ff.py +++ b/tests/test_jungfrau_ff.py @@ -1,11 +1,14 @@ -import time - import numpy as np import pytest +from numpy.testing import assert_array_equal, assert_array_almost_equal +from scipy.stats import norm from cal_tools.jungfrau.jungfrau_ff import ( chunk_multi, fill_histogram, + histogram_errors, + rebin_histo, + set_histo_range, _peak_position, ) @@ -196,3 +199,89 @@ def test_chunk_multi_large_array(): chunks = chunk_multi(data, block_size) assert len(chunks) == 32 assert chunks[0].shape == (1000, 1, 256, 64) + + +def test_histogram_errors(): + # Test case 1: Array with positive, negative, and zero values + input_array = np.array([4, -9, 0, 16, -25, 1]) + expected_output = np.array([2, 3, 1, 4, 5, 1]) + assert_array_equal(histogram_errors(input_array), expected_output) + + # Test case 2: Array with all positive values + input_array = np.array([1, 4, 9, 16]) + expected_output = np.array([1, 2, 3, 4]) + assert_array_equal(histogram_errors(input_array), expected_output) + + # Test case 3: Array with all negative values + input_array = np.array([-1, -4, -9, -16]) + expected_output = np.array([1, 2, 3, 4]) + assert_array_equal(histogram_errors(input_array), expected_output) + + # Test case 4: Array with all zeros + input_array = np.zeros(5) + expected_output = np.ones(5) + assert_array_equal(histogram_errors(input_array), expected_output) + + # Test case 5: Empty array + input_array = np.array([]) + expected_output = np.array([]) + assert_array_equal(histogram_errors(input_array), expected_output) + + # Test case 6: Large values + input_array = np.array([1e6, -1e6]) + expected_output = np.array([1e3, 1e3]) + assert_array_almost_equal(histogram_errors(input_array), expected_output) + + + # Test case 7: Small values + input_array = np.array([1e-6, -1e-6]) + expected_output = np.array([1e-3, 1e-3]) + assert_array_almost_equal(histogram_errors(input_array), expected_output) + + +def test_rebin_histo(): + """ + Test function for rebin_histo. + """ + # Test case 1: Successful rebinning + hist = np.array([1, 2, 3, 4, 5, 6]) + bin_edges = np.array([0, 1, 2, 3, 4, 5, 6]) + rebin_factor = 2 + expected_hist = np.array([3, 7, 11]) + expected_bin_edges = np.array([0, 2, 4, 6]) + h_out, x_out = rebin_histo(hist, bin_edges, rebin_factor) + assert_array_equal(h_out, expected_hist) + assert_array_equal(x_out, expected_bin_edges) + + # Test case 2: Rebinning with factor 3 + hist = np.array([1, 2, 3, 4, 5, 6, 7, 8, 9]) + bin_edges = np.array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9]) + rebin_factor = 3 + expected_hist = np.array([6, 15, 24]) + expected_bin_edges = np.array([0, 3, 6, 9]) + h_out, x_out = rebin_histo(hist, bin_edges, rebin_factor) + assert_array_equal(h_out, expected_hist) + assert_array_equal(x_out, expected_bin_edges) + + # Test case 3: Error case - rebin_factor is not an exact divisor of the number of bin edges + hist = np.array([1, 2, 3, 4, 5]) + bin_edges = np.array([0, 1, 2, 3, 4, 5]) + rebin_factor = 2 + with pytest.raises(ValueError): + rebin_histo(hist, bin_edges, rebin_factor) + + +def test_set_histo_range_gaussian(): + # Generate a Gaussian-like histogram + x = np.linspace(-5, 5, 1000) + h = norm.pdf(x, loc=0, scale=1) + h_range = (-2, 2) + + # Apply set_histo_range + new_x, new_h = set_histo_range(x, h, h_range) + + # Check if the new range is within the specified range + assert new_x[0] >= h_range[0] and new_x[-1] <= h_range[1] + + # Check if the peak is preserved (should be near 0 for this Gaussian) + assert abs(x[np.argmax(h)] - new_x[np.argmax(new_h)]) < 0.1