Commit 5f3dcdaf authored by Philipp Schmidt

Add placeholders for parallel gain arguments to correct_lpd_frames in LPDMini

parent d6de823a
Merge request !1113: [LPDMini][CORRECT] Add placeholders for parallel gain arguments to correct_lpd_frames in LPDMini
%% Cell type:markdown id: tags:
# LPD Mini Offline Correction #
Author: European XFEL Data Analysis Group
%% Cell type:code id: tags:
``` python
# Input parameters
in_folder = "/gpfs/exfel/exp/FXE/202321/p004576/raw/" # the folder to read data from, required
out_folder = "/gpfs/exfel/data/scratch/kluyvert/correct-lpdmini-p4576-r48" # the folder to output to, required
metadata_folder = '' # Directory containing calibration_metadata.yml when run by xfel-calibrate.
sequences = [-1] # Sequences to correct, use [-1] for all
karabo_da = [''] # Data aggregator names to correct, e.g. 'LPDMINI00/8', use [''] for all
run = 48 # run to process, required
# Source parameters
karabo_id = 'FXE_DET_LPD_MINI' # Karabo domain for detector.
input_source = '{karabo_id}/DET/0CH0:xtdf' # Input fast data source.
output_source = '{karabo_id}/CORR/0CH0:output' # Output fast data source, empty to use same as input.
control_source = '{karabo_id}/FPGA/FEM' # Control source
# CalCat parameters
creation_time = "" # The timestamp to use with Calibration DB. Required Format: "YYYY-MM-DD hh:mm:ss" e.g. 2019-07-04 11:02:41
cal_db_interface = '' # Not needed, compatibility with current webservice.
cal_db_timeout = 0 # Not needed, compatibility with current webservice.
cal_db_root = '/gpfs/exfel/d/cal/caldb_store'
# Operating conditions
bias_voltage_0 = -1 # bias voltage for minis 1, 3, 5, 7; Setting -1 will read the value from files
bias_voltage_1 = -1 # bias voltage for minis 2, 4, 6, 8; Setting -1 will read the value from files
capacitor = '5pF' # Capacitor setting: 5pF or 50pF
photon_energy = 9.3 # Photon energy in keV.
use_cell_order = 'auto' # Whether to use memory cell order as a detector condition (auto = used only when memory cells wrap around)
# Correction parameters
offset_corr = True # Offset correction.
rel_gain = True # Gain correction based on RelativeGain constant.
ff_map = True # Gain correction based on FFMap constant.
gain_amp_map = True # Gain correction based on GainAmpMap constant.
# Output options
overwrite = True # set to True if existing data should be overwritten
chunks_data = 1 # HDF chunk size for pixel data in number of frames.
chunks_ids = 32 # HDF chunk size for cellId and pulseId datasets.
# Parallelization options
sequences_per_node = 1 # Sequence files to process per node
max_nodes = 8 # Maximum number of SLURM jobs to split correction work into
num_workers = 8 # Worker processes per node, 8 is safe on 768G nodes but won't work on 512G.
num_threads_per_worker = 32 # Number of threads per worker.
def balance_sequences(in_folder, run, sequences, sequences_per_node, karabo_da, max_nodes):
    from xfel_calibrate.calibrate import balance_sequences as bs
    return bs(in_folder, run, sequences, sequences_per_node, karabo_da, max_nodes=max_nodes)
```
%% Cell type:code id: tags:
``` python
from pathlib import Path
from time import perf_counter
import re
import warnings
from IPython.display import Markdown
import numpy as np
import h5py
import matplotlib
matplotlib.use('agg')
import matplotlib.pyplot as plt
%matplotlib inline
from calibration_client.modules import CalibrationConstantVersion
import extra_data as xd
import extra_geom as xg
import pasha as psh
from cal_tools.calcat_interface import CalCatApi
from cal_tools.lpdalgs import correct_lpd_frames
from cal_tools.lpdlib import get_mem_cell_pattern, make_cell_order_condition
from cal_tools.tools import CalibrationMetadata, calcat_creation_time
from cal_tools.files import DataFile
from cal_tools.restful_config import calibration_client
```
%% Cell type:markdown id: tags:
# Prepare environment
%% Cell type:code id: tags:
``` python
file_re = re.compile(r'^RAW-R(\d{4})-(\w+\d+)-S(\d{5})$') # This should probably move to cal_tools
run_folder = Path(in_folder) / f'r{run:04d}'
out_folder = Path(out_folder)
out_folder.mkdir(exist_ok=True)
output_source = output_source or input_source
input_source = input_source.format(karabo_id=karabo_id)
output_source = output_source.format(karabo_id=karabo_id)
control_source = control_source.format(karabo_id=karabo_id)
cal_db_root = Path(cal_db_root)
creation_time = calcat_creation_time(in_folder, run, creation_time)
print(f'Using {creation_time.isoformat()} as creation time')
# Pick all sequences or those selected.
if not sequences or sequences == [-1]:
    do_sequence = lambda seq: True
else:
    do_sequence = [int(x) for x in sequences].__contains__
# List of detector sources.
det_inp_sources = [input_source]  # Already formatted with karabo_id above.
mem_cells = 512
```
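%% Cell type:markdown id: tags:
As a quick illustration of how `file_re` is used below, its three capture groups extract the run number, data aggregator and sequence number from a raw file stem. The file name here is made up for demonstration purposes.
%% Cell type:code id: tags:
``` python
# Hypothetical file name following the EuXFEL RAW naming scheme.
m = file_re.match('RAW-R0048-LPDMINI00-S00000')
print('run:', int(m[1]), '| aggregator:', m[2], '| sequence:', int(m[3]))
```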
%% Cell type:code id: tags:
``` python
if -1 in {bias_voltage_0, bias_voltage_1}:
    run_data = xd.RunDirectory(Path(in_folder, f"r{run:04d}"))

    if bias_voltage_0 == -1:
        bias_voltage_0 = run_data[control_source, 'sensorBiasVoltage0'].as_single_value(atol=5.)
    if bias_voltage_1 == -1:
        bias_voltage_1 = run_data[control_source, 'sensorBiasVoltage1'].as_single_value(atol=5.)

print(f"Using bias voltages {bias_voltage_0}V & {bias_voltage_1}V")
```
%% Cell type:markdown id: tags:
# Select data to process
%% Cell type:code id: tags:
``` python
calcat_client = calibration_client()
calcat = CalCatApi(client=calcat_client)
# Look up PDUs
detector_id = calcat.detector(karabo_id)['id']
pdus_by_da = calcat.physical_detector_units(detector_id, pdu_snapshot_at=creation_time)
modnos_from_db = set()
if not karabo_da or karabo_da == ['']:
    karabo_da = sorted(pdus_by_da.keys())
else:
    karabo_da = sorted(set(karabo_da) & pdus_by_da.keys())
print("Modules to correct:", karabo_da)
```
%% Cell type:code id: tags:
``` python
data_to_process = []
data_agg_names = {kda.split('/')[0] for kda in karabo_da}
for inp_path in run_folder.glob('RAW-*.h5'):
    match = file_re.match(inp_path.stem)

    # Skip files that don't match the RAW naming scheme, belong to other
    # aggregators, or are not in the selected sequences.
    if match is None or match[2] not in data_agg_names or not do_sequence(int(match[3])):
        continue

    outp_path = out_folder / 'CORR-R{run:04d}-{aggregator}-S{seq:05d}.h5'.format(
        run=int(match[1]), aggregator=match[2], seq=int(match[3]))

    data_to_process.append((inp_path, outp_path))
print('Files to process:')
for inp_path, _ in sorted(data_to_process):
    print(inp_path.name)
```
%% Cell type:markdown id: tags:
# Obtain and prepare calibration constants
%% Cell type:code id: tags:
``` python
const_data = {}
retrieved_consts = {} # To be recorded in YAML file
const_load_mp = psh.ProcessContext(num_workers=24)
module_const_shape = (mem_cells, 32, 256, 3) # cells, slow_scan, fast_scan, gain
# Retrieve constants from CALCAT.
dark_calibrations = {
    14: 'BadPixelsDark',  # np.uint32
}

if offset_corr:
    dark_calibrations[1] = 'Offset'  # np.float32

base_condition = [
    # Bias voltage added below as it differs by module
    dict(parameter_name='Feedback capacitor', value=capacitor),
    dict(parameter_name='Memory cells', value=mem_cells),
    dict(parameter_name='Pixels X', value=256),
    dict(parameter_name='Pixels Y', value=32),
]

dark_condition = base_condition.copy()

if use_cell_order != 'never':
    # Read the order of memory cells used
    raw_data = xd.DataCollection.from_paths([e[0] for e in data_to_process])
    cell_ids_pattern_s = make_cell_order_condition(
        use_cell_order, get_mem_cell_pattern(raw_data, det_inp_sources)
    )

    if cell_ids_pattern_s is not None:
        print("Memory cells order:", cell_ids_pattern_s)
        dark_condition.append(
            dict(parameter_name='Memory cell order', value=cell_ids_pattern_s),
        )

illuminated_calibrations = {}

if rel_gain:
    illuminated_calibrations[44] = 'RelativeGain'  # np.float32

if ff_map:
    illuminated_calibrations[43] = 'FFMap'  # np.float32
    illuminated_calibrations[20] = 'BadPixelsFF'  # np.uint32

if gain_amp_map:
    illuminated_calibrations[42] = 'GainAmpMap'  # np.float32

illuminated_condition = base_condition + [
    dict(parameter_name='Source Energy', value=photon_energy),
]
```
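%% Cell type:markdown id: tags:
For illustration only, this is what the assembled dark condition for an odd-numbered mini looks like once the per-module bias voltage is prepended, as done in the query loop below.
%% Cell type:code id: tags:
``` python
# Example only: the full condition sent to CalCat for minis 1, 3, 5, 7.
example_condition = [
    dict(parameter_name='Sensor Bias Voltage', value=bias_voltage_0),
] + dark_condition

for entry in example_condition:
    print(f"{entry['parameter_name']}: {entry['value']}")
```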
%% Cell type:code id: tags:
``` python
print('Querying calibration database', end='', flush=True)
start = perf_counter()
for calibrations, condition in [
    (dark_calibrations, dark_condition),
    (illuminated_calibrations, illuminated_condition)
]:
    if not calibrations:
        continue

    for karabo_da_m in karabo_da:
        mod_num = int(karabo_da_m.split('/')[-1])
        # mod_num runs from 1 to 8, so bias_voltage_0 applies to the odd-numbered minis.
        bias_voltage = bias_voltage_0 if mod_num % 2 == 1 else bias_voltage_1
        condition_w_voltage = [dict(parameter_name='Sensor Bias Voltage', value=bias_voltage)] + condition

        resp = CalibrationConstantVersion.get_closest_by_time_by_detector_conditions(
            calcat_client, karabo_id, list(calibrations.keys()),
            {'parameters_conditions_attributes': condition_w_voltage},
            karabo_da=karabo_da_m, event_at=creation_time.isoformat()
        )

        if not resp['success']:
            raise RuntimeError(resp)

        for ccv in resp['data']:
            cc = ccv['calibration_constant']
            calibration_name = calibrations[cc['calibration_id']]

            mod_const_metadata = retrieved_consts.setdefault(karabo_da_m, {
                'physical-name': ccv['physical_detector_unit']['physical_name']
            })
            mod_const_metadata.setdefault('constants', {})[calibration_name] = {
                "path": str(cal_db_root / ccv['path_to_file'] / ccv['file_name']),
                "dataset": ccv['data_set_name'],
                "creation-time": ccv["begin_validity_at"],
                "ccv-id": ccv["id"],
            }

            dtype = np.uint32 if calibration_name.startswith('BadPixels') else np.float32
            const_data[(mod_num, calibration_name)] = dict(
                path=Path(ccv['path_to_file']) / ccv['file_name'],
                dataset=ccv['data_set_name'],
                data=const_load_mp.alloc(shape=module_const_shape, dtype=dtype)
            )
        print('.', end='', flush=True)
total_time = perf_counter() - start
print(f'{total_time:.1f}s')
```
%% Cell type:code id: tags:
``` python
lines = []
for karabo_da_m, mod_const_metadata in retrieved_consts.items():
    lines.append(f"- {karabo_da_m} ({mod_const_metadata['physical-name']})")

    for const_name, d in mod_const_metadata['constants'].items():
        url = f"https://in.xfel.eu/calibration/calibration_constant_versions/{d['ccv-id']}"
        lines.append(f"  - {const_name}: [{d['creation-time']}]({url})")
Markdown('\n'.join(lines))
```
%% Cell type:code id: tags:
``` python
CalibrationMetadata(metadata_folder or out_folder).add_fragment({
    "retrieved-constants": retrieved_consts
})
```
%% Cell type:code id: tags:
``` python
def load_constant_dataset(wid, index, const_descr):
    ccv_entry = const_data[const_descr]

    with h5py.File(cal_db_root / ccv_entry['path'], 'r') as fp:
        fp[ccv_entry['dataset'] + '/data'].read_direct(ccv_entry['data'])

    print('.', end='', flush=True)
print('Loading calibration data', end='', flush=True)
start = perf_counter()
const_load_mp.map(load_constant_dataset, list(const_data.keys()))
total_time = perf_counter() - start
print(f'{total_time:.1f}s')
```
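%% Cell type:markdown id: tags:
The loading step above relies on `pasha` allocating the constant arrays in shared memory via `alloc()`, so the worker processes spawned by `map()` can fill them in place without pickling results back to the parent. A minimal self-contained sketch of the same pattern, with toy data and hypothetical names:
%% Cell type:code id: tags:
``` python
import numpy as np
import pasha as psh

ctx = psh.ProcessContext(num_workers=4)

# Shared-memory array, writable from every worker process.
squares = ctx.alloc(shape=(10,), dtype=np.float64)

def fill_square(wid, index, value):
    # wid is the worker id, index the position in the input iterable.
    squares[index] = value ** 2

ctx.map(fill_square, np.arange(10))
print(squares)  # [0, 1, 4, ..., 81], computed across worker processes
```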
%% Cell type:code id: tags:
``` python
module_nums = sorted({n for n, _ in const_data})
nmods = len(module_nums)
const_type_names = {t for _, t in const_data}
const_shape = (mem_cells, 32 * len(module_nums), 256, 3) # cells, slow_scan, fast_scan, gain
const_slices = [slice(i * 32, (i+1) * 32) for i in range(len(module_nums))]
raw_data_slices = [slice((n-1) * 32, n * 32) for n in module_nums]
def _assemble_constant(arr, calibration_name):
    for mod_num, sl in zip(module_nums, const_slices):
        arr[:, sl] = const_data[mod_num, calibration_name]['data']

offset_const = np.zeros(const_shape, dtype=np.float32)
if offset_corr:
    _assemble_constant(offset_const, 'Offset')

mask_const = np.zeros(const_shape, dtype=np.uint32)
_assemble_constant(mask_const, 'BadPixelsDark')

gain_const = np.ones(const_shape, dtype=np.float32)
if rel_gain:
    _assemble_constant(gain_const, 'RelativeGain')

if ff_map:
    ff_map_gain = np.ones(const_shape, dtype=np.float32)
    _assemble_constant(ff_map_gain, 'FFMap')
    gain_const *= ff_map_gain

if 'BadPixelsFF' in const_type_names:
    badpix_ff = np.zeros(const_shape, dtype=np.uint32)
    _assemble_constant(badpix_ff, 'BadPixelsFF')
    mask_const |= badpix_ff

if gain_amp_map:
    gain_amp_map = np.zeros(const_shape, dtype=np.float32)
    _assemble_constant(gain_amp_map, 'GainAmpMap')
    gain_const *= gain_amp_map
```
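%% Cell type:markdown id: tags:
Note the off-by-one between the two slice lists: constants are stacked compactly in the order of `module_nums`, while raw data is addressed by the 1-based module number. A small check with a hypothetical selection of minis 1 and 3:
%% Cell type:code id: tags:
``` python
# Example only: minis 1 and 3 selected out of 8.
example_mods = [1, 3]
example_const_slices = [slice(i * 32, (i+1) * 32) for i in range(len(example_mods))]
example_raw_slices = [slice((n-1) * 32, n * 32) for n in example_mods]

for n, c_sl, r_sl in zip(example_mods, example_const_slices, example_raw_slices):
    print(f'mini {n}: constant rows {c_sl.start}:{c_sl.stop} <- raw rows {r_sl.start}:{r_sl.stop}')
```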
%% Cell type:code id: tags:
``` python
def correct_file(wid, index, work):
    inp_path, outp_path = work

    start = perf_counter()
    dc = xd.H5File(inp_path, inc_suspect_trains=False).select('*', 'image.*', require_all=True)
    inp_source = dc[input_source]
    open_time = perf_counter() - start

    # Load raw data for this file.
    # Reshaping gets rid of the extra 1-len dimensions without
    # mangling the frame axis for an actual frame count of 1.
    start = perf_counter()
    in_raw = inp_source['image.data'].ndarray()
    if in_raw.ndim > 3:
        in_raw = in_raw[:, 0]
    in_cell = inp_source['image.cellId'].ndarray().reshape(-1)
    in_pulse = inp_source['image.pulseId'].ndarray().reshape(-1)
    read_time = perf_counter() - start

    # Slice modules from input data
    data_shape = (in_raw.shape[0], nmods * 32, 256)
    in_sliced = np.zeros(data_shape, dtype=in_raw.dtype)
    for i, sl in enumerate(raw_data_slices):
        in_sliced[:, i*32:(i+1)*32] = in_raw[..., sl, :]
    output_shape = (data_shape[0], nmods, 32, 256)

    # Allocate output arrays.
    out_data = np.zeros(in_sliced.shape, dtype=np.float32)
    out_gain = np.zeros(in_sliced.shape, dtype=np.uint8)
    out_mask = np.zeros(in_sliced.shape, dtype=np.uint32)

    start = perf_counter()
    # The None and 0.0 arguments are placeholders for the parallel gain
    # corrections, which are not used in this notebook.
    correct_lpd_frames(in_sliced, in_cell,
                       out_data, out_gain, out_mask,
                       offset_const, None, gain_const, mask_const,
                       None, 0.0, 0.0,
                       num_threads=num_threads_per_worker)
    correct_time = perf_counter() - start

    image_counts = inp_source['image.data'].data_counts(labelled=False)

    start = perf_counter()
    if (not outp_path.exists() or overwrite) and image_counts.sum() > 0:
        with DataFile(outp_path, 'w') as outp_file:
            outp_file.create_index(dc.train_ids, from_file=dc.files[0])
            outp_file.create_metadata(like=dc, instrument_channels=(f'{output_source}/image',))

            outp_source = outp_file.create_instrument_source(output_source)

            outp_source.create_index(image=image_counts)
            outp_source.create_key('image.cellId', data=in_cell,
                                   chunks=(min(chunks_ids, in_cell.shape[0]),))
            outp_source.create_key('image.pulseId', data=in_pulse,
                                   chunks=(min(chunks_ids, in_pulse.shape[0]),))
            outp_source.create_key('image.data', data=out_data.reshape(output_shape),
                                   chunks=(min(chunks_data, out_data.shape[0]), 1, 32, 256))
            outp_source.create_compressed_key('image.gain', data=out_gain.reshape(output_shape))
            outp_source.create_compressed_key('image.mask', data=out_mask.reshape(output_shape))
    write_time = perf_counter() - start

    total_time = open_time + read_time + correct_time + write_time
    frame_rate = in_raw.shape[0] / total_time

    m = file_re.match(inp_path.stem)
    seq = int(m[3]) if m else -1
    print('{}\t{}\t{:.3f}\t{:.3f}\t{:.3f}\t{:.3f}\t{:.3f}\t{}\t{:.1f}'.format(
        wid, seq, open_time, read_time, correct_time, write_time, total_time,
        in_raw.shape[0], frame_rate))

    in_raw = None
    in_cell = None
    in_pulse = None
    out_data = None
    out_gain = None
    out_mask = None
print('worker\tseq\topen\tread\tcorrect\twrite\ttotal\tframes\trate')
start = perf_counter()
psh.ProcessContext(num_workers=num_workers).map(correct_file, data_to_process)
total_time = perf_counter() - start
print(f'Total time: {total_time:.1f}s')
```
%% Cell type:markdown id: tags:
# Data preview for first train
%% Cell type:code id: tags:
``` python
# This geometry is arbitrary, we just want to show all the modules
geom = xg.LPD_MiniGeometry.from_module_positions(
    [(0, i * 40) for i in range(nmods)]
)
output_paths = [outp_path for _, outp_path in data_to_process if outp_path.exists()]
dc = xd.H5File(sorted(output_paths)[0]).select_trains(np.s_[0])
det = dc[output_source.format(karabo_id=karabo_id)]
data = det['image.data'].ndarray()
```
%% Cell type:markdown id: tags:
### Intensity histogram across all cells
%% Cell type:code id: tags:
``` python
left_edge_ratio = 0.01
right_edge_ratio = 0.99
fig, ax = plt.subplots(num=1, clear=True, figsize=(15, 6))
values, bins, _ = ax.hist(np.ravel(data), bins=2000, range=(-1500, 2000))
def find_nearest_index(array, value):
    return (np.abs(array - value)).argmin()
cum_values = np.cumsum(values)
vmin = bins[find_nearest_index(cum_values, cum_values[-1]*left_edge_ratio)]
vmax = bins[find_nearest_index(cum_values, cum_values[-1]*right_edge_ratio)]
max_value = values.max()
ax.vlines([vmin, vmax], 0, max_value, color='red', linewidth=5, alpha=0.2)
ax.text(vmin, max_value, f'{left_edge_ratio*100:.0f}%',
        color='red', ha='center', va='bottom', size='large')
ax.text(vmax, max_value, f'{right_edge_ratio*100:.0f}%',
        color='red', ha='center', va='bottom', size='large')
ax.text(vmax+(vmax-vmin)*0.01, max_value/2, 'Colormap interval',
        color='red', rotation=90, ha='left', va='center', size='x-large')
ax.set_xlim(vmin-(vmax-vmin)*0.1, vmax+(vmax-vmin)*0.1)
ax.set_ylim(0, max_value*1.1)
pass
```
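%% Cell type:markdown id: tags:
The cumulative-histogram lookup above is essentially a binned percentile estimate. As a cross-check, approximately equivalent limits (up to the 2000-bin resolution, and ignoring values outside the histogram range) can be computed directly with NumPy:
%% Cell type:code id: tags:
``` python
# Percentile-based colormap limits; nanquantile skips any NaN pixels.
q_vmin, q_vmax = np.nanquantile(data, [left_edge_ratio, right_edge_ratio])
print(f'histogram: [{vmin:.1f}, {vmax:.1f}]  quantile: [{q_vmin:.1f}, {q_vmax:.1f}]')
```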
%% Cell type:markdown id: tags:
### First memory cell
%% Cell type:code id: tags:
``` python
fig, ax = plt.subplots(num=2, figsize=(15, 15), clear=True, nrows=1, ncols=1)
geom.plot_data_fast(data[0], ax=ax, vmin=vmin, vmax=vmax)
pass
```
%% Cell type:markdown id: tags:
### Train average
%% Cell type:code id: tags:
``` python
fig, ax = plt.subplots(num=3, figsize=(15, 15), clear=True, nrows=1, ncols=1)
geom.plot_data_fast(data.mean(axis=0), ax=ax, vmin=vmin, vmax=vmax)
pass
```
%% Cell type:markdown id: tags:
### Lowest gain stage per pixel
%% Cell type:code id: tags:
``` python
# Gain index 2 corresponds to low gain, so the maximum over all frames
# gives the lowest gain stage each pixel reached in this train.
lowest_gain_stage = det['image.gain'].ndarray().max(axis=0)

fig, ax = plt.subplots(num=4, figsize=(15, 15), clear=True, nrows=1, ncols=1)
geom.plot_data_fast(lowest_gain_stage, ax=ax, vmin=0, vmax=2)

cb = ax.images[0].colorbar
cb.set_ticks([0, 1, 2])
cb.set_ticklabels(['High gain', 'Medium gain', 'Low gain'])
```