Skip to content
Snippets Groups Projects
Commit 5013f7e8 authored by Thomas Kluyver's avatar Thomas Kluyver
Browse files

Merge branch 'feat/inject_ccv_resfulapi' into 'master'

Feat[Jungfrau]: Inject CCVs using RESTful API

See merge request !1026
parents ac3a2137 fbe01bde
No related branches found
No related tags found
1 merge request!1026Feat[Jungfrau]: Inject CCVs using RESTful API
%% Cell type:markdown id: tags: %% Cell type:markdown id: tags:
# Jungfrau Dark Image Characterization #
Author: European XFEL Detector Group, Version: 2.0
Analyzes Jungfrau dark image data to deduce offset, noise and resulting bad pixel maps
%% Cell type:code id: tags: %% Cell type:code id: tags:
``` python ``` python
# Data selection parameters.
in_folder = '/gpfs/exfel/exp/SPB/202130/p900204/raw/'  # folder under which runs are located, required
out_folder = '/gpfs/exfel/data/scratch/ahmedk/test/remove'  # path to place reports at, required
metadata_folder = ''  # Directory containing calibration_metadata.yml when run by xfel-calibrate
run_high = 141  # run number for G0 dark run, required
run_med = 142  # run number for G1 dark run, required
run_low = 143  # run number for G2 dark run, required

# Parameters used to access raw data.
karabo_da = ['JNGFR01', 'JNGFR02', 'JNGFR03', 'JNGFR04', 'JNGFR05', 'JNGFR06', 'JNGFR07', 'JNGFR08']  # list of data aggregators, which corresponds to different JF modules
karabo_id = 'SPB_IRDA_JF4M'  # karabo_id (detector identifier) prefix of Jungfrau detector to process.
karabo_id_control = ''  # if control is on a different ID, set to empty string if it is the same as karabo-id
receiver_template = 'JNGFR{:02}'  # inset for receiver devices
instrument_source_template = '{}/DET/{}:daqOutput'  # template for instrument source name (filled with karabo_id & receiver_id). e.g. 'SPB_IRDA_JF4M/DET/JNGFR01:daqOutput'
ctrl_source_template = '{}/DET/CONTROL'  # template for control source name (filled with karabo_id_control)

# Parameters for calibration database and storing constants.
cal_db_interface = ''  # calibrate db interface to connect to # KEEP FOR THE WEBSERVICE
cal_db_timeout = 300000  # timeout on caldb requests
local_output = True  # output constants locally
db_output = False  # output constants to database

# Parameters affecting creating dark calibration constants.
badpixel_threshold_sigma = 5.  # bad pixels defined by values outside n times this std from median
offset_abs_threshold_low = [1000, 10000, 10000]  # absolute bad pixel threshold in terms of offset, lower values
offset_abs_threshold_high = [8000, 15000, 15000]  # absolute bad pixel threshold in terms of offset, upper values
max_trains = 1000  # Maximum trains to process darks. Set to 0 to process all available train images. 1000 trains is enough resolution to create the dark constants
min_trains = 100  # Minimum number of trains to process dark constants. Raise a warning if the run has fewer trains.
skip_first_ntrains = 0  # Skip first number of trains and don't include them in dark processing.
manual_slow_data = False  # if true, use manually entered bias_voltage and integration_time values
time_limits = 0.025  # to find calibration constants later on, the integration time is allowed to vary by this many us
creation_time = ""  # To overwrite the measured creation_time. Required Format: YYYY-MM-DD HR:MN:SC e.g. "2022-06-28 13:00:00"

# Parameters to be used for injecting dark calibration constants.
integration_time = -1  # Integration time in us. Set to -1 to overwrite by value in file.
exposure_timeout = -1  # Exposure timeout. Set to -1 to overwrite by value in file.
gain_setting = -1  # 0 for dynamic, forceswitchg1, forceswitchg2, 1 for dynamichg0, fixgain1, fixgain2. Set to -1 to overwrite by value in file.
gain_mode = -1  # 1 if medium and low runs are fixgain1 and fixgain2, otherwise 0. Set to -1 to overwrite by value in file.
bias_voltage = -1  # sensor bias voltage in V, will be overwritten by value in file
memory_cells = -1  # Number of memory cells.

# Parameters used for plotting
detailed_report = False

# TODO: this is used for only Warning check at AGIPD dark.
# Need to rethink if it makes sense to use it here as well.
operation_mode = 'ADAPTIVE_GAIN'  # Detector operation mode, optional
``` ```
%% Cell type:code id: tags: %% Cell type:code id: tags:
``` python ``` python
import os import os
from datetime import timedelta from datetime import timedelta
from logging import warning from logging import warning
from tempfile import NamedTemporaryFile from tempfile import NamedTemporaryFile
import matplotlib import matplotlib
import matplotlib.pyplot as plt import matplotlib.pyplot as plt
import numpy as np import numpy as np
import pasha as psh import pasha as psh
import yaml import yaml
from IPython.display import Markdown, display from IPython.display import Markdown, display
from extra_data import RunDirectory from extra_data import RunDirectory
from XFELDetAna.plotting.heatmap import heatmapPlot from XFELDetAna.plotting.heatmap import heatmapPlot
from XFELDetAna.plotting.histogram import histPlot from XFELDetAna.plotting.histogram import histPlot
from cal_tools import step_timing from cal_tools import step_timing
from cal_tools.calcat_interface2 import ( from cal_tools.calcat_interface2 import (
CalibrationData, CalibrationData,
JUNGFRAUConditions, JUNGFRAUConditions,
) )
from cal_tools.constants import ( from cal_tools.constants import (
CCVAlreadyInjectedError, CCVAlreadyInjectedError,
inject_ccv, inject_ccv,
write_ccv, write_ccv,
) )
from cal_tools.enums import BadPixels, JungfrauGainMode from cal_tools.enums import BadPixels, JungfrauGainMode
from cal_tools.jungfrau import jungfraulib from cal_tools.jungfrau import jungfraulib
from cal_tools.restful_config import ( from cal_tools.restful_config import (
extra_calibration_client, extra_calibration_client,
) )
from cal_tools.tools import calcat_creation_time, pdus_by_detector_id from cal_tools.tools import calcat_creation_time, pdus_by_detector_id
matplotlib.use('agg') matplotlib.use('agg')
%matplotlib inline %matplotlib inline
``` ```
%% Cell type:code id: tags: %% Cell type:code id: tags:
``` python ``` python
# Constants relevant for the analysis.
run_nums = [run_high, run_med, run_low]  # run numbers for G0/HG0, G1, G2
sensor_size = (1024, 512)
gains = [0, 1, 2]
fixed_settings = [JungfrauGainMode.FIX_GAIN_1.value, JungfrauGainMode.FIX_GAIN_2.value]
dynamic_settings = [JungfrauGainMode.FORCE_SWITCH_HG1.value, JungfrauGainMode.FORCE_SWITCH_HG2.value]
old_fixed_settings = ["fixgain1", "fixgain2"]

# Resolve the effective creation time (user override or measured) and
# make sure the output folder exists.
creation_time = calcat_creation_time(in_folder, run_high, creation_time)
print(f"Using {creation_time} as creation time")
os.makedirs(out_folder, exist_ok=True)

# An empty karabo_id_control means control data is under karabo_id itself.
if not karabo_id_control:
    karabo_id_control = karabo_id
``` ```
%% Cell type:code id: tags: %% Cell type:code id: tags:
``` python ``` python
# The proposal number (e.g. 'p900204') is the second-to-last non-empty
# component of the raw data path.
_path_parts = [part for part in in_folder.strip('/').split('/') if part]
proposal = _path_parts[-2]
step_timer = step_timing.StepTimer()
``` ```
%% Cell type:markdown id: tags: %% Cell type:markdown id: tags:
## Reading control data
%% Cell type:code id: tags: %% Cell type:code id: tags:
``` python ``` python
step_timer.start()

