Compare revisions: calibration/pycalibration
Commits on Source (20), showing 579 additions and 324 deletions
# .readthedocs.yaml
# Read the Docs configuration file
# See https://docs.readthedocs.io/en/stable/config-file/v2.html for details
# Required
version: 2
# Build documentation in the docs/ directory with Sphinx
sphinx:
configuration: docs/source/conf.py
fail_on_warning: false
# Optionally set the version of Python and requirements required to build your docs
python:
version: 3.8
install:
- requirements: docs/requirements.txt
- method: pip
path: .
extra_requirements:
- docs
iCalibrationDB @ git+https://xcalgitlab:${GITHUB_TOKEN}@git.xfel.eu/gitlab/detectors/cal_db_interactive.git@2.0.9
\ No newline at end of file
......@@ -13,6 +13,8 @@
# All configuration values have a default; values that are commented out
# serve to show the default.
import glob
# If extensions (or modules to document with autodoc) are in another directory,
# add these directories to sys.path here. If the directory is relative to the
# documentation root, use os.path.abspath to make it absolute, like shown here.
......@@ -20,6 +22,23 @@
# import os
# import sys
# sys.path.insert(0, os.path.abspath('.'))
import os
import shutil
import sys
import textwrap
from datetime import datetime
from subprocess import Popen, check_output
from textwrap import dedent, indent
from uuid import uuid4
import nbformat
import tabulate
from dateutil.parser import parse
from lxml import etree
from nbconvert import RSTExporter
# generate the list of available notebooks
from xfel_calibrate import notebooks
# -- General configuration ------------------------------------------------
......@@ -39,11 +58,6 @@ extensions = [
'sphinx.ext.viewcode',
]
import glob
import os
import sys
from subprocess import Popen
sys.path.append(os.path.abspath("../pycalibration/"))
p = Popen(["./makeAllDocs.sh"])
p.communicate()
......@@ -374,18 +388,7 @@ except:
check_call(["wget", pandoc_url])
check_call(["dpkg", "-i", pandoc_pack])
import os
from subprocess import check_output
from textwrap import dedent, indent
import nbformat
from nbconvert import RSTExporter
# generate the list of available notebooks
from xfel_calibrate import notebooks
rst_exporter = RSTExporter()
with open("available_notebooks.rst", "w") as f:
f.write(dedent("""
.. _available_notebooks:
......@@ -407,7 +410,8 @@ with open("available_notebooks.rst", "w") as f:
for caltype in sorted(values.keys()):
data = values[caltype]
if data.get("notebook", None) is None:
continue
nbpath = os.path.abspath("{}/../../../{}".format(__file__, data["notebook"]))
with open(nbpath, "r") as nf:
nb = nbformat.read(nf, as_version=4)
......@@ -440,16 +444,7 @@ with open("available_notebooks.rst", "w") as f:
f.write("\n\n")
# add test results
test_artefact_dir = os.path.realpath("../../tests/artefacts")
import shutil
import textwrap
from datetime import datetime
from uuid import uuid4
import tabulate
from dateutil.parser import parse
from lxml import etree
test_artefact_dir = os.path.realpath("../../tests/legacy/artefacts")
def xml_to_rst_report(xml, git_tag, reports=[]):
......@@ -575,7 +570,6 @@ Contents:
"""
if not os.path.exists("./test_rsts"):
os.makedirs("./test_rsts")
with open("test_results.rst", "w") as f:
f.write(header)
for commit, modtime in sorted_dir(test_artefact_dir):
......@@ -610,5 +604,6 @@ with open("test_results.rst", "w") as f:
fr.write(rst)
f.write(" test_rsts/{}\n".format(commit))
def setup(app):
app.add_stylesheet('css/test_decorators.css')
%% Cell type:markdown id: tags:
# AGIPD Retrieving Constants Pre-correction #
Author: European XFEL Detector Group, Version: 1.0
Retrieving Required Constants for Offline Calibration of the AGIPD Detector
%% Cell type:code id: tags:
``` python
in_folder = "/gpfs/exfel/exp/SPB/202030/p900119/raw" # the folder to read data from, required
out_folder = "/gpfs/exfel/data/scratch/ahmedk/test/AGIPD_" # the folder to output to, required
sequences = [-1] # sequences to correct, set to -1 for all, range allowed
modules = [-1] # modules to correct, set to -1 for all, range allowed
run = 80 # runs to process, required
karabo_id = "SPB_DET_AGIPD1M-1" # karabo id of the detector
karabo_da = ['-1'] # a list of data aggregators names, Default [-1] for selecting all data aggregators
path_template = 'RAW-R{:04d}-{}-S{:05d}.h5' # the template to use to access data
h5path_ctrl = '/CONTROL/{}/MDL/FPGA_COMP_TEST' # path to control information
karabo_id_control = "SPB_IRU_AGIPD1M1" # karabo-id for control device
karabo_da_control = 'AGIPD1MCTRL00' # karabo DA for control information
use_dir_creation_date = True # use the creation date of the input dir for database queries
cal_db_interface = "tcp://max-exfl016:8015#8045" # the database interface to use
creation_date_offset = "00:00:00" # add an offset to creation date, e.g. to get different constants
slopes_ff_from_files = "" # Path to locally stored SlopesFF and BadPixelsFF constants
calfile = "" # path to calibration file. Leave empty if all data should come from DB
nodb = False # if set only file-based constants will be used
mem_cells = 0 # number of memory cells used, set to 0 to automatically infer
bias_voltage = 300
acq_rate = 0. # the detector acquisition rate, use 0 to try to auto-determine
gain_setting = 0.1 # the gain setting, use 0.1 to try to auto-determine
gain_mode = -1 # gain mode (0: adaptive, 1-3 fixed high/med/low, -1: read from CONTROL data)
photon_energy = 9.2 # photon energy in keV
max_cells_db_dark = 0 # set to a value different than 0 to use this value for dark data DB queries
max_cells_db = 0 # set to a value different than 0 to use this value for DB queries
integration_time = -1 # integration time, negative values for auto-detection.
# Correction Booleans
only_offset = False # Apply only the offset correction. If False, offset is applied by default together with the other enabled corrections; if True, only the offset correction is applied.
rel_gain = False # do relative gain correction based on PC data
xray_gain = True # do relative gain correction based on xray data
blc_noise = False # if set, baseline correction via noise peak location is attempted
blc_stripes = False # if set, baseline correction via stripes is attempted
blc_hmatch = False # if set, baseline correction via histogram matching is attempted
match_asics = False # if set, inner ASIC borders are matched to the same signal level
adjust_mg_baseline = False # adjust medium gain baseline to match highest high gain value
```
%% Cell type:code id: tags:
``` python
# Fill dictionaries comprising bools and arguments for correction and data analysis
# Here the hierarchy and dependencies for correction booleans are defined
corr_bools = {}
# offset is at the bottom of AGIPD correction pyramid.
corr_bools["only_offset"] = only_offset
# Don't apply any other corrections if only_offset is requested
if not only_offset:
corr_bools["adjust_mg_baseline"] = adjust_mg_baseline
corr_bools["rel_gain"] = rel_gain
corr_bools["xray_corr"] = xray_gain
corr_bools["blc_noise"] = blc_noise
corr_bools["blc_hmatch"] = blc_hmatch
```
%% Cell type:code id: tags:
``` python
from typing import List, Tuple
import matplotlib
import numpy as np
matplotlib.use("agg")
import multiprocessing
from datetime import timedelta
from pathlib import Path
import matplotlib.pyplot as plt
from cal_tools import agipdlib, tools
from cal_tools.enums import AgipdGainMode
from dateutil import parser
from iCalibrationDB import Conditions, Constants, Detectors
```
%% Cell type:code id: tags:
``` python
# slopes_ff_from_files left as str for now
in_folder = Path(in_folder)
out_folder = Path(out_folder)
metadata = tools.CalibrationMetadata(out_folder)
```
%% Cell type:code id: tags:
``` python
max_cells = mem_cells
creation_time = None
if use_dir_creation_date:
creation_time = tools.get_dir_creation_date(str(in_folder), run)
offset = parser.parse(creation_date_offset)
delta = timedelta(hours=offset.hour, minutes=offset.minute, seconds=offset.second)
creation_time += delta
print(f"Using {creation_time} as creation time")
if sequences[0] == -1:
sequences = None
print(f"Outputting to {out_folder}")
out_folder.mkdir(parents=True, exist_ok=True)
melt_snow = False if corr_bools["only_offset"] else agipdlib.SnowResolution.NONE
```
%% Cell type:code id: tags:
``` python
control_fn = in_folder / f'r{run:04d}' / f'RAW-R{run:04d}-{karabo_da_control}-S00000.h5'
h5path_ctrl = h5path_ctrl.format(karabo_id_control)
slow_paths = (control_fn, karabo_id_control)
if gain_setting == 0.1:
if creation_time.replace(tzinfo=None) < parser.parse('2020-01-31'):
print("Set gain-setting to None for runs taken before 2020-01-31")
gain_setting = None
else:
try:
gain_setting = agipdlib.get_gain_setting(str(control_fn), h5path_ctrl)
except Exception as e:
print(f'ERROR: while reading gain setting from: \n{control_fn}')
print(e)
print("Set gain setting to 0")
gain_setting = 0
# Evaluate gain mode (operation mode)
if gain_mode < 0:
gain_mode = get_gain_mode(control_fn, h5path_ctrl)
gain_mode = agipdlib.get_gain_mode(control_fn, h5path_ctrl)
else:
gain_mode = AgipdGainMode(gain_mode)
# Evaluate integration time
if integration_time < 0:
integration_time = agipdlib.get_integration_time(control_fn, h5path_ctrl)
print(f"Gain setting: {gain_setting}")
print(f"Gain mode: {gain_mode.name}")
print(f"Detector in use is {karabo_id}")
# Extracting Instrument string
instrument = karabo_id.split("_")[0]
# Evaluate detector instance for mapping
if instrument == "SPB":
dinstance = "AGIPD1M1"
nmods = 16
elif instrument == "MID":
dinstance = "AGIPD1M2"
nmods = 16
elif instrument == "HED":
dinstance = "AGIPD500K"
nmods = 8
print(f"Instrument {instrument}")
print(f"Detector instance {dinstance}")
if karabo_da[0] == '-1':
if modules[0] == -1:
modules = list(range(nmods))
karabo_da = ["AGIPD{:02d}".format(i) for i in modules]
else:
modules = [int(x[-2:]) for x in karabo_da]
```
%% Cell type:markdown id: tags:
## Retrieve Constants ##
%% Cell type:code id: tags:
``` python
def retrieve_constants(
qm_files: List[Path], qm: str, karabo_da: str, idx: int
) -> Tuple[str, str, float, float, str, dict]:
"""
Retrieve constants for a module.
:return:
qm: module virtual name i.e. Q1M1.
mdata_dict: dictionary with the metadata for the retrieved constants.
karabo_da: karabo data aggregator.
acq_rate: acquisition rate parameter.
max_cells: number of memory cells.
"""
if max_cells != 0:
# either use overriding notebook parameter
local_max_cells = max_cells
else:
# or look around in sequence files
for f in qm_files:
local_max_cells = agipdlib.get_num_cells(f, karabo_id, idx)
if local_max_cells is not None:
break
# maybe we never found this in a sequence file...
if local_max_cells is None:
raise ValueError(f"No raw images found for {qm} for all sequences")
if acq_rate == 0:
local_acq_rate = agipdlib.get_acq_rate(
fast_paths=(f, karabo_id, idx), slow_paths=slow_paths)
else:
local_acq_rate = acq_rate
# avoid retrieving constant, if requested.
if nodb_with_dark:
return
const_dict = agipdlib.assemble_constant_dict(
corr_bools,
pc_bools,
local_max_cells,
bias_voltage,
gain_setting,
local_acq_rate,
photon_energy,
gain_mode=gain_mode,
beam_energy=None,
only_dark=only_dark,
integration_time=integration_time
)
# Retrieve multiple constants through an input dictionary
# to return a dict of useful metadata.
mdata_dict = dict()
mdata_dict["constants"] = dict()
mdata_dict["physical-detector-unit"] = None # initialization
for const_name, (const_init_fun, const_shape, (cond_type, cond_param)) in const_dict.items():
if gain_mode and const_name in ("ThresholdsDark",):
continue
# saving metadata in a dict
const_mdata = dict()
mdata_dict["constants"][const_name] = const_mdata
if slopes_ff_from_files and const_name in ["SlopesFF", "BadPixelsFF"]:
const_mdata["file-path"] = f"{slopes_ff_from_files}/slopesff_bpmask_module_{qm}.h5"
const_mdata["creation-time"] = "00:00:00"
continue
if gain_mode and const_name in ("BadPixelsPC", "SlopesPC", "BadPixelsFF", "SlopesFF"):
param_copy = cond_param.copy()
del param_copy["gain_mode"]
condition = getattr(Conditions, cond_type).AGIPD(**param_copy)
else:
condition = getattr(Conditions, cond_type).AGIPD(**cond_param)
_, mdata = tools.get_from_db(
karabo_id,
karabo_da,
getattr(Constants.AGIPD, const_name)(),
condition,
getattr(np, const_init_fun)(const_shape),
cal_db_interface,
creation_time,
meta_only=True,
verbosity=0,
)
mdata_const = mdata.calibration_constant_version
# check if constant was successfully retrieved.
if mdata.comm_db_success:
const_mdata["file-path"] = (
f"{mdata_const.hdf5path}" f"{mdata_const.filename}"
)
const_mdata["creation-time"] = f"{mdata_const.begin_at}"
mdata_dict["physical-detector-unit"] = mdata_const.device_name
else:
const_mdata["file-path"] = const_dict[const_name][:2]
const_mdata["creation-time"] = None
return qm, mdata_dict, karabo_da, local_acq_rate, local_max_cells
```
%% Cell type:code id: tags:
``` python
# Constant paths & timestamps are saved under retrieved-constants in calibration_metadata.yml
retrieved_constants = metadata.setdefault("retrieved-constants", {})
```
%% Cell type:code id: tags:
``` python
# set everything up filewise
mapped_files, _, _, _, _ = tools.map_modules_from_folder(
str(in_folder), run, path_template, karabo_da, sequences
)
pc_bools = [corr_bools.get("rel_gain"),
corr_bools.get("adjust_mg_baseline"),
corr_bools.get('blc_noise'),
corr_bools.get('blc_hmatch'),
corr_bools.get('blc_stripes'),
melt_snow]
inp = []
only_dark = False
nodb_with_dark = False
if not nodb:
only_dark = (calfile != "")
if calfile != "" and not corr_bools["only_offset"]:
nodb_with_dark = nodb
da_to_qm = dict()
for module_index, k_da in zip(modules, karabo_da):
qm = tools.module_index_to_qm(module_index)
da_to_qm[k_da] = qm
if k_da in retrieved_constants:
print(f"Constant for {k_da} already in calibration_metadata.yml, won't query again.")
continue
if qm in mapped_files and not mapped_files[qm].empty():
# TODO: make map_modules_from_folder just return list(s)
qm_files = [Path(mapped_files[qm].get()) for _ in range(mapped_files[qm].qsize())]
else:
continue
inp.append((qm_files, qm, k_da, module_index))
```
%% Cell type:code id: tags:
``` python
with multiprocessing.Pool(processes=nmods) as pool:
results = pool.starmap(retrieve_constants, inp)
```
%% Cell type:code id: tags:
``` python
for qm, md_dict, karabo_da, acq_rate, max_cells in results:
retrieved_constants[karabo_da] = md_dict
# check if it is requested not to retrieve any constants from the database
if nodb_with_dark:
print("No constants were retrieved as calibrated files will be used.")
else:
print("\nRetrieved constants for modules:",
', '.join([tools.module_index_to_qm(x) for x in modules]))
print(f"Operating conditions are:")
print(f"• Bias voltage: {bias_voltage}")
print(f"• Memory cells: {max_cells}")
print(f"• Acquisition rate: {acq_rate}")
print(f"• Gain mode: {gain_mode.name}")
print(f"• Gain setting: {gain_setting}")
print(f"• Integration time: {integration_time}")
print(f"• Photon Energy: {photon_energy}")
print("Constant metadata is saved under \"retrieved-constants\" in calibration_metadata.yml\n")
```
%% Cell type:code id: tags:
``` python
print("Using constants with creation times:")
timestamps = {}
for k_da, module_name in da_to_qm.items():
module_timestamps = timestamps[module_name] = {}
module_constants = retrieved_constants[k_da]
print(f"{module_name}:")
for cname, mdata in module_constants["constants"].items():
if hasattr(mdata["creation-time"], 'strftime'):
mdata["creation-time"] = mdata["creation-time"].strftime('%y-%m-%d %H:%M')
print(f'{cname:.<12s}', mdata["creation-time"])
for cname in ['Offset', 'SlopesPC', 'SlopesFF']:
if cname in module_constants["constants"]:
module_timestamps[cname] = module_constants["constants"][cname]["creation-time"]
else:
module_timestamps[cname] = "NA"
time_summary = retrieved_constants.setdefault("time-summary", {})
time_summary["SAll"] = timestamps
metadata.save()
```
......
......@@ -27,11 +27,9 @@
"karabo_id = \"SPB_IRDA_JF4M\" # karabo prefix of Jungfrau devices\n",
"karabo_da = ['JNGFR01', 'JNGFR02', 'JNGFR03', 'JNGFR04', 'JNGFR05', 'JNGFR06', 'JNGFR07', 'JNGFR08'] # data aggregators\n",
"receiver_template = \"JNGFR{:02d}\" # Detector receiver template for accessing raw data files. e.g. \"JNGFR{:02d}\"\n",
"path_template = 'RAW-R{:04d}-{}-S{:05d}.h5' # template to use for file name\n",
"instrument_source_template = '{}/DET/{}:daqOutput' # template for source name (filled with karabo_id & receiver_id). e.g. 'SPB_IRDA_JF4M/DET/JNGFR01:daqOutput'\n",
"ctrl_source_template = '{}/DET/CONTROL' # template for control source name (filled with karabo_id).\n",
"ctrl_source_template = '{}/DET/CONTROL' # template for control source name (filled with karabo_id_control)\n",
"karabo_id_control = \"\" # if control is on a different ID, set to empty string if it is the same a karabo-id\n",
"karabo_da_control = \"JNGFRCTRL00\" # file inset for control data\n",
"\n",
"# Parameters for calibration database.\n",
"use_dir_creation_date = True # use the creation data of the input dir for database queries\n",
......@@ -44,11 +42,12 @@
"plt_images = 100 # Number of images to plot after applying selected corrections.\n",
"limit_images = 0 # ONLY FOR TESTING. process only first N images, Use 0 to process all.\n",
"\n",
"\n",
"# Parameters for retrieving calibration constants\n",
"manual_slow_data = False # if true, use manually entered bias_voltage and integration_time values\n",
"integration_time = 4.96 # integration time in us, will be overwritten by value in file\n",
"gain_setting = 0 # 0 for dynamic gain, 1 for dynamic HG0, will be overwritten by value in file\n",
"gain_mode = 0 # 0 for runs with dynamic gain setting, 1 for fixgain. It will be overwritten by value in file, if manual_slow_data is set to True.
",
"mem_cells = 0 # leave memory cells equal 0, as it is saved in control information starting 2019.\n",
"bias_voltage = 180 # will be overwritten by value in file\n",
"\n",
......@@ -196,12 +195,15 @@
"if not manual_slow_data:\n",
" integration_time = ctrl_data.get_integration_time()\n",
" bias_voltage = ctrl_data.get_bias_voltage()\n",
" gain_str, gain_setting = ctrl_data.get_gain_setting()\n",
" gain_setting = ctrl_data.get_gain_setting()\n",
" gain_mode = ctrl_data.get_gain_mode()\n",
"\n",
"print(f\"Integration time is {integration_time} us\")\n",
"print(f\"Gain setting is {gain_setting} ({gain_str})\")\n",
"print(f\"Gain setting is {gain_setting} (run settings: \"\n",
" f\"{ctrl_data.run_settings.value if ctrl_data.run_settings else ctrl_data.run_settings})\") # noqa\n",
"print(f\"Gain mode is {gain_mode}\")\n",
"print(f\"Bias voltage is {bias_voltage} V\")\n",
"print(f\"Number of memory cells is {memory_cells}\")"
"print(f\"Number of memory cells are {memory_cells}\")"
]
},
{
......@@ -217,12 +219,19 @@
"metadata": {},
"outputs": [],
"source": [
"condition = Conditions.Dark.jungfrau(\n",
" memory_cells=memory_cells,\n",
" bias_voltage=bias_voltage,\n",
" integration_time=integration_time,\n",
" gain_setting=gain_setting,\n",
")\n",
"condition = {\n",
" \"memory_cells\": memory_cells,\n",
" \"bias_voltage\": bias_voltage,\n",
" \"integration_time\": integration_time,\n",
" \"gain_setting\": gain_setting,\n",
"}\n",
"# TODO: Currently there are no gain constants for fixed gain mode.\n",
"# This should not be the case in the future.\n",
"gain_condition = Conditions.Dark.jungfrau(**condition)\n",
"\n",
"condition[\"gain_mode\"] = gain_mode\n",
"dark_condition = Conditions.Dark.jungfrau(**condition)\n",
"\n",
"\n",
"def get_constants_for_module(karabo_da: str):\n",
" \"\"\" Get calibration constants for given module of Jungfrau\n",
......@@ -240,25 +249,28 @@
" get_constant_from_db_and_time,\n",
" karabo_id=karabo_id,\n",
" karabo_da=karabo_da,\n",
" condition=condition,\n",
" cal_db_interface=cal_db_interface,\n",
" creation_time=creation_time,\n",
" timeout=cal_db_timeout,\n",
" print_once=False,\n",
" )\n",
" offset_map, when[\"Offset\"] = retrieval_function(\n",
" condition=dark_condition,\n",
" constant=Constants.jungfrau.Offset(),\n",
" empty_constant=np.zeros((1024, 512, 1, 3))\n",
" empty_constant=np.zeros((512, 1024, memory_cells, 3))\n",
" )\n",
" mask, when[\"BadPixelsDark\"] = retrieval_function(\n",
" condition=dark_condition,\n",
" constant=Constants.jungfrau.BadPixelsDark(),\n",
" empty_constant=np.zeros((1024, 512, 1, 3)),\n",
" empty_constant=np.zeros((512, 1024, memory_cells, 3), dtype=np.uint32),\n",
" )\n",
" mask_ff, when[\"BadPixelsFF\"] = retrieval_function(\n",
" condition=gain_condition,\n",
" constant=Constants.jungfrau.BadPixelsFF(),\n",
" empty_constant=None\n",
" )\n",
" gain_map, when[\"Gain\"] = retrieval_function(\n",
" condition=gain_condition,\n",
" constant=Constants.jungfrau.RelativeGain(),\n",
" empty_constant=None\n",
" )\n",
......@@ -267,20 +279,22 @@
" if mask_ff is not None:\n",
" mask |= np.moveaxis(mask_ff, 0, 1)\n",
"\n",
" # move from x,y,cell,gain to cell,x,y,gain\n",
" offset_map = np.squeeze(offset_map)\n",
" mask = np.squeeze(mask)\n",
"\n",
" if memory_cells > 1:\n",
" offset_map = np.moveaxis(np.moveaxis(offset_map, 0, 2), 0, 2)\n",
" mask = np.moveaxis(np.moveaxis(mask, 0, 2), 0, 2)\n",
" # move from x, y, cell, gain to cell, x, y, gain\n",
" offset_map = np.moveaxis(offset_map, [0, 1], [1, 2])\n",
" mask = np.moveaxis(mask, [0, 1], [1, 2])\n",
" else:\n",
" offset_map = np.squeeze(offset_map)\n",
" mask = np.squeeze(mask)\n",
"\n",
" if gain_map is not None:\n",
" if memory_cells > 1:\n",
" gain_map = np.moveaxis(np.moveaxis(gain_map, 0, 2), 0, 1)\n",
" gain_map = np.moveaxis(gain_map, [0, 2], [2, 0])\n",
" # add extra empty cell constant\n",
" b = np.ones(((1,)+gain_map.shape[1:]))\n",
" gain_map = np.concatenate((gain_map, b), axis=0)\n",
" else:\n",
" gain_map = np.squeeze(gain_map)\n",
" gain_map = np.moveaxis(gain_map, 1, 0)\n",
" gain_map = np.moveaxis(np.squeeze(gain_map), 1, 0)\n",
"\n",
" return offset_map, mask, gain_map, karabo_da, when\n",
"\n",
......@@ -309,28 +323,27 @@
"source": [
"# Correct a chunk of images for offset and gain\n",
"def correct_train(wid, index, d):\n",
" d = d.astype(np.float32) # [2, x, y]\n",
" d = d.astype(np.float32) # [cells, x, y]\n",
" g = gain[index]\n",
" m = memcells[index]\n",
"\n",
" # Jungfrau gains 0[00], 1[01], 3[11]\n",
" g[g==3] = 2\n",
"\n",
" if 0 <= index < plt_images:\n",
" r_data[index, ...] = d[0, ...]\n",
" if memory_cells == 1:\n",
" g_data[index, ...] = g\n",
" else:\n",
" g_data[index, ...] = g[0, ...]\n",
" # Select memory cells\n",
" \n",
" # TODO: This needs to be revisited.\n",
" # As this result in copying data to a new array on every train,\n",
" # even when there's the same pattern of memory cells on every train.\n",
" if memory_cells > 1:\n",
" m[m>16] = 0 # TODO: this is wrong and needs to be updated with burst mode.\n",
" # For an invalid image a memory cell of 255 is set.\n",
" # These images doesn't need to be processed.\n",
" offset_map_cell = offset_map[m, ...]\n",
" \"\"\"\n",
" Even though it is correct to assume that memory cells pattern \n",
" can be the same across all trains (for one correction run\n",
" taken with one acquisition), it is preferred to not assume\n",
" this to account for exceptions that can happen.\n",
" \"\"\"\n",
" m = memcells[index].copy()\n",
" # 255 is a cell value pointing to no cell image data (image of 0 pixels).\n",
" # Corresponding image will be corrected with constant of cell 0. To avoid values of 0.\n",
" # This line is depending on not storing the modified memory cells in the corrected data.\n",
" m[m==255] = 0\n",
"\n",
" offset_map_cell = offset_map[m, ...] # [16 + empty cell, x, y]\n",
" mask_cell = mask[m, ...]\n",
" else:\n",
" offset_map_cell = offset_map\n",
......@@ -338,6 +351,7 @@
"\n",
" # Offset correction\n",
" offset = np.choose(g, np.moveaxis(offset_map_cell, -1, 0))\n",
"\n",
" d -= offset\n",
"\n",
" # Gain correction\n",
......@@ -347,7 +361,6 @@
" else:\n",
" gain_map_cell = gain_map\n",
" cal = np.choose(g, np.moveaxis(gain_map_cell, -1, 0))\n",
"\n",
" d /= cal\n",
"\n",
" msk = np.choose(g, np.moveaxis(mask_cell, -1, 0))\n",
......@@ -379,9 +392,7 @@
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"tags": []
},
"metadata": {},
"outputs": [],
"source": [
"fim_data = {}\n",
......@@ -432,14 +443,6 @@
" offset_map, mask, gain_map = constants[local_karabo_da]\n",
"\n",
" # Allocate shared arrays.\n",
"\n",
" # Shared arrays for plotting.\n",
" g_data = context.alloc(\n",
" shape=(plt_images, dshape[2], dshape[3]), dtype=np.uint8)\n",
" r_data = context.alloc(\n",
" shape=(plt_images, dshape[2], dshape[3]), dtype=np.float32)\n",
"\n",
" # shared arrays for corrected data.\n",
" data_corr = context.alloc(shape=dshape, dtype=np.float32)\n",
" mask_corr = context.alloc(shape=dshape, dtype=np.uint32)\n",
"\n",
......@@ -451,6 +454,8 @@
" gain = seq_dc[instrument_src_kda, \"data.gain\"].ndarray()\n",
" memcells = seq_dc[instrument_src_kda, \"data.memoryCell\"].ndarray()\n",
"\n",
" rim_data[local_karabo_da] = data[:plt_images, 0, ...].copy()\n",
"\n",
" context.map(correct_train, data)\n",
" step_timer.done_step(f'Correction time.')\n",
"\n",
......@@ -482,12 +487,13 @@
"\n",
" # Prepare plotting arrays\n",
" step_timer.start()\n",
"\n",
" # TODO: Print out which cell is being selected for plotting\n",
" fim_data[local_karabo_da] = data_corr[:plt_images, 0, ...].copy()\n",
" msk_data[local_karabo_da] = mask_corr[:plt_images, 0, ...].copy()\n",
" gim_data[local_karabo_da] = g_data\n",
" rim_data[local_karabo_da] = r_data\n",
" step_timer.done_step(f'Preparing plotting data of {plt_images} images:.')"
" gim_data[local_karabo_da] = gain[:plt_images, 0, ...].copy()\n",
"\n",
" step_timer.done_step(f'Preparing plotting data of {plt_images} images.')"
]
},
{
......@@ -552,7 +558,7 @@
" do_2d_plot(\n",
" h, (ex, ey),\n",
" \"Signal (ADU)\",\n",
" \"Gain Bit Value\",\n",
" \"Gain Bit Value (high gain=0[00], medium gain=1[01], low gain=3[11])\",\n",
" f\"Module {mod}\")"
]
},
......
%% Cell type:markdown id: tags:
# Jungfrau Dark Image Characterization #
Author: European XFEL Detector Group, Version: 2.0
Analyzes Jungfrau dark image data to deduce offset, noise and resulting bad pixel maps
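%% Cell type:markdown id: tags:
For orientation, here is a minimal sketch (on toy data) of the core reduction performed below: for each pixel, storage cell, and gain stage, the offset is the mean over trains and the noise is the standard deviation over trains. The shapes and `toy_*` names are illustrative only, not the notebook's actual variables.
%% Cell type:code id: tags:
``` python
import numpy as np

# Toy dark data shaped (x, y, cells, trains), matching the layout used
# after the transpose in the processing cell further below.
rng = np.random.default_rng(seed=0)
toy_images = rng.normal(loc=3000., scale=10., size=(64, 32, 16, 100))

# Offset map: per-pixel, per-cell mean over trains.
toy_offset = toy_images.mean(axis=3)  # shape (64, 32, 16)
# Noise map: per-pixel, per-cell standard deviation over trains.
toy_noise = toy_images.std(axis=3)    # shape (64, 32, 16)
print(toy_offset.shape, toy_noise.shape)
```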
%% Cell type:code id: tags:
``` python
in_folder = '/gpfs/exfel/exp/SPB/202130/p900204/raw/' # folder under which runs are located, required
out_folder = '/gpfs/exfel/data/scratch/ahmedk/test/remove' # path to place reports at, required
run_high = 141 # run number for G0 dark run, required
run_med = 142 # run number for G1 dark run, required
run_low = 143 # run number for G2 dark run, required
# Parameters used to access raw data.
karabo_da = ['JNGFR01', 'JNGFR02','JNGFR03','JNGFR04', 'JNGFR05', 'JNGFR06','JNGFR07','JNGFR08'] # list of data aggregators, which corresponds to different JF modules
karabo_id = 'SPB_IRDA_JF4M' # karabo_id (detector identifier) prefix of Jungfrau detector to process.
karabo_id_control = '' # if control is on a different ID; leave empty if it is the same as karabo-id
receiver_template = 'JNGFR{:02}' # inset for receiver devices
receiver_control_id = "CONTROL" # inset for control devices
path_template = 'RAW-R{:04d}-{}-S{{:05d}}.h5' # template to use for file name, double escape sequence number
instrument_source_template = '{}/DET/{}:daqOutput' # template for instrument source name (filled with karabo_id & receiver_id). e.g. 'SPB_IRDA_JF4M/DET/JNGFR01:daqOutput'
ctrl_source_template = '{}/DET/CONTROL' # template for control source name (filled with karabo_id).
karabo_da_control = 'JNGFRCTRL00' # file inset for control data
ctrl_source_template = '{}/DET/CONTROL' # template for control source name (filled with karabo_id_control)
# Parameters for calibration database and storing constants.
use_dir_creation_date = True # use dir creation date
cal_db_interface = 'tcp://max-exfl016:8016' # calibrate db interface to connect to
cal_db_timeout = 300000 # timeout on caldb requests
local_output = True # output constants locally
db_output = False # output constants to database
# Parameters affecting creating dark calibration constants.
badpixel_threshold_sigma = 5. # bad pixels defined by values outside n times this std from median
offset_abs_threshold_low = [1000, 10000, 10000] # absolute bad pixel threshold in terms of offset, lower values
offset_abs_threshold_high = [8000, 15000, 15000] # absolute bad pixel threshold in terms of offset, upper values
max_trains = 500 # maximum trains to process darks from.
max_trains = 0 # Maximum trains to process darks. Set to 0 to process all available train images.
min_trains = 1 # Minimum number of trains that should be available to process dark constants. Default 1.
manual_slow_data = False # if true, use manually entered bias_voltage and integration_time values
time_limits = 0.025 # to find calibration constants later on, the integration time is allowed to vary by this amount (in us)
# Parameters to be used for injecting dark calibration constants.
integration_time = 1000 # integration time in us, will be overwritten by value in file
gain_setting = 0 # 0 for dynamic, forceswitchg1, forceswitchg2, 1 for dynamichg0, fixedgain1, fixgain2. Will be overwritten by value in file
gain_setting = 0 # 0 for dynamic, forceswitchg1, forceswitchg2, 1 for dynamichg0, fixgain1, fixgain2. Will be overwritten by value in file
gain_mode = 0 # 1 if medium and low runs are fixgain1 and fixgain2, otherwise 0. It will be overwritten by value in file, if manual_slow_data
bias_voltage = 90 # sensor bias voltage in V, will be overwritten by value in file
memory_cells = 16 # number of memory cells
# Don't remove. myMDC sends this by default.
operation_mode = '' # Detector operation mode, optional
# TODO: this is used for only Warning check at AGIPD dark.
# Need to rethink if it makes sense to use it here as well.
operation_mode = 'ADAPTIVE_GAIN' # Detector operation mode, optional
# TODO: Remove
db_module = [""] # ID of module in calibration database. TODO: remove from calibration_configurations.
```
%% Cell type:code id: tags:
``` python
import glob
import os
import warnings
from pathlib import Path
warnings.filterwarnings('ignore')
import matplotlib
import matplotlib.pyplot as plt
import multiprocessing
import numpy as np
import pasha as psh
from IPython.display import Markdown, display
from extra_data import RunDirectory
matplotlib.use('agg')
%matplotlib inline
from XFELDetAna.plotting.heatmap import heatmapPlot
from XFELDetAna.plotting.histogram import histPlot
from cal_tools import jungfraulib, step_timing
from cal_tools.ana_tools import save_dict_to_hdf5
from cal_tools.enums import BadPixels
from cal_tools.enums import BadPixels, JungfrauSettings
from cal_tools.tools import (
get_dir_creation_date,
get_pdu_from_db,
get_random_db_interface,
get_report,
save_const_to_h5,
send_to_db,
)
from iCalibrationDB import Conditions, Constants
```
%% Cell type:code id: tags:
``` python
# Constants relevant for the analysis
run_nums = [run_high, run_med, run_low] # run number for G0/HG0, G1, G2
sensor_size = (1024, 512)
gains = [0, 1, 2]
fixed_settings = [
JungfrauSettings.FIX_GAIN_1.value, JungfrauSettings.FIX_GAIN_2.value] # noqa
dynamic_settings = [
JungfrauSettings.FORCE_SWITCH_HG1.value, JungfrauSettings.FORCE_SWITCH_HG2.value] # noqa
creation_time = None
if use_dir_creation_date:
creation_time = get_dir_creation_date(in_folder, run_high)
print(f"Using {creation_time} as creation time")
os.makedirs(out_folder, exist_ok=True)
cal_db_interface = get_random_db_interface(cal_db_interface)
print(f'Calibration database interface: {cal_db_interface}')
offset_abs_threshold = [offset_abs_threshold_low, offset_abs_threshold_high]
if karabo_id_control == "":
karabo_id_control = karabo_id
```
%% Cell type:code id: tags:
``` python
proposal = list(filter(None, in_folder.strip('/').split('/')))[-2]
file_loc = f"proposal:{proposal} runs:{run_high} {run_med} {run_low}"
report = get_report(out_folder)
step_timer = step_timing.StepTimer()
```
%% Cell type:markdown id: tags:
## Reading control data
%% Cell type:code id: tags:
``` python
step_timer.start()
gain_runs = dict()
noise_map = dict()
offset_map = dict()
gain_str = None
med_low_settings = []
ctrl_src = ctrl_source_template.format(karabo_id_control)
for gain, run_n in enumerate(run_nums):
run_dc = RunDirectory(f"{in_folder}/r{run_n:04d}/")
gain_runs[run_n] = [gain, run_dc]
ctrl_data = jungfraulib.JungfrauCtrl(run_dc, ctrl_src)
run_settings = ctrl_data.run_settings.value if ctrl_data.run_settings else ctrl_data.run_settings # noqa
# Read control data for the high gain run only.
if run_n == run_high:
ctrl_data = jungfraulib.JungfrauCtrl(run_dc, ctrl_src)
run_mcells, sc_start = ctrl_data.get_memory_cells()
if not manual_slow_data:
integration_time = ctrl_data.get_integration_time()
bias_voltage = ctrl_data.get_bias_voltage()
gain_str, gain_setting = ctrl_data.get_gain_setting()
print(f"Gain setting is {gain_setting} ({gain_str})")
print(f"Integration time is {integration_time} us")
print(f"Bias voltage is {bias_voltage} V")
gain_setting = ctrl_data.get_gain_setting()
print(f"Gain setting is {gain_setting} ({run_settings})")
print(f"Integration time is {integration_time} us")
print(f"Bias voltage is {bias_voltage} V")
if run_mcells == 1:
memory_cells = 1
print('Dark runs in single cell mode, '
f'storage cell start: {sc_start:02d}')
else:
memory_cells = 16
print('Dark runs in burst mode, '
f'storage cell start: {sc_start:02d}')
else:
gain_mode = ctrl_data.get_gain_mode()
med_low_settings.append(run_settings)
# A transparent workaround for old raw data with wrong/missing medium and low settings
if med_low_settings == [None, None]:
print("WARNING: run.settings is not stored in the data. "
f"Hence assuming gain_mode = {gain_mode} for old adaptive-gain data.")
elif med_low_settings == ["dynamicgain", "forceswitchg1"]:
print(f"WARNING: run.settings for medium and low gain runs are wrong {med_low_settings}. "
f"This is an expected bug for old raw data. Setting gain_mode to {gain_mode}.")
# Validate that low_med_settings is not a mix of adaptive and fixed settings.
elif not (sorted(med_low_settings) in [fixed_settings, dynamic_settings]): # noqa
raise ValueError(
"Medium and low run settings are not as expected. "
f"Either {dynamic_settings} or {fixed_settings} are expected.\n"
f"Got {sorted(med_low_settings)} for both runs, respectively.")
print(f"Gain mode is {gain_mode} ({med_low_settings})")
step_timer.done_step(f'Reading control data.')
# Initialize noise_map and offset_map module arrays.
for mod in karabo_da:
noise_map[mod] = np.zeros(sensor_size+(memory_cells, 3))
offset_map[mod] = np.zeros(sensor_size+(memory_cells, 3))
```
%% Cell type:code id: tags:
``` python
# Use only high gain threshold for all gains in case of fixed_gain.
if gain_mode: # fixed_gain
offset_abs_threshold = [[offset_abs_threshold_low[0]]*3, [offset_abs_threshold_high[0]]*3]
else:
offset_abs_threshold = [offset_abs_threshold_low, offset_abs_threshold_high]
```
%% Cell type:code id: tags:
``` python
context = psh.context.ThreadContext(num_workers=multiprocessing.cpu_count())
```
%% Cell type:code id: tags:
``` python
"""
All jungfrau runs are taken through one acquisition, except for the forceswitch runs.
While taking non-fixed dark runs, a procedure of multiple acquisitions is used to switch the storage cell indices.
This is done for medium and low gain dark dynamic runs, only [forceswitchg1, forceswitchg2]:
Switching the cell indices in burst mode is a work around for hardware procedure
deficiency that produces wrong data for dark runs except for the first storage cell.
This is why multiple acquisitions are taken to switch the used storage cells and
acquire data through two cells for each of the 16 cells instead of acquiring darks through all 16 cells.
"""
print(f"Maximum trains to process is set to {max_trains}")
noise_map = dict()
offset_map = dict()
bad_pixels_map = dict()
for mod in karabo_da:
step_timer.start()
instrument_src = instrument_source_template.format(
karabo_id, receiver_template.format(int(mod[-2:])))
print(f"\n- Instrument data path for {mod} is {instrument_src}.")
offset_map[mod] = context.alloc(shape=(sensor_size+(memory_cells, 3)), fill=0)
noise_map[mod] = context.alloc(like=offset_map[mod], fill=0)
bad_pixels_map[mod] = context.alloc(like=offset_map[mod], dtype=np.uint32, fill=0)
for run_n, [gain, run_dc] in gain_runs.items():
def process_cell(worker_id, array_index, cell_number):
cell_slice_idx = acelltable == cell_number
thiscell = images[..., cell_slice_idx]
offset_map[mod][..., cell_number, gain] = np.mean(thiscell, axis=2)
noise_map[mod][..., cell_number, gain] = np.std(thiscell, axis=2)
# Check if there are wrong bad gain values.
# Indicate pixels with wrong gain value across all trains for each cell.
bad_pixels_map[mod][
np.average(gain_vals[..., cell_slice_idx], axis=2) != raw_g] |= BadPixels.WRONG_GAIN_VALUE.value
print(f"Gain stage {gain}, run {run_n}")
# load shape of data for memory cells, and detector size (imgs, cells, x, y)
n_imgs = run_dc[instrument_src, "data.adc"].shape[0]
# load number of data available, including trains with empty data.
n_trains = len(run_dc.train_ids)
instr_dc = run_dc.select(instrument_src, require_all=True)
if n_trains-n_imgs != 0:
empty_trains = n_trains - n_imgs
if empty_trains != 0:
print(
f"\tWARNING: {Path(run_dc.files[0].filename).name} has {n_trains-n_imgs} " # noqa
f"trains with empty data out of {n_trains} trains.")
f"\tWARNING: {mod} has {empty_trains} trains with empty data out of {n_trains} trains at " # noqa
f"{Path(run_dc[instrument_src, 'data.adc'].files[0].filename).parent}.")
if max_trains > 0:
n_imgs = min(n_imgs, max_trains)
print(f"Processing {n_imgs} images based on the given max_trains: {max_trains}.") # noqa
print(f"Processing {n_imgs} images.")
# Select only requested number of images to process darks.
instr_dc = instr_dc.select_trains(np.s_[:n_imgs])
if n_imgs < min_trains:
raise ValueError(
f"Less than {min_trains} trains are available in RAW data."
" Not enough data to process darks.")
images = np.transpose(
instr_dc[instrument_src, "data.adc"].ndarray(), (3, 2, 1, 0))
instr_dc[instrument_src, "data.adc"].ndarray().astype(np.float64), (3, 2, 1, 0))
acelltable = np.transpose(instr_dc[instrument_src, "data.memoryCell"].ndarray())
gain_vals = np.transpose(
instr_dc[instrument_src, "data.gain"].ndarray(), (3, 2, 1, 0))
# define gain value as saved in raw gain map
raw_g = 3 if gain == 2 else gain
if memory_cells == 1:
acelltable -= sc_start
if gain > 0 and memory_cells == 16:
# 255 is used as the detector sets 255 as well for
# cell images identified as bad.
# Only for dynamic medium and low gain runs [forceswitchg1, forceswitchg2] in burst mode.
if gain_mode == 0 and gain > 0 and memory_cells == 16:
# 255 is used, matching the receiver, which uses the
# value 255 to indicate a cell without an image.
# image shape for forceswitchg1 and forceswitchg2 = (1024, 512, 2, trains)
# compared to expected shape of (1024, 512, 16, trains) for high gain run.
acelltable[1:] = 255
for cell in range(memory_cells):
thiscell = images[..., acelltable == cell]
noise_map[mod][..., cell, gain] = np.std(thiscell, axis=2)
offset_map[mod][..., cell, gain] = np.mean(thiscell, axis=2)
# Calculate offset and noise maps
context.map(process_cell, range(memory_cells))
step_timer.done_step(f'Creating Offset and noise constants for a module.')
```
%% Cell type:markdown id: tags:
## Offset and Noise Maps ##
Below, offset and noise maps for the high ($g_0$) gain stage are shown, alongside the distribution of these values. One expects block-like structures mapping to the ASICs of the detector.
%% Cell type:code id: tags:
``` python
g_name = ['G0', 'G1', 'G2']
g_range = [(0, 8000), (8000, 16000), (8000, 16000)]
n_range = [(0., 50.), (0., 50.), (0., 50.)]
unit = '[ADCu]'
```
%% Cell type:code id: tags:
``` python
# TODO: Fix plots arrangement and speed for Jungfrau burst mode.
step_timer.start()
for mod in karabo_da:
for g_idx in gains:
for cell in range(0, memory_cells):
f_o0 = heatmapPlot(
np.swapaxes(offset_map[mod][..., cell, g_idx], 0, 1),
y_label="Row",
x_label="Column",
lut_label=unit,
aspect=1.,
vmin=g_range[g_idx][0],
vmax=g_range[g_idx][1],
title=f'Pedestal {g_name[g_idx]} - Cell {cell:02d} - Module {mod}')
fo0, ax_o0 = plt.subplots()
res_o0 = histPlot(
ax_o0, offset_map[mod][..., cell, g_idx],
bins=800,
range=g_range[g_idx],
facecolor='b',
histotype='stepfilled',
)
ax_o0.tick_params(axis='both', which='major', labelsize=15)
ax_o0.set_title(
f'Module pedestal distribution - Cell {cell:02d} - Module {mod}',
fontsize=15)
ax_o0.set_xlabel(f'Pedestal {g_name[g_idx]} {unit}', fontsize=15)
ax_o0.set_yscale('log')
f_n0 = heatmapPlot(
np.swapaxes(noise_map[mod][..., cell, g_idx], 0, 1),
y_label="Row",
x_label="Column",
lut_label=unit,
aspect=1.,
vmin=n_range[g_idx][0],
vmax=n_range[g_idx][1],
title=f"RMS noise {g_name[g_idx]} - Cell {cell:02d} - Module {mod}",
)
fn0, ax_n0 = plt.subplots()
res_n0 = histPlot(
ax_n0,
noise_map[mod][..., cell, g_idx],
bins=100,
range=n_range[g_idx],
facecolor='b',
histotype='stepfilled',
)
ax_n0.tick_params(axis='both', which='major', labelsize=15)
ax_n0.set_title(
f'Module noise distribution - Cell {cell:02d} - Module {mod}',
fontsize=15)
ax_n0.set_xlabel(
f'RMS noise {g_name[g_idx]} ' + unit, fontsize=15)
plt.show()
step_timer.done_step(f'Plotting offset and noise maps.')
```
%% Cell type:markdown id: tags:
## Bad Pixel Map ##
The bad pixel map is deduced by comparing offset and noise of each pixel ($v_i$) and each gain ($g$) against the median value for that gain stage:
$$
v_i > \mathrm{median}(v_{k,g}) + n \sigma_{v_{k,g}}
$$
or
$$
v_i < \mathrm{median}(v_{k,g}) - n \sigma_{v_{k,g}}
$$
Values are encoded in a 32-bit mask, where for bad pixels deduced from dark images the following non-zero entries are relevant:
%% Cell type:code id: tags:
``` python
def print_bp_entry(bp):
print("{:<30s} {:032b}".format(bp.name, bp.value))
print("{:<30s} {:032b} -> {}".format(bp.name, bp.value, int(bp.value)))
print_bp_entry(BadPixels.OFFSET_OUT_OF_THRESHOLD)
print_bp_entry(BadPixels.NOISE_OUT_OF_THRESHOLD)
print_bp_entry(BadPixels.OFFSET_NOISE_EVAL_ERROR)
print_bp_entry(BadPixels.WRONG_GAIN_VALUE)
def eval_bpidx(d):
mdn = np.nanmedian(d, axis=(0, 1))[None, None, :, :]
std = np.nanstd(d, axis=(0, 1))[None, None, :, :]
idx = (d > badpixel_threshold_sigma*std+mdn) | (d < (-badpixel_threshold_sigma)*std+mdn)
return idx
```
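%% Cell type:markdown id: tags:
As a quick sanity check of the median ± nσ rule implemented by `eval_bpidx` above, the toy example below plants one obvious outlier and confirms it is flagged. The `demo_*` names are illustrative; `badpixel_threshold_sigma` is the notebook parameter defined at the top.
%% Cell type:code id: tags:
``` python
# Toy (x, y, cells, gains) map with a single planted outlier pixel.
demo_map = np.random.default_rng(1).normal(1000., 5., size=(64, 64, 1, 3))
demo_map[0, 0, 0, :] = 2000.  # far beyond badpixel_threshold_sigma * std

demo_bad = eval_bpidx(demo_map)
print(demo_bad[0, 0, 0])  # -> [ True  True  True]
print(demo_bad.sum())     # only the planted pixel is flagged (3 gain entries)
```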
%% Cell type:code id: tags:
``` python
step_timer.start()
bad_pixels_map = dict()
for mod in karabo_da:
bad_pixels_map[mod] = np.zeros(noise_map[mod].shape, np.uint32)
display(Markdown(f"### Badpixels for module {mod}:"))
offset_abs_threshold = np.array(offset_abs_threshold)
bad_pixels_map[mod][eval_bpidx(offset_map[mod])] = BadPixels.OFFSET_OUT_OF_THRESHOLD.value
bad_pixels_map[mod][eval_bpidx(offset_map[mod])] |= BadPixels.OFFSET_OUT_OF_THRESHOLD.value
bad_pixels_map[mod][~np.isfinite(offset_map[mod])] |= BadPixels.OFFSET_NOISE_EVAL_ERROR.value
bad_pixels_map[mod][eval_bpidx(noise_map[mod])] |= BadPixels.NOISE_OUT_OF_THRESHOLD.value
bad_pixels_map[mod][~np.isfinite(noise_map[mod])] |= BadPixels.OFFSET_NOISE_EVAL_ERROR.value
bad_pixels_map[mod][(offset_map[mod] < offset_abs_threshold[0][None, None, None, :]) | (offset_map[mod] > offset_abs_threshold[1][None, None, None, :])] |= BadPixels.OFFSET_OUT_OF_THRESHOLD.value # noqa
for g_idx in gains:
for cell in range(memory_cells):
bad_pixels = bad_pixels_map[mod][:, :, cell, g_idx]
fn_0 = heatmapPlot(
np.swapaxes(bad_pixels, 0, 1),
y_label="Row",
x_label="Column",
lut_label=f"Badpixels {g_name[g_idx]} [ADCu]",
aspect=1.,
vmin=0, title=f'G{g_idx} Bad pixel map - Cell {cell:02d} - Module {mod}')
vmin=0, vmax=5,
title=f'G{g_idx} Bad pixel map - Cell {cell:02d} - Module {mod}')
step_timer.done_step(f'Creating bad pixels constant and plotting it for a module.')
```
%% Cell type:code id: tags:
``` python
# set the operating condition
condition = Conditions.Dark.jungfrau(
memory_cells=memory_cells,
bias_voltage=bias_voltage,
integration_time=integration_time,
gain_setting=gain_setting)
gain_setting=gain_setting,
gain_mode=gain_mode,
)
db_modules = get_pdu_from_db(
karabo_id=karabo_id,
karabo_da=karabo_da,
constant=Constants.jungfrau.Offset(),
condition=condition,
cal_db_interface=cal_db_interface,
snapshot_at=creation_time)
```
%% Cell type:markdown id: tags:
## Inject and save calibration constants
%% Cell type:code id: tags:
``` python
step_timer.start()
for mod, db_mod in zip(karabo_da, db_modules):
constants = {
'Offset': np.moveaxis(offset_map[mod], 0, 1),
'Noise': np.moveaxis(noise_map[mod], 0, 1),
'BadPixelsDark': np.moveaxis(bad_pixels_map[mod], 0, 1),
}
md = None
for key, const_data in constants.items():
const = getattr(Constants.jungfrau, key)()
const.data = const_data
for parm in condition.parameters:
if parm.name == "Integration Time":
parm.lower_deviation = time_limits
parm.upper_deviation = time_limits
if db_output:
md = send_to_db(
db_module=db_mod,
karabo_id=karabo_id,
constant=const,
condition=condition,
file_loc=file_loc,
report_path=report,
cal_db_interface=cal_db_interface,
creation_time=creation_time,
timeout=cal_db_timeout,
)
if local_output:
md = save_const_to_h5(
db_module=db_mod,
karabo_id=karabo_id,
constant=const,
condition=condition,
data=const.data,
file_loc=file_loc,
report=report,
creation_time=creation_time,
out_folder=out_folder,
)
print(f"Calibration constant {key} is stored locally at {out_folder}.\n")
print("Constants parameter conditions are:\n")
print(
f"• Bias voltage: {bias_voltage}\n"
f"• Memory cells: {memory_cells}\n"
f"• Integration time: {integration_time}\n"
f"• Gain setting: {gain_setting}\n"
f"• Gain mode: {gain_mode}\n"
f"• Creation time: {md.calibration_constant_version.begin_at if md is not None else creation_time}\n") # noqa
step_timer.done_step("Injecting constants.")
```
%% Cell type:code id: tags:
``` python
print(f"Total processing time {step_timer.timespan():.01f} s")
step_timer.print_summary()
```
......
%% Cell type:markdown id: tags:
# Test Notebook - CLI
Author: Robert Rosca
Version: 0.1
Notebook for use with the unit and continuous integration tests.
%% Cell type:code id: tags:
```
``` python
root = "root/path/for/nb" # Variables included in the user notebook path must
# also be defined in the notebook so they can be substituted into the path
in_folder = "./" # input folder
out_folder = "./" # output folder
list_normal = [10] # parameterized list, range allowed
list_intellilist = [2345] # parameterized list with ranges, range allowed
concurrency_parameter = [1] # concurrency parameter, range allowed
number = 0 # parameterized number
```
%% Cell type:markdown id: tags:
Tests notebook execution by just creating an empty file in the output directory.
%% Cell type:code id: tags:
```
``` python
from pathlib import Path
```
%% Cell type:code id: tags:
```
``` python
in_folder = Path(in_folder)
(in_folder / "touch").touch()
```
%% Cell type:markdown id: tags:
Include some non-ascii characters to check that files are reliably processed as UTF-8. 🤖
%% Cell type:code id: tags:
```
``` python
print("🥼")
```
......
import sys
from distutils.command.build import build
from subprocess import check_output
import numpy
from Cython.Build import cythonize
from Cython.Distutils import build_ext
from setuptools.extension import Extension
from setuptools import find_packages, setup
from setuptools.extension import Extension
from src.xfel_calibrate.notebooks import notebooks
......@@ -40,43 +41,10 @@ for ctypes in notebooks.values():
data_files = list(filter(None, data_files)) # Get rid of `None` entries
setup(
name="European XFEL Offline Calibration",
version="1.0",
author="Steffen Hauf",
author_email="steffen.hauf@xfel.eu",
maintainer="EuXFEL Calibration Team",
url="",
description="",
long_description="",
long_description_content_type="text/markdown",
# TODO: find licence, assuming this will be open sourced eventually
license="(c) European XFEL GmbH 2018",
packages=find_packages("src"),
package_dir={"": "src"},
package_data={
"xfel_calibrate": [
"bin/*.sh",
"titlepage.tmpl",
"xfel.pdf",
]
+ data_files
},
entry_points={
"console_scripts": [
"xfel-calibrate = xfel_calibrate.calibrate:run",
],
},
cmdclass={
"build": PreInstallCommand,
"build_ext": build_ext,
},
ext_modules=cythonize(ext_modules, language_level=3),
install_requires=[
"iCalibrationDB @ git+ssh://git@git.xfel.eu:10022/detectors/cal_db_interactive.git@2.0.9", # noqa
"XFELDetectorAnalysis @ git+ssh://git@git.xfel.eu:10022/karaboDevices/pyDetLib.git@2.7.0", # noqa
install_requires = [
"Cython==0.29.21",
"Jinja2==2.11.2",
"markupsafe==2.0.1",
"astcheck==0.2.5",
"astsearch==0.2.0",
"cfelpyutils==1.0.1",
......@@ -98,6 +66,7 @@ setup(
"kafka-python==2.0.2",
"karabo_data==0.7.0",
"lxml==4.5.0",
"markupsafe==2.0.1",
"matplotlib==3.4.2",
"metadata_client==3.0.8",
"nbclient==0.5.1",
......@@ -120,7 +89,47 @@ setup(
"sphinx==1.8.5",
"tabulate==0.8.6",
"traitlets==4.3.3",
],
]
if "readthedocs.org" not in sys.executable:
install_requires += [
"iCalibrationDB @ git+ssh://git@git.xfel.eu:10022/detectors/cal_db_interactive.git@2.1.0", # noqa
"XFELDetectorAnalysis @ git+ssh://git@git.xfel.eu:10022/karaboDevices/pyDetLib.git@2.7.0", # noqa
]
setup(
name="European XFEL Offline Calibration",
version="1.0",
author="Steffen Hauf",
author_email="steffen.hauf@xfel.eu",
maintainer="EuXFEL Calibration Team",
url="",
description="",
long_description="",
long_description_content_type="text/markdown",
# TODO: find licence, assuming this will be open sourced eventually
license="(c) European XFEL GmbH 2018",
packages=find_packages("src"),
package_dir={"": "src"},
package_data={
"xfel_calibrate": [
"bin/*.sh",
"titlepage.tmpl",
"xfel.pdf",
]
+ data_files
},
entry_points={
"console_scripts": [
"xfel-calibrate = xfel_calibrate.calibrate:run",
],
},
cmdclass={
"build": PreInstallCommand,
"build_ext": build_ext,
},
ext_modules=cythonize(ext_modules, language_level=3),
install_requires=install_requires,
extras_require={
"docs": [
"nbsphinx",
......
......@@ -5,28 +5,28 @@ class BadPixels(IntFlag):
""" The European XFEL Bad Pixel Encoding
"""
OFFSET_OUT_OF_THRESHOLD = 0b000000000000000000001 # bit 1
NOISE_OUT_OF_THRESHOLD = 0b000000000000000000010 # bit 2
OFFSET_NOISE_EVAL_ERROR = 0b000000000000000000100 # bit 3
NO_DARK_DATA = 0b000000000000000001000 # bit 4
CI_GAIN_OF_OF_THRESHOLD = 0b000000000000000010000 # bit 5
CI_LINEAR_DEVIATION = 0b000000000000000100000 # bit 6
CI_EVAL_ERROR = 0b000000000000001000000 # bit 7
FF_GAIN_EVAL_ERROR = 0b000000000000010000000 # bit 8
FF_GAIN_DEVIATION = 0b000000000000100000000 # bit 9
FF_NO_ENTRIES = 0b000000000001000000000 # bit 10
CI2_EVAL_ERROR = 0b000000000010000000000 # bit 11
VALUE_IS_NAN = 0b000000000100000000000 # bit 12
VALUE_OUT_OF_RANGE = 0b000000001000000000000 # bit 13
GAIN_THRESHOLDING_ERROR = 0b000000010000000000000 # bit 14
DATA_STD_IS_ZERO = 0b000000100000000000000 # bit 15
ASIC_STD_BELOW_NOISE = 0b000001000000000000000 # bit 16
INTERPOLATED = 0b000010000000000000000 # bit 17
NOISY_ADC = 0b000100000000000000000 # bit 18
OVERSCAN = 0b001000000000000000000 # bit 19
NON_SENSITIVE = 0b010000000000000000000 # bit 20
NON_LIN_RESPONSE_REGION = 0b100000000000000000000 # bit 21
OFFSET_OUT_OF_THRESHOLD = 1 << 0 # bit 1
NOISE_OUT_OF_THRESHOLD = 1 << 1 # bit 2
OFFSET_NOISE_EVAL_ERROR = 1 << 2 # bit 3
NO_DARK_DATA = 1 << 3 # bit 4
CI_GAIN_OF_OF_THRESHOLD = 1 << 4 # bit 5
CI_LINEAR_DEVIATION = 1 << 5 # bit 6
CI_EVAL_ERROR = 1 << 6 # bit 7
FF_GAIN_EVAL_ERROR = 1 << 7 # bit 8
FF_GAIN_DEVIATION = 1 << 8 # bit 9
FF_NO_ENTRIES = 1 << 9 # bit 10
CI2_EVAL_ERROR = 1 << 10 # bit 11
VALUE_IS_NAN = 1 << 11 # bit 12
VALUE_OUT_OF_RANGE = 1 << 12 # bit 13
GAIN_THRESHOLDING_ERROR = 1 << 13 # bit 14
DATA_STD_IS_ZERO = 1 << 14 # bit 15
ASIC_STD_BELOW_NOISE = 1 << 15 # bit 16
INTERPOLATED = 1 << 16 # bit 17
NOISY_ADC = 1 << 17 # bit 18
OVERSCAN = 1 << 18 # bit 19
NON_SENSITIVE = 1 << 19 # bit 20
NON_LIN_RESPONSE_REGION = 1 << 20 # bit 21
WRONG_GAIN_VALUE = 1 << 21 # bit 22
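# Illustrative usage sketch (not part of this module): because BadPixels is an
# IntFlag, individual causes combine with bitwise OR into a single 32-bit mask
# and can be tested again with bitwise AND, which is how the notebooks above
# build and query their bad pixel maps:
#
#     mask = BadPixels.OFFSET_OUT_OF_THRESHOLD | BadPixels.NOISE_OUT_OF_THRESHOLD
#     assert int(mask) == 0b11
#     assert mask & BadPixels.NOISE_OUT_OF_THRESHOLD
#     assert not (mask & BadPixels.WRONG_GAIN_VALUE)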
class SnowResolution(Enum):
""" An Enum specifying how to resolve snowy pixels
......@@ -42,3 +42,13 @@ class AgipdGainMode(IntEnum):
FIXED_HIGH_GAIN = 1
FIXED_MEDIUM_GAIN = 2
FIXED_LOW_GAIN = 3
class JungfrauSettings(Enum):
""" Jungfrau run gain settings."""
DYNAMIC_GAIN = "dynamicgain"
DYNAMIC_GAIN_HG0 = "dynamichg0"
FIX_GAIN_1 = "fixgain1"
FIX_GAIN_2 = "fixgain2"
FORCE_SWITCH_HG1 = "forceswitchg1"
FORCE_SWITCH_HG2 = "forceswitchg2"
from typing import Optional, Tuple
from cal_tools.enums import JungfrauSettings
def _get_settings(run_dc, ctrl_src) -> Optional[JungfrauSettings]:
try:
return(JungfrauSettings(
run_dc.get_run_value(ctrl_src, "settings.value")))
except KeyError:
print(
"WARNING: \'settings.value\' key is not available at "
f"{run_dc.select(ctrl_src).files[0].filename},\n")
return
class JungfrauCtrl():
def __init__(
self,
......@@ -7,37 +23,40 @@ class JungfrauCtrl():
"""Read slow data from RUN source.
:param run_dc: EXtra-data RunDirectory DataCollection object.
:param karabo_id_control: Karabo ID for control h5file with slow data.
:param ctrl_src: Control source name for accessing run slow data.
"""
self.run_dc = run_dc
self.ctrl_src = ctrl_src
self.run_settings = _get_settings(run_dc, ctrl_src)
def get_memory_cells(self):
def get_memory_cells(self) -> Tuple[int, int]:
n_storage_cells = int(self.run_dc.get_run_value(
self.ctrl_src, "storageCells.value")) + 1
storage_cell_st = int(self.run_dc.get_run_value(
self.ctrl_src, "storageCellStart.value"))
return n_storage_cells, storage_cell_st
def get_bias_voltage(self):
def get_bias_voltage(self) -> int:
return(int(self.run_dc.get_run_value(
self.ctrl_src, "vHighVoltage.value")[0]))
def get_integration_time(self):
def get_integration_time(self) -> float:
return(float(self.run_dc.get_run_value(
self.ctrl_src, "exposureTime.value")) * 1e6)
def get_gain_setting(self):
try:
gain_str = str(self.run_dc.get_run_value(
self.ctrl_src, "settings.value"))
except KeyError:
print(
"gain_setting is not available for h5 ctrl path "
f"/RUN/{self.karabo_id_control}/DET/CONTROL/settings/value,\n"
"WARNING: Setting gain_setting to 0,"
"assuming that this is an old run.\n"
)
gain_str = "KeyError"
gain_setting = 1 if gain_str == 'dynamichg0' else 0 # 'dynamicgain'
return gain_str, gain_setting
\ No newline at end of file
def get_gain_setting(self) -> int:
if self.run_settings == JungfrauSettings.DYNAMIC_GAIN_HG0:
gain_setting = 1
else: # JungfrauSettings.DYNAMIC_GAIN
gain_setting = 0
if self.run_settings != JungfrauSettings.DYNAMIC_GAIN:
print(
"WARNING: Setting gain_setting to 0, "
"assuming that this is an old run.\n")
return gain_setting
def get_gain_mode(self) -> int:
if self.run_settings in [JungfrauSettings.FIX_GAIN_1, JungfrauSettings.FIX_GAIN_2]: # noqa
return 1
else:
return 0
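# Hedged usage sketch (not part of this module; the run path and control
# source below are examples drawn from the notebooks above):
#
#     from extra_data import RunDirectory
#     run_dc = RunDirectory("/gpfs/exfel/exp/SPB/202130/p900204/raw/r0141")
#     ctrl = JungfrauCtrl(run_dc, "SPB_IRDA_JF4M/DET/CONTROL")
#     n_cells, sc_start = ctrl.get_memory_cells()
#     print(ctrl.get_integration_time(),  # us
#           ctrl.get_bias_voltage(),      # V
#           ctrl.get_gain_setting(),      # 0 or 1
#           ctrl.get_gain_mode())         # 0 (dynamic) or 1 (fixed)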
......@@ -703,7 +703,12 @@ def send_to_db(db_module: str, karabo_id: str, constant, condition,
raise
except Exception as e:
# TODO: refactor to use custom exception class
if "has already been taken" in str(e):
# Refactor error message for re-injecting an
# identical CCV to the database.
if all(s in str(e) for s in [
"Error creating calibration_constant_version",
"has already been taken",
]):
print(
f"WARNING: {constant.name} for {db_module}"
" has already been injected with the same "
......@@ -789,6 +794,10 @@ class CalibrationMetadata(dict):
else:
print(f"Warning: existing {self._yaml_fn} is malformed, "
"will be overwritten")
else:
# TODO: update after resolving this discussion
# https://git.xfel.eu/detectors/pycalibration/-/merge_requests/624
self.save()
def save(self):
with self._yaml_fn.open("w") as fd:
......
......@@ -558,7 +558,8 @@ def make_par_table(parms, run_tmp_path: str):
def run(argv=None):
""" Run a calibration task with parser arguments """
# Ensure files are opened as UTF-8 by default, regardless of environment.
locale.setlocale(locale.LC_CTYPE, ('en_US', 'UTF-8'))
if "readthedocs.org" not in sys.executable:
locale.setlocale(locale.LC_CTYPE, ('en_US', 'UTF-8'))
if argv is None:
argv = sys.argv
......
......@@ -317,7 +317,8 @@ def extend_params(nb, extend_func_name, argv):
# Make a temporary parser that won't exit if it sees -h or --help
pre_parser = make_initial_parser(add_help=False)
add_args_from_nb(nb, pre_parser, no_required=True)
params = extract_parameters(nb, lang='python')
add_args_from_nb(params, pre_parser, no_required=True)
known, _ = pre_parser.parse_known_args(argv[1:])
args = deconsolize_args(vars(known))
......@@ -328,7 +329,7 @@ def extend_params(nb, extend_func_name, argv):
extension = f(*[args[p] for p in sig.parameters])
fcc = first_code_cell(nb)
fcc["source"] += "\n" + extension
fcc["source"] += "\n" + extension if extension else "\n"
@dataclass
......
......@@ -13,6 +13,7 @@ from webservice.webservice import ( # noqa: import not at top of file
    parse_config,
    run_action,
    wait_on_transfer,
    get_slurm_partition,
)
@@ -151,3 +152,28 @@ async def test_run_action(mode, cmd, retcode, expected):
    webservice.webservice.run_proc_async = mock_run_proc_async
    ret = await run_action(job_db, cmd, mode, 1, 1, 1)
    assert ret.lower().startswith(expected)
@pytest.mark.asyncio
@pytest.mark.parametrize(
    'proposal_number, action, mock_proposal_status, expected_result',
    [
        (42, 'correct', 'A', 'upex-middle'),  # active
        ('42', 'dark', 'R', 'upex-high'),  # ready
        (404, 'correct', 'FR', 'exfel'),  # finished and reviewed
        (404, 'dark', 'CLO', 'exfel'),  # closed
    ],
)
async def test_get_slurm_partition(proposal_number,
                                   action,
                                   mock_proposal_status,
                                   expected_result):
    response = mock.Mock()
    response.status_code = 200
    response.json = mock.Mock(
        return_value={'flg_beamtime_status': mock_proposal_status})

    client = mock.Mock()
    client.get_proposal_by_number_api = mock.Mock(
        return_value=response)

    ret = await get_slurm_partition(client, action, proposal_number)
    assert ret == expected_result
@@ -35,7 +35,7 @@ correct:
  cmd : >-
    python -m xfel_calibrate.calibrate {detector} CORRECT
    --slurm-scheduling {sched_prio}
    --slurm-partition {partition}
    --request-time {request_time}
    --slurm-name {action}_{instrument}_{detector}_{cycle}_p{proposal}_{runs}
    --report-to /gpfs/exfel/exp/{instrument}/{cycle}/p{proposal}/usr/Reports/{runs}/{det_instance}_{action}_{proposal}_{runs}_{time_stamp}
@@ -50,7 +50,7 @@ dark:
    python -m xfel_calibrate.calibrate {detector} DARK
    --concurrency-par karabo_da
    --slurm-scheduling {sched_prio}
    --slurm-partition {partition}
    --request-time {request_time}
    --slurm-name {action}_{instrument}_{detector}_{cycle}_p{proposal}_{runs}
    --report-to /gpfs/exfel/d/cal/caldb_store/xfel/reports/{instrument}/{det_instance}/{action}/{action}_{proposal}_{runs}_{time_stamp}
......
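For illustration, a sketch of how such a template renders once the webservice substitutes the placeholders; all values below are hypothetical:

# Hypothetical rendering of the 'correct' template above; {partition}
# is filled in from get_slurm_partition() at submission time.
cmd = (
    "python -m xfel_calibrate.calibrate {detector} CORRECT "
    "--slurm-scheduling {sched_prio} "
    "--slurm-partition {partition} "
    "--request-time {request_time}"
).format(detector="JUNGFRAU", sched_prio="80",
         partition="upex-middle", request_time="Now")
print(cmd)
# python -m xfel_calibrate.calibrate JUNGFRAU CORRECT --slurm-scheduling 80
# --slurm-partition upex-middle --request-time Now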
@@ -2,10 +2,10 @@ import argparse
import glob
import os
import sqlite3
from collections import OrderedDict
from datetime import datetime, timezone
from http.server import BaseHTTPRequestHandler, HTTPServer
from pathlib import Path
from shutil import copyfileobj
from subprocess import check_output
from typing import Optional
@@ -20,34 +20,11 @@ except:
    from config import serve_overview as config
class LimitedSizeDict(OrderedDict):
    def __init__(self, *args, **kwds):
        self.size_limit = kwds.pop("size_limit", None)
        OrderedDict.__init__(self, *args, **kwds)
        self._check_size_limit()

    def __setitem__(self, key, value):
        OrderedDict.__setitem__(self, key, value)
        self._check_size_limit()

    def _check_size_limit(self):
        if self.size_limit is not None:
            while len(self) > self.size_limit:
                self.popitem(last=False)


pdf_queue = LimitedSizeDict(size_limit=50)
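A quick sketch of the eviction behaviour (hypothetical keys and paths; the real queue maps report UUIDs to PDF paths):

queue = LimitedSizeDict(size_limit=2)
queue["a"] = "/tmp/a.pdf"
queue["b"] = "/tmp/b.pdf"
queue["c"] = "/tmp/c.pdf"  # the oldest entry, "a", is evicted here
assert list(queue) == ["b", "c"]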
# HTTPRequestHandler class
class RequestHandler(BaseHTTPRequestHandler):
    conf_was_init = False

    def init_config(self):
        global cal_config
        self.total_jobs_cmd = config["shell-commands"]["total-jobs"]
        self.tail_log_cmd = config["shell-commands"]["tail-log"]
        self.cat_log_cmd = config["shell-commands"]["cat-log"]
@@ -57,98 +34,94 @@ class RequestHandler(BaseHTTPRequestHandler):
        for template, tfile in config["templates"].items():
            with open(tfile, "r") as tf:
                self.templates[template] = tf.read()

        global pdf_queue
        self.pdf_queue = pdf_queue
        self.conf_was_init = True
    def serve_css(self):
        """Serve /serve_overview.css"""
        self.send_response(200)
        self.send_header('Content-type', 'text/css')
        self.end_headers()
        self.wfile.write(self.templates["css"].encode('utf-8'))
    def serve_dark_overview(self):
        # Send headers
        self.send_response(200)
        self.send_header('Content-type', 'text/html')
        self.end_headers()

        host = config["server-config"]["host"]
        port = config["server-config"]["port"]
        reports = {}
        for instrument, detectors in cal_config['dark'].items():
            reports[instrument] = {}
            for detector in detectors:
                tmpl = f'/gpfs/exfel/d/cal/caldb_store/xfel/reports/{instrument}/{detector}/dark/*pdf'
                files = glob.glob(tmpl)
                files.sort(key=os.path.getmtime, reverse=True)
                file_info = []
                for i, file in enumerate(files):
                    if (len(file_info) % 2) == 0:
                        bgcolor = 'EEEEEE'
                    else:
                        bgcolor = 'FFFFFF'
                    time = os.stat(file).st_mtime
                    d_time = datetime.fromtimestamp(time).replace(
                        tzinfo=timezone.utc)
                    s_time = d_time.strftime('%y-%m-%d %H:%M')
                    file_info.append([file, s_time, bgcolor])
                reports[instrument][detector] = file_info

        tmpl = Template(self.templates["dark-overview"])
        message = tmpl.render(reports=reports, host=host, port=port)
        self.wfile.write(bytes(message, "utf8"))
    def serve_file_from_gpfs(self):
        """Serve a file from a path starting with /gpfs"""
        if self.path.endswith(".html"):
            mimetype = 'text/html'
        elif self.path.endswith(".jpg"):
            mimetype = 'image/jpg'
        elif self.path.endswith(".gif"):
            mimetype = 'image/gif'
        elif self.path.endswith(".png"):
            mimetype = 'image/png'
        elif self.path.endswith(".pdf"):
            mimetype = 'application/pdf'
        elif self.path.endswith(".js"):
            mimetype = 'application/javascript'
        elif self.path.endswith(".css"):
            mimetype = 'text/css'
        else:
            return self.send_error(404)

        if os.path.isfile(self.path):
            self.send_response(200)
            self.send_header('Content-Length', str(os.stat(self.path).st_size))
            self.send_header('Content-type', mimetype)
            self.end_headers()
            with open(self.path, "rb") as f:
                copyfileobj(f, self.wfile)
        else:
            self.send_error(404)
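The extension chain above could equally be driven by the standard library. A sketch of an equivalent lookup; note that mimetypes returns e.g. 'image/jpeg' rather than the 'image/jpg' string used above, so this is an alternative, not a drop-in replacement:

import mimetypes
import os

# Extensions the handler is willing to serve; anything else -> None (404).
ALLOWED_EXTENSIONS = {".html", ".jpg", ".gif", ".png", ".pdf", ".js", ".css"}

def guess_mimetype(path):
    if os.path.splitext(path)[1].lower() not in ALLOWED_EXTENSIONS:
        return None
    return mimetypes.guess_type(path)[0]

assert guess_mimetype("/gpfs/report.pdf") == "application/pdf"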
    def do_GET(self):
        if not self.conf_was_init:
            self.init_config()

        if "/serve_overview.css" in self.path:
            return self.serve_css()

        if "pdf?" in self.path:
            puuid = self.path.split("?")[1]
            fpath = self.pdf_queue.get(puuid, None)
            if fpath is None:
                return
            self.send_response(200)
            self.send_header('Content-type', 'application/pdf')
            self.end_headers()
            with open(fpath, "rb") as f:
                self.wfile.write(f.read())
            return

        if "dark?" in self.path:
            return self.serve_dark_overview()

        if "/gpfs" in self.path:
            return self.serve_file_from_gpfs()

        # Send response status code
        self.send_response(200)
        # Send headers
        self.send_header('Content-type', 'text/html')
......
@@ -19,8 +19,9 @@ from datetime import datetime, timezone
from pathlib import Path
from subprocess import PIPE, run
from threading import Thread
from typing import Any, Dict, List, Optional, Tuple, Union

import requests
import yaml
import zmq
import zmq.asyncio
@@ -627,6 +628,46 @@ def check_files(in_folder: str,
            files_exists = False
    return files_exists
async def get_slurm_partition(mdc: MetadataClient,
                              action: str,
                              proposal_number: Union[int, str]) -> str:
    """Check MyMDC for the proposal status and select the appropriate
    slurm partition.

    The partition is either upex-high (for darks) or upex-middle (for
    corrections) if the proposal is 'R'eady or 'A'ctive. In other cases,
    the jobs default to the exfel partition.

    :param mdc: an authenticated MyMDC client
    :param action: the type of action ('correct' or 'dark')
    :param proposal_number: the proposal number
    :return: 'exfel' on closed proposals,
        'upex-high' for dark requests on active proposals,
        'upex-middle' for correct requests on active proposals.
    """
    # See
    # https://git.xfel.eu/ITDM/metadata_catalog/-/blob/develop/app/models/proposal.rb
    # for the possible proposal states.
    loop = get_event_loop()
    response = await shield(loop.run_in_executor(
        None, mdc.get_proposal_by_number_api, proposal_number))

    if response.status_code != 200:
        logging.error(f'Failed to check MDC for proposal "{proposal_number}" '
                      'status. ASSUMING CLOSED')
        logging.error(Errors.MDC_RESPONSE.format(response))
        return 'exfel'

    partition = 'exfel'
    status = response.json().get('flg_beamtime_status', 'whoopsie')
    if status in ['R', 'A']:
        partition = 'upex-high' if action == 'dark' else 'upex-middle'

    logging.debug(f"Using {partition} for {proposal_number} because {status}")
    return partition
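A brief call sketch (the proposal number is hypothetical; mdc is any authenticated MetadataClient, and the call must run inside a coroutine):

# e.g. from within another coroutine of the webservice:
partition = await get_slurm_partition(mdc, 'dark', 2222)
# -> 'upex-high' while proposal 2222 is ready/active, 'exfel' once closed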
async def update_darks_paths(mdc: MetadataClient, rid: int, in_path: str,
                             out_path: str, report_path: str):
@@ -698,6 +739,42 @@ async def update_mdc_status(mdc: MetadataClient, action: str,
        logging.error(Errors.MDC_RESPONSE.format(response))
def _orca_passthrough(
    proposal_number=None,
    runs=None,
    action=None,
    route="execute",
    **kwargs,
):
    """Pass requests received by the webservice through to Orca, for use
    during the transition to the new webservice.

    Due to network restrictions on Maxwell, this sends POST requests to
    localhost on port 42751 (an arbitrarily chosen port number), so either
    Orca should be running locally or an ssh tunnel should be set up with
    `ssh -L 42751:localhost:42751 TARGET-SERVER -N &`.
    """
    try:
        base_url = "http://localhost"
        port = "42751"

        args = []
        if action:
            args.append(f"action={action}")
        if runs:
            args.append(f"runs={','.join(str(r) for r in runs)}")
        args.extend(f"{k}={v}" for k, v in kwargs.items())

        url = f"{base_url}:{port}/{route}/{proposal_number}?{'&'.join(filter(None, args))}"
    except Exception:
        logging.warning("Error building Orca passthrough request",
                        exc_info=True)
        return None

    try:
        requests.post(url)
    except Exception:
        logging.error(f"Orca POST request error for url {url}", exc_info=True)
class ActionsServer:
    def __init__(self, config, mode):
        self.config = config
@@ -821,6 +898,12 @@ class ActionsServer:
        proposal = self._normalise_proposal_num(proposal)
        pconf_full = self.load_proposal_config(cycle, proposal)

        _orca_passthrough(
            proposal_number=proposal,
            runs=[runnr],
            route="execute",
        )

        data_conf = pconf_full['data-mapping']
        if instrument in pconf_full['correct']:
            pconf = pconf_full['correct'][instrument]
@@ -942,6 +1025,14 @@ class ActionsServer:
        proposal = self._normalise_proposal_num(proposal)

        _orca_passthrough(
            proposal_number=proposal,
            runs=runs,
            action="dark",
            route="execute",
            karabo_id=karabo_id,
        )

        pconf_full = self.load_proposal_config(cycle, proposal)
        data_conf = pconf_full['data-mapping']
@@ -1108,6 +1199,9 @@ class ActionsServer:
    ) -> Tuple[str, List[str]]:
        report = []
        ret = []

        partition = await get_slurm_partition(self.mdc, action, proposal)

        # run xfel_calibrate
        for karabo_id, dconfig in detectors.items():
            detector = dconfig['detector-type']
@@ -1115,6 +1209,7 @@ class ActionsServer:
            cmd = self.config[action]['cmd'].format(
                detector=detector,
                sched_prio=str(self.config[action]['sched-prio']),
                partition=partition,
                action=action, instrument=instrument,
                cycle=cycle, proposal=proposal,
                runs="_".join([f"r{r}" for r in run_nrs]),
......