Skip to content
Snippets Groups Projects
Commit 98bfadda authored by Philipp Schmidt
Browse files

(fixup) enforce available noise constant for threshold

parent b674cd1c
No related branches found
No related tags found
1 merge request: !1097 [LPD][CORRECT] Add parallel gain support
%% Cell type:markdown id: tags:
# LPD Offline Correction #
Author: European XFEL Data Analysis Group
%% Cell type:code id: tags:
``` python
# Input parameters
# NOTE: every assignment in this cell is a notebook parameter; the trailing
# comment of each line serves as its description.
in_folder = "/gpfs/exfel/exp/FXE/202401/p005436/raw/" # the folder to read data from, required
out_folder = "/gpfs/exfel/data/scratch/kluyvert/lpd-corr-p5436-r167" # the folder to output to, required
metadata_folder = '' # Directory containing calibration_metadata.yml when run by xfel-calibrate.
sequences = [-1] # Sequences to correct, use [-1] for all
modules = [-1] # Modules indices to correct, use [-1] for all, only used when karabo_da is empty
karabo_da = [''] # Data aggregators names to correct, use [''] for all
run = 167 # run to process, required
# Source parameters
karabo_id = 'FXE_DET_LPD1M-1' # Karabo domain for detector.
input_source = '{karabo_id}/DET/{module_index}CH0:xtdf' # Input fast data source.
output_source = '{karabo_id}/CORR/{module_index}CH0:output' # Output fast data source, empty to use same as input.
control_source = '{karabo_id}/COMP/FEM_MDL_COMP' # Control data source.
xgm_source = 'SA1_XTD2_XGM/DOOCS/MAIN' # XGM source used to count SA1 pulses when the files contain no LPD frames.
xgm_pulse_count_key = 'pulseEnergy.numberOfSa1BunchesActual' # Key of the XGM source holding the SA1 pulse count.
# CalCat parameters
creation_time = "" # The timestamp to use with Calibration DB. Required Format: "YYYY-MM-DD hh:mm:ss" e.g. 2019-07-04 11:02:41
cal_db_interface = '' # Not needed, compatibility with current webservice.
cal_db_timeout = 0 # Not needed, compatibility with current webservice.
cal_db_root = '/gpfs/exfel/d/cal/caldb_store' # The calibration database root path to access constant files. For example accessing constants from the test database.
# Operating conditions
mem_cells = 512 # Memory cells, LPD constants are always taken with 512 cells.
bias_voltage = 250.0 # Detector bias voltage.
capacitor = '5pF' # Capacitor setting: 5pF or 50pF
photon_energy = 9.2 # Photon energy in keV.
category = 0 # Whom to blame.
use_cell_order = 'auto' # Whether to use memory cell order as a detector condition; auto/always/never
# Correction parameters
offset_corr = True # Offset correction.
rel_gain = True # Gain correction based on RelativeGain constant.
ff_map = True # Gain correction based on FFMap constant.
gain_amp_map = True # Gain correction based on GainAmpMap constant.
combine_parallel_gain = True # Combine parallel gain images into a single frame.
threshold_sigma_high = 5.0 # Sigma level for threshold between high and medium gain.
threshold_sigma_mid = 100.0 # Sigma level for threshold between medium and low gain.
# Output options
ignore_no_frames_no_pulses = False # Whether to run without SA1 pulses AND frames.
overwrite = True # set to True if existing data should be overwritten
chunks_data = 1 # HDF chunk size for pixel data in number of frames.
chunks_ids = 32 # HDF chunk size for cellId and pulseId datasets.
create_virtual_cxi_in = '' # Folder to create virtual CXI files in (for each sequence).
# Parallelization options
sequences_per_node = 1 # Sequence files to process per node
max_nodes = 8 # Maximum number of SLURM jobs to split correction work into
num_workers = 8 # Worker processes per node, 8 is safe on 768G nodes but won't work on 512G.
num_threads_per_worker = 32 # Number of threads per worker.
def balance_sequences(in_folder, run, sequences, sequences_per_node, karabo_da, max_nodes):
    """Delegate sequence balancing to `xfel_calibrate.calibrate.balance_sequences`.

    All arguments are forwarded unchanged; `max_nodes` is passed by keyword.
    The import is kept local so that merely defining this function does not
    require `xfel_calibrate` to be importable.
    """
    from xfel_calibrate import calibrate

    return calibrate.balance_sequences(
        in_folder, run, sequences, sequences_per_node, karabo_da,
        max_nodes=max_nodes)
```
%% Cell type:code id: tags:
``` python
from pathlib import Path
from time import perf_counter
from warnings import warn
import gc
import re
import numpy as np
import h5py
import matplotlib
matplotlib.use('agg')
import matplotlib.pyplot as plt
%matplotlib inline
import extra_data as xd
import extra_geom as xg
import pasha as psh
from extra_data.components import LPD1M
from cal_tools.calcat_interface2 import CalibrationData, LPDConditions
import cal_tools.restful_config as rest_cfg
from cal_tools.lpdalgs import correct_lpd_frames
from cal_tools.lpdlib import get_mem_cell_pattern, make_cell_order_condition
from cal_tools.tools import (
calcat_creation_time,
write_constants_fragment_extracal,
)
from cal_tools.files import DataFile
```
%% Cell type:markdown id: tags:
# Prepare environment
%% Cell type:code id: tags:
``` python
# Stem of a raw file: RAW-R<run, 4 digits>-<aggregator>-S<sequence, 5 digits>.
file_re = re.compile(r'^RAW-R(\d{4})-(\w+\d+)-S(\d{5})$')  # This should probably move to cal_tools

run_folder = Path(in_folder) / f'r{run:04d}'
out_folder = Path(out_folder)
# Also create missing parent directories, so that a fresh nested output
# location does not abort the notebook with FileNotFoundError.
out_folder.mkdir(parents=True, exist_ok=True)

# An empty output source means corrected data is written under the input source name.
output_source = output_source or input_source

creation_time = calcat_creation_time(in_folder, run, creation_time)
print(f'Using {creation_time.isoformat()} as creation time')

# Pick all modules/aggregators or those selected.
if karabo_da == ['']:
    if modules == [-1]:
        modules = list(range(16))

    karabo_da = [f'LPD{i:02d}' for i in modules]
else:
    # The module index is taken from the last two characters of the aggregator name.
    modules = [int(x[-2:]) for x in karabo_da]

# Pick all sequences or those selected.
if not sequences or sequences == [-1]:
    do_sequence = lambda seq: True
else:
    do_sequence = [int(x) for x in sequences].__contains__

# List of detector sources.
det_inp_sources = [input_source.format(karabo_id=karabo_id, module_index=int(da[-2:])) for da in karabo_da]

if use_cell_order not in {'auto', 'always', 'never'}:
    raise ValueError("use_cell_order must be auto/always/never")
```
%% Cell type:markdown id: tags:
# Select data to process
%% Cell type:code id: tags:
``` python
# Work items as (aggregator, input path, output path) tuples.
data_to_process = []

for inp_path in run_folder.glob('RAW-*.h5'):
    match = file_re.match(inp_path.stem)

    # The glob is looser than file_re, so a stray RAW-*.h5 file may not
    # match at all; skip it instead of failing on `None[2]`.
    if match is None:
        continue

    if match[2] not in karabo_da or not do_sequence(int(match[3])):
        continue

    outp_path = out_folder / 'CORR-R{run:04d}-{aggregator}-S{seq:05d}.h5'.format(
        run=int(match[1]), aggregator=match[2], seq=int(match[3]))

    data_to_process.append((match[2], inp_path, outp_path))

print('Files to process:')
for data_descr in sorted(data_to_process, key=lambda x: f'{x[0]}{x[1]}'):
    print(f'{data_descr[0]}\t{data_descr[1]}')

# Collect the train ID contained in the input LPD files.
inp_lpd_dc = xd.DataCollection.from_paths([x[1] for x in data_to_process])

# Total number of image entries over all sources found in the selected files.
frame_count = sum([
    int(inp_lpd_dc[source, 'image.data'].data_counts(labelled=False).sum())
    for source in inp_lpd_dc.all_sources], 0)

if frame_count == 0:
    # No frames at all: consult the XGM pulse count for the same trains to
    # tell "no beam" apart from "beam but no detector data".
    inp_dc = xd.RunDirectory(run_folder) \
        .select_trains(xd.by_id[inp_lpd_dc.train_ids])

    try:
        pulse_count = int(inp_dc[xgm_source, xgm_pulse_count_key].ndarray().sum())
    except xd.SourceNameError:
        warn(f'Missing XGM source `{xgm_source}`')
        pulse_count = None
    except xd.PropertyNameError:
        warn(f'Missing XGM pulse count key `{xgm_pulse_count_key}`')
        pulse_count = None

    # NOTE(review): with ignore_no_frames_no_pulses set and a pulse count of
    # zero, control reaches the final `else` and raises "no LPD frames but
    # SA1 pulses" - confirm this is the intended outcome.
    if pulse_count == 0 and not ignore_no_frames_no_pulses:
        warn(f'Affected files contain neither LPD frames nor SA1 pulses '
             f'according to {xgm_source}, processing is skipped. If this '
             f'incorrect, please contact da-support@xfel.eu')
        from sys import exit
        exit(0)
    elif pulse_count is None:
        raise ValueError('Affected files contain no LPD frames and SA1 pulses '
                         'could not be inferred from XGM data')
    else:
        raise ValueError('Affected files contain no LPD frames but SA1 pulses')
else:
    print(f'Total number of LPD pulses across all modules: {frame_count}')
```
%% Cell type:markdown id: tags:
# Obtain and prepare calibration constants
%% Cell type:code id: tags:
``` python
start = perf_counter()

raw_data = xd.RunDirectory(run_folder)

# Parallel gain mode is read from the FEM control device; a missing source
# or property (KeyError) is treated as regular auto gain operation.
try:
    parallel_gain = bool(
        raw_data[control_source.format(karabo_id=karabo_id)].run_value('femAsicGainOverride'))
except KeyError:
    warn('Missing femAsicGainOverride property FEM control device, assuming auto gain')
    parallel_gain = False

print('Parallel gain mode:', parallel_gain)

if use_cell_order == 'never':
    cell_ids_pattern_s = None
else:
    cell_ids_pattern_s = None
    mem_cell_pattern = get_mem_cell_pattern(raw_data, det_inp_sources)

    if parallel_gain:
        # Only the first third of the pattern is used in parallel gain mode.
        mem_cell_pattern = mem_cell_pattern[:len(mem_cell_pattern) // 3]

    # Read the order of memory cells used
    cell_ids_pattern_s = make_cell_order_condition(use_cell_order, mem_cell_pattern)
    print("Memory cells order:", cell_ids_pattern_s)

conditions = LPDConditions(
    sensor_bias_voltage=bias_voltage,
    memory_cells=mem_cells,
    feedback_capacitor=capacitor,
    source_energy=photon_energy,
    memory_cell_order=cell_ids_pattern_s,
    parallel_gain=parallel_gain,
    category=category,
)

# Offset and dark bad pixels are always requested, everything else only
# when the corresponding correction is enabled.
expected_constants = {
    'Offset',
    'BadPixelsDark',
    *(['RelativeGain'] if rel_gain else []),
    *(['FFMap', 'BadPixelsFF'] if ff_map else []),
    *(['GainAmpMap'] if gain_amp_map else []),
    *(['Noise'] if parallel_gain and combine_parallel_gain else []),
}

lpd_consts = CalibrationData.from_condition(
    conditions,
    calibrations=expected_constants,
    detector_name=karabo_id,
    event_at=creation_time,
    client=rest_cfg.extra_calibration_client(),
)
lpd_consts = lpd_consts.select_modules(aggregator_names=karabo_da)
lpd_consts = lpd_consts.require_calibrations(['Offset'])

total_time = perf_counter() - start
print(f'Looking up constants {total_time:.1f}s')

lpd_consts.summary_table()
```
%% Cell type:code id: tags:
``` python
# Validate the constants availability and raise/warn accordingly.
if not lpd_consts.aggregator_names: # Offset was required above
raise Exception("Could not find offset constants for any modules, will not correct data.")
for mod in karabo_da.copy():
if mod not in lpd_consts["Offset"].aggregator_names:
warn(f"Offset constant is not available to correct {mod}.")
karabo_da.remove(mod)
missing_constants = {c for c in expected_constants
if (c not in lpd_consts) or (mod not in lpd_consts[c].aggregator_names)}
if missing_constants:
warn(f"Constants {sorted(missing_constants)} were not retrieved for {mod}.")
# Remove skipped correction modules from data_to_process
data_to_process = [(mod, in_f, out_f) for mod, in_f, out_f in data_to_process if mod in karabo_da]
```
%% Cell type:code id: tags:
``` python
# write constants metadata to fragment YAML
write_constants_fragment_extracal(
out_folder=(metadata_folder or out_folder),
calib_data=lpd_consts,
caldb_root=cal_db_root,
)
# Load constants data for all constants
start = perf_counter()
const_data = {kda: {} for kda in lpd_consts.aggregator_names}
for cname, multimodconst in lpd_consts.items():
arr = multimodconst.ndarray(cal_db_root, parallel=8)
for i, kda in enumerate(multimodconst.aggregator_names):
const_data[kda][cname] = arr[i]
total_time = perf_counter() - start
print(f'Loading constants {total_time:.1f}s')
```
%% Cell type:code id: tags:
``` python
# These are intended in order cell, X, Y, gain
ccv_offsets = {}
ccv_noise = {}
ccv_gains = {}
ccv_masks = {}
ccv_shape = (mem_cells, 256, 256, 3)
constant_order = {
'Offset': (2, 1, 0, 3),
'Noise': (2, 1, 0, 3),
'BadPixelsDark': (2, 1, 0, 3),
'RelativeGain': (2, 0, 1, 3),
'FFMap': (2, 0, 1, 3),
'BadPixelsFF': (2, 0, 1, 3),
'GainAmpMap': (2, 0, 1, 3),
}
def prepare_constants(wid, index, aggregator):
    """Prepare the correction constants of a single aggregator.

    Fills `ccv_offsets`, `ccv_noise`, `ccv_gains` and `ccv_masks` for
    `aggregator` from the raw constants in `const_data`, falling back to
    neutral values (zero offset, unit gain, empty mask) for constants that
    are not available.

    Args:
        wid: Worker ID passed by the map call, unused.
        index: Index of the work item passed by the map call, unused.
        aggregator (str): Aggregator name, key into `const_data`.

    Raises:
        RuntimeError: If parallel gain images are to be combined but no
            Noise constant is available for this aggregator.
    """
    consts = const_data.get(aggregator, {})

    def _prepare_data(calibration_name, dtype):
        """Return the constant reordered via `constant_order` as a copy of `dtype`."""
        # Some old BadPixels constants have <f8 dtype.
        # Convert nan to float 0 to avoid having 2147483648 after
        # converting float64 to uint32.
        if "BadPixels" in calibration_name and consts[calibration_name].dtype != np.uint32:
            consts[calibration_name] = np.nan_to_num(
                consts[calibration_name], nan=0.0)

        # NOTE(review): the copy is meant to make the array contiguous, but
        # astype() defaults to order='K', which follows the memory layout
        # of its (transposed) input - confirm the result is laid out as intended.
        return consts[calibration_name] \
            .transpose(constant_order[calibration_name]) \
            .astype(dtype, copy=True)  # Make sure array is contiguous.

    if offset_corr and 'Offset' in consts:
        ccv_offsets[aggregator] = _prepare_data('Offset', np.float32)
    else:
        ccv_offsets[aggregator] = np.zeros(ccv_shape, dtype=np.float32)

    # Gain factors start neutral and are multiplied up by each available map.
    ccv_gains[aggregator] = np.ones(ccv_shape, dtype=np.float32)

    # Noise is only needed to combine parallel gain images, where it is
    # mandatory; in every other case no noise constant is passed on.
    if parallel_gain and combine_parallel_gain:
        if 'Noise' in consts:
            ccv_noise[aggregator] = _prepare_data('Noise', np.float32)
        else:
            raise RuntimeError('parallel gain combination requires available noise constant')
    else:
        ccv_noise[aggregator] = None

    if 'BadPixelsDark' in consts:
        ccv_masks[aggregator] = _prepare_data('BadPixelsDark', np.uint32)
    else:
        ccv_masks[aggregator] = np.zeros(ccv_shape, dtype=np.uint32)

    if 'RelativeGain' in consts:
        ccv_gains[aggregator] *= _prepare_data('RelativeGain', np.float32)

    if 'FFMap' in consts:
        ccv_gains[aggregator] *= _prepare_data('FFMap', np.float32)

        # NOTE(review): flat-field bad pixels are applied only together with
        # the flat-field map; this nesting is reconstructed - confirm.
        if 'BadPixelsFF' in consts:
            np.bitwise_or(ccv_masks[aggregator], _prepare_data('BadPixelsFF', np.uint32),
                          out=ccv_masks[aggregator])

    if 'GainAmpMap' in consts:
        ccv_gains[aggregator] *= _prepare_data('GainAmpMap', np.float32)
# Prepare the constants of all aggregators in parallel, one thread each.
start = perf_counter()
psh.ThreadContext(num_workers=len(karabo_da)).map(prepare_constants, karabo_da)
total_time = perf_counter() - start
print(f'Preparing constants {total_time:.1f}s')
const_data.clear() # Clear raw constants data now to save memory.
# The trailing semicolon keeps the notebook from displaying the return value.
gc.collect();
```
%% Cell type:code id: tags:
``` python
def iter_count_slices(offset_counts, len_counts=None, step=None):
    """Generate slices to index another array based on counts.

    Given an array of counts C dividing another flat array A into
    different parts such that C.sum() == A.size, this generates the
    necessary slices to iterate over each part defined by C:

        A = np.arange(15)
        C = np.array([5, 5, 5])
        list(iter_count_slices(C))
        > [slice(0, 5, None), slice(5, 10, None), slice(10, 15, None)]

    The counts used to compute the slice starts, i.e. the offsets
    into A, can be chosen independently of the length of each slice.
    The first slice always starts at 0, every following slice at the
    sum of all preceding offset counts:

        list(iter_count_slices([15, 15, 15], [5, 5, 5]))
        > [slice(0, 5, None), slice(15, 20, None), slice(30, 35, None)]

    The slice bounds are taken from NumPy arrays and hence are NumPy
    integers, which may show in their repr.

    Args:
        offset_counts (ArrayLike): Counts used to compute slice starts.
        len_counts (ArrayLike, optional): Counts used to compute slice
            lengths, offset_counts used if omitted.
        step (int, optional): Slice step, None if omitted.

    Yields:
        s (slice): Count-based slices for indexing.

    Raises:
        ValueError: If both count arrays are given with different sizes.
            As this is a generator, it is raised on first iteration.
    """
    offset_counts = np.asarray(offset_counts)

    # Nothing to slice without any counts.
    if offset_counts.size == 0:
        return

    if len_counts is None:
        len_counts = offset_counts
    else:
        len_counts = np.asarray(len_counts)

    if offset_counts.size != len_counts.size:
        raise ValueError('size of count arrays must match')

    # The first part has no predecessor and thus starts at the beginning.
    yield np.s_[0:len_counts[0]:step]

    # Each further part starts at the cumulative sum of the offset counts
    # before it; the last cumulative sum is the total and starts nothing.
    for offset, count in zip(np.cumsum(offset_counts)[:-1], len_counts[1:]):
        yield np.s_[offset:offset+count:step]
def correct_file(wid, index, work):
    """Correct a single raw LPD file and write the corrected output file.

    Used as the kernel of the map call over `data_to_process`. Reads the
    image data of one module from the input file, applies the prepared
    constants of its aggregator via `correct_lpd_frames` and, unless the
    output exists and must not be overwritten or there are no frames,
    writes the result. A line of timings is printed and the number of
    frames is added to `worker_frame_counts[wid]`.

    Args:
        wid: ID of the worker running this call.
        index: Index of the work item, unused.
        work (tuple): (aggregator, input path, output path).
    """
    aggregator, inp_path, outp_path = work
    module_index = int(aggregator[-2:])

    start = perf_counter()
    dc = xd.H5File(inp_path, inc_suspect_trains=False).select('*', 'image.*', require_all=True)
    inp_source_name = input_source.format(karabo_id=karabo_id, module_index=module_index)
    inp_source = dc[inp_source_name]
    open_time = perf_counter() - start

    # Load raw data for this file.
    # Reshaping gets rid of the extra 1-len dimensions without
    # mangling the frame axis for an actual frame count of 1.
    start = perf_counter()
    in_raw = inp_source['image.data'].ndarray().reshape(-1, 256, 256)
    in_cell = inp_source['image.cellId'].ndarray().reshape(-1)
    in_pulse = inp_source['image.pulseId'].ndarray().reshape(-1)
    frame_counts = inp_source['image.data'].data_counts(labelled=False).astype(np.int32)
    read_time = perf_counter() - start

    parallel_gain_indices = None

    if parallel_gain:
        assert (frame_counts % 3 == 0).all(), 'frame count not divisible by 3 in parallel gain mode'
        actual_frame_counts = frame_counts // 3

        # Indices map where to find each of the high/medium/low gain images for each actual
        # frame event.
        parallel_gain_indices = np.zeros((actual_frame_counts.sum(), 3), dtype=np.int32)

        # Build indices for high gain as a range in each train, running from the cumulative sum
        # of apparent frames from all trains before to the actual number of frames in this train.
        high_gain_ranges = [np.r_[s] for s in iter_count_slices(frame_counts, actual_frame_counts)]

        # A file without any trains yields no ranges, which concatenate() rejects.
        if high_gain_ranges:
            np.concatenate(high_gain_ranges, out=parallel_gain_indices[:, 0])

        # The delta between the gain stages is the number of actual frames.
        gain_index_deltas = np.repeat(actual_frame_counts, actual_frame_counts)

        # Build indices for medium gain and low gain by adding the gain index deltas in between
        # each of them.
        np.add(parallel_gain_indices[:, 0], gain_index_deltas, out=parallel_gain_indices[:, 1])
        np.add(parallel_gain_indices[:, 1], gain_index_deltas, out=parallel_gain_indices[:, 2])

        # Every index must address an existing raw frame, i.e. be strictly below the frame
        # count. The check is skipped without frames, as max() of an empty array fails.
        assert (parallel_gain_indices.size == 0
                or parallel_gain_indices.max() < in_raw.shape[0]), \
            'gain image indices exceed raw data size'

        # Pick cell and pulse IDs from high gain. This is also done if frames are not combined
        # in order to correct corrupt tables in medium and low gain, and if needed brought back
        # to the original shape further below.
        in_cell = np.take(in_cell, parallel_gain_indices[:, 0])
        in_pulse = np.take(in_pulse, parallel_gain_indices[:, 0])

        if combine_parallel_gain:
            # Replace supposed frame counts by actual frame counts.
            frame_counts = actual_frame_counts
        else:
            def _replicate_to_all_gains(ids):
                """Repeat the high gain IDs of each train three times in a row."""
                parts = [np.tile(ids[s], 3) for s in iter_count_slices(actual_frame_counts)]
                # Without any trains there is nothing to replicate.
                return np.concatenate(parts) if parts else ids

            # Replicate corrected cell and pulse IDs from high gain to other gains.
            in_cell = _replicate_to_all_gains(in_cell)
            in_pulse = _replicate_to_all_gains(in_pulse)

            # Disable gain indices to not combine.
            parallel_gain_indices = None

    # Allocate output arrays.
    num_frames = frame_counts.sum()
    out_data = np.zeros((num_frames, 256, 256), dtype=np.float32)
    out_gain = np.zeros((num_frames, 256, 256), dtype=np.uint8)
    out_mask = np.zeros((num_frames, 256, 256), dtype=np.uint32)

    start = perf_counter()
    # The thread count follows the num_threads_per_worker notebook parameter.
    correct_lpd_frames(in_raw, in_cell,
                       out_data, out_gain, out_mask,
                       ccv_offsets[aggregator], ccv_noise[aggregator], ccv_gains[aggregator], ccv_masks[aggregator],
                       parallel_gain_indices, threshold_sigma_high, threshold_sigma_mid,
                       num_threads=num_threads_per_worker)
    correct_time = perf_counter() - start

    start = perf_counter()
    if (not outp_path.exists() or overwrite) and num_frames > 0:
        outp_source_name = output_source.format(karabo_id=karabo_id, module_index=module_index)

        with DataFile(outp_path, 'w') as outp_file:
            outp_file.create_index(dc.train_ids, from_file=dc.files[0])
            outp_file.create_metadata(like=dc, instrument_channels=sorted({
                f'{outp_source_name}/image', f'{inp_source_name}/image'
            }))

            outp_source = outp_file.create_instrument_source(outp_source_name)

            outp_source.create_index(image=frame_counts)
            # Chunk sizes are capped by the number of entries actually written.
            outp_source.create_key('image.cellId', data=in_cell,
                                   chunks=(min(chunks_ids, in_cell.shape[0]),))
            outp_source.create_key('image.pulseId', data=in_pulse,
                                   chunks=(min(chunks_ids, in_pulse.shape[0]),))
            outp_source.create_key('image.data', data=out_data,
                                   chunks=(min(chunks_data, out_data.shape[0]), 256, 256))
            outp_source.create_compressed_key('image.gain', data=out_gain)
            outp_source.create_compressed_key('image.mask', data=out_mask)

            # Keep the data reachable under the input source name as well
            # when it is written to a different source.
            if output_source != input_source:
                outp_file.create_legacy_source(inp_source_name, outp_source_name)

    write_time = perf_counter() - start

    total_time = open_time + read_time + correct_time + write_time
    frame_rate = num_frames / total_time

    print('{}\t{}\t{:.3f}\t{:.3f}\t{:.3f}\t{:.3f}\t{:.3f}\t{}\t{:.1f}'.format(
        wid, aggregator, open_time, read_time, correct_time, write_time, total_time,
        num_frames, frame_rate))

    worker_frame_counts[wid] += num_frames

    # Drop the references to the large arrays before collecting garbage.
    in_raw = None
    in_cell = None
    in_pulse = None
    out_data = None
    out_gain = None
    out_mask = None
    gc.collect()
# Header for the per-file timing lines printed by correct_file.
print('worker\tDA\topen\tread\tcorrect\twrite\ttotal\tframes\trate')
# Files are corrected in parallel by num_workers worker processes.
ctx = psh.ProcessContext(num_workers=num_workers)
# One frame counter per worker, each worker adds to its own entry.
worker_frame_counts = ctx.alloc(shape=(), dtype=np.int32, per_worker=True)
start = perf_counter()
ctx.map(correct_file, data_to_process)
total_time = perf_counter() - start
total_frames = worker_frame_counts.sum()
print(f'Total time: {total_time:.1f}s, Mean rate: {(total_frames / total_time):.1f}s⁻¹')
```
%% Cell type:markdown id: tags:
# Data preview for first train
%% Cell type:code id: tags:
``` python
# Detector geometry from fixed quadrant positions, used by the preview plots.
geom = xg.LPD_1MGeometry.from_quad_positions(
    [(11.4, 299), (-11.5, 8), (254.5, -16), (278.5, 275)])

# Only output files that actually exist can be previewed.
output_paths = [work[2] for work in data_to_process if work[2].exists()]

if len(output_paths) == 0:
    warn('Data preview is skipped as there are no existing output paths')
    from sys import exit
    exit(0)

# Restrict the preview to the first train of the corrected data.
dc = xd.DataCollection.from_paths(output_paths).select_trains(np.s_[0])
det = LPD1M(dc, detector_name=karabo_id)
data = det.get_array('image.data', unstack_pulses=False)
```
%% Cell type:markdown id: tags:
### Intensity histogram across all cells
%% Cell type:code id: tags:
``` python
# Fractions of the cumulative histogram marking the colormap interval
# (vmin/vmax) used by the following plots.
left_edge_ratio = 0.01
right_edge_ratio = 0.99
fig, ax = plt.subplots(num=1, clear=True, figsize=(15, 6))
# Histogram of all preview values in 2000 bins between -1500 and 2000.
values, bins, _ = ax.hist(np.ravel(data.data), bins=2000, range=(-1500, 2000))
def find_nearest_index(array, value):
    """Return the index of the entry in `array` closest to `value`.

    The first such index is returned if several entries are equally
    close; for multi-dimensional input it refers to the flattened array.
    """
    distances = np.abs(array - value)
    return distances.argmin()
# Pick the colormap limits as the bin edges at which the cumulative
# histogram comes closest to the left/right edge ratio of all counts.
cum_values = np.cumsum(values)
vmin = bins[find_nearest_index(cum_values, cum_values[-1]*left_edge_ratio)]
vmax = bins[find_nearest_index(cum_values, cum_values[-1]*right_edge_ratio)]
max_value = values.max()
# Mark both limits and label them with their percentage.
ax.vlines([vmin, vmax], 0, max_value, color='red', linewidth=5, alpha=0.2)
ax.text(vmin, max_value, f'{left_edge_ratio*100:.0f}%',
color='red', ha='center', va='bottom', size='large')
ax.text(vmax, max_value, f'{right_edge_ratio*100:.0f}%',
color='red', ha='center', va='bottom', size='large')
ax.text(vmax+(vmax-vmin)*0.01, max_value/2, 'Colormap interval',
color='red', rotation=90, ha='left', va='center', size='x-large')
# Show the interval with a margin of 10% of its width on either side.
ax.set_xlim(vmin-(vmax-vmin)*0.1, vmax+(vmax-vmin)*0.1)
ax.set_ylim(0, max_value*1.1)
# Presumably ends the cell on a statement so no return value is displayed.
pass
```
%% Cell type:markdown id: tags:
### First memory cell
%% Cell type:code id: tags:
``` python
# Plot index 0 along the second axis of the preview data using the colormap
# interval found from the histogram.
# NOTE(review): presumably the second axis runs over frames/memory cells - confirm.
fig, ax = plt.subplots(num=2, figsize=(15, 15), clear=True, nrows=1, ncols=1)
geom.plot_data_fast(data[:, 0], ax=ax, vmin=vmin, vmax=vmax)
# Presumably ends the cell on a statement so no return value is displayed.
pass
```
%% Cell type:markdown id: tags:
### Train average
%% Cell type:code id: tags:
``` python
# Plot the mean over the second axis of the preview data using the colormap
# interval found from the histogram.
fig, ax = plt.subplots(num=3, figsize=(15, 15), clear=True, nrows=1, ncols=1)
geom.plot_data_fast(data.mean(axis=1), ax=ax, vmin=vmin, vmax=vmax)
# Presumably ends the cell on a statement so no return value is displayed.
pass
```
%% Cell type:markdown id: tags:
### Lowest gain stage per pixel
%% Cell type:code id: tags:
``` python
# The colorbar below labels gain values 0/1/2 as high/medium/low gain, so
# the maximum value along the second axis is the lowest gain stage a pixel
# reached, despite the variable name.
highest_gain_stage = det.get_array('image.gain', unstack_pulses=False).max(axis=1)
fig, ax = plt.subplots(num=4, figsize=(15, 15), clear=True, nrows=1, ncols=1)
p = geom.plot_data_fast(highest_gain_stage, ax=ax, vmin=0, vmax=2);
# Replace the numeric colorbar ticks by the names of the gain stages.
cb = ax.images[0].colorbar
cb.set_ticks([0, 1, 2])
cb.set_ticklabels(['High gain', 'Medium gain', 'Low gain'])
```
%% Cell type:markdown id: tags:
### Create virtual CXI file
%% Cell type:code id: tags:
``` python
# Virtual CXI files are only created when requested, and not for parallel
# gain data that was left uncombined.
if create_virtual_cxi_in and (combine_parallel_gain or not parallel_gain):
    vcxi_folder = Path(create_virtual_cxi_in.format(
        run=run, proposal_folder=str(Path(in_folder).parent)))
    vcxi_folder.mkdir(parents=True, exist_ok=True)

    def sort_files_by_seq(by_seq, outp_path):
        """Append `outp_path` to the list of its sequence number in `by_seq`.

        The sequence number is taken from the last five characters of
        the file stem. Returns `by_seq` for use with reduce().
        """
        seq_number = int(outp_path.stem[-5:])
        by_seq.setdefault(seq_number, []).append(outp_path)
        return by_seq

    from functools import reduce
    output_by_seq = reduce(sort_files_by_seq, output_paths, {})

    for seq_number, seq_output_paths in output_by_seq.items():
        # Create data collection and detector components only for this sequence.
        try:
            det = LPD1M(xd.DataCollection.from_paths(seq_output_paths), detector_name=karabo_id, min_modules=4)
        except ValueError:  # Couldn't find enough data for min_modules
            continue
        else:
            det.write_virtual_cxi(vcxi_folder / f'VCXI-LPD-R{run:04d}-S{seq_number:05d}.cxi')
```
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment