Skip to content
Snippets Groups Projects
Commit a01ae5ee authored by Karim Ahmed's avatar Karim Ahmed
Browse files

Merge branch 'fix/error_out_all_modules_empty' into 'master'

error out only if all sources are empty

See merge request !817
parents 073d2954 bc159d5f
No related branches found
No related tags found
1 merge request!817[AGIPD] [Correct] error out only if all sources are empty
%% Cell type:markdown id: tags:
# AGIPD Retrieving Constants Pre-correction #
Author: European XFEL Detector Group, Version: 1.0
Retrieving Required Constants for Offline Calibration of the AGIPD Detector
%% Cell type:code id: tags:
``` python
in_folder = "/gpfs/exfel/exp/SPB/202030/p900119/raw" # the folder to read data from, required
out_folder = "/gpfs/exfel/data/scratch/ahmedk/test/AGIPD_" # the folder to output to, required
metadata_folder = "" # Directory containing calibration_metadata.yml when run by xfel-calibrate
modules = [-1] # modules to correct, set to -1 for all, range allowed
run = 80 # runs to process, required
karabo_id = "SPB_DET_AGIPD1M-1" # karabo karabo_id
karabo_da = ['-1'] # a list of data aggregators names, Default [-1] for selecting all data aggregators
ctrl_source_template = '{}/MDL/FPGA_COMP_TEST' # path to control information
instrument_source_template = '{}/DET/{}:xtdf' # path in the HDF5 file to images
receiver_template = "{}CH0" # inset for receiver devices
karabo_id_control = "SPB_IRU_AGIPD1M1" # karabo-id for control device
# Parameters for calibration database.
cal_db_interface = "tcp://max-exfl016:8015#8045" # the database interface to use
creation_date_offset = "00:00:00" # add an offset to creation date, e.g. to get different constants
creation_time = "" # To overwrite the measured creation_time. Required Format: YYYY-MM-DD HR:MN:SC e.g. "2022-06-28 13:00:00"
slopes_ff_from_files = "" # Path to locally stored SlopesFF and BadPixelsFF constants
mem_cells = -1 # number of memory cells used, set to 0 to automatically infer
bias_voltage = -1 # bias voltage, set to 0 to use stored value in slow data.
acq_rate = -1. # the detector acquisition rate, use 0 to try to auto-determine
gain_setting = -1 # the gain setting, use -1 to use value stored in slow data.
gain_mode = -1 # gain mode (0: adaptive, 1-3 fixed high/med/low, -1: read from CONTROL data)
integration_time = -1 # integration time, negative values for auto-detection.
# Correction Booleans
only_offset = False # Apply only Offset correction. if False, Offset is applied by Default. if True, Offset is only applied.
rel_gain = False # do relative gain correction based on PC data
xray_gain = True # do relative gain correction based on xray data
blc_noise = False # if set, baseline correction via noise peak location is attempted
blc_stripes = False # if set, baseline corrected via stripes
blc_hmatch = False # if set, base line correction via histogram matching is attempted
adjust_mg_baseline = False # adjust medium gain baseline to match highest high gain value
```
%% Cell type:code id: tags:
``` python
# Fill dictionaries comprising bools and arguments for correction and data analysis
# Here the hierarichy and dependencies for correction booleans are defined
corr_bools = {}
# offset is at the bottom of AGIPD correction pyramid.
corr_bools["only_offset"] = only_offset
# Dont apply any corrections if only_offset is requested
if not only_offset:
corr_bools["adjust_mg_baseline"] = adjust_mg_baseline
corr_bools["rel_gain"] = rel_gain
corr_bools["xray_corr"] = xray_gain
corr_bools["blc_noise"] = blc_noise
corr_bools["blc_hmatch"] = blc_hmatch
```
%% Cell type:code id: tags:
``` python
import numpy as np
from logging import warning
from pathlib import Path
from typing import Tuple
import multiprocessing
from datetime import timedelta
from dateutil import parser
from extra_data import RunDirectory
from cal_tools.agipdlib import (
AgipdCtrl,
SnowResolution,
assemble_constant_dict,
)
from cal_tools.enums import AgipdGainMode
from cal_tools.tools import (
calcat_creation_time,
get_from_db,
module_index_to_qm,
CalibrationMetadata,
)
from iCalibrationDB import Conditions, Constants, Detectors
```
%% Cell type:code id: tags:
``` python
# slopes_ff_from_files left as str for now
in_folder = Path(in_folder)
out_folder = Path(out_folder)
metadata = CalibrationMetadata(metadata_folder or out_folder)
# Constant paths & timestamps are saved under retrieved-constants in calibration_metadata.yml
retrieved_constants = metadata.setdefault("retrieved-constants", {})
```
%% Cell type:code id: tags:
``` python
# Run's creation time:
creation_time = calcat_creation_time(in_folder, run, creation_time)
offset = parser.parse(creation_date_offset)
delta = timedelta(hours=offset.hour, minutes=offset.minute, seconds=offset.second)
creation_time += delta
print(f"Creation time: {creation_time}")
print(f"Outputting to {out_folder}")
out_folder.mkdir(parents=True, exist_ok=True)
melt_snow = False if corr_bools["only_offset"] else SnowResolution.NONE
```
%% Cell type:code id: tags:
``` python
ctrl_src = ctrl_source_template.format(karabo_id_control)
print(f"Detector in use is {karabo_id}")
# Extracting Instrument string
instrument = karabo_id.split("_")[0]
# Evaluate detector instance for mapping
if instrument == "SPB":
nmods = 16
elif instrument == "MID":
nmods = 16
elif instrument == "HED":
nmods = 8
print(f"Instrument {instrument}")
if karabo_da[0] == '-1':
if modules[0] == -1:
modules = list(range(nmods))
karabo_da = ["AGIPD{:02d}".format(i) for i in modules]
else:
modules = [int(x[-2:]) for x in karabo_da]
```
%% Cell type:code id: tags:
``` python
run_dc = RunDirectory(in_folder / f"r{run:04d}")
instrument_src = instrument_source_template.format(karabo_id, receiver_template)
instr_dc = run_dc.select(instrument_src.format("*"), require_all=True)
instr_dc = run_dc.select(instrument_src.format("*"))
for m in modules:
# Remove empty sources from `instr_dc`
if instr_dc[instrument_src.format(m), 'image.data'].shape[0] == 0:
instr_dc = instr_dc.deselect(instrument_src.format(m))
if not instr_dc.train_ids:
if not instr_dc.all_sources:
raise ValueError(f"No images found for {in_folder / f'r{run:04d}'}")
```
%% Cell type:code id: tags:
``` python
agipd_cond = AgipdCtrl(
run_dc=run_dc,
image_src=None, # Not neededed, as we wont read mem_cells or acq_rate.
ctrl_src=ctrl_src,
)
if gain_setting == -1:
gain_setting = agipd_cond.get_gain_setting(creation_time)
if bias_voltage == -1:
bias_voltage = agipd_cond.get_bias_voltage(karabo_id_control)
if integration_time == -1:
integration_time = agipd_cond.get_integration_time()
if gain_mode == -1:
gain_mode = agipd_cond.get_gain_mode()
else:
gain_mode = AgipdGainMode(gain_mode)
```
%% Cell type:markdown id: tags:
## Retrieve Constants ##
%% Cell type:code id: tags:
``` python
pc_bools = [ # flags that points to the need for retrieving SlopesPC and BadPixelsPC constants.
corr_bools.get("rel_gain"),
corr_bools.get("adjust_mg_baseline"),
corr_bools.get('blc_noise'),
corr_bools.get('blc_hmatch'),
corr_bools.get('blc_stripes'),
melt_snow,
]
```
%% Cell type:code id: tags:
``` python
def retrieve_constants(
k_da: str, idx: int
) -> Tuple[str, str, float, float, str, dict]:
"""
Retrieve constants for a module.
:return:
k_da: karabo data aggregator.
acq_rate: acquisition rate parameter.
mem_cells: number of memory cells.
mdata_dict: (DICT) dictionary with the metadata for the retrieved constants.
"""
# check if this module has images to process.
if instrument_src.format(idx) not in instr_dc.all_sources:
print("ERROR: No raw images found for "
f"{module_index_to_qm(idx)}({k_da}).")
return None, k_da, None, None
agipd_cond.image_src = instrument_src.format(idx)
if mem_cells == -1:
# Read value from fast data.
local_mem_cells = agipd_cond.get_num_cells()
else:
# or use overriding notebook parameter.
local_mem_cells = mem_cells
if acq_rate == -1.:
local_acq_rate = agipd_cond.get_acq_rate()
else:
local_acq_rate = acq_rate
const_dict = assemble_constant_dict(
corr_bools,
pc_bools,
local_mem_cells,
bias_voltage,
gain_setting,
local_acq_rate,
photon_energy=9.2,
gain_mode=gain_mode,
beam_energy=None,
only_dark=False,
integration_time=integration_time
)
# Retrieve multiple constants through an input dictionary
# to return a dict of useful metadata.
mdata_dict = dict()
mdata_dict["constants"] = dict()
mdata_dict["physical-detector-unit"] = None # initialization
for const_name, (const_init_fun, const_shape, (cond_type, cond_param)) in const_dict.items(): # noqa
if gain_mode and const_name in ("ThresholdsDark",):
continue
# saving metadata in a dict
const_mdata = dict()
mdata_dict["constants"][const_name] = const_mdata
if slopes_ff_from_files and const_name in ["SlopesFF", "BadPixelsFF"]:
const_mdata["file-path"] = (
f"{slopes_ff_from_files}/slopesff_bpmask_module_{module_index_to_qm(idx)}.h5") # noqa
const_mdata["creation-time"] = "00:00:00"
continue
if gain_mode and const_name in (
"BadPixelsPC", "SlopesPC", "BadPixelsFF", "SlopesFF"
):
param_copy = cond_param.copy()
del param_copy["gain_mode"]
condition = getattr(Conditions, cond_type).AGIPD(**param_copy)
else:
condition = getattr(Conditions, cond_type).AGIPD(**cond_param)
_, mdata = get_from_db(
karabo_id,
k_da,
getattr(Constants.AGIPD, const_name)(),
condition,
getattr(np, const_init_fun)(const_shape),
cal_db_interface,
creation_time,
meta_only=True,
verbosity=0,
)
mdata_const = mdata.calibration_constant_version
# check if constant was sucessfully retrieved.
if mdata.comm_db_success:
const_mdata["file-path"] = (
f"{mdata_const.hdf5path}" f"{mdata_const.filename}"
)
const_mdata["creation-time"] = f"{mdata_const.begin_at}"
mdata_dict["physical-detector-unit"] = mdata_const.device_name
else:
const_mdata["file-path"] = const_dict[const_name][:2]
const_mdata["creation-time"] = None
return mdata_dict, k_da, local_acq_rate, local_mem_cells
```
%% Cell type:code id: tags:
``` python
inp = []
da_to_qm = dict()
for module_index, k_da in zip(modules, karabo_da):
da_to_qm[k_da] = module_index_to_qm(module_index)
if k_da in retrieved_constants:
print(
f"Constant for {k_da} already in calibration_metadata.yml, won't query again.")
continue
inp.append((k_da, module_index))
```
%% Cell type:code id: tags:
``` python
with multiprocessing.Pool(processes=nmods) as pool:
results = pool.starmap(retrieve_constants, inp)
```
%% Cell type:code id: tags:
``` python
acq_rate_mods = []
mem_cells_mods = []
for md_dict, k_da, acq_rate, mem_cells in results:
if acq_rate is None and mem_cells is None:
continue
md_dict, k_da, acq_rate, mem_cells
retrieved_constants[k_da] = md_dict
mem_cells_mods.append(mem_cells)
acq_rate_mods.append(acq_rate)
# Validate that mem_cells and acq_rate are the same for all modules.
# TODO: Should a warning be enough?
if len(set(mem_cells_mods)) != 1 or len(set(acq_rate_mods)) != 1:
print(
"WARNING: Number of memory cells or "
"acquisition rate are not identical for all modules.\n"
f"mem_cells: {mem_cells_mods}.\nacq_rate: {acq_rate_mods}.")
# check if it is requested not to retrieve any constants from the database
print("\nRetrieved constants for modules:",
', '.join([module_index_to_qm(x) for x in modules]))
print(f"Operating conditions are:")
print(f"• Bias voltage: {bias_voltage}")
print(f"• Memory cells: {mem_cells}")
print(f"• Acquisition rate: {acq_rate}")
print(f"• Gain mode: {gain_mode.name}")
print(f"• Gain setting: {gain_setting}")
print(f"• Integration time: {integration_time}")
print(f"• Photon Energy: 9.2")
print("Constant metadata is saved under \"retrieved-constants\" in calibration_metadata.yml\n")
```
%% Cell type:code id: tags:
``` python
print("Using constants with creation times:")
timestamps = {}
for k_da, module_name in da_to_qm.items():
if k_da not in retrieved_constants.keys():
continue
module_timestamps = timestamps[module_name] = {}
module_constants = retrieved_constants[k_da]
print(f"{module_name}:")
for cname, mdata in module_constants["constants"].items():
if hasattr(mdata["creation-time"], 'strftime'):
mdata["creation-time"] = mdata["creation-time"].strftime('%y-%m-%d %H:%M')
print(f'{cname:.<12s}', mdata["creation-time"])
for cname in ['Offset', 'SlopesPC', 'SlopesFF']:
if cname in module_constants["constants"]:
module_timestamps[cname] = module_constants["constants"][cname]["creation-time"]
else:
module_timestamps[cname] = "NA"
time_summary = retrieved_constants.setdefault("time-summary", {})
time_summary["SAll"] = timestamps
metadata.save()
```
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment