Commit 43b6eb6b authored by Philipp Schmidt

Merge branch 'feat/LPD-revival' into 'master'

[LPD] New correction notebook

See merge request detectors/pycalibration!672
parents 221d02e5 2049c425
Tags 3.5.2
%% Cell type:markdown id: tags:
# LPD Offline Correction #
Author: European XFEL Data Analysis Group
%% Cell type:code id: tags:
``` python
# Input parameters
in_folder = "/gpfs/exfel/exp/FXE/202201/p003073/raw/" # the folder to read data from, required
out_folder = "/gpfs/exfel/data/scratch/schmidtp/random/LPD_test" # the folder to output to, required
metadata_folder = '' # Directory containing calibration_metadata.yml when run by xfel-calibrate.
sequences = [-1] # Sequences to correct, use [-1] for all
modules = [-1] # Module indices to correct, use [-1] for all, only used when karabo_da is empty
karabo_da = [''] # Data aggregator names to correct, use [''] for all
run = 10 # Run to process, required
# Source parameters
karabo_id = 'FXE_DET_LPD1M-1' # Karabo domain for detector.
input_source = '{karabo_id}/DET/{module_index}CH0:xtdf' # Input fast data source.
output_source = '' # Output fast data source, empty to use same as input.
# CalCat parameters
use_dir_creation_date = True # Use the creation date of the directory for database time derivation.
cal_db_interface = '' # Not needed, compatibility with current webservice.
cal_db_timeout = 0 # Not needed, compatibility with current webservice.
cal_db_root = '/gpfs/exfel/d/cal/caldb_store'
# Operating conditions
mem_cells = 512 # Memory cells, LPD constants are always taken with 512 cells.
bias_voltage = 250.0 # Detector bias voltage.
capacitor = '5pF' # Capacitor setting: 5pF or 50pF
photon_energy = 9.2 # Photon energy in keV.
category = 0 # Whom to blame.
# Correction parameters
offset_corr = True # Offset correction.
rel_gain = True # Gain correction based on RelativeGain constant.
ff_map = True # Gain correction based on FFMap constant.
gain_amp_map = True # Gain correction based on GainAmpMap constant.
# Output options
overwrite = True # set to True if existing data should be overwritten
chunks_data = 32 # HDF chunk size for pixel data in number of frames.
chunks_ids = 32 # HDF chunk size for cellId and pulseId datasets.
# Parallelization options
sequences_per_node = 1 # Sequence files to process per node
num_workers = 8 # Worker processes per node, 8 is safe on 768G nodes but won't work on 512G.
num_threads_per_worker = 32 # Number of threads per worker.
def balance_sequences(in_folder, run, sequences, sequences_per_node, karabo_da):
from xfel_calibrate.calibrate import balance_sequences as bs
return bs(in_folder, run, sequences, sequences_per_node, karabo_da)
```
%% Cell type:code id: tags:
``` python
from collections import OrderedDict
from pathlib import Path
from time import perf_counter
import gc
import re
import warnings
import numpy as np
import h5py
import matplotlib
matplotlib.use('agg')
import matplotlib.pyplot as plt
%matplotlib inline
from calibration_client import CalibrationClient
from calibration_client.modules import CalibrationConstantVersion
import extra_data as xd
import extra_geom as xg
import pasha as psh
from extra_data.components import LPD1M
from cal_tools.lpdalgs import correct_lpd_frames
from cal_tools.tools import CalibrationMetadata, get_dir_creation_date, write_compressed_frames
from cal_tools.files import DataFile
from cal_tools.restful_config import restful_config
```
%% Cell type:markdown id: tags:
# Prepare environment
%% Cell type:code id: tags:
``` python
file_re = re.compile(r'^RAW-R(\d{4})-(\w+\d+)-S(\d{5})$') # This should probably move to cal_tools
run_folder = Path(in_folder) / f'r{run:04d}'
out_folder = Path(out_folder)
out_folder.mkdir(exist_ok=True)
output_source = output_source or input_source
cal_db_root = Path(cal_db_root)
metadata = CalibrationMetadata(metadata_folder or out_folder)
if use_dir_creation_date:
creation_time = get_dir_creation_date(in_folder, run)
else:
from datetime import datetime
creation_time = datetime.now()
print(f'Using {creation_time.isoformat()} as creation time')
# Pick all modules/aggregators or those selected.
if not karabo_da or karabo_da == ['']:
if not modules or modules == [-1]:
modules = list(range(16))
karabo_da = [f'LPD{i:02d}' for i in modules]
# Pick all sequences or those selected.
if not sequences or sequences == [-1]:
do_sequence = lambda seq: True
else:
do_sequence = [int(x) for x in sequences].__contains__
# List of detector sources.
det_inp_sources = [input_source.format(karabo_id=karabo_id, module_index=int(da[-2:])) for da in karabo_da]
```
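As a quick illustration of the selection logic above, a minimal sketch of how module indices expand into aggregator and fast-data source names (the module indices are assumed for this example, not taken from the run):

``` python
# Minimal sketch of the module/aggregator/source expansion done above.
karabo_id = 'FXE_DET_LPD1M-1'
input_source = '{karabo_id}/DET/{module_index}CH0:xtdf'
modules = [1, 5]  # example selection

karabo_da = [f'LPD{i:02d}' for i in modules]  # ['LPD01', 'LPD05']
det_inp_sources = [
    input_source.format(karabo_id=karabo_id, module_index=int(da[-2:]))
    for da in karabo_da
]
print(det_inp_sources)
# ['FXE_DET_LPD1M-1/DET/1CH0:xtdf', 'FXE_DET_LPD1M-1/DET/5CH0:xtdf']
```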
%% Cell type:markdown id: tags:
# Select data to process
%% Cell type:code id: tags:
``` python
data_to_process = []
for inp_path in run_folder.glob('RAW-*.h5'):
match = file_re.match(inp_path.stem)
if match[2] not in karabo_da or not do_sequence(int(match[3])):
continue
outp_path = out_folder / 'CORR-R{run:04d}-{aggregator}-S{seq:05d}.h5'.format(
run=int(match[1]), aggregator=match[2], seq=int(match[3]))
data_to_process.append((match[2], inp_path, outp_path))
print('Files to process:')
for data_descr in sorted(data_to_process, key=lambda x: f'{x[0]}{x[1]}'):
print(f'{data_descr[0]}\t{data_descr[1]}')
```
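For reference, the file-name handling above maps a raw sequence file to its corrected counterpart as follows (the file name is chosen for illustration only):

``` python
import re

file_re = re.compile(r'^RAW-R(\d{4})-(\w+\d+)-S(\d{5})$')
match = file_re.match('RAW-R0010-LPD00-S00000')  # stem of an example raw file

outp_name = 'CORR-R{run:04d}-{aggregator}-S{seq:05d}.h5'.format(
    run=int(match[1]), aggregator=match[2], seq=int(match[3]))
print(outp_name)  # CORR-R0010-LPD00-S00000.h5
```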
%% Cell type:markdown id: tags:
# Obtain and prepare calibration constants
%% Cell type:code id: tags:
``` python
# Connect to CalCat.
calcat_config = restful_config['calcat']
client = CalibrationClient(
client_id=calcat_config['user-id'],
client_secret=calcat_config['user-secret'],
user_email=calcat_config['user-email'],
base_api_url=calcat_config['base-api-url'],
token_url=calcat_config['token-url'],
refresh_url=calcat_config['refresh-url'],
auth_url=calcat_config['auth-url'],
scope='')
```
%% Cell type:code id: tags:
``` python
dark_calibrations = {
1: 'Offset', # np.float32
14: 'BadPixelsDark' # should be np.uint32, but is np.float64
}
dark_condition = [
dict(parameter_id=1, value=bias_voltage), # Sensor bias voltage
dict(parameter_id=7, value=mem_cells), # Memory cells
dict(parameter_id=15, value=capacitor), # Feedback capacitor
dict(parameter_id=13, value=256), # Pixels X
dict(parameter_id=14, value=256), # Pixels Y
]
illuminated_calibrations = {
20: 'BadPixelsFF', # np.uint32
42: 'GainAmpMap', # np.float32
43: 'FFMap', # np.float32
44: 'RelativeGain' # np.float32
}
illuminated_condition = dark_condition.copy()
illuminated_condition += [
dict(parameter_id=3, value=photon_energy), # Source energy
dict(parameter_id=25, value=category) # category
]
const_data = {}
const_load_mp = psh.ProcessContext(num_workers=24)
print('Querying calibration database', end='', flush=True)
start = perf_counter()
for calibrations, condition in [
(dark_calibrations, dark_condition),
(illuminated_calibrations, illuminated_condition)
]:
resp = CalibrationConstantVersion.get_closest_by_time_by_detector_conditions(
client, karabo_id, list(calibrations.keys()),
{'parameters_conditions_attributes': condition},
karabo_da='', event_at=creation_time.isoformat(), snapshot_at=None)
if not resp['success']:
raise RuntimeError(resp)
for ccv in resp['data']:
cc = ccv['calibration_constant']
da = ccv['physical_detector_unit']['karabo_da']
calibration_name = calibrations[cc['calibration_id']]
dtype = np.uint32 if calibration_name.startswith('BadPixels') else np.float32
const_data[(da, calibration_name)] = dict(
path=Path(ccv['path_to_file']) / ccv['file_name'],
dataset=ccv['data_set_name'],
data=const_load_mp.alloc(shape=(256, 256, mem_cells, 3), dtype=dtype)
)
print('.', end='', flush=True)
total_time = perf_counter() - start
print(f'{total_time:.1f}s')
```
%% Cell type:code id: tags:
``` python
def load_constant_dataset(wid, index, const_descr):
ccv_entry = const_data[const_descr]
with h5py.File(cal_db_root / ccv_entry['path'], 'r') as fp:
fp[ccv_entry['dataset'] + '/data'].read_direct(ccv_entry['data'])
print('.', end='', flush=True)
print('Loading calibration data', end='', flush=True)
start = perf_counter()
const_load_mp.map(load_constant_dataset, list(const_data.keys()))
total_time = perf_counter() - start
print(f'{total_time:.1f}s')
```
%% Cell type:code id: tags:
``` python
# These are intended to be in the order cell, X, Y, gain
ccv_offsets = {}
ccv_gains = {}
ccv_masks = {}
ccv_shape = (mem_cells, 256, 256, 3)
constant_order = {
'Offset': (2, 1, 0, 3),
'BadPixelsDark': (2, 1, 0, 3),
'RelativeGain': (2, 1, 0, 3),
'FFMap': (2, 0, 1, 3),
'BadPixelsFF': (2, 0, 1, 3),
'GainAmpMap': (2, 0, 1, 3),
}
def prepare_constants(wid, index, aggregator):
consts = {calibration_name: entry['data']
for (aggregator_, calibration_name), entry
in const_data.items()
if aggregator == aggregator_}
def _prepare_data(calibration_name, dtype):
return consts[calibration_name] \
.transpose(constant_order[calibration_name]) \
.astype(dtype, copy=True) # Make sure array is contiguous.
if offset_corr and 'Offset' in consts:
ccv_offsets[aggregator] = _prepare_data('Offset', np.float32)
else:
ccv_offsets[aggregator] = np.zeros(ccv_shape, dtype=np.float32)
ccv_gains[aggregator] = np.ones(ccv_shape, dtype=np.float32)
if 'BadPixelsDark' in consts:
ccv_masks[aggregator] = _prepare_data('BadPixelsDark', np.uint32)
else:
ccv_masks[aggregator] = np.zeros(ccv_shape, dtype=np.uint32)
if rel_gain and 'RelativeGain' in consts:
ccv_gains[aggregator] *= _prepare_data('RelativeGain', np.float32)
if ff_map and 'FFMap' in consts:
ccv_gains[aggregator] *= _prepare_data('FFMap', np.float32)
if 'BadPixelsFF' in consts:
np.bitwise_or(ccv_masks[aggregator], _prepare_data('BadPixelsFF', np.uint32),
out=ccv_masks[aggregator])
if gain_amp_map and 'GainAmpMap' in consts:
ccv_gains[aggregator] *= _prepare_data('GainAmpMap', np.float32)
print('.', end='', flush=True)
print('Preparing constants', end='', flush=True)
start = perf_counter()
psh.ThreadContext(num_workers=len(karabo_da)).map(prepare_constants, karabo_da)
total_time = perf_counter() - start
print(f'{total_time:.1f}s')
const_data.clear() # Clear raw constants data now to save memory.
gc.collect();
```
%% Cell type:code id: tags:
``` python
def correct_file(wid, index, work):
aggregator, inp_path, outp_path = work
module_index = int(aggregator[-2:])
start = perf_counter()
dc = xd.H5File(inp_path).select('*', 'image.*', require_all=True)
inp_source = dc[input_source.format(karabo_id=karabo_id, module_index=module_index)]
open_time = perf_counter() - start
# Load raw data for this file.
start = perf_counter()
in_raw = inp_source['image.data'].ndarray().squeeze()
in_cell = inp_source['image.cellId'].ndarray().squeeze()
in_pulse = inp_source['image.pulseId'].ndarray().squeeze()
read_time = perf_counter() - start
# Allocate output arrays.
out_data = np.zeros((in_raw.shape[0], 256, 256), dtype=np.float32)
out_gain = np.zeros((in_raw.shape[0], 256, 256), dtype=np.uint8)
out_mask = np.zeros((in_raw.shape[0], 256, 256), dtype=np.uint32)
start = perf_counter()
correct_lpd_frames(in_raw, in_cell,
out_data, out_gain, out_mask,
ccv_offsets[aggregator], ccv_gains[aggregator], ccv_masks[aggregator],
num_threads=num_threads_per_worker)
correct_time = perf_counter() - start
image_counts = inp_source['image.data'].data_counts(labelled=False)
start = perf_counter()
if (not outp_path.exists() or overwrite) and image_counts.sum() > 0:
fa = dc.files[0]
sel_trains = np.isin(fa.train_ids, dc.train_ids)
outp_source_name = output_source.format(karabo_id=karabo_id, module_index=module_index)
with DataFile(outp_path, 'w') as outp_file:
outp_file.create_index(
train_ids=dc.train_ids,
timestamps=fa.file['INDEX/timestamp'][sel_trains],
flags=fa.validity_flag[sel_trains])
outp_file.create_metadata(like=dc, instrument_channels=(f'{outp_source_name}/image',))
outp_source = outp_file.create_instrument_source(outp_source_name)
outp_source.create_index(image=image_counts)
outp_source.create_key('image.cellId', data=in_cell,
chunks=(chunks_ids,))
outp_source.create_key('image.pulseId', data=in_pulse,
chunks=(chunks_ids,))
outp_source.create_key('image.data', data=out_data,
chunks=(chunks_data, 256, 256))
write_compressed_frames(
out_gain, outp_file, f'INSTRUMENT/{outp_source_name}/image/gain', comp_threads=8)
write_compressed_frames(
out_mask, outp_file, f'INSTRUMENT/{outp_source_name}/image/mask', comp_threads=8)
write_time = perf_counter() - start
total_time = open_time + read_time + correct_time + write_time
frame_rate = in_raw.shape[0] / total_time
print('{}\t{}\t{:.3f}\t{:.3f}\t{:.3f}\t{:.3f}\t{:.3f}\t{}\t{:.1f}'.format(
wid, aggregator, open_time, read_time, correct_time, write_time, total_time,
in_raw.shape[0], frame_rate))
in_raw = None
in_cell = None
in_pulse = None
out_data = None
out_gain = None
out_mask = None
gc.collect()
print('worker\tDA\topen\tread\tcorrect\twrite\ttotal\tframes\trate')
start = perf_counter()
psh.ProcessContext(num_workers=num_workers).map(correct_file, data_to_process)
total_time = perf_counter() - start
print(f'Total time: {total_time:.1f}s')
```
%% Cell type:markdown id: tags:
# Data preview for first train
%% Cell type:code id: tags:
``` python
geom = xg.LPD_1MGeometry.from_quad_positions(
[(11.4, 299), (-11.5, 8), (254.5, -16), (278.5, 275)])
output_paths = [outp_path for _, _, outp_path in data_to_process if outp_path.exists()]
dc = xd.DataCollection.from_paths(output_paths).select_trains(np.s_[0])
det = LPD1M(dc, detector_name=karabo_id)
data = det.get_array('image.data')
```
%% Cell type:markdown id: tags:
### Intensity histogram across all cells
%% Cell type:code id: tags:
``` python
left_edge_ratio = 0.01
right_edge_ratio = 0.99
fig, ax = plt.subplots(num=1, clear=True, figsize=(15, 6))
values, bins, _ = ax.hist(np.ravel(data.data), bins=2000, range=(-1500, 2000))
def find_nearest_index(array, value):
return (np.abs(array - value)).argmin()
cum_values = np.cumsum(values)
vmin = bins[find_nearest_index(cum_values, cum_values[-1]*left_edge_ratio)]
vmax = bins[find_nearest_index(cum_values, cum_values[-1]*right_edge_ratio)]
max_value = values.max()
ax.vlines([vmin, vmax], 0, max_value, color='red', linewidth=5, alpha=0.2)
ax.text(vmin, max_value, f'{left_edge_ratio*100:.0f}%',
color='red', ha='center', va='bottom', size='large')
ax.text(vmax, max_value, f'{right_edge_ratio*100:.0f}%',
color='red', ha='center', va='bottom', size='large')
ax.text(vmax+(vmax-vmin)*0.01, max_value/2, 'Colormap interval',
color='red', rotation=90, ha='left', va='center', size='x-large')
ax.set_xlim(vmin-(vmax-vmin)*0.1, vmax+(vmax-vmin)*0.1)
ax.set_ylim(0, max_value*1.1)
pass
```
%% Cell type:markdown id: tags:
### First memory cell
%% Cell type:code id: tags:
``` python
fig, ax = plt.subplots(num=2, figsize=(15, 15), clear=True, nrows=1, ncols=1)
geom.plot_data_fast(data[:, 0, 0], ax=ax, vmin=vmin, vmax=vmax)
pass
```
%% Cell type:markdown id: tags:
### Train average
%% Cell type:code id: tags:
``` python
fig, ax = plt.subplots(num=3, figsize=(15, 15), clear=True, nrows=1, ncols=1)
geom.plot_data_fast(data[:, 0].mean(axis=1), ax=ax, vmin=vmin, vmax=vmax)
pass
```
%% Cell type:markdown id: tags:
### Lowest gain stage per pixel
%% Cell type:code id: tags:
``` python
highest_gain_stage = det.get_array('image.gain', pulses=np.s_[:]).max(axis=(1, 2))
fig, ax = plt.subplots(num=4, figsize=(15, 15), clear=True, nrows=1, ncols=1)
p = geom.plot_data_fast(highest_gain_stage, ax=ax, vmin=0, vmax=2);
cb = ax.images[0].colorbar
cb.set_ticks([0, 1, 2])
cb.set_ticklabels(['High gain', 'Medium gain', 'Low gain'])
```
@@ -18,6 +18,13 @@ ext_modules = [
extra_compile_args=["-fopenmp", "-march=native"],
extra_link_args=["-fopenmp"],
),
Extension(
'cal_tools.lpdalgs',
['src/cal_tools/lpdalgs.pyx'],
extra_compile_args=['-O3', '-fopenmp', '-march=native',
'-ftree-vectorize', '-frename-registers'],
extra_link_args=['-fopenmp'],
)
]
@@ -89,6 +96,7 @@ install_requires = [
"sphinx==1.8.5",
"tabulate==0.8.6",
"traitlets==4.3.3",
"calibration_client==10.0.0",
]
if "readthedocs.org" not in sys.executable:
......
from datetime import datetime
from itertools import chain
from numbers import Integral
from pathlib import Path
import re
import numpy as np
import h5py
def get_pulse_offsets(pulses_per_train):
"""Compute pulse offsets from pulse counts.
Given an array of number of pulses per train (INDEX/<source>/count),
computes the offsets (INDEX/<source>/first) for the first pulse of a
train in the data array.
Args:
pulses_per_train (array_like): Pulse count per train.
Returns:
(array_like) Offset of first pulse for each train.
"""
pulse_offsets = np.zeros_like(pulses_per_train)
np.cumsum(pulses_per_train[:-1], out=pulse_offsets[1:])
return pulse_offsets
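# Worked example (illustration only): pulses_per_train = [3, 0, 2] gives
# pulse_offsets = [0, 3, 3], i.e. train 0 starts at index 0, train 1 is empty
# and train 2 starts at index 3 of the pulse-resolved data arrays.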
def sequence_trains(train_ids, trains_per_sequence=256):
"""Iterate over sequences for a list of trains.
For pulse-resolved data, sequence_pulses may be used instead.
Args:
train_ids (array_like): Train IDs to sequence.
trains_per_sequence (int, optional): Number of trains
per sequence, 256 by default.
Yields:
(int, slice) Current sequence ID, train mask.
"""
num_trains = len(train_ids)
for seq_id, start in enumerate(range(0, num_trains, trains_per_sequence)):
train_mask = slice(
*np.s_[start:start+trains_per_sequence].indices(num_trains))
yield seq_id, train_mask
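# Worked example (illustration only): 600 train IDs with the default
# trains_per_sequence=256 yield (0, slice(0, 256, 1)), (1, slice(256, 512, 1))
# and (2, slice(512, 600, 1)).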
def sequence_pulses(train_ids, pulses_per_train=1, pulse_offsets=None,
trains_per_sequence=256):
"""Split trains into sequences.
Args:
train_ids (array_like): Train IDs to sequence.
pulses_per_train (int or array_like, optional): Pulse count per
train. If scalar, it is assumed to be constant for all
trains. If omitted, it is 1 by default.
pulse_offsets (array_like, optional): Offsets for the first
pulse in each train, computed from pulses_per_train if
omitted.
trains_per_sequence (int, optional): Number of trains
per sequence, 256 by default.
Yields:
(int, array_like, array_like)
Current sequence ID, train mask, pulse mask.
"""
if isinstance(pulses_per_train, Integral):
pulses_per_train = np.full_like(train_ids, pulses_per_train,
dtype=np.uint64)
if pulse_offsets is None:
pulse_offsets = get_pulse_offsets(pulses_per_train)
for seq_id, train_mask in sequence_trains(train_ids, trains_per_sequence):
start = train_mask.start
stop = train_mask.stop - 1
pulse_mask = np.s_[
pulse_offsets[start]:pulse_offsets[stop]+pulses_per_train[stop]]
yield seq_id, train_mask, pulse_mask
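# Worked example (illustration only): 600 train IDs with a constant
# pulses_per_train of 10 give, for the first sequence, the train mask
# slice(0, 256, 1) and the pulse mask np.s_[0:2560].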
def escape_key(key):
"""Escapes a key name from Karabo to HDF notation."""
return key.replace('.', '/')
class DataFile(h5py.File):
"""European XFEL HDF5 data file.
This class extends the h5py.File with methods specific to writing
data in the European XFEL file format. The internal state does not
depend on using any of these methods, and the underlying file may be
manipulated by any of the regular h5py methods, too.
Please refer to
https://extra-data.readthedocs.io/en/latest/data_format.html for
details of the file format.
"""
filename_format = '{prefix}-R{run:04d}-{aggregator}-S{sequence:05d}.h5'
aggregator_pattern = re.compile(r'^[A-Z]{2,}\d{2}$')
instrument_source_pattern = re.compile(r'^[\w\/-]+:\w+$')
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.__control_sources = set()
self.__instrument_sources = set()
self.__run = 0
self.__sequence = 0
@classmethod
def from_details(cls, folder, aggregator, run, sequence, prefix='CORR',
mode='w', *args, **kwargs):
"""Open or create a file based on European XFEL details.
This method is a wrapper to construct the filename based on its
components.
Args:
folder (Path, str): Parent location for this file.
aggregator (str): Name of the data aggregator, must satisfy
DataFile.aggregator_pattern.
run (int): Run number.
sequence (int): Sequence number.
prefix (str, optional): First filename component, 'CORR' by
default.
args, kwargs: Any additional arguments are passed on to
h5py.File
Returns:
(DataFile) Opened file object.
"""
if not isinstance(folder, Path):
folder = Path(folder)
if not cls.aggregator_pattern.match(aggregator):
raise ValueError(f'invalid aggregator format, must satisfy '
f'{cls.aggregator_pattern.pattern}')
filename = cls.filename_format.format(
prefix=prefix, aggregator=aggregator, run=run, sequence=sequence)
self = cls((folder / filename).resolve(), mode, *args, **kwargs)
self.__run = run
self.__sequence = sequence
return self
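# Usage sketch (hypothetical folder and aggregator, illustration only):
#     DataFile.from_details('/some/out', 'LPD00', run=10, sequence=0)
# opens or creates '/some/out/CORR-R0010-LPD00-S00000.h5' and remembers the
# run and sequence numbers for a later create_metadata() call.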
def create_index(self, train_ids, timestamps=None, flags=None,
origins=None):
"""Create global INDEX datasets.
These datasets are agnostic of any source and describe the
trains contained in this file.
Args:
train_ids (array_like): Train IDs contained in this file.
timestamps (array_like, optional): Timestamp of each train,
0 if omitted.
flags (array_like, optional): Whether the time server is the
initial origin of each train, 1 if omitted.
origins (array_like, optional): Which source is the initial
origin of each train, -1 (time server) if omitted.
Returns:
None
"""
self.create_dataset('INDEX/trainId', data=train_ids, dtype=np.uint64)
if timestamps is None:
timestamps = np.zeros_like(train_ids, dtype=np.uint64)
elif len(timestamps) != len(train_ids):
raise ValueError('timestamps and train_ids must be same length')
self.create_dataset('INDEX/timestamp', data=timestamps,
dtype=np.uint64)
if flags is None:
flags = np.ones_like(train_ids, dtype=np.int32)
elif len(flags) != len(train_ids):
raise ValueError('flags and train_ids must be same length')
self.create_dataset('INDEX/flag', data=flags, dtype=np.int32)
if origins is None:
origins = np.full_like(train_ids, -1, dtype=np.int32)
elif len(origins) != len(train_ids):
raise ValueError('origins and train_ids must be same length')
self.create_dataset('INDEX/origin', data=origins, dtype=np.int32)
def create_control_source(self, source):
"""Create group for a control source ("slow data").
Control sources created via this method are not required to be
passed to create_metadata() again.
Args:
source (str): Karabo device ID.
Returns:
(ControlSource) Created group in CONTROL.
"""
self.__control_sources.add(source)
return ControlSource(self.create_group(f'CONTROL/{source}').id, source)
def create_instrument_source(self, source):
"""Create group for an instrument source ("fast data").
Instrument sources created via this method are not required to be
passed to create_metadata() again.
Args:
source (str): Karabo pipeline path, must satisfy
DataFile.instrument_source_pattern.
Returns:
(InstrumentSource) Created group in INSTRUMENT.
"""
if not self.instrument_source_pattern.match(source):
raise ValueError(f'invalid source format, must satisfy '
f'{self.instrument_source_pattern.pattern}')
self.__instrument_sources.add(source)
return InstrumentSource(self.create_group(f'INSTRUMENT/{source}').id,
source)
def create_metadata(self, like=None, *,
creation_date=None, update_date=None, proposal=0,
run=None, sequence=None, daq_library='1.x',
karabo_framework='2.x', control_sources=(),
instrument_channels=()):
"""Create METADATA datasets.
Args:
like (DataCollection, optional): Take proposal, run,
daq_library, karabo_framework from an EXtra-data data
collection, overwriting any of these arguments passed.
creation_date (datetime, optional): Creation date and time,
now if omitted.
update_date (datetime, optional): Update date and time,
now if omitted.
proposal (int, optional): Proposal number, 0 if omitted and
no DataCollection passed.
run (int, optional): Run number, 0 if omitted, no
DataCollection is passed and the object was not created via
from_details.
sequence (int, optional): Sequence number, 0 if omitted and
object not created via from_details.
daq_library (str, optional): daqLibrary field, '1.x' if
omitted and no DataCollection passed.
karabo_framework (str, optional): karaboFramework field,
'2.x' if omitted and no DataCollection is passed.
control_sources (Iterable, optional): Control sources in
this file, sources created via create_control_source are
automatically included.
instrument_channels (Iterable, optional): Instrument
channels (source and first component of data hash) in
this file, channels created via create_instrument_source
are automatically included.
Returns:
None
"""
if like is not None:
metadata = like.run_metadata()
proposal = metadata['proposalNumber']
run = metadata['runNumber']
daq_library = metadata['daqLibrary']
karabo_framework = metadata['karaboFramework']
else:
if run is None:
run = self.__run
if sequence is None:
sequence = self.__sequence
if creation_date is None:
creation_date = datetime.now()
if update_date is None:
update_date = creation_date
md_group = self.require_group('METADATA')
md_group.create_dataset(
'creationDate', shape=(1,),
data=creation_date.strftime('%Y%m%dT%H%M%SZ').encode('ascii'))
md_group.create_dataset('daqLibrary', shape=(1,),
data=daq_library.encode('ascii'))
md_group.create_dataset('dataFormatVersion', shape=(1,), data=b'1.2')
# Start with the known and specified control sources
sources = {name: 'CONTROL'
for name in chain(self.__control_sources, control_sources)}
# Add in the specified instrument data channels.
sources.update({full_channel: 'INSTRUMENT'
for full_channel in instrument_channels})
# Add in those already in the file, if not already passed.
sources.update({f'{name}/{channel}': 'INSTRUMENT'
for name in self.__instrument_sources
for channel in self[f'INSTRUMENT/{name}']})
source_names = sorted(sources.keys())
data_sources_shape = (len(sources),)
md_group.create_dataset('dataSources/dataSourceId',
shape=data_sources_shape,
data=[f'{sources[name]}/{name}'.encode('ascii')
for name in source_names])
md_group.create_dataset('dataSources/deviceId',
shape=data_sources_shape,
data=[name.encode('ascii')
for name in source_names])
md_group.create_dataset('dataSources/root', shape=data_sources_shape,
data=[sources[name].encode('ascii')
for name in source_names])
md_group.create_dataset(
'karaboFramework', shape=(1,),
data=karabo_framework.encode('ascii'))
md_group.create_dataset(
'proposalNumber', shape=(1,), dtype=np.uint32, data=proposal)
md_group.create_dataset(
'runNumber', shape=(1,), dtype=np.uint32, data=run)
md_group.create_dataset(
'sequenceNumber', shape=(1,), dtype=np.uint32, data=sequence)
md_group.create_dataset(
'updateDate', shape=(1,),
data=update_date.strftime('%Y%m%dT%H%M%SZ').encode('ascii'))
class ControlSource(h5py.Group):
"""Group for a control source ("slow data").
This class extends h5py.Group with methods specific to writing data
of a control source in the European XFEL file format. The internal
state does not depend on using any of these methods, and the
underlying file may be manipulated by any of the regular h5py
methods, too.
"""
ascii_dt = h5py.string_dtype('ascii')
def __init__(self, group_id, source):
super().__init__(group_id)
self.__source = source
self.__run_group = self.file.create_group(f'RUN/{source}')
def get_run_group(self):
return self.__run_group
def get_index_group(self):
return self.file.require_group(f'INDEX/{self.__source}')
def create_key(self, key, values, timestamps=None, run_entry=None):
"""Create datasets for a key varying each train.
Args:
key (str): Source key, dots are automatically replaced by
slashes.
values (array_like): Source values for each train.
timestamps (array_like, optional): Timestamps for each
source value, 0 if omitted.
run_entry (tuple of array_like, optional): Value and
timestamp for the corresponding value in the RUN
section. The first entry for the train values is used if
omitted.
Returns:
None
"""
key = escape_key(key)
if timestamps is None:
timestamps = np.zeros_like(values, dtype=np.uint64)
elif len(values) != len(timestamps):
raise ValueError('values and timestamp must be the same length')
self.create_dataset(f'{key}/value', data=values)
self.create_dataset(f'{key}/timestamp', data=timestamps)
if run_entry is None:
run_entry = (values[0], timestamps[0])
self.create_run_key(key, *run_entry)
def create_run_key(self, key, value, timestamp=None):
"""Create datasets for a key constant over a run.
Args:
key (str): Source key, dots are automatically replaced by
slashes.
value (Any): Key value.
timestamp (int, optional): Timestamp of the value,
0 if omitted.
Returns:
None
"""
# TODO: Some types/shapes are still not fully correct here.
key = escape_key(key)
if timestamp is None:
timestamp = 0
if isinstance(value, list):
shape = (1, len(value))
try:
dtype = type(value[0])
except IndexError:
# Assume empty lists are string-typed.
dtype = self.ascii_dt
elif isinstance(value, np.ndarray):
shape = value.shape
dtype = value.dtype
else:
shape = (1,)
dtype = type(value)
if dtype is str:
dtype = self.ascii_dt
self.__run_group.create_dataset(
f'{key}/value', data=value, shape=shape, dtype=dtype)
self.__run_group.create_dataset(
f'{key}/timestamp', data=timestamp, shape=shape, dtype=np.uint64)
def create_index(self, num_trains):
"""Create source-specific INDEX datasets.
Depending on whether this source has train-varying data or not,
different count/first datasets are written.
Args:
num_trains (int): Total number of trains in this file.
Returns:
None
"""
if len(self) > 0:
count_func = np.ones
first_func = np.arange
else:
count_func = np.zeros
first_func = np.zeros
index_group = self.get_index_group()
index_group.create_dataset(
'count', data=count_func(num_trains, dtype=np.uint64))
index_group.create_dataset(
'first', data=first_func(num_trains, dtype=np.uint64))
class InstrumentSource(h5py.Group):
"""Group for an instrument source ("fast data").
This class extends h5py.Group with methods specific to writing data
of an instrument source in the European XFEL file format. The internal
state does not depend on using any of these methods, and the
underlying file may be manipulated by any of the regular h5py
methods, too.
"""
key_pattern = re.compile(r'^\w+\/[\w\/]+$')
def __init__(self, group_id, source):
super().__init__(group_id)
self.__source = source
def get_index_group(self, channel):
return self.file.require_group(f'INDEX/{self.__source}/{channel}')
def create_key(self, key, data=None, **kwargs):
"""Create dataset for a key.
Args:
key (str): Source key, dots are automatically replaced by
slashes.
data (array_like, optional): Key data to initialize the
dataset to.
kwargs: Any additional keyword arguments are passed to
create_dataset.
Returns:
(h5py.Dataset) Created dataset
"""
key = escape_key(key)
if not self.key_pattern.match(key):
raise ValueError(f'invalid key format, must satisfy '
f'{self.key_pattern.pattern}')
return self.create_dataset(key, data=data, **kwargs)
def create_index(self, *args, **channels):
"""Create source-specific INDEX datasets.
Instrument data is indexed by channel, which is the first
component in its key. If channels have already been created, a
single count may be applied to all of them by passing it as a
positional argument.
"""
if not channels:
try:
count = int(args[0])
except IndexError:
raise ValueError('positional arguments required if no '
'explicit channels are passed') from None
# Allow ValueError to propagate directly.
channels = {channel: count for channel in self}
for channel, count in channels.items():
index_group = self.get_index_group(channel)
index_group.create_dataset('count', data=count, dtype=np.uint64)
index_group.create_dataset(
'first', data=get_pulse_offsets(count), dtype=np.uint64)
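# Usage sketch (mirroring the correction notebook above): after writing the
# 'image' datasets of an instrument source, passing per-train frame counts as
#     source.create_index(image=image_counts)
# creates INDEX/<source>/image/count and INDEX/<source>/image/first, with
# 'first' computed from the counts via get_pulse_offsets().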
from cython cimport boundscheck, wraparound, cdivision
from cython.view cimport contiguous
from cython.parallel cimport prange
from cal_tools.enums import BadPixels
ctypedef unsigned short cell_t
ctypedef unsigned short raw_t
ctypedef float data_t
ctypedef unsigned char gain_t
ctypedef unsigned int mask_t
cdef mask_t WRONG_GAIN_VALUE = BadPixels.WRONG_GAIN_VALUE, \
VALUE_OUT_OF_RANGE = BadPixels.VALUE_OUT_OF_RANGE, \
VALUE_IS_NAN = BadPixels.VALUE_IS_NAN
@boundscheck(False)
@wraparound(False)
@cdivision(True)
def correct_lpd_frames(
# (frame, x, y)
raw_t[:, :, ::contiguous] in_raw,
cell_t[::contiguous] in_cell,
# (frame, x, y)
data_t[:, :, ::contiguous] out_data,
gain_t[:, :, ::contiguous] out_gain,
mask_t[:, :, ::contiguous] out_mask,
# (cell, x, y, gain)
float[:, :, :, ::contiguous] ccv_offset,
float[:, :, :, ::contiguous] ccv_gain,
mask_t[:, :, :, ::contiguous] ccv_mask,
int num_threads=1,
):
cdef int frame, ss, fs
cdef cell_t cell
cdef data_t data
cdef gain_t gain
cdef mask_t mask
for frame in prange(in_raw.shape[0], nogil=True, num_threads=num_threads):
cell = in_cell[frame]
for ss in range(in_raw.shape[1]):
for fs in range(in_raw.shape[2]):
# Decode intensity and gain from raw data.
data = <data_t>(in_raw[frame, ss, fs] & 0xFFF)
gain = <gain_t>((in_raw[frame, ss, fs] & 0x3000) >> 12)
if gain <= 2:
mask = ccv_mask[cell, ss, fs, gain]
else:
data = 0.0
gain = 0
mask = WRONG_GAIN_VALUE
data = data - ccv_offset[cell, ss, fs, gain]
data = data * ccv_gain[cell, ss, fs, gain]
if data > 1e7 or data < -1e7:
data = 0.0
mask = mask | VALUE_OUT_OF_RANGE
out_data[frame, ss, fs] = data
out_gain[frame, ss, fs] = gain
out_mask[frame, ss, fs] = mask
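Each raw 16-bit LPD value packs the intensity in bits 0-11 and the gain stage in bits 12-13, which is what the masks above decode. A minimal sketch of the same decoding in plain Python (sample value chosen for illustration):

``` python
raw = 0x2ABC  # example raw pixel word, illustration only

data = raw & 0xFFF           # intensity, lower 12 bits -> 2748
gain = (raw & 0x3000) >> 12  # gain stage, bits 12-13   -> 2 (low gain)

print(data, gain)  # 2748 2
# Gain values above 2 are rejected by correct_lpd_frames and flagged
# with BadPixels.WRONG_GAIN_VALUE.
```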
@@ -85,11 +85,11 @@ notebooks = {
"cluster cores": 8},
},
"CORRECT": {
"notebook": "notebooks/LPD/LPD_Correct_and_Verify.ipynb",
"notebook": "notebooks/LPD/LPD_Correct_Fast.ipynb",
"concurrency": {"parameter": "sequences",
"default concurrency": [-1],
"use function": "balance_sequences",
"cluster cores": 32},
"cluster cores": 16},
},
"XGM_MINE": {
"notebook": "notebooks/LPD/Mine_RadIntensity_vs_XGM_NBC.ipynb",
......