From 64eae13754d4e300532215d325c12d055ee1ca51 Mon Sep 17 00:00:00 2001
From: Philipp Schmidt <philipp.schmidt@xfel.eu>
Date: Tue, 22 Mar 2022 09:02:15 +0100
Subject: [PATCH] Add initial draft for TIMEPIX correct action

---
 .../Compute_Timepix_Event_Centroids.ipynb     | 339 ++++++++----------
 src/xfel_calibrate/notebooks.py               |  11 +
 2 files changed, 160 insertions(+), 190 deletions(-)

diff --git a/notebooks/Timepix/Compute_Timepix_Event_Centroids.ipynb b/notebooks/Timepix/Compute_Timepix_Event_Centroids.ipynb
index cce6da856..735b1110c 100755
--- a/notebooks/Timepix/Compute_Timepix_Event_Centroids.ipynb
+++ b/notebooks/Timepix/Compute_Timepix_Event_Centroids.ipynb
@@ -1,11 +1,37 @@
 {
  "cells": [
   {
-   "cell_type": "markdown",
-   "id": "4d04f210-fc4a-4b1d-9dfe-9c85b94b884f",
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "9484ee10",
    "metadata": {},
+   "outputs": [],
    "source": [
-    "## Imports"
+    "# Data selection parameters.\n",
+    "run = 420\n",
+    "in_folder = '/gpfs/exfel/exp/SQS/202230/p900256/raw'\n",
+    "out_folder = '/gpfs/exfel/exp/SQS/202230/p900256/scratch/cal_test'\n",
+    "proposal = ''  # Proposal, leave empty for auto detection based on in_folder\n",
+    "\n",
+    "karabo_id = 'SQS_AQS_CAM'\n",
+    "in_fast_data = '{karabo_id}/CAM/TIMEPIX3:daqEventOutput'\n",
+    "out_fast_data = '{karabo_id}/CAL/TIMEPIX3:output'\n",
+    "out_filename = 'CORR-R{run:04d}-TPX01-S{sequence:05d}.h5'\n",
+    "out_seq_len = 2000\n",
+    "\n",
+    "max_num_centroids = 10000  # Maxinum number of centroids per train\n",
+    "chunks_centroids = [1, 5000]  # Chunking of centroid data\n",
+    "dataset_compression = 'gzip'  # HDF compression method.\n",
+    "dataset_compression_opts = 3  # HDF GZIP compression level.\n",
+    "\n",
+    "clustering_epsilon = 2  # ?\n",
+    "clustering_tof_scale = 1e7  # ?\n",
+    "clustering_min_samples = 3  # ?\n",
+    "clustering_n_jobs = 1  # ?\n",
+    "threshold_tot = 0  # ?\n",
+    "\n",
+    "raw_timewalk_lut_filepath = ''  # fpath to look up table for timewalk correction or empty string.\n",
+    "centroiding_timewalk_lut_filepath = ''  # fpath to look up table for timewalk correction or empty string."
    ]
   },
   {
@@ -15,30 +41,19 @@
    "metadata": {},
    "outputs": [],
    "source": [
-    "from time import time\n",
+    "from datetime import datetime\n",
+    "from pathlib import Path\n",
+    "from time import monotonic\n",
+    "from warnings import warn\n",
     "\n",
     "import numpy as np\n",
+    "import scipy.ndimage as nd\n",
+    "import h5py\n",
     "\n",
-    "from extra_data import open_run, RunDirectory"
-   ]
-  },
-  {
-   "cell_type": "code",
-   "execution_count": null,
-   "id": "392aa413-0035-4861-9f1f-03abb51d0613",
-   "metadata": {},
-   "outputs": [],
-   "source": [
-    "%matplotlib notebook\n",
-    "p900256"
-   ]
-  },
-  {
-   "cell_type": "markdown",
-   "id": "3911b4e1-11af-4a51-be59-103ca9f6f261",
-   "metadata": {},
-   "source": [
-    "## Centroiding functions"
+    "from sklearn.cluster import DBSCAN\n",
+    "from extra_data import RunDirectory\n",
+    "\n",
+    "%matplotlib inline"
    ]
   },
   {
@@ -49,18 +64,11 @@
    "outputs": [],
    "source": [
     "# centroiding\n",
-    "\n",
-    "import numpy as np\n",
-    "import scipy.ndimage as nd\n",
-    "from sklearn.cluster import DBSCAN\n",
-    "from warnings import warn\n",
-    "import traceback as tb\n",
-    "\n",
     "error_msgs = {\n",
-    "            -1: \"tpx_data has an invalid structure - ignore provided data\",\n",
-    "            -2: \"tpx_data arrays are of invalid lengths - ignore provided data\",\n",
-    "            -3: \"tpx_data arrays are empty\"\n",
-    "        }\n",
+    "    -1: \"tpx_data has an invalid structure - ignore provided data\",\n",
+    "    -2: \"tpx_data arrays are of invalid lengths - ignore provided data\",\n",
+    "    -3: \"tpx_data arrays are empty\"\n",
+    "}\n",
     "\n",
     "def check_data(tpx_data):\n",
     "    required_keys = [\"x\", \"y\", \"tof\", \"tot\"]\n",
@@ -109,7 +117,7 @@
     "        raise e\n",
     "    return tpx_data\n",
     "\n",
-    "def pre_clustering_filter(tpx_data, tot_threshold=None):\n",
+    "def pre_clustering_filter(tpx_data, tot_threshold=0):\n",
     "    \"\"\"\n",
     "    Collection of filters directly applied before clustering.\n",
     "    Note: at no point a copy of the dictionary is made, as they are mutable, the input array is changed in memory!\n",
@@ -123,7 +131,7 @@
     "    -------\n",
     "    tpx_data: like input tpx_data but with applied filters\n",
     "    \"\"\"\n",
-    "    if tot_threshold is not None:\n",
+    "    if tot_threshold > 0:\n",
     "        tpx_data = apply_single_filter(tpx_data, tpx_data[\"tot\"] >= tot_threshold)\n",
     "\n",
     "    return tpx_data\n",
@@ -202,7 +210,7 @@
     "    return centroid_data\n",
     "\n",
     "def compute_centroids(x, y, tof, tot,\n",
-    "                      threshold_tot=None,\n",
+    "                      threshold_tot=0,\n",
     "                      clustering_epsilon=2,\n",
     "                      clustering_tof_scale=1e7,\n",
     "                      clustering_min_samples=3,\n",
@@ -238,130 +246,6 @@
     "    return _centroids"
    ]
   },
-  {
-   "cell_type": "markdown",
-   "id": "4f54e558-d862-48af-86b9-7d9e3d53fd3d",
-   "metadata": {},
-   "source": [
-    "## Output Function (for illustration) - instead data should be written to output file train-id wise"
-   ]
-  },
-  {
-   "cell_type": "code",
-   "execution_count": null,
-   "id": "83acf826-e351-4a39-a870-6c1e0298e711",
-   "metadata": {},
-   "outputs": [],
-   "source": [
-    "def output(train_id, centroids, fixed_len=None):\n",
-    "    \"\"\"\n",
-    "    In this version the resulting vectors within a run vary in length,\n",
-    "    if fixed len is provided fixed length vectors are generated\n",
-    "    \"\"\"\n",
-    "    if fixed_len is None:\n",
-    "        out = {\n",
-    "            \"%s[data.trainId]\"%event_output_key : train_id,\n",
-    "            \"%s[data.x]\"%event_output_key : centroids[\"x\"],\n",
-    "            \"%s[data.y]\"%event_output_key : centroids[\"y\"],\n",
-    "            \"%s[data.toa]\"%event_output_key : centroids[\"tof\"],\n",
-    "            \"%s[data.tot_avg]\"%event_output_key : centroids[\"tot_avg\"],\n",
-    "            \"%s[data.tot_max]\"%event_output_key : centroids[\"tot_max\"],\n",
-    "            \"%s[data.size]\"%event_output_key : centroids[\"size\"],\n",
-    "        }\n",
-    "    else:\n",
-    "        if len(centroids[\"x\"])>fixed_len:\n",
-    "            sel = np.s_[:fixed_len]\n",
-    "            out = {\n",
-    "                \"%s[data.trainId]\"%event_output_key : train_id,\n",
-    "                \"%s[data.x]\"%event_output_key : centroids[\"x\"][sel],\n",
-    "                \"%s[data.y]\"%event_output_key : centroids[\"y\"][sel],\n",
-    "                \"%s[data.toa]\"%event_output_key : centroids[\"tof\"][sel],\n",
-    "                \"%s[data.tot_avg]\"%event_output_key : centroids[\"tot_avg\"][sel],\n",
-    "                \"%s[data.tot_max]\"%event_output_key : centroids[\"tot_max\"][sel],\n",
-    "                \"%s[data.centroid_size]\"%event_output_key : centroids[\"size\"][sel],\n",
-    "                \"%s[data.size]\"%event_output_key : len(centroids[\"x\"]),\n",
-    "            }\n",
-    "        else:\n",
-    "            pad = fixed_len - len(centroids[\"x\"])\n",
-    "            out = {\n",
-    "                \"%s[data.trainId]\"%event_output_key : train_id, # int 0d\n",
-    "                \"%s[data.x]\"%event_output_key : np.pad(centroids[\"x\"], (0,pad), constant_values=1),  # float 1d\n",
-    "                \"%s[data.y]\"%event_output_key : np.pad(centroids[\"y\"], (0,pad), constant_values=1),  # float 1d\n",
-    "                \"%s[data.toa]\"%event_output_key : np.pad(centroids[\"tof\"], (0,pad), constant_values=1),  # float 1d\n",
-    "                \"%s[data.tot_avg]\"%event_output_key : np.pad(centroids[\"tot_avg\"], (0,pad), constant_values=1),  # float 1d\n",
-    "                \"%s[data.tot_max]\"%event_output_key : np.pad(centroids[\"tot_max\"], (0,pad), constant_values=1),  # float 1d\n",
-    "                \"%s[data.centroid_size]\"%event_output_key : np.pad(centroids[\"size\"], (0,pad), constant_values=1),  # int 1d\n",
-    "                \"%s[data.size]\"%event_output_key : len(centroids[\"x\"]),  # int 0d\n",
-    "            }\n",
-    "#     print(out)"
-   ]
-  },
-  {
-   "cell_type": "markdown",
-   "id": "a39ce0e4-31b3-40c7-9c80-72b89d037714",
-   "metadata": {},
-   "source": [
-    "## Settings"
-   ]
-  },
-  {
-   "cell_type": "markdown",
-   "id": "8318bc3f-01be-4144-a4de-ebd736563396",
-   "metadata": {},
-   "source": [
-    "### Data selection (for dev)"
-   ]
-  },
-  {
-   "cell_type": "code",
-   "execution_count": null,
-   "id": "693c72cd-9c67-46d6-a0bc-438126e62642",
-   "metadata": {},
-   "outputs": [],
-   "source": [
-    "# Data selection parameters.\n",
-    "run_id = 420\n",
-    "proposal = 900256\n"
-   ]
-  },
-  {
-   "cell_type": "markdown",
-   "id": "13956e56-d818-42e4-aa73-7f9c4a897732",
-   "metadata": {},
-   "source": [
-    "### Parameters to be exposed with calibration pipeline"
-   ]
-  },
-  {
-   "cell_type": "code",
-   "execution_count": null,
-   "id": "a9adf469-32de-43b4-91fa-2508a952d0dc",
-   "metadata": {},
-   "outputs": [],
-   "source": [
-    "# Centroiding and output parameters that should be easily adjustable\n",
-    "max_n_centroids = 10000\n",
-    "\n",
-    "clustering_epsilon = 2\n",
-    "clustering_tof_scale = 1e7\n",
-    "clustering_min_samples = 3\n",
-    "clustering_n_jobs = 1\n",
-    "threshold_tot = None\n",
-    "raw_timewalk_lut_filepath = None  # fpath to look up table for timewalk correction or None\n",
-    "centroiding_timewalk_lut_filepath = None  # fpath to look up table for timewalk correction or None\n",
-    "\n",
-    "# raw_timewalk_lut_filepath = \"timewalk_raw.npy\"  # fpath to look up table for timewalk correction or None\n",
-    "# centroiding_timewalk_lut_filepath = \"timewalk_cent.npy\"  # fpath to look up table for timewalk correction or None"
-   ]
-  },
-  {
-   "cell_type": "markdown",
-   "id": "c6dff850-18c8-4d5a-9362-e581cce18d8e",
-   "metadata": {},
-   "source": [
-    "## Centroiding"
-   ]
-  },
   {
    "cell_type": "code",
    "execution_count": null,
@@ -369,15 +253,14 @@
    "metadata": {},
    "outputs": [],
    "source": [
-    "run = open_run(run=run_id, proposal=proposal)\n",
-    "event_output_key = 'SQS_AQS_CAM/CAM/TIMEPIX3:daqEventOutput'\n",
+    "dc = RunDirectory(Path(in_folder) / f'r{run:04d}', inc_suspect_trains=True)\n",
     "\n",
-    "if raw_timewalk_lut_filepath is not None:\n",
+    "if raw_timewalk_lut_filepath:\n",
     "    raw_timewalk_lut = np.load(raw_timewalk_lut_filepath)\n",
     "else:\n",
     "    raw_timewalk_lut = None\n",
     "\n",
-    "if centroiding_timewalk_lut_filepath is not None:\n",
+    "if centroiding_timewalk_lut_filepath:\n",
     "    centroiding_timewalk_lut = np.load(centroiding_timewalk_lut_filepath)\n",
     "else:\n",
     "    centroiding_timewalk_lut = None"
@@ -390,33 +273,109 @@
    "metadata": {},
    "outputs": [],
    "source": [
-    "t00 = time()\n",
-    "for train_id, data in run.select(event_output_key, \"data.*\").trains():\n",
-    "    t0 = time()\n",
-    "    sel = np.s_[:data[event_output_key][\"data.size\"]]\n",
+    "in_fast_data = in_fast_data.format(karabo_id=karabo_id)\n",
+    "out_fast_data = out_fast_data.format(karabo_id=karabo_id)\n",
+    "\n",
+    "in_dc = dc.select(in_fast_data, require_all=True)\n",
+    "\n",
+    "out_path = (Path(out_folder) / Path(out_filename)).resolve()\n",
+    "dataset_kwargs = {k[8:]: v for k, v in locals().items() if k.startswith('dataset_compression')}\n",
+    "\n",
+    "centroid_dt = np.dtype([('x', np.float64),\n",
+    "                        ('y', np.float64),\n",
+    "                        ('tof', np.float64),\n",
+    "                        ('tot', np.float64),\n",
+    "                        ('tot_avg', np.float64),\n",
+    "                        ('tot_max', np.uint16),\n",
+    "                        ('size', np.int16)])\n",
+    "\n",
+    "centroiding_kwargs = dict(\n",
+    "    threshold_tot=threshold_tot,\n",
+    "    clustering_epsilon=clustering_epsilon,\n",
+    "    clustering_tof_scale=clustering_tof_scale,\n",
+    "    clustering_min_samples=clustering_min_samples,\n",
+    "    clustering_n_jobs=clustering_n_jobs,\n",
+    "    centroiding_timewalk_lut=centroiding_timewalk_lut)\n",
+    "\n",
+    "metadata = dc.run_metadata()\n",
+    "daq_library_bytes = metadata.get('daqLibrary', '0.0').encode('ascii')\n",
+    "karabo_framework_bytes = metadata.get('karaboFramework', '0.0').encode('ascii')\n",
+    "proposal_number = int(proposal) if proposal else metadata.get('proposalNumber', -1)\n",
+    "\n",
+    "print('Computing centroids and writing to file', flush=True, end='')\n",
+    "start = monotonic()\n",
+    "\n",
+    "for seq_id, seq_dc in enumerate(in_dc.split_trains(trains_per_part=out_seq_len)):\n",
+    "    train_ids = seq_dc.train_ids\n",
+    "    m_data_sources = []\n",
     "    \n",
-    "    tot = np.array(data[event_output_key][\"data.tot\"][sel])\n",
-    "    toa = np.array(data[event_output_key][\"data.toa\"][sel])\n",
+    "    with h5py.File(str(out_path).format(run=run, sequence=seq_id), 'w') as seq_file:\n",
+    "        seq_file.create_dataset('INDEX/trainId', data=train_ids, dtype=np.uint64)\n",
+    "        seq_file.create_dataset('INDEX/timestamp', data=np.zeros_like(train_ids), dtype=np.uint64)\n",
+    "        seq_file.create_dataset('INDEX/flag', data=np.ones_like(train_ids), dtype=np.int32)\n",
+    "        \n",
+    "        seq_file.create_dataset(f'INDEX/{out_fast_data}/data/count', data=np.ones_like(train_ids), dtype=np.uint64)\n",
+    "        seq_file.create_dataset(f'INDEX/{out_fast_data}/data/first', data=np.arange(len(train_ids)), dtype=np.uint64)\n",
+    "                                                                                                    \n",
+    "        out_data = np.empty((len(train_ids), max_num_centroids), dtype=centroid_dt)\n",
+    "        out_data[:] = (np.nan, np.nan, np.nan, np.nan, np.nan, 0, -1)\n",
+    "        \n",
+    "        for index, (train_id, data) in enumerate(seq_dc.trains()):\n",
+    "            events = data[in_fast_data]\n",
+    "\n",
+    "            sel = np.s_[:events['data.size']]\n",
+    "\n",
+    "            x = events['data.x'][sel]\n",
+    "            y = events['data.y'][sel]\n",
+    "            tot = events['data.tot'][sel]\n",
+    "            toa = events['data.toa'][sel]\n",
+    "\n",
+    "            if raw_timewalk_lut is not None:\n",
+    "                toa -= raw_timewalk_lut[np.int_(tot // 25) - 1] * 1e3\n",
+    "\n",
+    "            centroids = compute_centroids(x, y, toa, tot, **centroiding_kwargs)\n",
+    "\n",
+    "            num_centroids = len(centroids['x'])\n",
+    "            \n",
+    "            if num_centroids > max_num_centroids:\n",
+    "                warn('number of centroids larger than definde maximum, some data cannot be written to disk')\n",
+    "            \n",
+    "            for key in centroid_dt.names:\n",
+    "                out_data[key][index, :num_centroids] = centroids[key]\n",
+    "                \n",
+    "        centroid_data = seq_file.create_dataset(\n",
+    "            f'INSTRUMENT/{out_fast_data}/data/centroids',\n",
+    "            data=out_data, chunks=tuple(chunks_centroids),\n",
+    "            **dataset_kwargs)\n",
+    "        m_data_sources.append(('INSTRUMENT', f'{out_fast_data}/data'))\n",
+    "        \n",
+    "        now_str = datetime.now().strftime('%Y%m%dT%H%M%SZ').encode('ascii')\n",
     "    \n",
-    "    if raw_timewalk_lut is not None:\n",
-    "        toa -= raw_timewalk_lut[np.int_(tot // 25) - 1] * 1e3\n",
+    "        m_group = seq_file.require_group('METADATA')\n",
+    "        m_group.create_dataset('creationDate', shape=(1,), data=now_str)\n",
+    "        m_group.create_dataset('daqLibrary', shape=(1,), data=daq_library_bytes)\n",
+    "        m_group.create_dataset('dataFormatVersion', shape=(1,), data=b'1.0')\n",
+    "        \n",
+    "        m_data_sources.sort(key=lambda x: f'{x[0]}/{x[1]}')\n",
+    "        m_group.create_dataset('dataSources/dataSourceId', shape=(len(m_data_sources),),\n",
+    "                           data=[f'{x[0]}/{x[1]}'.encode('ascii') for x in m_data_sources])\n",
+    "        m_group.create_dataset('dataSources/deviceId', shape=(len(m_data_sources),),\n",
+    "                           data=[x[1].encode('ascii') for x in m_data_sources])\n",
+    "        m_group.create_dataset('dataSources/root', shape=(len(m_data_sources),),\n",
+    "                           data=[x[0].encode('ascii') for x in m_data_sources])\n",
+    "\n",
+    "        m_group.create_dataset('karaboFramework', shape=(1,), data=karabo_framework_bytes)\n",
+    "        m_group.create_dataset('proposalNumber', shape=(1,), dtype=np.uint32, data=proposal_number)\n",
+    "        m_group.create_dataset('runNumber', shape=(1,), dtype=np.uint32, data=run)\n",
+    "        m_group.create_dataset('sequenceNumber', shape=(1,), dtype=np.uint32, data=seq_id)\n",
+    "        m_group.create_dataset('updateDate', shape=(1,), data=now_str)\n",
+    "        \n",
+    "    print('.', flush=True, end='')\n",
     "    \n",
-    "    centroids = compute_centroids(data[event_output_key][\"data.x\"][sel], \n",
-    "                                  data[event_output_key][\"data.y\"][sel], \n",
-    "                                  toa, \n",
-    "                                  tot,\n",
-    "                                  threshold_tot=threshold_tot,\n",
-    "                                  clustering_epsilon=clustering_epsilon,\n",
-    "                                  clustering_tof_scale=clustering_tof_scale,\n",
-    "                                  clustering_min_samples=clustering_min_samples,\n",
-    "                                  clustering_n_jobs=clustering_n_jobs,\n",
-    "                                  centroiding_timewalk_lut=centroiding_timewalk_lut)\n",
-    "    \n",
-    "#     print(time()-t0)\n",
-    "    output(train_id, centroids, fixed_len=max_n_centroids)\n",
-    "#     if time()-t00 > 0:\n",
-    "#         break\n",
-    "print(time()-t00)"
+    "end = monotonic()\n",
+    "print('')\n",
+    "\n",
+    "print(f'{end-start:.01f}s')"
    ]
   }
  ],
diff --git a/src/xfel_calibrate/notebooks.py b/src/xfel_calibrate/notebooks.py
index 028091d35..a35bdaacf 100644
--- a/src/xfel_calibrate/notebooks.py
+++ b/src/xfel_calibrate/notebooks.py
@@ -290,6 +290,17 @@ notebooks = {
             },
         },
     },
+    "TIMEPIX": {
+        "CORRECT": {
+            "notebook": "notebooks/Timepix/Compute_Timepix_Event_Centroids.ipynb",
+            "concurrency": {
+                "parameter": None,
+                "use function": None,
+                "default concurrency": None,
+                "cluster cores": 1
+            },
+        },
+    },
     "TEST": {
         "TEST-CLI": {
             "notebook": "notebooks/test/test-cli.ipynb",
-- 
GitLab