Compare revisions

Target project: calibration/pycalibration
Commits on Source (35)
Showing with 586 additions and 289 deletions
......@@ -57,10 +57,14 @@ automated_test:
OUTPUT: $CI_MERGE_REQUEST_SOURCE_BRANCH_NAME
DETECTORS: all
CALIBRATION: all
PYENV_VERSION: "3.8"
CAL_CAL_TOOLS_CALCAT: "{base-api-url='http://exflcalproxy:8080/api', use-oauth2=false}"
stage: automated_test
only: [merge_requests]
when: manual
allow_failure: false
tags:
- integration
<<: *before_script
script:
- export LANG=C # Hopefully detect anything relying on locale
......
......@@ -42,7 +42,7 @@ A quick setup would be:
1. ``source /gpfs/exfel/sw/calsoft/.pyenv/bin/activate``
2. ``git clone ssh://git@git.xfel.eu:10022/detectors/pycalibration.git && cd pycalibration`` - clone the offline calibration package from EuXFEL GitLab
3. ``pyenv shell 3.8.11`` - load required version of python
3. ``pyenv shell 3.8.18`` - load required version of python
4. ``python3 -m venv .venv`` - create the virtual environment
5. ``source .venv/bin/activate`` - activate the virtual environment
6. ``python3 -m pip install --upgrade pip`` - upgrade version of pip
......@@ -55,7 +55,7 @@ Copy/paste script:
source /gpfs/exfel/sw/calsoft/.pyenv/bin/activate
git clone ssh://git@git.xfel.eu:10022/detectors/pycalibration.git
cd pycalibration
pyenv shell 3.8.11
pyenv shell 3.8.18
python3 -m venv .venv
source .venv/bin/activate
python3 -m pip install --upgrade pip
......@@ -73,7 +73,7 @@ installation method instead.
1. ``source /gpfs/exfel/sw/calsoft/.pyenv/bin/activate``
2. ``git clone ssh://git@git.xfel.eu:10022/detectors/pycalibration.git && cd pycalibration`` - clone the offline calibration package from EuXFEL GitLab
3. ``pyenv shell 3.8.11`` - load required version of python
3. ``pyenv shell 3.8.18`` - load required version of python
4. ``pip install .`` - install the pycalibration package (add ``-e`` flag for editable development installation)
5. ``export PATH=$HOME/.local/bin:$PATH`` - make sure that ``$HOME/.local/bin`` is in the PATH environment variable
......@@ -83,7 +83,7 @@ Copy/paste script:
source /gpfs/exfel/sw/calsoft/.pyenv/bin/activate
git clone ssh://git@git.xfel.eu:10022/detectors/pycalibration.git
pyenv shell 3.8.11
pyenv shell 3.8.18
cd pycalibration
pip install --user . # `-e` flag for editable install, e.g. `pip install -e .`
export PATH=$HOME/.local/bin:$PATH
......
# Automated deployment
Ansible playbook for automated deployment on the online cluster
## Setup Ansible
```bash
$ git clone https://github.com/ansible/ansible.git
$ cd ansible
$ git checkout v2.8.1
```
## Activate the environment
```bash
$ source ansible/hacking/env-setup
```
Alternatively, you can use the provided environment on `exflgateway`:
```bash
$ source /opt/karabo/ansible/hacking/env-setup
```
## Python dependencies
Ensure the `yaml` (PyYAML) and `jinja2` packages are installed in your Python environment. If not:
```bash
pip install pyyaml jinja2
```
## Install pycalibration
```bash
./install.yml GROUP_NAME
```
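
For example, `./install.yml HED` deploys pycalibration to the hosts of the `HED` inventory group defined in the hosts file below; the playbook's `#!play` shebang hands execution to the wrapper script shown further below, which in turn runs `ansible-playbook install.yml -l HED`.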
[defaults]
forks = 30
inventory = ./hosts
stdout_callback = debug
[ssh_connection]
pipelining = True
ssh_args = -o ControlMaster=auto -o ControlPersist=1200
---
ansible_user: "xcal"
install_dir: "/scratch/xcal"
pycal_dir: "{{ install_dir }}/pycalibration"
pycal_version: "3.14.3"
[HED]
sa2-onc-0[1:6]
sa2-onc-hed
#!play
- hosts: all
tasks:
- name: Find all backup directories
find:
paths: "{{ install_dir }}"
patterns: "^pycalibration_backup_\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}$"
file_type: directory
use_regex: True
register: backup_dirs
- name: Display matching directories
debug:
msg: "Matching directories: {{ backup_dirs.files | map(attribute='path') | list }}"
- name: Sort backup directories
set_fact:
sorted_backup_dirs: "{{ backup_dirs.files | sort(attribute='mtime', reverse=true) | list }}"
- name: Delete all but the most recent backup
file:
path: "{{ item.path }}"
state: absent
loop: "{{ sorted_backup_dirs[1:] }}"
when: sorted_backup_dirs | length > 1
register: deleted_dirs
- name: Information about backup retention
debug:
msg: |
Kept directory: {{ sorted_backup_dirs[0].path if sorted_backup_dirs else 'None' }}
Deleted directories ({{ deleted_dirs.results | length }}):
{% for dir in deleted_dirs.results %}
- {{ dir.path }}
{% endfor %}
- name: Get current date
command: date +%Y-%m-%dT%H:%M:%S
register: current_date
changed_when: false
- name: Set backup directory name
set_fact:
backup_dir_name: "pycalibration_backup_{{ current_date.stdout }}"
- name: Create backup directory
file:
state: directory
path: "{{ install_dir }}/{{ backup_dir_name }}"
mode: 0755
register: backup_dir
- name: Check for previous installation
stat:
path: "{{ pycal_dir }}"
register: pycal_dir_stat
- name: Backup previous installation
command: "mv {{ pycal_dir }} {{ backup_dir.path }}"
when: pycal_dir_stat.stat.exists
- name: Clone pycalibration
shell: git clone ssh://git@git.xfel.eu:10022/calibration/pycalibration.git -b {{ pycal_version }} {{ pycal_dir }}
- name: create venv
shell: /gpfs/exfel/sw/calsoft/.pyenv/versions/3.8.18/bin/python3 -m venv --prompt pycal-{{ pycal_version }} {{ pycal_dir }}/.venv
- name: Install Pycalibration
shell: all_proxy="http://exflproxy01.desy.de:3128" {{ pycal_dir }}/.venv/bin/pip install -e {{ pycal_dir }}
#!/bin/bash
if [[ $# -lt 2 ]]
then
echo "USAGE: $1 GROUP_NAME"
exit 1
fi
PLAYBOOK="$1"
shift
exec ansible-playbook "$PLAYBOOK" -l "$@"
......@@ -16,7 +16,7 @@ echo "job ID: ${SLURM_JOB_ID:-none}"
# set up environment
source /etc/profile.d/modules.sh
module load texlive/2019
module load texlive/2022
# make sure we use agg backend
export MPLBACKEND=AGG
......
......@@ -31,7 +31,7 @@ A quick setup would be:
1. `source /gpfs/exfel/sw/calsoft/.pyenv/bin/activate`
2. `git clone ssh://git@git.xfel.eu:10022/detectors/pycalibration.git && cd pycalibration` -
clone the offline calibration package from EuXFEL GitLab
3. `pyenv shell 3.8.11` - load required version of python
3. `pyenv shell 3.8.18` - load required version of python
4. `python3 -m venv .venv` - create the virtual environment
5. `source .venv/bin/activate` - activate the virtual environment
6. `python3 -m pip install --upgrade pip` - upgrade version of pip
......@@ -44,7 +44,7 @@ Copy/paste script:
source /gpfs/exfel/sw/calsoft/.pyenv/bin/activate
git clone ssh://git@git.xfel.eu:10022/detectors/pycalibration.git
cd pycalibration
pyenv shell 3.8.11
pyenv shell 3.8.18
python3 -m venv .venv
source .venv/bin/activate
python3 -m pip install --upgrade pip
......
%% Cell type:markdown id: tags:
# Jungfrau Dark Image Characterization #
Author: European XFEL Detector Group, Version: 2.0
Analyzes Jungfrau dark image data to deduce offset, noise and resulting bad pixel maps
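
Concretely, the offset constant is the per-pixel mean of the dark frames and the noise constant is the per-pixel standard deviation, computed per memory cell and gain stage (the processing cell below additionally excludes empty frames and flags bad pixels). A minimal standalone sketch of this idea, with illustrative values and shapes only:

``` python
import numpy as np

# Hypothetical stack of 100 dark frames for one memory cell and gain stage
# (shapes are illustrative; the notebook works per module on (1024, 512, cells, trains) arrays).
darks = np.random.normal(loc=12000, scale=50, size=(100, 64, 64)).astype(np.float32)

offset = darks.mean(axis=0)  # per-pixel pedestal (mean over frames)
noise = darks.std(axis=0)    # per-pixel RMS noise (std over frames)
```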
%% Cell type:code id: tags:
``` python
in_folder = '/gpfs/exfel/exp/SPB/202130/p900204/raw/' # folder under which runs are located, required
out_folder = '/gpfs/exfel/data/scratch/ahmedk/test/remove' # path to place reports at, required
metadata_folder = '' # Directory containing calibration_metadata.yml when run by xfel-calibrate
run_high = 141 # run number for G0 dark run, required
run_med = 142 # run number for G1 dark run, required
run_low = 143 # run number for G2 dark run, required
# Parameters used to access raw data.
karabo_da = ['JNGFR01', 'JNGFR02','JNGFR03','JNGFR04', 'JNGFR05', 'JNGFR06','JNGFR07','JNGFR08'] # list of data aggregators, which corresponds to different JF modules
karabo_id = 'SPB_IRDA_JF4M' # karabo_id (detector identifier) prefix of Jungfrau detector to process.
karabo_id_control = '' # karabo_id of the control device if it differs; leave empty if it is the same as karabo_id
receiver_template = 'JNGFR{:02}' # inset for receiver devices
instrument_source_template = '{}/DET/{}:daqOutput' # template for instrument source name (filled with karabo_id & receiver_id). e.g. 'SPB_IRDA_JF4M/DET/JNGFR01:daqOutput'
ctrl_source_template = '{}/DET/CONTROL' # template for control source name (filled with karabo_id_control)
# Parameters for calibration database and storing constants.
use_dir_creation_date = True # use dir creation date
cal_db_interface = 'tcp://max-exfl-cal001:8016#8045' # calibration db interface to connect to
cal_db_timeout = 300000 # timeout on caldb requests
local_output = True # output constants locally
db_output = False # output constants to database
# Parameters affecting creating dark calibration constants.
badpixel_threshold_sigma = 5. # bad pixels are defined as values deviating from the median by more than this many standard deviations
offset_abs_threshold_low = [1000, 10000, 10000] # absolute bad pixel threshold in terms of offset, lower values
offset_abs_threshold_high = [8000, 15000, 15000] # absolute bad pixel threshold in terms of offset, upper values
max_trains = 1000 # Maximum number of trains to process for darks. Set to 0 to process all available trains; 1000 trains are enough to create the dark constants.
min_trains = 100 # Minimum number of trains to process dark constants. Raise a warning if the run has fewer trains.
manual_slow_data = False # if true, use manually entered bias_voltage and integration_time values
time_limits = 0.025 # when retrieving calibration constants later on, the integration time is allowed to vary by this amount (in us)
creation_time = "" # To overwrite the measured creation_time. Required Format: YYYY-MM-DD HR:MN:SC e.g. "2022-06-28 13:00:00"
# Parameters to be used for injecting dark calibration constants.
integration_time = -1 # Integration time in us. Set to -1 to overwrite by value in file.
gain_setting = -1 # 0 for dynamic, forceswitchg1, forceswitchg2; 1 for dynamichg0, fixgain1, fixgain2. Set to -1 to overwrite by value in file.
gain_mode = -1 # 1 if medium and low runs are fixgain1 and fixgain2, otherwise 0. Set to -1 to overwrite by value in file.
bias_voltage = -1 # sensor bias voltage in V, will be overwritten by value in file
memory_cells = -1 # Number of memory cells.
# Parameters used for plotting
detailed_report = False
# TODO: this is only used for a warning check in the AGIPD dark notebooks.
# Need to rethink if it makes sense to use it here as well.
operation_mode = 'ADAPTIVE_GAIN' # Detector operation mode, optional
```
%% Cell type:code id: tags:
``` python
import os
import warnings
from datetime import timedelta
from logging import warning
warnings.filterwarnings('ignore')
from pathlib import Path
from tempfile import NamedTemporaryFile
import matplotlib
import matplotlib.pyplot as plt
import multiprocessing
import numpy as np
import pasha as psh
import yaml
from IPython.display import Markdown, display
from extra_data import RunDirectory
matplotlib.use('agg')
%matplotlib inline
from XFELDetAna.plotting.heatmap import heatmapPlot
from XFELDetAna.plotting.histogram import histPlot
from cal_tools import step_timing
from cal_tools.jungfrau import jungfraulib
from cal_tools.calcat_interface2 import (
CalibrationData,
JUNGFRAUConditions,
)
from cal_tools.constants import inject_ccv, write_ccv
from cal_tools.enums import BadPixels, JungfrauGainMode
from cal_tools.tools import (
get_dir_creation_date,
get_pdu_from_db,
get_random_db_interface,
get_report,
save_const_to_h5,
send_to_db,
)
from cal_tools.jungfrau import jungfraulib
from cal_tools.restful_config import (
calibration_client,
extra_calibration_client,
)
from iCalibrationDB import Conditions, Constants
from cal_tools.tools import calcat_creation_time
matplotlib.use('agg')
%matplotlib inline
```
%% Cell type:code id: tags:
``` python
# Constants relevant for the analysis
run_nums = [run_high, run_med, run_low] # run number for G0/HG0, G1, G2
sensor_size = (1024, 512)
gains = [0, 1, 2]
fixed_settings = [
JungfrauGainMode.FIX_GAIN_1.value, JungfrauGainMode.FIX_GAIN_2.value]
dynamic_settings = [
JungfrauGainMode.FORCE_SWITCH_HG1.value, JungfrauGainMode.FORCE_SWITCH_HG2.value]
old_fixed_settings = ["fixgain1", "fixgain2"]
creation_time = None
if use_dir_creation_date:
creation_time = get_dir_creation_date(in_folder, run_high)
print(f"Using {creation_time} as creation time")
creation_time = calcat_creation_time(in_folder, run_high, creation_time)
print(f"Using {creation_time} as creation time")
os.makedirs(out_folder, exist_ok=True)
cal_db_interface = get_random_db_interface(cal_db_interface)
print(f'Calibration database interface: {cal_db_interface}')
if karabo_id_control == "":
karabo_id_control = karabo_id
```
%% Cell type:code id: tags:
``` python
proposal = list(filter(None, in_folder.strip('/').split('/')))[-2]
file_loc = f"proposal:{proposal} runs:{run_high} {run_med} {run_low}"
report = get_report(metadata_folder)
step_timer = step_timing.StepTimer()
```
%% Cell type:markdown id: tags:
## Reading control data
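
The integration time, bias voltage and gain setting are read from the control source of the high gain run (values set explicitly in the parameter cell take precedence), the number of memory cells is deduced from the same run, and the gain mode is read from the medium and low gain runs.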
%% Cell type:code id: tags:
``` python
step_timer.start()
gain_runs = dict()
med_low_settings = []
ctrl_src = ctrl_source_template.format(karabo_id_control)
run_nums = jungfraulib.sort_runs_by_gain(
raw_folder=in_folder,
runs=run_nums,
ctrl_src=ctrl_src,
)
_gain_mode = None
for gain, run_n in enumerate(run_nums):
run_dc = RunDirectory(f"{in_folder}/r{run_n:04d}/")
gain_runs[run_n] = [gain, run_dc]
ctrl_data = jungfraulib.JungfrauCtrl(run_dc, ctrl_src)
# Read control data for the high gain run only.
if gain == 0:
run_mcells, sc_start = ctrl_data.get_memory_cells()
if integration_time < 0:
integration_time = ctrl_data.get_integration_time()
print(f"Integration time is {integration_time} us.")
else:
print(f"Integration time is manually set to {integration_time} us.")
if bias_voltage < 0:
bias_voltage = ctrl_data.get_bias_voltage()
print(f"Bias voltage is {bias_voltage} V.")
else:
print(f"Bias voltage is manually set to {bias_voltage} V.")
if gain_setting < 0:
gain_setting = ctrl_data.get_gain_setting()
print(f"Gain setting is {gain_setting} ({ctrl_data.run_settings})")
else:
print(f"Gain setting is manually set to {gain_setting}.")
if run_mcells == 1:
memory_cells = 1
print('Dark runs in single cell mode, '
f'storage cell start: {sc_start:02d}')
else:
memory_cells = 16
print('Dark runs in burst mode, '
f'storage cell start: {sc_start:02d}')
else: # medium and low gain
_gain_mode = ctrl_data.get_gain_mode()
med_low_settings.append(ctrl_data.run_mode)
# TODO: consider updating this cell into something similar to agipdlib.AgipdCtrlsRuns()
if gain_mode < 0:
gain_mode = _gain_mode
print(f"Gain mode is {gain_mode} ({med_low_settings})")
else:
print(f"Gain mode is manually set to {gain_mode}.")
step_timer.done_step(f'Reading control data.')
```
%% Cell type:code id: tags:
``` python
step_timer.start()
# set the operating condition
condition = Conditions.Dark.jungfrau(
conditions = JUNGFRAUConditions(
sensor_bias_voltage=bias_voltage,
memory_cells=memory_cells,
bias_voltage=bias_voltage,
integration_time=integration_time,
gain_setting=gain_setting,
gain_mode=gain_mode,
)
# This is useful to set up caldb_root from the loaded config.
# e.g. switch to test_calcat
extra_calibration_client()
cc = calibration_client()
# TODO: Shouldn't we have something like this in CalibrationDataAPI?
# I haven't added it as I want to confirm first if CalibrationDataAPI vision aligns with this.
# This (after DynamicFF) already uses CalibrationData API for injection (conditions)
resp = cc.get_all_phy_det_units_from_detector({
"detector_identifier": karabo_id,
"pdu_snapshot_at": creation_time,
})
db_modules = get_pdu_from_db(
karabo_id=karabo_id,
karabo_da=karabo_da,
constant=Constants.jungfrau.Offset(),
condition=condition,
cal_db_interface=cal_db_interface,
snapshot_at=creation_time)
step_timer.done_step('Set conditions and get PDU names from CalCat.')
```
%% Cell type:code id: tags:
``` python
# Start retrieving existing constants for comparison
step_timer.start()
mod_x_const = [(mod, const) for const in ["Offset", "Noise", "BadPixelsDark"] for mod in karabo_da]
from cal_tools.tools import get_from_db
from datetime import timedelta
def retrieve_old_constant(mod, const):
dconst = getattr(Constants.jungfrau, const)()
data, mdata = get_from_db(
karabo_id=karabo_id,
karabo_da=mod,
constant=dconst,
condition=condition,
empty_constant=None,
cal_db_interface=cal_db_interface,
creation_time=creation_time-timedelta(seconds=60) if creation_time else None,
strategy="pdu_prior_in_time",
verbosity=1,
timeout=cal_db_timeout
)
if mdata is None or data is None:
timestamp = "Not found"
filepath = None
h5path = None
else:
timestamp = mdata.calibration_constant_version.begin_at.isoformat()
filepath = os.path.join(
mdata.calibration_constant_version.hdf5path,
mdata.calibration_constant_version.filename
)
h5path = mdata.calibration_constant_version.h5path
return data, timestamp, filepath, h5path
if not resp["success"]:
raise ValueError("Failed to retrieve PDUs")
else:
da_to_pdu = dict()
pdu_to_uuid = dict()
for pdu in resp['data']:
if pdu['karabo_da'] in karabo_da: # exclude unselected das
da_to_pdu[pdu['karabo_da']] = pdu['physical_name']
pdu_to_uuid[pdu['physical_name']] = pdu['uuid']
first_pdu = resp['data'][0]
detector_info = first_pdu['detector']
detector_info['detector_type'] = first_pdu['detector_type']['name']
old_retrieval_pool = multiprocessing.Pool()
old_retrieval_res = old_retrieval_pool.starmap_async(
retrieve_old_constant, mod_x_const
)
old_retrieval_pool.close()
step_timer.done_step('Retrieved old dark constants for comparison.')
step_timer.done_step('Set conditions and get PDU names from CalCat.')
```
%% Cell type:code id: tags:
``` python
# Use only high gain threshold for all gains in case of fixed_gain.
if gain_mode: # fixed_gain
offset_abs_threshold = [[offset_abs_threshold_low[0]]*3, [offset_abs_threshold_high[0]]*3]
else:
offset_abs_threshold = [offset_abs_threshold_low, offset_abs_threshold_high]
```
%% Cell type:code id: tags:
``` python
context = psh.context.ThreadContext(num_workers=memory_cells)
```
%% Cell type:code id: tags:
``` python
"""
All jungfrau runs are taken through one acquisition, except for the forceswitch runs.
While taking non-fixed dark runs, a procedure of multiple acquisitions is used to switch the storage cell indices.
This is done for medium and low gain dark dynamic runs, only [forceswitchg1, forceswitchg2]:
Switching the cell indices in burst mode is a work around for hardware procedure
deficiency that produces wrong data for dark runs except for the first storage cell.
This is why multiple acquisitions are taken to switch the used storage cells and
acquire data through two cells for each of the 16 cells instead of acquiring darks through all 16 cells.
"""
print(f"Maximum trains to process is set to {max_trains}")
noise_map = dict()
offset_map = dict()
bad_pixels_map = dict()
for mod in karabo_da:
step_timer.start()
instrument_src = instrument_source_template.format(
karabo_id, receiver_template.format(int(mod[-2:])))
print(f"\n- Instrument data path for {mod} is {instrument_src}.")
# (1024, 512, 1 or 16, 3)
offset_map[mod] = context.alloc(
shape=(sensor_size+(memory_cells, 3)), fill=0, dtype=np.float32)
noise_map[mod] = context.alloc(like=offset_map[mod], fill=0)
bad_pixels_map[mod] = context.alloc(shape=offset_map[mod].shape, dtype=np.uint32, fill=0)
for run_n, [gain, run_dc] in gain_runs.items():
def process_cell(worker_id, array_index, cell_number):
cell_slice_idx = acelltable == cell_number
if cell_slice_idx.sum() == 0:
# This cell is not in the data (or it's deliberately excluded)
bad_pixels_map[mod][..., cell_number, gain] = BadPixels.NO_DARK_DATA.value
offset_map[mod][..., cell_number, gain] = np.nan
noise_map[mod][..., cell_number, gain] = np.nan
return
thiscell = images[..., cell_slice_idx] # [1024, 512, n_trains]
# Identify trains whose images are entirely zero.
# TODO: An investigation is ongoing by DET to identify the reason for these empty images.
nonzero_adc = np.any(thiscell != 0, axis=(0, 1))  # [n_trains]
# Exclude empty (all-zero) images before calculating offset and noise.
thiscell = thiscell[..., nonzero_adc]
offset_map[mod][..., cell_number, gain] = np.mean( # [1024, 512]
thiscell, axis=2, dtype=np.float32)
noise_map[mod][..., cell_number, gain] = np.std( # [1024, 512]
thiscell, axis=2, dtype=np.float32)
del thiscell
# Check for wrong gain values.
# 1. Exclude empty images.
# 2. Flag pixels with a wrong gain value in any train, for each cell.
# TODO: the mean is used so that thresholds could later be applied for accepting gain values, even when the mean is not exactly the expected value.
gain_avg = np.mean( # [1024, 512]
gain_vals[..., cell_slice_idx][..., nonzero_adc],
axis=2, dtype=np.float32
)
# Assign WRONG_GAIN_VALUE for a pixel in a badpixel map for all gains.
bad_pixels_map[mod][:, :,cell_number][gain_avg != raw_g] |= BadPixels.WRONG_GAIN_VALUE.value
print(f"Gain stage {gain}, run {run_n}")
# load shape of data for memory cells, and detector size (imgs, cells, x, y)
n_trains = run_dc[instrument_src, "data.adc"].shape[0]
# load the number of trains available, including trains with empty data.
all_trains = len(run_dc.train_ids)
instr_dc = run_dc.select(instrument_src, require_all=True)
empty_trains = all_trains - n_trains
if empty_trains != 0:
print(f"{mod} has {empty_trains} empty trains out of {all_trains} trains")
if max_trains > 0:
n_trains = min(n_trains, max_trains)
print(f"Processing {n_trains} images.")
if n_trains == 0:
raise ValueError(f"{run_n} has no trains to process.")
if n_trains < min_trains:
warning(f"Less than {min_trains} trains are available in RAW data.")
# Select only requested number of images to process darks.
instr_dc = instr_dc.select_trains(np.s_[:n_trains])
images = np.transpose(
instr_dc[instrument_src, "data.adc"].ndarray(), (3, 2, 1, 0))
acelltable = np.transpose(instr_dc[instrument_src, "data.memoryCell"].ndarray())
gain_vals = np.transpose(
instr_dc[instrument_src, "data.gain"].ndarray(), (3, 2, 1, 0))
# define gain value as saved in raw gain map
raw_g = 3 if gain == 2 else gain
if memory_cells == 1:
acelltable -= sc_start
# Only for dynamic medium and low gain runs [forceswitchg1, forceswitchg2] in burst mode.
if (
gain_mode == 0 and # dynamic gain mode
gain > 0 and # Medium and low runs
memory_cells == 16 and # Burst mode
acelltable.shape[0] == 2 # forceswitchg1 and forceswitchg2 acquired with the MDL device.
):
# Use 255, as the receiver does, to indicate a cell without an image.
# image shape for forceswitchg1 and forceswitchg2 = (1024, 512, 2, trains)
# compared to expected shape of (1024, 512, 16, trains) for high gain run.
acelltable[1:] = 255
# Calculate offset and noise maps
context.map(process_cell, range(memory_cells))
cells_missing = (bad_pixels_map[mod][0, 0, :, gain] & BadPixels.NO_DARK_DATA) > 0
if np.any(cells_missing):
print(f"No dark data in gain stage {gain} found for cells", np.nonzero(cells_missing)[0])
del images
del acelltable
del gain_vals
step_timer.done_step('Creating Offset and noise constants for a module.')
```
%% Cell type:code id: tags:
``` python
if detailed_report:
display(Markdown("## Offset and Noise Maps:"))
display(Markdown(
"Below offset and noise maps for the high ($g_0$) gain stage are shown, "
"alongside the distribution of these values. One expects block-like "
"structures mapping to the ASICs of the detector"))
g_name = ['G0', 'G1', 'G2']
g_range = [(0, 8000), (8000, 16000), (8000, 16000)]
n_range = [(0., 50.), (0., 50.), (0., 50.)]
unit = '[ADCu]'
# TODO: Fix plot arrangement and speed for Jungfrau burst mode.
step_timer.start()
for pdu, mod in zip(db_modules, karabo_da):
for mod, pdu in da_to_pdu.items():
for g_idx in gains:
for cell in range(0, memory_cells):
f_o0 = heatmapPlot(
np.swapaxes(offset_map[mod][..., cell, g_idx], 0, 1),
y_label="Row",
x_label="Column",
lut_label=unit,
aspect=1.,
vmin=g_range[g_idx][0],
vmax=g_range[g_idx][1],
title=f'Pedestal {g_name[g_idx]} - Cell {cell:02d} - Module {mod} ({pdu})')
fo0, ax_o0 = plt.subplots()
res_o0 = histPlot(
ax_o0, offset_map[mod][..., cell, g_idx],
bins=800,
range=g_range[g_idx],
facecolor='b',
histotype='stepfilled',
)
ax_o0.tick_params(axis='both',which='major',labelsize=15)
ax_o0.set_title(
f'Module pedestal distribution - Cell {cell:02d} - Module {mod} ({pdu})',
fontsize=15)
ax_o0.set_xlabel(f'Pedestal {g_name[g_idx]} {unit}',fontsize=15)
ax_o0.set_yscale('log')
f_n0 = heatmapPlot(
np.swapaxes(noise_map[mod][..., cell, g_idx], 0, 1),
y_label="Row",
x_label="Column",
lut_label= unit,
aspect=1.,
vmin=n_range[g_idx][0],
vmax=n_range[g_idx][1],
title=f"RMS noise {g_name[g_idx]} - Cell {cell:02d} - Module {mod} ({pdu})",
)
fn0, ax_n0 = plt.subplots()
res_n0 = histPlot(
ax_n0,
noise_map[mod][..., cell, g_idx],
bins=100,
range=n_range[g_idx],
facecolor='b',
histotype='stepfilled',
)
ax_n0.tick_params(axis='both', which='major', labelsize=15)
ax_n0.set_title(
f'Module noise distribution - Cell {cell:02d} - Module {mod} ({pdu})',
fontsize=15)
ax_n0.set_xlabel(
f'RMS noise {g_name[g_idx]} ' + unit, fontsize=15)
plt.show()
step_timer.done_step('Plotting offset and noise maps.')
```
%% Cell type:markdown id: tags:
## Bad Pixel Map
The bad pixel map is deduced by comparing the offset and noise of each pixel ($v_i$) in each gain stage ($g$) against the median over all pixels for that memory cell and gain stage:
$$
v_i > \mathrm{median}(v_{k,g}) + n \sigma_{v_{k,g}}
$$
or
$$
v_i < \mathrm{median}(v_{k,g}) - n \sigma_{v_{k,g}}
$$
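
For example, with the default `badpixel_threshold_sigma = 5`, a pixel whose offset or noise deviates from the median of its cell and gain stage by more than five times the corresponding spread is flagged. In addition (see the code further below), offsets outside the absolute bounds given by `offset_abs_threshold_low` and `offset_abs_threshold_high` are flagged as `OFFSET_OUT_OF_THRESHOLD`, and non-finite values as `OFFSET_NOISE_EVAL_ERROR`.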
Values are encoded in a 32-bit mask; for bad pixels deduced from dark images, the following non-zero entries are relevant:
%% Cell type:code id: tags:
``` python
def print_bp_entry(bp):
print("{:<30s} {:032b} -> {}".format(bp.name, bp.value, int(bp.value)))
print_bp_entry(BadPixels.OFFSET_OUT_OF_THRESHOLD)
print_bp_entry(BadPixels.NOISE_OUT_OF_THRESHOLD)
print_bp_entry(BadPixels.OFFSET_NOISE_EVAL_ERROR)
print_bp_entry(BadPixels.NO_DARK_DATA)
print_bp_entry(BadPixels.WRONG_GAIN_VALUE)
def eval_bpidx(d):
mdn = np.nanmedian(d, axis=(0, 1))[None, None, :, :]
std = np.nanstd(d, axis=(0, 1))[None, None, :, :]
idx = (d > badpixel_threshold_sigma*std+mdn) | (d < (-badpixel_threshold_sigma)*std+mdn)
return idx
```
%% Cell type:code id: tags:
``` python
step_timer.start()
for pdu, mod in zip(db_modules, karabo_da):
for mod, pdu in da_to_pdu.items():
display(Markdown(f"### Badpixels for module {mod} ({pdu}):"))
offset_abs_threshold = np.array(offset_abs_threshold)
bad_pixels_map[mod][eval_bpidx(offset_map[mod])] |= BadPixels.OFFSET_OUT_OF_THRESHOLD.value
bad_pixels_map[mod][~np.isfinite(offset_map[mod])] |= BadPixels.OFFSET_NOISE_EVAL_ERROR.value
bad_pixels_map[mod][eval_bpidx(noise_map[mod])] |= BadPixels.NOISE_OUT_OF_THRESHOLD.value
bad_pixels_map[mod][~np.isfinite(noise_map[mod])] |= BadPixels.OFFSET_NOISE_EVAL_ERROR.value
bad_pixels_map[mod][(offset_map[mod] < offset_abs_threshold[0][None, None, None, :]) | (offset_map[mod] > offset_abs_threshold[1][None, None, None, :])] |= BadPixels.OFFSET_OUT_OF_THRESHOLD.value # noqa
if detailed_report:
for g_idx in gains:
for cell in range(memory_cells):
bad_pixels = bad_pixels_map[mod][:, :, cell, g_idx]
fn_0 = heatmapPlot(
np.swapaxes(bad_pixels, 0, 1),
y_label="Row",
x_label="Column",
lut_label=f"Badpixels {g_name[g_idx]} [ADCu]",
aspect=1.,
vmin=0, vmax=5,
title=f'G{g_idx} Bad pixel map - Cell {cell:02d} - Module {mod} ({pdu})')
step_timer.done_step('Creating bad pixels constant')
```
%% Cell type:markdown id: tags:
## Inject and save calibration constants
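
For each module, the constants are written to temporary CCV files with `write_ccv`; if `db_output` is enabled they are injected into CalCat with `inject_ccv`, and if `local_output` is enabled they are additionally copied into `out_folder`.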
%% Cell type:code id: tags:
``` python
step_timer.start()
for mod, db_mod in zip(karabo_da, db_modules):
constants = {
'Offset': np.moveaxis(offset_map[mod], 0, 1),
'Noise': np.moveaxis(noise_map[mod], 0, 1),
'BadPixelsDark': np.moveaxis(bad_pixels_map[mod], 0, 1),
}
constants = {}
for mod, pdu in da_to_pdu.items():
constants['Offset10Hz'] = np.moveaxis(offset_map[mod], 0, 1)
constants['Noise10Hz'] = np.moveaxis(noise_map[mod], 0, 1)
constants['BadPixelsDark10Hz'] = np.moveaxis(bad_pixels_map[mod], 0, 1)
md = None
upper_lower_dev = {"Integration Time": time_limits}
for const_name, const_data in constants.items():
for key, const_data in constants.items():
with NamedTemporaryFile() as tempf:
ccv_root = write_ccv(
tempf.name,
pdu,
pdu_to_uuid[pdu],
detector_info["detector_type"],
const_name,
conditions,
creation_time,
proposal,[run_high, run_med, run_low],
const_data,
const_data.shape,
lower_deviations=upper_lower_dev,
upper_deviations=upper_lower_dev,
)
const = getattr(Constants.jungfrau, key)()
const.data = const_data
if db_output:
inject_ccv(tempf.name, ccv_root, metadata_folder)
for parm in condition.parameters:
if parm.name == "Integration Time":
parm.lower_deviation = time_limits
parm.upper_deviation = time_limits
if db_output:
md = send_to_db(
db_module=db_mod,
karabo_id=karabo_id,
constant=const,
condition=condition,
file_loc=file_loc,
report_path=report,
cal_db_interface=cal_db_interface,
creation_time=creation_time,
timeout=cal_db_timeout,
)
if local_output:
md = save_const_to_h5(
db_module=db_mod,
karabo_id=karabo_id,
constant=const,
condition=condition,
data=const.data,
file_loc=file_loc,
report=report,
creation_time=creation_time,
out_folder=out_folder,
)
print(f"Calibration constant {key} is stored locally at {out_folder}.\n")
if local_output:
ofile = f"{out_folder}/const_{const_name}_{pdu}.h5"
if os.path.isfile(ofile):
print(f'File {ofile} already exists and will be overwritten\n')
from shutil import copyfile
copyfile(tempf.name, ofile)
print(f"Calibration constant {const_name} is stored locally at {out_folder}.\n")
print("Constants parameter conditions are:\n")
print(
f"• Bias voltage: {bias_voltage}\n"
f"• Memory cells: {memory_cells}\n"
f"• Integration time: {integration_time}\n"
f"• Gain setting: {gain_setting}\n"
f"• Gain mode: {gain_mode}\n"
f"• Creation time: {md.calibration_constant_version.begin_at if md is not None else creation_time}\n") # noqa
step_timer.done_step("Injecting constants.")
```
%% Cell type:code id: tags:
``` python
print(f"Total processing time {step_timer.timespan():.01f} s")
step_timer.print_summary()
```
%% Cell type:code id: tags:
``` python
# now we need the old constants
jf_caldata = CalibrationData.from_condition(
conditions,
karabo_id,
pdu_snapshot_at=creation_time,
# TODO: use prior in time strategy
event_at=creation_time-timedelta(seconds=60) if creation_time else None,
)
```
%% Cell type:code id: tags:
``` python
# Start retrieving existing constants for comparison
step_timer.start()
old_const = {}
old_mdata = {}
old_retrieval_res.wait()
for (mod, const), (data, timestamp, filepath, h5path) in zip(
mod_x_const, old_retrieval_res.get()):
old_const.setdefault(mod, {})[const] = data
old_mdata.setdefault(mod, {})[const] = {
"timestamp": timestamp,
"filepath": filepath,
"h5path": h5path,
}
jf_caldata = CalibrationData.from_condition(
conditions,
karabo_id,
pdu_snapshot_at=creation_time,
# TODO: use prior in time strategy
event_at=creation_time-timedelta(seconds=60) if creation_time else None,
)
for mod in karabo_da:
old_const[mod] = {}
old_mdata[mod] = {}
for cname in constants.keys():
cmdata = jf_caldata.get(cname, None)
data_found = cmdata and mod in cmdata.aggregator_names
old_const[mod][cname] = cmdata[mod].ndarray() if data_found else None
old_mdata[mod][cname] = {
"timestamp": cmdata[mod].metadata("begin_validity_at") if data_found else "Not found", # noqa
"filepath": str(cmdata[mod].get_full_path()) if data_found else None,
"h5path": cmdata[mod].dataset if data_found else None,
}
step_timer.done_step('Retrieved old dark constants for comparison.')
```
%% Cell type:code id: tags:
``` python
display(Markdown("## The following pre-existing constants are used for comparison:"))
for mod, consts in old_mdata.items():
pdu = db_modules[karabo_da.index(mod)]
pdu = da_to_pdu[mod]
display(Markdown(f"- {mod} ({pdu})"))
for const in consts:
display(Markdown(f" - {const} at {consts[const]['timestamp']}"))
# saving locations of old constants for summary notebook
with open(f"{metadata_folder or out_folder}/module_metadata_{mod}.yml", "w") as fd:
yaml.safe_dump(
{
"module": mod,
"pdu": pdu,
"old-constants": old_mdata[mod],
},
fd,
)
```
......
%% Cell type:markdown id: tags:
# LPD Offline Correction #
Author: European XFEL Data Analysis Group
%% Cell type:code id: tags:
``` python
# Input parameters
in_folder = "/gpfs/exfel/exp/FXE/202201/p003073/raw/" # the folder to read data from, required
out_folder = "/gpfs/exfel/data/scratch/schmidtp/random/LPD_test" # the folder to output to, required
metadata_folder = '' # Directory containing calibration_metadata.yml when run by xfel-calibrate.
sequences = [-1] # Sequences to correct, use [-1] for all
modules = [-1] # Module indices to correct, use [-1] for all, only used when karabo_da is empty
karabo_da = [''] # Data aggregator names to correct, use [''] for all
run = 10 # run to process, required
# Source parameters
karabo_id = 'FXE_DET_LPD1M-1' # Karabo domain for detector.
input_source = '{karabo_id}/DET/{module_index}CH0:xtdf' # Input fast data source.
output_source = '' # Output fast data source, empty to use same as input.
output_source = '{karabo_id}/CORR/{module_index}CH0:output' # Output fast data source, empty to use same as input.
xgm_source = 'SA1_XTD2_XGM/DOOCS/MAIN'
xgm_pulse_count_key = 'pulseEnergy.numberOfSa1BunchesActual'
# CalCat parameters
creation_time = "" # The timestamp to use with Calibration DB. Required Format: "YYYY-MM-DD hh:mm:ss" e.g. 2019-07-04 11:02:41
cal_db_interface = '' # Not needed, compatibility with current webservice.
cal_db_timeout = 0 # Not needed, compatibility with current webservice.
cal_db_root = '/gpfs/exfel/d/cal/caldb_store' # The calibration database root path to access constant files, e.g. for accessing constants from the test database.
# Operating conditions
mem_cells = 512 # Memory cells, LPD constants are always taken with 512 cells.
bias_voltage = 250.0 # Detector bias voltage.
capacitor = '5pF' # Capacitor setting: 5pF or 50pF
photon_energy = 9.2 # Photon energy in keV.
category = 0 # Whom to blame.
use_cell_order = 'auto' # Whether to use memory cell order as a detector condition; auto/always/never
# Correction parameters
offset_corr = True # Offset correction.
rel_gain = True # Gain correction based on RelativeGain constant.
ff_map = True # Gain correction based on FFMap constant.
gain_amp_map = True # Gain correction based on GainAmpMap constant.
# Output options
ignore_no_frames_no_pulses = False # Whether to run without SA1 pulses AND frames.
overwrite = True # set to True if existing data should be overwritten
chunks_data = 1 # HDF chunk size for pixel data in number of frames.
chunks_ids = 32 # HDF chunk size for cellId and pulseId datasets.
create_virtual_cxi_in = '' # Folder to create virtual CXI files in (for each sequence).
# Parallelization options
sequences_per_node = 1 # Sequence files to process per node
max_nodes = 8 # Maximum number of SLURM jobs to split correction work into
num_workers = 8 # Worker processes per node, 8 is safe on 768G nodes but won't work on 512G.
num_threads_per_worker = 32 # Number of threads per worker.
def balance_sequences(in_folder, run, sequences, sequences_per_node, karabo_da, max_nodes):
from xfel_calibrate.calibrate import balance_sequences as bs
return bs(in_folder, run, sequences, sequences_per_node, karabo_da, max_nodes=max_nodes)
```
%% Cell type:code id: tags:
``` python
from logging import warning
from pathlib import Path
from time import perf_counter
import gc
import re
import numpy as np
import h5py
import matplotlib
matplotlib.use('agg')
import matplotlib.pyplot as plt
%matplotlib inline
import extra_data as xd
import extra_geom as xg
import pasha as psh
from extra_data.components import LPD1M
import cal_tools.restful_config as rest_cfg
from cal_tools.calcat_interface import CalCatError, LPD_CalibrationData
from cal_tools.lpdalgs import correct_lpd_frames
from cal_tools.lpdlib import get_mem_cell_pattern, make_cell_order_condition
from cal_tools.tools import (
CalibrationMetadata,
calcat_creation_time,
write_constants_fragment,
)
from cal_tools.files import DataFile
```
%% Cell type:markdown id: tags:
# Prepare environment
%% Cell type:code id: tags:
``` python
file_re = re.compile(r'^RAW-R(\d{4})-(\w+\d+)-S(\d{5})$') # This should probably move to cal_tools
run_folder = Path(in_folder) / f'r{run:04d}'
out_folder = Path(out_folder)
out_folder.mkdir(exist_ok=True)
output_source = output_source or input_source
creation_time = calcat_creation_time(in_folder, run, creation_time)
print(f'Using {creation_time.isoformat()} as creation time')
# Pick all modules/aggregators or those selected.
if karabo_da == ['']:
if modules == [-1]:
modules = list(range(16))
karabo_da = [f'LPD{i:02d}' for i in modules]
else:
modules = [int(x[-2:]) for x in karabo_da]
# Pick all sequences or those selected.
if not sequences or sequences == [-1]:
do_sequence = lambda seq: True
else:
do_sequence = [int(x) for x in sequences].__contains__
# List of detector sources.
det_inp_sources = [input_source.format(karabo_id=karabo_id, module_index=int(da[-2:])) for da in karabo_da]
if use_cell_order not in {'auto', 'always', 'never'}:
raise ValueError("use_cell_order must be auto/always/never")
```
%% Cell type:markdown id: tags:
# Select data to process
%% Cell type:code id: tags:
``` python
data_to_process = []
for inp_path in run_folder.glob('RAW-*.h5'):
match = file_re.match(inp_path.stem)
if match[2] not in karabo_da or not do_sequence(int(match[3])):
continue
outp_path = out_folder / 'CORR-R{run:04d}-{aggregator}-S{seq:05d}.h5'.format(
run=int(match[1]), aggregator=match[2], seq=int(match[3]))
data_to_process.append((match[2], inp_path, outp_path))
print('Files to process:')
for data_descr in sorted(data_to_process, key=lambda x: f'{x[0]}{x[1]}'):
print(f'{data_descr[0]}\t{data_descr[1]}')
# Collect the train ID contained in the input LPD files.
inp_lpd_dc = xd.DataCollection.from_paths([x[1] for x in data_to_process])
frame_count = sum([
int(inp_lpd_dc[source, 'image.data'].data_counts(labelled=False).sum())
for source in inp_lpd_dc.all_sources], 0)
if frame_count == 0:
inp_dc = xd.RunDirectory(run_folder) \
.select_trains(xd.by_id[inp_lpd_dc.train_ids])
try:
pulse_count = int(inp_dc[xgm_source, xgm_pulse_count_key].ndarray().sum())
except xd.SourceNameError:
warning(f'Missing XGM source `{xgm_source}`')
pulse_count = None
except xd.PropertyNameError:
warning(f'Missing XGM pulse count key `{xgm_pulse_count_key}`')
pulse_count = None
if pulse_count == 0 and not ignore_no_frames_no_pulses:
warning(f'Affected files contain neither LPD frames nor SA1 pulses '
f'according to {xgm_source}, processing is skipped. If this is '
f'incorrect, please contact da-support@xfel.eu')
from sys import exit
exit(0)
elif pulse_count is None:
raise ValueError('Affected files contain no LPD frames and SA1 pulses '
'could not be inferred from XGM data')
else:
raise ValueError('Affected files contain no LPD frames but SA1 pulses')
else:
print(f'Total number of LPD pulses across all modules: {frame_count}')
```
%% Cell type:markdown id: tags:
# Obtain and prepare calibration constants
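
Constants are looked up through `LPD_CalibrationData` for the conditions above. The Offset constant is required: modules without it are skipped, and the notebook stops if no module has one. Missing gain-related constants (RelativeGain, FFMap, GainAmpMap) and bad-pixel maps only produce a warning.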
%% Cell type:code id: tags:
``` python
start = perf_counter()
cell_ids_pattern_s = None
if use_cell_order != 'never':
# Read the order of memory cells used
raw_data = xd.DataCollection.from_paths([e[1] for e in data_to_process])
cell_ids_pattern_s = make_cell_order_condition(
use_cell_order, get_mem_cell_pattern(raw_data, det_inp_sources)
)
print("Memory cells order:", cell_ids_pattern_s)
lpd_cal = LPD_CalibrationData(
detector_name=karabo_id,
modules=karabo_da,
sensor_bias_voltage=bias_voltage,
memory_cells=mem_cells,
feedback_capacitor=capacitor,
source_energy=photon_energy,
memory_cell_order=cell_ids_pattern_s,
category=category,
event_at=creation_time,
client=rest_cfg.calibration_client(),
caldb_root=Path(cal_db_root),
)
lpd_metadata = lpd_cal.metadata(["Offset", "BadPixelsDark"])
try:
illum_metadata = lpd_cal.metadata(lpd_cal.illuminated_calibrations)
for key, value in illum_metadata.items():
lpd_metadata.setdefault(key, {}).update(value)
except CalCatError as e: # TODO: replace when API errors are improved.
warning(f"CalCatError: {e}")
total_time = perf_counter() - start
print(f'Looking up constants {total_time:.1f}s')
```
%% Cell type:code id: tags:
``` python
# Validate the constants availability and raise/warn accordingly.
for mod, calibrations in lpd_metadata.items():
missing_offset = {"Offset"} - set(calibrations)
warn_missing_constants = {
"BadPixelsDark", "BadPixelsFF", "GainAmpMap",
"FFMap", "RelativeGain"} - set(calibrations)
if missing_offset:
warning(f"Offset constant is not available to correct {mod}.")
karabo_da.remove(mod)
if warn_missing_constants:
warning(f"Constants {warn_missing_constants} were not retrieved for {mod}.")
if not karabo_da: # Offsets are missing for all modules.
raise Exception("Could not find offset constants for any modules, will not correct data.")
# Remove skipped correction modules from data_to_process
data_to_process = [(mod, in_f, out_f) for mod, in_f, out_f in data_to_process if mod in karabo_da]
```
%% Cell type:code id: tags:
``` python
# write constants metadata to fragment YAML
write_constants_fragment(
out_folder=(metadata_folder or out_folder),
det_metadata=lpd_metadata,
caldb_root=lpd_cal.caldb_root,
)
# Load constants data for all constants
const_data = lpd_cal.ndarray_map(metadata=lpd_metadata)
```
%% Cell type:code id: tags:
``` python
# These are intended in order cell, X, Y, gain
ccv_offsets = {}
ccv_gains = {}
ccv_masks = {}
ccv_shape = (mem_cells, 256, 256, 3)
constant_order = {
'Offset': (2, 1, 0, 3),
'BadPixelsDark': (2, 1, 0, 3),
'RelativeGain': (2, 0, 1, 3),
'FFMap': (2, 0, 1, 3),
'BadPixelsFF': (2, 0, 1, 3),
'GainAmpMap': (2, 0, 1, 3),
}
def prepare_constants(wid, index, aggregator):
consts = const_data.get(aggregator, {})
def _prepare_data(calibration_name, dtype):
# Some old BadPixels constants have <f8 dtype.
# Convert nan to float 0 to avoid having 2147483648 after
# converting float64 to uint32.
if "BadPixels" in calibration_name and consts[calibration_name].dtype != np.uint32:
consts[calibration_name] = np.nan_to_num(
consts[calibration_name], nan=0.0)
return consts[calibration_name] \
.transpose(constant_order[calibration_name]) \
.astype(dtype, copy=True) # Make sure array is contiguous.
if offset_corr and 'Offset' in consts:
ccv_offsets[aggregator] = _prepare_data('Offset', np.float32)
else:
ccv_offsets[aggregator] = np.zeros(ccv_shape, dtype=np.float32)
ccv_gains[aggregator] = np.ones(ccv_shape, dtype=np.float32)
if 'BadPixelsDark' in consts:
ccv_masks[aggregator] = _prepare_data('BadPixelsDark', np.uint32)
else:
ccv_masks[aggregator] = np.zeros(ccv_shape, dtype=np.uint32)
if rel_gain and 'RelativeGain' in consts:
ccv_gains[aggregator] *= _prepare_data('RelativeGain', np.float32)
if ff_map and 'FFMap' in consts:
ccv_gains[aggregator] *= _prepare_data('FFMap', np.float32)
if 'BadPixelsFF' in consts:
np.bitwise_or(ccv_masks[aggregator], _prepare_data('BadPixelsFF', np.uint32),
out=ccv_masks[aggregator])
if gain_amp_map and 'GainAmpMap' in consts:
ccv_gains[aggregator] *= _prepare_data('GainAmpMap', np.float32)
print('.', end='', flush=True)
print('Preparing constants', end='', flush=True)
start = perf_counter()
psh.ThreadContext(num_workers=len(karabo_da)).map(prepare_constants, karabo_da)
total_time = perf_counter() - start
print(f'{total_time:.1f}s')
const_data.clear() # Clear raw constants data now to save memory.
gc.collect();
```
%% Cell type:code id: tags:
``` python
def correct_file(wid, index, work):
aggregator, inp_path, outp_path = work
module_index = int(aggregator[-2:])
start = perf_counter()
dc = xd.H5File(inp_path, inc_suspect_trains=False).select('*', 'image.*', require_all=True)
inp_source = dc[input_source.format(karabo_id=karabo_id, module_index=module_index)]
inp_source_name = input_source.format(karabo_id=karabo_id, module_index=module_index)
inp_source = dc[inp_source_name]
open_time = perf_counter() - start
# Load raw data for this file.
# Reshaping gets rid of the extra 1-len dimensions without
# mangling the frame axis for an actual frame count of 1.
start = perf_counter()
in_raw = inp_source['image.data'].ndarray().reshape(-1, 256, 256)
in_cell = inp_source['image.cellId'].ndarray().reshape(-1)
in_pulse = inp_source['image.pulseId'].ndarray().reshape(-1)
read_time = perf_counter() - start
# Allocate output arrays.
out_data = np.zeros((in_raw.shape[0], 256, 256), dtype=np.float32)
out_gain = np.zeros((in_raw.shape[0], 256, 256), dtype=np.uint8)
out_mask = np.zeros((in_raw.shape[0], 256, 256), dtype=np.uint32)
start = perf_counter()
correct_lpd_frames(in_raw, in_cell,
out_data, out_gain, out_mask,
ccv_offsets[aggregator], ccv_gains[aggregator], ccv_masks[aggregator],
num_threads=num_threads_per_worker)
correct_time = perf_counter() - start
image_counts = inp_source['image.data'].data_counts(labelled=False)
start = perf_counter()
if (not outp_path.exists() or overwrite) and image_counts.sum() > 0:
outp_source_name = output_source.format(karabo_id=karabo_id, module_index=module_index)
with DataFile(outp_path, 'w') as outp_file:
outp_file.create_index(dc.train_ids, from_file=dc.files[0])
outp_file.create_metadata(like=dc, instrument_channels=(f'{outp_source_name}/image',))
outp_file.create_metadata(like=dc, instrument_channels=sorted({
f'{outp_source_name}/image', f'{inp_source_name}/image'
}))
outp_source = outp_file.create_instrument_source(outp_source_name)
outp_source.create_index(image=image_counts)
outp_source.create_key('image.cellId', data=in_cell,
chunks=(min(chunks_ids, in_cell.shape[0]),))
outp_source.create_key('image.pulseId', data=in_pulse,
chunks=(min(chunks_ids, in_pulse.shape[0]),))
outp_source.create_key('image.data', data=out_data,
chunks=(min(chunks_data, out_data.shape[0]), 256, 256))
outp_source.create_compressed_key('image.gain', data=out_gain)
outp_source.create_compressed_key('image.mask', data=out_mask)
if output_source != input_source:
outp_file[f'/INSTRUMENT/{inp_source_name}'] = h5py.SoftLink(f'/INSTRUMENT/{outp_source_name}')
outp_file[f'/INDEX/{inp_source_name}'] = h5py.SoftLink(f'/INDEX/{outp_source_name}')
write_time = perf_counter() - start
total_time = open_time + read_time + correct_time + write_time
frame_rate = in_raw.shape[0] / total_time
print('{}\t{}\t{:.3f}\t{:.3f}\t{:.3f}\t{:.3f}\t{:.3f}\t{}\t{:.1f}'.format(
wid, aggregator, open_time, read_time, correct_time, write_time, total_time,
in_raw.shape[0], frame_rate))
in_raw = None
in_cell = None
in_pulse = None
out_data = None
out_gain = None
out_mask = None
gc.collect()
print('worker\tDA\topen\tread\tcorrect\twrite\ttotal\tframes\trate')
start = perf_counter()
psh.ProcessContext(num_workers=num_workers).map(correct_file, data_to_process)
total_time = perf_counter() - start
print(f'Total time: {total_time:.1f}s')
```
%% Cell type:markdown id: tags:
# Data preview for first train
%% Cell type:code id: tags:
``` python
geom = xg.LPD_1MGeometry.from_quad_positions(
[(11.4, 299), (-11.5, 8), (254.5, -16), (278.5, 275)])
output_paths = [outp_path for _, _, outp_path in data_to_process if outp_path.exists()]
if not output_paths:
warning('Data preview is skipped as there are no existing output paths')
from sys import exit
exit(0)
dc = xd.DataCollection.from_paths(output_paths).select_trains(np.s_[0])
det = LPD1M(dc, detector_name=karabo_id)
data = det.get_array('image.data')
```
%% Cell type:markdown id: tags:
### Intensity histogram across all cells
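
The red markers in the histogram below delimit the colormap interval used for the image previews, taken between the 1st and 99th percentiles of the corrected intensity distribution over all modules and cells.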
%% Cell type:code id: tags:
``` python
left_edge_ratio = 0.01
right_edge_ratio = 0.99
fig, ax = plt.subplots(num=1, clear=True, figsize=(15, 6))
values, bins, _ = ax.hist(np.ravel(data.data), bins=2000, range=(-1500, 2000))
def find_nearest_index(array, value):
return (np.abs(array - value)).argmin()
cum_values = np.cumsum(values)
vmin = bins[find_nearest_index(cum_values, cum_values[-1]*left_edge_ratio)]
vmax = bins[find_nearest_index(cum_values, cum_values[-1]*right_edge_ratio)]
max_value = values.max()
ax.vlines([vmin, vmax], 0, max_value, color='red', linewidth=5, alpha=0.2)
ax.text(vmin, max_value, f'{left_edge_ratio*100:.0f}%',
color='red', ha='center', va='bottom', size='large')
ax.text(vmax, max_value, f'{right_edge_ratio*100:.0f}%',
color='red', ha='center', va='bottom', size='large')
ax.text(vmax+(vmax-vmin)*0.01, max_value/2, 'Colormap interval',
color='red', rotation=90, ha='left', va='center', size='x-large')
ax.set_xlim(vmin-(vmax-vmin)*0.1, vmax+(vmax-vmin)*0.1)
ax.set_ylim(0, max_value*1.1)
pass
```
%% Cell type:markdown id: tags:
### First memory cell
%% Cell type:code id: tags:
``` python
fig, ax = plt.subplots(num=2, figsize=(15, 15), clear=True, nrows=1, ncols=1)
geom.plot_data_fast(data[:, 0, 0], ax=ax, vmin=vmin, vmax=vmax)
pass
```
%% Cell type:markdown id: tags:
### Train average
%% Cell type:code id: tags:
``` python
fig, ax = plt.subplots(num=3, figsize=(15, 15), clear=True, nrows=1, ncols=1)
geom.plot_data_fast(data[:, 0].mean(axis=1), ax=ax, vmin=vmin, vmax=vmax)
pass
```
%% Cell type:markdown id: tags:
### Lowest gain stage per pixel
%% Cell type:code id: tags:
``` python
highest_gain_stage = det.get_array('image.gain', pulses=np.s_[:]).max(axis=(1, 2))
fig, ax = plt.subplots(num=4, figsize=(15, 15), clear=True, nrows=1, ncols=1)
p = geom.plot_data_fast(highest_gain_stage, ax=ax, vmin=0, vmax=2);
cb = ax.images[0].colorbar
cb.set_ticks([0, 1, 2])
cb.set_ticklabels(['High gain', 'Medium gain', 'Low gain'])
```
%% Cell type:markdown id: tags:
### Create virtual CXI file
%% Cell type:code id: tags:
``` python
if create_virtual_cxi_in:
vcxi_folder = Path(create_virtual_cxi_in.format(
run=run, proposal_folder=str(Path(in_folder).parent)))
vcxi_folder.mkdir(parents=True, exist_ok=True)
def sort_files_by_seq(by_seq, outp_path):
by_seq.setdefault(int(outp_path.stem[-5:]), []).append(outp_path)
return by_seq
from functools import reduce
reduce(sort_files_by_seq, output_paths, output_by_seq := {})
for seq_number, seq_output_paths in output_by_seq.items():
# Create data collection and detector components only for this sequence.
try:
det = LPD1M(xd.DataCollection.from_paths(seq_output_paths), detector_name=karabo_id, min_modules=4)
except ValueError: # Couldn't find enough data for min_modules
continue
det.write_virtual_cxi(vcxi_folder / f'VCXI-LPD-R{run:04d}-S{seq_number:05d}.cxi')
```
......
......@@ -88,6 +88,7 @@ install_requires = [
"notebook==6.1.5",
"numpy==1.20.3",
"pasha==0.1.1",
"pillow==10.3.0",
"prettytable==0.7.2",
"princess==0.5",
"pymunge==0.1.3",
......
import os
import posixpath
import zlib
from dataclasses import dataclass, field
from dataclasses import dataclass
from datetime import datetime
from logging import warning
from multiprocessing import Manager
......@@ -29,9 +28,9 @@ from cal_tools.agipdutils import (
melt_snowy_pixels,
)
from cal_tools.enums import AgipdGainMode, BadPixels, SnowResolution
from cal_tools.h5_copy_except import h5_copy_except_paths
from logging import warning
@dataclass
class AgipdCtrl:
"""Access AGIPD control parameters from a single run.
......@@ -732,11 +731,7 @@ class AgipdCorrections:
:param ofile_name: Name of output file including path
:param i_proc: Index of shared memory array
"""
module_idx = int(file_name.split('/')[-1].split('-')[2][-2:])
agipd_base = f'INSTRUMENT/{self.h5_data_path}/'.format(module_idx)
idx_base = self.h5_index_path.format(module_idx)
data_path = f'{agipd_base}/image'
from .files import DataFile
# Obtain a shallow copy of the pointer map to allow for local
# changes in this method.
......@@ -749,23 +744,52 @@ class AgipdCorrections:
n_img = data_dict['nImg'][0]
if n_img == 0:
return
trains = data_dict['trainId'][:n_img]
# Re-cast fields in-place, i.e. using the same memory region.
for field, dtype in self.recast_image_fields.items():
data_dict[field] = cast_array_inplace(data_dict[field], dtype)
with h5py.File(ofile_name, "w") as outfile:
# Copy any other data from the input file.
# This includes indexes, so it's important that the corrected data
# we write is aligned with the raw data.
with h5py.File(file_name, "r") as infile:
self.copy_and_sanitize_non_cal_data(
infile, outfile, agipd_base, idx_base, trains
)
dc = H5File(file_name)
# make index for corrected images
trains, count = np.unique(data_dict['trainId'][:n_img],
return_counts=True)
# parse filename and get parameters
out_folder, fname = os.path.split(ofile_name)
tokens = os.path.splitext(fname)[0].split('-')
runno = int(tokens[1][1:])
modno = int(tokens[2][-2:])
agg = tokens[2]
seqno = int(tokens[3][1:])
agipd_base = self.h5_data_path.format(modno)
karabo_id, _, channel = agipd_base.split('/')
channel = channel.partition(":")[0] + ":output"
agipd_corr_source = f"{karabo_id}/CORR/{channel}"
instrument_channels = [f"{agipd_corr_source}/image"]
# backward compatibility BEGIN
instrument_channels.append(f"{agipd_base}/image")
# backward compatibility END
with DataFile.from_details(out_folder, agg, runno, seqno) as outfile:
outfile.create_metadata(
like=dc, instrument_channels=instrument_channels)
outfile.create_index(trains, from_file=dc.files[0])
# All corrected data goes in a /INSTRUMENT/.../image group
image_grp = outfile[data_path]
agipd_src = outfile.create_instrument_source(agipd_corr_source)
agipd_src.create_index(image=count)
image_grp = agipd_src.require_group("image")
# backward compatibility BEGIN
outfile[f"INDEX/{agipd_base}"] = h5py.SoftLink(
f"/INDEX/{agipd_corr_source}")
outfile[f"INSTRUMENT/{agipd_base}"] = h5py.SoftLink(
f"/INSTRUMENT/{agipd_corr_source}")
# backward compatibility END
# Set up all the datasets before filling them. This puts the
# metadata about the datasets together at the start of the file,
......@@ -1229,74 +1253,7 @@ class AgipdCorrections:
return n_img_sel
def copy_and_sanitize_non_cal_data(self, infile, outfile, agipd_base,
idx_base, trains):
""" Copy and sanitize data in `infile` that is not touched by
`correctAGIPD`
"""
# these are touched in the correct function, do not copy them here
dont_copy = ["data", "cellId", "trainId", "pulseId", "status",
"length"]
dont_copy = [posixpath.join(agipd_base, "image", ds)
for ds in dont_copy]
# don't copy index as we may need to adjust if we filter trains
dont_copy.append(posixpath.join(idx_base, "image"))
h5_copy_except_paths(infile, outfile, dont_copy)
# sanitize indices
for do in ["image", ]:
# uq: INDEX/trainID
# fidxv: INDEX/.../image/first idx values
# cntsv: INDEX/.../image/counts values
# Extract parameters through identifying
# unique trains, index and numbers.
uq, fidxv, cntsv = np.unique(trains, return_index=True, return_counts=True) # noqa
# Validate calculated CORR INDEX contents by checking
# difference between trainId stored in RAW data and trains from
train_diff = np.isin(np.array(infile["/INDEX/trainId"]), uq, invert=True) # noqa
# Insert zeros for missing trains.
# fidxv and cntsv should have same length as
# raw INDEX/.../image/first and INDEX/.../image/count,
# respectively
# first_inc = first incrementation
first_inc = True
for i, diff in enumerate(train_diff):
if diff:
if i < len(cntsv):
cntsv = np.insert(cntsv, i, 0)
fidxv = np.insert(fidxv, i, 0) if i == 0 else np.insert(fidxv, i, fidxv[i])
else:
# append if at the end of the array
cntsv = np.append(cntsv, 0)
# increment fidxv once with the
# no. of processed mem-cells.
if first_inc:
fidxv = np.append(fidxv,
(2 * fidxv[i-1]) - fidxv[i-2])
first_inc = False
else:
fidxv = np.append(fidxv, fidxv[i-1])
# save INDEX contents (first, count) in CORR files
outfile.create_dataset(idx_base + "{}/first".format(do),
fidxv.shape,
dtype=fidxv.dtype,
data=fidxv,
fletcher32=True)
outfile.create_dataset(idx_base + "{}/count".format(do),
cntsv.shape,
dtype=cntsv.dtype,
data=cntsv,
fletcher32=True)
def init_constants(
self, cons_data: dict, module_idx: int, variant: dict):
def init_constants(self, cons_data: dict, module_idx: int, variant: dict):
"""
For CI derived gain, a mean multiplication factor of 4.48 compared
to medium gain is used, as no reliable CI data for all memory cells
......
......@@ -258,13 +258,16 @@ class SingleConstant:
_have_calcat_metadata=True,
)
def dataset_obj(self, caldb_root=None) -> h5py.Dataset:
def get_full_path(self, caldb_root=None):
if caldb_root is not None:
caldb_root = Path(caldb_root)
else:
caldb_root = _get_default_caldb_root()
return caldb_root / self.path
def dataset_obj(self, caldb_root=None) -> h5py.Dataset:
f = h5py.File(caldb_root / self.path, "r")
f = h5py.File(self.get_full_path(caldb_root), "r")
return f[self.dataset]["data"]
def ndarray(self, caldb_root=None):
......@@ -608,7 +611,7 @@ class CalibrationData(Mapping):
module_details = sorted(pdus.values(), key=lambda d: d["karabo_da"])
return cls(constant_groups, module_details, det_name)
def __getitem__(self, key) -> MultiModuleConstant:
def __getitem__(self, key):
if isinstance(key, str):
return MultiModuleConstant(
self.constant_groups[key], self.module_details, self.detector_name, key
......@@ -725,6 +728,84 @@ class CalibrationData(Mapping):
return type(self)(constant_groups, module_details, det_name)
def markdown_table(self, module_naming="modnum") -> str:
"""Make a markdown table overview of the constants found.
Columns are calibration types, rows are modules.
If there are >4 calibrations, the table will be split up into several
pieces with up to 4 calibrations in each.
Args:
module_naming (str): modnum, aggregator or qm, to change how the
modules are labelled in the table. Defaults to modnum.
"""
from tabulate import tabulate
if module_naming == "aggregator":
modules = self.aggregator_names
elif module_naming == "modnum":
modules = self.module_nums
elif module_naming == "qm":
modules = self.qm_names
else:
raise ValueError(
f"{module_naming=} (must be 'aggregator', 'modnum' or 'qm')"
)
cal_groups = [
sorted(self.constant_groups)[x:x+4] for x in range(0, len(self.constant_groups), 4)
]
md_tables = []
# Loop over groups of calibrations.
for cal_group in cal_groups:
table = [["Modules"] + cal_group]
# Loop over calibrations and modules to form the next rows.
for mod in modules:
mod_consts = []
for cname in cal_group:
try:
singleconst = self[cname, mod]
except KeyError:
# Constant is not available for this module.
mod_consts.append("")
else:
# Make the creation time a reference
# link to the CCV on CalCat.
c_time = datetime.fromisoformat(
singleconst.metadata("begin_validity_at")).strftime(
"%Y-%m-%d %H:%M")
try:
view_url = singleconst.metadata("view_url")
mod_consts.append(f"[{c_time}]({view_url})")
except KeyError:
mod_consts.append(f"{c_time} ({singleconst.ccv_id})")
table.append([mod] + mod_consts)
md_tables.append(tabulate(table, tablefmt="pipe", headers="firstrow"))
return '\n\n'.join(md_tables)
def display_markdown_table(self, module_naming="modnum"):
"""Make a markdown table overview of the constants found.
Columns are calibration types, rows are modules.
If there are >4 calibrations, the table will be split up into several
pieces with up to 4 calibrations in each.
Args:
module_naming (str): modnum, aggregator or qm, to change how the
modules are labelled in the table. Defaults to modnum.
"""
from IPython.display import display, Markdown
display(Markdown(self.markdown_table(module_naming=module_naming)))
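For orientation, a hedged usage sketch of the two table helpers; `agipd_cd` stands in for an already constructed `CalibrationData` instance (how it is built is outside this snippet):

```python
# Plain markdown string, e.g. for writing into a report:
print(agipd_cd.markdown_table(module_naming="qm"))

# Rendered table inside a Jupyter notebook:
agipd_cd.display_markdown_table(module_naming="modnum")
```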
class ConditionsBase:
calibration_types = {} # For subclasses: {calibration: [parameter names]}
......@@ -849,6 +930,46 @@ class DSSCConditions(ConditionsBase):
}
@dataclass
class JUNGFRAUConditions(ConditionsBase):
"""Conditions for JUNGFRAU detectors"""
sensor_bias_voltage: float
memory_cells: int
integration_time: float
gain_setting: int
gain_mode: Optional[int] = None
sensor_temperature: float = 291
pixels_x: int = 1024
pixels_y: int = 512
_params = [
"Sensor Bias Voltage",
"Memory Cells",
"Pixels X",
"Pixels Y",
"Integration Time",
"Sensor temperature",
"Gain Setting",
"Gain mode",
]
calibration_types = {
"Offset10Hz": _params,
"Noise10Hz": _params,
"BadPixelsDark10Hz": _params,
"RelativeGain10Hz": _params,
"BadPixelsFF10Hz": _params,
}
def make_dict(self, parameters):
cond = super().make_dict(parameters)
# Fix-up some database quirks.
if int(cond.get("Gain mode", -1)) == 0:
del cond["Gain mode"]
return cond
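A short hedged sketch of the `Gain mode` fix-up, assuming `ConditionsBase.make_dict` maps the listed database parameter names onto the dataclass fields; the voltage and timing values below are illustrative only:

```python
cond = JUNGFRAUConditions(
    sensor_bias_voltage=180.0,
    memory_cells=1,
    integration_time=10.0,
    gain_setting=0,
    gain_mode=0,  # dynamic gain; dropped from the condition dict below
)

params = cond.calibration_types["Offset10Hz"]
condition = cond.make_dict(params)
assert "Gain mode" not in condition  # removed because it was 0
```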
@dataclass
class ShimadzuHPVX2Conditions(ConditionsBase):
burst_frame_count: float
......
import binascii
import time
import warnings
from datetime import datetime, timezone
from struct import pack, unpack
from hashlib import md5
from pathlib import Path
from shutil import copyfile
from hashlib import md5
import binascii
import time
from struct import pack, unpack
import numpy as np
import h5py
import numpy as np
from calibration_client import CalibrationClient
from cal_tools.calcat_interface2 import _get_default_caldb_root, get_client
from cal_tools.tools import run_prop_seq_from_path
from cal_tools.restful_config import calibration_client
from cal_tools.tools import run_prop_seq_from_path
class CCVAlreadyInjected(UserWarning):
"""Exception when same CCV was already injected.
expected response: {
'success': False, 'status_code': 422,
'info': 'Error creating calibration_constant_version',
'app_info': {
'calibration_constant_id': ['has already been taken'],
'physical_detector_unit_id': ['has already been taken'],
'begin_at': ['has already been taken']
}, 'pagination': {}, 'data': {}}
"""
pass
def custom_warning_formatter(
message, category, filename, lineno, file=None, line=None):
"""Custom warning format to avoid display filename and lineno."""
return f"{category.__name__}: {message}\n"
# Apply the custom warning formatter
warnings.formatwarning = custom_warning_formatter
def write_ccv(
const_path,
pdu_name, pdu_uuid, detector_type,
calibration, conditions, created_at, proposal, runs,
data, dims, key='0'
data, dims, key='0', lower_deviations={}, upper_deviations={},
):
"""Write CCV data file.
......@@ -73,8 +96,8 @@ def write_ccv(
key = db_name.lower().replace(' ', '_')
dset = opcond_group.create_dataset(key, data=value,
dtype=np.float64)
dset.attrs['lower_deviation'] = 0.0
dset.attrs['upper_deviation'] = 0.0
dset.attrs['lower_deviation'] = lower_deviations.get(db_name, 0.0)
dset.attrs['upper_deviation'] = upper_deviations.get(db_name, 0.0)
dset.attrs['database_name'] = db_name
dset = ccv_group.create_dataset('data', data=data)
......@@ -83,56 +106,31 @@ def write_ccv(
return ccv_group_name
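The new `lower_deviations` / `upper_deviations` keyword arguments are plain dicts keyed by the CalCat database parameter name; anything not listed keeps the previous default deviation of 0.0. A hedged call sketch, assuming the remaining arguments (`const_path`, `conditions`, `data`, ...) are prepared exactly as before and that "Sensor Temperature" is one of the stored condition parameters:

```python
lower_deviations = {"Sensor Temperature": 5.0}
upper_deviations = {"Sensor Temperature": 5.0}

ccv_root = write_ccv(
    const_path, pdu_name, pdu_uuid, detector_type,
    calibration, conditions, created_at, proposal, runs,
    data, dims,
    lower_deviations=lower_deviations,
    upper_deviations=upper_deviations,
)
```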
def inject_ccv(const_src, ccv_root, report_to=None):
"""Inject new CCV into CalCat.
Args:
const_src (str or Path): Path to CCV data file.
ccv_root (str): CCV HDF group name.
report_to (str): Metadata location.
Returns:
None
def _get_ccv_attributes(ccv_group):
return (
ccv_group.attrs['proposal'],
ccv_group.attrs['runs'],
ccv_group.attrs['begin_at'],
)
Raises:
RuntimeError: If CalCat POST request fails.
"""
pdu_name, calibration, key = ccv_root.lstrip('/').split('/')
def condition_parameters(condition_group):
params = []
with h5py.File(const_src, 'r') as const_file:
pdu_group = const_file[pdu_name]
pdu_uuid = pdu_group.attrs['uuid']
detector_type = pdu_group.attrs['detector_type']
# It's really not ideal that we're mixing 'conditions' and 'condition' naming here.
for parameter in condition_group:
param_dset = condition_group[parameter]
params.append({
'parameter_name': param_dset.attrs['database_name'],
'value': float(param_dset[()]),
'lower_deviation_value': param_dset.attrs['lower_deviation'],
'upper_deviation_value': param_dset.attrs['upper_deviation'],
'flg_available': True
})
return params
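Each entry built here mirrors a parameter record as sent to CalCat; for a stored "Sensor Bias Voltage" of 180 V with no deviations the resulting dict would look roughly like this (values are illustrative):

```python
{
    'parameter_name': 'Sensor Bias Voltage',
    'value': 180.0,
    'lower_deviation_value': 0.0,
    'upper_deviation_value': 0.0,
    'flg_available': True,
}
```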
ccv_group = const_file[ccv_root]
proposal, runs = ccv_group.attrs['proposal'], ccv_group.attrs['runs']
begin_at_str = ccv_group.attrs['begin_at']
condition_group = ccv_group['operating_condition']
cond_params = []
# It's really not ideal that we're mixing 'conditions' and 'condition' naming here.
for parameter in condition_group:
param_dset = condition_group[parameter]
cond_params.append({
'parameter_name': param_dset.attrs['database_name'],
'value': float(param_dset[()]),
'lower_deviation_value': param_dset.attrs['lower_deviation'],
'upper_deviation_value': param_dset.attrs['upper_deviation'],
'flg_available': True
})
const_rel_path = f'xfel/cal/{detector_type.lower()}/{pdu_name.lower()}'
const_filename = f'cal.{time.time()}.h5'
if proposal and len(runs) > 0:
raw_data_location = 'proposal:{} runs: {}'.format(
proposal, ' '.join([str(x) for x in runs]))
else:
pass # Fallback for non-run based constants
def generate_unique_name(detector_type, pdu_name, pdu_uuid, cond_params):
# Generate condition name.
unique_name = detector_type[:detector_type.index('-Type')] + ' Def'
cond_hash = md5(pdu_name.encode())
......@@ -144,7 +142,26 @@ def inject_ccv(const_src, ccv_root, report_to=None):
cond_hash.update(str(param_dict['value']).encode())
unique_name += binascii.b2a_base64(cond_hash.digest()).decode()
unique_name = unique_name[:60]
return unique_name[:60]
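The condition name is derived from an MD5 hash over the PDU name and the condition parameter values, base64-encoded and truncated to 60 characters. A standalone sketch of that idea (the exact fields hashed in the collapsed lines above may differ slightly):

```python
import binascii
from hashlib import md5

def sketch_unique_name(detector_type, pdu_name, cond_params):
    # e.g. "jungfrau-Type" -> "jungfrau Def"
    name = detector_type[:detector_type.index('-Type')] + ' Def'
    cond_hash = md5(pdu_name.encode())
    for param in cond_params:
        cond_hash.update(str(param['value']).encode())
    name += binascii.b2a_base64(cond_hash.digest()).decode()
    return name[:60]
```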
def prepare_injection_payload(
ccv_attrs,
pdu_attrs,
cond_params,
const_filename,
const_rel_path,
unique_name,
report_to
):
proposal, runs, begin_at_str = ccv_attrs
pdu_name, calibration, detector_type, pdu_uuid = pdu_attrs
if proposal and len(runs) > 0:
raw_data_location = (
f'proposal:{proposal} runs: {" ".join([str(x) for x in runs])}')
else:
pass # Fallback for non-run based constants
# Add PDU "UUID" to parameters.
cond_params.append({
......@@ -186,7 +203,56 @@ def inject_ccv(const_src, ccv_root, report_to=None):
'name': report_path.stem,
'file_path': str(report_path)
}
return inject_h
def inject_ccv(const_src, ccv_root, report_to=None):
"""Inject new CCV into CalCat.
Args:
const_src (str or Path): Path to CCV data file.
ccv_root (str): CCV HDF group name.
report_to (str): Metadata location.
Returns:
None
Raises:
RuntimeError: If CalCat POST request fails.
"""
pdu_name, calibration, key = ccv_root.lstrip('/').split('/')
with h5py.File(const_src, 'r') as const_file:
pdu_group = const_file[pdu_name]
pdu_uuid = pdu_group.attrs['uuid']
detector_type = pdu_group.attrs['detector_type']
ccv_group = const_file[ccv_root]
ccv_attrs = _get_ccv_attributes(ccv_group)
proposal, runs = ccv_group.attrs['proposal'], ccv_group.attrs['runs']
begin_at_str = ccv_group.attrs['begin_at']
condition_group = ccv_group['operating_condition']
cond_params = condition_parameters(condition_group)
const_rel_path = f'xfel/cal/{detector_type.lower()}/{pdu_name.lower()}'
const_filename = f'cal.{time.time()}.h5'
unique_name = generate_unique_name(
detector_type, pdu_name, pdu_uuid, cond_params)
pdu_attrs = (pdu_name, calibration, detector_type, pdu_uuid)
inject_h = prepare_injection_payload(
ccv_attrs,
pdu_attrs,
cond_params,
const_filename,
const_rel_path,
unique_name,
report_to
)
const_dest = _get_default_caldb_root() / const_rel_path / const_filename
const_dest.parent.mkdir(parents=True, exist_ok=True)
copyfile(const_src, const_dest)
......@@ -196,4 +262,14 @@ def inject_ccv(const_src, ccv_root, report_to=None):
if not resp['success']:
const_dest.unlink() # Delete already copied CCV file.
raise RuntimeError(resp)
# Unfortunately this is the best we can do for a check at the moment.
if (
resp['status_code'] == 422 and
"taken" in resp['app_info'].get("begin_at", [""])[0]
):
warnings.warn(
f"{calibration} calibration constant version for {pdu_name}"
" has already been injected.\n",
CCVAlreadyInjected)
else:
raise RuntimeError(resp)
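Putting the pieces together, a hedged end-to-end sketch: inject a CCV file previously written by `write_ccv` and treat a duplicate injection as a warning rather than a hard failure. The file path, group name and report location below are placeholders:

```python
import warnings

with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always", CCVAlreadyInjected)
    inject_ccv(
        "/tmp/my_ccv.h5",            # file produced by write_ccv()
        "JF_MODULE_1/Offset10Hz/0",  # <pdu_name>/<calibration>/<key>
        report_to="/path/to/report",
    )

if any(issubclass(w.category, CCVAlreadyInjected) for w in caught):
    print("This CCV was already in CalCat; nothing new was injected.")
```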
import numpy as np
import pytest
import re
import xarray as xr
from cal_tools.calcat_interface2 import (
......@@ -36,6 +37,10 @@ def test_AGIPD_CalibrationData_metadata():
assert isinstance(agipd_cd["Offset", "AGIPD00"], SingleConstant)
assert agipd_cd["Offset", "Q1M2"] == agipd_cd["Offset", "AGIPD01"]
assert re.search(
r"\[2022-\d{2}-\d{2} \d{2}:\d{2}\]\(https://in.xfel.eu/", agipd_cd.markdown_table()
)
@pytest.mark.requires_gpfs
def test_AGIPD_merge():
......