Compare revisions

Changes are shown as if the source revision was being merged into the target revision.

Target project: calibration/pycalibration
Commits on Source (16)
Showing with 1130 additions and 292 deletions
......@@ -21,6 +21,7 @@
LPD/results
Test
build
coverage.*
docs/build
docs/source/_notebooks
docs/source/_static/reports
......
......@@ -43,7 +43,12 @@ pytest:
<<: *before_script
script:
- python3 -m pip install ".[test]"
- python3 -m pytest --color yes --verbose --cov=cal_tools --cov=xfel_calibrate
# Nope... https://docs.gitlab.com/12.10/ee/user/project/merge_requests/test_coverage_visualization.html#enabling-the-feature
# - coverage xml
# artifacts:
# reports:
# cobertura: coverage.xml
cython-editable-install-test:
stage: test
......@@ -51,4 +56,4 @@ cython-editable-install-test:
<<: *before_script
script:
- python3 -m pip install -e ".[test]"
- python3 -m pytest --color yes --verbose ./tests/test_agipdalgs.py
%% Cell type:markdown id: tags:
# AGIPD Characterize Dark Images #
Author: S. Hauf, Version: 0.1
The following code analyzes a set of dark images taken with the AGIPD detector to deduce detector offsets, noise, bad-pixel maps and gain thresholds. All four types of constants are evaluated per pixel and per memory cell. Data for the detector's three gain stages must be present, recorded in separate runs.
The evaluated calibration constants are stored locally and injected into the calibration database.
%% Cell type:code id: tags:
``` python
in_folder = "/gpfs/exfel/d/raw/CALLAB/202031/p900113" # path to input data, required
out_folder = "/gpfs/exfel/data/scratch/hammerd/agipd-fixed-gain" # path to output to, required
sequences = [0] # sequence files to evaluate.
modules = [-1] # list of modules to evaluate, RANGE ALLOWED
run_high = 9985 # run number in which high gain data was recorded, required
run_med = 9984 # run number in which medium gain data was recorded, required
run_low = 9983 # run number in which low gain data was recorded, required
operation_mode = "ADAPTIVE_GAIN" # Detector operation mode, optional (defaults to "ADAPTIVE_GAIN")
karabo_id = "HED_DET_AGIPD500K2G" # karabo karabo_id
karabo_da = ['-1'] # a list of data aggregators names, Default [-1] for selecting all data aggregators
receiver_id = "{}CH0" # inset for receiver devices
path_template = 'RAW-R{:04d}-{}-S{:05d}.h5' # the template to use to access data
h5path = '/INSTRUMENT/{}/DET/{}:xtdf/image' # path in the HDF5 file to images
h5path_idx = '/INDEX/{}/DET/{}:xtdf/image' # path in the HDF5 file to images
h5path_ctrl = '/CONTROL/{}/MDL/FPGA_COMP' # path to control information
karabo_id_control = "HED_EXP_AGIPD500K2G" # karabo id for the control device
karabo_da_control = "AGIPD500K2G00" # karabo DA for control information
use_dir_creation_date = True # use dir creation date as data production reference date
cal_db_interface = "tcp://max-exfl016:8020" # the database interface to use
cal_db_timeout = 3000000 # timeout on cal_db requests
local_output = True # output constants locally
db_output = False # output constants to database
mem_cells = 0 # number of memory cells used, set to 0 to automatically infer
bias_voltage = 0 # detector bias voltage
gain_setting = 0.1 # the gain setting, use 0.1 to try to auto-determine
acq_rate = 0. # the detector acquisition rate, use 0 to try to auto-determine
interlaced = False # assume interlaced data format, for data prior to Dec. 2017
rawversion = 2 # RAW file format version
thresholds_offset_sigma = 3. # offset sigma thresholds for offset-deduced bad pixels
thresholds_offset_hard = [0, 0] # For setting the same offset threshold for the 3 gains. Left for backwards compatibility. Default [0, 0] means the gain-specific parameters below are used.
thresholds_offset_hard_hg = [3000, 7000] # High-gain thresholds in absolute ADU terms for offset deduced bad pixels
thresholds_offset_hard_mg = [6000, 10000] # Medium-gain thresholds in absolute ADU terms for offset deduced bad pixels
thresholds_offset_hard_lg = [6000, 10000] # Low-gain thresholds in absolute ADU terms for offset deduced bad pixels
thresholds_offset_hard_hg_fixed = [3500, 6500] # Same as thresholds_offset_hard_hg, but for fixed gain operation
thresholds_offset_hard_mg_fixed = [3500, 6500] # Same as thresholds_offset_hard_mg, but for fixed gain operation
thresholds_offset_hard_lg_fixed = [3500, 6500] # Same as thresholds_offset_hard_lg, but for fixed gain operation
thresholds_noise_sigma = 5. # noise sigma thresholds for noise-deduced bad pixels
thresholds_noise_hard = [0, 0] # For setting the same noise threshold for the 3 gains. Left for backwards compatibility. Default [0, 0] means the gain-specific parameters below are used.
thresholds_noise_hard_hg = [4, 20] # High-gain thresholds in absolute ADU terms for noise-deduced bad pixels
thresholds_noise_hard_mg = [4, 20] # Medium-gain thresholds in absolute ADU terms for noise-deduced bad pixels
thresholds_noise_hard_lg = [4, 20] # Low-gain thresholds in absolute ADU terms for noise-deduced bad pixels
thresholds_gain_sigma = 5. # Gain separation sigma threshold
high_res_badpix_3d = False # set this to True if you need high-resolution 3d bad pixel plots. ~7mins extra time for 64 memory cells
```
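These first-cell assignments double as the notebook's command-line interface: `xfel-calibrate` parses them and rewrites the cell with user-supplied values. A minimal sketch of that override step, using the same `nbparameterise` calls that appear in `xfel_calibrate/calibrate.py` later in this diff; the notebook filenames are hypothetical:

``` python
import nbformat
from nbparameterise import extract_parameters, parameter_values, replace_definitions

# Read the notebook and pull the parameter definitions out of its first code cell
nb = nbformat.read("dark_characterization.ipynb", as_version=4)  # hypothetical file
parms = extract_parameters(nb, lang='python')
# Override selected values; anything not named keeps its default from the cell
params = parameter_values(parms, mem_cells=202, bias_voltage=300)
# Write the new definitions back without executing the notebook
new_nb = replace_definitions(nb, params, execute=False, lang='python')
nbformat.write(new_nb, "dark_characterization_overridden.ipynb")
```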
%% Cell type:code id: tags:
``` python
import os
from collections import OrderedDict
from datetime import datetime
from typing import Tuple
import dateutil.parser
import h5py
import matplotlib
import numpy as np
import pasha as psh
import tabulate
import yaml
matplotlib.use('agg')
import matplotlib.pyplot as plt
from IPython.display import Latex, Markdown, display
%matplotlib inline
import itertools
import multiprocessing
from cal_tools.agipdlib import (
get_acq_rate,
get_bias_voltage,
get_gain_mode,
get_gain_setting,
get_num_cells,
)
from cal_tools.enums import AgipdGainMode, BadPixels
from cal_tools.plotting import (
create_constant_overview,
plot_badpix_3d,
show_overview,
show_processed_modules,
)
from cal_tools.tools import (
get_dir_creation_date,
get_from_db,
get_pdu_from_db,
get_random_db_interface,
get_report,
map_gain_stages,
module_index_to_qm,
run_prop_seq_from_path,
save_const_to_h5,
send_to_db,
)
import iCalibrationDB
```
%% Cell type:code id: tags:
``` python
# insert control device if format string (does nothing otherwise)
h5path_ctrl = h5path_ctrl.format(karabo_id_control)
max_cells = mem_cells
offset_runs = OrderedDict()
offset_runs["high"] = run_high
offset_runs["med"] = run_med
offset_runs["low"] = run_low
creation_time = None
if use_dir_creation_date:
creation_time = get_dir_creation_date(in_folder, run_high)
print(f"Using {creation_time} as creation time of constant.")
run, prop, seq = run_prop_seq_from_path(in_folder)
cal_db_interface = get_random_db_interface(cal_db_interface)
print(f'Calibration database interface: {cal_db_interface}')
instrument = karabo_id.split("_")[0]
if instrument == "SPB":
dinstance = "AGIPD1M1"
nmods = 16
elif instrument == "MID":
dinstance = "AGIPD1M2"
nmods = 16
elif instrument == "HED":
dinstance = "AGIPD500K"
nmods = 8
control_names = [f'{in_folder}/r{r:04d}/RAW-R{r:04d}-{karabo_da_control}-S00000.h5'
for r in (run_high, run_med, run_low)]
if operation_mode not in ("ADAPTIVE_GAIN", "FIXED_GAIN"):
print(f"WARNING: unknown operation_mode \"{operation_mode}\" parameter set")
run_gain_modes = [get_gain_mode(fn, h5path_ctrl) for fn in control_names]
if all(gm == AgipdGainMode.ADAPTIVE_GAIN for gm in run_gain_modes):
fixed_gain_mode = False
if operation_mode == "FIXED_GAIN":
print("WARNING: operation_mode parameter is FIXED_GAIN, slow data indicates adaptive gain")
elif run_gain_modes == [AgipdGainMode.FIXED_HIGH_GAIN, AgipdGainMode.FIXED_MEDIUM_GAIN, AgipdGainMode.FIXED_LOW_GAIN]:
if operation_mode == "ADAPTIVE_GAIN":
print("WARNING: operation_mode parameter ix ADAPTIVE_GAIN, slow data indicates fixed gain")
fixed_gain_mode = True
else:
print(f'Something is clearly wrong; slow data indicates gain modes {run_gain_modes}')
print(f"Detector in use is {karabo_id}")
print(f"Instrument {instrument}")
print(f"Detector instance {dinstance}")
```
%% Cell type:code id: tags:
``` python
runs = [run_high, run_med, run_low]
if gain_setting == 0.1:
if creation_time.replace(tzinfo=None) < dateutil.parser.parse('2020-01-31'):
print("Set gain-setting to None for runs taken before 2020-01-31")
gain_setting = None
else:
try:
# extract gain setting and validate that all runs have the same setting
gsettings = []
for r in runs:
control_fname = '{}/r{:04d}/RAW-R{:04d}-{}-S00000.h5'.format(in_folder, r, r,
karabo_da_control)
gsettings.append(get_gain_setting(control_fname, h5path_ctrl))
if not all(g == gsettings[0] for g in gsettings):
raise ValueError(f"Different gain settings for the 3 input runs {gsettings}")
gain_setting = gsettings[0]
except Exception as e:
print(f'Error while reading gain setting from: \n{control_fname}')
print(f'Error: {e}')
if "component not found" in str(e):
print("Gain setting is not found in the control information")
print("Data will not be processed")
sequences = []
```
%% Cell type:code id: tags:
``` python
if karabo_da[0] == '-1':
if modules[0] == -1:
modules = list(range(nmods))
karabo_da = ["AGIPD{:02d}".format(i) for i in modules]
else:
modules = [int(x[-2:]) for x in karabo_da]
h5path = h5path.format(karabo_id, receiver_id)
h5path_idx = h5path_idx.format(karabo_id, receiver_id)
if bias_voltage == 0:
# Read the bias voltage from files, if recorded.
# If not available, make use of the historical voltage the detector is running at
bias_voltage = get_bias_voltage(control_names[0], karabo_id_control)
bias_voltage = bias_voltage if bias_voltage is not None else 300
print("Parameters are:")
print(f"Proposal: {prop}")
print(f"Memory cells: {mem_cells}/{max_cells}")
print("Runs: {}".format([ v for v in offset_runs.values()]))
print("Runs: {}".format([v for v in offset_runs.values()]))
print(f"Sequences: {sequences}")
print(f"Interlaced mode: {interlaced}")
print(f"Using DB: {db_output}")
print(f"Input: {in_folder}")
print(f"Output: {out_folder}")
print(f"Bias voltage: {bias_voltage}V")
print(f"Gain setting: {gain_setting}")
print(f"Operation mode is {'fixed' if fixed_gain_mode else 'adaptive'} gain mode")
```
%% Cell type:code id: tags:
``` python
if thresholds_offset_hard != [0, 0]:
# if set, this will override the individual parameters
thresholds_offset_hard = [thresholds_offset_hard] * 3
elif fixed_gain_mode:
thresholds_offset_hard = [
thresholds_offset_hard_hg_fixed,
thresholds_offset_hard_mg_fixed,
thresholds_offset_hard_lg_fixed,
]
else:
thresholds_offset_hard = [
thresholds_offset_hard_hg,
thresholds_offset_hard_mg,
thresholds_offset_hard_lg,
]
print(f"Will use the following hard offset thresholds")
print("Will use the following hard offset thresholds")
for name, value in zip(("High", "Medium", "Low"), thresholds_offset_hard):
print(f"- {name} gain: {value}")
if thresholds_noise_hard != [0, 0]:
thresholds_noise_hard = [thresholds_noise_hard] * 3
else:
thresholds_noise_hard = [
thresholds_noise_hard_hg,
thresholds_noise_hard_mg,
thresholds_noise_hard_lg,
]
```
%% Cell type:markdown id: tags:
The following lines create a queue of files which will then be executed module-parallel, distinguishing between the different gains.
%% Cell type:code id: tags:
``` python
# set everything up filewise
os.makedirs(out_folder, exist_ok=True)
gain_mapped_files, total_files, total_file_size = map_gain_stages(
in_folder, offset_runs, path_template, karabo_da, sequences
)
print(f"Will process a total of {total_files} files ({total_file_size:.02f} GB).")
inp = []
for gain_index, (gain, qm_file_map) in enumerate(gain_mapped_files.items()):
for module_index in modules:
qm = module_index_to_qm(module_index)
if qm not in qm_file_map:
print(f"Did not find files for {qm}")
continue
file_queue = qm_file_map[qm]
while not file_queue.empty():
filename = file_queue.get()
print(f"Process {filename} for {qm}")
inp.append((filename, module_index, gain_index))
```
%% Cell type:markdown id: tags:
## Calculate Offsets, Noise and Thresholds ##
The calculation is performed per-pixel and per-memory-cell. Offsets are simply the median value for a set of dark data taken at a given gain, noise the standard deviation, and gain-bit values the medians of the gain array.
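As a minimal, self-contained sketch of these per-pixel statistics (synthetic data; the shape `(128, 512, n_frames)` stands in for one memory cell of one module):

``` python
import numpy as np

rng = np.random.default_rng(seed=0)
# Synthetic dark frames for a single memory cell, stacked along the last axis
frames = rng.normal(loc=5000, scale=10, size=(128, 512, 250))
offset = np.median(frames, axis=2)  # per-pixel offset constant
noise = np.std(frames, axis=2)      # per-pixel noise constant
```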
%% Cell type:code id: tags:
``` python
# min() only relevant if running on multiple modules (i.e. within notebook)
parallel_num_procs = min(12, total_files)
parallel_num_threads = multiprocessing.cpu_count() // parallel_num_procs
print(f"Will use {parallel_num_procs} processes with {parallel_num_threads} threads each")
def characterize_module(
fast_data_filename: str, channel: int, gain_index: int
) -> Tuple[np.array, np.array, np.array, np.array, np.array, int, float]:
if max_cells == 0:
num_cells = get_num_cells(fast_data_filename, karabo_id, channel)
else:
num_cells = max_cells
print(f"Using {num_cells} memory cells")
if acq_rate == 0.:
slow_paths = control_names[gain_index], karabo_id_control
fast_paths = fast_data_filename, karabo_id, channel
local_acq_rate = get_acq_rate(fast_paths, slow_paths)
else:
local_acq_rate = acq_rate
local_thresholds_offset_hard = thresholds_offset_hard[gain_index]
local_thresholds_noise_hard = thresholds_noise_hard[gain_index]
h5path_f = h5path.format(channel)
h5path_idx_f = h5path_idx.format(channel)
with h5py.File(fast_data_filename, "r") as infile:
if rawversion == 2:
count = np.squeeze(infile[f"{h5path_idx_f}/count"])
first = np.squeeze(infile[f"{h5path_idx_f}/first"])
last_index = int(first[count != 0][-1]+count[count != 0][-1])
first_index = int(first[count != 0][0])
else:
status = np.squeeze(infile[f"{h5path_idx_f}/status"])
if np.count_nonzero(status != 0) == 0:
return
last = np.squeeze(infile[f"{h5path_idx_f}/last"])
first = np.squeeze(infile[f"{h5path_idx_f}/first"])
last_index = int(last[status != 0][-1]) + 1
first_index = int(first[status != 0][0])
im = np.array(infile[f"{h5path_f}/data"][first_index:last_index,...])
cell_ids = np.squeeze(infile[f"{h5path_f}/cellId"][first_index:last_index,...])
if interlaced:
if not fixed_gain_mode:
ga = im[1::2, 0, ...]
im = im[0::2, 0, ...].astype(np.float32)
cell_ids = cell_ids[::2]
else:
if not fixed_gain_mode:
ga = im[:, 1, ...]
im = im[:, 0, ...].astype(np.float32)
im = np.transpose(im)
if not fixed_gain_mode:
ga = np.transpose(ga)
context = psh.context.ThreadContext(num_workers=parallel_num_threads)
offset = context.alloc(shape=(im.shape[0], im.shape[1], num_cells), dtype=np.float64)
noise = context.alloc(like=offset)
if fixed_gain_mode:
gains = None
gains_std = None
else:
gains = context.alloc(like=offset)
gains_std = context.alloc(like=offset)
def process_cell(worker_id, array_index, cell_number):
cell_slice_index = (cell_ids == cell_number)
im_slice = im[..., cell_slice_index]
offset[..., cell_number] = np.median(im_slice, axis=2)
noise[..., cell_number] = np.std(im_slice, axis=2)
if not fixed_gain_mode:
ga_slice = ga[..., cell_slice_index]
gains[..., cell_number] = np.median(ga_slice, axis=2)
gains_std[..., cell_number] = np.std(ga_slice, axis=2)
context.map(process_cell, np.unique(cell_ids))
# bad pixels
bp = np.zeros_like(offset, dtype=np.uint32)
# offset related bad pixels
offset_mn = np.nanmedian(offset, axis=(0,1))
offset_std = np.nanstd(offset, axis=(0,1))
bp[(offset < offset_mn-thresholds_offset_sigma*offset_std) |
(offset > offset_mn+thresholds_offset_sigma*offset_std)] |= BadPixels.OFFSET_OUT_OF_THRESHOLD
bp[(offset < local_thresholds_offset_hard[0]) |
(offset > local_thresholds_offset_hard[1])] |= BadPixels.OFFSET_OUT_OF_THRESHOLD
bp[~np.isfinite(offset)] |= BadPixels.OFFSET_NOISE_EVAL_ERROR
# noise related bad pixels
noise_mn = np.nanmedian(noise, axis=(0,1))
noise_std = np.nanstd(noise, axis=(0,1))
bp[(noise < noise_mn-thresholds_noise_sigma*noise_std) |
(noise > noise_mn+thresholds_noise_sigma*noise_std)] |= BadPixels.NOISE_OUT_OF_THRESHOLD
bp[(noise < local_thresholds_noise_hard[0]) | (noise > local_thresholds_noise_hard[1])] |= BadPixels.NOISE_OUT_OF_THRESHOLD
bp[~np.isfinite(noise)] |= BadPixels.OFFSET_NOISE_EVAL_ERROR
return offset, noise, gains, gains_std, bp, num_cells, local_acq_rate
```
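The per-cell loop above relies on pasha's `ThreadContext`: `alloc` creates an output array shared by all workers and `map` invokes the worker function as `(worker_id, index, value)` for each element. A standalone sketch of the same pattern on toy data:

``` python
import numpy as np
import pasha as psh

context = psh.context.ThreadContext(num_workers=4)
data = np.arange(16, dtype=np.float64)
squares = context.alloc(shape=(16,), dtype=np.float64)

def fill(worker_id, index, value):
    # Each call writes to a disjoint slot, so no locking is needed
    squares[index] = value ** 2

context.map(fill, data)
assert np.array_equal(squares, data ** 2)
```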
%% Cell type:code id: tags:
``` python
with multiprocessing.Pool(processes=parallel_num_procs) as pool:
results = pool.starmap(characterize_module, inp)
```
%% Cell type:code id: tags:
``` python
offset_g = OrderedDict()
noise_g = OrderedDict()
badpix_g = OrderedDict()
if not fixed_gain_mode:
gain_g = OrderedDict()
gainstd_g = OrderedDict()
start = datetime.now()
all_cells = []
all_acq_rate = []
for (_, module_index, gain_index), (offset, noise, gains, gains_std, bp,
thiscell, thisacq) in zip(inp, results):
all_cells.append(thiscell)
all_acq_rate.append(thisacq)
qm = module_index_to_qm(module_index)
if qm not in offset_g:
offset_g[qm] = np.zeros((offset.shape[0], offset.shape[1], offset.shape[2], 3))
noise_g[qm] = np.zeros_like(offset_g[qm])
badpix_g[qm] = np.zeros_like(offset_g[qm], np.uint32)
if not fixed_gain_mode:
gain_g[qm] = np.zeros_like(offset_g[qm])
gainstd_g[qm] = np.zeros_like(offset_g[qm])
offset_g[qm][..., gain_index] = offset
noise_g[qm][..., gain_index] = noise
badpix_g[qm][..., gain_index] = bp
if not fixed_gain_mode:
gain_g[qm][..., gain_index] = gains
gainstd_g[qm][..., gain_index] = gains_std
duration = (datetime.now() - start).total_seconds()
max_cells = np.max(all_cells)
print(f"Using {max_cells} memory cells")
acq_rate = np.max(all_acq_rate)
print(f"Using {acq_rate} MHz acquisition rate")
```
%% Cell type:code id: tags:
``` python
# Add bad pixels due to bad gain separation
if not fixed_gain_mode:
for qm in gain_g.keys():
for g in range(2):
# Bad pixels during bad gain separation.
# Fraction of pixels in the module with separation lower than "thresholds_gain_sigma".
bad_sep = (gain_g[qm][..., g+1] - gain_g[qm][..., g]) / \
np.sqrt(gainstd_g[qm][..., g+1]**2 + gainstd_g[qm][..., g]**2)
badpix_g[qm][...,g+1][bad_sep<thresholds_gain_sigma] |= \
BadPixels.GAIN_THRESHOLDING_ERROR
```
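In other words, for adjacent gain stages $g$ and $g{+}1$ the cell above computes a separation significance $s_g = (\mu_{g+1} - \mu_g)\,/\,\sqrt{\sigma_{g+1}^2 + \sigma_g^2}$ from the gain-bit medians $\mu$ and standard deviations $\sigma$, and flags pixels with $s_g <$ `thresholds_gain_sigma` as `GAIN_THRESHOLDING_ERROR`.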
%% Cell type:markdown id: tags:
The thresholds for gain switching are then defined as the mean value between the individual gain-bit levels. Note that these thresholds need to be refined with charge-induced thresholds, as the two are not the same.
%% Cell type:code id: tags:
``` python
if not fixed_gain_mode:
thresholds_g = {}
for qm in gain_g.keys():
thresholds_g[qm] = np.zeros((gain_g[qm].shape[0], gain_g[qm].shape[1], gain_g[qm].shape[2], 5))
thresholds_g[qm][...,0] = (gain_g[qm][...,1]+gain_g[qm][...,0])/2
thresholds_g[qm][...,1] = (gain_g[qm][...,2]+gain_g[qm][...,1])/2
for i in range(3):
thresholds_g[qm][...,2+i] = gain_g[qm][...,i]
```
%% Cell type:code id: tags:
``` python
res = OrderedDict()
for i in modules:
qm = module_index_to_qm(i)
res[qm] = {
'Offset': offset_g[qm],
'Noise': noise_g[qm],
'BadPixelsDark': badpix_g[qm]
}
if not fixed_gain_mode:
res[qm]['ThresholdsDark'] = thresholds_g[qm]
```
%% Cell type:code id: tags:
``` python
# Read report path and create file location tuple to add with the injection
proposal = list(filter(None, in_folder.strip('/').split('/')))[-2]
file_loc = 'proposal:{} runs:{} {} {}'.format(proposal, run_low, run_med, run_high)
report = get_report(out_folder)
```
%% Cell type:code id: tags:
``` python
# set the operating condition
# note: iCalibrationDB only adds gain_mode if it is truthy, so we don't need to handle None
condition = iCalibrationDB.Conditions.Dark.AGIPD(
memory_cells=max_cells,
bias_voltage=bias_voltage,
acquisition_rate=acq_rate,
gain_setting=gain_setting,
gain_mode=fixed_gain_mode
)
```
%% Cell type:code id: tags:
``` python
# Create mapping from module(s) (qm) to karabo_da(s) and PDU(s)
qm_dict = OrderedDict()
all_pdus = get_pdu_from_db(
karabo_id,
karabo_da,
constant=iCalibrationDB.CalibrationConstant(),
condition=condition,
cal_db_interface=cal_db_interface,
snapshot_at=creation_time.isoformat(),
timeout=cal_db_timeout
)
for module_index, module_da, module_pdu in zip(modules, karabo_da, all_pdus):
qm = module_index_to_qm(module_index)
qm_dict[qm] = {
"karabo_da": module_da,
"db_module": module_pdu
}
# saving mapping information for summary notebook
with open(f"{out_folder}/module_mapping_{qm}.yml", "w") as fd:
yaml.safe_dump({"module_mapping": {qm: module_pdu}}, fd)
```
%% Cell type:code id: tags:
``` python
md = None
for qm in res:
karabo_da = qm_dict[qm]["karabo_da"]
db_module = qm_dict[qm]["db_module"]
for const in res[qm]:
dconst = getattr(iCalibrationDB.Constants.AGIPD, const)()
dconst.data = res[qm][const]
if db_output:
md = send_to_db(db_module, karabo_id, dconst, condition, file_loc,
report, cal_db_interface, creation_time=creation_time,
timeout=cal_db_timeout)
if local_output:
md = save_const_to_h5(db_module, karabo_id, dconst, condition, dconst.data,
file_loc, report, creation_time, out_folder)
print(f"Calibration constant {const} is stored locally.\n")
print(f"Calibration constant {const} for {qm} is stored locally in {file_loc}.\n")
print("Constants parameter conditions are:\n")
print(f"• memory_cells: {max_cells}\n• bias_voltage: {bias_voltage}\n"
f"• acquisition_rate: {acq_rate}\n• gain_setting: {gain_setting}\n"
f"• gain_mode: {fixed_gain_mode}\n"
f"• creation_time: {md.calibration_constant_version.begin_at if md is not None else creation_time}\n")
```
%% Cell type:code id: tags:
``` python
# Start retrieving existing constants for comparison
qm_x_const = [(qm, const) for qm in res for const in res[qm]]
def retrieve_old_constant(qm, const):
dconst = getattr(iCalibrationDB.Constants.AGIPD, const)()
# This should be used in case of running notebook
# by a different method other than myMDC which already
# sends CalCat info.
# TODO: Set db_module to "" by default in the first cell
data, mdata = get_from_db(
karabo_id,
qm_dict[qm]["karabo_da"],
constant=dconst,
condition=condition,
empty_constant=None,
cal_db_interface=cal_db_interface,
creation_time=creation_time,
verbosity=2,
timeout=cal_db_timeout
)
if mdata is None or data is None:
timestamp = "Not found"
else:
timestamp = mdata.calibration_constant_version.begin_at.isoformat()
return data, timestamp
old_retrieval_pool = multiprocessing.Pool()
old_retrieval_res = old_retrieval_pool.starmap_async(
retrieve_old_constant, qm_x_const
)
old_retrieval_pool.close()
```
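The constants are requested asynchronously so that the plotting cells below can run while the database answers; the results are collected later with `.wait()`/`.get()` in the summary section. A minimal sketch of this standard-library pattern:

``` python
import multiprocessing

def fetch(a, b):
    # Stand-in for a slow database request
    return a + b

pool = multiprocessing.Pool()
async_result = pool.starmap_async(fetch, [(1, 2), (3, 4)])
pool.close()  # no further tasks; workers exit once done
# ... other work runs here ...
async_result.wait()
print(async_result.get())  # [3, 7]
```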
%% Cell type:code id: tags:
``` python
mnames=[]
for i in modules:
qm = module_index_to_qm(i)
mnames.append(qm)
display(Markdown(f'## Position of the module {qm} and its ASICs'))
show_processed_modules(dinstance, constants=None, mnames=mnames, mode="position")
```
%% Cell type:markdown id: tags:
## Single-Cell Overviews ##
Single-cell overviews make it possible to identify potential effects on all memory cells, e.g. at the sensor level. Additionally, they serve as a first sanity check on expected behaviour, e.g. whether structuring on the ASIC level is visible in the offsets while otherwise no immediate artifacts are visible.
%% Cell type:markdown id: tags:
### High Gain ###
%% Cell type:code id: tags:
``` python
cell = 3
gain = 0
show_overview(res, cell, gain, infix="{}-{}-{}".format(*offset_runs.values()))
```
%% Cell type:markdown id: tags:
### Medium Gain ###
%% Cell type:code id: tags:
``` python
cell = 3
gain = 1
show_overview(res, cell, gain, infix="{}-{}-{}".format(*offset_runs.values()))
```
%% Cell type:markdown id: tags:
### Low Gain ###
%% Cell type:code id: tags:
``` python
cell = 3
gain = 2
show_overview(res, cell, gain, infix="{}-{}-{}".format(*offset_runs.values()))
```
%% Cell type:code id: tags:
``` python
if high_res_badpix_3d:
cols = {
BadPixels.NOISE_OUT_OF_THRESHOLD: (BadPixels.NOISE_OUT_OF_THRESHOLD.name, '#FF000080'),
BadPixels.OFFSET_NOISE_EVAL_ERROR: (BadPixels.OFFSET_NOISE_EVAL_ERROR.name, '#0000FF80'),
BadPixels.OFFSET_OUT_OF_THRESHOLD: (BadPixels.OFFSET_OUT_OF_THRESHOLD.name, '#00FF0080'),
BadPixels.GAIN_THRESHOLDING_ERROR: (BadPixels.GAIN_THRESHOLDING_ERROR.name, '#FF40FF40'),
BadPixels.OFFSET_OUT_OF_THRESHOLD | BadPixels.NOISE_OUT_OF_THRESHOLD: ('OFFSET_OUT_OF_THRESHOLD + NOISE_OUT_OF_THRESHOLD', '#DD00DD80'),
BadPixels.OFFSET_OUT_OF_THRESHOLD | BadPixels.NOISE_OUT_OF_THRESHOLD |
BadPixels.GAIN_THRESHOLDING_ERROR: ('MIXED', '#BFDF009F')
}
display(Markdown("""
## Global Bad Pixel Behaviour ##
The following plots show the results of bad pixel evaluation for all evaluated memory cells.
Cells are stacked in the Z-dimension, while pixel values in x/y are rebinned with a factor of 2.
This excludes single bad pixels present only in disconnected pixels.
Hence, any bad pixels spanning at least 4 pixels in the x/y-plane, or across at least two memory cells are indicated.
Colors encode the bad pixel type, or mixed type.
"""))
gnames = ['High Gain', 'Medium Gain', 'Low Gain']
for gain in range(3):
display(Markdown(f'### {gnames[gain]} ###'))
for mod, data in badpix_g.items():
plot_badpix_3d(data[...,gain], cols, title=mod, rebin_fac=1)
plt.show()
```
%% Cell type:markdown id: tags:
## Aggregate values and per-cell behaviour ##
The following tables and plots give an overview of statistical aggregates for each constant, as well as the per-cell behaviour.
%% Cell type:code id: tags:
``` python
create_constant_overview(offset_g, "Offset (ADU)", max_cells, 4000, 8000,
badpixels=[badpix_g, np.nan])
```
%% Cell type:code id: tags:
``` python
create_constant_overview(noise_g, "Noise (ADU)", max_cells, 0, 100,
badpixels=[badpix_g, np.nan])
```
%% Cell type:code id: tags:
``` python
if not fixed_gain_mode:
# Plot only three gain threshold maps.
bp_thresh = OrderedDict()
for mod, con in badpix_g.items():
bp_thresh[mod] = np.zeros((con.shape[0], con.shape[1], con.shape[2], 5), dtype=con.dtype)
bp_thresh[mod][...,:2] = con[...,:2]
bp_thresh[mod][...,2:] = con
create_constant_overview(thresholds_g, "Threshold (ADU)", max_cells, 4000, 10000, 5,
badpixels=[bp_thresh, np.nan],
gmap=['HG-MG Threshold', 'MG-LG Threshold', 'High gain', 'Medium gain', 'low gain'],
marker=['d','d','','','']
)
```
%% Cell type:code id: tags:
``` python
bad_pixel_aggregate_g = OrderedDict()
for m, d in badpix_g.items():
bad_pixel_aggregate_g[m] = d.astype(np.bool).astype(np.float)
create_constant_overview(bad_pixel_aggregate_g, "Bad pixel fraction", max_cells, 0, 0.10, 3)
```
%% Cell type:markdown id: tags:
## Summary tables ##
The following tables show summary information for the evaluated module. Values for currently evaluated constants are compared with values for pre-existing constants retrieved from the calibration database.
%% Cell type:code id: tags:
``` python
# now we need the old constants
old_const = {}
old_mdata = {}
old_retrieval_res.wait()
for (qm, const), (data, timestamp) in zip(qm_x_const, old_retrieval_res.get()):
old_const.setdefault(qm, {})[const] = data
old_mdata.setdefault(qm, {})[const] = timestamp
```
%% Cell type:code id: tags:
``` python
display(Markdown("The following pre-existing constants are used for comparison:"))
for qm, consts in old_mdata.items():
display(Markdown(f"- {qm}"))
for const in consts:
display(Markdown(f" - {const} at {consts[const]}"))
```
%% Cell type:code id: tags:
``` python
table = []
gain_names = ['High', 'Medium', 'Low']
bits = [BadPixels.NOISE_OUT_OF_THRESHOLD, BadPixels.OFFSET_OUT_OF_THRESHOLD, BadPixels.OFFSET_NOISE_EVAL_ERROR, BadPixels.GAIN_THRESHOLDING_ERROR]
for qm in badpix_g.keys():
for gain in range(3):
l_data = []
l_data_old = []
data = np.copy(badpix_g[qm][:,:,:,gain])
datau32 = data.astype(np.uint32)
l_data.append(len(datau32[datau32>0].flatten()))
for bit in bits:
l_data.append(np.count_nonzero(badpix_g[qm][:,:,:,gain] & bit))
if old_const[qm]['BadPixelsDark'] is not None:
dataold = np.copy(old_const[qm]['BadPixelsDark'][:, :, :, gain])
datau32old = dataold.astype(np.uint32)
l_data_old.append(len(datau32old[datau32old>0].flatten()))
for bit in bits:
l_data_old.append(np.count_nonzero(old_const[qm]['BadPixelsDark'][:, :, :, gain] & bit))
l_data_name = ['All bad pixels', 'NOISE_OUT_OF_THRESHOLD',
'OFFSET_OUT_OF_THRESHOLD', 'OFFSET_NOISE_EVAL_ERROR', 'GAIN_THRESHOLDING_ERROR']
l_threshold = ['', f'{thresholds_noise_sigma} {thresholds_noise_hard[gain]}',
f'{thresholds_offset_sigma} {thresholds_offset_hard[gain]}',
'', f'{thresholds_gain_sigma}']
for i in range(len(l_data)):
line = [f'{l_data_name[i]}, {gain_names[gain]} gain', l_threshold[i], l_data[i]]
if old_const[qm]['BadPixelsDark'] is not None:
line += [l_data_old[i]]
else:
line += ['-']
table.append(line)
table.append(['', '', '', ''])
display(Markdown('''
### Number of bad pixels
A pixel can be bad for more than one reason; therefore, the sum over all bad pixel types can exceed the total number of bad pixels.
'''))
if len(table)>0:
md = display(Latex(tabulate.tabulate(table, tablefmt='latex',
headers=["Pixel type", "Threshold",
"New constant", "Old constant"])))
```
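Because the bad-pixel map is a bitfield, the per-cause counts overlap wherever a pixel has several bits set, which is why the per-cause sum can exceed the total. A small illustration with made-up bit values (the real ones come from `cal_tools.enums.BadPixels`):

``` python
import numpy as np

NOISE_OUT_OF_THRESHOLD = 1 << 0   # illustrative bit values only
OFFSET_OUT_OF_THRESHOLD = 1 << 1

mask = np.array([0, 1, 2, 3], dtype=np.uint32)  # the last pixel has both bits set
print(np.count_nonzero(mask > 0))                        # 3 bad pixels in total
print(np.count_nonzero(mask & NOISE_OUT_OF_THRESHOLD))   # 2
print(np.count_nonzero(mask & OFFSET_OUT_OF_THRESHOLD))  # 2 -> per-cause sum 4 > total 3
```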
%% Cell type:code id: tags:
``` python
header = ['Parameter',
"New constant", "Old constant ",
"New constant", "Old constant ",
"New constant", "Old constant ",
"New constant", "Old constant "]
if fixed_gain_mode:
constants = ['Offset', 'Noise']
else:
constants = ['Offset', 'Noise', 'ThresholdsDark']
constants_x_qms = list(itertools.product(constants, res.keys()))
def compute_table(const, qm):
if const == 'ThresholdsDark':
table = [['','HG-MG threshold', 'HG-MG threshold', 'MG-LG threshold', 'MG-LG threshold']]
else:
table = [['','High gain', 'High gain', 'Medium gain', 'Medium gain', 'Low gain', 'Low gain']]
compare_with_old_constant = old_const[qm][const] is not None and \
old_const[qm]['BadPixelsDark'] is not None
data = np.copy(res[qm][const])
if const == 'ThresholdsDark':
data[...,0][res[qm]['BadPixelsDark'][...,0]>0] = np.nan
data[...,1][res[qm]['BadPixelsDark'][...,1]>0] = np.nan
else:
data[res[qm]['BadPixelsDark']>0] = np.nan
if compare_with_old_constant:
data_old = np.copy(old_const[qm][const])
if const == 'ThresholdsDark':
data_old[...,0][old_const[qm]['BadPixelsDark'][...,0]>0] = np.nan
data_old[...,1][old_const[qm]['BadPixelsDark'][...,1]>0] = np.nan
else:
data_old[old_const[qm]['BadPixelsDark']>0] = np.nan
f_list = [np.nanmedian, np.nanmean, np.nanstd, np.nanmin, np.nanmax]
n_list = ['Median', 'Mean', 'Std', 'Min', 'Max']
def compute_row(i):
line = [n_list[i]]
for gain in range(3):
# Compare only 3 threshold gain-maps
if gain == 2 and const == 'ThresholdsDark':
continue
stat_measure = f_list[i](data[...,gain])
line.append(f"{stat_measure:6.1f}")
if compare_with_old_constant:
old_stat_measure = f_list[i](data_old[...,gain])
line.append(f"{old_stat_measure:6.1f}")
else:
line.append("-")
return line
with multiprocessing.pool.ThreadPool(processes=multiprocessing.cpu_count() // len(constants_x_qms)) as pool:
rows = pool.map(compute_row, range(len(f_list)))
table.extend(rows)
return table
with multiprocessing.Pool(processes=len(constants_x_qms)) as pool:
tables = pool.starmap(compute_table, constants_x_qms)
for (const, qm), table in zip(constants_x_qms, tables):
display(Markdown(f"### {qm}: {const} [ADU], good pixels only"))
display(Latex(tabulate.tabulate(table, tablefmt='latex', headers=header)))
```
......
%% Cell type:markdown id: tags:
# Test Notebook - CLI
Author: Robert Rosca
Version: 0.1
Notebook for use with the unit and continuous integration tests.
%% Cell type:code id: tags:
```
in_folder = "/dev/null" # input folder
out_folder = "/dev/null" # output folder
list_normal = [10] # parameterized list, range allowed
list_intellilist = [2345] # parameterized list with ranges, range allowed
concurrency_parameter = [1] # concurrency parameter, range allowed
number = 0 # parameterized number
```
%% Cell type:markdown id: tags:
Tests notebook execution by just creating an empty file in the output directory.
%% Cell type:code id: tags:
```
from pathlib import Path
```
%% Cell type:code id: tags:
```
in_folder = Path(in_folder)
(in_folder / "touch").touch()
```
......@@ -4,6 +4,12 @@ requires = ["cython==0.29.21", "numpy==1.19.1", "setuptools>=40.8.0", "wheel"]
[tool.isort]
profile = "black"
[tool.pylint.messages_control]
disable = "C0330, C0326"
[tool.pylint.format]
max-line-length = "88"
[tool.pytest.ini_options]
norecursedirs = [
"legacy",
......@@ -15,5 +21,11 @@ norecursedirs = [
"dist",
"node_modules",
"venv",
"{arch}"
"{arch}",
]
required_plugins = [
"pytest-asyncio",
"pytest-cov",
"pytest-subprocess",
]
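With `required_plugins` set, pytest aborts at start-up unless all listed plugins are installed, so a missing test dependency fails loudly instead of silently changing behaviour.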
......@@ -74,14 +74,14 @@ setup(
ext_modules=cythonize(ext_modules),
install_requires=[
"iCalibrationDB @ git+ssh://git@git.xfel.eu:10022/detectors/cal_db_interactive.git@2.0.5", # noqa
"nbparameterise @ git+ssh://git@git.xfel.eu:10022/detectors/nbparameterise.git@0.3", # noqa
"XFELDetectorAnalysis @ git+ssh://git@git.xfel.eu:10022/karaboDevices/pyDetLib.git@2.5.6-2.10.0#subdirectory=lib", # noqa
"pasha @ git+https://github.com/European-XFEL/pasha.git@0.1.0",
"Cython==0.29.21",
"Jinja2==2.11.2",
"astcheck==0.2.5",
"astsearch==0.1.3",
"dill==0.3.0",
"extra_data==1.2.0",
"extra_data==1.4.1",
"extra_geom==1.1.1",
"fabio==0.9.0",
"gitpython==3.1.0",
......@@ -101,6 +101,7 @@ setup(
"nbclient==0.5.1",
"nbconvert==5.6.1",
"nbformat==5.0.7",
"nbparameterise==0.5",
"notebook==6.1.5",
"numpy==1.19.1",
"prettytable==0.7.2",
......@@ -125,6 +126,7 @@ setup(
"nbval",
"pytest-asyncio",
"pytest-cov",
"pytest-subprocess",
"pytest>=5.4.0",
"testpath",
"unittest-xml-reporting==3.0.2",
......
......@@ -15,16 +15,28 @@ from pathlib import Path
from subprocess import DEVNULL, check_output
from typing import List, Union
import nbformat
import numpy as np
from jinja2 import Template
from nbparameterise import extract_parameters, parameter_values, replace_definitions
import cal_tools.tools
from .finalize import tex_escape
from .notebooks import notebooks
from .settings import (
default_report_path,
free_nodes_cmd,
launcher_command,
max_reserved,
preempt_nodes_cmd,
python_path,
reservation,
reservation_char,
sprof,
temp_path,
try_report_to_output,
)
PKG_DIR = os.path.dirname(os.path.abspath(__file__))
......@@ -115,7 +127,6 @@ def make_intelli_list(ltype):
super(IntelliListAction, self).__init__(*args, **kwargs)
def __call__(self, parser, namespace, values, option_string=None):
values = ",".join(values)
if isinstance(values, str):
......@@ -171,7 +182,7 @@ def extract_title_author_version(nb):
# In case of a standard installation a version is stored
# in the _version.py file
try:
git_dir = os.path.join(PKG_DIR, '..', '..', '.git')
version = check_output([
'git', f'--git-dir={git_dir}', 'describe', '--tag',
], stderr=DEVNULL).decode('utf8')
......@@ -284,8 +295,7 @@ def balance_sequences(in_folder: str, run: int, sequences: List[int],
if isinstance(karabo_da, str):
karabo_da = [karabo_da]
elif not isinstance(karabo_da, list):
raise ValueError("Balance sequences expects "
"karabo_da as a string or list.")
raise TypeError("Balance sequences expects `karabo_da` as a string or list.")
in_path = Path(in_folder, f"r{run:04d}")
......@@ -305,8 +315,10 @@ def balance_sequences(in_folder: str, run: int, sequences: List[int],
if sequences != [-1]:
seq_nums = sorted(seq_nums.intersection(sequences))
if len(seq_nums) == 0:
raise ValueError(f"Selected sequences {sequences} are not "
f"available in {in_path}")
raise ValueError(
f"Selected sequences {sequences} are not "
f"available in {in_path}"
)
# Validate required nodes with max_nodes
nsplits = len(seq_nums) // sequences_per_node
......@@ -332,6 +344,7 @@ def make_extended_parser() -> argparse.ArgumentParser:
try:
det_notebooks = notebooks[detector]
except KeyError:
# TODO: This should really go to stderr not stdout
print("Not one of the known detectors: {}".format(notebooks.keys()))
sys.exit(1)
......@@ -394,7 +407,7 @@ def add_args_from_nb(nb, parser, cvar=None, no_required=False):
:param bool no_required: If True, none of the added options are required.
"""
parser.description = make_epilog(nb)
parms = extract_parameters(nb, lang='python')
for p in parms:
......@@ -663,7 +676,7 @@ def concurrent_run(temp_path, nb, nbname, args, cparm=None, cval=None,
suffix = flatten_list(cval)
# first convert the notebook
parms = extract_parameters(nb, lang='python')
if has_parm(parms, "cluster_profile"):
cluster_profile = f"{args['cluster_profile']}_{suffix}"
......@@ -673,7 +686,7 @@ def concurrent_run(temp_path, nb, nbname, args, cparm=None, cval=None,
params = parameter_values(parms, **args)
params = parameter_values(params, cluster_profile=cluster_profile)
new_nb = replace_definitions(nb, params, execute=False, lang='python')
if not show_title:
first_markdown_cell(new_nb).source = ''
set_figure_format(new_nb, args["vector_figs"])
......@@ -682,9 +695,7 @@ def concurrent_run(temp_path, nb, nbname, args, cparm=None, cval=None,
os.path.basename(base_name), cparm, suffix)
nbpath = os.path.join(temp_path, new_name)
with open(nbpath, "w") as f:
f.write(nbconvert.exporters.export(
nbconvert.NotebookExporter, new_nb)[0])
nbformat.write(new_nb, nbpath)
# add finalization to the last job
if final_job:
......@@ -832,7 +843,7 @@ def run():
if ext_func is not None:
extend_params(nb, ext_func)
parms = extract_parameters(nb, lang='python')
title, author, version = extract_title_author_version(nb)
......
......@@ -233,4 +233,14 @@ notebooks = {
"cluster cores": 16},
},
},
"TEST": {
"TEST-CLI": {
"notebook": "notebooks/test/test-cli.ipynb",
"concurrency": {
"parameter": "concurrency_parameter",
"default concurrency": None,
"cluster cores": 1,
},
},
},
}
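With this entry registered, the test notebook can be launched through the normal entry point; a hypothetical invocation (first-cell parameter names become hyphenated options) would look like `xfel-calibrate TEST TEST-CLI --number 5 --list-normal 1-3 --in-folder /tmp/raw --out-folder /tmp/proc`.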
......@@ -17,7 +17,7 @@ try_report_to_output = True
logo_path = "xfel.pdf"
# the command to run this concurrently. It is prepended to the actual call
sprof = os.environ.get("XFELCALSLURM", "exfel")
sprof = os.environ.get("XFELCALSLURM", "exfel") # TODO: https://git.xfel.eu/gitlab/calibration/planning/issues/3
launcher_command = "sbatch -t 24:00:00 --requeue --output {temp_path}/slurm-%j.out"
free_nodes_cmd = "sinfo -p exfel -t idle -N --noheader | wc -l"
preempt_nodes_cmd = "squeue -p all,grid --noheader | grep max-exfl | egrep -v 'max-exfl18[3-8]|max-exfl100|max-exflg' | wc -l"
......
......@@ -10,7 +10,7 @@
\end{figure}
\vspace{0mm}
\Large \textbf{ Data department }
Based on data sample: {{ data_path }}
......
from pathlib import Path
import pytest
def pytest_addoption(parser):
parser.addoption(
"--no-gpfs",
action="store_true",
default="false",
help="Skips tests marked as requiring GPFS access",
)
def pytest_configure(config):
config.addinivalue_line(
"markers", "requires_gpfs(): marks skips for tests that require GPFS access"
)
def pytest_runtest_setup(item):
if list(item.iter_markers(name="requires_gpfs")) and (
not Path("/gpfs").is_dir() or item.config.getoption("--no-gpfs")
):
pytest.skip("gpfs not available")
......@@ -21,6 +21,7 @@ def test_show_processed_modules():
assert 'LDP' in err.value()
@pytest.mark.requires_gpfs
def test_dir_creation_date():
folder = '/gpfs/exfel/exp/CALLAB/202031/p900113/raw'
......@@ -54,46 +55,48 @@ cal_db_interface = "tcp://max-exfl017:8020"
def test_get_pdu_from_db():
snapshot_at = "2021-05-06 00:20:10.00"
# A karabo_da str returns a list of one element.
pdu_dict = get_pdu_from_db(karabo_id="TEST_DET_CI-2",
karabo_da="TEST_DAQ_DA_01",
pdu_dict = get_pdu_from_db(karabo_id="TEST_DET_CAL_CI-1",
karabo_da="TEST_DET_CAL_DA0",
constant=constant,
condition=condition,
cal_db_interface=cal_db_interface,
snapshot_at="2021-03-01 09:44:00+00:00",
snapshot_at=snapshot_at,
timeout=30000)
assert len(pdu_dict) == 1
assert pdu_dict[0] == "CAL_PHYSICAL_DETECTOR_UNIT-1_TEST"
# A list of karabo_das to return their PDUs, if available.
pdu_dict = get_pdu_from_db(karabo_id="TEST_DET_CI-2",
karabo_da=["TEST_DAQ_DA_01", "TEST_DAQ_DA_02",
pdu_dict = get_pdu_from_db(karabo_id="TEST_DET_CAL_CI-1",
karabo_da=["TEST_DET_CAL_DA0", "TEST_DET_CAL_DA1",
"UNAVAILABLE_DA"],
constant=constant,
condition=condition,
cal_db_interface=cal_db_interface,
snapshot_at="2021-03-01 09:44:00+00:00",
snapshot_at=snapshot_at,
timeout=30000)
assert pdu_dict == ["CAL_PHYSICAL_DETECTOR_UNIT-1_TEST",
"CAL_PHYSICAL_DETECTOR_UNIT-2_TEST",
None]
# "all" is used to return all corresponding units for a karabo_id.
pdu_dict = get_pdu_from_db(karabo_id="TEST_DET_CI-2",
pdu_dict = get_pdu_from_db(karabo_id="TEST_DET_CAL_CI-1",
karabo_da="all",
constant=constant,
condition=condition,
cal_db_interface="tcp://max-exfl017:8020",
snapshot_at="2021-03-01 09:44:00+00:00",
cal_db_interface=cal_db_interface,
snapshot_at=snapshot_at,
timeout=30000)
assert len(pdu_dict) == 2
assert pdu_dict == ["CAL_PHYSICAL_DETECTOR_UNIT-1_TEST",
"CAL_PHYSICAL_DETECTOR_UNIT-2_TEST"]
@pytest.mark.requires_gpfs
def test_initialize_from_db():
creation_time = datetime.strptime("2020-01-07 13:26:48.00",
"%Y-%m-%d %H:%M:%S.%f")
......
......@@ -9,6 +9,7 @@ sys.path.insert(0, Path(__file__).parent / 'webservice')
from webservice.webservice import check_files, merge, parse_config, wait_on_transfer
@pytest.mark.requires_gpfs
def test_check_files():
in_folder = '/gpfs/exfel/exp/CALLAB/202031/p900113/raw'
runs = [9985, 9984]
......
# pylint: disable=missing-module-docstring
import sys
from pathlib import Path
from typing import Callable, Dict, List, NamedTuple, Optional, Union
from unittest import mock
import extra_data.tests.make_examples as make_examples
from pytest_subprocess import FakeProcess
from xfel_calibrate import calibrate, settings
class FakeProcessCalibrate(FakeProcess):
"""Class used to create a fake process which will mock calls to processes
expected to be called by pycalibration
"""
def __init__(self):
"""Sets up fake process for use with xfel calibrate
Fakes:
- Slurm free nodes call
- Slurm preempt nodes call
- Slurm sbatch call
- Git describe calls (for pulling repo version)
Expected use is something like:
```
@pytest.fixture(scope="class", autouse=True)
def fake_process_calibrate(self):
with FakeProcessCalibrate() as fake_process:
yield fake_process
```
"""
super().__init__()
# Fake calls to slurm
self.register_subprocess(settings.free_nodes_cmd, stdout=["1"])
self.register_subprocess(settings.preempt_nodes_cmd, stdout=["1"])
self.register_subprocess(
["sbatch", self.any()], stdout=["Submitted batch job 000000"]
)
# For the version insertion...
self.register_subprocess(
["git", self.any(), "describe", "--tag"], stdout=["0.0.0"]
)
self.keep_last_process(True)
class MockProposal:
"""Class used with pytest to create proposal directories with placeholder
files
Use this to generate a skeleton of the proposal structure for tests which
only rely on the files being present and not their contents, e.g. balancing
by sequences done in some notebooks
"""
def __init__(
self,
*,
tmp_path: Path,
runs: Union[int, list, slice],
instrument: str,
cycle: str,
proposal: str,
sequences: int = 2,
):
"""Create a MockProposal object, this should be used to create a fixture
Expected use is something like:
```python3
@pytest.fixture
def mock_proposal(tmp_path):
return MockProposal(tmp_path, runs=4)
```
Then `mock_proposal` can be passed as a fixture (or set to autouse) to
make the object available for individual or class tests
Args:
tmp_path (Path): Temporary path, should come from pytest fixture
runs (Union[int, list, slice]): Defines what run directories to make
instrument (str, optional): Instrument name, e.g. "AGIPD".
cycle (str, optional): Cycle, e.g. "202031".
proposal (str, optional): Proposal number, e.g. "p900113".
sequences (int, optional): Number of sequence files. Defaults to 2.
"""
self.tmp_path = tmp_path
self.instrument = instrument
self.cycle = cycle
self.proposal = proposal
self.sequences = list(
range(sequences)
) # TODO: Implement once it's in extra-data
self.path = tmp_path / instrument / cycle / proposal
self.path_raw = self.path / "raw"
self.path_proc = self.path / "proc"
self.runs = self.create_runs(runs)
def create_runs(self, runs: Union[int, List[int], slice]) -> Dict[int, Path]:
"""Create run directories with skeleton files for the proposal
Args:
runs (Union[int, list, slice]): Defines what run directories to make
Returns:
[Dict[int, Path]]: Dictionary of the run number and run directory
"""
if isinstance(runs, int):
runs = list(range(runs))
elif isinstance(runs, list):
if not all(isinstance(r, int) for r in runs):
raise TypeError("lists of runs must contain only integers")
elif isinstance(runs, slice):
if runs.stop:
runs = list(range(runs.stop)[runs])
else:
raise ValueError("runs slice must have a stop value")
else:
raise TypeError("runs must be an int, list of integers, or a slice")
run_directories = {run: self.path_raw / f"r{run:04d}" for run in runs}
for run_path in run_directories.values():
run_path.mkdir(parents=True)
self._create_run(run_path)
return run_directories
@property
def _create_run(self) -> Callable:
"""Return the number of modules for a detector type
Returns:
modules [Iterable]: Number of modules for a detector type
"""
return {
"AGIPD": make_examples.make_spb_run,
"DSSC": make_examples.make_scs_run,
"JGFR": make_examples.make_jungfrau_run,
"LPD": make_examples.make_fxe_run,
"None": lambda x: None,
}[self.instrument]
class CalibrateCall:
"""Class used with pytest to create call `xfel-calibrate` and log its output
Use this to manage calls to `xfel-calibrate` via pytest fixtures
"""
def __init__(
self,
tmp_path,
capsys,
*,
detector: str,
cal_type: str,
command: str = "xfel-calibrate",
in_folder: Optional[Path] = None,
out_folder: Optional[Path] = None,
extra_args: List[str] = None,
):
"""Create a CallibrateCall object, this should be used to create a fixture
Expected use is something like:
```python3
@pytest.fixture(scope="function")
def calibrate_call(self, mock_proposal: MockProposal, capsys, tmp_path):
return CalibrateCall(
tmp_path,
capsys,
in_folder=mock_proposal.path_raw,
out_folder=mock_proposal.path_proc,
command="xfel-calibrate",
detector="AGIPD",
cal_type="CORRECT",
extra_args=["--run", "0"],
)
```
Args:
tmp_path ([type]): Temporary path, should come from pytest fixture
capsys ([type]): capsys path, should come from pytest fixture
detector (str): Detector passed to the command, e.g. AGIPD
cal_type (str): Calibration type passed to the command, e.g. CORRECT
command (str): Main entrypoint called. Defaults to "xfel-calibrate".
in_folder (Optional[Path], optional): Path to the input folder, usually
raw. Defaults to None.
out_folder (Optional[Path], optional): Path to the output folder, usually
proc. Defaults to None.
extra_args (List[str], optional): Additional arguments to pass to the
command. Defaults to None.
"""
self.tmp_path = tmp_path
self.in_folder = in_folder
self.out_folder = out_folder
self.args = []
self.args.extend([command, detector, cal_type])
if in_folder:
self.args.extend(["--in-folder", str(self.in_folder)])
if out_folder:
self.args.extend(["--out-folder", str(self.out_folder)])
if extra_args:
self.args.extend(extra_args)
with mock.patch.object(sys, "argv", self.args):
with mock.patch.object(calibrate, "temp_path", str(tmp_path)):
calibrate.run()
out, err = capsys.readouterr()
self.out: str = out
self.err: str = err
Paths = NamedTuple(
"Paths",
[
("notebooks", List[Path]),
("run_calibrate", Path),
("finalize", Path),
("InputParameters", Path),
],
)
self.paths = Paths(
notebooks=list(self.tmp_path.glob("**/*/*.ipynb")),
run_calibrate=list(self.tmp_path.glob("**/*/run_calibrate.sh"))[0],
finalize=list(self.tmp_path.glob("**/*/finalize.py"))[0],
InputParameters=list(self.tmp_path.glob("**/*/InputParameters.rst"))[0],
)
# TODO: These are unit tests; `test_cli.py` contains integration tests. It may
# be worth splitting these up in the future so that it is easier to track
# what's what, measure the coverage of both approaches individually, and run
# them independently of each other
import pytest
from xfel_calibrate.calibrate import balance_sequences
@pytest.mark.parametrize(
"karabo_da,sequences,expected",
[
pytest.param(
"all",
[0, 2, 5, 10, 20, 50, 100],
[[0], [2]],
marks=pytest.mark.requires_gpfs(),
),
("JNGFR01", [-1], []),
],
)
def test_balance_sequences(karabo_da, sequences, expected):
ret = balance_sequences(
in_folder="/gpfs/exfel/exp/CALLAB/202031/p900113/raw",
run=9992,
sequences=sequences,
sequences_per_node=1,
karabo_da=karabo_da,
max_nodes=8,
)
assert ret == expected
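# The expectations above are tied to real data (hence `requires_gpfs` on the
# first case): of the requested sequences, run 9992 of CALLAB p900113 only
# has files for sequences 0 and 2, and with `sequences_per_node=1` each of
# them is scheduled on its own node, giving the nested [[0], [2]] grouping.
# The "JNGFR01" case asks for every sequence (`[-1]`) of a data aggregator
# with (presumably) no files in this AGIPD proposal, so nothing is left to
# balance.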
@pytest.mark.parametrize(
"karabo_da,sequences,expected",
[
pytest.param("all", [1991, 2021], ValueError, marks=pytest.mark.requires_gpfs()),
(-1, [], TypeError),
],
)
def test_balance_sequences_raises(karabo_da, sequences, expected):
with pytest.raises(expected):
balance_sequences(
in_folder="/gpfs/exfel/exp/CALLAB/202031/p900113/raw",
run=9992,
sequences=sequences,
sequences_per_node=1,
karabo_da=karabo_da,
max_nodes=8,
)
# pylint: disable=missing-class-docstring, missing-function-docstring, no-self-use
"""Tests for the CLI portion of `xfel_calibrate`
These tests cover the CLI interface which is called by the `xfel-calibrate ...`
entrypoint. Some sections of the `calibrate.py` file are still not covered by
the current test cases; this should be improved later on.
"""
import ast
import shlex
import sys
from datetime import date
from pathlib import Path
from unittest import mock
import nbformat
import pytest
from nbparameterise import extract_parameters
import xfel_calibrate.calibrate as calibrate
from tests.test_xfel_calibrate.conftest import (
CalibrateCall,
FakeProcessCalibrate,
MockProposal,
)
class TestBasicCalls:
"""Tests which only call the command line utility `xfel-calibrate` and check
that the expected output is present in stdout
"""
@mock.patch.object(sys, "argv", ["xfel-calibrate", "--help"])
def test_help(self, capsys):
with pytest.raises(SystemExit):
calibrate.run()
out, err = capsys.readouterr()
# Should always be present in these help outputs
assert "positional arguments:" in out
assert "optional arguments:" in out
assert err == ""
@mock.patch.object(sys, "argv", ["xfel-calibrate", "TEST", "-h"])
def test_help_detector(self, capsys):
with pytest.raises(SystemExit):
calibrate.run()
out, err = capsys.readouterr()
assert "Notebook for use with the unit and continuous integration" in out
assert "tests." in out
assert err == ""
@mock.patch.object(sys, "argv", ["xfel-calibrate", "NotADetector", "beep", "-h"])
def test_unknown_detector(self, capsys):
with pytest.raises(SystemExit) as exit_exception:
calibrate.run()
out, err = capsys.readouterr()
assert exit_exception.value.code == 1
assert "Not one of the known calibrations or detectors" in out
assert err == ""
@mock.patch.object(sys, "argv", ["xfel-calibrate", "NotADetector", "-h"])
def test_unknown_detector_h(self, capsys):
with pytest.raises(SystemExit) as exit_exception:
calibrate.run()
out, err = capsys.readouterr()
assert exit_exception.value.code == 1
assert "Not one of the known detectors" in out
assert err == ""
@mock.patch.object(sys, "argv", ["xfel-calibrate", "Tutorial", "TEST", "--help"])
def test_help_nb(self, capsys):
with pytest.raises(SystemExit):
calibrate.run()
out, err = capsys.readouterr()
# Should always be present in these help outputs
assert "positional arguments:" in out
assert "optional arguments:" in out
# Defined in the test notebook, should be propagated to help output
assert "sensor-size" in out
assert "random-seed" in out
assert err == ""
class TestTutorialNotebook:
"""Checks calling `xfel-calibrate` on the `Tutorial TEST` notebook, looks
at the stdout as well as files generated by the call
"""
@pytest.fixture(scope="class", autouse=True)
def fake_process_calibrate(self):
with FakeProcessCalibrate() as fake_process:
yield fake_process
@pytest.fixture(scope="class")
def mock_proposal(self, tmp_path_factory):
return MockProposal(
tmp_path=tmp_path_factory.mktemp("exp"),
instrument="None",
cycle="000000",
proposal="p000000",
runs=0,
sequences=0,
)
@pytest.fixture(scope="function")
def calibrate_call(
self,
mock_proposal: MockProposal,
capsys,
tmp_path,
):
return CalibrateCall(
tmp_path,
capsys,
out_folder=mock_proposal.path_proc,
detector="Tutorial",
cal_type="TEST",
extra_args=["--runs", "1000"],
)
def test_call(
self,
calibrate_call: CalibrateCall,
):
assert "sbatch" in calibrate_call.out
assert "--job-name xfel_calibrate" in calibrate_call.out
assert str(calibrate_call.tmp_path) in calibrate_call.out
assert "Submitted job: " in calibrate_call.out
assert calibrate_call.err == ""
def test_expected_processes_called(
self,
fake_process_calibrate: FakeProcessCalibrate,
):
process_calls = [
list(shlex.shlex(p, posix=True, punctuation_chars=True))
if isinstance(p, str)
else p
for p in fake_process_calibrate.calls
]
processes_called = [p[0] for p in process_calls] # List of the process names
assert "sbatch" in processes_called
@pytest.mark.skip(reason="not implemented")
def test_output_metadata_yml(self):
# TODO: Finish this test later, not a priority
# metadata_yml_path = list(self.tmp_path.glob("**/calibration_metadata.yml"))
pass
def test_output_ipynb(self, calibrate_call: CalibrateCall):
notebook_path = calibrate_call.paths.notebooks
assert len(notebook_path) == 1
with notebook_path[0].open() as file:
notebook = nbformat.read(file, as_version=4)
parameters = {p.name: p.value for p in extract_parameters(notebook)}
assert parameters["out_folder"] == str(calibrate_call.out_folder)
assert parameters["sensor_size"] == [10, 30]
assert parameters["random_seed"] == [2345]
assert parameters["runs"] == 1000
def test_output_finalize(
self, mock_proposal: MockProposal, calibrate_call: CalibrateCall
):
# TODO: Specify `feature_version` once we run on python 3.8+
finalize_ast = ast.parse(calibrate_call.paths.finalize.read_text())
today = date.today()
expected_equals = {
"joblist": [],
"project": "Tutorial Calculation",
"calibration": "Tutorial Calculation",
"author": "Astrid Muennich",
"version": "0.0.0",
"data_path": "",
}
expected_contains = {
"request_time": str(today),
"submission_time": str(today),
"run_path": str(calibrate_call.tmp_path),
# TODO: add a test checking that the out folder is correct
# refer to: https://git.xfel.eu/gitlab/detectors/pycalibration/issues/52
"out_path": str(mock_proposal.path_proc),
"report_to": str(mock_proposal.path_proc),
}
# Pull the keyword arguments out of the finalize function call via the AST,
# here we use the keys in `expected_...` to filter which kwargs are parsed
# as some cannot be read
finalize_kwargs = {
k.arg: ast.literal_eval(k.value)
for k in ast.walk(finalize_ast)
if isinstance(k, ast.keyword)
and (k.arg in expected_equals or k.arg in expected_contains)
}
for k, v in expected_equals.items():
assert v == finalize_kwargs[k]
for k, v in expected_contains.items():
assert v in finalize_kwargs[k]
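# The AST walk above generalises to any generated script: keyword arguments
# can be read back without importing or executing the file, as long as their
# values are literals. A standalone sketch (the `source` string is made up):
#
#     import ast
#
#     source = "finalize(project='Tutorial Calculation', version='0.0.0')"
#     wanted = {"project", "version"}
#     kwargs = {
#         kw.arg: ast.literal_eval(kw.value)
#         for kw in ast.walk(ast.parse(source))
#         if isinstance(kw, ast.keyword) and kw.arg in wanted
#     }
#     assert kwargs == {"project": "Tutorial Calculation",
#                       "version": "0.0.0"}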
@pytest.mark.skip(reason="not implemented")
def test_output_rst(self, calibrate_call: CalibrateCall):
# TODO: Finish this test later, not a priority
# rst_path = calibrate_call.paths.InputParameters
pass
def test_output_sh(self, calibrate_call: CalibrateCall):
cmd = list(
shlex.shlex(
calibrate_call.paths.run_calibrate.read_text(),
posix=True,
punctuation_chars=True,
)
)
assert (
cmd[0] == "xfel-calibrate"
), f"{calibrate_call.paths.run_calibrate} does not call `xfel-calibrate`"
assert cmd[1:3] == ["Tutorial", "TEST"]
assert {"--out-folder", str(calibrate_call.out_folder)}.issubset(cmd)
assert {"--runs", "1000"}.issubset(cmd)
class TestIntelliList:
@pytest.fixture(scope="class", autouse=True)
def fake_process_calibrate(self):
with FakeProcessCalibrate() as fake_process:
yield fake_process
@pytest.fixture(scope="class")
def mock_proposal(self, tmpdir_factory):
return MockProposal(
tmp_path=Path(tmpdir_factory.mktemp("exp")),
instrument="AGIPD",
cycle="202031",
proposal="p900113",
runs=1,
sequences=1,
)
@pytest.fixture(scope="function")
def calibrate_call(self, mock_proposal: MockProposal, capsys, tmp_path):
return CalibrateCall(
tmp_path,
capsys,
in_folder=mock_proposal.path_raw,
out_folder=mock_proposal.path_proc,
detector="TEST",
cal_type="TEST-CLI",
extra_args=[
"--number",
"10",
"--list-normal",
"1,2,10",
"--list-intellilist",
"1,2,5-8",
"--concurrency-parameter",
"0,1",
],
)
def test_intellilist(self, calibrate_call: CalibrateCall):
assert "--number" in calibrate_call.args
assert "--list-intellilist" in calibrate_call.args
assert "1,2,5-8" in calibrate_call.args
assert len(calibrate_call.paths.notebooks) == 2
for i, notebook_path in enumerate(calibrate_call.paths.notebooks):
with notebook_path.open() as file:
notebook = nbformat.read(file, as_version=4)
parameters = {p.name: p.value for p in extract_parameters(notebook)}
assert parameters["number"] == 10
assert parameters["list_normal"] == [1, 2, 10]
assert parameters["list_intellilist"] == [1, 2, 5, 6, 7]
assert parameters["concurrency_parameter"][0] == i
class TestAgipdNotebook:
@pytest.fixture(scope="class", autouse=True)
def fake_process_calibrate(self):
with FakeProcessCalibrate() as fake_process:
yield fake_process
@pytest.fixture(scope="function")
def mock_proposal(self, tmpdir_factory):
return MockProposal(
tmp_path=Path(tmpdir_factory.mktemp("exp")),
instrument="AGIPD",
cycle="202031",
proposal="p900113",
runs=2,
# TODO: update this once extra-data tests can have variable sequences
# sequences=5,
)
@pytest.fixture(scope="function")
def calibrate_call(self, mock_proposal: MockProposal, capsys, tmp_path):
return CalibrateCall(
tmp_path,
capsys,
in_folder=mock_proposal.path_raw,
out_folder=mock_proposal.path_proc / "r0000",
detector="AGIPD",
cal_type="CORRECT",
extra_args=[
"--run",
"0",
"--sequences",
"1-3",
# TODO: enable this when notebook execution tests are ready to be ran
# "--no-cluster-job",
],
)
@pytest.mark.skip(reason="not implemented")
def test_out_folder_correct(self):
# TODO: add a test checking that the out folder is correct
# refer to: https://git.xfel.eu/gitlab/detectors/pycalibration/issues/52
pass
@pytest.mark.skip(reason="requires extra-data test file sequence options")
def test_files_present(self, calibrate_call: CalibrateCall):
# There should be three notebooks: one pre, then the main, then one post
assert len(calibrate_call.paths.notebooks) == 3
# This is pretty fragile, but the names of the notebooks should not change
# (too) often
root_nb_path = calibrate_call.paths.notebooks[0].parent
notebooks = [
root_nb_path / "AGIPD_Correct_and_Verify__sequences__1.ipynb",
root_nb_path / "AGIPD_Correct_and_Verify_Summary_NBC__None__None.ipynb",
root_nb_path / "AGIPD_Retrieve_Constants_Precorrection__None__None.ipynb",
]
assert all(nb in calibrate_call.paths.notebooks for nb in notebooks)
@pytest.mark.skip(reason="not implemented")
def test_nb_sequences(self, calibrate_call: CalibrateCall):
notebook_path = (
calibrate_call.paths.notebooks[0].parent
/ "AGIPD_Correct_and_Verify__sequences__1.ipynb"
)
with notebook_path.open() as file:
notebook = nbformat.read(file, as_version=4)
parameters = {p.name: p.value for p in extract_parameters(notebook)}
# TODO: add test cases for this notebook
print(parameters)
@pytest.mark.skip(reason="not implemented")
def test_nb_summary(self, calibrate_call: CalibrateCall):
notebook_path = (
calibrate_call.paths.notebooks[0].parent
/ "AGIPD_Correct_and_Verify_Summary_NBC__None__None.ipynb"
)
with notebook_path.open() as file:
notebook = nbformat.read(file, as_version=4)
parameters = {p.name: p.value for p in extract_parameters(notebook)}
# TODO: add test cases for this notebook
print(parameters)
@pytest.mark.skip(reason="not implemented")
def test_nb_precorrection(self, calibrate_call: CalibrateCall):
notebook_path = (
calibrate_call.paths.notebooks[0].parent
/ "AGIPD_Retrieve_Constants_Precorrection__None__None.ipynb"
)
with notebook_path.open() as file:
notebook = nbformat.read(file, as_version=4)
# TODO: add test cases for this notebook
parameters = {p.name: p.value for p in extract_parameters(notebook)}
print(parameters)
@@ -1087,11 +1087,18 @@ class ActionsServer:
                 request_time
             )
             await update_mdc_status(self.mdc, 'dark_request', rid, ret)
-            if report_path is None:
+            if len(report_path) == 0:
                 logging.warning("Failed to identify report path for dark_request")
             else:
-                await update_darks_paths(self.mdc, rid, in_folder,
-                                         out_folder, report_path)
+                if len(report_path) > 1:
+                    logging.warning(
+                        "More than one report path is returned. "
+                        "Updating myMDC with the first report path only."
+                    )
+                await update_darks_paths(
+                    self.mdc, rid, in_folder,
+                    out_folder, report_path[0]
+                )
 
         # END of part to run after sending reply
         asyncio.ensure_future(_continue())
@@ -1162,7 +1169,9 @@ class ActionsServer:
     async def launch_jobs(
         self, run_nrs, rid, detectors, action, instrument, cycle, proposal,
         request_time
-    ) -> (str, Optional[str]):
+    ) -> (str, List[str]):
+        report = []
+        ret = []
         # run xfel_calibrate
         for karabo_id, dconfig in detectors.items():
             detector = dconfig['detector-type']
@@ -1179,16 +1188,16 @@
             ).split()
             cmd = parse_config(cmd, dconfig)
-            ret = await run_action(self.job_db, cmd, self.mode,
-                                   proposal, run_nrs[-1], rid)
+            # TODO: Add detector info in returned run action status.
+            ret.append(await run_action(
+                self.job_db, cmd, self.mode,
+                proposal, run_nrs[-1], rid
+            ))
             if '--report-to' in cmd[:-1]:
                 report_idx = cmd.index('--report-to') + 1
-                report = cmd[report_idx] + '.pdf'
-            else:
-                report = None
-        return ret, report
+                report.append(cmd[report_idx] + '.pdf')
+        # return a string without a trailing comma.
+        return ", ".join(ret), report
 
 
 parser = argparse.ArgumentParser(
     description='Start the calibration webservice')