
[TIMEPIX] Bug Fix: Allow manual specification of the proposal

Merged Björn Senfftleben requested to merge fix/timepix_with_specified_proposal into master
%% Cell type:markdown id:0393ed18-b4e5-499b-bdd3-c9e5f24b9627 tags:
# Timepix3
Author: Björn Senfftleben / Philipp Schmidt, Version: 1.0
The following notebook provides centroiding for data acquired with the Timepix3 camera detector (ASI TPX3CAM).
%% Cell type:code id:9484ee10 tags:
``` python
# Data selection parameters.
run = 307 # required
in_folder = '/gpfs/exfel/exp/SQS/202430/p900421/raw' # required
out_folder = '/gpfs/exfel/exp/SQS/202430/p900421/scratch/cal_test' # required
proposal = '' # Proposal, leave empty for auto detection based on in_folder
# These parameters are required by xfel-calibrate but ignored in this notebook.
cal_db_timeout = 0 # Calibration DB timeout, currently not used.
cal_db_interface = 'foo' # Calibration DB interface, currently not used.
karabo_da = 'bar' # Karabo data aggregator name, currently not used
karabo_id = 'SQS_EXP_TIMEPIX'
in_fast_data = '{karabo_id}/DET/TIMEPIX3:daqOutput.chip0'
out_device_id = '{karabo_id}/CAL/TIMEPIX3'
out_fast_data = '{karabo_id}/CAL/TIMEPIX3:daqOutput.chip0'
out_aggregator = 'TIMEPIX01'
out_seq_len = 2000
max_num_centroids = 10000 # Maximum number of centroids per train
chunks_centroids = [1, 5000] # Chunking of centroid data
dataset_compression = 'gzip' # HDF compression method.
dataset_compression_opts = 3 # HDF GZIP compression level.
clustering_epsilon = 2.0 # centroiding: maximum distance between two samples for one to be considered in the neighborhood of the other
clustering_tof_scale = 1e7 # centroiding: scaling factor for the ToA axis so that the epsilon parameter in DBSCAN works across all 3 dimensions
clustering_min_samples = 2 # centroiding: minimum number of samples necessary for a cluster
threshold_tot = 0 # raw data: minimum ToT necessary for a pixel to contain valid data
raw_timewalk_lut_filepath = '' # file path to the timewalk correction look-up table applied to raw data, relative to the proposal path; leave empty to disable
centroiding_timewalk_lut_filepath = '' # file path to the timewalk correction look-up table applied to centroids, relative to the proposal path; leave empty to disable
```
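%% Cell type:markdown tags:
The `clustering_epsilon` and `clustering_tof_scale` parameters act together: the ToA axis is scaled into pixel-like units so that a single epsilon applies to all three dimensions. The throwaway arithmetic below (not part of the correction) illustrates this scaling with the defaults above.
%% Cell type:code tags:
``` python
# Illustration only: with tof_scale = 1e7, a ToA difference of 100 ns maps onto
# one spatial pixel, so epsilon = 2.0 reaches roughly 2 pixels or 200 ns in time.
tof_scale = 1e7    # s -> "ToA pixels"
epsilon = 2.0      # DBSCAN neighborhood radius in pixel-like units

toa_difference_s = 100e-9                # 100 ns between two hits
print(toa_difference_s * tof_scale)      # -> 1.0 "pixel" along the ToA axis
print(epsilon / tof_scale * 1e9, 'ns')   # -> 200 ns reach of epsilon in time
```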
%% Cell type:code id:524fe654-e112-4abe-813c-a0be9b3a3034 tags:
``` python
from datetime import datetime
from pathlib import Path
from time import monotonic
from os import cpu_count
from warnings import warn
import numpy as np
import scipy.ndimage as nd
import h5py
import pasha as psh
from sklearn.cluster import DBSCAN
from extra_data import RunDirectory
from extra_data.read_machinery import find_proposal
from cal_tools.files import DataFile, sequence_pulses
%matplotlib inline
```
%% Cell type:code id:e36f997c-4b66-4b11-99a8-5887e3572f56 tags:
``` python
# Centroiding helpers.
error_msgs = {
    -1: "tpx_data has an invalid structure - ignore provided data",
    -2: "tpx_data arrays are of invalid lengths - ignore provided data",
    -3: "tpx_data arrays are empty"
}


def check_data(tpx_data):
    required_keys = ["x", "y", "toa", "tot"]

    for key in required_keys:
        if key not in tpx_data.keys():
            warn("tpx data must contain the keys %s, but key %s not in tpx data keys (%s)" % (required_keys, key, list(tpx_data.keys())),
                 category=UserWarning)
            return -1

    reference_n_samples_key = "x"
    n_samples = len(tpx_data[reference_n_samples_key])

    for key in tpx_data.keys():
        if n_samples != len(tpx_data[key]):
            warn("arrays in tpx data must be of same length ( len(tpx_data[%s])=%i != %i=len(tpx_data[%s]) )" % (reference_n_samples_key, n_samples, len(tpx_data[key]), key),
                 category=UserWarning)
            return -2

    if n_samples == 0:
        warn("no samples were provided with tpx data", category=UserWarning)
        return -3

    return 0
def apply_single_filter(tpx_data, _filter):
    """
    Simple function to apply a selection or sorting filter to a dictionary of equally sized arrays.

    Note: no copy of the dictionary is made; since it is mutable, the input data is changed in memory!

    Parameters
    ----------
    tpx_data: dictionary with timepix data, all arrays behind each key must be of same length
    _filter: 1d array or list of integers or booleans or np.s_ to select or sort data like a = a[_filter]

    Returns
    -------
    tpx_data: like input tpx_data but with applied filter
    """
    try:
        for key in tpx_data.keys():
            tpx_data[key] = np.array(tpx_data[key])[_filter]
    except Exception as e:
        # Print filter diagnostics before re-raising to ease debugging.
        print(_filter)
        print(_filter.dtype)
        print(_filter.shape)
        print(tpx_data[key].shape)
        raise e
    return tpx_data
def clustering(tpx_data, epsilon=2, tof_scale=1e7, min_samples=3, n_jobs=1):
    """
    Cluster Timepix events in (x, y, ToA) space using DBSCAN.

    Parameters
    ----------
    tpx_data: Dictionary with timepix data, all arrays behind each key must be of same length.
    epsilon: The maximum distance between two samples for one to be considered as in the neighborhood of the other.
        This is not a maximum bound on the distances of points within a cluster. This is the most important
        DBSCAN parameter to choose appropriately for your data set and distance function.
    tof_scale: Scaling factor for the ToA data so that the epsilon parameter in DBSCAN works not only in the x/y
        axes, but also in the ToA axis. It converts ToA in s into "ToA pixels" -> e.g. tof_scale=1e7 means
        that 100 ns is considered comparable to 1 spatial pixel.
    min_samples: The number of samples (or total weight) in a neighborhood for a point to be considered as a core point.
        This includes the point itself.
    n_jobs: The number of parallel jobs to run. None means 1 unless in a joblib.parallel_backend context,
        -1 means using all processors.

    Returns
    -------
    Array of integer cluster labels, one per sample; noise samples are labeled -1.
    """
    coords = np.column_stack((tpx_data["x"], tpx_data["y"], tpx_data["toa"] * tof_scale))
    dist = DBSCAN(eps=epsilon, min_samples=min_samples, metric="euclidean", n_jobs=n_jobs).fit(coords)
    return dist.labels_
def empty_centroid_data():
    return {
        "x": np.array([]),
        "y": np.array([]),
        "toa": np.array([]),
        "tot": np.array([]),
        "tot_avg": np.array([]),
        "tot_max": np.array([]),
        "size": np.array([]),
    }
def get_centroids(tpx_data, timewalk_lut=None):
    centroid_data = empty_centroid_data()
    cluster_labels, cluster_size = np.unique(tpx_data["labels"], return_counts=True)
    cluster_tot_peaks = np.array(nd.maximum_position(tpx_data["tot"], labels=tpx_data["labels"], index=cluster_labels)).ravel()
    cluster_tot_integrals = nd.sum(tpx_data["tot"], labels=tpx_data["labels"], index=cluster_labels)

    # Compute the centroid position as the ToT-weighted average of each cluster's pixels.
    centroid_data["x"] = np.array(nd.sum(tpx_data["x"] * tpx_data["tot"], labels=tpx_data["labels"], index=cluster_labels) / cluster_tot_integrals).ravel()
    centroid_data["y"] = np.array(nd.sum(tpx_data["y"] * tpx_data["tot"], labels=tpx_data["labels"], index=cluster_labels) / cluster_tot_integrals).ravel()
    centroid_data["toa"] = np.array(nd.sum(tpx_data["toa"] * tpx_data["tot"], labels=tpx_data["labels"], index=cluster_labels) / cluster_tot_integrals).ravel()

    # Intensity & size information.
    centroid_data["tot_avg"] = np.array(nd.mean(tpx_data["tot"], labels=tpx_data["labels"], index=cluster_labels))
    centroid_data["tot_max"] = tpx_data["tot"][cluster_tot_peaks]
    centroid_data["tot"] = np.array(cluster_tot_integrals)
    centroid_data["size"] = cluster_size

    # Train ID information.
    # ~ centroid_data["tid"] = tpx_data["tid"][cluster_tot_peaks]

    # Correct the centroid ToA for timewalk if a look-up table is provided.
    if timewalk_lut is not None:
        centroid_data["toa"] -= timewalk_lut[np.int_(centroid_data["tot_max"] // 25) - 1] * 1e3
    return centroid_data
def compute_centroids(x, y, tof, tot,
                      threshold_tot=0,
                      clustering_epsilon=2,
                      clustering_tof_scale=1e7,
                      clustering_min_samples=3,
                      centroiding_timewalk_lut=None):
    # Format the input data.
    _tpx_data = {
        "x": x.astype(float),
        "y": y.astype(float),
        "toa": tof.astype(float),
        "tot": tot.astype(float)
    }

    # Ensure that valid data is available.
    data_validation = check_data(_tpx_data)
    if data_validation < 0:
        if data_validation in error_msgs.keys():
            print("Data validation failed with message: %s" % error_msgs[data_validation])
        else:
            print("Data validation failed: unknown reason")
        return np.array([]), empty_centroid_data()

    # Clustering: identify clusters in the 3D data (x, y, ToA) that belong to a single hit;
    # each sample belonging to a cluster is labeled with an integer cluster id.
    if threshold_tot > 0:
        _tpx_data = apply_single_filter(_tpx_data, _tpx_data["tot"] >= threshold_tot)

    labels = clustering(_tpx_data, epsilon=clustering_epsilon, tof_scale=clustering_tof_scale, min_samples=clustering_min_samples)
    _tpx_data["labels"] = labels

    if labels is not None:
        _tpx_data = apply_single_filter(_tpx_data, labels >= 0)

    # Compute centroid data (reduce each cluster of samples to a single point with properties).
    if labels is None or len(_tpx_data['x']) == 0:
        # Handle the case of no identified clusters: return an empty dictionary with the expected keys.
        return np.array([]), empty_centroid_data()

    return labels, get_centroids(_tpx_data, timewalk_lut=centroiding_timewalk_lut)
def process_train(worker_id, index, train_id, data):
    events = data[in_fast_data]

    sel = np.s_[:events['data.size']]
    x = events['data.x'][sel]
    y = events['data.y'][sel]
    tot = events['data.tot'][sel]
    toa = events['data.toa'][sel]

    if raw_timewalk_lut is not None:
        toa -= raw_timewalk_lut[np.int_(tot // 25) - 1] * 1e3

    labels, centroids = compute_centroids(x, y, toa, tot, **centroiding_kwargs)

    num_centroids = len(centroids['x'])
    fraction_centroids = np.sum(centroids["size"]) / events['data.size'] if events['data.size'] > 0 else np.nan
    missing_centroids = num_centroids > max_num_centroids

    if num_centroids > max_num_centroids:
        warn('Number of centroids is larger than the defined maximum, some data cannot be written to disk')

    for key in centroid_dt.names:
        out_data[index, :num_centroids][key] = centroids[key]

    out_labels[index, :len(labels)] = labels
    out_stats[index]["fraction_px_in_centroids"] = fraction_centroids
    out_stats[index]["N_centroids"] = num_centroids
    out_stats[index]["missing_centroids"] = missing_centroids
```
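%% Cell type:markdown tags:
To make the behaviour of `compute_centroids` easier to follow, here is a small throwaway example (not part of the correction pipeline, using made-up hit data): two tight groups of pixels that should each collapse into a single centroid, plus one isolated pixel that DBSCAN treats as noise and drops.
%% Cell type:code tags:
``` python
# Illustration only: synthetic pixel data instead of detector output.
_x   = np.array([10, 11, 10,   50, 51,   200])            # pixel column
_y   = np.array([20, 20, 21,   60, 60,   200])            # pixel row
_toa = np.array([1.0e-6, 1.01e-6, 1.02e-6,
                 5.0e-6, 5.01e-6, 9.0e-6])                 # time of arrival in s
_tot = np.array([40, 80, 40,   30, 60,   25])              # time over threshold

_labels, _centroids = compute_centroids(
    _x, _y, _toa, _tot,
    threshold_tot=0,
    clustering_epsilon=2.0,
    clustering_tof_scale=1e7,
    clustering_min_samples=2)

# Expect two centroids; positions are the ToT-weighted averages of each group,
# the isolated pixel is labeled -1 and excluded.
print(_centroids['x'], _centroids['y'], _centroids['size'])
```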
%% Cell type:code id:56306c15-513e-4e7f-9c47-c52ca61b27a8 tags:
``` python
dc = RunDirectory(Path(in_folder) / f'r{run:04d}', inc_suspect_trains=True)
proposal = list(filter(None, in_folder.strip('/').split('/')))[-2] if proposal == '' else proposal
base_path = find_proposal(proposal)

if raw_timewalk_lut_filepath:
    raw_timewalk_lut_filepath_full = (Path(base_path) / Path(raw_timewalk_lut_filepath)).resolve()
    raw_timewalk_lut = np.load(raw_timewalk_lut_filepath_full)
else:
    raw_timewalk_lut = None

if centroiding_timewalk_lut_filepath:
    centroiding_timewalk_lut_filepath_full = (Path(base_path) / Path(centroiding_timewalk_lut_filepath)).resolve()
    centroiding_timewalk_lut = np.load(centroiding_timewalk_lut_filepath_full)
else:
    centroiding_timewalk_lut = None
```
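%% Cell type:markdown tags:
The `proposal` fallback above is the actual change of this merge request: the proposal is only derived from `in_folder` when the `proposal` parameter is left empty, so a manually specified proposal is no longer overwritten. The throwaway snippet below (not executed as part of the correction, with a made-up override value) illustrates both branches using the default `in_folder`.
%% Cell type:code tags:
``` python
# Illustration only: mimic the fallback logic with local variables.
_in_folder = '/gpfs/exfel/exp/SQS/202430/p900421/raw'

_proposal = ''   # auto-detection: second-to-last path component -> 'p900421'
print(list(filter(None, _in_folder.strip('/').split('/')))[-2] if _proposal == '' else _proposal)

_proposal = 'p900123'   # hypothetical manual override is kept as-is
print(list(filter(None, _in_folder.strip('/').split('/')))[-2] if _proposal == '' else _proposal)
```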
%% Cell type:code id:8c28a4c8-961c-496b-80da-7fd867e5b0d3 tags:
``` python
in_fast_data = in_fast_data.format(karabo_id=karabo_id)
out_device_id = out_device_id.format(karabo_id=karabo_id)
out_fast_data = out_fast_data.format(karabo_id=karabo_id)
Path(out_folder).mkdir(exist_ok=True, parents=True)
in_dc = dc.select(in_fast_data, require_all=True)
dataset_kwargs = {k[8:]: v for k, v in locals().items() if k.startswith('dataset_compression')}
centroid_dt = np.dtype([('x', np.float64),
                        ('y', np.float64),
                        ('toa', np.float64),
                        ('tot', np.float64),
                        ('tot_avg', np.float64),
                        ('tot_max', np.uint16),
                        ('size', np.int16)])

pixel_shape = in_dc[in_fast_data]['data.x'].entry_shape

centroid_settings_template = {
    'timewalk_correction.raw_applied': (np.bool_, bool(raw_timewalk_lut_filepath)),
    'timewalk_correction.raw_file': ("S100", str(raw_timewalk_lut_filepath)[-100:]),
    'timewalk_correction.centroiding_applied': (np.bool_, bool(centroiding_timewalk_lut_filepath)),
    'timewalk_correction.centroiding_file': ("S100", str(centroiding_timewalk_lut_filepath)[-100:]),
    'clustering.epsilon': (np.float64, float(clustering_epsilon)),
    'clustering.tof_scale': (np.float64, float(clustering_tof_scale)),
    'clustering.min_samples': (np.int16, int(clustering_min_samples)),
    'threshold_tot': (np.int16, int(threshold_tot)),
}

centroid_stats_template = {
    'N_centroids': (np.int64, -1),
    'missing_centroids': (np.bool_, False),
    'fraction_px_in_centroids': (np.float64, np.nan),
}

centroid_settings_dt = np.dtype([(key, centroid_settings_template[key][0]) for key in centroid_settings_template])
centroid_stats_dt = np.dtype([(key, centroid_stats_template[key][0]) for key in centroid_stats_template])

centroiding_kwargs = dict(
    threshold_tot=threshold_tot,
    clustering_epsilon=clustering_epsilon,
    clustering_tof_scale=clustering_tof_scale,
    clustering_min_samples=clustering_min_samples,
    centroiding_timewalk_lut=centroiding_timewalk_lut)
psh.set_default_context('processes', num_workers=(num_workers := cpu_count() // 4))
print(f'Computing centroids with {num_workers} workers and writing to file', flush=True, end='')
start = monotonic()
for seq_id, seq_dc in enumerate(in_dc.split_trains(trains_per_part=out_seq_len)):
    train_ids = seq_dc.train_ids
    m_data_sources = []

    with DataFile.from_details(out_folder, out_aggregator, run, seq_id) as seq_file:
        # No support needed for old EXDF files.
        seq_file.create_metadata(like=in_dc, sequence=seq_id,
                                 control_sources=[out_device_id],
                                 instrument_channels=[f'{out_fast_data}/data'])
        seq_file.create_index(train_ids)

        out_labels = psh.alloc(shape=(len(train_ids),) + pixel_shape, dtype=np.int32)
        out_data = psh.alloc(shape=(len(train_ids), max_num_centroids), dtype=centroid_dt)
        out_stats = psh.alloc(shape=(len(train_ids),), dtype=centroid_stats_dt)

        out_labels[:] = -1
        out_data[:] = (np.nan, np.nan, np.nan, np.nan, np.nan, 0, -1)
        out_stats[:] = tuple([centroid_stats_template[key][1] for key in centroid_stats_template])

        psh.map(process_train, seq_dc)

        # Create sources.
        cur_slow_data = seq_file.create_control_source(out_device_id)
        cur_fast_data = seq_file.create_instrument_source(out_fast_data)

        # Add source indices.
        cur_slow_data.create_index(len(train_ids))
        cur_fast_data.create_index(data=np.ones_like(train_ids))

        for key, (type_, data) in centroid_settings_template.items():
            cur_slow_data.create_run_key(f'settings.{key}', data)

        cur_fast_data.create_key('data.labels', data=out_labels,
                                 chunks=(1,) + pixel_shape, **dataset_kwargs)
        cur_fast_data.create_key('data.centroids', out_data,
                                 chunks=tuple(chunks_centroids),
                                 **dataset_kwargs)
        cur_fast_data.create_key('data.stats', out_stats)

    print('.', flush=True, end='')
end = monotonic()
print('')
print(f'{end-start:.01f}s')
```
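%% Cell type:markdown tags:
As a rough sketch of how the written data could be inspected afterwards (assuming the sequence files in `out_folder` form a run directory readable by `extra_data`, which is not verified here), the centroids and statistics can be loaded back like this.
%% Cell type:code tags:
``` python
# Sketch only: read the corrected output back for a quick look.
out_dc = RunDirectory(out_folder)

centroids = out_dc[out_fast_data, 'data.centroids'].ndarray()   # (trains, max_num_centroids)
stats = out_dc[out_fast_data, 'data.stats'].ndarray()

print(centroids.shape)
print('mean centroids per train:', stats['N_centroids'].mean())
```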