# Open each dark run, remember its gain stage, and read detector settings
# (memory cells, integration time, bias voltage, ...) from the control source.
gain_runs = dict()
med_low_settings = []
ctrl_src = ctrl_source_template.format(karabo_id_control)

# Runs may be given in any order; sort them into G0/G1/G2 order first.
run_nums = jungfraulib.sort_runs_by_gain(
    raw_folder=in_folder,
    runs=run_nums,
    ctrl_src=ctrl_src,
)
_gain_mode = None
for gain, run_n in enumerate(run_nums):
    run_dc = RunDirectory(f"{in_folder}/r{run_n:04d}/")
    gain_runs[run_n] = [gain, run_dc]
    ctrl_data = jungfraulib.JungfrauCtrl(run_dc, ctrl_src)
    # Read control data for the high gain run only.
    if gain == 0:
        run_mcells, sc_start = ctrl_data.get_memory_cells()

        # Each condition below may be overridden manually via the notebook
        # parameters; a negative value means "read from the control data".
        if integration_time < 0:
            integration_time = ctrl_data.get_integration_time()
            print(f"Integration time is {integration_time} us.")
        else:
            print(f"Integration time is manually set to {integration_time} us.")

        if exposure_timeout < 0:
            exposure_timeout = ctrl_data.get_exposure_timeout()
            print(f"Exposure timeout is {exposure_timeout}.")
        else:
            print(f"Exposure timeout is manually set to {exposure_timeout}.")

        if bias_voltage < 0:
            bias_voltage = ctrl_data.get_bias_voltage()
            print(f"Bias voltage is {bias_voltage} V.")
        else:
            print(f"Bias voltage is manually set to {bias_voltage} V.")

        if gain_setting < 0:
            gain_setting = ctrl_data.get_gain_setting()
            print(f"Gain setting is {gain_setting} ({ctrl_data.run_settings})")
        else:
            print(f"Gain setting is manually set to {gain_setting}.")

        if run_mcells == 1:
            memory_cells = 1
            print('Dark runs in single cell mode, '
                  f'storage cell start: {sc_start:02d}')
        else:
            memory_cells = 16
            print('Dark runs in burst mode, '
                  f'storage cell start: {sc_start:02d}')
    else:  # medium and low gain
        _gain_mode = ctrl_data.get_gain_mode()
        med_low_settings.append(ctrl_data.run_mode)

# TODO: consider updating this cell into something similar to agipdlib.AgipdCtrlsRuns()
if gain_mode < 0:
    gain_mode = _gain_mode
    print(f"Gain mode is {gain_mode} ({med_low_settings})")
else:
    print(f"Gain mode is manually set to {gain_mode}.")
# NOTE: fixed a pointless f-string prefix here (ruff F541); the message is unchanged.
step_timer.done_step('Reading control data.')
``` ```
%% Cell type:code id: tags: %% Cell type:code id: tags:
``` python ``` python
step_timer.start()

# Operating condition used to look up and inject constants in CalCat.
conditions = JUNGFRAUConditions(
    sensor_bias_voltage=bias_voltage,
    memory_cells=memory_cells,
    exposure_timeout=exposure_timeout,
    integration_time=integration_time,
    gain_setting=gain_setting,
    gain_mode=gain_mode,
)

cc = extra_calibration_client(inject=True)

det_id = cc.detector_by_identifier(karabo_id)['id']
pdus = pdus_by_detector_id(cc, det_id, snapshot_at=creation_time)

# Map each selected data aggregator to its physical detector unit,
# and each PDU to its UUID; unselected aggregators are skipped.
da_to_pdu = {
    pdu['karabo_da']: pdu['physical_name']
    for pdu in pdus if pdu['karabo_da'] in karabo_da
}
pdu_to_uuid = {
    pdu['physical_name']: pdu['uuid']
    for pdu in pdus if pdu['karabo_da'] in karabo_da
}

first_pdu = pdus[0]
detector_info = first_pdu['detector']
detector_info['detector_type'] = first_pdu['detector_type']['name']

step_timer.done_step('Set conditions and get PDU names from CalCat.')
``` ```
%% Cell type:code id: tags: %% Cell type:code id: tags:
``` python ``` python
# Use only the high gain threshold for all gains in case of fixed gain:
# in fixed-gain mode every run stays in the high-gain ADC range.
if gain_mode:  # fixed gain
    offset_abs_threshold = [
        [offset_abs_threshold_low[0]] * 3,
        [offset_abs_threshold_high[0]] * 3,
    ]
else:
    offset_abs_threshold = [offset_abs_threshold_low, offset_abs_threshold_high]
