Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • calibration/pycalibration
1 result
Show changes
Commits on Source (44)
Showing
with 779 additions and 621 deletions
%% Cell type:markdown id: tags:
# JUNGFRAU Retrieving Constants Pre-correction #
Author: European XFEL Detector Group, Version: 1.0
Retrieving Required Constants for Offline Calibration of the JUNGFRAU Detector
%% Cell type:code id: tags:
``` python
in_folder = "/gpfs/exfel/exp/SPB/202130/p900204/raw" # the folder to read data from, required
out_folder = "/gpfs/exfel/data/scratch/ahmedk/test/remove" # the folder to output to, required
metadata_folder = "" # Directory containing calibration_metadata.yml when run by xfel-calibrate
run = 112 # run to process, required
sequences = [-1] # sequences to correct, set to [-1] for all, range allowed
sequences_per_node = 1 # number of sequence files per cluster node if run as slurm job, set to 0 to not run SLURM parallel
run = 95 # run to process, required
# Parameters used to access raw data.
karabo_id = "SPB_IRDA_JF4M" # karabo prefix of Jungfrau devices
karabo_da = ['JNGFR01', 'JNGFR02', 'JNGFR03', 'JNGFR04', 'JNGFR05', 'JNGFR06', 'JNGFR07', 'JNGFR08'] # data aggregators
receiver_template = "JNGFR{:02d}" # Detector receiver template for accessing raw data files. e.g. "JNGFR{:02d}"
instrument_source_template = '{}/DET/{}:daqOutput' # template for source name (filled with karabo_id & receiver_id). e.g. 'SPB_IRDA_JF4M/DET/JNGFR01:daqOutput'
ctrl_source_template = '{}/DET/CONTROL' # template for control source name (filled with karabo_id_control)
karabo_id_control = "" # if control is on a different ID, set to empty string if it is the same a karabo-id
# Parameters for calibration database.
use_dir_creation_date = True # use the creation data of the input dir for database queries
cal_db_interface = "tcp://max-exfl016:8017#8025" # the database interface to use
cal_db_timeout = 180000 # timeout on cal db requests
creation_time = "" # To overwrite the measured creation_time. Required Format: YYYY-MM-DD HR:MN:SC e.g. "2022-06-28 13:00:00"
# Parameters affecting corrected data.
relative_gain = True # do relative gain correction
# Parameters for retrieving calibration constants
manual_slow_data = False # if true, use manually entered bias_voltage, integration_time, gain_setting, and gain_mode values
integration_time = 4.96 # integration time in us, will be overwritten by value in file
gain_setting = 0 # 0 for dynamic gain, 1 for dynamic HG0, will be overwritten by value in file
gain_mode = 0 # 0 for runs with dynamic gain setting, 1 for fixgain. It will be overwritten by value in file, if manual_slow_data is set to True.
mem_cells = -1 # Set mem_cells to -1 to automatically use the value stored in RAW data.#
bias_voltage = 180 # will be overwritten by value in file
```
%% Cell type:code id: tags:
``` python
import datetime
from functools import partial
from logging import warning
import multiprocessing
from extra_data import RunDirectory
from pathlib import Path
from cal_tools.calcat_interface import JUNGFRAU_CalibrationData
from cal_tools.jungfraulib import JungfrauCtrl
import cal_tools.restful_config as rest_cfg
from cal_tools.step_timing import StepTimer
from cal_tools.tools import (
get_dir_creation_date,
get_from_db,
calcat_creation_time,
CalibrationMetadata,
)
from iCalibrationDB import Conditions, Constants
```
%% Cell type:code id: tags:
``` python
in_folder = Path(in_folder)
out_folder = Path(out_folder)
metadata = CalibrationMetadata(metadata_folder or out_folder)
# Constant paths & timestamps are saved under retrieved-constants in calibration_metadata.yml
retrieved_constants = metadata["retrieved-constants"] = dict()
run_folder = in_folder / f'r{run:04d}'
run_dc = RunDirectory(run_folder)
out_folder.mkdir(parents=True, exist_ok=True)
creation_time = None
if use_dir_creation_date:
creation_time = get_dir_creation_date(in_folder, run)
print(f"Using {creation_time} as creation time")
# Run's creation time:
creation_time = calcat_creation_time(in_folder, run, creation_time)
print(f"Creation time: {creation_time}")
if karabo_id_control == "":
karabo_id_control = karabo_id
```
%% Cell type:code id: tags:
``` python
ctrl_src = ctrl_source_template.format(karabo_id_control)
ctrl_data = JungfrauCtrl(run_dc, ctrl_src)
if mem_cells < 0:
memory_cells, sc_start = ctrl_data.get_memory_cells()
mem_cells_name = "single cell" if memory_cells == 1 else "burst"
print(f"Run is in {mem_cells_name} mode.\nStorage cell start: {sc_start:02d}")
else:
memory_cells = mem_cells
mem_cells_name = "single cell" if memory_cells == 1 else "burst"
print(f"Run is in manually set to {mem_cells_name} mode. With {memory_cells} memory cells")
if not manual_slow_data:
integration_time = ctrl_data.get_integration_time()
bias_voltage = ctrl_data.get_bias_voltage()
gain_setting = ctrl_data.get_gain_setting()
gain_mode = ctrl_data.get_gain_mode()
print(f"Integration time is {integration_time} us")
print(f"Gain setting is {gain_setting} (run settings: {ctrl_data.run_settings}")
print(f"Gain mode is {gain_mode} ({ctrl_data.run_mode})")
print(f"Bias voltage is {bias_voltage} V")
print(f"Number of memory cells are {memory_cells}")
```
%% Cell type:code id: tags:
``` python
condition = Conditions.Dark.jungfrau(
step_timer = StepTimer()
```
%% Cell type:code id: tags:
``` python
step_timer.start()
jf_cal = JUNGFRAU_CalibrationData(
detector_name=karabo_id,
sensor_bias_voltage=bias_voltage,
event_at=creation_time,
modules=karabo_da,
memory_cells=memory_cells,
bias_voltage=bias_voltage,
integration_time=integration_time,
gain_setting=gain_setting,
gain_mode=gain_mode,
client=rest_cfg.calibration_client(),
)
constant_names = ["Offset10Hz", "BadPixelsDark10Hz"]
def get_constants_for_module(mod: str):
"""Get calibration constants for given module for Jungfrau."""
retrieval_function = partial(
get_from_db,
karabo_id=karabo_id,
karabo_da=mod,
cal_db_interface=cal_db_interface,
creation_time=creation_time,
timeout=cal_db_timeout,
verbosity=0,
meta_only=True,
load_data=False,
empty_constant=None
)
mdata_dict = dict()
mdata_dict["constants"] = dict()
constants = [
"Offset", "BadPixelsDark",
"RelativeGain", "BadPixelsFF",
]
for cname in constants:
mdata_dict["constants"][cname] = dict()
if not relative_gain and cname in ["BadPixelsFF", "RelativeGain"]:
continue
_, mdata = retrieval_function(
condition=condition,
constant=getattr(Constants.jungfrau, cname)(),
)
mdata_const = mdata.calibration_constant_version
const_mdata = mdata_dict["constants"][cname]
# check if constant was successfully retrieved.
if mdata.comm_db_success:
const_mdata["file-path"] = (
f"{mdata_const.hdf5path}" f"{mdata_const.filename}"
)
const_mdata["dataset-name"] = mdata_const.h5path
const_mdata["creation-time"] = f"{mdata_const.begin_at}"
mdata_dict["physical-detector-unit"] = mdata_const.device_name
else:
const_mdata["file-path"] = None
const_mdata["creation-time"] = None
return mdata_dict, mod
```
if relative_gain:
constant_names += ["BadPixelsFF10Hz", "RelativeGain10Hz"]
%% Cell type:code id: tags:
jf_metadata = jf_cal.metadata(constant_names)
``` python
# Constant paths are saved under retrieved-constants in calibration_metadata.yml
retrieved_constants = metadata.setdefault("retrieved-constants", {})
# Avoid retrieving constants for available modules in calibration_metadata.yml
# This is used during reproducability.
query_karabo_da = []
for mod in karabo_da:
if mod in retrieved_constants.keys():
print(f"Constant for {mod} already in "
"calibration_metadata.yml, won't query again.")
continue
query_karabo_da.append(mod)
with multiprocessing.Pool() as pool:
results = pool.map(get_constants_for_module, query_karabo_da)
# Add constants metadata in retrieved_constants dict.
for mod, ccv_dict in jf_metadata.items():
mod_dict = retrieved_constants.setdefault(mod, dict())
const_dict = mod_dict.setdefault("constants", dict())
for cname, ccv_metadata in ccv_dict.items():
const_dict[cname] = {
"path": str(jf_cal.caldb_root / ccv_metadata["path"]),
"dataset": ccv_metadata["dataset"],
"creation-time": ccv_metadata["begin_validity_at"],
"ccv_id": ccv_metadata["ccv_id"],
}
mod_dict["physical-name"] = ccv_metadata["physical_name"]
# Validate the constants availability and raise/warn correspondingly.
missing_dark_modules = set()
for mod, calibrations in jf_metadata.items():
missing_dark_constants = {"Offset10Hz", "BadPixelsDark10Hz"} - set(calibrations)
missing_gain_constants = {"BadPixelsFF10Hz", "RelativeGain10Hz"} - set(calibrations)
if missing_dark_constants:
warning(
f"Dark constants {missing_dark_constants} are not available to correct {mod}")
missing_dark_modules.add(mod)
if missing_gain_constants:
warning(
f"Gain constants {missing_gain_constants} were not retrieved. Module: {mod}")
if missing_dark_modules == set(karabo_da):
raise ValueError(f"Dark constants are missing for all modules {karabo_da}.")
step_timer.done_step(f'Retrieving calibration constants.')
```
%% Cell type:code id: tags:
``` python
timestamps = dict()
for md_dict, mod in results:
retrieved_constants[mod] = md_dict
for mod in karabo_da:
module_timestamps = timestamps[mod] = dict()
module_constants = retrieved_constants[mod]
print(f"Module: {mod}:")
for cname, mdata in module_constants["constants"].items():
if hasattr(mdata["creation-time"], 'strftime'):
mdata["creation-time"] = mdata["creation-time"].strftime('%y-%m-%d %H:%M')
print(f'{cname:.<12s}', mdata["creation-time"])
print(f"\t{cname:.<12s}", mdata["creation-time"])
for cname in ["Offset", "BadPixelsDark", "RelativeGain", "BadPixelsFF"]:
for cname in constant_names:
if cname in module_constants["constants"]:
module_timestamps[cname] = module_constants["constants"][cname]["creation-time"]
else:
module_timestamps[cname] = "NA"
time_summary = retrieved_constants.setdefault("time-summary", {})
time_summary = timestamps
retrieved_constants["time-summary"] = timestamps
metadata.save()
```
......
%% Cell type:code id: tags:
``` python
# Data selection parameters.
run = 104 # Run ID.
in_folder = '/gpfs/exfel/exp/SQS/202101/p002535/raw' # Partial input path appended with run ID.
out_folder = '/gpfs/exfel/exp/SQS/202101/p002535/scratch/cal_test' # Full path to output folder.
calib_config_path = '/gpfs/exfel/exp/SQS/202101/p002535/usr/config_board2+4.yaml' # Path to correction and transform configuration
# These parameters are required by xfel-calibrate but ignored in this notebook.
cycle = '' # Proposal cycle, currently not used.
cal_db_timeout = 0 # Calibration DB timeout, currently not used.
cal_db_interface = 'foo' # Calibration DB interface, currently not used.
karabo_da = 'bar' # Karabo data aggregator name, currently not used
# Output parameters.
karabo_id = 'SQS_REMI_DLD6' # Karabo device ID root for virtual output device.
proposal = '' # Proposal, leave empty for auto detection based on in_folder
out_aggregator = 'REMI01' # Aggregator name for output files.
out_seq_len = 5000 # Number of trains per sequence file in output.
det_device_id = '{karabo_id}/DET/{det_name}' # Karabo device ID for virtual output device.
det_output_key = 'output' # Pipeline name for fast data output.
save_raw_triggers = True # Whether to save trigger position in files.
save_raw_edges = True # Whether to save digitized edge positions in files.
save_raw_amplitudes = True # Whether to save analog pulse amplitudes in files.
save_rec_signals = True # Whether to save reconstructed signals (u1-w2, mcp) in files.
save_rec_hits = True # Whether to save reoncstructed hits (x,y,t,m) in files.
chunks_triggers = [500] # HDF chunk size for triggers.
chunks_edges = [500, 7, 50] # HDF chunk size for edges.
chunks_amplitudes = [500, 7, 50] # HDF chunk size for amplitudes.
chunks_hits = [50, 50] # HDF chunk size for hits.
chunks_signals = [50, 50] # HDF chunk size for signals.
dataset_compression = 'gzip' # HDF compression method.
dataset_compression_opts = 3 # HDF GZIP compression level.
# Detector parameters.
quad_anode = False # Reconstruction assumes a hex anode by default, change for quad anodes.
# Trigger parameters.
ppt_source = 'SQS_RR_UTC/TSYS/TIMESERVER:outputBunchPattern'
ignore_fel = False # Ignore any FEL entries in the PPT.
ignore_ppl = False # Ignore any PPL entries in the PPT.
ppl_offset = 0 # In units of the PPT.
laser_ppt_mask = -1 # Bit mask for used laser, negative to auto-detect from instrument.
instrument_sase = 3 # Which SASE we're running at for PPT decoding.
first_pulse_offset = 10000 # Sample position where the first pulse begins, ignored when PPT is reconstructed.
single_pulse_length = 25000 # How many samples if there's only one pulse.
pulse_start_offset = 0 # Signal offset at the start of each pulse.
pulse_end_offset = 0 # Signal offset at the end of each pulse.
# PPT reconstruction parameters.
reconstruct_ppt = False # Reconstruct PPT from some trigger edges.
trigger_edge_channel = '4_D' # Channel to use for triggering.
trigger_edge_offset = 0 # Offset to apply to the first trigger edge position to compute first pulse offset.
fake_ppt_offset = 0 # Offset in reconstructed PPT for pulses.
# Parallelization parameters.
mp_find_triggers = 0.5 # Parallelization for finding triggers.
mp_find_edges = 0.5 # Parallelization for digitizing analog signal.
mt_avg_trace = 2 # Parallelization for trace averaging.
mp_rec_hits = 1.0 # Parallelization for hit reconstruction.
```
%% Cell type:code id: tags:
``` python
from datetime import datetime
from logging import warning
from pathlib import Path
import re
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.colors import LogNorm
from threadpoolctl import threadpool_limits
import re
import h5py
from pathlib import Path
from datetime import datetime
import pasha as psh
from euxfel_bunch_pattern import indices_at_sase, indices_at_laser
from extra_data import RunDirectory
from extra_remi import Analysis, trigger_dt
from extra_remi.util import timing
from extra_remi.rd_resort import signal_dt, hit_dt
from extra_remi.files import DataFile, sequence_pulses
if quad_anode:
from extra_remi.plots import plot_detector_diagnostics_quad as plot_detector_diagnostics
else:
from extra_remi.plots import plot_detector_diagnostics_hex as plot_detector_diagnostics
%matplotlib inline
```
%% Cell type:code id: tags:
``` python
def finite_flattened_slice(array, slice_=np.s_[:]):
"""Return flattened and finite values for a given slice."""
sliced_array = array[slice_]
return sliced_array[np.isfinite(sliced_array)]
```
%% Cell type:code id: tags:
``` python
calib_config_path = Path(calib_config_path)
if not calib_config_path.is_file():
# If the path cannot be resolved right now, try the same path relative to in_folder.
calib_config_path = Path(in_folder) / calib_config_path
if not calib_config_path.is_file():
# Disallow implicit config file creation.
raise ValueError('calib_config_path not found - neither absolute nor relative to in_folder')
remi = Analysis(calib_config_path, use_hex=not quad_anode)
with timing('open_run'):
dc = remi.prepare_dc(RunDirectory(Path(in_folder) / f'r{run:04d}', inc_suspect_trains=True),
require_ppt=not reconstruct_ppt)
```
%% Cell type:markdown id: tags:
# Transformation parameters
%% Cell type:code id: tags:
``` python
def print_leaf(leaf, indent=0):
for key, value in leaf.items():
if isinstance(value, dict):
print(indent * 4 * ' ' + key)
print_leaf(value, indent=indent+1)
else:
print(indent * 4 * ' ' + f'{key}: {value}')
print_leaf(remi.tree)
```
%% Cell type:markdown id: tags:
# Pulse and trigger information
%% Cell type:markdown id: tags:
### Read PPT from file or reconstruct PPT for older data
%% Cell type:code id: tags:
``` python
if reconstruct_ppt:
# Take up to the first hundred trains for now.
# Could be done for each train individually, but likely not necessary for now.
trigger_trace = dc[remi['digitizer']['source'], remi['digitizer']['key_pattern'].format(trigger_edge_channel)] \
[:100].ndarray().mean(axis=0).astype(np.float64)
trigger_trace -= trigger_trace[0] # Use simple offset correction.
fake_ppt = np.zeros(2700, dtype=np.uint32)
discr_func, discr_params = remi.get_discriminator([trigger_edge_channel])
edges = np.zeros(1000, dtype=np.float64)
num_pulses = discr_func(trigger_trace, edges, **discr_params[0])
num_pulses = discr_func(trigger_trace, edges=edges, **discr_params[0])
edges = edges[:num_pulses]
first_edge = edges[0]
rel_edges = np.round(edges - first_edge)
edge_diff = rel_edges[1] - rel_edges[0]
if not np.allclose(rel_edges[1:] - rel_edges[:-1], edge_diff):
raise ValueError('PPT reconstruction for unstable edge intervals not supported')
pulse_spacing = edge_diff / (2 * remi['digitizer']['clock_factor']) # In units of PPT
if not float.is_integer(pulse_spacing):
raise ValueError('PPT reconstruction encountered non-integer pulse spacing')
pulse_spacing = int(pulse_spacing)
# Taken from euxfel_bunch_pattern/__init__.py
from euxfel_bunch_pattern import DESTINATION_T4D, DESTINATION_T5D, PHOTON_LINE_DEFLECTION
if instrument_sase == 1:
flag = DESTINATION_T4D
elif instrument_sase == 2:
flag = DESTINATION_T5D
elif instrument_sase == 3:
flag = DESTINATION_T4D | PHOTON_LINE_DEFLECTION
first_pulse_offset = int(first_edge + trigger_edge_offset) # Overwrite notebook argument.
fake_ppt[fake_ppt_offset:fake_ppt_offset + (pulse_spacing * num_pulses):pulse_spacing] = flag
from pasha.functor import Functor, gen_split_slices
class FakeKeyDataFunctor(Functor):
"""Functor appearing KeyData-like with constant data.
This functor serves a constant data row for a given number
of train IDs the same way a KeyData object would.
"""
def __init__(self, row, train_ids):
self.row = row
self.train_ids = train_ids
def split(self, num_workers):
return gen_split_slices(len(self.train_ids), n_parts=num_workers)
def iterate(self, share):
it = zip(range(*share.indices(len(self.train_ids))), self.train_ids)
for index, train_id in it:
yield index, train_id, self.row
ppt_data = FakeKeyDataFunctor(fake_ppt, dc.train_ids)
fig, ax = plt.subplots(num=99, figsize=(9, 6), clear=True, ncols=1, nrows=1)
ax.set_title('Edge trigger signal')
ax.plot(trigger_trace, lw=1, label=f'Mean {trigger_edge_channel} trace')
ax.vlines(edges, trigger_trace.min()*1.1, trigger_trace.max()*1.1,
color='red', linewidth=3, alpha=0.3, label='Edge positions')
ax.set_xlabel('Samples')
ax.set_ylabel('Intensity / ADU')
ax.legend()
else:
ppt_data = dc[ppt_source, 'data.bunchPatternTable']
```
%% Cell type:markdown id: tags:
### Count pulses per train and compute offsets
%% Cell type:code id: tags:
``` python
# Based on the pulse pattern tables, three global variables are obtained:
#
# * `pulse_counts [int32: len(dc.train_ids)]` containing the number of pulses per train.
# * `pulse_offsets [int32: len(dc.train_ids)]` containing the global offset for the first pulse of each train.
# * `num_pulses = pulse_counts.sum(axis=0)`
def get_pulse_positions(ppt, sase, laser, ppl_offset):
# Combine FEL and PPL positions.
fel_pos = indices_at_sase(ppt, sase) if not ignore_fel else np.array([])
ppl_pos = indices_at_laser(ppt, laser) if not ignore_ppl else np.array([])
if len(fel_pos) > 0:
# Move PPL up to the FEL position.
ppl_pos += fel_pos[0] + ppl_offset
return np.union1d(fel_pos, ppl_pos), fel_pos, ppl_pos
if laser_ppt_mask < 0:
# If laser PPT mask is not specified, try to figure it out from device IDs.
from euxfel_bunch_pattern import PPL_BITS
instrument = karabo_id[:karabo_id.index('_')]
try:
laser_ppt_mask = PPL_BITS[f'LP_{instrument}']
except KeyError:
raise ValueError(f'Laser PPT mask unknown for instrument `{instrument}`')
with timing('pulse_info'):
psh.set_default_context('processes', num_workers=remi.get_num_workers(mp_find_triggers))
# Build the pulse index
pulse_counts = psh.alloc(shape=len(dc.train_ids), dtype=np.uint64)
has_ppt = psh.alloc(shape=len(dc.train_ids), dtype=bool, fill=False)
def count_pulses(wid, index, tid, ppt):
pulse_counts[index] = len(get_pulse_positions(ppt, instrument_sase, laser_ppt_mask, ppl_offset)[0])
has_ppt[index] = True
psh.map(count_pulses, ppt_data)
# Fill any missing values with the highest.
pulse_counts[has_ppt == False] = pulse_counts.max()
# Compute offsets based on pulse counts.
pulse_offsets = np.zeros_like(pulse_counts)
np.cumsum(pulse_counts[:-1], out=pulse_offsets[1:])
# Total number of pulses.
num_pulses = int(pulse_counts.sum())
```
%% Cell type:code id: tags:
``` python
fig, ax = plt.subplots(num=1, ncols=1, nrows=1, figsize=(9, 4), clear=True)
ax.set_title('Pulse count')
ax.plot(dc.train_ids, pulse_counts, lw=1)
ax.set_xlabel('Train ID')
ax.set_ylabel('Number of pulses')
ax.set_ylim(0, max(300, pulse_counts.max() + 10))
ax.ticklabel_format(style='plain')
pass
```
%% Cell type:markdown id: tags:
### Find triggers
The trigger defines the boundary of a pulse on the digitizer trace, which is stored per train.
%% Cell type:code id: tags:
``` python
# A trigger defines the boundary of a pulse on the digitizer trace stored per train. This cell creates a
# global variable:
# * `triggers [(start: int32, stop: int32, offset: float64, fel: bool, ppl: bool): num_pulses]`
# containing the triggers for each pulse.
#
# This uses the pulse puttern table to locate the pulse positions on the trace. Only number of pulses and
# their distance can be drawn this way, leaving the absolute offset for the very first pulse to be
# configured via `trigger/ppt/first_pulse_offset`. If a PPL is used, it will be included in the trigger
# pattern. The ppt_offset parameter allows taking into account an offset betwen PPL and FEL.
psh.set_default_context('processes', num_workers=remi.get_num_workers(mp_find_triggers))
triggers = psh.alloc(shape=(num_pulses,), dtype=trigger_dt, fill=(-1, -1, np.nan, -1, 0, 0))
clock_factor = remi['digitizer']['clock_factor']
def trigger_by_ppt(worker_id, index, train_id, ppt):
all_pos, fel_pos, ppl_pos = get_pulse_positions(ppt, instrument_sase, laser_ppt_mask, ppl_offset)
num_pulses = len(all_pos)
if num_pulses == 0:
return
elif len(ppl_pos) == 0 and ppl_offset < 0:
# No PPL pulses, but a negative offset is configured. This will cause
# first_pulse_offset to start early and most likely miss pulses at the
# end, so we correct by adding the ppl_offset to relative positions
# when computing trace positions.
pos_corr = abs(ppl_offset)
else:
pos_corr = 0
rel_pos = all_pos - all_pos[0]
if num_pulses > 1:
pulse_len = np.unique(rel_pos[1:] - rel_pos[:-1]).min()
elif num_pulses == 1:
pulse_len = single_pulse_length
start_frac = first_pulse_offset + (rel_pos + pos_corr) * 2 * clock_factor
start_int = start_frac.astype(int)
pulse_offset = pulse_offsets[index]
pulse_count = pulse_counts[index]
train_triggers = triggers[pulse_offset:pulse_offset+pulse_count]
train_triggers['start'] = start_int + pulse_start_offset
train_triggers['stop'] = start_int + int(pulse_len * 2 * clock_factor) - 1 + pulse_end_offset
train_triggers['offset'] = start_frac - start_int
train_triggers['pulse'] = all_pos.astype(np.int16)
train_triggers['fel'] = [pos in fel_pos for pos in all_pos]
train_triggers['ppl'] = [pos in ppl_pos for pos in all_pos]
with timing('find_triggers'):
psh.map(trigger_by_ppt, ppt_data)
if (np.unique(triggers['pulse'][1:] - triggers['pulse'][:-1]) > 0).sum() > 1:
# There is more than one delta between pulse entries across all pulses. This is not
# necessarily a problem, as the pattern could simply have changed in between trains
# with each train being split properly.
# If there's more than one delta in a single train, this likely points to a mismatch
# of FEL and PPL repetition rate. This is most likely not intended.
one = np.uint64(1) # Because np.uint64 + int = np.float64
pulse_deltas = set()
for pulse_id, (offset, count) in enumerate(zip(pulse_offsets, pulse_counts)):
deltas = triggers['pulse'][offset+one:offset+count] - triggers['pulse'][offset:offset+count-one]
if len(np.unique(deltas)) > 1:
for delta in deltas:
pulse_deltas.add(delta)
if len(pulse_deltas) > 1:
delta_str = ', '.join([str(x) for x in sorted(pulse_deltas)])
print(f'WARNING: Different pulse lengths (PPT: {delta_str}) encountered within single trains, '
f'separated pulse spectra may split up signals!')
warning(f'Different pulse lengths (PPT: {delta_str}) encountered within single trains, '
f'separated pulse spectra may split up signals!')
else:
print('WARNING: Different pulse lengths encountered across trains, separation may be unstable!')
warning('Different pulse lengths encountered across trains, separation may be unstable!')
```
%% Cell type:code id: tags:
``` python
fig, (lx, rx) = plt.subplots(num=2, ncols=2, nrows=1, figsize=(9, 4), clear=True,
gridspec_kw=dict(top=0.75))
# Display ~400 pulses or 10 trains, whatever is lower
n_trains = max(abs(pulse_offsets - 200).argmin(), 5)
visible_triggers = triggers[:pulse_offsets[n_trains]]
pulse_index = np.arange(len(visible_triggers))
pumped = visible_triggers['fel'] & visible_triggers['ppl']
fel_only = visible_triggers['fel'] & ~pumped
ppl_only = visible_triggers['ppl'] & ~pumped
lx.plot(pulse_index[pumped], visible_triggers[pumped]['start'], ' .', ms=3, c='C0', label='FEL+PPL')
lx.plot(pulse_index[fel_only], visible_triggers[fel_only]['start'], '.', ms=3, c='C1', label='FEL-only')
lx.plot(pulse_index[ppl_only], visible_triggers[ppl_only]['start'], '.', ms=2, c='C2', label='PPL-only')
max_start = visible_triggers['start'].max()
lx.vlines(pulse_offsets[:n_trains], 0, max_start, color='grey', linewidth=1, alpha=0.2)
lx.tick_params(right=True)
lx.set_xlabel('Pulse index')
lx.set_xlim(-15, pulse_offsets[n_trains]+15)
lx.set_ylabel('Trigger position')
lx.set_ylim(-max_start // 20, max_start + max_start // 20)
lx.legend(fontsize='small', loc='lower right')
train_lx = lx.twiny()
train_lx.set_xlabel('Train ID', labelpad=8)
train_lx.set_xlim(lx.get_xlim())
train_lx.set_xticks(pulse_offsets[:n_trains])
train_lx.set_xticklabels([str(int(x)) for x in dc.train_ids[:n_trains]],
rotation=-45, fontsize='x-small')
rx.plot(triggers['start'], lw=0.2)
rx.set_xlabel('Pulse index')
rx.tick_params(left=False, labelleft=False, right=True, labelright=True)
pass
```
%% Cell type:markdown id: tags:
# Analog signal to digital edges
%% Cell type:markdown id: tags:
### Find edges in analog signal
%% Cell type:code id: tags:
``` python
psh.set_default_context('processes', num_workers=remi.get_num_workers(mp_find_edges))
threadpool_limits(limits=remi.get_num_workers(mt_avg_trace))
edges_by_det = {}
avg_traces_by_det = {}
det_data = {}
for det_name, det in remi['detector'].items():
for i, (det_name, det) in enumerate(remi['detector'].items()):
det_sourcekeys = remi.get_detector_sourcekeys(det_name)
det_get_traces = remi.get_traces_getter(det_name)
trace_len = dc[next(iter(det_sourcekeys))].entry_shape[0]
edges = psh.alloc(shape=(num_pulses, 7, det['max_hits']),
dtype=np.float64, fill=np.nan)
amplitudes = psh.alloc(shape=(num_pulses, 7, det['max_hits']),
dtype=np.float64, fill=np.nan)
avg_traces = psh.alloc_per_worker(shape=(7, trace_len), dtype=np.float64)
def prepare_edge_worker(worker_id):
correct_func = remi.get_baseline_corrector()
discr_func, discr_params = remi.get_discriminator(det['channels'])
source_name = remi['digitizer']['source']
bl_start, bl_stop, _ = remi.get_baseline_limits(trace_len)
bl_sym = remi['digitizer']['baseline_symmetry']
time_cal = remi.get_time_calibration()
traces_corr = np.empty((7, trace_len), dtype=np.float64)
baselines = np.empty(bl_sym, dtype=np.float64)
yield
@psh.with_init(prepare_edge_worker)
def find_edges(worker_id, index, train_id, data):
try:
data = det_get_traces(data[source_name])
except KeyError:
return
for channel_idx in range(7):
correct_func(data[channel_idx], traces_corr[channel_idx],
baselines, bl_start, bl_stop)
avg_traces[worker_id] += traces_corr
pulses_slice = np.s_[pulse_offsets[index]:pulse_offsets[index]+pulse_counts[index]]
for trigger, pulse_edges in zip(triggers[pulses_slice], edges[pulses_slice]):
for trigger, pulse_edges, pulse_amplitudes in zip(
triggers[pulses_slice], edges[pulses_slice], amplitudes[pulses_slice]
):
trigger_slice = np.s_[trigger['start']:trigger['stop']]
for trace, channel_params, channel_edges in zip(traces_corr, discr_params, pulse_edges):
discr_func(trace[trigger_slice], channel_edges, **channel_params)
pulse_edges += trigger['offset']
pulse_edges *= time_cal
for trace, channel_params, channel_edges, channel_amplitudes in zip(
traces_corr, discr_params, pulse_edges, pulse_amplitudes
):
discr_func(trace[trigger_slice], edges=channel_edges,
amplitudes=channel_amplitudes, **channel_params)
with timing(f'find_edges, {det_name}'):
psh.map(find_edges, dc.select(det_sourcekeys))
edges_by_det[det_name] = edges
avg_traces_by_det[det_name] = avg_traces.sum(axis=0) / len(dc.train_ids)
if not np.isfinite(edges).any():
warning(f'No edges found for {det_name}')
with np.printoptions(precision=2, suppress=True):
print(edges[:5, :, :8])
fig, (ux, bx) = plt.subplots(num=110+i, ncols=1, nrows=2, figsize=(9.5, 8), clear=True,
gridspec_kw=dict(left=0.1, right=0.98, top=0.98, bottom=0.1, hspace=0.25))
fig.text(0.02, 0.98, det_name.upper(), rotation=90, ha='left', va='top', size='x-large')
for edge_idx, edge_name in enumerate(['u1', 'u2', 'v1', 'v2', 'w1', 'w2', 'mcp']):
ux.hist(finite_flattened_slice(amplitudes, np.s_[:, edge_idx, :]),
bins=1000, range=(0, 2048), histtype='step', lw=1,
color=f'C{edge_idx}' if edge_idx < 6 else 'k', label=edge_name)
cur_edges = finite_flattened_slice(edges, np.s_[:, edge_idx, :])
bx.hist(cur_edges - np.floor(cur_edges), bins=500, range=(0, 1), histtype='step',
lw=1, color=f'C{edge_idx}' if edge_idx < 6 else 'k', label=edge_name)
ux.legend()
ux.set_title('Pulse height distributions')
ux.set_xlabel('Pulse height')
ux.set_yscale('log')
ux.set_xlim(0, 2048)
ux.set_ylim(10, 1.5*ux.get_xlim()[1])
bx.set_title('Fractional edge distributions')
bx.set_xlabel('Edge positions - ⌊edge positions⌋')
bx.set_yscale('log')
bx.set_xlim(-0.05, 1.2)
bx.legend()
# Properly offset edges to their trigger offset and convert to time.
# This is not done earlier to preserve the information for plotting.
edges += triggers['offset'][:, None, None]
edges *= remi.get_time_calibration()
det_data[det_name] = {
'edges': edges,
'amplitudes': amplitudes,
'avg_trace': avg_traces.sum(axis=0) / len(dc.train_ids)
}
```
%% Cell type:markdown id: tags:
### Global average of analog signals
%% Cell type:code id: tags:
``` python
for i, det_name in enumerate(remi['detector'].keys()):
fig, axs = plt.subplots(num=10+i, nrows=7, figsize=(9.5, 8), clear=True,
gridspec_kw=dict(left=0.1, right=0.98, top=0.98, bottom=0.1))
fig.text(0.02, 0.98, det_name.upper(), rotation=90, ha='left', va='top', size='x-large')
for edge_idx, edge_name in enumerate(['u1', 'u2', 'v1', 'v2', 'w1', 'w2', 'mcp']):
axs[edge_idx].plot(avg_traces_by_det[det_name][edge_idx], lw=1)
axs[edge_idx].plot(det_data[det_name]['avg_trace'][edge_idx], lw=1)
axs[edge_idx].tick_params(labelbottom=False)
axs[edge_idx].set_ylabel(edge_name)
axs[-1].tick_params(labelbottom=True)
pass
```
%% Cell type:markdown id: tags:
### Sample for digitized traces
### Sample for found edges
%% Cell type:code id: tags:
``` python
for i, det_name in enumerate(remi['detector'].keys()):
edges = edges_by_det[det_name]
edges = det_data[det_name]['edges']
fig = plt.figure(num=100+i, figsize=(9.5, 8))
grid = fig.add_gridspec(ncols=2, nrows=4, left=0.1, right=0.98, top=0.98, bottom=0.1)
fig.text(0.02, 0.98, det_name.upper(), rotation=90, ha='left', va='top', size='x-large')
for signal_idx, signal_name in enumerate(['u1', 'u2', 'v1', 'v2', 'w1', 'w2', 'mcp']):
row = (1 + signal_idx // 2) if signal_idx < 6 else 0
col = (signal_idx % 2) if signal_idx < 6 else np.s_[:]
ax = fig.add_subplot(grid[row, col])
finite_edges = np.isfinite(edges[:, signal_idx, 0])
if not finite_edges.any():
warning(f'No edges found for {det_name}/{signal_name}')
continue
pulse_idx = np.uint64(finite_edges.nonzero()[0][0]) # Is combined with other uint64 values below.
train_idx = (pulse_idx >= pulse_offsets).nonzero()[0][-1]
trigger = triggers[pulse_idx]
sourcekey = remi.get_channel_sourcekey(
remi['detector'][det_name]['channels'][signal_idx])
train_trace = dc[sourcekey].select_trains(np.s_[train_idx:train_idx+1]).ndarray()[0]
corr_trace = np.zeros_like(train_trace, dtype=np.float64)
remi.get_baseline_corrector()(
train_trace, corr_trace,
np.empty(remi['digitizer']['baseline_symmetry'], dtype=np.float64),
*remi.get_baseline_limits(len(train_trace))[:2])
pulse_trace = corr_trace[np.s_[trigger['start']:trigger['stop']]]
x_time = remi.get_time_calibration() * (np.arange(len(pulse_trace)) + trigger['offset'])
ax.plot(x_time, pulse_trace, lw=1)
ax.set_xlim(x_time[0], x_time[-1])
ax.set_ylim(-200, pulse_trace.max()*1.1)
ax.text(x_time[-1], pulse_trace.max(),
f'T{train_idx} P{pulse_idx - pulse_offsets[train_idx]} ',
va='top', ha='right')
ax.tick_params(labelbottom=False)
ax.set_ylabel(signal_name)
ax.vlines(edges[pulse_idx, signal_idx, :], *ax.get_ylim(), color='red', linewidth=1)
pass
```
%% Cell type:markdown id: tags:
### Digitized channel spectra
%% Cell type:code id: tags:
``` python
for i, det_name in enumerate(remi['detector'].keys()):
fig = plt.figure(num=20+i, figsize=(9.5, 6))
edges = edges_by_det[det_name]
edges = det_data[det_name]['edges']
min_edge = edges[np.isfinite(edges)].min()
max_edge = edges[np.isfinite(edges)].max()
min_edge = np.nanmin(edges)
max_edge = np.nanmax(edges)
grid = fig.add_gridspec(ncols=3, nrows=3, left=0.08, right=0.98, top=0.95, hspace=0.4)
fig.text(0.02, 0.98, det_name.upper(), rotation=90, ha='left', va='top', size='x-large')
numx = fig.add_subplot(grid[0, 0])
numx.set_title('Edges per pulse')
agg_window = num_pulses // 60
max_num_edges = 0.0
max_spectral_intensity = 0
hist_axs = []
for edge_idx, edge_name in enumerate(['u1', 'u2', 'v1', 'v2', 'w1', 'w2', 'mcp']):
if edge_idx < 6:
row = 1 + edge_idx % 2
col = edge_idx // 2
else:
row = 0
col = np.s_[1:3]
ax = fig.add_subplot(grid[row, col])
ax.set_title(f'TOF spectrum: {edge_name}')
num_edges = np.isfinite(edges[:, edge_idx, :]).sum(axis=1)
num_edges = num_edges[:((len(num_edges) // agg_window) * agg_window)]
num_edges = num_edges.reshape(-1, agg_window).mean(axis=1)
if (num_edges == 0).all():
warning(f'No edges found for {det_name}/{edge_name}')
continue
if edge_idx < 6:
plot_kwargs = dict(c=f'C{edge_idx}', ls='solid', lw=1.0)
else:
plot_kwargs = dict(c='k', ls='dashed', lw=1.0)
numx.plot(np.arange(len(num_edges)) * agg_window, num_edges, label=edge_name, **plot_kwargs)
max_num_edges = max(max_num_edges, num_edges.max())
cur_edges = edges[:, edge_idx, :].flatten()
if edge_idx < 6:
row = 1 + edge_idx % 2
col = edge_idx // 2
else:
row = 0
col = np.s_[1:3]
ax = fig.add_subplot(grid[row, col])
ax.set_title(f'TOF spectrum: {edge_name}')
y, _, _ = ax.hist(cur_edges[np.isfinite(cur_edges)], bins=int((max_edge - min_edge) // 5),
range=(min_edge, max_edge), color=plot_kwargs['c'], histtype='step', linewidth=1)
y, _, _ = ax.hist(finite_flattened_slice(edges, np.s_[:, edge_idx, :]),
bins=int((max_edge - min_edge) // 5), range=(min_edge, max_edge),
color=plot_kwargs['c'], histtype='step', linewidth=1)
hist_axs.append(ax)
max_spectral_intensity = max(max_spectral_intensity, y.max())
numx.tick_params(labelbottom=False)
numx.set_ylim(0, 1.2*max_num_edges)
for ax in hist_axs:
ax.set_ylim(0, max_spectral_intensity*1.1)
ax.ticklabel_format(axis='y', style='sci', scilimits=(0, 3))
pass
```
%% Cell type:markdown id: tags:
# Detector diagnostics
%% Cell type:code id: tags:
``` python
for i, det_name in enumerate(remi['detector'].keys()):
edges = edges_by_det[det_name]
edges = det_data[det_name]['edges']
sort = remi.get_dld_sorter(det_name)
sum_shifts = sort.sum_shifts if sort.sum_shifts != (0.0, 0.0, 0.0) else None
is_valid = remi.get_presort_mask(edges, edge_idx=0, w=not quad_anode,
sum_limit=max(sort.uncorrected_time_sum_half_widths),
sum_shifts=sum_shifts)
if not is_valid.any():
warning(f'No valid preliminary edge combinations found for {det_name}')
signals, sums = remi.get_signals_and_sums(edges, indices=sort.channel_indices, sum_shifts=sum_shifts,
mask=is_valid)
fig = plot_detector_diagnostics(signals=signals, sums=sums, fig_num=30+i, im_scale=1.5,
sum_range=max(sort.uncorrected_time_sum_half_widths),
sorter=sort)
fig.text(0.02, 0.98, det_name.upper() + ' before corrections', rotation=90, ha='left', va='top', size='x-large')
if remi['detector'][det_name]['use_sum_correction'] or remi['detector'][det_name]['use_pos_correction']:
n_masked = is_valid.sum()
signals = np.full((n_masked, 3), np.nan, dtype=np.float64)
sums = np.full((n_masked, 3), np.nan, dtype=np.float64)
sort.correct(edges[is_valid], signals, sums)
fig = plot_detector_diagnostics(signals=signals, sums=sums, fig_num=40+i, im_scale=1.5,
sum_range=max(sort.uncorrected_time_sum_half_widths),
sorter=sort)
fig.text(0.02, 0.98, det_name.upper() + ' after corrections', rotation=90, ha='left', va='top', size='x-large')
pass
```
%% Cell type:markdown id: tags:
# Hit reconstruction
%% Cell type:code id: tags:
``` python
psh.set_default_context('processes', num_workers=remi.get_num_workers(mp_rec_hits))
signals_by_det = {}
hits_by_det = {}
hit_counts_by_det = {}
for det_name, det in remi['detector'].items():
edges = edges_by_det[det_name]
edges = det_data[det_name]['edges']
signals = psh.alloc(shape=(num_pulses, 50), dtype=signal_dt, fill=np.nan)
hits = psh.alloc(shape=(num_pulses, 50), dtype=hit_dt, fill=(np.nan, np.nan, np.nan, -1))
hit_counts = psh.alloc(shape=len(dc.train_ids), dtype=np.uint32)
def prepare_hit_worker(worker_id):
sort = remi.get_dld_sorter(det_name)
yield
@psh.with_init(prepare_hit_worker)
def reconstruct_hits(worker_id, index, train_id):
hit_counts[index] += sort.run_on_train(
edges, signals, hits, pulse_offsets[index], pulse_offsets[index] + pulse_counts[index])
with timing(f'rec_hits, {det_name}'):
psh.map(reconstruct_hits, dc.train_ids)
signals_by_det[det_name] = signals
hits_by_det[det_name] = hits
hit_counts_by_det[det_name] = hit_counts
det_data[det_name].update(signals=signals, hits=hits, hit_counts=hit_counts)
```
%% Cell type:code id: tags:
``` python
fig, ax = plt.subplots(num=50+i, figsize=(9.5, 4), ncols=1, clear=True,
gridspec_kw=dict(top=0.92, right=0.98, left=0.05, bottom=0.12))
max_num_hits = 0.0
for det_name in remi['detector'].keys():
agg_window = num_pulses // 1000
num_hits = np.isfinite(hits_by_det[det_name]['x']).sum(axis=1)
num_hits = np.isfinite(det_data[det_name]['hits']['x']).sum(axis=1)
num_hits = num_hits[:(len(num_hits) // agg_window) * agg_window]
num_hits = num_hits.reshape(-1, agg_window).mean(axis=1)
max_num_hits = max(max_num_hits, num_hits.max())
ax.plot(np.arange(0, (num_pulses // agg_window) * agg_window, agg_window), num_hits,
lw=1, label=det_name.upper())
ax.set_title('Hits per pulse')
ax.set_xlabel('Pulse index')
ax.set_ylim(0, max_num_hits*1.1)
ax.legend()
pass
```
%% Cell type:markdown id: tags:
### Reconstruction methods
Each hit may be reconstructed by one of 19 different methods. These differ by the number of real signals across the channels, which could be combined to form the hit. Each of these methods is designed by a number between `0` and `19` (with empty hits using `-1`), which can be found in the `m` key of a hit, e.g.:
* `0`: All six anode signals and the corresponding MCP signal were found.
* `4`: One signal on layer `u` is missing, all other signals for this event were found.
* `18`: Only one anode signal on each layer was found and the MCP signal is missing. There is no way to check whether this combination of signals is actually valid.
| Method | `u+v+w +mcp` |
| - | - |
| 0 | `2+2+2 +1` |
| 1 | `0+2+2 +1` |
| 2 | `2+0+2 +1` |
| 3 | `2+2+0 +1` |
| 4 | `1+2+2 +1` (2 permutations) |
| 5 | `2+1+2 +1` (2 permutations) |
| 6 | `2+2+1 +1` (2 permutations) |
| 7 | `2+2+2 +0` |
| 8 | `0+2+2 +0` |
| 9 | `2+0+2 +0` |
| 10 | `2+2+0 +0` |
| 11 | `1+2+2 +0` (2 permutations) |
| 12 | `2+1+2 +0` (2 permutations) |
| 13 | `2+2+1 +0` (2 permutations) |
| 14 | `2+1+1 +1` `1+2+1 +1` `1+1+2 +1` (12 permutations) |
| 15 | `2+1+0 +1` `2+0+1 +1` `1+2+0 +1` `1+0+2 +1` `0+2+1 +1` `0+1+2 +1` (12 permutations) |
| 16 | `1+1+1 +1` (8 permutations) |
| 17 | `2+1+1 +0` `1+2+1 +0` `1+1+2 +0` (12 permutations) |
| 18 | `1+1+1 +0` (8 permutations) |
| 19 | `2+1+0 +0` `2+0+1 +0` `1+2+0 +0` `1+0+2 +0` `0+1+2 +0` `0+2+1 +0` (12 permutations) |
* For hits reconstructed with method `> 10`, extra attention should be given to ensure they add meaningful signal.
* Any method `> 14` has to considered risky, because neither a time sum nor the position can be checked. If the scale factors and/or `w` shift are not correct, then the number of events reconstructed with the risky methods will increase. They will most likely be *ghost hits*, which do not correspond to actual impacts on the detector.
%% Cell type:code id: tags:
``` python
for i, det_name in enumerate(remi['detector'].keys()):
hits = hits_by_det[det_name]
hits = det_data[det_name]['hits']
fig, ax = plt.subplots(num=60+i, figsize=(9.5, 5), ncols=1, clear=True,
gridspec_kw=dict(left=0.08, right=0.91, top=0.8))
fig.text(0.02, 0.98, det_name.upper(), rotation=90, ha='left', va='top', size='x-large')
if not (hits['m'] >= 0).any():
warning(f'No hits found for {det_name}')
continue
method_bins = np.bincount(hits['m'][hits['m'] >= 0], minlength=20)
ax.bar(np.arange(20), method_bins, width=0.5)
ax.set_xlabel('Reconstruction method')
ax.set_xlim(-0.5, 19.5)
ax.set_xticks(np.arange(20))
ax.set_ylabel('Number of hits')
ax.set_ylim(0, method_bins.max()*1.05)
ylims = ax.get_ylim()
ax.tick_params(which='both', right=True, labelright=True)
num_risky = method_bins[15:].sum()
num_total = method_bins.sum()
ax.text(14.2, method_bins.max(), f'{(100*(num_total-num_risky)/num_total):.2g}%',
va='top', ha='right', color='black')
ax.text(14.8, method_bins.max(), f'{(100*num_risky/num_total):.2g}%',
va='top', ha='left', color='red')
ax.fill([14.5, 19.5, 19.5, 14.5], [ylims[0], ylims[0], ylims[1], ylims[1]], c='r', alpha=0.2)
labelx = ax.twiny()
labelx.set_xlim(*ax.get_xlim())
labelx.set_xticks(ax.get_xticks())
labelx.set_xticklabels([
'2+2+2 +1',
'0+2+2 +1', '2+0+2 +1', '2+2+0 +1',
'1+2+2 +1', '2+1+2 +1', '2+2+1 +1',
'2+2+2 +0',
'0+2+2 +0', '2+0+2 +0', '2+2+0 +0', '1+2+2 +0', '2+1+2 +0', '2+2+1 +0',
'2+1+1 +1',
'2+1+0 +1',
'1+1+1 +1',
'2+1+1 +0',
'1+1+1 +0',
'2+1+0 +0',
], rotation=90)
min_rel_tick = np.ceil((ax.get_ylim()[0] / num_total) / 0.1) * 0.1
max_rel_tick = np.floor((method_bins.max() / num_total) / 0.1) * 0.1
rely = ax.twinx()
rely.set_ylim(*ax.get_ylim())
rely.set_yticks(np.arange(0.0, max_rel_tick+0.01, 0.1)*num_total)
rely.set_yticks(np.arange(0.0, ylims[1]/num_total, 0.02)*num_total, minor=True)
rely.set_yticklabels([f'{(y/num_total)*100:.0f}%' for y in rely.get_yticks()])
rely.set_ylabel('Percentage of total hits')
pass
```
%% Cell type:markdown id: tags:
### Detector image and fishes
%% Cell type:code id: tags:
``` python
for i, det_name in enumerate(remi['detector'].keys()):
flat_hits = hits_by_det[det_name].reshape(-1)
flat_hits = det_data[det_name]['hits'].reshape(-1)
flat_hits = flat_hits[np.isfinite(flat_hits[:]['x'])]
flat_hits = flat_hits[flat_hits['m'] < 10]
flat_hits = flat_hits[flat_hits['m'] <= 10]
fig = plt.figure(num=70+i, figsize=(9, 13.5))
fig.text(0.02, 0.98, det_name.upper(), rotation=90, ha='left', va='top', size='x-large')
fig.text(0.02, 0.02, det_name.upper(), rotation=90, ha='left', va='bottom', size='x-large')
imp = fig.add_axes([0.1 + 0.25/2, 0.56, 0.6, 0.4])
txp = fig.add_axes([0.1, 0.28, 0.85, 0.22])
typ = fig.add_axes([0.1, 0.04, 0.85, 0.22])
im_radius = remi['detector'][det_name]['mcp_radius']*1.1
if flat_hits.size == 0:
warning(f'No hits found for {det_name}')
continue
min_tof = flat_hits['t'].min()
max_tof = flat_hits['t'].max()
im_radius = remi['detector'][det_name]['mcp_radius']*1.1
imp.hist2d(flat_hits['x'], flat_hits['y'], bins=(256, 256),
range=[[-im_radius, im_radius], [-im_radius, im_radius]], norm=LogNorm())
imp.xaxis.set_label_position('top')
imp.set_xlabel('X / mm')
imp.set_ylabel('Y / mm')
imp.tick_params(right=True, labelright=True, top=True, labeltop=True)
imp.grid()
min_tof = flat_hits['t'].min()
max_tof = flat_hits['t'].max()
num_tof_bins = int((max_tof - min_tof) // 5)
if num_tof_bins == 0:
warning(f'All TOFs limited to single bin for {det_name}')
continue
for ax, dim_label in zip([txp, typ], ['x', 'y']):
ax.hist2d(flat_hits['t'], flat_hits[dim_label], bins=(int((max_tof - min_tof) // 5), 256),
ax.hist2d(flat_hits['t'], flat_hits[dim_label], bins=(num_tof_bins, 256),
range=[[min_tof, max_tof], [-im_radius, im_radius]], norm=LogNorm())
ax.set_ylabel(f'{dim_label.upper()} / mm')
typ.set_xlabel('Time-of-flight / ns')
txp.tick_params(bottom=True, labelbottom=False, top=True, labeltop=True, right=True, labelright=True)
typ.tick_params(right=True, labelright=True, top=True)
pass
```
%% Cell type:markdown id: tags:
# Transformed data files
%% Cell type:code id: tags:
``` python
# Try to figure out proposal number from in_folder to work with older files.
m = re.match(r'p(\d{6})', Path(in_folder).parts[-2])
if not proposal and m is not None:
proposal = int(m[1])
seq_len = out_seq_len if out_seq_len > 0 else len(dc.files[0].train_ids)
dataset_kwargs = {k[8:]: v for k, v in locals().items() if k.startswith('dataset_compression')}
control_sources = [det_device_id.format(karabo_id=karabo_id, det_name=det_name.upper())
for det_name in remi['detector']]
channels = []
if save_raw_triggers or save_raw_edges:
channels.append('raw')
if save_rec_signals or save_rec_hits:
channels.append('rec')
instrument_channels = [
f'{device_id}:{det_output_key}/{channel}'
for device_id in control_sources
for channel in channels
]
```
%% Cell type:code id: tags:
``` python
Path(out_folder).mkdir(parents=True, exist_ok=True)
print('Writing sequence files', flush=True, end='')
t_write = timing('write_files')
t_write.__enter__()
for seq_id, train_mask, pulse_mask in sequence_pulses(dc.train_ids, pulse_counts, pulse_offsets, seq_len):
seq_train_ids = dc.train_ids[train_mask]
with DataFile.from_details(out_folder, out_aggregator, run, seq_id) as outp:
outp.create_metadata(like=dc, proposal=proposal, run=run, sequence=seq_id,
control_sources=control_sources, instrument_channels=instrument_channels)
outp.create_index(seq_train_ids)
for det_name in remi['detector']:
cur_device_id = det_device_id.format(karabo_id=karabo_id, det_name=det_name.upper())
cur_control_data = outp.create_control_source(cur_device_id)
# Manually manipulate the file here, still creates the index properly.
remi.attach_detector_config(det_name, cur_control_data.get_run_group())
cur_control_data.create_index(len(seq_train_ids))
cur_fast_data = outp.create_instrument_source(f'{cur_device_id}:{det_output_key}')
cur_data = det_data[det_name]
if save_raw_triggers:
cur_fast_data.create_key('raw.triggers', triggers[pulse_mask],
chunks=tuple(chunks_triggers), **dataset_kwargs)
if save_raw_edges:
cur_fast_data.create_key('raw.edges', edges_by_det[det_name][pulse_mask],
cur_fast_data.create_key('raw.edges', cur_data['edges'][pulse_mask],
chunks=tuple(chunks_edges), **dataset_kwargs)
if save_raw_amplitudes:
cur_fast_data.create_key('raw.amplitudes', cur_data['amplitudes'][pulse_mask],
chunks=tuple(chunks_amplitudes), **dataset_kwargs)
if save_rec_signals:
cur_fast_data.create_key('rec.signals', signals_by_det[det_name][pulse_mask],
cur_fast_data.create_key('rec.signals', cur_data['signals'][pulse_mask],
chunks=tuple(chunks_signals), **dataset_kwargs)
if save_rec_hits:
cur_fast_data.create_key('rec.hits', hits_by_det[det_name][pulse_mask],
cur_fast_data.create_key('rec.hits', cur_data['hits'][pulse_mask],
chunks=tuple(chunks_hits), **dataset_kwargs)
cur_fast_data.create_index(raw=pulse_counts[train_mask], rec=pulse_counts[train_mask])
print('.', flush=True, end='')
print('')
t_write.__exit__()
```
......
%% Cell type:markdown id: tags:
# ePix100 Data Correction
Author: European XFEL Detector Group, Version: 2.0
The following notebook provides data correction of images acquired with the ePix100 detector.
The sequence of correction applied are:
Offset --> Common Mode Noise --> Relative Gain --> Charge Sharing --> Absolute Gain.
Offset, common mode and gain corrected data is saved to /data/image/pixels in the CORR files.
If pattern classification is applied (charge sharing correction), this data will be saved to /data/image/pixels_classified, while the corresponding patterns will be saved to /data/image/patterns in the CORR files.
%% Cell type:code id: tags:
``` python
in_folder = "/gpfs/exfel/exp/HED/202202/p003121/raw" # input folder, required
out_folder = "" # output folder, required
metadata_folder = "" # Directory containing calibration_metadata.yml when run by xfel-calibrate
sequences = [-1] # sequences to correct, set to -1 for all, range allowed
sequences_per_node = 1 # number of sequence files per cluster node if run as slurm job, set to 0 to not run SLURM parallel
run = 156 # which run to read data from, required
# Parameters for accessing the raw data.
karabo_id = "HED_IA1_EPX100-1" # karabo karabo_id
karabo_da = "EPIX01" # data aggregators
db_module = "" # module id in the database
receiver_template = "RECEIVER" # detector receiver template for accessing raw data files
path_template = 'RAW-R{:04d}-{}-S{{:05d}}.h5' # the template to use to access data
instrument_source_template = '{}/DET/{}:daqOutput' # instrument detector data source in h5files
# Parameters affecting writing corrected data.
chunk_size_idim = 1 # H5 chunking size of output data
limit_trains = 0 # Process only first N images, 0 - process all.
# Parameters for the calibration database.
cal_db_interface = "tcp://max-exfl016:8015#8025" # calibration DB interface to use
cal_db_timeout = 300000 # timeout on caldb requests
creation_time = "" # The timestamp to use with Calibration DBe. Required Format: "YYYY-MM-DD hh:mm:ss" e.g. 2019-07-04 11:02:41
# Conditions for retrieving calibration constants.
bias_voltage = 200 # bias voltage
in_vacuum = False # detector operated in vacuum
integration_time = -1 # Detector integration time, Default value -1 to use the value from the slow data.
fix_temperature = -1 # fixed temperature value in Kelvin, Default value -1 to use the value from files.
gain_photon_energy = 8.048 # Photon energy used for gain calibration
photon_energy = 0. # Photon energy to calibrate in number of photons, 0 for calibration in keV
# Flags to select type of applied corrections.
pattern_classification = True # do clustering.
relative_gain = True # Apply relative gain correction.
absolute_gain = True # Apply absolute gain correction (implies relative gain).
common_mode = True # Apply common mode correction.
# Parameters affecting applied correction.
cm_min_frac = 0.25 # No CM correction is performed if after masking the ratio of good pixels falls below this
cm_noise_sigma = 5. # CM correction noise standard deviation
split_evt_primary_threshold = 7. # primary threshold for split event correction
split_evt_secondary_threshold = 5. # secondary threshold for split event correction
split_evt_mip_threshold = 1000. # minimum ionizing particle threshold
def balance_sequences(in_folder, run, sequences, sequences_per_node, karabo_da):
from xfel_calibrate.calibrate import balance_sequences as bs
return bs(in_folder, run, sequences, sequences_per_node, karabo_da)
```
%% Cell type:code id: tags:
``` python
import tabulate
import warnings
from logging import warning
from sys import exit
import h5py
import pasha as psh
import numpy as np
import matplotlib.pyplot as plt
from IPython.display import Latex, display
from IPython.display import Latex, Markdown, display
from extra_data import RunDirectory, H5File
from pathlib import Path
import cal_tools.restful_config as rest_cfg
from XFELDetAna import xfelpyanatools as xana
from XFELDetAna import xfelpycaltools as xcal
from cal_tools.calcat_interface import EPIX100_CalibrationData
from cal_tools.epix100 import epix100lib
from cal_tools.files import DataFile
from cal_tools.restful_config import restful_config
from cal_tools.tools import (
calcat_creation_time,
CalibrationMetadata,
write_constants_fragment,
)
from cal_tools.step_timing import StepTimer
warnings.filterwarnings('ignore')
prettyPlotting = True
%matplotlib inline
```
%% Cell type:code id: tags:
``` python
x = 708 # rows of the ePix100
y = 768 # columns of the ePix100
if absolute_gain:
relative_gain = True
plot_unit = 'ADU'
```
%% Cell type:code id: tags:
``` python
in_folder = Path(in_folder)
out_folder = Path(out_folder)
out_folder.mkdir(parents=True, exist_ok=True)
run_folder = in_folder / f"r{run:04d}"
instrument_src = instrument_source_template.format(
karabo_id, receiver_template)
print(f"Correcting run: {run_folder}")
print(f"Instrument H5File source: {instrument_src}")
print(f"Data corrected files are stored at: {out_folder}")
```
%% Cell type:code id: tags:
``` python
creation_time = calcat_creation_time(in_folder, run, creation_time)
print(f"Using {creation_time.isoformat()} as creation time")
```
%% Cell type:code id: tags:
``` python
run_dc = RunDirectory(run_folder, _use_voview=False)
seq_files = [Path(f.filename) for f in run_dc.select(f"*{karabo_id}*").files]
# If a set of sequences requested to correct,
# adapt seq_files list.
if sequences != [-1]:
seq_files = [f for f in seq_files if any(f.match(f"*-S{s:05d}.h5") for s in sequences)]
if not len(seq_files):
raise IndexError("No sequence files available for the selected sequences.")
print(f"Processing a total of {len(seq_files)} sequence files")
```
%% Cell type:code id: tags:
``` python
step_timer = StepTimer()
```
%% Cell type:code id: tags:
``` python
step_timer.start()
sensorSize = [x, y]
# Sensor area will be analysed according to blocksize
blockSize = [sensorSize[0]//2, sensorSize[1]//2]
xcal.defaultBlockSize = blockSize
memoryCells = 1 # ePIX has no memory cells
run_parallel = False
# Read control data.
ctrl_data = epix100lib.epix100Ctrl(
run_dc=run_dc,
instrument_src=instrument_src,
ctrl_src=f"{karabo_id}/DET/CONTROL",
)
if integration_time < 0:
integration_time = ctrl_data.get_integration_time()
integration_time_str_add = ""
else:
integration_time_str_add = "(manual input)"
if fix_temperature < 0:
temperature = ctrl_data.get_temprature()
temperature_k = temperature + 273.15
temp_str_add = ""
else:
temperature_k = fix_temperature
temperature = fix_temperature - 273.15
temp_str_add = "(manual input)"
print(f"Bias voltage is {bias_voltage} V")
print(f"Detector integration time is set to {integration_time} \u03BCs {integration_time_str_add}")
print(f"Mean temperature: {temperature:0.2f}°C / {temperature_k:0.2f} K {temp_str_add}")
print(f"Operated in vacuum: {in_vacuum}")
```
%% Cell type:code id: tags:
``` python
# Table of sequence files to process
table = [(k, f) for k, f in enumerate(seq_files)]
if len(table):
md = display(Latex(tabulate.tabulate(
table,
tablefmt='latex',
headers=["#", "file"]
)))
```
%% Cell type:markdown id: tags:
## Retrieving calibration constants
As a first step, dark maps have to be loaded.
As a first step, calibration constants have to be loaded.
%% Cell type:code id: tags:
``` python
constant_names = ["OffsetEPix100", "NoiseEPix100"]
if relative_gain:
constant_names += ["RelativeGainEPix100"]
epix_cal = EPIX100_CalibrationData(
detector_name=karabo_id,
sensor_bias_voltage=bias_voltage,
integration_time=integration_time,
sensor_temperature=temperature_k,
in_vacuum=in_vacuum,
source_energy=gain_photon_energy,
event_at=creation_time,
client=rest_cfg.calibration_client(),
)
const_metadata = epix_cal.metadata(calibrations=constant_names)
mod_to_pdu = epix_cal.mod_to_pdu
ccvs_url = "https://in.xfel.eu/calibration/calibration_constant_versions/"
for mod, mod_md in const_metadata.items():
display(Markdown(f"{mod}({mod_to_pdu[mod]}):"))
for cname, c_mdata in mod_md.items():
display(Markdown(
f"- [{cname}]({ccvs_url}/{c_mdata['ccv_id']}): {c_mdata['begin_validity_at']}"))
# Load the constant data from files
const_data = epix_cal.ndarray_map(metadata=const_metadata)[karabo_da]
# Validate the constants availability and raise/warn correspondingly.
missing_dark_constants = {"OffsetEPix100", "NoiseEPix100"} - set(const_data)
if missing_dark_constants:
raise ValueError(
f"Dark constants {missing_dark_constants} are not available to correct {karabo_da}."
"No correction is performed!")
if relative_gain and "RelativeGainEPix100" not in const_data.keys():
warning("RelativeGainEPix100 is not found in the calibration database.")
relative_gain = False
absolute_gain = False
```
%% Cell type:code id: tags:
``` python
# Record constant details in YAML metadata
epix_metadata = const_metadata[karabo_da]
CalibrationMetadata(metadata_folder or out_folder).add_fragment({
"retrieved-constants": {
karabo_da: {
"constants": {
cname: {
"path": str(epix_cal.caldb_root / ccv_metadata["path"]),
"dataset": ccv_metadata["dataset"],
"creation-time": ccv_metadata["begin_validity_at"],
"ccv_id": ccv_metadata["ccv_id"],
} for cname, ccv_metadata in epix_metadata.items()
},
"physical-name": list(epix_metadata.values())[0]["physical_name"],
}
}
})
write_constants_fragment(
out_folder=(metadata_folder or out_folder),
det_metadata=const_metadata,
caldb_root=epix_cal.caldb_root,
)
```
%% Cell type:code id: tags:
``` python
# Initializing some parameters.
hscale = 1
stats = True
hrange = np.array([-50, 1000])
nbins = hrange[1] - hrange[0]
commonModeBlockSize = [x//2, y//2]
```
%% Cell type:code id: tags:
``` python
histCalOffsetCor = xcal.HistogramCalculator(
sensorSize,
bins=nbins,
range=hrange,
parallel=run_parallel,
nCells=memoryCells,
blockSize=blockSize
)
# *****************Histogram Calculators****************** #
histCalCor = xcal.HistogramCalculator(
sensorSize,
bins=1050,
range=[-50, 1000],
parallel=run_parallel,
nCells=memoryCells,
blockSize=blockSize
)
```
%% Cell type:code id: tags:
``` python
if common_mode:
histCalCMCor = xcal.HistogramCalculator(
sensorSize,
bins=nbins,
range=hrange,
parallel=run_parallel,
nCells=memoryCells,
blockSize=blockSize,
)
cmCorrectionB = xcal.CommonModeCorrection(
shape=sensorSize,
blockSize=commonModeBlockSize,
orientation='block',
nCells=memoryCells,
noiseMap=const_data['NoiseEPix100'],
runParallel=run_parallel,
parallel=run_parallel,
stats=stats,
minFrac=cm_min_frac,
noiseSigma=cm_noise_sigma,
)
cmCorrectionR = xcal.CommonModeCorrection(
shape=sensorSize,
blockSize=commonModeBlockSize,
orientation='row',
nCells=memoryCells,
noiseMap=const_data['NoiseEPix100'],
runParallel=run_parallel,
parallel=run_parallel,
stats=stats,
minFrac=cm_min_frac,
noiseSigma=cm_noise_sigma,
)
cmCorrectionC = xcal.CommonModeCorrection(
shape=sensorSize,
blockSize=commonModeBlockSize,
orientation='col',
nCells=memoryCells,
noiseMap=const_data['NoiseEPix100'],
runParallel=run_parallel,
parallel=run_parallel,
stats=stats,
minFrac=cm_min_frac,
noiseSigma=cm_noise_sigma,
)
```
%% Cell type:code id: tags:
``` python
if relative_gain:
gain_cnst = np.median(const_data["RelativeGainEPix100"])
hscale = gain_cnst
plot_unit = 'keV'
if photon_energy > 0:
plot_unit = '$\gamma$'
hscale /= photon_energy
gainCorrection = xcal.RelativeGainCorrection(
sensorSize,
gain_cnst/const_data["RelativeGainEPix100"][..., None],
nCells=memoryCells,
parallel=run_parallel,
blockSize=blockSize,
gains=None,
)
histCalRelGainCor = xcal.HistogramCalculator(
sensorSize,
bins=nbins,
range=hrange,
parallel=run_parallel,
nCells=memoryCells,
blockSize=blockSize
)
if absolute_gain:
histCalAbsGainCor = xcal.HistogramCalculator(
sensorSize,
bins=nbins,
range=hrange*hscale,
parallel=run_parallel,
nCells=memoryCells,
blockSize=blockSize
)
```
%% Cell type:code id: tags:
``` python
if pattern_classification :
patternClassifier = xcal.PatternClassifier(
[x, y],
const_data["NoiseEPix100"],
split_evt_primary_threshold,
split_evt_secondary_threshold,
split_evt_mip_threshold,
tagFirstSingles=0,
nCells=memoryCells,
allowElongated=False,
blockSize=[x, y],
parallel=run_parallel,
)
histCalCSCor = xcal.HistogramCalculator(
sensorSize,
bins=nbins,
range=hrange,
parallel=run_parallel,
nCells=memoryCells,
blockSize=blockSize,
)
histCalGainCorClusters = xcal.HistogramCalculator(
sensorSize,
bins=nbins,
range=hrange*hscale,
parallel=run_parallel,
nCells=memoryCells,
blockSize=blockSize
)
histCalGainCorSingles = xcal.HistogramCalculator(
sensorSize,
bins=nbins,
range=hrange*hscale,
parallel=run_parallel,
nCells=memoryCells,
blockSize=blockSize
)
```
%% Cell type:markdown id: tags:
## Applying corrections
%% Cell type:code id: tags:
``` python
def correct_train(wid, index, tid, d):
d = d[..., np.newaxis].astype(np.float32)
d = np.compress(
np.any(d > 0, axis=(0, 1)), d, axis=2)
# Offset correction.
d -= const_data["OffsetEPix100"]
histCalOffsetCor.fill(d)
# Common Mode correction.
if common_mode:
# Block CM
d = cmCorrectionB.correct(d)
# Row CM
d = cmCorrectionR.correct(d)
# COL CM
d = cmCorrectionC.correct(d)
histCalCMCor.fill(d)
# relative gain correction.
if relative_gain:
d = gainCorrection.correct(d)
histCalRelGainCor.fill(d)
"""The gain correction is currently applying
an absolute correction (not a relative correction
as the implied by the name);
it changes the scale (the unit of measurement)
of the data from ADU to either keV or n_of_photons.
But the pattern classification relies on comparing
data with the NoiseEPix100 map, which is still in ADU.
The best solution is to do a relative gain
correction first and apply the global absolute
gain to the data at the end, after clustering.
"""
if pattern_classification:
d_clu, patterns = patternClassifier.classify(d)
d_clu[d_clu < (split_evt_primary_threshold*const_data["NoiseEPix100"])] = 0
data_clu[index, ...] = np.squeeze(d_clu)
data_patterns[index, ...] = np.squeeze(patterns)
histCalCSCor.fill(d_clu)
# absolute gain correction
# changes data from ADU to keV (or n. of photons)
if absolute_gain:
d = d * gain_cnst
if photon_energy > 0:
d /= photon_energy
histCalAbsGainCor.fill(d)
if pattern_classification:
# Modify pattern classification.
d_clu = d_clu * gain_cnst
if photon_energy > 0:
d_clu /= photon_energy
data_clu[index, ...] = np.squeeze(d_clu)
histCalGainCorClusters.fill(d_clu)
d_sing = d_clu[patterns==100] # pattern 100 corresponds to single photons events
if len(d_sing):
histCalGainCorSingles.fill(d_sing)
data[index, ...] = np.squeeze(d)
histCalCor.fill(d)
```
%% Cell type:code id: tags:
``` python
# 10 is a number chosen after testing 1 ... 71 parallel threads
context = psh.context.ThreadContext(num_workers=10)
```
%% Cell type:code id: tags:
``` python
empty_seq = 0
for f in seq_files:
seq_dc = H5File(f)
# Save corrected data in an output file with name
# of corresponding raw sequence file.
out_file = out_folder / f.name.replace("RAW", "CORR")
# Data shape in seq_dc excluding trains with empty images.
ishape = seq_dc[instrument_src, "data.image.pixels"].shape
corr_ntrains = ishape[0]
all_train_ids = seq_dc.train_ids
# Raise a WARNING if this sequence has no trains to correct.
# Otherwise, print number of trains with no data.
if corr_ntrains == 0:
warning(f"No trains to correct for {f.name}: "
"Skipping the processing of this file.")
empty_seq += 1
continue
elif len(all_train_ids) != corr_ntrains:
print(f"{f.name} has {len(all_train_ids) - corr_ntrains} trains with missing data.")
# This parameter is only used for testing.
if limit_trains > 0:
print(f"\nCorrected trains are limited to: {limit_trains} trains")
corr_ntrains = min(corr_ntrains, limit_trains)
oshape = (corr_ntrains, *ishape[1:])
data = context.alloc(shape=oshape, dtype=np.float32)
if pattern_classification:
data_clu = context.alloc(shape=oshape, dtype=np.float32)
data_patterns = context.alloc(shape=oshape, dtype=np.int32)
step_timer.start() # Correct data.
# Overwrite seq_dc after eliminating empty trains or/and applying limited images.
seq_dc = seq_dc.select(
instrument_src, "*", require_all=True).select_trains(np.s_[:corr_ntrains])
pixel_data = seq_dc[instrument_src, "data.image.pixels"]
context.map(correct_train, pixel_data)
step_timer.done_step(f'Correcting {corr_ntrains} trains.')
step_timer.start() # Write corrected data.
# Create CORR files and add corrected data sections.
image_counts = seq_dc[instrument_src, "data.image.pixels"].data_counts(labelled=False)
# Write corrected data.
with DataFile(out_file, "w") as ofile:
dataset_chunk = ((chunk_size_idim,) + oshape[1:]) # e.g. (1, pixels_x, pixels_y)
# Create INDEX datasets.
ofile.create_index(seq_dc.train_ids, from_file=seq_dc.files[0])
# Create METDATA datasets
ofile.create_metadata(
like=seq_dc,
sequence=seq_dc.run_metadata()["sequenceNumber"],
instrument_channels=(f'{instrument_src}/data',)
)
# Create Instrument section to later add corrected datasets.
outp_source = ofile.create_instrument_source(instrument_src)
# Create count/first datasets at INDEX source.
outp_source.create_index(data=image_counts)
# Store uncorrected RAW image datasets for the corrected trains.
data_raw_fields = [ # /data/
"ambTemp", "analogCurr", "analogInputVolt", "backTemp",
"digitalInputVolt", "guardCurr", "relHumidity", "digitalCurr"
]
for field in data_raw_fields:
field_arr = seq_dc[instrument_src, f"data.{field}"].ndarray()
outp_source.create_key(
f"data.{field}", data=field_arr,
chunks=(chunk_size_idim, *field_arr.shape[1:]))
image_raw_fields = [ # /data/image/
"binning", "bitsPerPixel", "dimTypes", "dims",
"encoding", "flipX", "flipY", "roiOffsets", "rotation",
]
for field in image_raw_fields:
field_arr = seq_dc[instrument_src, f"data.image.{field}"].ndarray()
outp_source.create_key(
f"data.image.{field}", data=field_arr,
chunks=(chunk_size_idim, *field_arr.shape[1:]))
# Add main corrected `data.image.pixels` dataset and store corrected data.
outp_source.create_key(
"data.image.pixels", data=data, chunks=dataset_chunk)
outp_source.create_key(
"data.trainId", data=seq_dc.train_ids, chunks=min(50, len(seq_dc.train_ids)))
if pattern_classification:
# Add main corrected `data.image.pixels` dataset and store corrected data.
outp_source.create_key(
"data.image.pixels_classified", data=data_clu, chunks=dataset_chunk)
outp_source.create_key(
"data.image.patterns", data=data_patterns, chunks=dataset_chunk)
step_timer.done_step('Storing data.')
if empty_seq == len(seq_files):
warning("No valid trains for RAW data to correct.")
exit(0)
```
%% Cell type:code id: tags:
``` python
ho, eo, co, so = histCalCor.get()
d = [{
'x': co,
'y': ho,
'y_err': np.sqrt(ho[:]),
'drawstyle': 'steps-mid',
'errorstyle': 'bars',
'errorcoarsing': 2,
'label': 'Total corr.'
}]
ho, eo, co, so = histCalOffsetCor.get()
d.append({
'x': co,
'y': ho,
'y_err': np.sqrt(ho[:]),
'drawstyle': 'steps-mid',
'errorstyle': 'bars',
'errorcoarsing': 2,
'label': 'Offset corr.'
})
if common_mode:
ho, eo, co, so = histCalCMCor.get()
d.append({
'x': co,
'y': ho,
'y_err': np.sqrt(ho[:]),
'drawstyle': 'steps-mid',
'errorstyle': 'bars',
'errorcoarsing': 2,
'label': 'CM corr.'
})
if relative_gain :
ho, eo, co, so = histCalRelGainCor.get()
d.append({
'x': co,
'y': ho,
'y_err': np.sqrt(ho[:]),
'drawstyle': 'steps-mid',
'errorstyle': 'bars',
'errorcoarsing': 2,
'label': 'Relative gain corr.'
})
if pattern_classification:
ho, eo, co, so = histCalCSCor.get()
d.append({
'x': co,
'y': ho,
'y_err': np.sqrt(ho[:]),
'drawstyle': 'steps-mid',
'errorstyle': 'bars',
'errorcoarsing': 2,
'label': 'Charge sharing corr.'
})
fig = xana.simplePlot(
d, aspect=1, x_label=f'Energy (ADU)',
y_label='Number of occurrences', figsize='2col',
y_log=True, x_range=(-50, 500),
legend='top-center-frame-2col',
)
plt.title(f'run {run} - {karabo_da}')
plt.grid()
```
%% Cell type:code id: tags:
``` python
if absolute_gain :
d=[]
d = []
ho, eo, co, so = histCalAbsGainCor.get()
d.append({
'x': co,
'y': ho,
'y_err': np.sqrt(ho[:]),
'drawstyle': 'steps-mid',
'errorstyle': 'bars',
'errorcoarsing': 2,
'label': 'Absolute gain corr.'
})
if pattern_classification:
ho, eo, co, so = histCalGainCorClusters.get()
if co is not None: # avoid adding None array, if calculator is empty.
d.append({
'x': co,
'y': ho,
'y_err': np.sqrt(ho[:]),
'drawstyle': 'steps-mid',
'errorstyle': 'bars',
'errorcoarsing': 2,
'label': 'Charge sharing corr.'
'label': 'Absolute gain corr.'
})
if pattern_classification:
ho, eo, co, so = histCalGainCorClusters.get()
if co is not None: # avoid adding None array, if calculator is empty.
d.append({
'x': co,
'y': ho,
'y_err': np.sqrt(ho[:]),
'drawstyle': 'steps-mid',
'errorstyle': 'bars',
'errorcoarsing': 2,
'label': 'Charge sharing corr.'
})
ho, eo, co, so = histCalGainCorSingles.get()
d.append({
'x': co,
'y': ho,
'y_err': np.sqrt(ho[:]),
'drawstyle': 'steps-mid',
'errorstyle': 'bars',
'errorcoarsing': 2,
'label': 'Isolated photons (singles)'
})
if co is not None: # avoid adding None array, if calculator is empty.
d.append({
'x': co,
'y': ho,
'y_err': np.sqrt(ho[:]),
'drawstyle': 'steps-mid',
'errorstyle': 'bars',
'errorcoarsing': 2,
'label': 'Isolated photons (singles)'
})
fig = xana.simplePlot(
d, aspect=1, x_label=f'Energy ({plot_unit})',
y_label='Number of occurrences', figsize='2col',
y_log=True,
x_range=np.array((-50, 500))*hscale,
legend='top-center-frame-2col',
)
plt.grid()
plt.title(f'run {run} - {karabo_da}')
```
%% Cell type:markdown id: tags:
## Mean Image of the corrected data
%% Cell type:code id: tags:
``` python
step_timer.start()
fig = xana.heatmapPlot(
np.nanmedian(data, axis=0),
x_label='Columns', y_label='Rows',
lut_label=f'Signal ({plot_unit})',
x_range=(0, y),
y_range=(0, x),
vmin=-50, vmax=50)
step_timer.done_step(f'Plotting mean image of {data.shape[0]} trains.')
```
%% Cell type:markdown id: tags:
## Single Shot of the corrected data
%% Cell type:code id: tags:
``` python
step_timer.start()
fig = xana.heatmapPlot(
data[0, ...],
x_label='Columns', y_label='Rows',
lut_label=f'Signal ({plot_unit})',
x_range=(0, y),
y_range=(0, x),
vmin=-50, vmax=50)
step_timer.done_step(f'Plotting single shot of corrected data.')
```
......
%% Cell type:markdown id: tags:
# pnCCD Data Correction #
Authors: DET Group, Modified by Kiana Setoodehnia - Version 5.0
The following notebook provides offset, common mode, relative gain, split events and pattern classification corrections of images acquired with the pnCCD. This notebook *does not* yet correct for charge transfer inefficiency.
%% Cell type:code id: tags:
``` python
in_folder = "/gpfs/exfel/exp/SQS/202031/p900166/raw" # input folder
out_folder = "/gpfs/exfel/data/scratch/ahmedk/test/remove/pnccd_correct" # output folder
metadata_folder = "" # Directory containing calibration_metadata.yml when run by xfel-calibrate
run = 347 # which run to read data from
sequences = [-1] # sequences to correct, set to -1 for all, range allowed
sequences_per_node = 1 # number of sequences running on the same slurm node.
karabo_da = 'PNCCD01' # data aggregators
karabo_id = "SQS_NQS_PNCCD1MP" # karabo prefix of PNCCD devices
receiver_id = "PNCCD_FMT-0" # inset for receiver devices
path_template = 'RAW-R{:04d}-{}-S{:05d}.h5' # the template to use to access data
instrument_source_template = '{}/CAL/{}:output' # template for data source name, will be filled with karabo_id and receiver_id.
# Parameters affecting data correction.
commonModeAxis = 0 # axis along which common mode will be calculated, 0 = row, and 1 = column
commonModeBlockSize = [512, 512] # size of the detector in pixels for common mode calculations
split_evt_primary_threshold = 4. # primary threshold for split event classification in terms of n sigma noise
split_evt_secondary_threshold = 3. # secondary threshold for split event classification in terms of n sigma noise
saturated_threshold = 32000. # full well capacity in ADU
# Conditions for retrieving calibration constants
fix_temperature_top = 0. # fix temperature for top sensor in K, set to 0. to use value from slow data.
fix_temperature_bot = 0. # fix temperature for bottom senspr in K, set to 0. to use value from slow data.
gain = -1 # the detector's gain setting. Set to -1 to use the value from the slow data.
bias_voltage = 0. # the detector's bias voltage. set to 0. to use value from slow data.
integration_time = 70 # detector's integration time
photon_energy = 1.6 # Al fluorescence in keV
# Parameters for the calibration database.
cal_db_interface = "tcp://max-exfl016:8015" # calibration DB interface to use
cal_db_timeout = 300000 # timeout on caldb requests
creation_time = "" # The timestamp to use with Calibration DB. Required Format: "YYYY-MM-DD hh:mm:ss" e.g. 2019-07-04 11:02:41
# Booleans for selecting corrections to apply.
only_offset = False # Only, apply offset.
common_mode = True # Apply common mode correction
relgain = True # Apply relative gain correction
pattern_classification = True # classify split events
# parameters affecting stored output data.
chunk_size_idim = 1 # H5 chunking size of output data
limit_trains = 0 # this parameter is used for limiting number of images to correct from a sequence file.
def balance_sequences(in_folder, run, sequences, sequences_per_node, karabo_da):
from xfel_calibrate.calibrate import balance_sequences as bs
return bs(in_folder, run, sequences, sequences_per_node, karabo_da)
```
%% Cell type:code id: tags:
``` python
# Here the herarichy and dependability for correction booleans are defined
corr_bools = {}
corr_bools["only_offset"] = only_offset
# Apply offset only.
if not only_offset:
corr_bools["relgain"] = relgain
corr_bools["common_mode"] = common_mode
corr_bools["pattern_class"] = pattern_classification
```
%% Cell type:code id: tags:
``` python
import os
import sys
import warnings
from logging import warning
from pathlib import Path
warnings.filterwarnings('ignore')
import h5py
import matplotlib.pyplot as plt
import numpy as np
import pasha as psh
from IPython.display import Markdown, display
from extra_data import H5File, RunDirectory
from prettytable import PrettyTable
%matplotlib inline
import cal_tools.restful_config as rest_cfg
from XFELDetAna import xfelpyanatools as xana
from XFELDetAna import xfelpycaltools as xcal
from cal_tools import pnccdlib
from cal_tools.files import DataFile
from cal_tools.calcat_interface import CalCatError, PNCCD_CalibrationData
from cal_tools.tools import (
calcat_creation_time,
get_dir_creation_date,
get_constant_from_db_and_time,
get_random_db_interface,
load_specified_constants,
CalibrationMetadata,
write_constants_fragment,
)
from cal_tools.step_timing import StepTimer
from cal_tools import h5_copy_except
from iCalibrationDB import Conditions, Constants
from iCalibrationDB.detectors import DetectorTypes
```
%% Cell type:code id: tags:
``` python
# Calibration Database Settings, and Some Initial Run Parameters & Paths:
display(Markdown('### Initial Settings and Paths'))
# Sensor size and block size definitions (important for common mode and other corrections):
pixels_x = 1024 # rows of pnCCD in pixels
pixels_y = 1024 # columns of pnCCD in pixels
in_folder = Path(in_folder)
sensorSize = [pixels_x, pixels_y]
# For xcal.HistogramCalculators.
blockSize = [pixels_x//2, pixels_y//2] # sensor area will be analysed according to blockSize.
print(f"pnCCD size is: {pixels_x}x{pixels_y} pixels.")
print(f'Calibration database interface selected: {cal_db_interface}')
# Paths to the data:
instrument_src = instrument_source_template.format(karabo_id, receiver_id)
print(f"Instrument H5File source: {instrument_src}\n")
# Run's creation time:
creation_time = calcat_creation_time(in_folder, run, creation_time)
print(f"Creation time: {creation_time}")
```
%% Cell type:code id: tags:
``` python
step_timer = StepTimer()
```
%% Cell type:code id: tags:
``` python
run_dc = RunDirectory(in_folder / f"r{run:04d}", _use_voview=False)
# Output Folder Creation:
os.makedirs(out_folder, exist_ok=True)
# NOTE: this notebook shouldn't overwrite calibration metadata file.
metadata = CalibrationMetadata(metadata_folder or out_folder)
# Constant paths are saved under retrieved-constants in calibration_metadata.yml
const_yaml = metadata.get("retrieved-constants", {})
# extract control data
step_timer.start()
ctrl_data = pnccdlib.PnccdCtrl(run_dc, karabo_id)
if bias_voltage == 0.:
bias_voltage = ctrl_data.get_bias_voltage()
if gain == -1:
gain = ctrl_data.get_gain()
if fix_temperature_top == 0:
fix_temperature_top = ctrl_data.get_fix_temperature_top()
if fix_temperature_bot == 0:
fix_temperature_bot = ctrl_data.get_fix_temperature_bot()
step_timer.done_step("Reading control parameters.")
# Printing the Parameters Read from the Data File:
display(Markdown('### Detector Parameters'))
print(f"Bias voltage is {bias_voltage:0.1f} V.")
print(f"Detector gain is set to 1/{int(gain)}.")
print(f"Detector integration time is set to {integration_time} ms")
print(f"Top pnCCD sensor is at temperature of {fix_temperature_top:0.2f} K")
print(f"Bottom pnCCD sensor is at temperature of {fix_temperature_bot:0.2f} K")
```
%% Cell type:code id: tags:
``` python
seq_files = []
for f in run_dc.select(instrument_src).files:
fpath = Path(f.filename)
if fpath.match(f"*{karabo_da}*.h5"):
seq_files.append(fpath)
if sequences != [-1]:
seq_files = sorted([f for f in seq_files if any(f.match(f"*-S{s:05d}.h5") for s in sequences)])
print(f"Processing a total of {len(seq_files)} sequence files:")
print(*seq_files, sep='\n')
```
%% Cell type:code id: tags:
``` python
gain_k = [k for k, v in pnccdlib.VALID_GAINS.items() if v == gain][0]
if gain_k == 'a':
split_evt_mip_threshold = 1000. # MIP threshold in ADU for event classification (10 times average noise)
# Each xcal.HistogramCalculator requires a total number of bins and a binning range. We define these
# using a dictionary:
# For all xcal histograms:
Hist_Bin_Dict = {
"bins": 35000, # number of bins
"bin_range": [0, 35000]
}
# For the numpy histograms on the last cell of the notebook:
Event_Bin_Dict = {
"event_bins": 1000, # number of bins
"b_range": [0, 35000] # bin range
}
elif gain_k == 'b':
split_evt_mip_threshold = 270. # 10 times the average noise
Hist_Bin_Dict = {
"bins": 10000,
"bin_range": [0, 10000]
}
Event_Bin_Dict = {
"event_bins": 1000,
"b_range": [0, 10000]
}
elif gain_k == 'c':
split_evt_mip_threshold = 110. # 10 times the average noise
Hist_Bin_Dict = {
"bins": 3000,
"bin_range": [0, 3000]
}
Event_Bin_Dict = {
"event_bins": 1000,
"b_range": [0, 3000]
}
elif gain_k == 'd':
split_evt_mip_threshold = 90. # 10 times the average noise
Hist_Bin_Dict = {
"bins": 1000,
"bin_range": [0, 1000]
}
Event_Bin_Dict = {
"event_bins": 1000,
"b_range": [0, 1000]
}
elif gain_k == 'e':
split_evt_mip_threshold = 90. # 10 times the average noise
Hist_Bin_Dict = {
"bins": 500,
"bin_range": [0, 500]
}
Event_Bin_Dict = {
"event_bins": 500,
"b_range": [0, 500]
}
else:
split_evt_mip_threshold = 90. # 10 times the average noise
Hist_Bin_Dict = {
"bins": 220,
"bin_range": [0, 220]
}
Event_Bin_Dict = {
"event_bins": 220,
"b_range": [0, 220]
}
bins = Hist_Bin_Dict["bins"]
bin_range = Hist_Bin_Dict["bin_range"]
event_bins = Event_Bin_Dict["event_bins"]
b_range = Event_Bin_Dict["b_range"]
```
%% Cell type:markdown id: tags:
As a first step, dark constants have to be retrieved from the calibration database
As a first step, calibration constants have to be retrieved from the calibration database
%% Cell type:code id: tags:
``` python
display(Markdown("### Constants retrieval"))
step_timer.start()
constant_names = ["OffsetCCD", "NoiseCCD", "BadPixelsDarkCCD"]
if relgain:
constant_names += ["RelativeGainCCD"]
pnccd_cal = PNCCD_CalibrationData(
detector_name=karabo_id,
sensor_bias_voltage=bias_voltage,
integration_time=integration_time,
sensor_temperature=fix_temperature_top,
gain_setting=gain,
event_at=creation_time,
source_energy=photon_energy,
client=rest_cfg.calibration_client(),
)
try:
pnccd_metadata = pnccd_cal.metadata(calibrations=constant_names)
except CalCatError as e:
warning(e)
mod_to_pdu = pnccd_cal.mod_to_pdu
ccvs_url = "https://in.xfel.eu/calibration/calibration_constant_versions/"
for mod, mod_md in pnccd_metadata.items():
display(Markdown(f"{mod}({mod_to_pdu[mod]}):"))
for cname, c_mdata in mod_md.items():
display(Markdown(
f"- [{cname}]({ccvs_url}/{c_mdata['ccv_id']}): {c_mdata['begin_validity_at']}"))
constants = pnccd_cal.ndarray_map(metadata=pnccd_metadata).get(karabo_da, {})
# Validate the constants availability and raise/warn correspondingly.
missing_dark_constants = set(
c for c in ["OffsetCCD", "NoiseCCD", "BadPixelsDarkCCD"] if c not in constants.keys())
if missing_dark_constants:
raise KeyError(
f"Dark constants {missing_dark_constants} are not available for correction.")
if corr_bools.get('relgain') and "RelativeGainCCD" not in constants.keys():
warning("RelativeGainEPix100 is not found in the calibration database.")
corr_bools['relgain'] = False
step_timer.done_step("Constants retrieval")
```
conditions_dict = {
"bias_voltage": bias_voltage,
"integration_time": integration_time,
"gain_setting": gain,
"temperature": fix_temperature_top,
"pixels_x": pixels_x,
"pixels_y": pixels_y,
}
# Dark condition
dark_condition = Conditions.Dark.CCD(**conditions_dict)
# Add photon energy.
conditions_dict.update({"photon_energy": photon_energy})
illum_condition = Conditions.Illuminated.CCD(**conditions_dict)
# A dictionary for initializing constants. {cname: empty constant array}
empty_constants = {
"Offset": np.zeros((pixels_x, pixels_y, 1), dtype=np.float32),
"Noise": np.zeros((pixels_x, pixels_y, 1), dtype=np.float32),
"BadPixelsDark": np.zeros((pixels_x, pixels_y, 1), dtype=np.uint32),
"RelativeGain": np.zeros((pixels_x, pixels_y), dtype=np.float32),
}
if const_yaml: # Used while reproducing corrected data.
print(f"Using stored constants in {metadata.filename}")
constants, when = load_specified_constants(
const_yaml[karabo_da]["constants"], empty_constants
%% Cell type:code id: tags:
``` python
# Record constant details in YAML metadata
write_constants_fragment(
out_folder=(metadata_folder or out_folder),
det_metadata=pnccd_metadata,
caldb_root=pnccd_cal.caldb_root,
)
else:
constants = dict()
when = dict()
for cname, cempty in empty_constants.items():
# No need for retrieving RelativeGain, if not used for correction.
if not corr_bools.get("relgain") and cname == "RelativeGain":
continue
constants[cname], when[cname] = get_constant_from_db_and_time(
karabo_id,
karabo_da,
constant=getattr(Constants.CCD(DetectorTypes.pnCCD), cname)(),
condition=illum_condition if cname == "RelativeGain" else dark_condition,
empty_constant=cempty,
cal_db_interface=get_random_db_interface(cal_db_interface),
creation_time=creation_time,
)
```
%% Cell type:code id: tags:
``` python
fig = xana.heatmapPlot(constants["Offset"][:,:,0], x_label='Columns', y_label='Rows', lut_label='Offset (ADU)',
fig = xana.heatmapPlot(constants["OffsetCCD"][:,:,0], x_label='Columns', y_label='Rows', lut_label='Offset (ADU)',
aspect=1,
x_range=(0, pixels_y), y_range=(0, pixels_x), vmax=16000,
panel_x_label='Row Stat (ADU)', panel_y_label='Column Stat (ADU)',
title = 'Dark Offset Map')
fig = xana.heatmapPlot(constants["Noise"][:,:,0], x_label='Columns', y_label='Rows',
fig = xana.heatmapPlot(constants["NoiseCCD"][:,:,0], x_label='Columns', y_label='Rows',
lut_label='Corrected Noise (ADU)',
aspect=1, x_range=(0, pixels_y), y_range=(0, pixels_x),
panel_x_label='Row Stat (ADU)', panel_y_label='Column Stat (ADU)',
title = 'Dark Noise Map')
fig = xana.heatmapPlot(np.log2(constants["BadPixelsDark"][:,:,0]), x_label='Columns', y_label='Rows',
fig = xana.heatmapPlot(np.log2(constants["BadPixelsDarkCCD"][:,:,0]), x_label='Columns', y_label='Rows',
lut_label='Bad Pixel Value (ADU)',
aspect=1, x_range=(0, pixels_y), y_range=(0, pixels_x),
panel_x_label='Row Stat (ADU)', panel_y_label='Column Stat (ADU)',
title = 'Dark Bad Pixels Map')
if corr_bools.get('relgain'):
fig = xana.heatmapPlot(constants["RelativeGain"], figsize=(8, 8), x_label='Columns', y_label='Rows',
fig = xana.heatmapPlot(constants["RelativeGainCCD"], figsize=(8, 8), x_label='Columns', y_label='Rows',
lut_label='Relative Gain',
aspect=1, x_range=(0, pixels_y), y_range=(0, pixels_x), vmin=0.8, vmax=1.2,
panel_x_label='Row Stat (ADU)', panel_y_label='Column Stat (ADU)',
panel_top_low_lim = 0.5, panel_top_high_lim = 1.5, panel_side_low_lim = 0.5,
panel_side_high_lim = 1.5,
title = f'Relative Gain Map for pnCCD (Gain = 1/{int(gain)})')
step_timer.done_step("Constants retrieval")
```
%% Cell type:code id: tags:
``` python
#************************ Calculators ************************#
if corr_bools.get('common_mode'):
# Common Mode Correction Calculator:
cmCorrection = xcal.CommonModeCorrection([pixels_x, pixels_y],
commonModeBlockSize,
commonModeAxis,
parallel=False, dType=np.float32, stride=1,
noiseMap=constants["Noise"].astype(np.float32), minFrac=0.25)
noiseMap=constants["NoiseCCD"].astype(np.float32), minFrac=0.25)
if corr_bools.get('pattern_class'):
# Pattern Classifier Calculator:
# Left Hemisphere:
patternClassifierLH = xcal.PatternClassifier([pixels_x, pixels_y//2],
constants["Noise"][:, :pixels_y//2],
constants["NoiseCCD"][:, :pixels_y//2],
split_evt_primary_threshold,
split_evt_secondary_threshold,
split_evt_mip_threshold,
tagFirstSingles=3, # track along y-axis, left to right (see
nCells=1, # split_event.py file in pydetlib/lib/src/
allowElongated=False, # XFELDetAna/algorithms)
blockSize=[pixels_x, pixels_y//2],
parallel=False)
# Right Hemisphere:
patternClassifierRH = xcal.PatternClassifier([pixels_x, pixels_y//2],
constants["Noise"][:, pixels_y//2:],
constants["NoiseCCD"][:, pixels_y//2:],
split_evt_primary_threshold,
split_evt_secondary_threshold,
split_evt_mip_threshold,
tagFirstSingles=4, # track along y-axis, right to left
nCells=1,
allowElongated=False,
blockSize=[pixels_x, pixels_y//2],
parallel=False)
patternClassifierLH._imagesPerChunk = 1
patternClassifierRH._imagesPerChunk = 1
patternClassifierLH._noisemap = constants["Noise"][:, :pixels_x//2]
patternClassifierRH._noisemap = constants["Noise"][:, pixels_x//2:]
patternClassifierLH._noisemap = constants["NoiseCCD"][:, :pixels_x//2]
patternClassifierRH._noisemap = constants["NoiseCCD"][:, pixels_x//2:]
# Setting bad pixels:
patternClassifierLH.setBadPixelMask(constants["BadPixelsDark"][:, :pixels_x//2] != 0)
patternClassifierRH.setBadPixelMask(constants["BadPixelsDark"][:, pixels_x//2:] != 0)
patternClassifierLH.setBadPixelMask(constants["BadPixelsDarkCCD"][:, :pixels_x//2] != 0)
patternClassifierRH.setBadPixelMask(constants["BadPixelsDarkCCD"][:, pixels_x//2:] != 0)
```
%% Cell type:code id: tags:
``` python
#***************** Histogram Calculators ******************#
# Will contain uncorrected data:
histCalRaw = xcal.HistogramCalculator(sensorSize,
bins=bins,
range=bin_range,
nCells=1,
parallel=False,
blockSize=blockSize)
# Will contain offset corrected data:
histCalOffsetCor = xcal.HistogramCalculator(sensorSize,
bins=bins,
range=bin_range,
nCells=1,
parallel=False,
blockSize=blockSize)
if corr_bools.get('common_mode'):
# Will contain common mode corrected data:
histCalCommonModeCor = xcal.HistogramCalculator(sensorSize,
bins=bins,
range=bin_range,
nCells=1,
parallel=False,
blockSize=blockSize)
if corr_bools.get('pattern_class'):
# Will contain split events pattern data:
histCalPcorr = xcal.HistogramCalculator(sensorSize,
bins=bins,
range=bin_range,
nCells=1,
parallel=False,
blockSize=blockSize)
# Will contain singles events data:
histCalPcorrS = xcal.HistogramCalculator(sensorSize,
bins=bins,
range=bin_range,
nCells=1,
parallel=False,
blockSize=blockSize)
if corr_bools.get('relgain'):
# Will contain gain corrected data:
histCalGainCor = xcal.HistogramCalculator(sensorSize,
bins=bins,
range=bin_range,
nCells=1,
parallel=False,
blockSize=blockSize)
```
%% Cell type:markdown id: tags:
## Applying corrections to the raw data
%% Cell type:code id: tags:
``` python
def offset_correction(wid, index, d):
"""offset correction.
Equating bad pixels' values to np.nan,
so that the pattern classifier ignores them:
"""
d = d.copy()
# TODO: To clear this up. Is it on purpose to save corrected data with nans?
d[bpix != 0] = np.nan
d -= offset # offset correction
# TODO: to clear this up. why save the badpixels map in the corrected data?
bpix_data[index, ...] = bpix
data[index, ...] = d
def common_mode(wid, index, d):
"""common-mode correction.
Discarding events caused by saturated pixels:
"""
d = np.squeeze(cmCorrection.correct(d, cellTable=np.zeros(pixels_y, np.int32)))
# we equate these values to np.nan so that the pattern classifier ignores them:
d[d >= saturated_threshold] = np.nan
data[index, ...] = d
def gain_correction(wid, index, d):
"""relative gain correction."""
d /= relativegain
data[index, ...] = d
def pattern_classification_correction(wid, index, d):
"""pattern classification correction.
data set to save split event corrected images
The calculation of the cluster map:]
Dividing the data into left and right hemispheres:
"""
# pattern classification on corrected data
dataLH, patternsLH = patternClassifierLH.classify(d[:, :pixels_x//2])
dataRH, patternsRH = patternClassifierRH.classify(d[:, pixels_x//2:])
d[:, :pixels_x//2] = np.squeeze(dataLH)
d[:, pixels_x//2:] = np.squeeze(dataRH)
patterns = np.zeros(d.shape, patternsLH.dtype)
patterns[:, :pixels_x//2] = np.squeeze(patternsLH)
patterns[:, pixels_x//2:] = np.squeeze(patternsRH)
d[d < split_evt_primary_threshold*noise] = 0
data[index, ...] = d
ptrn_data[index, ...] = patterns
d[patterns != 100] = np.nan # Discard doubles, triples, quadruple, clusters, first singles
filtered_data[index, ...] = d
```
%% Cell type:code id: tags:
``` python
# 10 is a number chosen after testing 1 ... 71 parallel threads for a node with 72 cpus.
parallel_num_threads = 10
context = psh.context.ThreadContext(num_workers=parallel_num_threads)
data_path = "INSTRUMENT/"+instrument_src+"/data/"
offset = np.squeeze(constants["Offset"])
noise = np.squeeze(constants["Noise"])
bpix = np.squeeze(constants["BadPixelsDark"])
relativegain = constants.get("RelativeGain")
offset = np.squeeze(constants["OffsetCCD"])
noise = np.squeeze(constants["NoiseCCD"])
bpix = np.squeeze(constants["BadPixelsDarkCCD"])
relativegain = constants.get("RelativeGainCCD")
```
%% Cell type:code id: tags:
``` python
def write_datasets(seq_dc, corr_arrays, out_file, instrument_src):
"""
Creating datasets first then adding data.
To have metadata together available at the start of the file,
so it's quick to see what the file contains.
"""
# Create CORR files and add corrected data sections.
image_counts = seq_dc[instrument_src, "data.image"].data_counts(labelled=False)
dataset_chunk = ((chunk_size_idim,) + corr_arrays["pixels"].shape[1:]) # e.g. (1, pixels_x, pixels_y)
with DataFile(out_file, 'w') as ofile:
# Create INDEX datasets.
ofile.create_index(seq_dc.train_ids, from_file=seq_dc.files[0])
# Create METDATA datasets
ofile.create_metadata(
like=seq_dc,
sequence=seq_dc.run_metadata()["sequenceNumber"],
instrument_channels=(f"{instrument_src}/data",)
)
# Create Instrument section to later add corrected datasets.
outp_source = ofile.create_instrument_source(instrument_src)
# Create count/first datasets at INDEX source.
outp_source.create_index(data=image_counts)
# Store uncorrected trainId in the corrected file.
outp_source.create_key(
f"data.trainId", data=seq_dc.train_ids,
chunks=min(50, len(seq_dc.train_ids))
)
# TODO: gain dataset is just the RelativeGain constant
# and it doesn't make sense to write it into corrected data.
comp_fields = ["gain", "patterns", "pixels_classified"]
# TODO: to clear this up: why save corrected data
# in data/pixels rather than data/image.
for field, data in corr_arrays.items():
if field in comp_fields: # Write compressed corrected data.
outp_source.create_compressed_key(f"data.{field}", data=data)
else:
outp_source.create_key(
f"data.{field}", data=data,
chunks=dataset_chunk
)
```
%% Cell type:code id: tags:
``` python
# Data corrections and event classifications happen here.
# Also, the corrected data are written to datasets:
empty_seq = 0
for seq_n, seq_f in enumerate(seq_files):
seq_dc = H5File(seq_f)
out_file = f"{out_folder}/{seq_f.name}".replace("RAW", "CORR")
step_timer.start()
img_dc = seq_dc[instrument_src, "data.image"]
dshape = seq_dc[instrument_src, "data.image"].shape
n_trains = dshape[0]
corr_ntrains = dshape[0] # number of available trains to correct.
all_train_ids = img_dc.train_ids # All trains including trains with no data.
# Raise a WARNING if this sequence has no trains to correct.
# Otherwise, print number of trains with no data.
if corr_ntrains == 0:
warning(f"No trains to correct for {seq_f.name}: "
"Skipping the processing of this file.")
empty_seq += 1
continue
elif len(all_train_ids) != corr_ntrains:
print(
f"{seq_f.name} has {len(all_train_ids) - corr_ntrains} "
"trains with missing data."
)
# If you want to analyze only a certain number of frames
# instead of all available good frames.
if limit_trains > 0:
print(f"\nCorrected trains are limited to: {limit_trains} trains")
corr_ntrains = min(corr_ntrains, limit_trains)
data_shape = (corr_ntrains, dshape[1], dshape[2])
print(f"Correcting file {seq_f} of {corr_ntrains} trains.")
# Overwrite seq_dc after eliminating empty trains or/and applying limited images.
seq_dc = seq_dc.select(
instrument_src, "*", require_all=True).select_trains(np.s_[:corr_ntrains])
raw_data = seq_dc[instrument_src, "data.image"].ndarray().astype(np.float32)
to_store_arrays = {"image": raw_data}
# TODO: move the parts for reading data to plot to later cells.
if seq_n == 0:
raw_plt = raw_data.copy() # plot first sequence only
step_timer.start()
# Allocating shared arrays for data arrays for each correction stage.
data = context.alloc(shape=data_shape, dtype=np.float32)
bpix_data = context.alloc(shape=data_shape, dtype=np.uint32)
histCalRaw.fill(raw_data) # filling histogram with raw uncorrected data
# Applying offset correction
context.map(offset_correction, raw_data)
histCalOffsetCor.fill(data) # filling histogram with offset corrected data
if seq_n == 0:
off_data = data.copy() # plot first sequence only
to_store_arrays["pixels"] = data.copy()
to_store_arrays["mask"] = bpix_data
step_timer.done_step(f'offset correction.')
if corr_bools.get('common_mode'):
step_timer.start()
# Applying common mode correction
context.map(common_mode, data)
if seq_n == 0:
cm_data = data.copy() # plot first sequence only
to_store_arrays["pixels_cm"] = data.copy()
histCalCommonModeCor.fill(data) # filling histogram with common mode corrected data
step_timer.done_step(f'common-mode correction.')
if corr_bools.get('relgain'):
step_timer.start()
# Applying gain correction
context.map(gain_correction, data)
if seq_n == 0:
rg_data = data.copy() # plot first sequence only
# TODO: Why storing a repeated constant for each image in corrected files.
to_store_arrays["gain"] = np.repeat(relativegain[np.newaxis, ...], corr_ntrains, axis=0).astype(np.float32) # noqa
histCalGainCor.fill(data) # filling histogram with gain corrected data
step_timer.done_step(f'gain correction.')
if corr_bools.get('pattern_class'):
step_timer.start()
ptrn_data = context.alloc(shape=data_shape, dtype=np.int32)
filtered_data = context.alloc(shape=data_shape, dtype=np.int32)
# Applying pattern classification correction
# Even though data is indeed of dtype np.float32,
# not specifying this again screw with the data quality.
context.map(pattern_classification_correction, data.astype(np.float32))
if seq_n == 0:
cls_data = data.copy() # plot first sequence only
# split event corrected images plotted for first sequence only
# (also these events are only singles events):
to_store_arrays["pixels_classified"] = data.copy()
to_store_arrays["patterns"] = ptrn_data
histCalPcorr.fill(data) # filling histogram with split events corrected data
# filling histogram with corr data after discarding doubles, triples, quadruple, clusters, and first singles
histCalPcorrS.fill(filtered_data)
step_timer.done_step(f'pattern classification correction.')
step_timer.start()
# Storing corrected data sources.
write_datasets(
seq_dc=seq_dc,
corr_arrays=to_store_arrays,
out_file=out_file,
instrument_src=instrument_src,
)
step_timer.done_step(f'Storing data.')
# Exit and raise warning if there are no data to correct for all sequences.
if empty_seq == len(seq_files):
warning("No valid trains for RAW data to correct.")
sys.exit(0)
```
%% Cell type:code id: tags:
``` python
print("In addition to offset correction, the following corrections were performed:")
for k, v in corr_bools.items():
if v:
print(" -", k.upper())
print(f"Total processing time {step_timer.timespan():.01f} s")
step_timer.print_summary()
```
%% Cell type:code id: tags:
``` python
print("In addition to offset correction, the following corrections were performed:")
for k, v in corr_bools.items():
if v:
print(" -", k.upper())
print(f"Total processing time {step_timer.timespan():.01f} s")
step_timer.print_summary()
```
%% Cell type:code id: tags:
``` python
# Histograming the resulting spectra:
# First _ refers to the bin edges and second _ refers to statistics and we ignore both.
# if you use histCalRaw.get(cumulatative = True) and so on, the cumulatative = True turns the counts array such as
# RawHistVals and so on into a 1D array instead of keeping the original shape:
RawHistVals, _, RawHistMids, _ = histCalRaw.get()
off_cor_HistVals, _, off_cor_HistMids, _ = histCalOffsetCor.get()
if corr_bools.get('common_mode'):
cm_cor_HistVals, _, cm_HistMids, _ = histCalCommonModeCor.get()
if corr_bools.get('relgain'):
gain_cor_HistVals, _, gain_cor_HistMids, _ = histCalGainCor.get()
if corr_bools.get('pattern_class'):
split_HistVals, _, split_HistMids, _ = histCalPcorr.get() # split events corrected
singles_HistVals, _, singles_HistMids, _ = histCalPcorrS.get() # last s in variable names: singles events
```
%% Cell type:code id: tags:
``` python
# Saving intermediate data to disk:
step_timer.start()
np.savez(os.path.join(out_folder, 'Raw_Events.npz'), RawHistMids, RawHistVals)
np.savez(os.path.join(out_folder, 'Offset_Corrected_Events.npz'), off_cor_HistMids, off_cor_HistVals)
if corr_bools.get('common_mode'):
np.savez(os.path.join(out_folder, 'Common_Mode_Corrected_Events.npz'), cm_HistMids, cm_cor_HistVals)
if corr_bools.get('relgain'):
np.savez(os.path.join(out_folder, 'Gain_Corrected_Events.npz'), gain_cor_HistMids, gain_cor_HistVals)
if corr_bools.get('pattern_class'):
np.savez(os.path.join(out_folder, 'Split_Events_Corrected_Events.npz'), split_HistMids, split_HistVals)
np.savez(os.path.join(out_folder, 'Singles_Events.npz'), singles_HistMids, singles_HistVals)
step_timer.done_step(f'Saving intermediate data to disk.')
print("Various spectra are saved to disk in the form of histograms. Please check {}".format(out_folder))
```
%% Cell type:code id: tags:
``` python
display(Markdown('### Raw vs. Corrected Spectra'))
step_timer.start()
figure = [{'x': RawHistMids,
'y': RawHistVals,
'y_err': np.sqrt(RawHistVals[:]),
'drawstyle': 'steps-post',
'errorstyle': 'bars',
'errorcoarsing': 2,
'label': 'Uncorrected'
},
{'x': off_cor_HistMids,
'y': off_cor_HistVals,
'y_err': np.sqrt(off_cor_HistVals[:]),
'drawstyle': 'steps-post',
'errorstyle': 'bars',
'errorcoarsing': 2,
'label': 'Offset Corrected'
}]
if corr_bools.get('common_mode'):
figure.append({'x': cm_HistMids,
'y': cm_cor_HistVals,
'y_err': np.sqrt(cm_cor_HistVals[:]),
'drawstyle': 'steps-post',
'errorstyle': 'bars',
'errorcoarsing': 2,
'label': 'Common Mode Corrected'})
if corr_bools.get('relgain'):
xrange = bin_range
figure.append({'x': gain_cor_HistMids,
'y': gain_cor_HistVals,
'y_err': np.sqrt(gain_cor_HistVals[:]),
'drawstyle': 'steps-post',
'errorstyle': 'bars',
'errorcoarsing': 2,
'label': 'Gain Corrected'})
if corr_bools.get('pattern_class'):
figure.extend([{'x': split_HistMids,
'y': split_HistVals,
'y_err': np.sqrt(split_HistVals[:]),
'drawstyle': 'steps-post',
'errorstyle': 'bars',
'errorcoarsing': 2,
'label': 'Split Events Corrected'
},
{'x': singles_HistMids,
'y': singles_HistVals,
'y_err': np.sqrt(singles_HistVals[:]),
'drawstyle': 'steps-post',
'errorstyle': 'bars',
'errorcoarsing': 2,
'label': 'Singles Events'
}])
fig = xana.simplePlot(figure, aspect=1, x_label='ADU', y_label='Number of Occurrences', figsize='2col',
y_log=True, x_range=bin_range, title = '1 ADU per bin is used.',
legend='top-right-frame-1col')
step_timer.done_step('Plotting')
```
%% Cell type:code id: tags:
``` python
# This function plots pattern statistics:
def classification_plot(patternStats, hemisphere):
print("****************** {} HEMISPHERE ******************\n"
.format(hemisphere))
fig = plt.figure(figsize=(15, 15))
ax = fig.add_subplot(4, 4, 1)
sfields = ["singles", "first singles", "clusters"]
mfields = ["doubles", "triples", "quads"]
relativeOccurances = []
labels = []
for i, f in enumerate(sfields):
relativeOccurances.append(patternStats[f])
labels.append(f)
for i, f in enumerate(mfields):
for k in range(len(patternStats[f])):
relativeOccurances.append(patternStats[f][k])
labels.append("{}({})".format(f, k))
relativeOccurances = np.array(relativeOccurances, np.float)
relativeOccurances /= np.sum(relativeOccurances)
pie = ax.pie(relativeOccurances, labels=labels, autopct='%1.1f%%', shadow=True)
ax.set_title("Pattern Occurrence")
# Set aspect ratio to be equal so that pie is drawn as a circle.
a = ax.axis('equal')
smaps = ["singlemap", "firstsinglemap", "clustermap"]
for i, m in enumerate(smaps):
ax = fig.add_subplot(4, 4, 2+i)
pmap = ax.imshow(patternStats[m], interpolation="nearest", vmax=2*np.nanmedian(patternStats[m]))
ax.set_title(m)
cb = fig.colorbar(pmap)
mmaps = ["doublemap", "triplemap", "quadmap"]
k = 0
for i, m in enumerate(mmaps):
for j in range(4):
ax = fig.add_subplot(4, 4, 2+len(smaps)+k)
pmap = ax.imshow(patternStats[m][j], interpolation="nearest", vmax=2*np.median(patternStats[m][j]))
ax.set_title("{}({})".format(m,j))
cb = fig.colorbar(pmap)
k+=1
```
%% Cell type:code id: tags:
``` python
# The next two cells plot the classification results for left and right hemispheres, respectively:
display(Markdown('### Classification Results - Plots'))
if corr_bools.get('pattern_class'):
patternStatsLH = patternClassifierLH.getPatternStats()
classification_plot(patternStatsLH, 'Left')
```
%% Cell type:code id: tags:
``` python
if corr_bools.get('pattern_class'):
patternStatsRH = patternClassifierRH.getPatternStats()
classification_plot(patternStatsRH, 'Right')
```
%% Cell type:code id: tags:
``` python
display(Markdown('### Classification Results - Tabulated Statistics'))
if corr_bools.get('pattern_class'):
step_timer.start()
t0 = PrettyTable()
t0.title = "Total Number of Counts after All Corrections"
t0.field_names = ["Hemisphere", "Singles", "First-Singles", "Clusters"]
t0.add_row(["LH", patternStatsLH['singles'], patternStatsLH['first singles'], patternStatsLH['clusters']])
t0.add_row(["RH", patternStatsRH['singles'], patternStatsRH['first singles'], patternStatsRH['clusters']])
print(t0)
print("Abbreviations: D (Doubles), T (Triples), Q (Quadruples), L (Left), R (Right), and H (Hemisphere).")
t1 = PrettyTable()
t1.field_names = ["Index", "D-LH", "D-RH", "T-LH", "T-RH", "Q-LH", "Q-RH"]
t1.add_row([0, patternStatsLH['doubles'][0], patternStatsRH['doubles'][0], patternStatsLH['triples'][0],
patternStatsRH['triples'][0], patternStatsLH['quads'][0], patternStatsRH['quads'][0]])
t1.add_row([1, patternStatsLH['doubles'][1], patternStatsRH['doubles'][1], patternStatsLH['triples'][1],
patternStatsRH['triples'][1], patternStatsLH['quads'][1], patternStatsRH['quads'][1]])
t1.add_row([2, patternStatsLH['doubles'][2], patternStatsRH['doubles'][2], patternStatsLH['triples'][2],
patternStatsRH['triples'][2], patternStatsLH['quads'][2], patternStatsRH['quads'][2]])
t1.add_row([3, patternStatsLH['doubles'][3], patternStatsRH['doubles'][3], patternStatsLH['triples'][3],
patternStatsRH['triples'][3], patternStatsLH['quads'][3], patternStatsRH['quads'][3]])
print(t1)
step_timer.done_step('Classification Results - Tabulated Statistics')
```
%% Cell type:code id: tags:
``` python
if corr_bools.get('pattern_class'):
doublesLH = patternStatsLH['doubles'][0] + patternStatsLH['doubles'][1] + patternStatsLH['doubles'][2] + \
patternStatsLH['doubles'][3]
triplesLH = patternStatsLH['triples'][0] + patternStatsLH['triples'][1] + patternStatsLH['triples'][2] + \
patternStatsLH['triples'][3]
quadsLH = patternStatsLH['quads'][0] + patternStatsLH['quads'][1] + patternStatsLH['quads'][2] + \
patternStatsLH['quads'][3]
allsinglesLH = patternStatsLH['singles'] + patternStatsLH['first singles']
eventsLH = allsinglesLH + doublesLH + triplesLH + quadsLH
doublesRH = patternStatsRH['doubles'][0] + patternStatsRH['doubles'][1] + patternStatsRH['doubles'][2] + \
patternStatsRH['doubles'][3]
triplesRH = patternStatsRH['triples'][0] + patternStatsRH['triples'][1] + patternStatsRH['triples'][2] + \
patternStatsRH['triples'][3]
quadsRH = patternStatsRH['quads'][0] + patternStatsRH['quads'][1] + patternStatsRH['quads'][2] + \
patternStatsRH['quads'][3]
allsinglesRH = patternStatsRH['singles'] + patternStatsRH['first singles']
eventsRH = allsinglesRH + doublesRH + triplesRH + quadsRH
if eventsLH > 0.:
reloccurLH = np.array([allsinglesLH/eventsLH, doublesLH/eventsLH, triplesLH/eventsLH, quadsLH/eventsLH])
else:
reloccurLH = np.array([0]*4)
if eventsRH > 0.:
reloccurRH = np.array([allsinglesRH/eventsRH, doublesRH/eventsRH, triplesRH/eventsRH, quadsRH/eventsRH])
else:
reloccurRH = np.array([0]*4)
```
%% Cell type:code id: tags:
``` python
display(Markdown('### Classification Results - Pie Charts'))
if corr_bools.get('pattern_class'):
step_timer.start()
fig = plt.figure(figsize=(12, 7))
ax = fig.add_subplot(1, 2, 1)
labels = ['Singles', 'Doubles', 'Triples', 'Quads']
pie = ax.pie(reloccurLH, labels=labels, autopct='%1.1f%%', shadow=True)
ax.set_title("Pattern Occurrence in LH")
# Set aspect ratio to be equal so that pie is drawn as a circle.
a = ax.axis('equal')
ax = fig.add_subplot(1, 2, 2)
pie = ax.pie(reloccurRH, labels=labels, autopct='%1.1f%%', shadow=True)
ax.set_title("Pattern Occurrence in RH")
# Set aspect ratio to be equal so that pie is drawn as a circle.
a = ax.axis('equal')
step_timer.done_step('Classification Results - Pie Charts')
```
%% Cell type:markdown id: tags:
### Various Images Averaged Over All Frames of Only the First Sequence ###
%% Cell type:code id: tags:
``` python
step_timer.start()
uncor_mean_im = np.nanmean(raw_data, axis=0)
offset_mean_im = np.nanmean(off_data, axis=0)
if corr_bools.get('common_mode'):
cm_mean_im = np.nanmean(cm_data, axis=0)
if corr_bools.get('relgain'):
gain_mean_im = np.nanmean(rg_data, axis=0)
if corr_bools.get('pattern_class'):
mean_im_cc = np.nanmean(cls_data, axis=0)
fig = xana.heatmapPlot(uncor_mean_im, x_label='Columns', y_label='Rows', lut_label='Signal (ADU)', aspect=1,
x_range=(0, pixels_y), y_range=(0, pixels_x),
panel_x_label='Row Stat (ADU)', panel_y_label='Column Stat (ADU)',
title = 'Uncorrected Image Averaged over Frames in the First Sequence')
fig = xana.heatmapPlot(offset_mean_im, x_label='Columns', y_label='Rows', lut_label='Signal (ADU)', aspect=1,
x_range=(0, pixels_y), y_range=(0, pixels_x),
panel_x_label='Row Stat (ADU)', panel_y_label='Column Stat (ADU)',
title = 'Offset Corrected Image Averaged over Frames in the First Sequence')
if corr_bools.get('common_mode'):
fig = xana.heatmapPlot(cm_mean_im, x_label='Columns', y_label='Rows', lut_label='Signal (ADU)', aspect=1,
x_range=(0, pixels_y), y_range=(0, pixels_x),
panel_x_label='Row Stat (ADU)', panel_y_label='Column Stat (ADU)',
title = 'Common Mode Corrected Image Averaged over Frames in the First Sequence')
if corr_bools.get('relgain'):
fig = xana.heatmapPlot(gain_mean_im, x_label='Columns', y_label='Rows',
lut_label='Signal (ADU)', aspect=1,
x_range=(0, pixels_y), y_range=(0, pixels_x),
panel_x_label='Row Stat (ADU)', panel_y_label='Column Stat (ADU)',
title = 'Gain Corrected Image Averaged over Frames in the First Sequence')
if corr_bools.get('pattern_class'):
fig = xana.heatmapPlot(mean_im_cc, x_label='Columns', y_label='Rows', lut_label='Signal (ADU)', aspect=1,
x_range=(0, pixels_y), y_range=(0, pixels_x), vmin=0, vmax= 18000,
title = 'Image of Single Events Averaged over Frames in the First Sequence')
step_timer.done_step("Plotting")
```
%% Cell type:markdown id: tags:
### Images of the First Frame of the First Sequence ###
%% Cell type:code id: tags:
``` python
step_timer.start()
fig = xana.heatmapPlot(raw_data[0, :, :], x_label='Columns', y_label='Rows', lut_label='Signal (ADU)', aspect=1,
x_range=(0, pixels_y), y_range=(0, pixels_x),
panel_x_label='Row Stat (ADU)', panel_y_label='Column Stat (ADU)',
title = 'Uncorrected Image (First Frame of the First Sequence)')
fig = xana.heatmapPlot(off_data[0, :, :], x_label='Columns', y_label='Rows', lut_label='Signal (ADU)', aspect=1,
x_range=(0, pixels_y), y_range=(0, pixels_x),
panel_x_label='Row Stat (ADU)', panel_y_label='Column Stat (ADU)',
title = 'Offset Corrected Image (First Frame of the First Sequence)')
if corr_bools.get('common_mode'):
fig = xana.heatmapPlot(cm_data[0, :, :], x_label='Columns', y_label='Rows', lut_label='Signal (ADU)',
aspect=1,
x_range=(0, pixels_y), y_range=(0, pixels_x),
panel_x_label='Row Stat (ADU)', panel_y_label='Column Stat (ADU)',
title = 'Common Mode Corrected Image (First Frame of the First Sequence)')
if corr_bools.get('relgain'):
fig = xana.heatmapPlot(rg_data[0, :, :], x_label='Columns', y_label='Rows', lut_label='Signal (ADU)',
aspect=1,
x_range=(0, pixels_y), y_range=(0, pixels_x),
panel_x_label='Row Stat (ADU)', panel_y_label='Column Stat (ADU)',
title = 'Gain Corrected Image (First Frame of the First Sequence)')
if corr_bools.get('pattern_class'):
fig = xana.heatmapPlot(cls_data[0, :, :], x_label='Columns', y_label='Rows', lut_label='Signal (ADU)', aspect=1,
x_range=(0, pixels_y), y_range=(0, pixels_x),
panel_x_label='Row Stat (ADU)', panel_y_label='Column Stat (ADU)',
title = 'Image of Single Events (First Frame of the First Sequence)')
step_timer.done_step("Plotting")
```
%% Cell type:code id: tags:
``` python
# Resetting the histogram calculators:
histCalRaw.reset()
histCalOffsetCor.reset()
if corr_bools.get('common_mode'):
histCalCommonModeCor.reset()
if corr_bools.get('relgain'):
histCalGainCor.reset()
if corr_bools.get('pattern_class'):
histCalPcorr.reset()
histCalPcorrS.reset()
```
%% Cell type:markdown id: tags:
Next, the corrected event patterns are read from the patterns/dataset created previously and are separated into 4 different categories (singles, doubles, triples and quadruples) using the pattern indices. However, this is done only for one sequence, corresponding to the seq_num variable, as an example.
Note that the number of bins and the bin range for the following histograms may be different from those presented above (depending on gain) to make the counts more noticible and the peaks more defined.
If you are interested in plotting the events from all sequences or the spectra of half of the sensor, execute the spectra_pnCCD_NBC.ipynb notebook.
%% Cell type:code id: tags:
``` python
if corr_bools.get('pattern_class'):
singles = []
doubles = []
triples = []
quads = []
with H5File(f"{out_folder}/{seq_files[0].name.replace('RAW', 'CORR')}") as dc: # noqa
data = dc[instrument_src, "data.pixels_classified"].ndarray()
patterns = dc[instrument_src, "data.patterns"].ndarray()
# events' patterns indices are as follows: 100 (singles), 101 (first singles), 200 - 203 (doubles),
# 300 - 303 (triples), and 400 - 403 (quadruples). Note that for the last three types of patterns,
# there are left, right, up, and down indices.
# Separating the events:
# Singles and First Singles:
for s in range(100, 102):
single = data.copy()
single[patterns != s] = np.nan
singles.append(single)
for d in range(200, 204):
double = data.copy()
double[patterns != d] = np.nan
doubles.append(double)
for t in range(300, 304):
triple = data.copy()
triple[patterns != t] = np.nan
triples.append(triple)
for q in range(400, 404):
quad = data.copy()
quad[patterns != q] = np.nan
quads.append(quad)
```
%% Cell type:code id: tags:
``` python
if corr_bools.get('pattern_class'):
step_timer.start()
hA = 0
h = 0
for single in singles:
hs, e = np.histogram(single.flatten(), bins=event_bins, range=b_range) # h: histogram counts, e: bin edges
h += hs
hA += hs # hA: counts all events (see below)
# bin edges array has one extra element => need to plot from 0 to the one before the last element to have the
# same size as h-array => in what follows, we use e[:-1] (-1 means one before the last element)
display(Markdown('### Histograms of Corrected Events for One Sequence Only'))
fig = plt.figure(figsize=(10, 7))
ax = fig.add_subplot(111)
ax.step(e[:-1], h, color='blue', label='Events Involving Single Pixels Only')
ax.semilogy() # y-axis is log, x-axis is linear
ax.set_xlabel("Energy (ADU) [{} bins per {} ADU]".format(event_bins, b_range[1]-b_range[0]))
ax.set_ylabel("Corrected Events for One Sequence (counts)")
ax.set_xlim(b_range)
h = 0
for double in doubles:
hd, e = np.histogram(double.flatten(), bins=event_bins, range=b_range)
h += hd
hA += hd
ax.step(e[:-1], h, color='red', label='Events Splitting on Double Pixels')
h = 0
for triple in triples:
ht, e = np.histogram(triple.flatten(), bins=event_bins, range=b_range)
h += ht
hA += ht
ax.step(e[:-1], h, color='green', label='Events Splitting on Triple Pixels')
h = 0
for quad in quads:
hq, e = np.histogram(quad.flatten(), bins=event_bins, range=b_range)
h += hq
hA += hq
ax.step(e[:-1], h, color='purple', label='Events Splitting on Quadruple Pixels')
ax.step(e[:-1], hA, color='grey', label='All Valid Events')
l = ax.legend()
step_timer.done_step("Plotting")
```
%% Cell type:code id: tags:
``` python
print(f"Total processing time {step_timer.timespan():.01f} s")
step_timer.print_summary()
```
......
%% Cell type:markdown id: tags:
# pnCCD retrieve constants precorrection
Author: European XFEL Detector Group, Version: 1.0
The following notebook provides constants for the selected pnCCD modules before executing correction on the selected sequence files.
%% Cell type:code id: tags:
``` python
in_folder = "/gpfs/exfel/exp/SQS/202031/p900166/raw" # input folder
out_folder = "/gpfs/exfel/data/scratch/ahmedk/test/remove/pnccd_correct" # output folder
metadata_folder = "" # Directory containing calibration_metadata.yml when run by xfel-calibrate
run = 347 # which run to read data from
sequences = [0] # sequences to correct, set to -1 for all, range allowed
karabo_da = 'PNCCD01' # data aggregators
karabo_id = "SQS_NQS_PNCCD1MP" # detector Karabo_ID
# Conditions for retrieving calibration constants
fix_temperature_top = 0. # fix temperature for top sensor in K, set to 0. to use value from slow data.
fix_temperature_bot = 0. # fix temperature for bottom sensor in K, set to 0. to use value from slow data.
gain = -1 # the detector's gain setting. Set to -1 to use the value from the slow data.
bias_voltage = 0. # the detector's bias voltage. set to 0. to use value from slow data.
integration_time = 70 # detector's integration time
photon_energy = 1.6 # Al fluorescence in keV
# Parameters for the calibration database.
cal_db_interface = "tcp://max-exfl016:8015" # calibration DB interface to use
cal_db_timeout = 300000 # timeout on CalibrationDB requests
creation_time = "" # The timestamp to use with Calibration DBe. Required Format: "YYYY-MM-DD hh:mm:ss" e.g. 2019-07-04 11:02:41
# Booleans for selecting corrections to apply.
only_offset = False # Only, apply offset.
relgain = True # Apply relative gain correction
# parameters affecting stored output data.
overwrite = True # keep this as True to not overwrite the output
```
%% Cell type:code id: tags:
``` python
import datetime
from pathlib import Path
from IPython.display import Markdown, display
from extra_data import RunDirectory
from cal_tools import pnccdlib
from cal_tools.tools import (
calcat_creation_time,
get_dir_creation_date,
get_from_db,
get_random_db_interface,
save_constant_metadata,
CalibrationMetadata,
)
from iCalibrationDB import Conditions, Constants
from iCalibrationDB.detectors import DetectorTypes
```
%% Cell type:code id: tags:
``` python
metadata = CalibrationMetadata(metadata_folder or out_folder)
# NOTE: this notebook will not overwrite calibration metadata file,
# if it already contains details about which constants to use.
retrieved_constants = metadata.setdefault("retrieved-constants", {})
if karabo_da in retrieved_constants:
print(
f"Constant for {karabo_da} already in {metadata.filename}, won't query again."
) # noqa
import sys
sys.exit(0)
```
%% Cell type:code id: tags:
``` python
# Here the correction booleans dictionary is defined
corr_bools = {}
corr_bools["only_offset"] = only_offset
# Apply offset only.
if not only_offset:
corr_bools["relgain"] = relgain
```
%% Cell type:code id: tags:
``` python
print(f"Calibration database interface selected: {cal_db_interface}")
# Run's creation time:
creation_time = calcat_creation_time(in_folder, run, creation_time)
print(f"Creation time: {creation_time}")
```
%% Cell type:code id: tags:
``` python
run_dc = RunDirectory(Path(in_folder) / f"r{run:04d}", _use_voview=False)
ctrl_data = pnccdlib.PnccdCtrl(run_dc, karabo_id)
# extract control data
if bias_voltage == 0.0:
bias_voltage = ctrl_data.get_bias_voltage()
if gain == -1:
gain = ctrl_data.get_gain()
if fix_temperature_top == 0:
fix_temperature_top = ctrl_data.get_fix_temperature_top()
# Printing the Parameters Read from the Data File:
display(Markdown("### Detector Parameters"))
print(f"Bias voltage: {bias_voltage:0.1f} V.")
print(f"Detector gain: {int(gain)}.")
print(f"Detector integration time: {integration_time} ms")
print(f"Top pnCCD sensor temperature: {fix_temperature_top:0.2f} K")
```
%% Cell type:code id: tags:
``` python
display(Markdown("### Constants retrieval"))
conditions_dict = {
"bias_voltage": bias_voltage,
"integration_time": integration_time,
"gain_setting": gain,
"temperature": fix_temperature_top,
"pixels_x": 1024,
"pixels_y": 1024,
}
# Dark condition
dark_condition = Conditions.Dark.CCD(**conditions_dict)
# Add photon energy.
conditions_dict.update({"photon_energy": photon_energy})
illum_condition = Conditions.Illuminated.CCD(**conditions_dict)
mdata_dict = dict()
mdata_dict["constants"] = dict()
for cname in ["Offset", "Noise", "BadPixelsDark", "RelativeGain"]:
# No need for retrieving RelativeGain, if not used for correction.
if not corr_bools.get("relgain") and cname == "RelativeGain":
continue
_, mdata = get_from_db(
karabo_id=karabo_id,
karabo_da=karabo_da,
constant=getattr(Constants.CCD(DetectorTypes.pnCCD), cname)(),
condition=illum_condition if cname == "RelativeGain" else dark_condition,
empty_constant=None,
cal_db_interface=get_random_db_interface(cal_db_interface),
creation_time=creation_time,
verbosity=1,
load_data=False,
)
save_constant_metadata(mdata_dict["constants"], mdata, cname)
mdata_dict["physical-detector-unit"] = mdata.calibration_constant_version.device_name
retrieved_constants[karabo_da] = mdata_dict
metadata.save()
print(f"Stored retrieved constants in {metadata.filename}")
```
......@@ -524,6 +524,22 @@ class CalibrationData:
self.detector["id"], self.pdu_snapshot_at, self.module_naming
)
@property
def mod_to_pdu(self):
"""Get the physical detector units and create a dictionary
mapping each module name to physical name (physical detector unit).
Returns:
DICT: mapping module to physical detector unit name.
"""
return {
mod: pdu_md["physical_name"] for mod, pdu_md in self._api.physical_detector_units( # noqa
self.detector["id"],
self.pdu_snapshot_at,
self.module_naming,
).items()
}
@property
def condition(self):
return self._build_condition(self.parameters)
......@@ -755,9 +771,9 @@ class CalibrationData:
try:
creation_date = data.files[0].metadata()["creationDate"]
except KeyError:
from warnings import warning
from warnings import warn
warning(
warn(
"Last file modification time used as creation date for old "
"DAQ file format may be unreliable"
)
......@@ -767,9 +783,9 @@ class CalibrationData:
)
else:
if not data.is_single_run:
from warnings import warning
from warnings import warn
warning(
warn(
"Sample file used to determine creation date for multi "
"run data"
)
......
......@@ -962,6 +962,36 @@ def load_specified_constants(
return const_data, when
def write_constants_fragment(
out_folder: Path,
det_metadata: dict,
caldb_root: Path,
):
"""Record calibration constants metadata to a fragment file.
Args:
out_folder (Path): The output folder to store the fragment file.
det_metadata (dict): A dictionary with the desired detector metadata.
{karabo_da: {constant_name: metadata}}
caldb_root (Path): The calibration database root path for constant files.
"""
metadata = {"retrieved-constants": {}}
for karabo_da, const_metadata in det_metadata.items():
mod_metadata = {}
mod_metadata["constants"] = {
cname: {
"path": str(caldb_root / ccv_metadata["path"]),
"dataset": ccv_metadata["dataset"],
"creation-time": ccv_metadata["begin_validity_at"],
"ccv_id": ccv_metadata["ccv_id"],
} for cname, ccv_metadata in const_metadata.items()
}
mod_metadata["physical-name"] = list(
const_metadata.values())[0]["physical_name"]
metadata["retrieved-constants"][karabo_da] = mod_metadata
CalibrationMetadata(out_folder).add_fragment(metadata)
def write_compressed_frames(
arr: np.ndarray,
ofile: h5py.File,
......
......@@ -112,7 +112,6 @@ notebooks = {
"cluster cores": 32},
},
"CORRECT": {
"pre_notebooks": ["notebooks/pnCCD/pnCCD_retrieve_constants_precorrection.ipynb"],
"notebook": "notebooks/pnCCD/Correct_pnCCD_NBC.ipynb",
"concurrency": {"parameter": "sequences",
"default concurrency": [-1],
......
......@@ -4,6 +4,7 @@ from unittest.mock import patch
import numpy as np
import pytest
import yaml
import zmq
from extra_data import open_run
from iCalibrationDB import Conditions, ConstantMetaData, Constants
......@@ -18,8 +19,9 @@ from cal_tools.tools import (
get_pdu_from_db,
map_seq_files,
module_index_to_qm,
send_to_db,
recursive_update,
send_to_db,
write_constants_fragment,
)
# AGIPD operating conditions.
......@@ -484,3 +486,131 @@ def test_recursive_update():
src = {"a": {"b": 3}, "e": 4}
assert recursive_update(tgt, src) is True
assert tgt == {"a": {"b": 1}, "c": 2, "e": 4}
def test_write_constants_fragment(tmp_path: Path):
"""Test `write_constants_fragment` with jungfrau.
This metadata is from constants used to correct FXE_XAD_JF1M
detector from proposal 900226, run 106.
tmp_path:
tmp_path (pathlib.Path): Temporary directory for file tests.
https://docs.pytest.org/en/7.1.x/how-to/tmp_path.html
"""
jf_metadata = {
"JNGFR01": {
"Offset10Hz": {
"cc_id": 7064,
"cc_name": "jungfrau-Type_Offset10Hz_Jungfrau DefiFE6iJX",
"condition_id": 2060,
"ccv_id": 41876,
"ccv_name": "20200304_152733_sIdx=0",
"path": Path("xfel/cal/jungfrau-type/jungfrau_m233/cal.1583335651.8084984.h5"),
"dataset": "/Jungfrau_M233/Offset10Hz/0",
"begin_validity_at": "2020-03-04T15:16:34.000+01:00",
"end_validity_at": None,
"raw_data_location": "proposal:p900121 runs:136 137 138",
"start_idx": 0,
"end_idx": 0,
"physical_name": "Jungfrau_M233"},
"BadPixelsDark10Hz": {
"cc_id": 7066,
"cc_name": "jungfrau-Type_BadPixelsDark10Hz_Jungfrau DefiFE6iJX",
"condition_id": 2060,
"ccv_id": 41878,
"ccv_name": "20200304_152740_sIdx=0",
"path": Path("xfel/cal/jungfrau-type/jungfrau_m233/cal.1583335658.6813955.h5"),
"dataset": "/Jungfrau_M233/BadPixelsDark10Hz/0",
"begin_validity_at": "2020-03-04T15:16:34.000+01:00",
"end_validity_at": None,
"raw_data_location": "proposal:p900121 runs:136 137 138",
"start_idx": 0,
"end_idx": 0,
"physical_name": "Jungfrau_M233"
}
},
"JNGFR02": {
"Offset10Hz": {
"cc_id": 7067,
"cc_name": "jungfrau-Type_Offset10Hz_Jungfrau DefzgIVHz1",
"condition_id": 2061,
"ccv_id": 41889,
"ccv_name": "20200304_154434_sIdx=0",
"path": Path("xfel/cal/jungfrau-type/jungfrau_m125/cal.1583336672.760199.h5"),
"dataset": "/Jungfrau_M125/Offset10Hz/0",
"begin_validity_at": "2020-03-04T15:16:34.000+01:00",
"end_validity_at": None,
"raw_data_location": "proposal:p900121 runs:136 137 138",
"start_idx": 0,
"end_idx": 0,
"physical_name": "Jungfrau_M125",
},
"BadPixelsDark10Hz": {
"cc_id": 7069,
"cc_name": "jungfrau-Type_BadPixelsDark10Hz_Jungfrau DefzgIVHz1",
"condition_id": 2061,
"ccv_id": 41893,
"ccv_name": "20200304_154441_sIdx=0",
"path": Path("xfel/cal/jungfrau-type/jungfrau_m125/cal.1583336679.5835564.h5"),
"dataset": "/Jungfrau_M125/BadPixelsDark10Hz/0",
"begin_validity_at": "2020-03-04T15:16:34.000+01:00",
"end_validity_at": None,
"raw_data_location": "proposal:p900121 runs:136 137 138",
"start_idx": 0,
"end_idx": 0,
"physical_name": "Jungfrau_M125",
}
}
}
write_constants_fragment(
tmp_path,
jf_metadata,
Path("/gpfs/exfel/d/cal/caldb_store")
)
fragments = list(tmp_path.glob("metadata_frag*yml"))
assert len(fragments) == 1
# Open YAML file
with open(fragments[0], "r") as file:
# Load YAML content into dictionary
yaml_dict = yaml.safe_load(file)
assert yaml_dict == {
"retrieved-constants":{
"JNGFR01": {
"constants": {
"BadPixelsDark10Hz": {
"ccv_id": 41878,
"creation-time": "2020-03-04T15:16:34.000+01:00",
"dataset": "/Jungfrau_M233/BadPixelsDark10Hz/0",
"path": "/gpfs/exfel/d/cal/caldb_store/xfel/cal/jungfrau-type/jungfrau_m233/cal.1583335658.6813955.h5", # noqa
},
"Offset10Hz": {
"ccv_id": 41876,
"creation-time": "2020-03-04T15:16:34.000+01:00",
"dataset": "/Jungfrau_M233/Offset10Hz/0",
"path": "/gpfs/exfel/d/cal/caldb_store/xfel/cal/jungfrau-type/jungfrau_m233/cal.1583335651.8084984.h5", # noqa
},
},
"physical-name": "Jungfrau_M233",
},
"JNGFR02": {
"constants": {
"BadPixelsDark10Hz": {
"ccv_id": 41893,
"creation-time": "2020-03-04T15:16:34.000+01:00",
"dataset": "/Jungfrau_M125/BadPixelsDark10Hz/0",
"path": "/gpfs/exfel/d/cal/caldb_store/xfel/cal/jungfrau-type/jungfrau_m125/cal.1583336679.5835564.h5", # noqa
},
"Offset10Hz": {
"ccv_id": 41889,
"creation-time": "2020-03-04T15:16:34.000+01:00",
"dataset": "/Jungfrau_M125/Offset10Hz/0",
"path": "/gpfs/exfel/d/cal/caldb_store/xfel/cal/jungfrau-type/jungfrau_m125/cal.1583336672.760199.h5", # noqa
},
},
"physical-name": "Jungfrau_M125",
},
}
}
"""Usage: check_run_status.py <proposal> <run>
e.g. check_run_status.py 3279 168
"""
import sqlite3
import sys
from config import webservice as config
proposal, run = sys.argv[1:3]
proposal = proposal.zfill(6)
run = int(run)
conn = sqlite3.connect(config['web-service']['job-db'])
req_cur = conn.execute("""
SELECT req_id, timestamp FROM requests
WHERE proposal = ? AND run = ? AND action = 'CORRECT'
""", (proposal, run))
for req_id, req_time in req_cur:
print(f"Request {req_id} at {req_time}")
for exec_id, karabo_id in conn.execute(
"SELECT exec_id, karabo_id FROM executions WHERE req_id = ?", (req_id,)
):
print(f"- {karabo_id}")
jobs_by_status = {}
for job_id, status in conn.execute(
"SELECT job_id, status FROM slurm_jobs WHERE exec_id = ?", (exec_id,)
):
jobs_by_status.setdefault(status, []).append(job_id)
for status, job_ids in sorted(jobs_by_status.items()):
print(f" {status}:", *job_ids)
print()
......@@ -15,11 +15,11 @@ from kafka.errors import KafkaError
try:
from .config import webservice as config
from .messages import MDC, Errors, MigrationError, Success
from .webservice import init_job_db, init_md_client
from .webservice import init_job_db, init_md_client, time_db_transaction
except ImportError:
from config import webservice as config
from messages import MDC, Errors, MigrationError, Success
from webservice import init_job_db, init_md_client
from webservice import init_job_db, init_md_client, time_db_transaction
log = logging.getLogger(__name__)
......@@ -148,17 +148,18 @@ class JobsMonitor:
Newly completed executions are present with an empty list.
"""
c = self.job_db.cursor()
c.execute("SELECT job_id, exec_id FROM slurm_jobs WHERE finished = 0")
statii = slurm_status()
# Check that slurm is giving proper feedback
if statii is None:
return {}
log.debug(f"SLURM info {statii}")
jobs_to_check = self.job_db.execute(
"SELECT job_id, exec_id FROM slurm_jobs WHERE finished = 0"
).fetchall()
ongoing_jobs_by_exn = {}
for r in c.fetchall():
updates = []
for r in jobs_to_check:
log.debug(f"Job in DB before update: %s", tuple(r))
execn_ongoing_jobs = ongoing_jobs_by_exn.setdefault(r['exec_id'], [])
......@@ -173,12 +174,14 @@ class JobsMonitor:
_, runtime, slstatus = slurm_job_status(r['job_id'])
finished = True
c.execute(
updates.append((finished, runtime, slstatus, r['job_id']))
with time_db_transaction(self.job_db, 'Update jobs'):
self.job_db.executemany(
"UPDATE slurm_jobs SET finished=?, elapsed=?, status=? WHERE job_id = ?",
(finished, runtime, slstatus, r['job_id'])
updates
)
self.job_db.commit()
return ongoing_jobs_by_exn
def process_request_still_going(self, req_id, running_jobs_info):
......@@ -216,7 +219,7 @@ class JobsMonitor:
"WHERE exec_id = ?",
(exec_id,)
).fetchone()
with self.job_db:
with time_db_transaction(self.job_db, 'Update execution'):
self.job_db.execute(
"UPDATE executions SET success = ? WHERE exec_id = ?",
(success, exec_id)
......
......@@ -43,3 +43,4 @@ class Success:
REPROD_QUEUED = "SUCCESS: Queued proposal {}, run {} for reproducing previous offline calibration"
DONE_CORRECTION = "SUCCESS: Finished correction: proposal {}. run {}"
DONE_CHAR = "SUCCESS: Finished dark characterization: proposal {}, run {}"
ALREADY_REQUESTED = "SUCCESS: Correction already queued/running for proposal {}, run {}"
import time
import argparse
import ast
import asyncio
......@@ -60,6 +62,7 @@ def init_job_db(config):
action,
timestamp
);
CREATE INDEX IF NOT EXISTS req_by_run ON requests(proposal, run, action);
CREATE TABLE IF NOT EXISTS executions(
exec_id INTEGER PRIMARY KEY,
req_id REFERENCES requests(req_id),
......@@ -301,6 +304,33 @@ def parse_config(cmd: List[str], config: Dict[str, Any]) -> List[str]:
return cmd
class time_db_transaction:
"""Record time taken to write to the database
Use as a context manager. When leaving the block, the transaction will be
committed (or rolled back, on error), and the time taken logged.
"""
t_start = 0
def __init__(self, conn: sqlite3.Connection, label: str):
self.conn = conn
self.label = label
def __enter__(self):
self.conn.__enter__()
self.t_start = time.perf_counter()
return
def __exit__(self, exc_type, exc_val, exc_tb):
t1 = time.perf_counter()
self.conn.__exit__(exc_type, exc_val, exc_tb)
t2 = time.perf_counter()
t_open = (t1 - self.t_start) * 1000
t_finish = (t2 - t1) * 1000
op = 'commit' if exc_val is None else 'rollback'
logging.debug("DB change (%s): %.1f ms in transaction, %.1f ms %s",
self.label, t_open, t_finish, op)
return False
async def run_action(job_db, cmd, mode, proposal, run, exec_id) -> str:
......@@ -332,7 +362,6 @@ async def run_action(job_db, cmd, mode, proposal, run, exec_id) -> str:
message = Success.START_CORRECTION.format(proposal, run)
# Save submitted jobs to persistent database.
c = job_db.cursor() # FIXME: asyncio
rstr = stdout.decode()
for r in rstr.split("\n"):
......@@ -342,11 +371,11 @@ async def run_action(job_db, cmd, mode, proposal, run, exec_id) -> str:
jobs = []
for jobid in jobids.split(','):
jobs.append((int(jobid.strip()), exec_id))
c.executemany(
"INSERT INTO slurm_jobs VALUES (?, ?, 'PD', 0, 0)",
jobs
)
job_db.commit()
with time_db_transaction(job_db, 'Insert jobs'):
job_db.executemany(
"INSERT INTO slurm_jobs VALUES (?, ?, 'PD', 0, 0)",
jobs
)
else: # mode == "sim"
if "DARK" in cmd:
......@@ -934,8 +963,14 @@ class ActionsServer:
proposal = self._normalise_proposal_num(proposal)
pconf_full = self.load_proposal_config(cycle, proposal)
with self.job_db:
cur = self.job_db.execute(
if self.check_unfinished_correction(proposal, int(runnr)):
# A correction is already running for this run
msg = Success.ALREADY_REQUESTED.format(proposal, runnr)
logging.debug(msg)
return msg.encode()
with time_db_transaction(self.job_db, 'Insert request'):
cur = self.job_db.execute( # 2
"INSERT INTO requests VALUES (NULL, ?, ?, ?, 'CORRECT', ?)",
(rid, proposal, int(runnr), request_time)
)
......@@ -1054,7 +1089,13 @@ class ActionsServer:
request_time = datetime.now()
try:
with self.job_db:
if self.check_unfinished_correction(proposal, int(runnr)):
# A correction is already running for this run
msg = Success.ALREADY_REQUESTED.format(proposal, runnr)
logging.debug(msg)
return msg.encode()
with time_db_transaction(self.job_db, 'Insert request'):
cur = self.job_db.execute(
"INSERT INTO requests VALUES (NULL, ?, ?, ?, 'CORRECT', ?)",
(rid, proposal, int(runnr), request_time.strftime('%Y-%m-%dT%H:%M:%S'))
......@@ -1125,7 +1166,7 @@ class ActionsServer:
f'{reports_dir}/{karabo_id}_RECORRECT_{request_time:%y%m%d_%H%M%S}'
]
with self.job_db:
with time_db_transaction(self.job_db, 'Insert execution'):
cur = self.job_db.execute(
"INSERT INTO executions VALUES (NULL, ?, ?, NULL, ?, NULL)",
(req_id, shlex.join(cmd), karabo_id)
......@@ -1185,7 +1226,7 @@ class ActionsServer:
karabo_id=karabo_id,
)
with self.job_db:
with time_db_transaction(self.job_db, 'Insert request'):
cur = self.job_db.execute(
"INSERT INTO requests VALUES (NULL, ?, ?, ?, 'DARK', ?)",
(rid, proposal, int(wait_runs[-1]), request_time)
......@@ -1340,6 +1381,16 @@ class ActionsServer:
# Helper methods for handlers ---------------------------------------------
def check_unfinished_correction(self, proposal: str, runnr: int):
row = self.job_db.execute(
"SELECT job_id FROM slurm_jobs "
"INNER JOIN executions USING (exec_id) "
"INNER JOIN requests USING (req_id) "
"WHERE proposal = ? AND run = ? AND action = 'CORRECT' "
" AND slurm_jobs.finished = 0", (proposal, runnr)
).fetchone()
return row is not None
@staticmethod
def _normalise_proposal_num(p: str) -> str:
return "{:06d}".format(int(p.strip('p')))
......@@ -1393,8 +1444,8 @@ class ActionsServer:
).split()
cmd = parse_config(cmd, dconfig)
with self.job_db:
cur = self.job_db.execute(
with time_db_transaction(self.job_db, 'Insert execution'):
cur = self.job_db.execute( # 2
"INSERT INTO executions VALUES (NULL, ?, ?, ?, ?, NULL)",
(req_id, shlex.join(cmd), detector, karabo_id)
)
......