Commit e244204c authored by Karim Ahmed

write constants fragment

parent 6181cd30
1 merge request: !867 [LPD][Correct] Use Fragment file and remove PreCorrection notebook

notebooks/LPD/LPD_Correct_Fast.ipynb
%% Cell type:markdown id: tags:

# LPD Offline Correction #

Author: European XFEL Data Analysis Group

%% Cell type:code id: tags:
``` python
# Input parameters
in_folder = "/gpfs/exfel/exp/FXE/202201/p003073/raw/"  # the folder to read data from, required
out_folder = "/gpfs/exfel/data/scratch/schmidtp/random/LPD_test"  # the folder to output to, required
metadata_folder = ''  # Directory containing calibration_metadata.yml when run by xfel-calibrate.
sequences = [-1]  # Sequences to correct, use [-1] for all
modules = [-1]  # Module indices to correct, use [-1] for all, only used when karabo_da is empty
karabo_da = ['']  # Data aggregator names to correct, use [''] for all
run = 10  # run to process, required

# Source parameters
karabo_id = 'FXE_DET_LPD1M-1'  # Karabo domain for detector.
input_source = '{karabo_id}/DET/{module_index}CH0:xtdf'  # Input fast data source.
output_source = ''  # Output fast data source, empty to use same as input.
xgm_source = 'SA1_XTD2_XGM/DOOCS/MAIN'
xgm_pulse_count_key = 'pulseEnergy.numberOfSa1BunchesActual'

# CalCat parameters
creation_time = ""  # The timestamp to use with the calibration DB. Required format: "YYYY-MM-DD hh:mm:ss", e.g. 2019-07-04 11:02:41
cal_db_interface = ''  # Not needed, compatibility with current webservice.
cal_db_timeout = 0  # Not needed, compatibility with current webservice.
cal_db_root = '/gpfs/exfel/d/cal/caldb_store'  # The calibration database root path to access constant files, e.g. for accessing constants from the test database.

# Operating conditions
mem_cells = 512  # Memory cells; LPD constants are always taken with 512 cells.
bias_voltage = 250.0  # Detector bias voltage.
capacitor = '5pF'  # Capacitor setting: 5pF or 50pF
photon_energy = 9.2  # Photon energy in keV.
category = 0  # Whom to blame.
use_cell_order = 'auto'  # Whether to use memory cell order as a detector condition; auto/always/never

# Correction parameters
offset_corr = True  # Offset correction.
rel_gain = True  # Gain correction based on RelativeGain constant.
ff_map = True  # Gain correction based on FFMap constant.
gain_amp_map = True  # Gain correction based on GainAmpMap constant.

# Output options
ignore_no_frames_no_pulses = False  # Whether to run without SA1 pulses AND frames.
overwrite = True  # Set to True if existing data should be overwritten.
chunks_data = 1  # HDF chunk size for pixel data in number of frames.
chunks_ids = 32  # HDF chunk size for cellId and pulseId datasets.
create_virtual_cxi_in = ''  # Folder to create virtual CXI files in (for each sequence).

# Parallelization options
sequences_per_node = 1  # Sequence files to process per node
max_nodes = 8  # Maximum number of SLURM jobs to split correction work into
num_workers = 8  # Worker processes per node; 8 is safe on 768G nodes but won't work on 512G.
num_threads_per_worker = 32  # Number of threads per worker.


def balance_sequences(in_folder, run, sequences, sequences_per_node, karabo_da, max_nodes):
    from xfel_calibrate.calibrate import balance_sequences as bs
    return bs(in_folder, run, sequences, sequences_per_node, karabo_da, max_nodes=max_nodes)
```
%% Cell type:code id: tags:

``` python
from logging import warning
from pathlib import Path
from time import perf_counter
import gc
import re

import numpy as np
import h5py

import matplotlib
matplotlib.use('agg')
import matplotlib.pyplot as plt
%matplotlib inline

import extra_data as xd
import extra_geom as xg
import pasha as psh
from extra_data.components import LPD1M

import cal_tools.restful_config as rest_cfg
from cal_tools.calcat_interface import CalCatError, LPD_CalibrationData
from cal_tools.lpdalgs import correct_lpd_frames
from cal_tools.lpdlib import get_mem_cell_pattern, make_cell_order_condition
from cal_tools.tools import (
    CalibrationMetadata,
    calcat_creation_time,
    write_constants_fragment,
)
from cal_tools.files import DataFile
```
%% Cell type:markdown id: tags:

# Prepare environment

%% Cell type:code id: tags:

``` python
file_re = re.compile(r'^RAW-R(\d{4})-(\w+\d+)-S(\d{5})$')  # This should probably move to cal_tools

run_folder = Path(in_folder) / f'r{run:04d}'
out_folder = Path(out_folder)
out_folder.mkdir(exist_ok=True)

output_source = output_source or input_source

creation_time = calcat_creation_time(in_folder, run, creation_time)
print(f'Using {creation_time.isoformat()} as creation time')

# Pick all modules/aggregators or those selected.
if karabo_da == ['']:
    if modules == [-1]:
        modules = list(range(16))
    karabo_da = [f'LPD{i:02d}' for i in modules]
else:
    modules = [int(x[-2:]) for x in karabo_da]

# Pick all sequences or those selected.
if not sequences or sequences == [-1]:
    do_sequence = lambda seq: True
else:
    do_sequence = [int(x) for x in sequences].__contains__

# List of detector sources.
det_inp_sources = [input_source.format(karabo_id=karabo_id, module_index=int(da[-2:])) for da in karabo_da]

if use_cell_order not in {'auto', 'always', 'never'}:
    raise ValueError("use_cell_order must be auto/always/never")
```
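As a quick orientation for the regular expression above, here is an illustrative match against a typical raw file stem (the concrete file name is hypothetical):

``` python
# file_re captures run number, data aggregator, and sequence number.
m = file_re.match('RAW-R0010-LPD00-S00000')
assert (int(m[1]), m[2], int(m[3])) == (10, 'LPD00', 0)
```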
%% Cell type:markdown id: tags:

# Select data to process

%% Cell type:code id: tags:

``` python
data_to_process = []

for inp_path in run_folder.glob('RAW-*.h5'):
    match = file_re.match(inp_path.stem)

    if match[2] not in karabo_da or not do_sequence(int(match[3])):
        continue

    outp_path = out_folder / 'CORR-R{run:04d}-{aggregator}-S{seq:05d}.h5'.format(
        run=int(match[1]), aggregator=match[2], seq=int(match[3]))

    data_to_process.append((match[2], inp_path, outp_path))

print('Files to process:')
for data_descr in sorted(data_to_process, key=lambda x: f'{x[0]}{x[1]}'):
    print(f'{data_descr[0]}\t{data_descr[1]}')

# Collect the train IDs contained in the input LPD files.
inp_lpd_dc = xd.DataCollection.from_paths([x[1] for x in data_to_process])

frame_count = sum([
    int(inp_lpd_dc[source, 'image.data'].data_counts(labelled=False).sum())
    for source in inp_lpd_dc.all_sources], 0)

if frame_count == 0:
    inp_dc = xd.RunDirectory(run_folder) \
        .select_trains(xd.by_id[inp_lpd_dc.train_ids])

    try:
        pulse_count = int(inp_dc[xgm_source, xgm_pulse_count_key].ndarray().sum())
    except xd.SourceNameError:
        warning(f'Missing XGM source `{xgm_source}`')
        pulse_count = None
    except xd.PropertyNameError:
        warning(f'Missing XGM pulse count key `{xgm_pulse_count_key}`')
        pulse_count = None

    if pulse_count == 0 and not ignore_no_frames_no_pulses:
        warning(f'Affected files contain neither LPD frames nor SA1 pulses '
                f'according to {xgm_source}, processing is skipped. If this '
                f'is incorrect, please contact da-support@xfel.eu')
        from sys import exit
        exit(0)
    elif pulse_count is None:
        raise ValueError('Affected files contain no LPD frames and SA1 pulses '
                         'could not be inferred from XGM data')
    else:
        raise ValueError('Affected files contain no LPD frames but SA1 pulses')
else:
    print(f'Total number of LPD pulses across all modules: {frame_count}')
```
%% Cell type:markdown id: tags:

# Obtain and prepare calibration constants

%% Cell type:code id: tags:

``` python
start = perf_counter()

cell_ids_pattern_s = None
if use_cell_order != 'never':
    # Read the order of memory cells used
    raw_data = xd.DataCollection.from_paths([e[1] for e in data_to_process])
    cell_ids_pattern_s = make_cell_order_condition(
        use_cell_order, get_mem_cell_pattern(raw_data, det_inp_sources)
    )
print("Memory cells order:", cell_ids_pattern_s)

lpd_cal = LPD_CalibrationData(
    detector_name=karabo_id,
    modules=karabo_da,
    sensor_bias_voltage=bias_voltage,
    memory_cells=mem_cells,
    feedback_capacitor=capacitor,
    source_energy=photon_energy,
    memory_cell_order=cell_ids_pattern_s,
    category=category,
    event_at=creation_time,
    client=rest_cfg.calibration_client(),
    caldb_root=Path(cal_db_root),
)

lpd_metadata = lpd_cal.metadata(["Offset", "BadPixelsDark"])
try:
    illum_metadata = lpd_cal.metadata(lpd_cal.illuminated_calibrations)
    for key, value in illum_metadata.items():
        lpd_metadata.setdefault(key, {}).update(value)
except CalCatError as e:  # TODO: replace when API errors are improved.
    warning(f"CalCatError: {e}")

total_time = perf_counter() - start
print(f'Loading constants {total_time:.1f}s')
```
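For orientation, `lpd_metadata` maps each data aggregator to the CCV (calibration constant version) metadata of its retrieved constants. A minimal sketch of the expected layout, with the inner keys taken from how the removed pre-correction notebook below consumes the same mapping; all concrete values here are hypothetical:

``` python
# Hypothetical lpd_metadata layout; values are placeholders for illustration.
example_lpd_metadata = {
    "LPD00": {  # one entry per data aggregator
        "Offset": {
            "path": "lpd/constant_file.h5",  # relative to lpd_cal.caldb_root
            "dataset": "/some/dataset",      # HDF5 dataset inside that file
            "begin_validity_at": "2022-01-01T00:00:00+00:00",
            "ccv_id": 1234,
            "physical_name": "LPD_PHYS_UNIT00",
        },
        # ... BadPixelsDark, plus the illuminated constants if retrieved
    },
}
```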
%% Cell type:code id: tags:

``` python
# Validate the constants availability and raise/warn accordingly.
for mod, calibrations in lpd_metadata.items():
    missing_offset = {"Offset"} - set(calibrations)
    warn_missing_constants = {
        "BadPixelsDark", "BadPixelsFF", "GainAmpMap",
        "FFMap", "RelativeGain"} - set(calibrations)
    if missing_offset:
        warning(f"Offset constant is not available to correct {mod}.")
        karabo_da.remove(mod)
    if warn_missing_constants:
        warning(f"Constants {warn_missing_constants} were not retrieved for {mod}.")

if not karabo_da:  # Offsets are missing for all modules.
    raise Exception("Could not find offset constants for any modules, will not correct data.")

# Remove skipped correction modules from data_to_process
data_to_process = [(mod, in_f, out_f) for mod, in_f, out_f in data_to_process if mod in karabo_da]
```
%% Cell type:code id: tags:

``` python
# Write constants metadata to fragment YAML
write_constants_fragment(
    out_folder=(metadata_folder or out_folder),
    det_metadata=lpd_metadata,
    caldb_root=lpd_cal.caldb_root,
)

# Load constants data for all constants
const_data = lpd_cal.ndarray_map(metadata=lpd_metadata)
```
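After this cell, `const_data` mirrors `lpd_metadata` but holds the actual arrays. A small illustrative check, assuming every LPD constant arrives as a (256, 256, mem_cells, 3) array, which is the shape the loading code replaced by this commit allocated:

``` python
# Sanity-check sketch (assumed shape, per the removed allocation code).
for mod, constants in const_data.items():
    for cname, arr in constants.items():
        assert arr.shape == (256, 256, mem_cells, 3), (mod, cname, arr.shape)
```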
%% Cell type:code id: tags:

``` python
# These are intended in order cell, X, Y, gain
ccv_offsets = {}
ccv_gains = {}
ccv_masks = {}

ccv_shape = (mem_cells, 256, 256, 3)

constant_order = {
    'Offset': (2, 1, 0, 3),
    'BadPixelsDark': (2, 1, 0, 3),
    'RelativeGain': (2, 0, 1, 3),
    'FFMap': (2, 0, 1, 3),
    'BadPixelsFF': (2, 0, 1, 3),
    'GainAmpMap': (2, 0, 1, 3),
}


def prepare_constants(wid, index, aggregator):
    consts = const_data.get(aggregator, {})

    def _prepare_data(calibration_name, dtype):
        return consts[calibration_name] \
            .transpose(constant_order[calibration_name]) \
            .astype(dtype, copy=True)  # Make sure array is contiguous.

    if offset_corr and 'Offset' in consts:
        ccv_offsets[aggregator] = _prepare_data('Offset', np.float32)
    else:
        ccv_offsets[aggregator] = np.zeros(ccv_shape, dtype=np.float32)

    ccv_gains[aggregator] = np.ones(ccv_shape, dtype=np.float32)

    if 'BadPixelsDark' in consts:
        ccv_masks[aggregator] = _prepare_data('BadPixelsDark', np.uint32)
    else:
        ccv_masks[aggregator] = np.zeros(ccv_shape, dtype=np.uint32)

    if rel_gain and 'RelativeGain' in consts:
        ccv_gains[aggregator] *= _prepare_data('RelativeGain', np.float32)

    if ff_map and 'FFMap' in consts:
        ccv_gains[aggregator] *= _prepare_data('FFMap', np.float32)

        if 'BadPixelsFF' in consts:
            np.bitwise_or(ccv_masks[aggregator], _prepare_data('BadPixelsFF', np.uint32),
                          out=ccv_masks[aggregator])

    if gain_amp_map and 'GainAmpMap' in consts:
        ccv_gains[aggregator] *= _prepare_data('GainAmpMap', np.float32)

    print('.', end='', flush=True)


print('Preparing constants', end='', flush=True)
start = perf_counter()
psh.ThreadContext(num_workers=len(karabo_da)).map(prepare_constants, karabo_da)
total_time = perf_counter() - start
print(f'{total_time:.1f}s')

const_data.clear()  # Clear raw constants data now to save memory.
gc.collect();
```
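The `constant_order` transposes reorder the CalCat arrays into the (cell, X, Y, gain) layout of `ccv_shape`. A standalone illustration of the dark-constant case:

``` python
import numpy as np

# (256, 256, cells, 3) -> (cells, 256, 256, 3): axes (2, 1, 0, 3) move the
# memory-cell axis to the front and swap the two 256-pixel axes.
dark_like = np.zeros((256, 256, 512, 3), dtype=np.float32)
assert dark_like.transpose(2, 1, 0, 3).shape == (512, 256, 256, 3)
```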
%% Cell type:code id: tags:

``` python
def correct_file(wid, index, work):
    aggregator, inp_path, outp_path = work
    module_index = int(aggregator[-2:])

    start = perf_counter()
    dc = xd.H5File(inp_path, inc_suspect_trains=False).select('*', 'image.*', require_all=True)
    inp_source = dc[input_source.format(karabo_id=karabo_id, module_index=module_index)]
    open_time = perf_counter() - start

    # Load raw data for this file.
    # Reshaping gets rid of the extra 1-len dimensions without
    # mangling the frame axis for an actual frame count of 1.
    start = perf_counter()
    in_raw = inp_source['image.data'].ndarray().reshape(-1, 256, 256)
    in_cell = inp_source['image.cellId'].ndarray().reshape(-1)
    in_pulse = inp_source['image.pulseId'].ndarray().reshape(-1)
    read_time = perf_counter() - start

    # Allocate output arrays.
    out_data = np.zeros((in_raw.shape[0], 256, 256), dtype=np.float32)
    out_gain = np.zeros((in_raw.shape[0], 256, 256), dtype=np.uint8)
    out_mask = np.zeros((in_raw.shape[0], 256, 256), dtype=np.uint32)

    start = perf_counter()
    correct_lpd_frames(in_raw, in_cell,
                       out_data, out_gain, out_mask,
                       ccv_offsets[aggregator], ccv_gains[aggregator], ccv_masks[aggregator],
                       num_threads=num_threads_per_worker)
    correct_time = perf_counter() - start

    image_counts = inp_source['image.data'].data_counts(labelled=False)

    start = perf_counter()
    if (not outp_path.exists() or overwrite) and image_counts.sum() > 0:
        outp_source_name = output_source.format(karabo_id=karabo_id, module_index=module_index)

        with DataFile(outp_path, 'w') as outp_file:
            outp_file.create_index(dc.train_ids, from_file=dc.files[0])
            outp_file.create_metadata(like=dc, instrument_channels=(f'{outp_source_name}/image',))

            outp_source = outp_file.create_instrument_source(outp_source_name)

            outp_source.create_index(image=image_counts)
            outp_source.create_key('image.cellId', data=in_cell,
                                   chunks=(min(chunks_ids, in_cell.shape[0]),))
            outp_source.create_key('image.pulseId', data=in_pulse,
                                   chunks=(min(chunks_ids, in_pulse.shape[0]),))
            outp_source.create_key('image.data', data=out_data,
                                   chunks=(min(chunks_data, out_data.shape[0]), 256, 256))
            outp_source.create_compressed_key('image.gain', data=out_gain)
            outp_source.create_compressed_key('image.mask', data=out_mask)
    write_time = perf_counter() - start

    total_time = open_time + read_time + correct_time + write_time
    frame_rate = in_raw.shape[0] / total_time

    print('{}\t{}\t{:.3f}\t{:.3f}\t{:.3f}\t{:.3f}\t{:.3f}\t{}\t{:.1f}'.format(
        wid, aggregator, open_time, read_time, correct_time, write_time, total_time,
        in_raw.shape[0], frame_rate))

    in_raw = None
    in_cell = None
    in_pulse = None
    out_data = None
    out_gain = None
    out_mask = None
    gc.collect()


print('worker\tDA\topen\tread\tcorrect\twrite\ttotal\tframes\trate')
start = perf_counter()
psh.ProcessContext(num_workers=num_workers).map(correct_file, data_to_process)
total_time = perf_counter() - start
print(f'Total time: {total_time:.1f}s')
```
%% Cell type:markdown id: tags:

# Data preview for first train

%% Cell type:code id: tags:

``` python
geom = xg.LPD_1MGeometry.from_quad_positions(
    [(11.4, 299), (-11.5, 8), (254.5, -16), (278.5, 275)])

output_paths = [outp_path for _, _, outp_path in data_to_process if outp_path.exists()]

if not output_paths:
    warning('Data preview is skipped as there are no existing output paths')
    from sys import exit
    exit(0)

dc = xd.DataCollection.from_paths(output_paths).select_trains(np.s_[0])

det = LPD1M(dc, detector_name=karabo_id)
data = det.get_array('image.data')
```
%% Cell type:markdown id: tags:

### Intensity histogram across all cells

%% Cell type:code id: tags:

``` python
left_edge_ratio = 0.01
right_edge_ratio = 0.99

fig, ax = plt.subplots(num=1, clear=True, figsize=(15, 6))
values, bins, _ = ax.hist(np.ravel(data.data), bins=2000, range=(-1500, 2000))

def find_nearest_index(array, value):
    return (np.abs(array - value)).argmin()

cum_values = np.cumsum(values)
vmin = bins[find_nearest_index(cum_values, cum_values[-1]*left_edge_ratio)]
vmax = bins[find_nearest_index(cum_values, cum_values[-1]*right_edge_ratio)]

max_value = values.max()
ax.vlines([vmin, vmax], 0, max_value, color='red', linewidth=5, alpha=0.2)
ax.text(vmin, max_value, f'{left_edge_ratio*100:.0f}%',
        color='red', ha='center', va='bottom', size='large')
ax.text(vmax, max_value, f'{right_edge_ratio*100:.0f}%',
        color='red', ha='center', va='bottom', size='large')
ax.text(vmax+(vmax-vmin)*0.01, max_value/2, 'Colormap interval',
        color='red', rotation=90, ha='left', va='center', size='x-large')

ax.set_xlim(vmin-(vmax-vmin)*0.1, vmax+(vmax-vmin)*0.1)
ax.set_ylim(0, max_value*1.1)
pass
```
%% Cell type:markdown id: tags:

### First memory cell

%% Cell type:code id: tags:

``` python
fig, ax = plt.subplots(num=2, figsize=(15, 15), clear=True, nrows=1, ncols=1)
geom.plot_data_fast(data[:, 0, 0], ax=ax, vmin=vmin, vmax=vmax)
pass
```

%% Cell type:markdown id: tags:

### Train average

%% Cell type:code id: tags:

``` python
fig, ax = plt.subplots(num=3, figsize=(15, 15), clear=True, nrows=1, ncols=1)
geom.plot_data_fast(data[:, 0].mean(axis=1), ax=ax, vmin=vmin, vmax=vmax)
pass
```
%% Cell type:markdown id: tags:

### Lowest gain stage per pixel

%% Cell type:code id: tags:

``` python
highest_gain_stage = det.get_array('image.gain', pulses=np.s_[:]).max(axis=(1, 2))

fig, ax = plt.subplots(num=4, figsize=(15, 15), clear=True, nrows=1, ncols=1)
p = geom.plot_data_fast(highest_gain_stage, ax=ax, vmin=0, vmax=2);

cb = ax.images[0].colorbar
cb.set_ticks([0, 1, 2])
cb.set_ticklabels(['High gain', 'Medium gain', 'Low gain'])
```
%% Cell type:markdown id: tags:

### Create virtual CXI file

%% Cell type:code id: tags:

``` python
if create_virtual_cxi_in:
    vcxi_folder = Path(create_virtual_cxi_in.format(
        run=run, proposal_folder=str(Path(in_folder).parent)))
    vcxi_folder.mkdir(parents=True, exist_ok=True)

    def sort_files_by_seq(by_seq, outp_path):
        by_seq.setdefault(int(outp_path.stem[-5:]), []).append(outp_path)
        return by_seq

    from functools import reduce
    reduce(sort_files_by_seq, output_paths, output_by_seq := {})

    for seq_number, seq_output_paths in output_by_seq.items():
        # Create data collection and detector components only for this sequence.
        try:
            det = LPD1M(xd.DataCollection.from_paths(seq_output_paths), detector_name=karabo_id, min_modules=4)
        except ValueError:  # Couldn't find enough data for min_modules
            continue
        det.write_virtual_cxi(vcxi_folder / f'VCXI-LPD-R{run:04d}-S{seq_number:05d}.cxi')
```
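The `reduce` call above is a fold that buckets the corrected files by their 5-digit sequence suffix; an equivalent, more conventional formulation for comparison:

``` python
from collections import defaultdict

# Group corrected output files by sequence number (same result as the
# reduce()-based sort_files_by_seq above).
output_by_seq_alt = defaultdict(list)
for outp_path in output_paths:
    output_by_seq_alt[int(outp_path.stem[-5:])].append(outp_path)
```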
...

notebooks/LPD/LPD_retrieve_constants_precorrection.ipynb (deleted)
%% Cell type:markdown id: tags:

# LPD Retrieving Constants Pre-correction #

Author: European XFEL Detector Group, Version: 1.0

The following notebook provides constants metadata in a YAML file to use while correcting LPD images.

%% Cell type:code id: tags:

``` python
# Input parameters
in_folder = "/gpfs/exfel/exp/FXE/202201/p003073/raw/"  # the folder to read data from, required
out_folder = "/gpfs/exfel/data/scratch/ahmedk/test/remove/LPD_test"  # the folder to output to, required
metadata_folder = ''  # Directory containing calibration_metadata.yml when run by xfel-calibrate.
modules = [-1]  # Module indices to correct, use [-1] for all, only used when karabo_da is empty
karabo_da = ['']  # Data aggregator names to correct, use [''] for all
run = 10  # run to process, required

# Source parameters
karabo_id = 'FXE_DET_LPD1M-1'  # Karabo domain for detector.
input_source = '{karabo_id}/DET/{module_index}CH0:xtdf'  # Input fast data source.

# CalCat parameters
creation_time = ""  # The timestamp to use with the calibration DB. Required format: "YYYY-MM-DD hh:mm:ss", e.g. 2019-07-04 11:02:41

# Operating conditions
mem_cells = 512  # Memory cells; LPD constants are always taken with 512 cells.
bias_voltage = 250.0  # Detector bias voltage.
capacitor = '5pF'  # Capacitor setting: 5pF or 50pF
photon_energy = 9.2  # Photon energy in keV.
category = 0  # Whom to blame.
use_cell_order = 'auto'  # Whether to use memory cell order as a detector condition (not stored for older constants)
```

%% Cell type:code id: tags:

``` python
from logging import warning
from pathlib import Path

import numpy as np
from calibration_client import CalibrationClient
from calibration_client.modules import CalibrationConstantVersion
import extra_data as xd

from cal_tools.lpdlib import get_mem_cell_pattern, make_cell_order_condition
import cal_tools.restful_config as rest_cfg
from cal_tools.calcat_interface import CalCatError, LPD_CalibrationData
from cal_tools.tools import (
    CalibrationMetadata,
    calcat_creation_time,
)
from cal_tools.step_timing import StepTimer
```

%% Cell type:code id: tags:

``` python
out_folder = Path(out_folder)
out_folder.mkdir(exist_ok=True)

metadata = CalibrationMetadata(metadata_folder or out_folder)
# Constant paths & timestamps are saved under retrieved-constants in calibration_metadata.yml
retrieved_constants = metadata.setdefault("retrieved-constants", {})

creation_time = calcat_creation_time(in_folder, run, creation_time)
print(f'Using {creation_time.isoformat()} as creation time')

# Pick all modules/aggregators or those selected.
if not karabo_da or karabo_da == ['']:
    if not modules or modules == [-1]:
        modules = list(range(16))
    karabo_da = [f'LPD{i:02d}' for i in modules]
else:
    modules = [int(x[-2:]) for x in karabo_da]

# List of detector sources.
det_inp_sources = [input_source.format(karabo_id=karabo_id, module_index=int(da[-2:])) for da in karabo_da]
```

%% Cell type:code id: tags:

``` python
step_timer = StepTimer()
```

%% Cell type:code id: tags:

``` python
step_timer.start()

cell_ids_pattern_s = None
if use_cell_order != 'never':
    # Read the order of memory cells used
    raw_data = xd.RunDirectory(Path(in_folder, f'r{run:04d}'))
    cell_ids_pattern_s = make_cell_order_condition(
        use_cell_order, get_mem_cell_pattern(raw_data, det_inp_sources)
    )
print("Memory cells order:", cell_ids_pattern_s)

lpd_cal = LPD_CalibrationData(
    detector_name=karabo_id,
    modules=karabo_da,
    sensor_bias_voltage=bias_voltage,
    memory_cells=mem_cells,
    feedback_capacitor=capacitor,
    source_energy=photon_energy,
    memory_cell_order=cell_ids_pattern_s,
    category=category,
    event_at=creation_time,
    client=rest_cfg.calibration_client(),
)

# Initialize lpd_cal_metadata in case no constants are retrieved.
lpd_cal_metadata = {mod: {"no-constants-retrieved": None} for mod in karabo_da}

lpd_cal_metadata = lpd_cal.metadata(["Offset", "BadPixelsDark"])
try:
    illum_const_data = lpd_cal.metadata(lpd_cal.illuminated_calibrations)
    for key, value in illum_const_data.items():
        lpd_cal_metadata.setdefault(key, {}).update(value)
except CalCatError as e:  # TODO: replace when API errors are improved.
    warning(f"CalCatError: {e}")

# Validate the constants availability and raise/warn accordingly.
for mod, ccv_dict in lpd_cal_metadata.items():
    missing_offset = {"Offset"} - set(ccv_dict)
    warn_missing_constants = {
        "BadPixelsDark", "BadPixelsFF", "GainAmpMap", "FFMap", "RelativeGain"} - set(ccv_dict)
    if missing_offset:
        warning(f"Offset constant is not available to correct {mod}")
        karabo_da.remove(mod)
    if warn_missing_constants:
        warning(f"Constants {warn_missing_constants} were not retrieved for {mod}.")
if not karabo_da:  # Offsets are missing for all modules.
    raise Exception("Could not find offset constants for any modules, will not correct data.")

for mod, ccv_dict in lpd_cal_metadata.items():
    mdata_dict = {"constants": dict()}
    for cname, ccv_metadata in ccv_dict.items():
        mdata_dict["constants"][cname] = {
            "path": str(lpd_cal.caldb_root / ccv_metadata["path"]),
            "dataset": ccv_metadata["dataset"],
            "creation-time": ccv_metadata["begin_validity_at"],
            "ccv_id": ccv_metadata["ccv_id"],
        }
        mdata_dict["physical-name"] = ccv_metadata["physical_name"]
    retrieved_constants[mod] = mdata_dict

metadata.save()
step_timer.done_step(f"Stored retrieved constants in {metadata.filename}.")
```
...

@@ -76,8 +76,6 @@ notebooks = {
                          "cluster cores": 8},
         },
         "CORRECT": {
-            "pre_notebooks": [
-                "notebooks/LPD/LPD_retrieve_constants_precorrection.ipynb"],
             "notebook": "notebooks/LPD/LPD_Correct_Fast.ipynb",
             "concurrency": {"parameter": "sequences",
                             "default concurrency": [-1],
...