``` ```
%% Cell type:code id: tags: %% Cell type:code id: tags:
``` python ``` python
context = psh.context.ThreadContext(num_workers=memory_cells) context = psh.context.ThreadContext(num_workers=memory_cells)
``` ```
%% Cell type:code id: tags: %% Cell type:code id: tags:
``` python ``` python
""" """
All jungfrau runs are taken through one acquisition, except for the forceswitch runs. All jungfrau runs are taken through one acquisition, except for the forceswitch runs.
While taking non-fixed dark runs, a procedure of multiple acquisitions is used to switch the storage cell indices. While taking non-fixed dark runs, a procedure of multiple acquisitions is used to switch the storage cell indices.
This is done for medium and low gain dark dynamic runs, only [forceswitchg1, forceswitchg2]: This is done for medium and low gain dark dynamic runs, only [forceswitchg1, forceswitchg2]:
Switching the cell indices in burst mode is a work around for hardware procedure Switching the cell indices in burst mode is a work around for hardware procedure
deficiency that produces wrong data for dark runs except for the first storage cell. deficiency that produces wrong data for dark runs except for the first storage cell.
This is why multiple acquisitions are taken to switch the used storage cells and This is why multiple acquisitions are taken to switch the used storage cells and
acquire data through two cells for each of the 16 cells instead of acquiring darks through all 16 cells. acquire data through two cells for each of the 16 cells instead of acquiring darks through all 16 cells.
""" """
print(f"Maximum trains to process is set to {max_trains}") print(f"Maximum trains to process is set to {max_trains}")
noise_map = dict() noise_map = dict()
offset_map = dict() offset_map = dict()
bad_pixels_map = dict() bad_pixels_map = dict()
for mod in karabo_da: for mod in karabo_da:
step_timer.start() step_timer.start()
instrument_src = instrument_source_template.format( instrument_src = instrument_source_template.format(
karabo_id, receiver_template.format(int(mod[-2:]))) karabo_id, receiver_template.format(int(mod[-2:])))
print(f"\n- Instrument data path for {mod} is {instrument_src}.") print(f"\n- Instrument data path for {mod} is {instrument_src}.")
# (1024, 512, 1 or 16, 3) # (1024, 512, 1 or 16, 3)
offset_map[mod] = context.alloc( offset_map[mod] = context.alloc(
shape=(sensor_size+(memory_cells, 3)), fill=0, dtype=np.float32) shape=(sensor_size+(memory_cells, 3)), fill=0, dtype=np.float32)
noise_map[mod] = context.alloc(like=offset_map[mod], fill=0) noise_map[mod] = context.alloc(like=offset_map[mod], fill=0)
bad_pixels_map[mod] = context.alloc(shape=offset_map[mod].shape, dtype=np.uint32, fill=0) bad_pixels_map[mod] = context.alloc(shape=offset_map[mod].shape, dtype=np.uint32, fill=0)
for run_n, [gain, run_dc] in gain_runs.items(): for run_n, [gain, run_dc] in gain_runs.items():
def process_cell(worker_id, array_index, cell_number): def process_cell(worker_id, array_index, cell_number):
cell_slice_idx = acelltable == cell_number cell_slice_idx = acelltable == cell_number
if cell_slice_idx.sum() == 0: if cell_slice_idx.sum() == 0:
# This cell is not in the data (or it's deliberated excluded) # This cell is not in the data (or it's deliberated excluded)
bad_pixels_map[mod][..., cell_number, gain] = BadPixels.NO_DARK_DATA.value bad_pixels_map[mod][..., cell_number, gain] = BadPixels.NO_DARK_DATA.value
offset_map[mod][..., cell_number, gain] = np.nan offset_map[mod][..., cell_number, gain] = np.nan
noise_map[mod][..., cell_number, gain] = np.nan noise_map[mod][..., cell_number, gain] = np.nan
return return
thiscell = images[..., cell_slice_idx] # [1024, 512, n_trains] thiscell = images[..., cell_slice_idx] # [1024, 512, n_trains]
# Identify cells/trains with images of 0 pixels. # Identify cells/trains with images of 0 pixels.
# TODO: An investigation is ongoing by DET to identify reason for these empty images. # TODO: An investigation is ongoing by DET to identify reason for these empty images.
nonzero_adc = np.any(thiscell != 0 , axis=(0, 1)) # [n_trains] nonzero_adc = np.any(thiscell != 0 , axis=(0, 1)) # [n_trains]
# Exclude empty images with 0 pixels, before calculating offset and noise # Exclude empty images with 0 pixels, before calculating offset and noise
thiscell = thiscell[..., nonzero_adc] thiscell = thiscell[..., nonzero_adc]
offset_map[mod][..., cell_number, gain] = np.mean( # [1024, 512] offset_map[mod][..., cell_number, gain] = np.mean( # [1024, 512]
thiscell, axis=2, dtype=np.float32) thiscell, axis=2, dtype=np.float32)
noise_map[mod][..., cell_number, gain] = np.std( # [1024, 512] noise_map[mod][..., cell_number, gain] = np.std( # [1024, 512]
thiscell, axis=2, dtype=np.float32) thiscell, axis=2, dtype=np.float32)
del thiscell del thiscell
# Check if there are wrong bad gain values. # Check if there are wrong bad gain values.
# 1. Exclude empty images. # 1. Exclude empty images.
# 2. Indicate pixels with wrong gain value for any train for each cell. # 2. Indicate pixels with wrong gain value for any train for each cell.
# TODO: mean is used to use thresholds for accepting gain values, even if not 0 mean value. # TODO: mean is used to use thresholds for accepting gain values, even if not 0 mean value.
gain_avg = np.mean( # [1024, 512] gain_avg = np.mean( # [1024, 512]
gain_vals[..., cell_slice_idx][..., nonzero_adc], gain_vals[..., cell_slice_idx][..., nonzero_adc],
axis=2, dtype=np.float32 axis=2, dtype=np.float32
) )
# Assign WRONG_GAIN_VALUE for a pixel in a badpixel map for all gains. # Assign WRONG_GAIN_VALUE for a pixel in a badpixel map for all gains.
bad_pixels_map[mod][:, :,cell_number][gain_avg != raw_g] |= BadPixels.WRONG_GAIN_VALUE.value bad_pixels_map[mod][:, :,cell_number][gain_avg != raw_g] |= BadPixels.WRONG_GAIN_VALUE.value
print(f"Gain stage {gain}, run {run_n}") print(f"Gain stage {gain}, run {run_n}")
# load shape of data for memory cells, and detector size (imgs, cells, x, y) # load shape of data for memory cells, and detector size (imgs, cells, x, y)
n_trains = run_dc[instrument_src, "data.adc"].shape[0] n_trains = run_dc[instrument_src, "data.adc"].shape[0]
# load number of data available, including trains with empty data. # load number of data available, including trains with empty data.
all_trains = len(run_dc.train_ids) all_trains = len(run_dc.train_ids)
empty_trains = all_trains - n_trains empty_trains = all_trains - n_trains
if empty_trains != 0: if empty_trains != 0:
print(f"{mod} has {empty_trains} empty trains out of {all_trains} trains") print(f"{mod} has {empty_trains} empty trains out of {all_trains} trains")
if skip_first_ntrains: if skip_first_ntrains:
print(f"Skip first {skip_first_ntrains} trains from processing as configured.") print(f"Skip first {skip_first_ntrains} trains from processing as configured.")
instr_dc = run_dc.select( instr_dc = run_dc.select(
instrument_src, require_all=True).select_trains(np.s_[skip_first_ntrains:]) instrument_src, require_all=True).select_trains(np.s_[skip_first_ntrains:])
if max_trains > 0: if max_trains > 0:
n_trains = min(n_trains, max_trains) n_trains = min(n_trains, max_trains)
print(f"Processing {n_trains} images.") print(f"Processing {n_trains} images.")
if n_trains == 0: if n_trains == 0:
raise ValueError(f"{run_n} has no trains to process.") raise ValueError(f"{run_n} has no trains to process.")
if n_trains < min_trains: if n_trains < min_trains:
warning(f"Less than {min_trains} trains are available in RAW data.") warning(f"Less than {min_trains} trains are available in RAW data.")
# Select only requested number of images to process darks. # Select only requested number of images to process darks.
instr_dc = instr_dc.select_trains(np.s_[:n_trains]) instr_dc = instr_dc.select_trains(np.s_[:n_trains])
images = np.transpose( images = np.transpose(
instr_dc[instrument_src, "data.adc"].ndarray(), (3, 2, 1, 0)) instr_dc[instrument_src, "data.adc"].ndarray(), (3, 2, 1, 0))
acelltable = np.transpose(instr_dc[instrument_src, "data.memoryCell"].ndarray()) acelltable = np.transpose(instr_dc[instrument_src, "data.memoryCell"].ndarray())
gain_vals = np.transpose( gain_vals = np.transpose(
instr_dc[instrument_src, "data.gain"].ndarray(), (3, 2, 1, 0)) instr_dc[instrument_src, "data.gain"].ndarray(), (3, 2, 1, 0))
# define gain value as saved in raw gain map # define gain value as saved in raw gain map
raw_g = 3 if gain == 2 else gain raw_g = 3 if gain == 2 else gain
if memory_cells == 1: if memory_cells == 1:
acelltable -= sc_start acelltable -= sc_start
# Only for dynamic medium and low gain runs [forceswitchg1, forceswitchg2] in burst mode. # Only for dynamic medium and low gain runs [forceswitchg1, forceswitchg2] in burst mode.
if ( if (
gain_mode == 0 and # dynamic gain mode gain_mode == 0 and # dynamic gain mode
gain > 0 and # Medium and low runs gain > 0 and # Medium and low runs
memory_cells == 16 and # Burst mode memory_cells == 16 and # Burst mode
acelltable.shape[0] == 2 # forceswitchg1 and forceswitchg2 acquired with the MDL device. acelltable.shape[0] == 2 # forceswitchg1 and forceswitchg2 acquired with the MDL device.
): ):
# 255 similar to the receiver which uses the 255 # 255 similar to the receiver which uses the 255
# value to indicate a cell without an image. # value to indicate a cell without an image.
# image shape for forceswitchg1 and forceswitchg2 = (1024, 512, 2, trains) # image shape for forceswitchg1 and forceswitchg2 = (1024, 512, 2, trains)
# compared to expected shape of (1024, 512, 16, trains) for high gain run. # compared to expected shape of (1024, 512, 16, trains) for high gain run.
acelltable[1:] = 255 acelltable[1:] = 255
# Calculate offset and noise maps # Calculate offset and noise maps
context.map(process_cell, range(memory_cells)) context.map(process_cell, range(memory_cells))
cells_missing = (bad_pixels_map[mod][0, 0, :, gain] & BadPixels.NO_DARK_DATA) > 0 cells_missing = (bad_pixels_map[mod][0, 0, :, gain] & BadPixels.NO_DARK_DATA) > 0
if np.any(cells_missing): if np.any(cells_missing):
print(f"No dark data in gain stage {gain} found for cells", np.nonzero(cells_missing)[0]) print(f"No dark data in gain stage {gain} found for cells", np.nonzero(cells_missing)[0])
del images del images
del acelltable del acelltable
del gain_vals del gain_vals
step_timer.done_step('Creating Offset and noise constants for a module.') step_timer.done_step('Creating Offset and noise constants for a module.')
``` ```
%% Cell type:code id: tags: %% Cell type:code id: tags:
``` python ``` python
if detailed_report:
    display(Markdown("## Offset and Noise Maps:"))
    display(Markdown(
        "Below offset and noise maps for the high ($g_0$) gain stage are shown, "
        "alongside the distribution of these values. One expects block-like "
        "structures mapping to the ASICs of the detector"))

    g_name = ['G0', 'G1', 'G2']
    g_range = [(0, 8000), (8000, 16000), (8000, 16000)]
    n_range = [(0., 50.), (0., 50.), (0., 50.)]
    unit = '[ADCu]'
    # TODO: Fix plots arrangement and speed for Jungfrau burst mode.

    step_timer.start()
    for mod, pdu in da_to_pdu.items():
        for g_idx in gains:
            for cell in range(memory_cells):
                # Pedestal (offset) heat map for this gain stage and cell.
                f_o0 = heatmapPlot(
                    np.swapaxes(offset_map[mod][..., cell, g_idx], 0, 1),
                    y_label="Row",
                    x_label="Column",
                    lut_label=unit,
                    aspect=1.,
                    vmin=g_range[g_idx][0],
                    vmax=g_range[g_idx][1],
                    title=f'Pedestal {g_name[g_idx]} - Cell {cell:02d} - Module {mod} ({pdu})')

                # Pedestal value distribution (log-scaled counts).
                fo0, ax_o0 = plt.subplots()
                res_o0 = histPlot(
                    ax_o0, offset_map[mod][..., cell, g_idx],
                    bins=800,
                    range=g_range[g_idx],
                    facecolor='b',
                    histotype='stepfilled',
                )
                ax_o0.tick_params(axis='both', which='major', labelsize=15)
                ax_o0.set_title(
                    f'Module pedestal distribution - Cell {cell:02d} - Module {mod} ({pdu})',
                    fontsize=15)
                ax_o0.set_xlabel(f'Pedestal {g_name[g_idx]} {unit}', fontsize=15)
                ax_o0.set_yscale('log')

                # RMS noise heat map for the same gain stage and cell.
                f_n0 = heatmapPlot(
                    np.swapaxes(noise_map[mod][..., cell, g_idx], 0, 1),
                    y_label="Row",
                    x_label="Column",
                    lut_label=unit,
                    aspect=1.,
                    vmin=n_range[g_idx][0],
                    vmax=n_range[g_idx][1],
                    title=f"RMS noise {g_name[g_idx]} - Cell {cell:02d} - Module {mod} ({pdu})",
                )

                # Noise value distribution.
                fn0, ax_n0 = plt.subplots()
                res_n0 = histPlot(
                    ax_n0,
                    noise_map[mod][..., cell, g_idx],
                    bins=100,
                    range=n_range[g_idx],
                    facecolor='b',
                    histotype='stepfilled',
                )
                ax_n0.tick_params(axis='both', which='major', labelsize=15)
                ax_n0.set_title(
                    f'Module noise distribution - Cell {cell:02d} - Module {mod} ({pdu})',
                    fontsize=15)
                ax_n0.set_xlabel(
                    f'RMS noise {g_name[g_idx]} ' + unit, fontsize=15)

                plt.show()
    step_timer.done_step('Plotting offset and noise maps.')
