Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • calibration/pycalibration
1 result
Show changes
Commits on Source (15)
%% Cell type:markdown id: tags:
# Jungfrau Dark Image Characterization #
Author: European XFEL Detector Group, Version: 2.0
Analyzes Jungfrau dark image data to deduce offset, noise and resulting bad pixel maps
%% Cell type:code id: tags:
``` python
in_folder = '/gpfs/exfel/exp/SPB/202130/p900204/raw/' # folder under which runs are located, required
out_folder = '/gpfs/exfel/data/scratch/ahmedk/test/remove' # path to place reports at, required
metadata_folder = '' # Directory containing calibration_metadata.yml when run by xfel-calibrate
run_high = 141 # run number for G0 dark run, required
run_med = 142 # run number for G1 dark run, required
run_low = 143 # run number for G2 dark run, required
# Parameters used to access raw data.
karabo_da = ['JNGFR01', 'JNGFR02','JNGFR03','JNGFR04', 'JNGFR05', 'JNGFR06','JNGFR07','JNGFR08'] # list of data aggregators, which corresponds to different JF modules
karabo_id = 'SPB_IRDA_JF4M' # karabo_id (detector identifier) prefix of Jungfrau detector to process.
karabo_id_control = '' # if control is on a different ID, set to empty string if it is the same a karabo-id
receiver_template = 'JNGFR{:02}' # inset for receiver devices
instrument_source_template = '{}/DET/{}:daqOutput' # template for instrument source name (filled with karabo_id & receiver_id). e.g. 'SPB_IRDA_JF4M/DET/JNGFR01:daqOutput'
ctrl_source_template = '{}/DET/CONTROL' # template for control source name (filled with karabo_id_control)
# Parameters for calibration database and storing constants.
use_dir_creation_date = True # use dir creation date
cal_db_interface = 'tcp://max-exfl-cal001:8016#8045' # calibrate db interface to connect to
cal_db_timeout = 300000 # timeout on caldb requests
local_output = True # output constants locally
db_output = False # output constants to database
# Parameters affecting creating dark calibration constants.
badpixel_threshold_sigma = 5. # bad pixels defined by values outside n times this std from median
offset_abs_threshold_low = [1000, 10000, 10000] # absolute bad pixel threshold in terms of offset, lower values
offset_abs_threshold_high = [8000, 15000, 15000] # absolute bad pixel threshold in terms of offset, upper values
max_trains = 1000 # Maximum trains to process darks. Set to 0 to process all available train images. 1000 trains is enough resolution to create the dark constants
min_trains = 100 # Minimum number of trains to process dark constants. Raise a warning if the run has fewer trains.
manual_slow_data = False # if true, use manually entered bias_voltage and integration_time values
time_limits = 0.025 # to find calibration constants later on, the integration time is allowed to vary by 0.5 us
creation_time = "" # To overwrite the measured creation_time. Required Format: YYYY-MM-DD HR:MN:SC e.g. "2022-06-28 13:00:00"
# Parameters to be used for injecting dark calibration constants.
integration_time = -1 # Integration time in us. Set to -1 to overwrite by value in file.
gain_setting = -1 # 0 for dynamic, forceswitchg1, forceswitchg2, 1 for dynamichg0, fixgain1, fixgain2. Set to overwrite by value in file.
gain_mode = -1 # 1 if medium and low runs are fixgain1 and fixgain2, otherwise 0. Set to -1 to overwrite by value in file.
bias_voltage = -1 # sensor bias voltage in V, will be overwritten by value in file
memory_cells = -1 # Number of memory cells.
# Parameters used for plotting
detailed_report = False
# TODO: this is used for only Warning check at AGIPD dark.
# Need to rethink if it makes sense to use it here as well.
operation_mode = 'ADAPTIVE_GAIN' # Detector operation mode, optional
```
%% Cell type:code id: tags:
``` python
import os
import warnings
from datetime import timedelta
from logging import warning
warnings.filterwarnings('ignore')
from pathlib import Path
from tempfile import NamedTemporaryFile
import matplotlib
import matplotlib.pyplot as plt
import multiprocessing
import numpy as np
import pasha as psh
import yaml
from IPython.display import Markdown, display
from extra_data import RunDirectory
matplotlib.use('agg')
%matplotlib inline
from XFELDetAna.plotting.heatmap import heatmapPlot
from XFELDetAna.plotting.histogram import histPlot
from cal_tools import step_timing
from cal_tools.jungfrau import jungfraulib
from cal_tools.calcat_interface2 import (
CalibrationData,
JUNGFRAUConditions,
)
from cal_tools.constants import inject_ccv, write_ccv
from cal_tools.enums import BadPixels, JungfrauGainMode
from cal_tools.tools import (
get_dir_creation_date,
get_pdu_from_db,
get_random_db_interface,
get_report,
save_const_to_h5,
send_to_db,
from cal_tools.jungfrau import jungfraulib
from cal_tools.restful_config import (
extra_calibration_client,
)
from iCalibrationDB import Conditions, Constants
from cal_tools.tools import calcat_creation_time
matplotlib.use('agg')
%matplotlib inline
```
%% Cell type:code id: tags:
``` python
# Constants relevant for the analysis
run_nums = [run_high, run_med, run_low] # run number for G0/HG0, G1, G2
sensor_size = (1024, 512)
gains = [0, 1, 2]
fixed_settings = [
JungfrauGainMode.FIX_GAIN_1.value, JungfrauGainMode.FIX_GAIN_2.value]
dynamic_settings = [
JungfrauGainMode.FORCE_SWITCH_HG1.value, JungfrauGainMode.FORCE_SWITCH_HG2.value]
old_fixed_settings = ["fixgain1", "fixgain2"]
creation_time = None
if use_dir_creation_date:
creation_time = get_dir_creation_date(in_folder, run_high)
print(f"Using {creation_time} as creation time")
creation_time = calcat_creation_time(in_folder, run_high, creation_time)
print(f"Using {creation_time} as creation time")
os.makedirs(out_folder, exist_ok=True)
cal_db_interface = get_random_db_interface(cal_db_interface)
print(f'Calibration database interface: {cal_db_interface}')
if karabo_id_control == "":
karabo_id_control = karabo_id
```
%% Cell type:code id: tags:
``` python
proposal = list(filter(None, in_folder.strip('/').split('/')))[-2]
file_loc = f"proposal:{proposal} runs:{run_high} {run_med} {run_low}"
report = get_report(metadata_folder)
step_timer = step_timing.StepTimer()
```
%% Cell type:markdown id: tags:
## Reading control data
%% Cell type:code id: tags:
``` python
step_timer.start()
gain_runs = dict()
med_low_settings = []
ctrl_src = ctrl_source_template.format(karabo_id_control)
run_nums = jungfraulib.sort_runs_by_gain(
raw_folder=in_folder,
runs=run_nums,
ctrl_src=ctrl_src,
)
_gain_mode = None
for gain, run_n in enumerate(run_nums):
run_dc = RunDirectory(f"{in_folder}/r{run_n:04d}/")
gain_runs[run_n] = [gain, run_dc]
ctrl_data = jungfraulib.JungfrauCtrl(run_dc, ctrl_src)
# Read control data for the high gain run only.
if gain == 0:
run_mcells, sc_start = ctrl_data.get_memory_cells()
if integration_time < 0:
integration_time = ctrl_data.get_integration_time()
print(f"Integration time is {integration_time} us.")
else:
print(f"Integration time is manually set to {integration_time} us.")
if bias_voltage < 0:
bias_voltage = ctrl_data.get_bias_voltage()
print(f"Bias voltage is {bias_voltage} V.")
else:
print(f"Bias voltage is manually set to {bias_voltage} V.")
if gain_setting < 0:
gain_setting = ctrl_data.get_gain_setting()
print(f"Gain setting is {gain_setting} ({ctrl_data.run_settings})")
else:
print(f"Gain setting is manually set to {gain_setting}.")
if run_mcells == 1:
memory_cells = 1
print('Dark runs in single cell mode, '
f'storage cell start: {sc_start:02d}')
else:
memory_cells = 16
print('Dark runs in burst mode, '
f'storage cell start: {sc_start:02d}')
else: # medium and low gain
_gain_mode = ctrl_data.get_gain_mode()
med_low_settings.append(ctrl_data.run_mode)
# TODO: consider updating this cell into something similar to agipdlib.AgipdCtrlsRuns()
if gain_mode < 0:
gain_mode = _gain_mode
print(f"Gain mode is {gain_mode} ({med_low_settings})")
else:
print(f"Gain mode is manually set to {gain_mode}.")
step_timer.done_step(f'Reading control data.')
```
%% Cell type:code id: tags:
``` python
step_timer.start()
# set the operating condition
condition = Conditions.Dark.jungfrau(
conditions = JUNGFRAUConditions(
sensor_bias_voltage=bias_voltage,
memory_cells=memory_cells,
bias_voltage=bias_voltage,
integration_time=integration_time,
gain_setting=gain_setting,
gain_mode=gain_mode,
)
db_modules = get_pdu_from_db(
karabo_id=karabo_id,
karabo_da=karabo_da,
constant=Constants.jungfrau.Offset(),
condition=condition,
cal_db_interface=cal_db_interface,
snapshot_at=creation_time)
step_timer.done_step('Set conditions and get PDU names from CalCat.')
```
cc = extra_calibration_client()
%% Cell type:code id: tags:
pdus = cc.pdus_by_detector_name(
det_name=karabo_id, snapshot_at=creation_time)
``` python
# Start retrieving existing constants for comparison
step_timer.start()
mod_x_const = [(mod, const) for const in ["Offset", "Noise", "BadPixelsDark"] for mod in karabo_da]
from cal_tools.tools import get_from_db
from datetime import timedelta
da_to_pdu = dict()
pdu_to_uuid = dict()
for pdu in pdus:
if pdu['karabo_da'] in karabo_da: # exclude unselected das
da_to_pdu[pdu['karabo_da']] = pdu['physical_name']
pdu_to_uuid[pdu['physical_name']] = pdu['uuid']
first_pdu = pdus[0]
detector_info = first_pdu['detector']
detector_info['detector_type'] = first_pdu['detector_type']['name']
def retrieve_old_constant(mod, const):
dconst = getattr(Constants.jungfrau, const)()
data, mdata = get_from_db(
karabo_id=karabo_id,
karabo_da=mod,
constant=dconst,
condition=condition,
empty_constant=None,
cal_db_interface=cal_db_interface,
creation_time=creation_time-timedelta(seconds=60) if creation_time else None,
strategy="pdu_prior_in_time",
verbosity=1,
timeout=cal_db_timeout
)
if mdata is None or data is None:
timestamp = "Not found"
filepath = None
h5path = None
else:
timestamp = mdata.calibration_constant_version.begin_at.isoformat()
filepath = os.path.join(
mdata.calibration_constant_version.hdf5path,
mdata.calibration_constant_version.filename
)
h5path = mdata.calibration_constant_version.h5path
return data, timestamp, filepath, h5path
old_retrieval_pool = multiprocessing.Pool()
old_retrieval_res = old_retrieval_pool.starmap_async(
retrieve_old_constant, mod_x_const
)
old_retrieval_pool.close()
step_timer.done_step('Retrieved old dark constants for comparison.')
step_timer.done_step('Set conditions and get PDU names from CalCat.')
```
%% Cell type:code id: tags:
``` python
# Use only high gain threshold for all gains in case of fixed_gain.
if gain_mode: # fixed_gain
offset_abs_threshold = [[offset_abs_threshold_low[0]]*3, [offset_abs_threshold_high[0]]*3]
else:
offset_abs_threshold = [offset_abs_threshold_low, offset_abs_threshold_high]
```
%% Cell type:code id: tags:
``` python
context = psh.context.ThreadContext(num_workers=memory_cells)
```
%% Cell type:code id: tags:
``` python
"""
All jungfrau runs are taken through one acquisition, except for the forceswitch runs.
While taking non-fixed dark runs, a procedure of multiple acquisitions is used to switch the storage cell indices.
This is done for medium and low gain dark dynamic runs, only [forceswitchg1, forceswitchg2]:
Switching the cell indices in burst mode is a work around for hardware procedure
deficiency that produces wrong data for dark runs except for the first storage cell.
This is why multiple acquisitions are taken to switch the used storage cells and
acquire data through two cells for each of the 16 cells instead of acquiring darks through all 16 cells.
"""
print(f"Maximum trains to process is set to {max_trains}")
noise_map = dict()
offset_map = dict()
bad_pixels_map = dict()
for mod in karabo_da:
step_timer.start()
instrument_src = instrument_source_template.format(
karabo_id, receiver_template.format(int(mod[-2:])))
print(f"\n- Instrument data path for {mod} is {instrument_src}.")
# (1024, 512, 1 or 16, 3)
offset_map[mod] = context.alloc(
shape=(sensor_size+(memory_cells, 3)), fill=0, dtype=np.float32)
noise_map[mod] = context.alloc(like=offset_map[mod], fill=0)
bad_pixels_map[mod] = context.alloc(shape=offset_map[mod].shape, dtype=np.uint32, fill=0)
for run_n, [gain, run_dc] in gain_runs.items():
def process_cell(worker_id, array_index, cell_number):
cell_slice_idx = acelltable == cell_number
if cell_slice_idx.sum() == 0:
# This cell is not in the data (or it's deliberated excluded)
bad_pixels_map[mod][..., cell_number, gain] = BadPixels.NO_DARK_DATA.value
offset_map[mod][..., cell_number, gain] = np.nan
noise_map[mod][..., cell_number, gain] = np.nan
return
thiscell = images[..., cell_slice_idx] # [1024, 512, n_trains]
# Identify cells/trains with images of 0 pixels.
# TODO: An investigation is ongoing by DET to identify reason for these empty images.
nonzero_adc = np.any(thiscell != 0 , axis=(0, 1)) # [n_trains]
# Exclude empty images with 0 pixels, before calculating offset and noise
thiscell = thiscell[..., nonzero_adc]
offset_map[mod][..., cell_number, gain] = np.mean( # [1024, 512]
thiscell, axis=2, dtype=np.float32)
noise_map[mod][..., cell_number, gain] = np.std( # [1024, 512]
thiscell, axis=2, dtype=np.float32)
del thiscell
# Check if there are wrong bad gain values.
# 1. Exclude empty images.
# 2. Indicate pixels with wrong gain value for any train for each cell.
# TODO: mean is used to use thresholds for accepting gain values, even if not 0 mean value.
gain_avg = np.mean( # [1024, 512]
gain_vals[..., cell_slice_idx][..., nonzero_adc],
axis=2, dtype=np.float32
)
# Assign WRONG_GAIN_VALUE for a pixel in a badpixel map for all gains.
bad_pixels_map[mod][:, :,cell_number][gain_avg != raw_g] |= BadPixels.WRONG_GAIN_VALUE.value
print(f"Gain stage {gain}, run {run_n}")
# load shape of data for memory cells, and detector size (imgs, cells, x, y)
n_trains = run_dc[instrument_src, "data.adc"].shape[0]
# load number of data available, including trains with empty data.
all_trains = len(run_dc.train_ids)
instr_dc = run_dc.select(instrument_src, require_all=True)
empty_trains = all_trains - n_trains
if empty_trains != 0:
print(f"{mod} has {empty_trains} empty trains out of {all_trains} trains")
if max_trains > 0:
n_trains = min(n_trains, max_trains)
print(f"Processing {n_trains} images.")
if n_trains == 0:
raise ValueError(f"{run_n} has no trains to process.")
if n_trains < min_trains:
warning(f"Less than {min_trains} trains are available in RAW data.")
# Select only requested number of images to process darks.
instr_dc = instr_dc.select_trains(np.s_[:n_trains])
images = np.transpose(
instr_dc[instrument_src, "data.adc"].ndarray(), (3, 2, 1, 0))
acelltable = np.transpose(instr_dc[instrument_src, "data.memoryCell"].ndarray())
gain_vals = np.transpose(
instr_dc[instrument_src, "data.gain"].ndarray(), (3, 2, 1, 0))
# define gain value as saved in raw gain map
raw_g = 3 if gain == 2 else gain
if memory_cells == 1:
acelltable -= sc_start
# Only for dynamic medium and low gain runs [forceswitchg1, forceswitchg2] in burst mode.
if (
gain_mode == 0 and # dynamic gain mode
gain > 0 and # Medium and low runs
memory_cells == 16 and # Burst mode
acelltable.shape[0] == 2 # forceswitchg1 and forceswitchg2 acquired with the MDL device.
):
# 255 similar to the receiver which uses the 255
# value to indicate a cell without an image.
# image shape for forceswitchg1 and forceswitchg2 = (1024, 512, 2, trains)
# compared to expected shape of (1024, 512, 16, trains) for high gain run.
acelltable[1:] = 255
# Calculate offset and noise maps
context.map(process_cell, range(memory_cells))
cells_missing = (bad_pixels_map[mod][0, 0, :, gain] & BadPixels.NO_DARK_DATA) > 0
if np.any(cells_missing):
print(f"No dark data in gain stage {gain} found for cells", np.nonzero(cells_missing)[0])
del images
del acelltable
del gain_vals
step_timer.done_step('Creating Offset and noise constants for a module.')
```
%% Cell type:code id: tags:
``` python
if detailed_report:
display(Markdown("## Offset and Noise Maps:"))
display(Markdown(
"Below offset and noise maps for the high ($g_0$) gain stage are shown, "
"alongside the distribution of these values. One expects block-like "
"structures mapping to the ASICs of the detector"))
g_name = ['G0', 'G1', 'G2']
g_range = [(0, 8000), (8000, 16000), (8000, 16000)]
n_range = [(0., 50.), (0., 50.), (0., 50.)]
unit = '[ADCu]'
# TODO: Fix plots arrangment and speed for Jungfrau burst mode.
step_timer.start()
for pdu, mod in zip(db_modules, karabo_da):
for mod, pdu in da_to_pdu.items():
for g_idx in gains:
for cell in range(0, memory_cells):
f_o0 = heatmapPlot(
np.swapaxes(offset_map[mod][..., cell, g_idx], 0, 1),
y_label="Row",
x_label="Column",
lut_label=unit,
aspect=1.,
vmin=g_range[g_idx][0],
vmax=g_range[g_idx][1],
title=f'Pedestal {g_name[g_idx]} - Cell {cell:02d} - Module {mod} ({pdu})')
fo0, ax_o0 = plt.subplots()
res_o0 = histPlot(
ax_o0, offset_map[mod][..., cell, g_idx],
bins=800,
range=g_range[g_idx],
facecolor='b',
histotype='stepfilled',
)
ax_o0.tick_params(axis='both',which='major',labelsize=15)
ax_o0.set_title(
f'Module pedestal distribution - Cell {cell:02d} - Module {mod} ({pdu})',
fontsize=15)
ax_o0.set_xlabel(f'Pedestal {g_name[g_idx]} {unit}',fontsize=15)
ax_o0.set_yscale('log')
f_n0 = heatmapPlot(
np.swapaxes(noise_map[mod][..., cell, g_idx], 0, 1),
y_label="Row",
x_label="Column",
lut_label= unit,
aspect=1.,
vmin=n_range[g_idx][0],
vmax=n_range[g_idx][1],
title=f"RMS noise {g_name[g_idx]} - Cell {cell:02d} - Module {mod} ({pdu})",
)
fn0, ax_n0 = plt.subplots()
res_n0 = histPlot(
ax_n0,
noise_map[mod][..., cell, g_idx],
bins=100,
range=n_range[g_idx],
facecolor='b',
histotype='stepfilled',
)
ax_n0.tick_params(axis='both', which='major', labelsize=15)
ax_n0.set_title(
f'Module noise distribution - Cell {cell:02d} - Module {mod} ({pdu})',
fontsize=15)
ax_n0.set_xlabel(
f'RMS noise {g_name[g_idx]} ' + unit, fontsize=15)
plt.show()
step_timer.done_step('Plotting offset and noise maps.')
```
%% Cell type:markdown id: tags:
## Bad Pixel Map ###
The bad pixel map is deduced by comparing offset and noise of each pixel ($v_i$) and each gain ($g$) against the median value for that gain stage:
$$
v_i > \mathrm{median}(v_{k,g}) + n \sigma_{v_{k,g}}
$$
or
$$
v_i < \mathrm{median}(v_{k,g}) - n \sigma_{v_{k,g}}
$$
Values are encoded in a 32 bit mask, where for the dark image deduced bad pixels the following non-zero entries are relevant:
%% Cell type:code id: tags:
``` python
def print_bp_entry(bp):
print("{:<30s} {:032b} -> {}".format(bp.name, bp.value, int(bp.value)))
print_bp_entry(BadPixels.OFFSET_OUT_OF_THRESHOLD)
print_bp_entry(BadPixels.NOISE_OUT_OF_THRESHOLD)
print_bp_entry(BadPixels.OFFSET_NOISE_EVAL_ERROR)
print_bp_entry(BadPixels.NO_DARK_DATA)
print_bp_entry(BadPixels.WRONG_GAIN_VALUE)
def eval_bpidx(d):
mdn = np.nanmedian(d, axis=(0, 1))[None, None, :, :]
std = np.nanstd(d, axis=(0, 1))[None, None, :, :]
idx = (d > badpixel_threshold_sigma*std+mdn) | (d < (-badpixel_threshold_sigma)*std+mdn)
return idx
```
%% Cell type:code id: tags:
``` python
step_timer.start()
for pdu, mod in zip(db_modules, karabo_da):
for mod, pdu in da_to_pdu.items():
display(Markdown(f"### Badpixels for module {mod} ({pdu}):"))
offset_abs_threshold = np.array(offset_abs_threshold)
bad_pixels_map[mod][eval_bpidx(offset_map[mod])] |= BadPixels.OFFSET_OUT_OF_THRESHOLD.value
bad_pixels_map[mod][~np.isfinite(offset_map[mod])] |= BadPixels.OFFSET_NOISE_EVAL_ERROR.value
bad_pixels_map[mod][eval_bpidx(noise_map[mod])] |= BadPixels.NOISE_OUT_OF_THRESHOLD.value
bad_pixels_map[mod][~np.isfinite(noise_map[mod])] |= BadPixels.OFFSET_NOISE_EVAL_ERROR.value
bad_pixels_map[mod][(offset_map[mod] < offset_abs_threshold[0][None, None, None, :]) | (offset_map[mod] > offset_abs_threshold[1][None, None, None, :])] |= BadPixels.OFFSET_OUT_OF_THRESHOLD.value # noqa
if detailed_report:
for g_idx in gains:
for cell in range(memory_cells):
bad_pixels = bad_pixels_map[mod][:, :, cell, g_idx]
fn_0 = heatmapPlot(
np.swapaxes(bad_pixels, 0, 1),
y_label="Row",
x_label="Column",
lut_label=f"Badpixels {g_name[g_idx]} [ADCu]",
aspect=1.,
vmin=0, vmax=5,
title=f'G{g_idx} Bad pixel map - Cell {cell:02d} - Module {mod} ({pdu})')
step_timer.done_step('Creating bad pixels constant')
```
%% Cell type:markdown id: tags:
## Inject and save calibration constants
%% Cell type:code id: tags:
``` python
step_timer.start()
for mod, db_mod in zip(karabo_da, db_modules):
constants = {
'Offset': np.moveaxis(offset_map[mod], 0, 1),
'Noise': np.moveaxis(noise_map[mod], 0, 1),
'BadPixelsDark': np.moveaxis(bad_pixels_map[mod], 0, 1),
}
constants = {}
for mod, pdu in da_to_pdu.items():
constants['Offset10Hz'] = np.moveaxis(offset_map[mod], 0, 1)
constants['Noise10Hz'] = np.moveaxis(noise_map[mod], 0, 1)
constants['BadPixelsDark10Hz'] = np.moveaxis(bad_pixels_map[mod], 0, 1)
md = None
upper_lower_dev = {"Integration Time": time_limits}
for const_name, const_data in constants.items():
for key, const_data in constants.items():
with NamedTemporaryFile() as tempf:
ccv_root = write_ccv(
tempf.name,
pdu,
pdu_to_uuid[pdu],
detector_info["detector_type"],
const_name,
conditions,
creation_time,
proposal,[run_high, run_med, run_low],
const_data,
# TODO: what to call the detector dims??
# calng & extra use slow_scan, fast_scan
# dynamicFF use ss and fs
dims=["fs", "ss", "cells", "gain"],
lower_deviations=upper_lower_dev,
upper_deviations=upper_lower_dev,
)
const = getattr(Constants.jungfrau, key)()
const.data = const_data
if db_output:
inject_ccv(tempf.name, ccv_root, metadata_folder)
for parm in condition.parameters:
if parm.name == "Integration Time":
parm.lower_deviation = time_limits
parm.upper_deviation = time_limits
if db_output:
md = send_to_db(
db_module=db_mod,
karabo_id=karabo_id,
constant=const,
condition=condition,
file_loc=file_loc,
report_path=report,
cal_db_interface=cal_db_interface,
creation_time=creation_time,
timeout=cal_db_timeout,
)
if local_output:
md = save_const_to_h5(
db_module=db_mod,
karabo_id=karabo_id,
constant=const,
condition=condition,
data=const.data,
file_loc=file_loc,
report=report,
creation_time=creation_time,
out_folder=out_folder,
)
print(f"Calibration constant {key} is stored locally at {out_folder}.\n")
if local_output:
ofile = f"{out_folder}/const_{const_name}_{pdu}.h5"
if os.path.isfile(ofile):
print(f'File {ofile} already exists and will be overwritten\n')
from shutil import copyfile
copyfile(tempf.name, ofile)
print(f"Calibration constant {const_name} is stored locally at {out_folder}.\n")
print("Constants parameter conditions are:\n")
print(
f"• Bias voltage: {bias_voltage}\n"
f"• Memory cells: {memory_cells}\n"
f"• Integration time: {integration_time}\n"
f"• Gain setting: {gain_setting}\n"
f"• Gain mode: {gain_mode}\n"
f"• Creation time: {md.calibration_constant_version.begin_at if md is not None else creation_time}\n") # noqa
step_timer.done_step("Injecting constants.")
```
%% Cell type:code id: tags:
``` python
print(f"Total processing time {step_timer.timespan():.01f} s")
step_timer.print_summary()
```
%% Cell type:code id: tags:
``` python
# now we need the old constants
# Start retrieving existing constants for comparison
step_timer.start()
old_const = {}
old_mdata = {}
old_retrieval_res.wait()
for (mod, const), (data, timestamp, filepath, h5path) in zip(
mod_x_const, old_retrieval_res.get()):
old_const.setdefault(mod, {})[const] = data
old_mdata.setdefault(mod, {})[const] = {
"timestamp": timestamp,
"filepath": filepath,
"h5path": h5path,
}
jf_caldata = CalibrationData.from_condition(
conditions,
karabo_id,
event_at=creation_time-timedelta(seconds=60) if creation_time else None,
begin_at_strategy="prior",
)
for mod in karabo_da:
old_const[mod] = {}
old_mdata[mod] = {}
for cname in constants.keys():
cmdata = jf_caldata.get(cname, None)
data_found = cmdata and mod in cmdata.aggregator_names
if data_found:
old_const[mod][cname] = cmdata[mod].ndarray()
old_mdata[mod][cname] = {
"timestamp": cmdata[mod].metadata("begin_validity_at"),
"filepath": str(cmdata[mod].get_full_path()),
"dataset": cmdata[mod].dataset,
}
else:
old_const[mod][cname] = None
old_mdata[mod][cname] = {
"timestamp": "Not found",
"filepath": None,
"dataset": None,
}
step_timer.done_step('Retrieved old dark constants for comparison.')
```
%% Cell type:code id: tags:
``` python
display(Markdown("## The following pre-existing constants are used for comparison:"))
for mod, consts in old_mdata.items():
pdu = db_modules[karabo_da.index(mod)]
pdu = da_to_pdu[mod]
display(Markdown(f"- {mod} ({pdu})"))
for const in consts:
display(Markdown(f" - {const} at {consts[const]['timestamp']}"))
display(Markdown(f" - {const}: {consts[const]['timestamp']}"))
# saving locations of old constants for summary notebook
with open(f"{metadata_folder or out_folder}/module_metadata_{mod}.yml", "w") as fd:
yaml.safe_dump(
{
"module": mod,
"pdu": pdu,
"old-constants": old_mdata[mod],
},
fd,
)
```
......
%% Cell type:markdown id: tags:
# Jungfrau Dark Summary
Author: European XFEL Detector Department, Version: 1.0
Summary for process dark constants and a comparison with previously injected constants with the same conditions.
%% Cell type:code id: tags:
``` python
out_folder = "/gpfs/exfel/data/scratch/ahmedk/test/jungfrau_assembeled_dark" # path to output to, required
metadata_folder = "" # Directory containing calibration_metadata.yml when run by xfel-calibrate.
# Parameters used to access raw data.
karabo_da = [] # list of data aggregators, which corresponds to different JF modules. This is only needed for the detectors of one module.
karabo_id = "FXE_XAD_JF1M" # detector identifier.
# Parameters to be used for injecting dark calibration constants.
local_output = True # Boolean indicating that local constants were stored in the out_folder
# Skip the whole notebook if local_output is false in the preceding notebooks.
if not local_output:
print('No local constants saved. Skipping summary plots')
import sys
sys.exit(0)
```
%% Cell type:code id: tags:
``` python
import warnings
from pathlib import Path
warnings.filterwarnings('ignore')
import h5py
import matplotlib
import matplotlib.gridspec as gridspec
import matplotlib.pyplot as plt
import numpy as np
import yaml
from IPython.display import Markdown, display
matplotlib.use("agg")
%matplotlib inline
import tabulate
from IPython.display import Latex, Markdown, display
from XFELDetAna.plotting.simpleplot import simplePlot
from cal_tools.enums import BadPixels
from cal_tools.plotting import init_jungfrau_geom, show_processed_modules_jungfrau
from cal_tools.tools import CalibrationMetadata
```
%% Cell type:code id: tags:
``` python
# Prepare paths and load previous constants' metadata.
out_folder = Path(out_folder)
metadata = CalibrationMetadata(metadata_folder or out_folder)
mod_mapping = metadata.setdefault("modules-mapping", {})
dark_constants = ["Offset", "Noise", "BadPixelsDark"]
cnames = ["Offset10Hz", "Noise10Hz", "BadPixelsDark10Hz"]
prev_const_metadata = {}
for fn in Path(metadata_folder or out_folder).glob("module_metadata_*.yml"):
with fn.open("r") as fd:
fdict = yaml.safe_load(fd)
module = fdict["module"]
mod_mapping[module] = fdict["pdu"]
prev_const_metadata[module] = fdict["old-constants"]
metadata.save()
```
%% Cell type:code id: tags:
``` python
expected_modules, geom = init_jungfrau_geom(
karabo_id=karabo_id, karabo_da=karabo_da)
nmods = len(expected_modules)
```
%% Cell type:markdown id: tags:
Preparing newly injected and previous constants from produced local folder in out_folder.
%% Cell type:code id: tags:
``` python
fixed_gain = False # constant is adaptive by default.
# Get the constant shape from one of the local constants.
# This is one way to realize the number of memory cells.
with h5py.File(list(out_folder.glob("const_Offset_*"))[0], 'r') as f:
const_shape = f["data"][()].shape
# TODO: replace this shape and mode finding part
# by storing detector conditions in the metadata files.
try:
# Get constant shape and gain mode from an old offset ccv.
cname = cnames[0]
old_offset_mod = next(
mod for mod, v in prev_const_metadata.items() if v[cname]["filepath"] is not None) # noqa
except StopIteration: # no old ccvs
print('No old constants to compare against. Skipping summary plots')
import sys
sys.exit(0)
with h5py.File(
prev_const_metadata[old_offset_mod][cname]["filepath"], 'r') as f:
dataset = prev_const_metadata[old_offset_mod][cname]["dataset"]
const_shape = f[f"{dataset}/data"][()].shape
# Get fixed gain value to decide offset vmin, vmax
# for later constant map plots.
gain_mode = "condition/Gain mode/value"
gain_mode = f"{dataset}/operating_conditions/gain_mode"
if gain_mode in f:
fixed_gain = f[gain_mode][()]
initial_stacked_constants = np.full(((nmods,)+const_shape), np.nan)
curr_constants = { c: initial_stacked_constants.copy() for c in dark_constants}
prev_constants = { c: initial_stacked_constants.copy() for c in dark_constants}
curr_constants = { c: initial_stacked_constants.copy() for c in cnames}
prev_constants = { c: initial_stacked_constants.copy() for c in cnames}
exculded_constants = [] # constants excluded from comparison plots.
excluded_constants = [] # constants excluded from comparison plots.
# Loop over modules
for cname in dark_constants:
for cname in cnames:
excluded_modules = [] # modules with no previous constants.
for i, mod in enumerate(sorted(expected_modules)):
# Loop over expected dark constants in out_folder.
# Some constants can be missing in out_folder.
pdu = mod_mapping[mod]
# first load new constant
# TODO: Loose dependency on local stored ccvs files.
fpath = out_folder / f"const_{cname}_{pdu}.h5"
dataset = f"{pdu}/{cname}/0/data"
with h5py.File(fpath, 'r') as f:
curr_constants[cname][i, ...] = f["data"][()]
curr_constants[cname][i, ...] = f[dataset][()]
# Load previous constants.
old_mod_mdata = prev_const_metadata[mod]
if cname in old_mod_mdata: # a module can be missing from detector dark processing.
filepath = old_mod_mdata[cname]["filepath"]
h5path = old_mod_mdata[cname]["h5path"]
if not filepath or not h5path:
dataset = old_mod_mdata[cname]["dataset"]
if not filepath or not dataset:
excluded_modules.append(mod)
prev_constants[cname][i, ...].fill(np.nan)
else:
with h5py.File(filepath, "r") as fd:
prev_constants[cname][i, ...] = fd[f"{h5path}/data"][()]
prev_constants[cname][i, ...] = fd[f"{dataset}/data"][()]
if excluded_modules:
print(f"Previous {cname} constants for {excluded_modules} are not available.\n.")
# Exclude constants from comparison plots, if the corresponding
# previous constants are not available for all modules.
if len(excluded_modules) == nmods:
exculded_constants.append(cname)
excluded_constants.append(cname)
print(f"No comparison plots for {cname}.\n")
```
%% Cell type:code id: tags:
``` python
display(Markdown('## Processed modules'))
processed_modules = list(mod_mapping.keys())
processed_pdus = list(mod_mapping.values())
show_processed_modules_jungfrau(
jungfrau_geom=geom,
constants=curr_constants,
processed_modules=processed_modules,
expected_modules=expected_modules,
display_module_names=processed_pdus,
)
```
%% Cell type:code id: tags:
``` python
gainstages = 3
gain_names = ["High Gain", "Medium Gain", "Low Gain" ]
gs = gridspec.GridSpec(2, 4)
axes = {
"map": {
"gs": gs[0, 1:3],
"shrink": 0.7,
"pad": 0.05,
"label": "ADCu",
"title": "{}",
"location": "right",
},
"diff": {
"gs": gs[1, :2],
"shrink": 0.7,
"pad": 0.02,
"label": "ADCu",
"location": "left",
"title": "Difference with previous {}",
},
"diff_frac": {
"gs": gs[1, 2:],
"shrink": 0.7,
"pad": 0.02,
"label": "%",
"location": "right",
"title": "Difference with previous {} %",
},
}
```
%% Cell type:code id: tags:
``` python
def badpx(constant_name):
return "bad" in constant_name.lower()
def bp_entry(bp):
return [f"{bp.name:<30s}", f"{bp.value:032b}", f"{int(bp.value)}"]
badpixels = [
BadPixels.OFFSET_OUT_OF_THRESHOLD,
BadPixels.NOISE_OUT_OF_THRESHOLD,
BadPixels.OFFSET_NOISE_EVAL_ERROR,
BadPixels.NO_DARK_DATA,
BadPixels.WRONG_GAIN_VALUE,
]
```
%% Cell type:markdown id: tags:
## Summary figures across pixels and memory cells.
The following plots give an overview of calibration constants averaged across pixels and memory cells. A bad pixel mask is applied.
%% Cell type:code id: tags:
``` python
for cname, const in curr_constants.items():
if badpx(cname):
table = [bp_entry(bp) for bp in badpixels]
display(Markdown("""**The bad pixel** mask is encoded as a bit mask."""))
display(Latex(
tabulate.tabulate(
table,
tablefmt='latex',
headers=["Name", "bit value", "integer value"]
)))
# Prepare the stacked mean of constant,
# the difference with the previous constant
# and the fraction of that difference.
mean_const = np.nanmean(const, axis=3)
mean_diff = np.abs(
np.nanmean(const, axis=3) - np.nanmean(
prev_constants[cname],
axis=3)
)
mean_frac = np.divide(
mean_diff,
mean_const,
out=np.zeros_like(mean_const),
where=(mean_const != 0)
) * 100
for gain in range(gainstages):
data_to_plot = {
f'map': mean_const[..., gain],
f'diff': mean_diff[..., gain],
f'diff_frac': mean_frac[..., gain],
}
# Plotting constant overall modules.
display(Markdown(f'### {cname} - {gain_names[gain]}'))
if nmods > 1:
fig = plt.figure(figsize=(20, 20))
else:
fig = plt.figure(figsize=(20, 10))
for axname, axv in axes.items():
# Avoid difference plots if previous constants
# are missing for the detector.
if cname in exculded_constants and axname != "map":
if cname in excluded_constants and axname != "map":
break
ax = fig.add_subplot(axv["gs"])
if badpx(cname):
vmin, vmax = (0, sorted([bp.value for bp in badpixels])[-2])
else:
vmin, vmax = np.percentile(data_to_plot[axname], [5, 95])
geom.plot_data(
data_to_plot[axname],
vmin=vmin,
vmax=vmax,
ax=ax,
colorbar={
"shrink": axv["shrink"],
"pad": axv["pad"],
"location": axv["location"],
},
)
colorbar = ax.images[0].colorbar
colorbar.set_label(axv["label"], fontsize=15)
colorbar.ax.tick_params(labelsize=15)
ax.tick_params(labelsize=1)
ax.set_title(axv["title"].format(
f"{cname} {gain_names[gain]}"), fontsize=15)
if axname == "map":
ax.set_xlabel('Columns', fontsize=15)
ax.set_ylabel('Rows', fontsize=15)
ax.tick_params(labelsize=15)
else:
ax.tick_params(labelsize=0)
# Remove axes labels for comparison plots.
ax.set_xlabel('')
ax.set_ylabel('')
plt.show()
```
%% Cell type:code id: tags:
``` python
if curr_constants["Offset"].shape[-2] > 1:
if const_shape[-2] > 1:
display(Markdown("## Summary across pixels per memory cells"))
# Plot mean and std of memcells for each module, gain, and constant
# across trains.
for const_name, const in curr_constants.items():
display(Markdown(f'### {const_name}'))
for gain in range(gainstages):
data = np.copy(const[..., gain])
if const_name == 'BadPixelsDark':
data[data > 0] = 1.0
datamean = np.nanmean(data, axis=(1, 2))
datamean[datamean == 1.0] = np.nan
fig = plt.figure(
figsize=(15, 6),
tight_layout={'pad': 0.2, 'w_pad': 1.3, 'h_pad': 1.3})
label = 'Fraction of bad pixels'
ax = fig.add_subplot(1, 1, 1)
else:
datamean = np.nanmean(data, axis=(1, 2))
fig = plt.figure(
figsize=(15, 6),
tight_layout={'pad': 0.2, 'w_pad': 1.3, 'h_pad': 1.3})
label = f'{const_name} value [ADU], good pixels only'
ax = fig.add_subplot(1, 2, 1)
d = []
for i, mod in enumerate(datamean):
d.append({
'x': np.arange(mod.shape[0]),
'y': mod,
'drawstyle': 'steps-pre',
'label': processed_modules[i],
})
simplePlot(
d, figsize=(10, 10), xrange=(-12, 510),
x_label='Memory Cell ID',
y_label=label,
use_axis=ax,
title=f'{gain_names[gain]}',
title_position=[0.5, 1.18],
legend='outside-top-ncol6-frame',
legend_size='18%',
legend_pad=0.00,
)
# Extra Sigma plot for Offset and Noise constants.
if const_name != 'BadPixelsDark':
ax = fig.add_subplot(1, 2, 2)
label = f'$\sigma$ {const_name} [ADU], good pixels only'
d = []
for i, mod in enumerate(np.nanstd(data, axis=(1, 2))):
d.append({
'x': np.arange(mod.shape[0]),
'y': mod,
'drawstyle': 'steps-pre',
'label': processed_modules[i],
})
simplePlot(
d, figsize=(10, 10), xrange=(-12, 510),
x_label='Memory Cell ID',
y_label=label,
use_axis=ax,
title=f'{gain_names[gain]} $\sigma$',
title_position=[0.5, 1.18],
legend='outside-top-ncol6-frame',
legend_size='18%',
legend_pad=0.00,
)
plt.show()
```
......
......@@ -133,6 +133,22 @@ class CalCatAPIClient:
def detector_by_id(self, det_id):
return self.get(f"detectors/{det_id}")
@lru_cache()
def pdus_by_detector_name(self, det_name, snapshot_at=""):
det_id = self.detector_by_identifier(det_name)['id']
return self.pdus_by_detector_id(det_id, snapshot_at)
@lru_cache()
def pdus_by_detector_id(self, det_id, snapshot_at=""):
return self.get(
"physical_detector_units/get_all_by_detector",
{
"detector_id": det_id,
"pdu_snapshot_at": self.format_time(snapshot_at),
},
)
# --------------------
# Shortcuts to find 1 of something by an ID-like field (e.g. name) other
# than CalCat's own integer IDs. Error on no match or >1 matches.
......@@ -258,13 +274,16 @@ class SingleConstant:
_have_calcat_metadata=True,
)
def dataset_obj(self, caldb_root=None) -> h5py.Dataset:
def get_full_path(self, caldb_root=None):
if caldb_root is not None:
caldb_root = Path(caldb_root)
else:
caldb_root = _get_default_caldb_root()
return caldb_root / self.path
f = h5py.File(caldb_root / self.path, "r")
def dataset_obj(self, caldb_root=None) -> h5py.Dataset:
f = h5py.File(self.get_full_path(caldb_root), "r")
return f[self.dataset]["data"]
def ndarray(self, caldb_root=None):
......@@ -495,6 +514,7 @@ class CalibrationData(Mapping):
client=None,
event_at=None,
pdu_snapshot_at=None,
begin_at_strategy="",
):
if calibrations is None:
calibrations = set(condition.calibration_types)
......@@ -509,13 +529,8 @@ class CalibrationData(Mapping):
client = client or get_client()
detector_id = client.detector_by_identifier(detector_name)["id"]
pdus = client.get(
"physical_detector_units/get_all_by_detector",
{
"detector_id": detector_id,
"pdu_snapshot_at": client.format_time(pdu_snapshot_at),
},
)
pdus = client.pdus_by_detector_id(detector_id, pdu_snapshot_at)
module_details = sorted(pdus, key=lambda d: d["karabo_da"])
for mod in module_details:
if mod.get("module_number") is None:
......@@ -539,6 +554,7 @@ class CalibrationData(Mapping):
"karabo_da": "",
"event_at": client.format_time(event_at),
"pdu_snapshot_at": client.format_time(pdu_snapshot_at),
"begin_at_strategy": begin_at_strategy,
},
data=json.dumps(cls._format_cond(condition_dict)),
)
......@@ -927,6 +943,46 @@ class DSSCConditions(ConditionsBase):
}
@dataclass
class JUNGFRAUConditions(ConditionsBase):
"""Conditions for JUNGFRAU detectors"""
sensor_bias_voltage: float
memory_cells: int
integration_time: float
gain_setting: int
gain_mode: Optional[int] = None
sensor_temperature: float = 291
pixels_x: int = 1024
pixels_y: int = 512
_params = [
"Sensor Bias Voltage",
"Memory Cells",
"Pixels X",
"Pixels Y",
"Integration Time",
"Sensor temperature",
"Gain Setting",
"Gain mode",
]
calibration_types = {
"Offset10Hz": _params,
"Noise10Hz": _params,
"BadPixelsDark10Hz": _params,
"RelativeGain10Hz": _params,
"BadPixelsFF10Hz": _params,
}
def make_dict(self, parameters):
cond = super().make_dict(parameters)
# Fix-up some database quirks.
if int(cond.get("Gain mode", -1)) == 0:
del cond["Gain mode"]
return cond
@dataclass
class ShimadzuHPVX2Conditions(ConditionsBase):
burst_frame_count: float
......
from datetime import datetime, timezone
from struct import pack, unpack
from pathlib import Path
from shutil import copyfile
from hashlib import md5
import binascii
import time
import warnings
from hashlib import md5
from pathlib import Path
from shutil import copyfile
from struct import pack, unpack
import numpy as np
import h5py
import numpy as np
from calibration_client import CalibrationClient
from cal_tools.calcat_interface2 import _get_default_caldb_root, get_client
from cal_tools.tools import run_prop_seq_from_path
from cal_tools.calcat_interface2 import _get_default_caldb_root
from cal_tools.restful_config import calibration_client
class CCVAlreadyInjected(UserWarning):
"""Exception when same CCV was already injected.
expected response: {
'success': False, 'status_code': 422,
'info': 'Error creating calibration_constant_version',
'app_info': {
'calibration_constant_id': ['has already been taken'],
'physical_detector_unit_id': ['has already been taken'],
'begin_at': ['has already been taken']
}, 'pagination': {}, 'data': {}}
"""
pass
def custom_warning_formatter(
message, category, filename, lineno, file=None, line=None):
"""Custom warning format to avoid display filename and lineno."""
return f"{category.__name__}: {message}\n"
# Apply the custom warning formatter
warnings.formatwarning = custom_warning_formatter
def write_ccv(
const_path,
pdu_name, pdu_uuid, detector_type,
calibration, conditions, created_at, proposal, runs,
data, dims, key='0'
data, dims, key='0', lower_deviations={}, upper_deviations={},
):
"""Write CCV data file.
......@@ -73,8 +94,8 @@ def write_ccv(
key = db_name.lower().replace(' ', '_')
dset = opcond_group.create_dataset(key, data=value,
dtype=np.float64)
dset.attrs['lower_deviation'] = 0.0
dset.attrs['upper_deviation'] = 0.0
dset.attrs['lower_deviation'] = lower_deviations.get(db_name, 0.0)
dset.attrs['upper_deviation'] = upper_deviations.get(db_name, 0.0)
dset.attrs['database_name'] = db_name
dset = ccv_group.create_dataset('data', data=data)
......@@ -83,6 +104,40 @@ def write_ccv(
return ccv_group_name
def get_condition_dict(
name, value, lower_deviation=0.0, upper_deviation=0.0):
return {
'parameter_name': name,
'value': value,
'lower_deviation_value': lower_deviation,
'upper_deviation_value': upper_deviation,
'flg_available': True
}
def generate_unique_name(detector_type, pdu_name, pdu_uuid, cond_params):
# Generate condition name.
unique_name = detector_type[:detector_type.index('-Type')] + ' Def'
cond_hash = md5(pdu_name.encode())
cond_hash.update(int(pdu_uuid).to_bytes(
length=8, byteorder='little', signed=False))
for param_dict in cond_params:
cond_hash.update(str(param_dict['parameter_name']).encode())
cond_hash.update(str(param_dict['value']).encode())
unique_name += binascii.b2a_base64(cond_hash.digest()).decode()
return unique_name[:60]
def get_raw_data_location(proposal, runs):
if proposal and len(runs) > 0:
return (
f'proposal:{proposal} runs: {" ".join([str(x) for x in runs])}')
else:
return "" # Fallback for non-run based constants
def inject_ccv(const_src, ccv_root, report_to=None):
"""Inject new CCV into CalCat.
......@@ -106,6 +161,7 @@ def inject_ccv(const_src, ccv_root, report_to=None):
detector_type = pdu_group.attrs['detector_type']
ccv_group = const_file[ccv_root]
proposal, runs = ccv_group.attrs['proposal'], ccv_group.attrs['runs']
begin_at_str = ccv_group.attrs['begin_at']
......@@ -116,44 +172,26 @@ def inject_ccv(const_src, ccv_root, report_to=None):
# It's really not ideal we're mixing conditionS and condition now.
for parameter in condition_group:
param_dset = condition_group[parameter]
cond_params.append({
'parameter_name': param_dset.attrs['database_name'],
'value': float(param_dset[()]),
'lower_deviation_value': param_dset.attrs['lower_deviation'],
'upper_deviation_value': param_dset.attrs['upper_deviation'],
'flg_available': True
})
cond_params.append(get_condition_dict(
param_dset.attrs['database_name'],
float(param_dset[()]),
param_dset.attrs['lower_deviation'],
param_dset.attrs['upper_deviation'],
))
const_rel_path = f'xfel/cal/{detector_type.lower()}/{pdu_name.lower()}'
const_filename = f'cal.{time.time()}.h5'
if proposal and len(runs) > 0:
raw_data_location = 'proposal:{} runs: {}'.format(
proposal, ' '.join([str(x) for x in runs]))
else:
pass # Fallback for non-run based constants
# Generate condition name.
unique_name = detector_type[:detector_type.index('-Type')] + ' Def'
cond_hash = md5(pdu_name.encode())
cond_hash.update(int(pdu_uuid).to_bytes(
length=8, byteorder='little', signed=False))
unique_name = generate_unique_name(
detector_type, pdu_name, pdu_uuid, cond_params)
for param_dict in cond_params:
cond_hash.update(str(param_dict['parameter_name']).encode())
cond_hash.update(str(param_dict['value']).encode())
unique_name += binascii.b2a_base64(cond_hash.digest()).decode()
unique_name = unique_name[:60]
raw_data_location = get_raw_data_location(proposal, runs)
# Add PDU "UUID" to parameters.
cond_params.append({
'parameter_name': 'Detector UUID',
'value': unpack('d', pack('q', pdu_uuid))[0],
'lower_deviation_value': 0.0,
'upper_deviation_value': 0.0,
'flg_available': True
})
cond_params.append(get_condition_dict(
'Detector UUID',
unpack('d', pack('q', pdu_uuid))[0]
))
inject_h = {
'detector_condition': {
......@@ -196,4 +234,14 @@ def inject_ccv(const_src, ccv_root, report_to=None):
if not resp['success']:
const_dest.unlink() # Delete already copied CCV file.
raise RuntimeError(resp)
# Unfortunately this is the best we can do for a check at the moment.
if (
resp['status_code'] == 422 and
"taken" in resp['app_info'].get("begin_at", [""])[0]
):
warnings.warn(
f"{calibration} calibration constant version for {pdu_name}"
" has been already injected.\n",
CCVAlreadyInjected)
else:
raise RuntimeError(resp)
......@@ -398,8 +398,8 @@ def show_processed_modules_jungfrau(
if module in processed_modules:
color = 'green'
if (
'Noise' not in constants.keys() or
np.nanmean(constants['Noise'][counter, ..., 0]) == 0
'Noise10Hz' not in constants.keys() or
np.nanmean(constants['Noise10Hz'][counter, ..., 0]) == 0
):
color = 'red'
counter += 1
......