[Timepix] Throw pasha at it

Merged Philipp Schmidt requested to merge feat/timepix-parallelization into master
1 file changed: +41 −35
@@ -33,7 +33,7 @@
"karabo_id = 'SQS_EXP_TIMEPIX'\n",
"in_fast_data = '{karabo_id}/DET/TIMEPIX3:daqOutput.chip0'\n",
"out_device_id = '{karabo_id}/CAL/TIMEPIX3'\n",
"out_fast_data = '{karabo_id}/CAL/TIMEPIX3:outputChip0'\n",
"out_fast_data = '{karabo_id}/CAL/TIMEPIX3:daqOutput.chip0'\n",
"out_aggregator = 'TIMEPIX01'\n",
"out_seq_len = 2000\n",
"\n",
@@ -45,7 +45,6 @@
"clustering_epsilon = 2.0 # centroiding: The maximum distance between two samples for one to be considered as in the neighborhood of the other\n",
"clustering_tof_scale = 1e7 # centroiding: Scaling factor for the ToA axis so that the epsilon parameter in DB scan works in all 3 dimensions\n",
"clustering_min_samples = 2 # centroiding: minimum number of samples necessary for a cluster\n",
"clustering_n_jobs = 5 # centroiding: (DBSCAN) The number of parallel jobs to run.\n",
"threshold_tot = 0 # raw data: minimum ToT necessary for a pixel to contain valid data\n",
"\n",
"raw_timewalk_lut_filepath = '' # fpath to look up table for timewalk correction relative to proposal path or empty string,\n",
@@ -62,11 +61,13 @@
"from datetime import datetime\n",
"from pathlib import Path\n",
"from time import monotonic\n",
"from os import cpu_count\n",
"from warnings import warn\n",
"\n",
"import numpy as np\n",
"import scipy.ndimage as nd\n",
"import h5py\n",
"import pasha as psh\n",
"\n",
"from sklearn.cluster import DBSCAN\n",
"from extra_data import RunDirectory\n",
@@ -247,7 +248,6 @@
" clustering_epsilon=2,\n",
" clustering_tof_scale=1e7,\n",
" clustering_min_samples=3,\n",
" clustering_n_jobs=1,\n",
" centroiding_timewalk_lut=None):\n",
" # format input data\n",
" _tpx_data = {\n",
@@ -269,14 +269,43 @@
" # clustering (identify clusters in 2d data (x,y,tof) that belong to a single hit,\n",
" # each sample belonging to a cluster is labeled with an integer cluster id no)\n",
" _tpx_data = pre_clustering_filter(_tpx_data, tot_threshold=threshold_tot)\n",
" _tpx_data[\"labels\"] = clustering(_tpx_data, epsilon=clustering_epsilon, tof_scale=clustering_tof_scale, min_samples=clustering_min_samples, n_jobs=clustering_n_jobs)\n",
" _tpx_data[\"labels\"] = clustering(_tpx_data, epsilon=clustering_epsilon, tof_scale=clustering_tof_scale, min_samples=clustering_min_samples)\n",
" _tpx_data = post_clustering_filter(_tpx_data)\n",
" # compute centroid data (reduce cluster of samples to a single point with properties)\n",
" if _tpx_data[\"labels\"] is None or _tpx_data[\"labels\"].size == 0:\n",
" # handle case of no identified clusters, return empty dictionary with expected keys\n",
" return empty_centroid_data()\n",
" _centroids = get_centroids(_tpx_data, timewalk_lut=centroiding_timewalk_lut)\n",
" return _centroids"
" return _centroids\n",
"\n",
"\n",
"def process_train(worker_id, index, train_id, data):\n",
" events = data[in_fast_data]\n",
"\n",
" sel = np.s_[:events['data.size']]\n",
"\n",
" x = events['data.x'][sel]\n",
" y = events['data.y'][sel]\n",
" tot = events['data.tot'][sel]\n",
" toa = events['data.toa'][sel]\n",
"\n",
" if raw_timewalk_lut is not None:\n",
" toa -= raw_timewalk_lut[np.int_(tot // 25) - 1] * 1e3\n",
"\n",
" centroids = compute_centroids(x, y, toa, tot, **centroiding_kwargs)\n",
"\n",
" num_centroids = len(centroids['x'])\n",
" fraction_centroids = np.sum(centroids[\"size\"])/events['data.size'] if events['data.size']>0 else np.nan\n",
" missing_centroids = num_centroids > max_num_centroids\n",
"\n",
" if num_centroids > max_num_centroids:\n",
" warn('Number of centroids is larger than the defined maximum, some data cannot be written to disk')\n",
"\n",
" for key in centroid_dt.names:\n",
" out_data[index, :num_centroids][key] = centroids[key]\n",
" out_stats[index][\"fraction_px_in_centroids\"] = fraction_centroids\n",
" out_stats[index][\"N_centroids\"] = num_centroids\n",
" out_stats[index][\"missing_centroids\"] = missing_centroids"
]
},
{
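The new process_train() follows pasha's kernel convention for extra_data collections: psh.map() calls it once per train with (worker_id, index, train_id, data), and results are written into shared arrays created with psh.alloc(). A minimal, self-contained sketch of that pattern, with a placeholder run path and a simple hit-counting kernel that are not part of the notebook:

    import numpy as np
    import pasha as psh
    from extra_data import RunDirectory

    # Fork worker processes; the shared arrays below are visible to all of them.
    psh.set_default_context('processes', num_workers=4)

    run = RunDirectory('/path/to/run')  # placeholder path
    source = 'SQS_EXP_TIMEPIX/DET/TIMEPIX3:daqOutput.chip0'
    dc = run.select(source)

    # One output slot per train, allocated in shared memory.
    hit_counts = psh.alloc(shape=(len(dc.train_ids),), dtype=np.int64)

    def count_hits(worker_id, index, train_id, data):
        # index is the position of this train within the mapped collection.
        hit_counts[index] = data[source]['data.size']

    psh.map(count_hits, dc)
    print('total hits:', hit_counts.sum())
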
@@ -354,10 +383,12 @@
" clustering_epsilon=clustering_epsilon,\n",
" clustering_tof_scale=clustering_tof_scale,\n",
" clustering_min_samples=clustering_min_samples,\n",
" clustering_n_jobs=clustering_n_jobs,\n",
" centroiding_timewalk_lut=centroiding_timewalk_lut)\n",
"\n",
"print('Computing centroids and writing to file', flush=True, end='')\n",
"\n",
"psh.set_default_context('processes', num_workers=(num_workers := cpu_count() // 4))\n",
" \n",
"print(f'Computing centroids with {num_workers} workers and writing to file', flush=True, end='')\n",
"start = monotonic()\n",
"\n",
"for seq_id, seq_dc in enumerate(in_dc.split_trains(trains_per_part=out_seq_len)):\n",
@@ -371,38 +402,13 @@
" instrument_channels=[f'{out_fast_data}/data'])\n",
" seq_file.create_index(train_ids)\n",
" \n",
" out_data = np.empty((len(train_ids), max_num_centroids), dtype=centroid_dt)\n",
" out_data = psh.alloc(shape=(len(train_ids), max_num_centroids), dtype=centroid_dt)\n",
" out_stats = psh.alloc(shape=(len(train_ids),), dtype=centroid_stats_dt)\n",
" \n",
" out_data[:] = (np.nan, np.nan, np.nan, np.nan, np.nan, 0, -1)\n",
" out_stats = np.empty((len(train_ids),), dtype=centroid_stats_dt)\n",
" out_stats[:] = tuple([centroid_stats_template[key][1] for key in centroid_stats_template])\n",
" \n",
" for index, (train_id, data) in enumerate(seq_dc.trains()):\n",
" events = data[in_fast_data]\n",
"\n",
" sel = np.s_[:events['data.size']]\n",
"\n",
" x = events['data.x'][sel]\n",
" y = events['data.y'][sel]\n",
" tot = events['data.tot'][sel]\n",
" toa = events['data.toa'][sel]\n",
"\n",
" if raw_timewalk_lut is not None:\n",
" toa -= raw_timewalk_lut[np.int_(tot // 25) - 1] * 1e3\n",
"\n",
" centroids = compute_centroids(x, y, toa, tot, **centroiding_kwargs)\n",
"\n",
" num_centroids = len(centroids['x'])\n",
" fraction_centroids = np.sum(centroids[\"size\"])/events['data.size'] if events['data.size']>0 else np.nan\n",
" missing_centroids = num_centroids > max_num_centroids\n",
" \n",
" if num_centroids > max_num_centroids:\n",
" warn('number of centroids larger than definde maximum, some data cannot be written to disk')\n",
" \n",
" for key in centroid_dt.names:\n",
" out_data[key][index, :num_centroids] = centroids[key]\n",
" out_stats[\"fraction_px_in_centroids\"][index] = fraction_centroids\n",
" out_stats[\"N_centroids\"][index] = num_centroids\n",
" out_stats[\"missing_centroids\"][index] = missing_centroids\n",
" psh.map(process_train, seq_dc)\n",
" \n",
" # Create sources.\n",
" cur_slow_data = seq_file.create_control_source(out_device_id)\n",