``` ```
%% Cell type:markdown id: tags: %% Cell type:markdown id: tags:
## Bad Pixel Map ###
The bad pixel map is deduced by comparing offset and noise of each pixel ($v_i$) and each gain ($g$) against the median value for that gain stage:
$$
v_i > \mathrm{median}(v_{k,g}) + n \sigma_{v_{k,g}}
$$
or
$$
v_i < \mathrm{median}(v_{k,g}) - n \sigma_{v_{k,g}}
$$
Values are encoded in a 32 bit mask, where for the dark image deduced bad pixels the following non-zero entries are relevant:
%% Cell type:code id: tags: %% Cell type:code id: tags:
``` python ``` python
def print_bp_entry(bp): def print_bp_entry(bp):
print("{:<30s} {:032b} -> {}".format(bp.name, bp.value, int(bp.value))) print("{:<30s} {:032b} -> {}".format(bp.name, bp.value, int(bp.value)))
print_bp_entry(BadPixels.OFFSET_OUT_OF_THRESHOLD) print_bp_entry(BadPixels.OFFSET_OUT_OF_THRESHOLD)
print_bp_entry(BadPixels.NOISE_OUT_OF_THRESHOLD) print_bp_entry(BadPixels.NOISE_OUT_OF_THRESHOLD)
print_bp_entry(BadPixels.OFFSET_NOISE_EVAL_ERROR) print_bp_entry(BadPixels.OFFSET_NOISE_EVAL_ERROR)
print_bp_entry(BadPixels.NO_DARK_DATA) print_bp_entry(BadPixels.NO_DARK_DATA)
print_bp_entry(BadPixels.WRONG_GAIN_VALUE) print_bp_entry(BadPixels.WRONG_GAIN_VALUE)
def eval_bpidx(d): def eval_bpidx(d):
mdn = np.nanmedian(d, axis=(0, 1))[None, None, :, :] mdn = np.nanmedian(d, axis=(0, 1))[None, None, :, :]
std = np.nanstd(d, axis=(0, 1))[None, None, :, :] std = np.nanstd(d, axis=(0, 1))[None, None, :, :]
idx = (d > badpixel_threshold_sigma*std+mdn) | (d < (-badpixel_threshold_sigma)*std+mdn) idx = (d > badpixel_threshold_sigma*std+mdn) | (d < (-badpixel_threshold_sigma)*std+mdn)
return idx return idx
``` ```
%% Cell type:code id: tags: %% Cell type:code id: tags:
``` python ``` python
step_timer.start() step_timer.start()
for mod, pdu in da_to_pdu.items(): for mod, pdu in da_to_pdu.items():
display(Markdown(f"### Badpixels for module {mod} ({pdu}):")) display(Markdown(f"### Badpixels for module {mod} ({pdu}):"))
offset_abs_threshold = np.array(offset_abs_threshold) offset_abs_threshold = np.array(offset_abs_threshold)
bad_pixels_map[mod][eval_bpidx(offset_map[mod])] |= BadPixels.OFFSET_OUT_OF_THRESHOLD.value bad_pixels_map[mod][eval_bpidx(offset_map[mod])] |= BadPixels.OFFSET_OUT_OF_THRESHOLD.value
bad_pixels_map[mod][~np.isfinite(offset_map[mod])] |= BadPixels.OFFSET_NOISE_EVAL_ERROR.value bad_pixels_map[mod][~np.isfinite(offset_map[mod])] |= BadPixels.OFFSET_NOISE_EVAL_ERROR.value
bad_pixels_map[mod][eval_bpidx(noise_map[mod])] |= BadPixels.NOISE_OUT_OF_THRESHOLD.value bad_pixels_map[mod][eval_bpidx(noise_map[mod])] |= BadPixels.NOISE_OUT_OF_THRESHOLD.value
bad_pixels_map[mod][~np.isfinite(noise_map[mod])] |= BadPixels.OFFSET_NOISE_EVAL_ERROR.value bad_pixels_map[mod][~np.isfinite(noise_map[mod])] |= BadPixels.OFFSET_NOISE_EVAL_ERROR.value
bad_pixels_map[mod][(offset_map[mod] < offset_abs_threshold[0][None, None, None, :]) | (offset_map[mod] > offset_abs_threshold[1][None, None, None, :])] |= BadPixels.OFFSET_OUT_OF_THRESHOLD.value # noqa bad_pixels_map[mod][(offset_map[mod] < offset_abs_threshold[0][None, None, None, :]) | (offset_map[mod] > offset_abs_threshold[1][None, None, None, :])] |= BadPixels.OFFSET_OUT_OF_THRESHOLD.value # noqa
if detailed_report: if detailed_report:
for g_idx in gains: for g_idx in gains:
for cell in range(memory_cells): for cell in range(memory_cells):
bad_pixels = bad_pixels_map[mod][:, :, cell, g_idx] bad_pixels = bad_pixels_map[mod][:, :, cell, g_idx]
fn_0 = heatmapPlot( fn_0 = heatmapPlot(
np.swapaxes(bad_pixels, 0, 1), np.swapaxes(bad_pixels, 0, 1),
y_label="Row", y_label="Row",
x_label="Column", x_label="Column",
lut_label=f"Badpixels {g_name[g_idx]} [ADCu]", lut_label=f"Badpixels {g_name[g_idx]} [ADCu]",
aspect=1., aspect=1.,
vmin=0, vmax=5, vmin=0, vmax=5,
title=f'G{g_idx} Bad pixel map - Cell {cell:02d} - Module {mod} ({pdu})') title=f'G{g_idx} Bad pixel map - Cell {cell:02d} - Module {mod} ({pdu})')
step_timer.done_step('Creating bad pixels constant') step_timer.done_step('Creating bad pixels constant')
``` ```
%% Cell type:markdown id: tags: %% Cell type:markdown id: tags:
## Inject and save calibration constants ## Inject and save calibration constants
%% Cell type:code id: tags: %% Cell type:code id: tags:
``` python ``` python
step_timer.start() step_timer.start()
constants = {} constants = {}
for mod, pdu in da_to_pdu.items(): for mod, pdu in da_to_pdu.items():
constants['Offset10Hz'] = np.moveaxis(offset_map[mod], 0, 1) constants['Offset10Hz'] = np.moveaxis(offset_map[mod], 0, 1)
constants['Noise10Hz'] = np.moveaxis(noise_map[mod], 0, 1) constants['Noise10Hz'] = np.moveaxis(noise_map[mod], 0, 1)
constants['BadPixelsDark10Hz'] = np.moveaxis(bad_pixels_map[mod], 0, 1) constants['BadPixelsDark10Hz'] = np.moveaxis(bad_pixels_map[mod], 0, 1)
md = None
for const_name, const_data in constants.items(): for const_name, const_data in constants.items():
with NamedTemporaryFile(dir=out_folder) as tempf: with NamedTemporaryFile(dir=out_folder) as tempf:
ccv_root = write_ccv( ccv_root = write_ccv(
tempf.name, tempf.name,
pdu, pdu_name=pdu,
pdu_to_uuid[pdu], pdu_uuid=pdu_to_uuid[pdu],
detector_info["detector_type"], detector_type=detector_info["detector_type"],
const_name, calibration=const_name,
conditions, conditions=conditions,
creation_time, created_at=creation_time,
proposal,[run_high, run_med, run_low], proposal=proposal,
const_data, runs=[run_high, run_med, run_low],
data=const_data,
dims=["fast_scan", "slow_scan", "cell", "gain"], dims=["fast_scan", "slow_scan", "cell", "gain"],
deviations={"integration_time": time_limits}, deviations={"integration_time": time_limits},
) )
if db_output: if db_output:
try: try:
inject_ccv(tempf.name, ccv_root, metadata_folder) inject_ccv(tempf.name, ccv_root, metadata_folder)
print(f"{const_name} for {mod}({pdu}) has been injected to the database.") print(f"{const_name} for {mod}({pdu}) has been injected to the database.")
except CCVAlreadyInjectedError: except CCVAlreadyInjectedError:
warning( warning(
f"{const_name} calibration constant version for {pdu}" f"{const_name} calibration constant version for {pdu}"
" has been already injected.\n") " has been already injected.\n")
if local_output: if local_output:
ofile = f"{out_folder}/const_{const_name}_{pdu}.h5" ofile = f"{out_folder}/const_{const_name}_{pdu}.h5"
if os.path.isfile(ofile): if os.path.isfile(ofile):
print(f'File {ofile} already exists and will be overwritten\n') print(f'File {ofile} already exists and will be overwritten\n')
from shutil import copyfile from shutil import copyfile
copyfile(tempf.name, ofile) copyfile(tempf.name, ofile)
print(f"Calibration constant {const_name} is stored locally at {out_folder}.\n") print(f"Calibration constant {const_name} is stored locally at {out_folder}.\n")
print("Constants parameter conditions are:\n") print("Constants parameter conditions are:\n")
print( print(
f"• Bias voltage: {bias_voltage}\n" f"• Bias voltage: {bias_voltage}\n"
f"• Memory cells: {memory_cells}\n" f"• Memory cells: {memory_cells}\n"
f"• Integration time: {integration_time}\n" f"• Integration time: {integration_time}\n"
f"• Exposure timeout: {exposure_timeout}\n" f"• Exposure timeout: {exposure_timeout}\n"
f"• Gain setting: {gain_setting}\n" f"• Gain setting: {gain_setting}\n"
f"• Gain mode: {gain_mode}\n" f"• Gain mode: {gain_mode}\n"
f"• Creation time: {md.calibration_constant_version.begin_at if md is not None else creation_time}\n") # noqa f"• Creation time: {creation_time}\n") # noqa
step_timer.done_step("Injecting constants.") step_timer.done_step("Injecting constants.")
``` ```
%% Cell type:code id: tags: %% Cell type:code id: tags:
``` python ``` python
print(f"Total processing time {step_timer.timespan():.01f} s") print(f"Total processing time {step_timer.timespan():.01f} s")
step_timer.print_summary() step_timer.print_summary()
``` ```
%% Cell type:code id: tags: %% Cell type:code id: tags:
``` python ``` python
# Start retrieving existing constants for comparison # Start retrieving existing constants for comparison
step_timer.start() step_timer.start()
old_const = {} old_const = {}
old_mdata = {} old_mdata = {}
# set the operating condition # set the operating condition
conditions = JUNGFRAUConditions( conditions = JUNGFRAUConditions(
sensor_bias_voltage=bias_voltage, sensor_bias_voltage=bias_voltage,
memory_cells=memory_cells, memory_cells=memory_cells,
integration_time=integration_time, integration_time=integration_time,
exposure_timeout=exposure_timeout, exposure_timeout=exposure_timeout,
gain_setting=gain_setting, gain_setting=gain_setting,
gain_mode=gain_mode, gain_mode=gain_mode,
) )
jf_caldata = CalibrationData.from_condition( jf_caldata = CalibrationData.from_condition(
conditions, conditions,
karabo_id, karabo_id,
event_at=creation_time-timedelta(seconds=60), event_at=creation_time-timedelta(seconds=60),
begin_at_strategy="prior", begin_at_strategy="prior",
) )
for mod in karabo_da: for mod in karabo_da:
old_const[mod] = {} old_const[mod] = {}
old_mdata[mod] = {} old_mdata[mod] = {}
for cname in constants.keys(): for cname in constants.keys():
cmdata = jf_caldata.get(cname, None) cmdata = jf_caldata.get(cname, None)
data_found = cmdata and mod in cmdata.aggregator_names data_found = cmdata and mod in cmdata.aggregator_names
if data_found: if data_found:
old_const[mod][cname] = cmdata[mod].ndarray() old_const[mod][cname] = cmdata[mod].ndarray()
old_mdata[mod][cname] = { old_mdata[mod][cname] = {
"timestamp": cmdata[mod].metadata("begin_validity_at"), "timestamp": cmdata[mod].metadata("begin_validity_at"),
"filepath": str(cmdata[mod].file_path()), "filepath": str(cmdata[mod].file_path()),
"dataset": cmdata[mod].dataset, "dataset": cmdata[mod].dataset,
} }
else: else:
old_const[mod][cname] = None old_const[mod][cname] = None
old_mdata[mod][cname] = { old_mdata[mod][cname] = {
"timestamp": "Not found", "timestamp": "Not found",
"filepath": None, "filepath": None,
"dataset": None, "dataset": None,
} }
step_timer.done_step('Retrieved old dark constants for comparison.') step_timer.done_step('Retrieved old dark constants for comparison.')
``` ```
%% Cell type:code id: tags: %% Cell type:code id: tags:
``` python ``` python
display(Markdown("## The following pre-existing constants are used for comparison:")) display(Markdown("## The following pre-existing constants are used for comparison:"))
for mod, consts in old_mdata.items(): for mod, consts in old_mdata.items():
pdu = da_to_pdu[mod] pdu = da_to_pdu[mod]
display(Markdown(f"- {mod} ({pdu})")) display(Markdown(f"- {mod} ({pdu})"))
for const in consts: for const in consts:
display(Markdown(f" - {const}: {consts[const]['timestamp']}")) display(Markdown(f" - {const}: {consts[const]['timestamp']}"))
# saving locations of old constants for summary notebook # saving locations of old constants for summary notebook
with open(f"{metadata_folder or out_folder}/module_metadata_{mod}.yml", "w") as fd: with open(f"{metadata_folder or out_folder}/module_metadata_{mod}.yml", "w") as fd:
yaml.safe_dump( yaml.safe_dump(
{ {
"module": mod, "module": mod,
"pdu": pdu, "pdu": pdu,
"old-constants": old_mdata[mod], "old-constants": old_mdata[mod],
}, },
fd, fd,
) )
``` ```
......
...@@ -29,6 +29,9 @@ class ModuleNameError(KeyError): ...@@ -29,6 +29,9 @@ class ModuleNameError(KeyError):
class CalCatAPIError(requests.HTTPError): class CalCatAPIError(requests.HTTPError):
"""Used when the response includes error details as JSON""" """Used when the response includes error details as JSON"""
@property
def status_code(self):
return self.response.status_code
class CalCatAPIClient: class CalCatAPIClient:
...@@ -46,6 +49,10 @@ class CalCatAPIClient: ...@@ -46,6 +49,10 @@ class CalCatAPIClient:
# Ensure the base URL has a trailing slash # Ensure the base URL has a trailing slash
self.base_api_url = base_api_url.rstrip("/") + "/" self.base_api_url = base_api_url.rstrip("/") + "/"
def __repr__(self):
auth = " (with Oauth)" if self.oauth_client else ""
return f"<CalCatAPIClient for {self.base_api_url}{auth}>"
def default_headers(self): def default_headers(self):
return { return {
"content-type": "application/json", "content-type": "application/json",
...@@ -61,6 +68,8 @@ class CalCatAPIClient: ...@@ -61,6 +68,8 @@ class CalCatAPIClient:
return dt.astimezone(timezone.utc).isoformat() return dt.astimezone(timezone.utc).isoformat()
elif isinstance(dt, date): elif isinstance(dt, date):
return cls.format_time(datetime.combine(dt, time())) return cls.format_time(datetime.combine(dt, time()))
elif dt is None:
return "" # Not specified - for searches, this usually means now
elif not isinstance(dt, str): elif not isinstance(dt, str):
raise TypeError( raise TypeError(
f"Timestamp parameter ({dt!r}) must be a string, datetime or " f"Timestamp parameter ({dt!r}) must be a string, datetime or "
...@@ -69,7 +78,7 @@ class CalCatAPIClient: ...@@ -69,7 +78,7 @@ class CalCatAPIClient:
return dt return dt
def get_request(self, relative_url, params=None, headers=None, **kwargs): def request(self, method, relative_url, params=None, headers=None, **kwargs):
"""Make a GET request, return the HTTP response object""" """Make a GET request, return the HTTP response object"""
# Base URL may include e.g. '/api/'. This is a prefix for all URLs; # Base URL may include e.g. '/api/'. This is a prefix for all URLs;
# even if they look like an absolute path. # even if they look like an absolute path.
...@@ -77,7 +86,9 @@ class CalCatAPIClient: ...@@ -77,7 +86,9 @@ class CalCatAPIClient:
_headers = self.default_headers() _headers = self.default_headers()
if headers: if headers:
_headers.update(headers) _headers.update(headers)
return self.session.get(url, params=params, headers=_headers, **kwargs) return self.session.request(
method, url, params=params, headers=_headers, **kwargs
)
@staticmethod @staticmethod
def _parse_response(resp: requests.Response): def _parse_response(resp: requests.Response):
...@@ -89,7 +100,8 @@ class CalCatAPIClient: ...@@ -89,7 +100,8 @@ class CalCatAPIClient:
else: else:
raise CalCatAPIError( raise CalCatAPIError(
f"Error {resp.status_code} from API: " f"Error {resp.status_code} from API: "
f"{d.get('info', 'missing details')}" f"{d.get('info', 'missing details')}",
response=resp
) )
if resp.content == b"": if resp.content == b"":
...@@ -99,7 +111,7 @@ class CalCatAPIClient: ...@@ -99,7 +111,7 @@ class CalCatAPIClient:
def get(self, relative_url, params=None, **kwargs): def get(self, relative_url, params=None, **kwargs):
"""Make a GET request, return response content from JSON""" """Make a GET request, return response content from JSON"""
resp = self.get_request(relative_url, params, **kwargs) resp = self.request('GET', relative_url, params, **kwargs)
return self._parse_response(resp) return self._parse_response(resp)
_pagination_headers = ( _pagination_headers = (
...@@ -111,7 +123,7 @@ class CalCatAPIClient: ...@@ -111,7 +123,7 @@ class CalCatAPIClient:
def get_paged(self, relative_url, params=None, **kwargs): def get_paged(self, relative_url, params=None, **kwargs):
"""Make a GET request, return response content & pagination info""" """Make a GET request, return response content & pagination info"""
resp = self.get_request(relative_url, params, **kwargs) resp = self.request('GET', relative_url, params, **kwargs)
content = self._parse_response(resp) content = self._parse_response(resp)
pagination_info = { pagination_info = {
k[2:].lower().replace("-", "_"): int(resp.headers[k]) k[2:].lower().replace("-", "_"): int(resp.headers[k])
...@@ -120,6 +132,11 @@ class CalCatAPIClient: ...@@ -120,6 +132,11 @@ class CalCatAPIClient:
} }
return content, pagination_info return content, pagination_info
def post(self, relative_url, json, **kwargs):
"""Make a POST request, return response content from JSON"""
resp = self.request('POST', relative_url, json=json, **kwargs)
return self._parse_response(resp)
# ------------------ # ------------------
# Cached wrappers for simple ID lookups of fixed-ish info # Cached wrappers for simple ID lookups of fixed-ish info
# #
...@@ -136,24 +153,35 @@ class CalCatAPIClient: ...@@ -136,24 +153,35 @@ class CalCatAPIClient:
# -------------------- # --------------------
# Shortcuts to find 1 of something by an ID-like field (e.g. name) other # Shortcuts to find 1 of something by an ID-like field (e.g. name) other
# than CalCat's own integer IDs. Error on no match or >1 matches. # than CalCat's own integer IDs. Error on no match or >1 matches.
@lru_cache() @lru_cache
def detector_by_identifier(self, identifier): def get_by_name(self, endpoint, name, name_key="name"):
# The "identifier", "name" & "karabo_name" fields seem to have the same names res = self.get(endpoint, {name_key: name})
res = self.get("detectors", {"identifier": identifier})
if not res: if not res:
raise KeyError(f"No detector with identifier {identifier}") raise KeyError(f"No {endpoint[:-1]} with name {name}")
elif len(res) > 1: elif len(res) > 1:
raise ValueError(f"Multiple detectors found with identifier {identifier}") raise ValueError(f"Multiple {endpoint} found with name {name}")
return res[0] return res[0]
@lru_cache() def detector_by_identifier(self, identifier):
return self.get_by_name(
"detectors", identifier, name_key="identifier")
def calibration_by_name(self, name): def calibration_by_name(self, name):
res = self.get("calibrations", {"name": name}) return self.get_by_name("calibrations", name)
if not res:
raise KeyError(f"No calibration with name {name}") def parameter_by_name(self, name):
elif len(res) > 1: return self.get_by_name("parameters", name)
raise ValueError(f"Multiple calibrations found with name {name}")
return res[0] # Keeping the cache here instead of in other methods
# because it's less likely a new detector type will be edited
# in CalCat without the need to add new ConditionBase class for it.
@lru_cache
def detector_type_by_name(self, name):
return self.get_by_name("detector_types", name)
def pdu_by_name(self, name):
return self.get_by_name(
"physical_detector_units", name, name_key="physical_name")
global_client = None global_client = None
...@@ -207,7 +235,7 @@ def setup_client( ...@@ -207,7 +235,7 @@ def setup_client(
if oauth_client is None and base_url == CALCAT_PROXY_URL: if oauth_client is None and base_url == CALCAT_PROXY_URL:
try: try:
# timeout=(connect_timeout, read_timeout) # timeout=(connect_timeout, read_timeout)
global_client.get_request("me", timeout=(1, 5)) global_client.request("GET", "me", timeout=(1, 5))
except requests.ConnectionError as e: except requests.ConnectionError as e:
raise RuntimeError( raise RuntimeError(
"Could not connect to calibration catalog proxy. This proxy allows " "Could not connect to calibration catalog proxy. This proxy allows "
......
import binascii import binascii
import logging
import time import time
from dataclasses import asdict, dataclass
from datetime import datetime, timezone
from hashlib import md5 from hashlib import md5
from pathlib import Path from pathlib import Path
from shutil import copyfile from shutil import copyfile
from struct import pack, unpack from struct import pack, unpack
from typing import List, Optional, Union from typing import Tuple, Union
import h5py import h5py
import numpy as np import numpy as np
from calibration_client import CalibrationClient
from cal_tools.calcat_interface2 import ( from cal_tools.calcat_interface2 import (
CalCatAPIClient,
CalCatAPIError, CalCatAPIError,
get_client,
get_default_caldb_root, get_default_caldb_root,
) )
from cal_tools.restful_config import calibration_client
CONDITION_NAME_MAX_LENGTH = 60 CONDITION_NAME_MAX_LENGTH = 60
log = logging.getLogger(__name__)
class InjectAPIError(CalCatAPIError):
...
class InjectionError(Exception):
...
class CCVAlreadyInjectedError(InjectAPIError): class CCVAlreadyInjectedError(InjectionError):
... ...
@dataclass
class ParameterConditionAttribute:
value: str
lower_deviation_value: float = 0
upper_deviation_value: float = 0
flg_available: bool = True
description: str = ''
def generate_unique_condition_name(
detector_type: str,
pdu_name: str,
pdu_uuid: float,
cond_params: dict,
):
"""Generate a unique condition using UUID and timestamp.
Args:
detector_type (str): detector type.
pdu_name (str): Physical detector unit db name.
pdu_uuid (float): Physical detector unit db id.
cond_params (dict): Keys DB names, values ParameterConditionAttribute
Returns:
str: A unique name used for the table of conditions.
"""
unique_name = detector_type[:detector_type.index('-Type')] + ' Def'
cond_hash = md5(pdu_name.encode())
cond_hash.update(int(pdu_uuid).to_bytes(
length=8, byteorder='little', signed=False))
for pname, pattrs in cond_params.items():
cond_hash.update(pname.encode())
cond_hash.update(str(pattrs.value).encode())
unique_name += binascii.b2a_base64(cond_hash.digest()).decode()
return unique_name[:CONDITION_NAME_MAX_LENGTH]
def create_unique_cc_name(det_type, calibration, condition_name):
"""
Generating CC name from condition name,
detector type, and calibration name.
"""
cc_name_1 = f'{det_type}_{calibration}'
# I guess there is a limit to the name characters?
return f'{cc_name_1[:40]}_{condition_name}'
def create_unique_ccv_name(start_idx):
# Generate unique name if it doesn't exist
datetime_str = datetime.now(
timezone.utc).strftime('%Y%m%d_%H%M%S')
return f'{datetime_str}_sIdx={start_idx}'
def extract_parameter_conditions(
ccv_group: dict, pdu_uuid: int) -> dict:
def _to_string(value):
"""Send only accepted value types to CALCAT."""
if isinstance(value, bool):
value = float(value)
return str(value)
cond_params = {}
condition_group = ccv_group['operating_condition']
# It's really not ideal we're mixing conditionS and condition now.
# Get parameter data and create a list of `ParameterConditionAttribute`s
for parameter in condition_group:
param_dset = condition_group[parameter]
param_name = param_dset.attrs['database_name']
cond_params[param_name] = ParameterConditionAttribute(
value=_to_string(param_dset[()]),
lower_deviation_value=param_dset.attrs['lower_deviation'],
upper_deviation_value=param_dset.attrs['upper_deviation'],
)
# Add PDU "UUID" to parameters.
cond_params['Detector UUID'] = ParameterConditionAttribute(
value=_to_string(unpack('d', pack('q', pdu_uuid))[0]),
)
return cond_params
def write_ccv( def write_ccv(
const_path, const_path,
pdu_name, pdu_uuid, detector_type, pdu_name, pdu_uuid, detector_type,
...@@ -130,40 +218,68 @@ def get_condition_dict( ...@@ -130,40 +218,68 @@ def get_condition_dict(
} }
def generate_unique_condition_name( def get_or_create_calibration_constant(
client: CalCatAPIClient,
calibration: str,
detector_type: str,
condition_id: int,
condition_name: str,
):
cond_id = condition_id
# Prepare some parameters to set Calibration Constant.
cal_id = client.calibration_by_name(calibration)['id']
det_type_id = client.detector_type_by_name(detector_type)['id']
cc_name = create_unique_cc_name(detector_type, calibration, condition_name)
calibration_constant = dict(
name=cc_name,
calibration_id=cal_id,
condition_id=cond_id,
detector_type_id=det_type_id,
flg_auto_approve=True,
flg_available=True,
description="",
)
try:
cc_id = client.get(
f"calibrations/{cal_id}/get_calibration_constant",
calibration_constant
)['id']
log.debug("Retrieved existing calibration constant ID %s", cc_id)
except CalCatAPIError as e:
if e.status_code != 404:
raise
cc_id = client.post("calibration_constants", calibration_constant)['id']
log.debug("Created calibration constant ID %s", cc_id)
return cc_id
def create_condition(
client: CalCatAPIClient,
detector_type: str, detector_type: str,
pdu_name: str, pdu_name: str,
pdu_uuid: float, pdu_uuid: float,
cond_params: List[dict], cond_params: dict,
): ) -> Tuple[int, str]:
"""Generate a unique condition using UUID and timestamp. # Create condition unique name
cond_name = generate_unique_condition_name(
Args: detector_type, pdu_name, pdu_uuid, cond_params)
detector_type (str): detector type.
pdu_name (str): Physical detector unit db name.
pdu_uuid (float): Physical detector unit db id.
cond_params (List[dict]): A list of dictionary with each condition
e.g. [{
"parameter_name": "Memory Cells",
"value": 352.0,
"lower-deviation": 0.0,
"upper-deviation": 0.0
}]
Returns:
str: A unique name used for the table of conditions.
"""
unique_name = detector_type[:detector_type.index('-Type')] + ' Def'
cond_hash = md5(pdu_name.encode())
cond_hash.update(int(pdu_uuid).to_bytes(
length=8, byteorder='little', signed=False))
for param_dict in cond_params:
cond_hash.update(str(param_dict['parameter_name']).encode())
cond_hash.update(str(param_dict['value']).encode())
unique_name += binascii.b2a_base64(cond_hash.digest()).decode() # Create condition in database, if not already there.
return unique_name[:CONDITION_NAME_MAX_LENGTH] cond = dict(
name=cond_name,
parameters_conditions_attributes=[
asdict(cond) | {"parameter_name": db_name}
for (db_name, cond) in cond_params.items()
],
flg_available=True,
description='',
)
resp = client.post(
"conditions/set_expected_condition", {"condition": cond}
)
return resp['id'], cond_name
def get_raw_data_location(proposal: str, runs: list): def get_raw_data_location(proposal: str, runs: list):
...@@ -174,112 +290,112 @@ def get_raw_data_location(proposal: str, runs: list): ...@@ -174,112 +290,112 @@ def get_raw_data_location(proposal: str, runs: list):
return "" # Fallback for non-run based constants return "" # Fallback for non-run based constants
def inject_ccv( def get_ccv_info_from_file(
const_src: Union[Path, str], cfile: Union[str, Path], pdu: str, ccv_root: str):
ccv_root: str,
report_to: Optional[str] = None,
):
"""Inject new CCV into CalCat.
Args:
const_path (str or Path): Path to CCV data file.
ccv_root (str): CCV HDF group name.
report_to (str): Metadata location.
Raises:
RuntimeError: If CalCat POST request fails.
""" """
pdu_name, calibration, _ = ccv_root.lstrip('/').split('/') Read CCV HDF5 file to get calibration parameters.
with h5py.File(const_src, 'r') as const_file: Args:
if ccv_root not in const_file: cfile (str, Path): The CalibrationConstantVersion file path.
raise ValueError( pdu (str): The Physical detector unit name for the stored constant.
f"Invalid HDF5 structure: {ccv_root} not found in file.") ccv_root (str): The CCV root dataset path to access the data.
pdu_group = const_file[pdu_name] Returns:
List[ParameterConditionAttribute], str, float, str, str
"""
with h5py.File(cfile, 'r') as const_file:
pdu_group = const_file[pdu]
pdu_uuid = pdu_group.attrs['uuid'] pdu_uuid = pdu_group.attrs['uuid']
detector_type = pdu_group.attrs['detector_type'] detector_type = pdu_group.attrs['detector_type']
ccv_group = const_file[ccv_root] ccv_group = const_file[ccv_root]
raw_data_location = get_raw_data_location(
ccv_group.attrs['proposal'],
ccv_group.attrs['runs']
)
begin_at = ccv_group.attrs['begin_at']
cond_params = extract_parameter_conditions(ccv_group, pdu_uuid)
proposal, runs = ccv_group.attrs['proposal'], ccv_group.attrs['runs'] return cond_params, begin_at, pdu_uuid, detector_type, raw_data_location
begin_at_str = ccv_group.attrs['begin_at']
condition_group = ccv_group['operating_condition']
cond_params = []
# It's really not ideal we're mixing conditionS and condition now.
for parameter in condition_group:
param_dset = condition_group[parameter]
cond_params.append(get_condition_dict(
param_dset.attrs['database_name'],
param_dset[()],
param_dset.attrs['lower_deviation'],
param_dset.attrs['upper_deviation'],
))
const_rel_path = f'xfel/cal/{detector_type.lower()}/{pdu_name.lower()}' def inject_ccv(const_src, ccv_root, report_to=None, client=None):
const_filename = f'cal.{time.time()}.h5' """Inject new CCV into CalCat.
unique_name = generate_unique_condition_name(
detector_type, pdu_name, pdu_uuid, cond_params)
raw_data_location = get_raw_data_location(proposal, runs) Args:
const_src (str or Path): Path to CCV data file.
ccv_root (str): CCV HDF group name.
report_to (str): Metadata location.
client (CalCatAPIClient, optional): Client for CalCat API.
Raises:
RuntimeError: If CalCat POST request fails.
"""
if client is None:
client = get_client()
# Add PDU "UUID" to parameters. pdu_name, calibration, _ = ccv_root.lstrip('/').split('/')
cond_params.append(get_condition_dict(
'Detector UUID',
unpack('d', pack('q', pdu_uuid))[0]
))
inject_h = {
'detector_condition': {
'name': unique_name,
'parameters': cond_params
},
'calibration_constant': {
'calibration_name': calibration,
'detector_type_name': detector_type,
'flg_auto_approve': True
},
'calibration_constant_version': {
'raw_data_location': raw_data_location,
'file_name': const_filename,
'path_to_file': const_rel_path,
'data_set_name': f'{pdu_name}/{calibration}/0',
'start_idx': '0',
'end_idx': '0',
'begin_validity_at': begin_at_str,
'end_validity_at': '',
'begin_at': begin_at_str,
'pdu_physical_name': pdu_name,
'flg_good_quality': True
}
}
if report_to: (
report_path = Path(report_to).absolute().with_suffix('.pdf') cond_params, begin_at, pdu_uuid, detector_type, raw_data_location
inject_h['report'] = { ) = get_ccv_info_from_file(const_src, pdu_name, ccv_root)
'name': report_path.stem,
'file_path': str(report_path)
}
const_rel_path = f'xfel/cal/{detector_type.lower()}/{pdu_name.lower()}'
const_filename = f'cal.{time.time()}.h5'
const_dest = get_default_caldb_root() / const_rel_path / const_filename const_dest = get_default_caldb_root() / const_rel_path / const_filename
const_dest.parent.mkdir(parents=True, exist_ok=True) const_dest.parent.mkdir(parents=True, exist_ok=True)
copyfile(const_src, const_dest) copyfile(const_src, const_dest)
# TODO: Consider catching `RequestException`s try:
# when bypassing calibration_client condition_id, condition_name = create_condition(
resp = CalibrationClient.inject_new_calibration_constant_version( client, detector_type, pdu_name, pdu_uuid, cond_params)
calibration_client(), inject_h) log.debug("Condition ID: %s & name: %r", condition_id, condition_name)
if not resp['success']: # Create Calibration Constant in database, if not available.
cc_id = get_or_create_calibration_constant(
client, calibration, detector_type, condition_id, condition_name)
# Create report in database, if not available
report_id = None
if report_to:
report_path = Path(report_to).absolute().with_suffix('.pdf')
resp = client.post("reports/set", dict(
name=report_path.name,
file_path=str(report_path),
flg_available=True,
description="",
))
report_id = resp['id']
log.debug("CalCat report ID: %s", report_id)
# Get PDU ID before creating new CCV.
pdu_id = client.pdu_by_name(pdu_name)['id']
# Prepare CCV data and inject it to CALCAT.
start_idx = 0
ccv = dict(
name=create_unique_ccv_name(start_idx),
file_name=const_filename,
path_to_file=const_rel_path,
data_set_name=ccv_root,
calibration_constant_id=cc_id,
physical_detector_unit_id=pdu_id,
raw_data_location=raw_data_location,
report_id=report_id,
begin_validity_at=begin_at,
end_validity_at='',
begin_at=begin_at,
start_idx=start_idx,
end_idx=0,
flg_deployed=True,
flg_good_quality=True,
description='',
)
try:
client.post("calibration_constant_versions", ccv)
except CalCatAPIError as e:
if e.status_code == 422:
raise CCVAlreadyInjectedError
raise
except Exception:
const_dest.unlink() # Delete already copied CCV file. const_dest.unlink() # Delete already copied CCV file.
# TODO: Remove this when the new injection code is added. raise
if (
resp['status_code'] == 422 and
"taken" in resp['app_info'].get("begin_at", [""])[0]
):
raise CCVAlreadyInjectedError
else:
raise RuntimeError(resp)
...@@ -44,7 +44,7 @@ def calibration_client(): ...@@ -44,7 +44,7 @@ def calibration_client():
scope='') scope='')
def extra_calibration_client(): def extra_calibration_client(inject=False):
"""Obtain an initialized CalCatAPIClient object.""" """Obtain an initialized CalCatAPIClient object."""
from cal_tools import calcat_interface2 from cal_tools import calcat_interface2
...@@ -57,6 +57,12 @@ def extra_calibration_client(): ...@@ -57,6 +57,12 @@ def extra_calibration_client():
base_api_url = calcat_config['base-api-url'].rstrip('/') base_api_url = calcat_config['base-api-url'].rstrip('/')
assert base_api_url.endswith('/api') assert base_api_url.endswith('/api')
base_url = base_api_url[:-4] base_url = base_api_url[:-4]
if inject and '//exflcalproxy' in base_url:
raise ValueError(
"cal_tools would use exflcalproxy to talk to CalCat, but this "
"provides read-only access, and we want to inject constants. "
"You need to configure a connection to CalCat using Oauth."
)
calcat_interface2.setup_client( calcat_interface2.setup_client(
base_url, base_url,
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment