diff --git a/notebooks/Timepix/Compute_Timepix_Event_Centroids.ipynb b/notebooks/Timepix/Compute_Timepix_Event_Centroids.ipynb
index 55b3f9ea1278153d817a3332289e2aa46c4f885e..036bb35f7d417d5afa27d3fc66f80c3c55dabc7a 100755
--- a/notebooks/Timepix/Compute_Timepix_Event_Centroids.ipynb
+++ b/notebooks/Timepix/Compute_Timepix_Event_Centroids.ipynb
@@ -33,7 +33,7 @@
     "karabo_id = 'SQS_EXP_TIMEPIX'\n",
     "in_fast_data = '{karabo_id}/DET/TIMEPIX3:daqOutput.chip0'\n",
     "out_device_id = '{karabo_id}/CAL/TIMEPIX3'\n",
-    "out_fast_data = '{karabo_id}/CAL/TIMEPIX3:outputChip0'\n",
+    "out_fast_data = '{karabo_id}/CAL/TIMEPIX3:daqOutput.chip0'\n",
     "out_aggregator = 'TIMEPIX01'\n",
     "out_seq_len = 2000\n",
     "\n",
@@ -45,7 +45,6 @@
     "clustering_epsilon = 2.0 # centroiding: The maximum distance between two samples for one to be considered as in the neighborhood of the other\n",
     "clustering_tof_scale = 1e7 # centroiding: Scaling factor for the ToA axis so that the epsilon parameter in DB scan works in all 3 dimensions\n",
     "clustering_min_samples = 2 # centroiding: minimum number of samples necessary for a cluster\n",
-    "clustering_n_jobs = 5 # centroiding: (DBSCAN) The number of parallel jobs to run.\n",
     "threshold_tot = 0 # raw data: minimum ToT necessary for a pixel to contain valid data\n",
     "\n",
     "raw_timewalk_lut_filepath = '' # fpath to look up table for timewalk correction relative to proposal path or empty string,\n",
@@ -62,11 +61,13 @@
     "from datetime import datetime\n",
     "from pathlib import Path\n",
     "from time import monotonic\n",
+    "from os import cpu_count\n",
     "from warnings import warn\n",
     "\n",
     "import numpy as np\n",
     "import scipy.ndimage as nd\n",
     "import h5py\n",
+    "import pasha as psh\n",
     "\n",
     "from sklearn.cluster import DBSCAN\n",
     "from extra_data import RunDirectory\n",
@@ -247,7 +248,6 @@
     "                      clustering_epsilon=2,\n",
     "                      clustering_tof_scale=1e7,\n",
     "                      clustering_min_samples=3,\n",
-    "                      clustering_n_jobs=1,\n",
     "                      centroiding_timewalk_lut=None):\n",
     "    # format input data\n",
     "    _tpx_data = {\n",
@@ -269,14 +269,43 @@
     "    # clustering (identify clusters in 2d data (x,y,tof) that belong to a single hit,\n",
     "    # each sample belonging to a cluster is labeled with an integer cluster id no)\n",
     "    _tpx_data = pre_clustering_filter(_tpx_data, tot_threshold=threshold_tot)\n",
-    "    _tpx_data[\"labels\"] = clustering(_tpx_data, epsilon=clustering_epsilon, tof_scale=clustering_tof_scale, min_samples=clustering_min_samples, n_jobs=clustering_n_jobs)\n",
+    "    _tpx_data[\"labels\"] = clustering(_tpx_data, epsilon=clustering_epsilon, tof_scale=clustering_tof_scale, min_samples=clustering_min_samples)\n",
     "    _tpx_data = post_clustering_filter(_tpx_data)\n",
     "    # compute centroid data (reduce cluster of samples to a single point with properties)\n",
     "    if _tpx_data[\"labels\"] is None or _tpx_data[\"labels\"].size == 0:\n",
     "        # handle case of no identified clusters, return empty dictionary with expected keys\n",
     "        return empty_centroid_data()\n",
     "    _centroids = get_centroids(_tpx_data, timewalk_lut=centroiding_timewalk_lut)\n",
-    "    return _centroids"
+    "    return _centroids\n",
+    "\n",
+    "\n",
+    "def process_train(worker_id, index, train_id, data):\n",
+    "    events = data[in_fast_data]\n",
+    "\n",
+    "    sel = np.s_[:events['data.size']]\n",
+    "\n",
+    "    x = events['data.x'][sel]\n",
+    "    y = events['data.y'][sel]\n",
+    "    tot = events['data.tot'][sel]\n",
+    "    toa = events['data.toa'][sel]\n",
+    "\n",
+    "    if raw_timewalk_lut is not None:\n",
+    "        toa -= raw_timewalk_lut[np.int_(tot // 25) - 1] * 1e3\n",
+    "\n",
+    "    centroids = compute_centroids(x, y, toa, tot, **centroiding_kwargs)\n",
+    "\n",
+    "    num_centroids = len(centroids['x'])\n",
+    "    fraction_centroids = np.sum(centroids[\"size\"])/events['data.size'] if events['data.size']>0 else np.nan\n",
+    "    missing_centroids = num_centroids > max_num_centroids\n",
+    "\n",
+    "    if num_centroids > max_num_centroids:\n",
+    "        warn('Number of centroids is larger than the defined maximum, some data cannot be written to disk')\n",
+    "\n",
+    "    for key in centroid_dt.names:\n",
+    "        out_data[index, :num_centroids][key] = centroids[key]\n",
+    "    out_stats[index][\"fraction_px_in_centroids\"] = fraction_centroids\n",
+    "    out_stats[index][\"N_centroids\"] = num_centroids\n",
+    "    out_stats[index][\"missing_centroids\"] = missing_centroids"
    ]
   },
   {
@@ -354,10 +383,12 @@
     "    clustering_epsilon=clustering_epsilon,\n",
     "    clustering_tof_scale=clustering_tof_scale,\n",
     "    clustering_min_samples=clustering_min_samples,\n",
-    "    clustering_n_jobs=clustering_n_jobs,\n",
     "    centroiding_timewalk_lut=centroiding_timewalk_lut)\n",
     "\n",
-    "print('Computing centroids and writing to file', flush=True, end='')\n",
+    "\n",
+    "psh.set_default_context('processes', num_workers=(num_workers := cpu_count() // 4))\n",
+    "    \n",
+    "print(f'Computing centroids with {num_workers} workers and writing to file', flush=True, end='')\n",
     "start = monotonic()\n",
     "\n",
     "for seq_id, seq_dc in enumerate(in_dc.split_trains(trains_per_part=out_seq_len)):\n",
@@ -371,38 +402,13 @@
     "        instrument_channels=[f'{out_fast_data}/data'])\n",
     "    seq_file.create_index(train_ids)\n",
     "    \n",
-    "    out_data = np.empty((len(train_ids), max_num_centroids), dtype=centroid_dt)\n",
+    "    out_data = psh.alloc(shape=(len(train_ids), max_num_centroids), dtype=centroid_dt)\n",
+    "    out_stats = psh.alloc(shape=(len(train_ids),), dtype=centroid_stats_dt)\n",
+    "    \n",
     "    out_data[:] = (np.nan, np.nan, np.nan, np.nan, np.nan, 0, -1)\n",
-    "    out_stats = np.empty((len(train_ids),), dtype=centroid_stats_dt)\n",
     "    out_stats[:] = tuple([centroid_stats_template[key][1] for key in centroid_stats_template])\n",
     "    \n",
-    "    for index, (train_id, data) in enumerate(seq_dc.trains()):\n",
-    "        events = data[in_fast_data]\n",
-    "\n",
-    "        sel = np.s_[:events['data.size']]\n",
-    "\n",
-    "        x = events['data.x'][sel]\n",
-    "        y = events['data.y'][sel]\n",
-    "        tot = events['data.tot'][sel]\n",
-    "        toa = events['data.toa'][sel]\n",
-    "\n",
-    "        if raw_timewalk_lut is not None:\n",
-    "            toa -= raw_timewalk_lut[np.int_(tot // 25) - 1] * 1e3\n",
-    "\n",
-    "        centroids = compute_centroids(x, y, toa, tot, **centroiding_kwargs)\n",
-    "\n",
-    "        num_centroids = len(centroids['x'])\n",
-    "        fraction_centroids = np.sum(centroids[\"size\"])/events['data.size'] if events['data.size']>0 else np.nan\n",
-    "        missing_centroids = num_centroids > max_num_centroids\n",
-    "        \n",
-    "        if num_centroids > max_num_centroids:\n",
-    "            warn('number of centroids larger than definde maximum, some data cannot be written to disk')\n",
-    "        \n",
-    "        for key in centroid_dt.names:\n",
-    "            out_data[key][index, :num_centroids] = centroids[key]\n",
-    "        out_stats[\"fraction_px_in_centroids\"][index] = fraction_centroids\n",
-    "        out_stats[\"N_centroids\"][index] = num_centroids\n",
-    "        out_stats[\"missing_centroids\"][index] = missing_centroids\n",
+    "    psh.map(process_train, seq_dc)\n",
     "    \n",
     "    # Create sources.\n",
     "    cur_slow_data = seq_file.create_control_source(out_device_id)\n",