Skip to content
Snippets Groups Projects
Commit 75853bb7 authored by Karim Ahmed's avatar Karim Ahmed
Browse files

remove unused variables

parent 6950fc59
No related branches found
No related tags found
1 merge request!779[LPD][CORRECT] Using CALCAT interface
%% Cell type:markdown id: tags:
# LPD Offline Correction #
Author: European XFEL Data Analysis Group
%% Cell type:code id: tags:
``` python
# Input parameters
in_folder = "/gpfs/exfel/exp/FXE/202201/p003073/raw/" # the folder to read data from, required
out_folder = "/gpfs/exfel/data/scratch/schmidtp/random/LPD_test" # the folder to output to, required
metadata_folder = '' # Directory containing calibration_metadata.yml when run by xfel-calibrate.
sequences = [-1] # Sequences to correct, use [-1] for all
modules = [-1] # Modules indices to correct, use [-1] for all, only used when karabo_da is empty
karabo_da = [''] # Data aggregators names to correct, use [''] for all
run = 10 # run to process, required
# Source parameters
karabo_id = 'FXE_DET_LPD1M-1' # Karabo domain for detector.
input_source = '{karabo_id}/DET/{module_index}CH0:xtdf' # Input fast data source.
output_source = '' # Output fast data source, empty to use same as input.
xgm_source = 'SA1_XTD2_XGM/DOOCS/MAIN'
xgm_pulse_count_key = 'pulseEnergy.numberOfSa1BunchesActual'
# CalCat parameters
creation_time = "" # The timestamp to use with Calibration DB. Required Format: "YYYY-MM-DD hh:mm:ss" e.g. 2019-07-04 11:02:41
cal_db_interface = '' # Not needed, compatibility with current webservice.
cal_db_timeout = 0 # Not needed, compatbility with current webservice.
cal_db_root = '/gpfs/exfel/d/cal/caldb_store'
# Operating conditions
mem_cells = 512 # Memory cells, LPD constants are always taken with 512 cells.
bias_voltage = 250.0 # Detector bias voltage.
capacitor = '5pF' # Capacitor setting: 5pF or 50pF
photon_energy = 9.2 # Photon energy in keV.
category = 0 # Whom to blame.
use_cell_order = 'auto' # Whether to use memory cell order as a detector condition; auto/always/never
# Correction parameters
offset_corr = True # Offset correction.
rel_gain = True # Gain correction based on RelativeGain constant.
ff_map = True # Gain correction based on FFMap constant.
gain_amp_map = True # Gain correction based on GainAmpMap constant.
# Output options
ignore_no_frames_no_pulses = False # Whether to run without SA1 pulses AND frames.
overwrite = True # set to True if existing data should be overwritten
chunks_data = 1 # HDF chunk size for pixel data in number of frames.
chunks_ids = 32 # HDF chunk size for cellId and pulseId datasets.
create_virtual_cxi_in = '' # Folder to create virtual CXI files in (for each sequence).
# Parallelization options
sequences_per_node = 1 # Sequence files to process per node
max_nodes = 8 # Maximum number of SLURM jobs to split correction work into
num_workers = 8 # Worker processes per node, 8 is safe on 768G nodes but won't work on 512G.
num_threads_per_worker = 32 # Number of threads per worker.
def balance_sequences(in_folder, run, sequences, sequences_per_node, karabo_da, max_nodes):
from xfel_calibrate.calibrate import balance_sequences as bs
return bs(in_folder, run, sequences, sequences_per_node, karabo_da, max_nodes=max_nodes)
```
%% Cell type:code id: tags:
``` python
from logging import warning
from pathlib import Path
from time import perf_counter
import gc
import re
import numpy as np
import h5py
import matplotlib
matplotlib.use('agg')
import matplotlib.pyplot as plt
%matplotlib inline
import extra_data as xd
import extra_geom as xg
import pasha as psh
from extra_data.components import LPD1M
import cal_tools.restful_config as rest_cfg
from cal_tools.calcat_interface import LPD_CalibrationData
from cal_tools.lpdalgs import correct_lpd_frames
from cal_tools.lpdlib import get_mem_cell_pattern, make_cell_order_condition
from cal_tools.tools import CalibrationMetadata, calcat_creation_time
from cal_tools.files import DataFile
```
%% Cell type:markdown id: tags:
# Prepare environment
%% Cell type:code id: tags:
``` python
file_re = re.compile(r'^RAW-R(\d{4})-(\w+\d+)-S(\d{5})$') # This should probably move to cal_tools
run_folder = Path(in_folder) / f'r{run:04d}'
out_folder = Path(out_folder)
out_folder.mkdir(exist_ok=True)
output_source = output_source or input_source
cal_db_root = Path(cal_db_root)
metadata = CalibrationMetadata(metadata_folder or out_folder)
# Constant paths & timestamps are saved under retrieved-constants in calibration_metadata.yml
retrieved_constants = metadata.setdefault("retrieved-constants", {})
creation_time = calcat_creation_time(in_folder, run, creation_time)
print(f'Using {creation_time.isoformat()} as creation time')
# Pick all modules/aggregators or those selected.
if karabo_da == ['']:
if modules == [-1]:
modules = list(range(16))
karabo_da = [f'LPD{i:02d}' for i in modules]
else:
modules = [int(x[-2:]) for x in karabo_da]
# Pick all sequences or those selected.
if not sequences or sequences == [-1]:
do_sequence = lambda seq: True
else:
do_sequence = [int(x) for x in sequences].__contains__
# List of detector sources.
det_inp_sources = [input_source.format(karabo_id=karabo_id, module_index=int(da[-2:])) for da in karabo_da]
if use_cell_order not in {'auto', 'always', 'never'}:
raise ValueError("use_cell_order must be auto/always/never")
```
%% Cell type:markdown id: tags:
# Select data to process
%% Cell type:code id: tags:
``` python
data_to_process = []
for inp_path in run_folder.glob('RAW-*.h5'):
match = file_re.match(inp_path.stem)
if match[2] not in karabo_da or not do_sequence(int(match[3])):
continue
outp_path = out_folder / 'CORR-R{run:04d}-{aggregator}-S{seq:05d}.h5'.format(
run=int(match[1]), aggregator=match[2], seq=int(match[3]))
data_to_process.append((match[2], inp_path, outp_path))
print('Files to process:')
for data_descr in sorted(data_to_process, key=lambda x: f'{x[0]}{x[1]}'):
print(f'{data_descr[0]}\t{data_descr[1]}')
# Collect the train ID contained in the input LPD files.
inp_lpd_dc = xd.DataCollection.from_paths([x[1] for x in data_to_process])
frame_count = sum([
int(inp_lpd_dc[source, 'image.data'].data_counts(labelled=False).sum())
for source in inp_lpd_dc.all_sources], 0)
if frame_count == 0:
inp_dc = xd.RunDirectory(run_folder) \
.select_trains(xd.by_id[inp_lpd_dc.train_ids])
try:
pulse_count = int(inp_dc[xgm_source, xgm_pulse_count_key].ndarray().sum())
except xd.SourceNameError:
warning(f'Missing XGM source `{xgm_source}`')
pulse_count = None
except xd.PropertyNameError:
warning(f'Missing XGM pulse count key `{xgm_pulse_count_key}`')
pulse_count = None
if pulse_count == 0 and not ignore_no_frames_no_pulses:
warning(f'Affected files contain neither LPD frames nor SA1 pulses '
f'according to {xgm_source}, processing is skipped. If this '
f'incorrect, please contact da-support@xfel.eu')
from sys import exit
exit(0)
elif pulse_count is None:
raise ValueError('Affected files contain no LPD frames and SA1 pulses '
'could not be inferred from XGM data')
else:
raise ValueError('Affected files contain no LPD frames but SA1 pulses')
else:
print(f'Total number of LPD pulses across all modules: {frame_count}')
```
%% Cell type:markdown id: tags:
# Obtain and prepare calibration constants
%% Cell type:code id: tags:
``` python
metadata = CalibrationMetadata(metadata_folder or out_folder)
# Constant paths & timestamps are saved under retrieved-constants in calibration_metadata.yml
const_yaml = metadata.setdefault("retrieved-constants", {})
```
%% Cell type:code id: tags:
``` python
const_data = dict() # {"ModuleName": {"ConstantName": ndarray}}
start = perf_counter()
if const_yaml:
const_load_mp = psh.ProcessContext(num_workers=24)
for mod, constants in const_yaml.items():
const_data[mod] = {} # An empty dictionary stays for a module with no constants.
for cname, cmdata in constants["constants"].items():
const_data[mod][cname] = const_load_mp.alloc( # TODO: MAKE SURE WE ERROR OUT FOR MISSING OFFSET
shape=(256, 256, mem_cells, 3), # All LPD constants have the same shape.
dtype=np.uint32 if cname.startswith('BadPixels') else np.float32
)
def load_constant_dataset(wid, index, mod):
for cname, mdata in const_yaml[mod]["constants"].items():
with h5py.File(mdata["path"], "r") as cf:
cf[f"{mdata['dataset']}/data"].read_direct(const_data[mod][cname])
const_load_mp.map(load_constant_dataset, karabo_da)
else:
cell_ids_pattern_s = None
if use_cell_order != 'never':
# Read the order of memory cells used
raw_data = xd.DataCollection.from_paths([e[1] for e in data_to_process])
cell_ids_pattern_s = make_cell_order_condition(
use_cell_order, get_mem_cell_pattern(raw_data, det_inp_sources)
)
print("Memory cells order:", cell_ids_pattern_s)
lpd_cal = LPD_CalibrationData(
detector_name=karabo_id,
modules=karabo_da,
sensor_bias_voltage=bias_voltage,
memory_cells=mem_cells,
feedback_capacitor=capacitor,
source_energy=photon_energy,
memory_cell_order=cell_ids_pattern_s,
category=category,
event_at=creation_time,
client=rest_cfg.calibration_client(),
)
const_data = lpd_cal.ndarray_map(
[
"Offset", "BadPixelsDark",
"BadPixelsFF", "GainAmpMap",
"FFMap", "RelativeGain",
]
)
```
%% Cell type:code id: tags:
``` python
# Validate the constants availability and raise/warn accordingly.
for mod, calibrations in const_data.items():
missing_offset = {"Offset"} - set(calibrations)
warn_missing_constants = {
"BadPixelsDark", "BadPixelsFF", "GainAmpMap", "FFMap", "RelativeGain"} - set(calibrations)
if missing_offset:
warning(f"Offset constant is not available to correct {mod}") # noqa
karabo_da.remove(mod)
if warn_missing_constants:
warning(f"Gain constants {missing_gain_constants} were not retrieved for {mod}")
if (
calibrations.get("BadPixelsDark") and
calibrations["BadPixelsDark"].dtype != np.uint32
): # Old LPD constants are stored as float32.
calibrations["BadPixelsDark"] = calibrations["BadPixelsDark"].astype(np.uint32, copy=False)
if not karabo_da: # Offsets are missing for all modules.
raise Exception("Could not find offset constants for all modules, will not correct data.")
# Remove skipped correction modules from data_to_process
data_to_process = [(mod, in_f, out_f) for mod, in_f, out_f in data_to_process if mod in [karabo_da]]
total_time = perf_counter() - start
print(f'{total_time:.1f}s')
```
%% Cell type:code id: tags:
``` python
# These are intended in order cell, X, Y, gain
ccv_offsets = {}
ccv_gains = {}
ccv_masks = {}
ccv_shape = (mem_cells, 256, 256, 3)
constant_order = {
'Offset': (2, 1, 0, 3),
'BadPixelsDark': (2, 1, 0, 3),
'RelativeGain': (2, 0, 1, 3),
'FFMap': (2, 0, 1, 3),
'BadPixelsFF': (2, 0, 1, 3),
'GainAmpMap': (2, 0, 1, 3),
}
def prepare_constants(wid, index, aggregator):
consts = const_data.get(aggregator, {})
def _prepare_data(calibration_name, dtype):
return consts[calibration_name] \
.transpose(constant_order[calibration_name]) \
.astype(dtype, copy=True) # Make sure array is contiguous.
if offset_corr and 'Offset' in consts:
ccv_offsets[aggregator] = _prepare_data('Offset', np.float32)
else:
ccv_offsets[aggregator] = np.zeros(ccv_shape, dtype=np.float32)
ccv_gains[aggregator] = np.ones(ccv_shape, dtype=np.float32)
if 'BadPixelsDark' in consts:
ccv_masks[aggregator] = _prepare_data('BadPixelsDark', np.uint32)
else:
ccv_masks[aggregator] = np.zeros(ccv_shape, dtype=np.uint32)
if rel_gain and 'RelativeGain' in consts:
ccv_gains[aggregator] *= _prepare_data('RelativeGain', np.float32)
if ff_map and 'FFMap' in consts:
ccv_gains[aggregator] *= _prepare_data('FFMap', np.float32)
if 'BadPixelsFF' in consts:
np.bitwise_or(ccv_masks[aggregator], _prepare_data('BadPixelsFF', np.uint32),
out=ccv_masks[aggregator])
if gain_amp_map and 'GainAmpMap' in consts:
ccv_gains[aggregator] *= _prepare_data('GainAmpMap', np.float32)
print('.', end='', flush=True)
print('Preparing constants', end='', flush=True)
start = perf_counter()
psh.ThreadContext(num_workers=len(karabo_da)).map(prepare_constants, karabo_da)
total_time = perf_counter() - start
print(f'{total_time:.1f}s')
const_data.clear() # Clear raw constants data now to save memory.
gc.collect();
```
%% Cell type:code id: tags:
``` python
def correct_file(wid, index, work):
aggregator, inp_path, outp_path = work
module_index = int(aggregator[-2:])
start = perf_counter()
dc = xd.H5File(inp_path, inc_suspect_trains=False).select('*', 'image.*', require_all=True)
inp_source = dc[input_source.format(karabo_id=karabo_id, module_index=module_index)]
open_time = perf_counter() - start
# Load raw data for this file.
# Reshaping gets rid of the extra 1-len dimensions without
# mangling the frame axis for an actual frame count of 1.
start = perf_counter()
in_raw = inp_source['image.data'].ndarray().reshape(-1, 256, 256)
in_cell = inp_source['image.cellId'].ndarray().reshape(-1)
in_pulse = inp_source['image.pulseId'].ndarray().reshape(-1)
read_time = perf_counter() - start
# Allocate output arrays.
out_data = np.zeros((in_raw.shape[0], 256, 256), dtype=np.float32)
out_gain = np.zeros((in_raw.shape[0], 256, 256), dtype=np.uint8)
out_mask = np.zeros((in_raw.shape[0], 256, 256), dtype=np.uint32)
start = perf_counter()
correct_lpd_frames(in_raw, in_cell,
out_data, out_gain, out_mask,
ccv_offsets[aggregator], ccv_gains[aggregator], ccv_masks[aggregator],
num_threads=num_threads_per_worker)
correct_time = perf_counter() - start
image_counts = inp_source['image.data'].data_counts(labelled=False)
start = perf_counter()
if (not outp_path.exists() or overwrite) and image_counts.sum() > 0:
outp_source_name = output_source.format(karabo_id=karabo_id, module_index=module_index)
with DataFile(outp_path, 'w') as outp_file:
outp_file.create_index(dc.train_ids, from_file=dc.files[0])
outp_file.create_metadata(like=dc, instrument_channels=(f'{outp_source_name}/image',))
outp_source = outp_file.create_instrument_source(outp_source_name)
outp_source.create_index(image=image_counts)
outp_source.create_key('image.cellId', data=in_cell,
chunks=(min(chunks_ids, in_cell.shape[0]),))
outp_source.create_key('image.pulseId', data=in_pulse,
chunks=(min(chunks_ids, in_pulse.shape[0]),))
outp_source.create_key('image.data', data=out_data,
chunks=(min(chunks_data, out_data.shape[0]), 256, 256))
outp_source.create_compressed_key('image.gain', data=out_gain)
outp_source.create_compressed_key('image.mask', data=out_mask)
write_time = perf_counter() - start
total_time = open_time + read_time + correct_time + write_time
frame_rate = in_raw.shape[0] / total_time
print('{}\t{}\t{:.3f}\t{:.3f}\t{:.3f}\t{:.3f}\t{:.3f}\t{}\t{:.1f}'.format(
wid, aggregator, open_time, read_time, correct_time, write_time, total_time,
in_raw.shape[0], frame_rate))
in_raw = None
in_cell = None
in_pulse = None
out_data = None
out_gain = None
out_mask = None
gc.collect()
print('worker\tDA\topen\tread\tcorrect\twrite\ttotal\tframes\trate')
start = perf_counter()
psh.ProcessContext(num_workers=num_workers).map(correct_file, data_to_process)
total_time = perf_counter() - start
print(f'Total time: {total_time:.1f}s')
```
%% Cell type:markdown id: tags:
# Data preview for first train
%% Cell type:code id: tags:
``` python
geom = xg.LPD_1MGeometry.from_quad_positions(
[(11.4, 299), (-11.5, 8), (254.5, -16), (278.5, 275)])
output_paths = [outp_path for _, _, outp_path in data_to_process if outp_path.exists()]
if not output_paths:
warning('Data preview is skipped as there are no existing output paths')
from sys import exit
exit(0)
dc = xd.DataCollection.from_paths(output_paths).select_trains(np.s_[0])
det = LPD1M(dc, detector_name=karabo_id)
data = det.get_array('image.data')
```
%% Cell type:markdown id: tags:
### Intensity histogram across all cells
%% Cell type:code id: tags:
``` python
left_edge_ratio = 0.01
right_edge_ratio = 0.99
fig, ax = plt.subplots(num=1, clear=True, figsize=(15, 6))
values, bins, _ = ax.hist(np.ravel(data.data), bins=2000, range=(-1500, 2000))
def find_nearest_index(array, value):
return (np.abs(array - value)).argmin()
cum_values = np.cumsum(values)
vmin = bins[find_nearest_index(cum_values, cum_values[-1]*left_edge_ratio)]
vmax = bins[find_nearest_index(cum_values, cum_values[-1]*right_edge_ratio)]
max_value = values.max()
ax.vlines([vmin, vmax], 0, max_value, color='red', linewidth=5, alpha=0.2)
ax.text(vmin, max_value, f'{left_edge_ratio*100:.0f}%',
color='red', ha='center', va='bottom', size='large')
ax.text(vmax, max_value, f'{right_edge_ratio*100:.0f}%',
color='red', ha='center', va='bottom', size='large')
ax.text(vmax+(vmax-vmin)*0.01, max_value/2, 'Colormap interval',
color='red', rotation=90, ha='left', va='center', size='x-large')
ax.set_xlim(vmin-(vmax-vmin)*0.1, vmax+(vmax-vmin)*0.1)
ax.set_ylim(0, max_value*1.1)
pass
```
%% Cell type:markdown id: tags:
### First memory cell
%% Cell type:code id: tags:
``` python
fig, ax = plt.subplots(num=2, figsize=(15, 15), clear=True, nrows=1, ncols=1)
geom.plot_data_fast(data[:, 0, 0], ax=ax, vmin=vmin, vmax=vmax)
pass
```
%% Cell type:markdown id: tags:
### Train average
%% Cell type:code id: tags:
``` python
fig, ax = plt.subplots(num=3, figsize=(15, 15), clear=True, nrows=1, ncols=1)
geom.plot_data_fast(data[:, 0].mean(axis=1), ax=ax, vmin=vmin, vmax=vmax)
pass
```
%% Cell type:markdown id: tags:
### Lowest gain stage per pixel
%% Cell type:code id: tags:
``` python
highest_gain_stage = det.get_array('image.gain', pulses=np.s_[:]).max(axis=(1, 2))
fig, ax = plt.subplots(num=4, figsize=(15, 15), clear=True, nrows=1, ncols=1)
p = geom.plot_data_fast(highest_gain_stage, ax=ax, vmin=0, vmax=2);
cb = ax.images[0].colorbar
cb.set_ticks([0, 1, 2])
cb.set_ticklabels(['High gain', 'Medium gain', 'Low gain'])
```
%% Cell type:markdown id: tags:
### Create virtual CXI file
%% Cell type:code id: tags:
``` python
if create_virtual_cxi_in:
vcxi_folder = Path(create_virtual_cxi_in.format(
run=run, proposal_folder=str(Path(in_folder).parent)))
vcxi_folder.mkdir(parents=True, exist_ok=True)
def sort_files_by_seq(by_seq, outp_path):
by_seq.setdefault(int(outp_path.stem[-5:]), []).append(outp_path)
return by_seq
from functools import reduce
reduce(sort_files_by_seq, output_paths, output_by_seq := {})
for seq_number, seq_output_paths in output_by_seq.items():
# Create data collection and detector components only for this sequence.
try:
det = LPD1M(xd.DataCollection.from_paths(seq_output_paths), detector_name=karabo_id, min_modules=4)
except ValueError: # Couldn't find enough data for min_modules
continue
det.write_virtual_cxi(vcxi_folder / f'VCXI-LPD-R{run:04d}-S{seq_number:05d}.cxi')
```
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment