Commit 9b582f12 authored by Philipp Schmidt

Update default test data for Timepix

parent 79375a0f
1 merge request: !981 [Timepix] Store labels assigned to pixels
%% Cell type:markdown id:0393ed18-b4e5-499b-bdd3-c9e5f24b9627 tags:

# Timepix3

Author: Björn Senfftleben / Philipp Schmidt, Version: 1.0

The following notebook provides centroiding for data acquired with the Timepix3 camera detector (ASI TPX3CAM).
%% Cell type:code id:9484ee10 tags:

``` python
# Data selection parameters.
run = 307  # required
in_folder = '/gpfs/exfel/exp/SQS/202430/p900421/raw'  # required
out_folder = '/gpfs/exfel/exp/SQS/202430/p900421/scratch/cal_test'  # required
proposal = ''  # Proposal, leave empty for auto detection based on in_folder

# These parameters are required by xfel-calibrate but ignored in this notebook.
cal_db_timeout = 0  # Calibration DB timeout, currently not used.
cal_db_interface = 'foo'  # Calibration DB interface, currently not used.
karabo_da = 'bar'  # Karabo data aggregator name, currently not used.

karabo_id = 'SQS_EXP_TIMEPIX'
in_fast_data = '{karabo_id}/DET/TIMEPIX3:daqOutput.chip0'
out_device_id = '{karabo_id}/CAL/TIMEPIX3'
out_fast_data = '{karabo_id}/CAL/TIMEPIX3:daqOutput.chip0'
out_aggregator = 'TIMEPIX01'
out_seq_len = 2000  # Number of trains per output sequence file.
max_num_centroids = 10000  # Maximum number of centroids per train.
chunks_centroids = [1, 5000]  # Chunking of centroid data.
dataset_compression = 'gzip'  # HDF compression method.
dataset_compression_opts = 3  # HDF GZIP compression level.

clustering_epsilon = 2.0  # centroiding: maximum distance between two samples for one to be considered in the neighborhood of the other.
clustering_tof_scale = 1e7  # centroiding: scaling factor for the ToA axis so that the epsilon parameter in DBSCAN works in all 3 dimensions.
clustering_min_samples = 2  # centroiding: minimum number of samples necessary for a cluster.
threshold_tot = 0  # raw data: minimum ToT necessary for a pixel to contain valid data.

raw_timewalk_lut_filepath = ''  # Path to timewalk correction look-up table, relative to proposal path, or empty string.
centroiding_timewalk_lut_filepath = ''  # Path to timewalk correction look-up table, relative to proposal path, or empty string.
```
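As a quick illustration (not part of the notebook) of how `clustering_tof_scale` puts the ToA axis on the same footing as the pixel axes for the DBSCAN clustering defined below:

``` python
# With clustering_tof_scale = 1e7, a ToA difference of 100 ns maps to 1.0
# in DBSCAN's coordinate space, i.e. it counts like one pixel of x/y
# distance when compared against clustering_epsilon.
delta_toa_s = 100e-9
print(delta_toa_s * 1e7)  # -> 1.0
```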
%% Cell type:code id:524fe654-e112-4abe-813c-a0be9b3a3034 tags:

``` python
from datetime import datetime
from pathlib import Path
from time import monotonic
from os import cpu_count
from warnings import warn

import numpy as np
import scipy.ndimage as nd
import h5py
import pasha as psh
from sklearn.cluster import DBSCAN
from extra_data import RunDirectory
from extra_data.read_machinery import find_proposal

from cal_tools.files import DataFile, sequence_pulses

%matplotlib inline
```
%% Cell type:code id:e36f997c-4b66-4b11-99a8-5887e3572f56 tags:

``` python
# centroiding
error_msgs = {
    -1: "tpx_data has an invalid structure - ignore provided data",
    -2: "tpx_data arrays are of invalid lengths - ignore provided data",
    -3: "tpx_data arrays are empty"
}
def check_data(tpx_data):
    required_keys = ["x", "y", "toa", "tot"]
    for key in required_keys:
        if key not in tpx_data.keys():
            warn("tpx data must contain the keys %s, but key %s not in tpx data keys (%s)" % (required_keys, key, list(tpx_data.keys())),
                 category=UserWarning)
            return -1

    reference_n_samples_key = "x"
    n_samples = len(tpx_data[reference_n_samples_key])
    for key in tpx_data.keys():
        if n_samples != len(tpx_data[key]):
            warn("arrays in tpx data must be of same length ( len(tpx_data[%s])=%i != %i=len(tpx_data[%s]) )" % (reference_n_samples_key, n_samples, len(tpx_data[key]), key),
                 category=UserWarning)
            return -2
    if n_samples == 0:
        warn("no samples were provided with tpx data", category=UserWarning)
        return -3
    return 0
def apply_single_filter(tpx_data, _filter):
    """
    Simple function to apply a selecting or sorting filter to a dictionary of equally sized arrays.

    Note: no copy of the dictionary is made; as dictionaries are mutable, the input arrays are changed in memory!

    Parameters
    ----------
    tpx_data: dictionary with timepix data, all arrays behind each key must be of same length
    _filter: 1d array or list of integers or booleans or np.s_ to select or sort data like a = a[_filter]

    Returns
    -------
    tpx_data: like input tpx_data but with applied filter
    """
    try:
        for key in tpx_data.keys():
            tpx_data[key] = np.array(tpx_data[key])[_filter]
    except Exception as e:
        print(_filter)
        print(_filter.dtype)
        print(_filter.shape)
        print(tpx_data[key].shape)
        raise e
    return tpx_data
def pre_clustering_filter(tpx_data, tot_threshold=0):
    """
    Collection of filters directly applied before clustering.

    Note: no copy of the dictionary is made; as dictionaries are mutable, the input arrays are changed in memory!

    Parameters
    ----------
    tpx_data: dictionary with timepix data, all arrays behind each key must be of same length
    tot_threshold: minimum ToT required for a pixel to contain valid data

    Returns
    -------
    tpx_data: like input tpx_data but with applied filters
    """
    if tot_threshold > 0:
        tpx_data = apply_single_filter(tpx_data, tpx_data["tot"] >= tot_threshold)
    return tpx_data


def post_clustering_filter(tpx_data):
    """
    Collection of filters directly applied after clustering.

    Note: no copy of the dictionary is made; as dictionaries are mutable, the input arrays are changed in memory!

    Parameters
    ----------
    tpx_data: dictionary with timepix data, all arrays behind each key must be of same length, now with key labels

    Returns
    -------
    tpx_data: like input tpx_data but with applied filters
    """
    if tpx_data["labels"] is not None:
        tpx_data = apply_single_filter(tpx_data, tpx_data["labels"] != 0)
    return tpx_data
def clustering(tpx_data, epsilon=2, tof_scale=1e7, min_samples=3, n_jobs=1):
    """
    Parameters
    ----------
    tpx_data: dictionary with timepix data, all arrays behind each key must be of same length
    epsilon: the maximum distance between two samples for one to be considered as in the neighborhood of the other.
        This is not a maximum bound on the distances of points within a cluster. This is the most important
        DBSCAN parameter to choose appropriately for your data set and distance function.
    tof_scale: scaling factor for the ToA data so that the epsilon parameter in DBSCAN works not only in the x/y
        axes, but also in the ToA axis. It converts ToA in s into "ToA pixels" -> e.g. tof_scale=1e7 means
        that 100 ns is considered comparable to 1 spatial pixel.
    min_samples: the number of samples (or total weight) in a neighborhood for a point to be considered as a core point.
        This includes the point itself.
    n_jobs: the number of parallel jobs to run. None means 1 unless in a joblib.parallel_backend context.
        -1 means using all processors. See the scikit-learn glossary for more details.

    Returns
    -------
    Cluster label for each sample, shifted by +1 so that noise samples (DBSCAN label -1) receive label 0.
    """
    coords = np.column_stack((tpx_data["x"], tpx_data["y"], tpx_data["toa"]*tof_scale))
    dist = DBSCAN(eps=epsilon, min_samples=min_samples, metric="euclidean", n_jobs=n_jobs).fit(coords)
    return dist.labels_ + 1


def empty_centroid_data():
    return {
        "x": np.array([]),
        "y": np.array([]),
        "toa": np.array([]),
        "tot": np.array([]),
        "tot_avg": np.array([]),
        "tot_max": np.array([]),
        "size": np.array([]),
    }
def get_centroids(tpx_data, timewalk_lut=None):
    centroid_data = empty_centroid_data()
    cluster_labels, cluster_size = np.unique(tpx_data["labels"], return_counts=True)
    cluster_tot_peaks = np.array(nd.maximum_position(tpx_data["tot"], labels=tpx_data["labels"], index=cluster_labels)).ravel()
    cluster_tot_integrals = nd.sum(tpx_data["tot"], labels=tpx_data["labels"], index=cluster_labels)

    # compute centroid center through weighted average
    centroid_data["x"] = np.array(nd.sum(tpx_data["x"] * tpx_data["tot"], labels=tpx_data["labels"], index=cluster_labels) / cluster_tot_integrals).ravel()
    centroid_data["y"] = np.array(nd.sum(tpx_data["y"] * tpx_data["tot"], labels=tpx_data["labels"], index=cluster_labels) / cluster_tot_integrals).ravel()
    centroid_data["toa"] = np.array(nd.sum(tpx_data["toa"] * tpx_data["tot"], labels=tpx_data["labels"], index=cluster_labels) / cluster_tot_integrals).ravel()

    # intensity & size information
    centroid_data["tot_avg"] = np.array(nd.mean(tpx_data["tot"], labels=tpx_data["labels"], index=cluster_labels))
    centroid_data["tot_max"] = tpx_data["tot"][cluster_tot_peaks]
    centroid_data["tot"] = np.array(cluster_tot_integrals)
    centroid_data["size"] = cluster_size

    # train ID information
    # ~ centroid_data["tid"] = tpx_data["tid"][cluster_tot_peaks]

    # correct for timewalk if provided
    if timewalk_lut is not None:
        centroid_data["toa"] -= timewalk_lut[np.int_(centroid_data["tot_max"] // 25) - 1] * 1e3
    return centroid_data
def compute_centroids(x, y, tof, tot,
                      threshold_tot=0,
                      clustering_epsilon=2,
                      clustering_tof_scale=1e7,
                      clustering_min_samples=3,
                      centroiding_timewalk_lut=None):
    # format input data
    _tpx_data = {
        "x": x.astype(float),
        "y": y.astype(float),
        "toa": tof.astype(float),
        "tot": tot.astype(float)
    }

    # ensure that valid data is available
    data_validation = check_data(_tpx_data)
    if data_validation < 0:
        if data_validation in error_msgs.keys():
            print("Data validation failed with message: %s" % error_msgs[data_validation])
        else:
            print("Data validation failed: unknown reason")
        return empty_centroid_data()

    # clustering (identify clusters in the 3D data (x, y, tof) that belong to a single hit,
    # each sample belonging to a cluster is labeled with an integer cluster id)
    _tpx_data = pre_clustering_filter(_tpx_data, tot_threshold=threshold_tot)
    _tpx_data["labels"] = clustering(_tpx_data, epsilon=clustering_epsilon, tof_scale=clustering_tof_scale, min_samples=clustering_min_samples)
    _tpx_data = post_clustering_filter(_tpx_data)

    # compute centroid data (reduce cluster of samples to a single point with properties)
    if _tpx_data["labels"] is None or _tpx_data["labels"].size == 0:
        # handle case of no identified clusters, return empty dictionary with expected keys
        return empty_centroid_data()
    _centroids = get_centroids(_tpx_data, timewalk_lut=centroiding_timewalk_lut)
    return _centroids
def process_train(worker_id, index, train_id, data):
    events = data[in_fast_data]

    sel = np.s_[:events['data.size']]
    x = events['data.x'][sel]
    y = events['data.y'][sel]
    tot = events['data.tot'][sel]
    toa = events['data.toa'][sel]

    if raw_timewalk_lut is not None:
        toa -= raw_timewalk_lut[np.int_(tot // 25) - 1] * 1e3

    centroids = compute_centroids(x, y, toa, tot, **centroiding_kwargs)

    num_centroids = len(centroids['x'])
    fraction_centroids = np.sum(centroids["size"])/events['data.size'] if events['data.size'] > 0 else np.nan
    missing_centroids = num_centroids > max_num_centroids

    if num_centroids > max_num_centroids:
        warn('Number of centroids is larger than the defined maximum, some data cannot be written to disk')

    for key in centroid_dt.names:
        out_data[index, :num_centroids][key] = centroids[key]

    out_stats[index]["fraction_px_in_centroids"] = fraction_centroids
    out_stats[index]["N_centroids"] = num_centroids
    out_stats[index]["missing_centroids"] = missing_centroids
```
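For illustration, a minimal sketch (not part of the notebook) applying `compute_centroids` to synthetic data; the pixel coordinates, ToA and ToT values below are made up:

``` python
# Hypothetical smoke test: two well-separated clusters of three pixels each.
x = np.array([10, 11, 10, 200, 201, 200], dtype=np.uint16)
y = np.array([20, 20, 21, 50, 50, 51], dtype=np.uint16)
toa = np.array([1.0e-6] * 3 + [2.0e-6] * 3)  # ToA in seconds
tot = np.array([100, 50, 25, 75, 125, 25], dtype=np.uint16)

centroids = compute_centroids(x, y, toa, tot,
                              clustering_epsilon=2.0,
                              clustering_tof_scale=1e7,
                              clustering_min_samples=2)
print(centroids['size'])  # expected: two centroids, each of size 3
```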
%% Cell type:code id:56306c15-513e-4e7f-9c47-c52ca61b27a8 tags:

``` python
dc = RunDirectory(Path(in_folder) / f'r{run:04d}', inc_suspect_trains=True)

base_path = find_proposal("p%06i" % int(dc.run_metadata()['proposalNumber']))

if raw_timewalk_lut_filepath:
    raw_timewalk_lut_filepath_full = (Path(base_path) / Path(raw_timewalk_lut_filepath)).resolve()
    raw_timewalk_lut = np.load(raw_timewalk_lut_filepath_full)
else:
    raw_timewalk_lut = None

if centroiding_timewalk_lut_filepath:
    centroiding_timewalk_lut_filepath_full = (Path(base_path) / Path(centroiding_timewalk_lut_filepath)).resolve()
    centroiding_timewalk_lut = np.load(centroiding_timewalk_lut_filepath_full)
else:
    centroiding_timewalk_lut = None
```
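The look-up tables are plain `.npy` arrays. As a sketch of the expected format (the bin width of 25 is inferred from the indexing `lut[np.int_(tot // 25) - 1]` used in this notebook; the array length and units are assumptions), a dummy table could be created like this:

``` python
# Hypothetical: one correction value per ToT bin of width 25; the array
# must be long enough to cover the largest expected tot // 25 index.
dummy_lut = np.zeros(1024)
np.save('timewalk_lut.npy', dummy_lut)  # illustrative filename
```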
%% Cell type:code id:8c28a4c8-961c-496b-80da-7fd867e5b0d3 tags:

``` python
in_fast_data = in_fast_data.format(karabo_id=karabo_id)
out_device_id = out_device_id.format(karabo_id=karabo_id)
out_fast_data = out_fast_data.format(karabo_id=karabo_id)

Path(out_folder).mkdir(exist_ok=True, parents=True)

in_dc = dc.select(in_fast_data, require_all=True)

dataset_kwargs = {k[8:]: v for k, v in locals().items() if k.startswith('dataset_compression')}

centroid_dt = np.dtype([('x', np.float64),
                        ('y', np.float64),
                        ('toa', np.float64),
                        ('tot', np.float64),
                        ('tot_avg', np.float64),
                        ('tot_max', np.uint16),
                        ('size', np.int16)])

centroid_settings_template = {
    'timewalk_correction.raw_applied': (bool, bool(raw_timewalk_lut_filepath)),
    'timewalk_correction.raw_file': ("S100", str(raw_timewalk_lut_filepath)[-100:]),
    'timewalk_correction.centroiding_applied': (bool, bool(centroiding_timewalk_lut_filepath)),
    'timewalk_correction.centroiding_file': ("S100", str(centroiding_timewalk_lut_filepath)[-100:]),
    'clustering.epsilon': (np.float64, float(clustering_epsilon)),
    'clustering.tof_scale': (np.float64, float(clustering_tof_scale)),
    'clustering.min_samples': (np.int16, int(clustering_min_samples)),
    'threshold_tot': (np.int16, int(threshold_tot)),
}

centroid_stats_template = {
    'N_centroids': (int, -1),
    'missing_centroids': (bool, False),
    'fraction_px_in_centroids': (np.float64, np.nan),
}

centroid_settings_dt = np.dtype([(key, centroid_settings_template[key][0]) for key in centroid_settings_template])
centroid_stats_dt = np.dtype([(key, centroid_stats_template[key][0]) for key in centroid_stats_template])

centroiding_kwargs = dict(
    threshold_tot=threshold_tot,
    clustering_epsilon=clustering_epsilon,
    clustering_tof_scale=clustering_tof_scale,
    clustering_min_samples=clustering_min_samples,
    centroiding_timewalk_lut=centroiding_timewalk_lut)

psh.set_default_context('processes', num_workers=(num_workers := cpu_count() // 4))

print(f'Computing centroids with {num_workers} workers and writing to file', flush=True, end='')
start = monotonic()

for seq_id, seq_dc in enumerate(in_dc.split_trains(trains_per_part=out_seq_len)):
    train_ids = seq_dc.train_ids
    m_data_sources = []

    with DataFile.from_details(out_folder, out_aggregator, run, seq_id) as seq_file:
        # No support needed for old EXDF files.
        seq_file.create_metadata(like=in_dc, sequence=seq_id,
                                 control_sources=[out_device_id],
                                 instrument_channels=[f'{out_fast_data}/data'])
        seq_file.create_index(train_ids)

        out_data = psh.alloc(shape=(len(train_ids), max_num_centroids), dtype=centroid_dt)
        out_stats = psh.alloc(shape=(len(train_ids),), dtype=centroid_stats_dt)
        out_data[:] = (np.nan, np.nan, np.nan, np.nan, np.nan, 0, -1)
        out_stats[:] = tuple([centroid_stats_template[key][1] for key in centroid_stats_template])

        psh.map(process_train, seq_dc)

        # Create sources.
        cur_slow_data = seq_file.create_control_source(out_device_id)
        cur_fast_data = seq_file.create_instrument_source(out_fast_data)

        # Add source indices.
        cur_slow_data.create_index(len(train_ids))
        cur_fast_data.create_index(data=np.ones_like(train_ids))

        for key, (type_, data) in centroid_settings_template.items():
            cur_slow_data.create_run_key(f'settings.{key}', data)

        cur_fast_data.create_key('data.centroids', out_data,
                                 chunks=tuple(chunks_centroids),
                                 **dataset_kwargs)
        cur_fast_data.create_key('data.stats', out_stats)

    print('.', flush=True, end='')

end = monotonic()
print('')
print(f'{end-start:.01f}s')
```
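To sanity-check the corrected files, a sketch (not part of the notebook) that assumes the sequence files written above can be opened as a run directory with extra_data:

``` python
out_dc = RunDirectory(out_folder)
centroids = out_dc[out_fast_data, 'data.centroids'].ndarray()
print(centroids.shape)  # (num_trains, max_num_centroids), structured centroid_dt
print(out_dc[out_fast_data, 'data.stats'].ndarray()['N_centroids'][:10])
```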