[Timepix] Throw pasha at it

Merged Philipp Schmidt requested to merge feat/timepix-parallelization into master
All threads resolved!
1 file changed: +41 -35
karabo_id = 'SQS_EXP_TIMEPIX'
in_fast_data = '{karabo_id}/DET/TIMEPIX3:daqOutput.chip0'
out_device_id = '{karabo_id}/CAL/TIMEPIX3'
out_fast_data = '{karabo_id}/CAL/TIMEPIX3:outputChip0'
out_fast_data = '{karabo_id}/CAL/TIMEPIX3:daqOutput.chip0'
out_aggregator = 'TIMEPIX01'
out_seq_len = 2000
clustering_epsilon = 2.0 # centroiding: The maximum distance between two samples for one to be considered as in the neighborhood of the other
clustering_tof_scale = 1e7 # centroiding: Scaling factor for the ToA axis so that the epsilon parameter in DB scan works in all 3 dimensions
clustering_min_samples = 2 # centroiding: minimum number of samples necessary for a cluster
clustering_n_jobs = 5 # centroiding: (DBSCAN) The number of parallel jobs to run.
threshold_tot = 0 # raw data: minimum ToT necessary for a pixel to contain valid data
raw_timewalk_lut_filepath = '' # fpath to look up table for timewalk correction relative to proposal path or empty string,
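For orientation, the three clustering_* parameters above steer a DBSCAN run in (x, y, ToF) space: the ToF axis is scaled by clustering_tof_scale so that clustering_epsilon, given in pixel units, acts as a distance along all three axes. A minimal sketch of that mapping (illustrative only; the helper name and exact call are assumptions, not the notebook's clustering() implementation):

# Hedged sketch, not the notebook's actual clustering() function.
import numpy as np
from sklearn.cluster import DBSCAN

def clustering_sketch(x, y, tof, epsilon=2.0, tof_scale=1e7, min_samples=2):
    # Scale the ToF axis so that `epsilon` is meaningful in all three dimensions.
    coords = np.column_stack((x, y, tof * tof_scale))
    # fit_predict() returns an integer cluster label per sample; -1 marks noise.
    return DBSCAN(eps=epsilon, min_samples=min_samples).fit_predict(coords)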
from datetime import datetime
from pathlib import Path
from time import monotonic
 
from os import cpu_count
from warnings import warn
import numpy as np
import scipy.ndimage as nd
import h5py
 
import pasha as psh
from sklearn.cluster import DBSCAN
from extra_data import RunDirectory
clustering_epsilon=2,
clustering_tof_scale=1e7,
clustering_min_samples=3,
clustering_n_jobs=1,
centroiding_timewalk_lut=None):
# format input data
_tpx_data = {
# clustering (identify clusters in 2d data (x,y,tof) that belong to a single hit,
# each sample belonging to a cluster is labeled with an integer cluster id no)
_tpx_data = pre_clustering_filter(_tpx_data, tot_threshold=threshold_tot)
_tpx_data["labels"] = clustering(_tpx_data, epsilon=clustering_epsilon, tof_scale=clustering_tof_scale, min_samples=clustering_min_samples, n_jobs=clustering_n_jobs)
_tpx_data["labels"] = clustering(_tpx_data, epsilon=clustering_epsilon, tof_scale=clustering_tof_scale, min_samples=clustering_min_samples)
_tpx_data = post_clustering_filter(_tpx_data)
# compute centroid data (reduce cluster of samples to a single point with properties)
if _tpx_data["labels"] is None or _tpx_data["labels"].size == 0:
# handle case of no identified clusters, return empty dictionary with expected keys
return empty_centroid_data()
_centroids = get_centroids(_tpx_data, timewalk_lut=centroiding_timewalk_lut)
return _centroids
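The centroiding step reduces each labelled cluster to a single point with properties. get_centroids() itself is not shown in this diff, so the following is only a hypothetical illustration of such a reduction using scipy.ndimage labelled statistics (the weighting and output fields are assumptions):

# Hypothetical centroid reduction; the real get_centroids() may differ.
import numpy as np
import scipy.ndimage as nd

def centroids_sketch(x, y, toa, tot, labels):
    ids = np.unique(labels[labels >= 0])  # DBSCAN marks unclustered samples with -1
    tot_sum = nd.sum(tot, labels=labels, index=ids)
    return {
        'x': nd.sum(x * tot, labels=labels, index=ids) / tot_sum,  # ToT-weighted position
        'y': nd.sum(y * tot, labels=labels, index=ids) / tot_sum,
        'toa': nd.minimum(toa, labels=labels, index=ids),          # earliest arrival per cluster
        'tot': tot_sum,
        'size': nd.sum(np.ones_like(tot), labels=labels, index=ids),
    }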
 
 
 
def process_train(worker_id, index, train_id, data):
    events = data[in_fast_data]

    sel = np.s_[:events['data.size']]

    x = events['data.x'][sel]
    y = events['data.y'][sel]
    tot = events['data.tot'][sel]
    toa = events['data.toa'][sel]

    if raw_timewalk_lut is not None:
        toa -= raw_timewalk_lut[np.int_(tot // 25) - 1] * 1e3

    centroids = compute_centroids(x, y, toa, tot, **centroiding_kwargs)

    num_centroids = len(centroids['x'])
    fraction_centroids = np.sum(centroids["size"])/events['data.size'] if events['data.size']>0 else np.nan
    missing_centroids = num_centroids > max_num_centroids

    if num_centroids > max_num_centroids:
        warn('Number of centroids is larger than the defined maximum, some data cannot be written to disk')

    for key in centroid_dt.names:
        out_data[index, :num_centroids][key] = centroids[key]

    out_stats[index]["fraction_px_in_centroids"] = fraction_centroids
    out_stats[index]["N_centroids"] = num_centroids
    out_stats[index]["missing_centroids"] = missing_centroids
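process_train() is written as a pasha kernel: pasha calls it once per train of the extra_data DataCollection with (worker_id, index, train_id, data), and all workers write into shared arrays obtained from psh.alloc(). A self-contained toy example of the same pattern (the squaring kernel, array size and worker count are purely illustrative):

# Toy illustration of the pasha map/alloc pattern, independent of extra_data.
import numpy as np
import pasha as psh

psh.set_default_context('processes', num_workers=4)

inp = np.arange(100, dtype=np.float64)
out = psh.alloc(shape=(100,), dtype=np.float64)  # shared between worker processes

def square_kernel(worker_id, index, value):
    # One call per element here; for a DataCollection the signature becomes
    # (worker_id, index, train_id, data) as in process_train above.
    out[index] = value ** 2

psh.map(square_kernel, inp)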
%% Cell type:code id:56306c15-513e-4e7f-9c47-c52ca61b27a8 tags:
clustering_epsilon=clustering_epsilon,
clustering_tof_scale=clustering_tof_scale,
clustering_min_samples=clustering_min_samples,
clustering_n_jobs=clustering_n_jobs,
centroiding_timewalk_lut=centroiding_timewalk_lut)
print('Computing centroids and writing to file', flush=True, end='')
 
psh.set_default_context('processes', num_workers=(num_workers := cpu_count() // 4))
 
 
print(f'Computing centroids with {num_workers} workers and writing to file', flush=True, end='')
start = monotonic()
for seq_id, seq_dc in enumerate(in_dc.split_trains(trains_per_part=out_seq_len)):
instrument_channels=[f'{out_fast_data}/data'])
seq_file.create_index(train_ids)
out_data = np.empty((len(train_ids), max_num_centroids), dtype=centroid_dt)
out_data = psh.alloc(shape=(len(train_ids), max_num_centroids), dtype=centroid_dt)
 
out_stats = psh.alloc(shape=(len(train_ids),), dtype=centroid_stats_dt)
 
out_data[:] = (np.nan, np.nan, np.nan, np.nan, np.nan, 0, -1)
out_stats = np.empty((len(train_ids),), dtype=centroid_stats_dt)
out_stats[:] = tuple([centroid_stats_template[key][1] for key in centroid_stats_template])
for index, (train_id, data) in enumerate(seq_dc.trains()):
    events = data[in_fast_data]
    sel = np.s_[:events['data.size']]
    x = events['data.x'][sel]
    y = events['data.y'][sel]
    tot = events['data.tot'][sel]
    toa = events['data.toa'][sel]
    if raw_timewalk_lut is not None:
        toa -= raw_timewalk_lut[np.int_(tot // 25) - 1] * 1e3
    centroids = compute_centroids(x, y, toa, tot, **centroiding_kwargs)
    num_centroids = len(centroids['x'])
    fraction_centroids = np.sum(centroids["size"])/events['data.size'] if events['data.size']>0 else np.nan
    missing_centroids = num_centroids > max_num_centroids
    if num_centroids > max_num_centroids:
        warn('number of centroids larger than defined maximum, some data cannot be written to disk')
    for key in centroid_dt.names:
        out_data[key][index, :num_centroids] = centroids[key]
    out_stats["fraction_px_in_centroids"][index] = fraction_centroids
    out_stats["N_centroids"][index] = num_centroids
    out_stats["missing_centroids"][index] = missing_centroids

psh.map(process_train, seq_dc)
# Create sources.
cur_slow_data = seq_file.create_control_source(out_device_id)