Skip to content
Snippets Groups Projects
Commit 20254323 authored by Thomas Kluyver's avatar Thomas Kluyver
Browse files

Add warning for missing constants in YAML file

parent 5a67eca9
No related branches found
No related tags found
1 merge request!780[LPD] use memory cell order as condition for constants
%% Cell type:markdown id: tags:
# LPD Offline Correction #
Author: European XFEL Data Analysis Group
%% Cell type:code id: tags:
``` python
# Input parameters
in_folder = "/gpfs/exfel/exp/FXE/202201/p003073/raw/" # the folder to read data from, required
out_folder = "/gpfs/exfel/data/scratch/schmidtp/random/LPD_test" # the folder to output to, required
metadata_folder = '' # Directory containing calibration_metadata.yml when run by xfel-calibrate.
sequences = [-1] # Sequences to correct, use [-1] for all
modules = [-1] # Modules indices to correct, use [-1] for all, only used when karabo_da is empty
karabo_da = [''] # Data aggregators names to correct, use [''] for all
run = 10 # run to process, required
# Source parameters
karabo_id = 'FXE_DET_LPD1M-1' # Karabo domain for detector.
input_source = '{karabo_id}/DET/{module_index}CH0:xtdf' # Input fast data source.
output_source = '' # Output fast data source, empty to use same as input.
# CalCat parameters
creation_time = "" # The timestamp to use with Calibration DB. Required Format: "YYYY-MM-DD hh:mm:ss" e.g. 2019-07-04 11:02:41
cal_db_interface = '' # Not needed, compatibility with current webservice.
cal_db_timeout = 0 # Not needed, compatbility with current webservice.
cal_db_root = '/gpfs/exfel/d/cal/caldb_store'
# Operating conditions
mem_cells = 512 # Memory cells, LPD constants are always taken with 512 cells.
bias_voltage = 250.0 # Detector bias voltage.
capacitor = '5pF' # Capacitor setting: 5pF or 50pF
photon_energy = 9.2 # Photon energy in keV.
category = 0 # Whom to blame.
use_cell_order = True # Whether to use memory cell order as a detector condition (not stored for older constants)
# Correction parameters
offset_corr = True # Offset correction.
rel_gain = True # Gain correction based on RelativeGain constant.
ff_map = True # Gain correction based on FFMap constant.
gain_amp_map = True # Gain correction based on GainAmpMap constant.
# Output options
overwrite = True # set to True if existing data should be overwritten
chunks_data = 1 # HDF chunk size for pixel data in number of frames.
chunks_ids = 32 # HDF chunk size for cellId and pulseId datasets.
create_virtual_cxi_in = '' # Folder to create virtual CXI files in (for each sequence).
# Parallelization options
sequences_per_node = 1 # Sequence files to process per node
max_nodes = 8 # Maximum number of SLURM jobs to split correction work into
num_workers = 8 # Worker processes per node, 8 is safe on 768G nodes but won't work on 512G.
num_threads_per_worker = 32 # Number of threads per worker.
def balance_sequences(in_folder, run, sequences, sequences_per_node, karabo_da, max_nodes):
from xfel_calibrate.calibrate import balance_sequences as bs
return bs(in_folder, run, sequences, sequences_per_node, karabo_da, max_nodes=max_nodes)
```
%% Cell type:code id: tags:
``` python
from collections import OrderedDict
from pathlib import Path
from time import perf_counter
import gc
import re
import warnings
import numpy as np
import h5py
import matplotlib
matplotlib.use('agg')
import matplotlib.pyplot as plt
%matplotlib inline
from calibration_client import CalibrationClient
from calibration_client.modules import CalibrationConstantVersion
import extra_data as xd
import extra_geom as xg
import pasha as psh
from extra_data.components import LPD1M
from cal_tools.lpdalgs import correct_lpd_frames
from cal_tools.lpdlib import get_mem_cell_order
from cal_tools.tools import CalibrationMetadata, calcat_creation_time
from cal_tools.files import DataFile
from cal_tools.restful_config import restful_config
```
%% Cell type:markdown id: tags:
# Prepare environment
%% Cell type:code id: tags:
``` python
file_re = re.compile(r'^RAW-R(\d{4})-(\w+\d+)-S(\d{5})$') # This should probably move to cal_tools
run_folder = Path(in_folder) / f'r{run:04d}'
out_folder = Path(out_folder)
out_folder.mkdir(exist_ok=True)
output_source = output_source or input_source
cal_db_root = Path(cal_db_root)
metadata = CalibrationMetadata(metadata_folder or out_folder)
creation_time = calcat_creation_time(in_folder, run, creation_time)
print(f'Using {creation_time.isoformat()} as creation time')
# Pick all modules/aggregators or those selected.
if not karabo_da or karabo_da == ['']:
if not modules or modules == [-1]:
modules = list(range(16))
karabo_da = [f'LPD{i:02d}' for i in modules]
# Pick all sequences or those selected.
if not sequences or sequences == [-1]:
do_sequence = lambda seq: True
else:
do_sequence = [int(x) for x in sequences].__contains__
# List of detector sources.
det_inp_sources = [input_source.format(karabo_id=karabo_id, module_index=int(da[-2:])) for da in karabo_da]
```
%% Cell type:markdown id: tags:
# Select data to process
%% Cell type:code id: tags:
``` python
data_to_process = []
for inp_path in run_folder.glob('RAW-*.h5'):
match = file_re.match(inp_path.stem)
if match[2] not in karabo_da or not do_sequence(int(match[3])):
continue
outp_path = out_folder / 'CORR-R{run:04d}-{aggregator}-S{seq:05d}.h5'.format(
run=int(match[1]), aggregator=match[2], seq=int(match[3]))
data_to_process.append((match[2], inp_path, outp_path))
print('Files to process:')
for data_descr in sorted(data_to_process, key=lambda x: f'{x[0]}{x[1]}'):
print(f'{data_descr[0]}\t{data_descr[1]}')
```
%% Cell type:markdown id: tags:
# Obtain and prepare calibration constants
%% Cell type:code id: tags:
``` python
# Connect to CalCat.
calcat_config = restful_config['calcat']
client = CalibrationClient(
base_api_url=calcat_config['base-api-url'],
use_oauth2=calcat_config['use-oauth2'],
client_id=calcat_config['user-id'],
client_secret=calcat_config['user-secret'],
user_email=calcat_config['user-email'],
token_url=calcat_config['token-url'],
refresh_url=calcat_config['refresh-url'],
auth_url=calcat_config['auth-url'],
scope='')
```
%% Cell type:code id: tags:
``` python
metadata = CalibrationMetadata(metadata_folder or out_folder)
# Constant paths & timestamps are saved under retrieved-constants in calibration_metadata.yml
const_yaml = metadata.setdefault("retrieved-constants", {})
```
%% Cell type:code id: tags:
``` python
const_data = {}
const_load_mp = psh.ProcessContext(num_workers=24)
if const_yaml: # Read constants from YAML file.
start = perf_counter()
for da, ccvs in const_yaml.items():
for calibration_name, ccv in ccvs['constants'].items():
if ccv['file-path'] is None:
warnings.warn(f"Missing {calibration_name} for {da}")
continue
dtype = np.uint32 if calibration_name.startswith('BadPixels') else np.float32
const_data[(da, calibration_name)] = dict(
path=Path(ccv['file-path']),
dataset=ccv['dataset-name'],
data=const_load_mp.alloc(shape=(256, 256, mem_cells, 3), dtype=dtype)
)
else: # Retrieve constants from CALCAT.
dark_calibrations = {
1: 'Offset', # np.float32
14: 'BadPixelsDark' # should be np.uint32, but is np.float64
}
base_condition = [
dict(parameter_id=1, value=bias_voltage), # Sensor bias voltage
dict(parameter_id=7, value=mem_cells), # Memory cells
dict(parameter_id=15, value=capacitor), # Feedback capacitor
dict(parameter_id=13, value=256), # Pixels X
dict(parameter_id=14, value=256), # Pixels Y
]
if use_cell_order:
# Read the order of memory cells used
raw_data = xd.DataCollection.from_paths([e[1] for e in data_to_process])
cell_ids_pattern_s = get_mem_cell_order(raw_data, det_inp_sources)
print("Memory cells order:", cell_ids_pattern_s)
dark_condition = base_condition + [
dict(parameter_id=142, value=cell_ids_pattern_s), # Memory cell order
]
else:
dark_condition = base_condition.copy()
illuminated_calibrations = {
20: 'BadPixelsFF', # np.uint32
42: 'GainAmpMap', # np.float32
43: 'FFMap', # np.float32
44: 'RelativeGain' # np.float32
}
illuminated_condition = base_condition + [
dict(parameter_id=3, value=photon_energy), # Source energy
dict(parameter_id=25, value=category) # category
]
print('Querying calibration database', end='', flush=True)
start = perf_counter()
for calibrations, condition in [
(dark_calibrations, dark_condition),
(illuminated_calibrations, illuminated_condition)
]:
resp = CalibrationConstantVersion.get_closest_by_time_by_detector_conditions(
client, karabo_id, list(calibrations.keys()),
{'parameters_conditions_attributes': condition},
karabo_da='', event_at=creation_time.isoformat(), snapshot_at=None)
if not resp['success']:
raise RuntimeError(resp)
for ccv in resp['data']:
cc = ccv['calibration_constant']
da = ccv['physical_detector_unit']['karabo_da']
calibration_name = calibrations[cc['calibration_id']]
dtype = np.uint32 if calibration_name.startswith('BadPixels') else np.float32
const_data[(da, calibration_name)] = dict(
path=Path(ccv['path_to_file']) / ccv['file_name'],
dataset=ccv['data_set_name'],
data=const_load_mp.alloc(shape=(256, 256, mem_cells, 3), dtype=dtype)
)
print('.', end='', flush=True)
total_time = perf_counter() - start
print(f'{total_time:.1f}s')
```
%% Cell type:code id: tags:
``` python
def load_constant_dataset(wid, index, const_descr):
ccv_entry = const_data[const_descr]
with h5py.File(cal_db_root / ccv_entry['path'], 'r') as fp:
fp[ccv_entry['dataset'] + '/data'].read_direct(ccv_entry['data'])
print('.', end='', flush=True)
print('Loading calibration data', end='', flush=True)
start = perf_counter()
const_load_mp.map(load_constant_dataset, list(const_data.keys()))
total_time = perf_counter() - start
print(f'{total_time:.1f}s')
```
%% Cell type:code id: tags:
``` python
# These are intended in order cell, X, Y, gain
ccv_offsets = {}
ccv_gains = {}
ccv_masks = {}
ccv_shape = (mem_cells, 256, 256, 3)
constant_order = {
'Offset': (2, 1, 0, 3),
'BadPixelsDark': (2, 1, 0, 3),
'RelativeGain': (2, 1, 0, 3),
'FFMap': (2, 0, 1, 3),
'BadPixelsFF': (2, 0, 1, 3),
'GainAmpMap': (2, 0, 1, 3),
}
def prepare_constants(wid, index, aggregator):
consts = {calibration_name: entry['data']
for (aggregator_, calibration_name), entry
in const_data.items()
if aggregator == aggregator_}
def _prepare_data(calibration_name, dtype):
return consts[calibration_name] \
.transpose(constant_order[calibration_name]) \
.astype(dtype, copy=True) # Make sure array is contiguous.
if offset_corr and 'Offset' in consts:
ccv_offsets[aggregator] = _prepare_data('Offset', np.float32)
else:
ccv_offsets[aggregator] = np.zeros(ccv_shape, dtype=np.float32)
ccv_gains[aggregator] = np.ones(ccv_shape, dtype=np.float32)
if 'BadPixelsDark' in consts:
ccv_masks[aggregator] = _prepare_data('BadPixelsDark', np.uint32)
else:
ccv_masks[aggregator] = np.zeros(ccv_shape, dtype=np.uint32)
if rel_gain and 'RelativeGain' in consts:
ccv_gains[aggregator] *= _prepare_data('RelativeGain', np.float32)
if ff_map and 'FFMap' in consts:
ccv_gains[aggregator] *= _prepare_data('FFMap', np.float32)
if 'BadPixelsFF' in consts:
np.bitwise_or(ccv_masks[aggregator], _prepare_data('BadPixelsFF', np.uint32),
out=ccv_masks[aggregator])
if gain_amp_map and 'GainAmpMap' in consts:
ccv_gains[aggregator] *= _prepare_data('GainAmpMap', np.float32)
print('.', end='', flush=True)
print('Preparing constants', end='', flush=True)
start = perf_counter()
psh.ThreadContext(num_workers=len(karabo_da)).map(prepare_constants, karabo_da)
total_time = perf_counter() - start
print(f'{total_time:.1f}s')
const_data.clear() # Clear raw constants data now to save memory.
gc.collect();
```
%% Cell type:code id: tags:
``` python
def correct_file(wid, index, work):
aggregator, inp_path, outp_path = work
module_index = int(aggregator[-2:])
start = perf_counter()
dc = xd.H5File(inp_path, inc_suspect_trains=False).select('*', 'image.*', require_all=True)
inp_source = dc[input_source.format(karabo_id=karabo_id, module_index=module_index)]
open_time = perf_counter() - start
# Load raw data for this file.
# Reshaping gets rid of the extra 1-len dimensions without
# mangling the frame axis for an actual frame count of 1.
start = perf_counter()
in_raw = inp_source['image.data'].ndarray().reshape(-1, 256, 256)
in_cell = inp_source['image.cellId'].ndarray().reshape(-1)
in_pulse = inp_source['image.pulseId'].ndarray().reshape(-1)
read_time = perf_counter() - start
# Allocate output arrays.
out_data = np.zeros((in_raw.shape[0], 256, 256), dtype=np.float32)
out_gain = np.zeros((in_raw.shape[0], 256, 256), dtype=np.uint8)
out_mask = np.zeros((in_raw.shape[0], 256, 256), dtype=np.uint32)
start = perf_counter()
correct_lpd_frames(in_raw, in_cell,
out_data, out_gain, out_mask,
ccv_offsets[aggregator], ccv_gains[aggregator], ccv_masks[aggregator],
num_threads=num_threads_per_worker)
correct_time = perf_counter() - start
image_counts = inp_source['image.data'].data_counts(labelled=False)
start = perf_counter()
if (not outp_path.exists() or overwrite) and image_counts.sum() > 0:
outp_source_name = output_source.format(karabo_id=karabo_id, module_index=module_index)
with DataFile(outp_path, 'w') as outp_file:
outp_file.create_index(dc.train_ids, from_file=dc.files[0])
outp_file.create_metadata(like=dc, instrument_channels=(f'{outp_source_name}/image',))
outp_source = outp_file.create_instrument_source(outp_source_name)
outp_source.create_index(image=image_counts)
outp_source.create_key('image.cellId', data=in_cell,
chunks=(min(chunks_ids, in_cell.shape[0]),))
outp_source.create_key('image.pulseId', data=in_pulse,
chunks=(min(chunks_ids, in_pulse.shape[0]),))
outp_source.create_key('image.data', data=out_data,
chunks=(min(chunks_data, out_data.shape[0]), 256, 256))
outp_source.create_compressed_key('image.gain', data=out_gain)
outp_source.create_compressed_key('image.mask', data=out_mask)
write_time = perf_counter() - start
total_time = open_time + read_time + correct_time + write_time
frame_rate = in_raw.shape[0] / total_time
print('{}\t{}\t{:.3f}\t{:.3f}\t{:.3f}\t{:.3f}\t{:.3f}\t{}\t{:.1f}'.format(
wid, aggregator, open_time, read_time, correct_time, write_time, total_time,
in_raw.shape[0], frame_rate))
in_raw = None
in_cell = None
in_pulse = None
out_data = None
out_gain = None
out_mask = None
gc.collect()
print('worker\tDA\topen\tread\tcorrect\twrite\ttotal\tframes\trate')
start = perf_counter()
psh.ProcessContext(num_workers=num_workers).map(correct_file, data_to_process)
total_time = perf_counter() - start
print(f'Total time: {total_time:.1f}s')
```
%% Cell type:markdown id: tags:
# Data preview for first train
%% Cell type:code id: tags:
``` python
geom = xg.LPD_1MGeometry.from_quad_positions(
[(11.4, 299), (-11.5, 8), (254.5, -16), (278.5, 275)])
output_paths = [outp_path for _, _, outp_path in data_to_process if outp_path.exists()]
dc = xd.DataCollection.from_paths(output_paths).select_trains(np.s_[0])
det = LPD1M(dc, detector_name=karabo_id)
data = det.get_array('image.data')
```
%% Cell type:markdown id: tags:
### Intensity histogram across all cells
%% Cell type:code id: tags:
``` python
left_edge_ratio = 0.01
right_edge_ratio = 0.99
fig, ax = plt.subplots(num=1, clear=True, figsize=(15, 6))
values, bins, _ = ax.hist(np.ravel(data.data), bins=2000, range=(-1500, 2000))
def find_nearest_index(array, value):
return (np.abs(array - value)).argmin()
cum_values = np.cumsum(values)
vmin = bins[find_nearest_index(cum_values, cum_values[-1]*left_edge_ratio)]
vmax = bins[find_nearest_index(cum_values, cum_values[-1]*right_edge_ratio)]
max_value = values.max()
ax.vlines([vmin, vmax], 0, max_value, color='red', linewidth=5, alpha=0.2)
ax.text(vmin, max_value, f'{left_edge_ratio*100:.0f}%',
color='red', ha='center', va='bottom', size='large')
ax.text(vmax, max_value, f'{right_edge_ratio*100:.0f}%',
color='red', ha='center', va='bottom', size='large')
ax.text(vmax+(vmax-vmin)*0.01, max_value/2, 'Colormap interval',
color='red', rotation=90, ha='left', va='center', size='x-large')
ax.set_xlim(vmin-(vmax-vmin)*0.1, vmax+(vmax-vmin)*0.1)
ax.set_ylim(0, max_value*1.1)
pass
```
%% Cell type:markdown id: tags:
### First memory cell
%% Cell type:code id: tags:
``` python
fig, ax = plt.subplots(num=2, figsize=(15, 15), clear=True, nrows=1, ncols=1)
geom.plot_data_fast(data[:, 0, 0], ax=ax, vmin=vmin, vmax=vmax)
pass
```
%% Cell type:markdown id: tags:
### Train average
%% Cell type:code id: tags:
``` python
fig, ax = plt.subplots(num=3, figsize=(15, 15), clear=True, nrows=1, ncols=1)
geom.plot_data_fast(data[:, 0].mean(axis=1), ax=ax, vmin=vmin, vmax=vmax)
pass
```
%% Cell type:markdown id: tags:
### Lowest gain stage per pixel
%% Cell type:code id: tags:
``` python
highest_gain_stage = det.get_array('image.gain', pulses=np.s_[:]).max(axis=(1, 2))
fig, ax = plt.subplots(num=4, figsize=(15, 15), clear=True, nrows=1, ncols=1)
p = geom.plot_data_fast(highest_gain_stage, ax=ax, vmin=0, vmax=2);
cb = ax.images[0].colorbar
cb.set_ticks([0, 1, 2])
cb.set_ticklabels(['High gain', 'Medium gain', 'Low gain'])
```
%% Cell type:markdown id: tags:
### Create virtual CXI file
%% Cell type:code id: tags:
``` python
if create_virtual_cxi_in:
vcxi_folder = Path(create_virtual_cxi_in.format(
run=run, proposal_folder=str(Path(in_folder).parent)))
vcxi_folder.mkdir(parents=True, exist_ok=True)
def sort_files_by_seq(by_seq, outp_path):
by_seq.setdefault(int(outp_path.stem[-5:]), []).append(outp_path)
return by_seq
from functools import reduce
reduce(sort_files_by_seq, output_paths, output_by_seq := {})
for seq_number, seq_output_paths in output_by_seq.items():
# Create data collection and detector components only for this sequence.
try:
det = LPD1M(xd.DataCollection.from_paths(seq_output_paths), detector_name=karabo_id, min_modules=4)
except ValueError: # Couldn't find enough data for min_modules
continue
det.write_virtual_cxi(vcxi_folder / f'VCXI-LPD-R{run:04d}-S{seq_number:05d}.cxi')
```
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment