{ "cells": [ { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Data selection parameters.\n", "run = 104 # Run ID.\n", "in_folder = '/gpfs/exfel/exp/SQS/202101/p002535/raw' # Partial input path appended with run ID.\n", "out_folder = '/gpfs/exfel/exp/SQS/202101/p002535/scratch/cal_test' # Full path to output folder.\n", "\n", "calib_config_path = '/gpfs/exfel/exp/SQS/202101/p002535/usr/config_board2+4.yaml' # Path to correction and transform configuration\n", "\n", "# These parameters are required by xfel-calibrate but ignored in this notebook.\n", "cycle = '' # Proposal cycle, currently not used.\n", "cal_db_timeout = 0 # Calibration DB timeout, currently not used.\n", "cal_db_interface = 'foo' # Calibration DB interface, currently not used.\n", "karabo_da = 'bar' # Karabo data aggregator name, currently not used\n", "\n", "# Output parameters.\n", "karabo_id = 'SQS_REMI_DLD6' # Karabo device ID root for virtual output device.\n", "proposal = '' # Proposal, leave empty for auto detection based on in_folder\n", "out_filename = 'CORR-R{run:04d}-REMI01-S{sequence:05d}.h5' # Pattern for output filenames.\n", "out_seq_len = 5000 # Number of trains per sequence file in output.\n", "det_device_id = '{karabo_id}/DET/{det_name}' # Karabo device ID for virtual output device.\n", "det_output_key = 'output' # Pipeline name for fast data output.\n", "save_raw_triggers = True # Whether to save trigger position in files.\n", "save_raw_edges = True # Whether to save digitized edge positions in files.\n", "save_rec_signals = True # Whether to save reconstructed signals (u1-w2, mcp) in files.\n", "save_rec_hits = True # Whether to save reoncstructed hits (x,y,t,m) in files.\n", "chunks_triggers = [500] # HDF chunk size for triggers.\n", "chunks_edges = [500, 7, 50] # HDF chunk size for edges.\n", "chunks_hits = [50, 50] # HDF chunk size for hits.\n", "chunks_signals = [50, 50] # HDF chunk size for signals.\n", "dataset_compression = 'gzip' # HDF compression method.\n", "dataset_compression_opts = 3 # HDF GZIP compression level.\n", "extra_sources = [''] # No longer used.\n", "\n", "# Parallelization parameters.\n", "mp_pulse_info = 8 # Parallelization for pulse statistics.\n", "mp_find_triggers = 0.5 # Parallelization for finding triggers.\n", "mp_find_edges = 0.5 # Parallelization for digitizing analog signal.\n", "mt_avg_trace = 2 # Parallelization for trace averaging.\n", "mp_rec_hits = 1.0 # Parallelization for hit reconstruction." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import numpy as np\n", "import matplotlib.pyplot as plt\n", "from matplotlib.colors import LogNorm\n", "from threadpoolctl import threadpool_limits\n", "\n", "import re\n", "import h5py\n", "from pathlib import Path\n", "from datetime import datetime\n", "\n", "import pasha as psh\n", "from extra_data import RunDirectory\n", "from extra_remi import Analysis, trigger_dt\n", "from extra_remi.util import timing\n", "from extra_remi.plots import plot_detector_diagnostics\n", "from extra_remi.rd_resort import signal_dt, hit_dt\n", "\n", "%matplotlib inline" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "calib_config_path = Path(calib_config_path)\n", "\n", "if not calib_config_path.is_file():\n", " # If the path cannot be resolved right now, try the same path relative to in_folder.\n", " calib_config_path = Path(in_folder) / calib_config_path\n", " \n", " if not calib_config_path.is_file():\n", " # Disallow implicit config file creation.\n", " raise ValueError('calib_config_path not found - neither absolute nor relative to in_folder')\n", "\n", "remi = Analysis(calib_config_path)\n", "\n", "with timing('open_run'):\n", " dc = remi.prepare_dc(RunDirectory(Path(in_folder) / f'r{run:04d}', inc_suspect_trains=True))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Transformation parameters" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def print_leaf(leaf, indent=0):\n", " for key, value in leaf.items():\n", " if isinstance(value, dict):\n", " print(indent * 4 * ' ' + key)\n", " print_leaf(value, indent=indent+1)\n", " else:\n", " print(indent * 4 * ' ' + f'{key}: {value}')\n", " \n", "print_leaf(remi.tree)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Pulse and trigger information" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Count pulses per train and compute offsets" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Based on the pulse pattern tables, three global variables are obtained:\n", "#\n", "# * `pulse_counts [int32: len(dc.train_ids)]` containing the number of pulses per train.\n", "# * `pulse_offsets [int32: len(dc.train_ids)]` containing the global offset for the first pulse of each train.\n", "# * `num_pulses = pulse_counts.sum(axis=0)`\n", "\n", "with timing('pulse_info'):\n", " pulse_counts, pulse_offsets, num_pulses = remi.get_pulse_info(dc, mp_pulse_info)" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "scrolled": false }, "outputs": [], "source": [ "fig, ax = plt.subplots(num=1, ncols=1, nrows=1, figsize=(9, 4), clear=True)\n", "\n", "ax.set_title('Pulse count')\n", "ax.plot(dc.train_ids, pulse_counts, lw=1)\n", "ax.set_xlabel('Train ID')\n", "ax.set_ylabel('Number of pulses')\n", "ax.set_ylim(0, max(300, pulse_counts.max() + 10))\n", "ax.ticklabel_format(style='plain')\n", "pass" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Find triggers\n", "\n", "The trigger defines the boundary of a pulse on the digitizer trace, which is stored per train." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# A trigger defines the boundary of a pulse on the digitizer trace stored per train. This cell creates a\n", "# global variable:\n", "# * `triggers [(start: int32, stop: int32, offset: float64): num_pulses]` containing the triggers for each\n", "# pulse.\n", "# \n", "# Triggers may be obtained through two different methods:\n", "# \n", "# * `ppt` uses the pulse puttern table to locate the pulse positions on the trace. Only number of pulses and\n", "# their distance can be drawn this way, leaving the absolute offset for the very first pulse to be\n", "# configured via `trigger/ppt/first_pulse_offset`.\n", "# \n", "# * `edge` uses the digitizer channel `trigger/edge/channel` and builds triggers around the edges found on it.\n", "# The boundaries relative to this edge may be configured with the `group_start`, `group_end` and `dead_time`\n", "# parameters.\n", "\n", "psh.set_default_context('processes', num_workers=remi.get_num_workers(mp_find_triggers))\n", "\n", "triggers = psh.alloc(shape=(num_pulses,), dtype=trigger_dt, fill=(-1, -1, np.nan))\n", "\n", "if remi['trigger']['method'] == 'ppt':\n", " from euxfel_bunch_pattern import indices_at_sase\n", " \n", " pptc = remi['trigger']['ppt']\n", " \n", " keydata = dc[remi.get_ppt_sourcekey()]\n", " sase = remi['instrument']['timeserver']['sase']\n", " first_pulse_offset = pptc['first_pulse_offset']\n", " single_pulse_length = pptc['single_pulse_length']\n", " clock_factor = remi['digitizer']['clock_factor']\n", " \n", " def trigger_by_ppt(worker_id, index, train_id, ppt):\n", " abs_pos = indices_at_sase(ppt, sase)\n", " num_pulses = len(abs_pos)\n", " \n", " if num_pulses > 1:\n", " rel_pos = (abs_pos - abs_pos[0])\n", " pulse_len = rel_pos[1] - rel_pos[0]\n", " elif num_pulses == 1:\n", " rel_pos = np.zeros(1)\n", " pulse_len = single_pulse_length\n", " elif num_pulses == 0:\n", " return\n", " \n", " pulse_offset = pulse_offsets[index]\n", " pulse_count = pulse_counts[index]\n", " train_triggers = triggers[pulse_offset:pulse_offset+pulse_count]\n", " \n", " start_frac = first_pulse_offset + rel_pos * 2 * clock_factor\n", " start = start_frac.astype(int)\n", " \n", " if start.shape != train_triggers.shape:\n", " print(f'pulse number mismatch in train {index} / {train_id}, SKIPPING')\n", " return\n", " \n", " train_triggers['start'] = start\n", " train_triggers['stop'] = start + int(pulse_len * 2 * clock_factor) - 1\n", " train_triggers['offset'] = start_frac - start\n", " \n", " with timing('find_triggers'):\n", " psh.map(trigger_by_ppt, keydata)\n", " \n", "elif remi['trigger']['method'] == 'edge':\n", " edgec = remi['trigger']['edge']\n", " keydata = dc[remi.get_channel_sourcekey(edgec['channel'])]\n", "\n", " trace_len = keydata.entry_shape[0]\n", " group_start = edgec['group_start']\n", " group_end = edgec['group_end']\n", " dead_time = edgec['dead_time']\n", " \n", " def prepare_trigger_edge_worker(worker_id):\n", " correct_func = remi.get_baseline_corrector()\n", " discr_func, discr_params = remi.get_discriminator([edgec['channel']])\n", "\n", " bl_start, bl_stop, _ = remi.get_baseline_limits(trace_len)\n", " bl_sym = remi['digitizer']['baseline_symmetry']\n", " \n", " edge_pos = np.empty(10000, dtype=np.float64)\n", " trace_corr = np.empty(trace_len, dtype=np.float64)\n", " baselines = np.empty(bl_sym, dtype=np.float64)\n", " yield\n", " \n", " def group_boundaries(trigger_edges):\n", " cur_edge = trigger_edges[0]\n", "\n", " for i in range(1, len(trigger_edges)):\n", " next_edge = trigger_edges[i]\n", " edge_diff = int(next_edge) - int(cur_edge)\n", "\n", " if edge_diff <= dead_time:\n", " pass\n", "\n", " elif edge_diff > dead_time and edge_diff >= group_end:\n", " yield cur_edge, int(cur_edge) + group_start, int(cur_edge) + group_end\n", " cur_edge = trigger_edges[i]\n", "\n", " elif edge_diff > dead_time and edge_diff < group_end:\n", " yield cur_edge, int(cur_edge) + group_start, int(next_edge)\n", " cur_edge = trigger_edges[i]\n", "\n", " elif edge_diff < group_end:\n", " pass\n", "\n", " yield cur_edge, int(cur_edge) + group_start, int(cur_edge) + group_end\n", " \n", " @psh.with_init(prepare_trigger_edge_worker)\n", " def trigger_by_edge(worker_id, index, train_id, trace_raw):\n", " correct_func(trace_raw, trace_corr, baselines, bl_start, bl_stop)\n", "\n", " pulse_offset = pulse_offsets[index]\n", " pulse_count = pulse_counts[index]\n", " num_triggers = discr_func(trace_corr, edge_pos, **discr_params[0])\n", "\n", " groups = group_boundaries(edge_pos[:num_triggers])\n", " train_triggers = triggers[pulse_offset:pulse_offset+pulse_count]\n", " \n", " if num_triggers == 0 or num_triggers != pulse_count:\n", " print(f'index={index}, train_id={train_id}: Unexpected '\n", " f'num_triggers={num_triggers} for pulse_count={pulse_count}')\n", " return\n", "\n", " for (edge, start, stop), pulse_trigger in zip(groups, train_triggers):\n", " pulse_trigger['start'] = start\n", " pulse_trigger['stop'] = stop\n", " pulse_trigger['offset'] = start - edge\n", " \n", " with timing('find_triggers'):\n", " psh.map(trigger_by_edge, keydata)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "fig, (lx, rx) = plt.subplots(num=2, ncols=2, nrows=1, figsize=(9, 4), clear=True,\n", " gridspec_kw=dict(top=0.75))\n", "\n", "# Display ~400 pulses or 10 trains, whatever is lower\n", "n_trains = max(abs(pulse_offsets - 400).argmin(), 10)\n", "\n", "visible_trigger_starts = triggers['start'][:pulse_offsets[n_trains]]\n", "\n", "lx.plot(visible_trigger_starts, '.', ms=2)\n", "lx.vlines(pulse_offsets[:n_trains], 0, visible_trigger_starts.max(), color='grey', linewidth=1, alpha=0.2)\n", "lx.tick_params(right=True)\n", "\n", "lx.set_xlabel('Pulse index')\n", "lx.set_xlim(-15, pulse_offsets[n_trains]+15)\n", "\n", "lx.set_ylabel('Trigger position')\n", "lx.set_ylim(0, visible_trigger_starts.max())\n", "\n", "train_lx = lx.twiny()\n", "train_lx.set_xlabel('Train ID', labelpad=8)\n", "train_lx.set_xlim(lx.get_xlim())\n", "train_lx.set_xticks(pulse_offsets[:n_trains])\n", "train_lx.set_xticklabels([str(int(x)) for x in dc.train_ids[:n_trains]],\n", " rotation=-45, fontsize='x-small')\n", "\n", "rx.plot(triggers['start'], lw=0.2)\n", "rx.tick_params(left=False, labelleft=False, right=True, labelright=True)\n", "pass" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Analog signal to digital edges" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Find edges in analog signal" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "scrolled": false }, "outputs": [], "source": [ "psh.set_default_context('processes', num_workers=remi.get_num_workers(mp_find_edges))\n", "threadpool_limits(limits=remi.get_num_workers(mt_avg_trace))\n", "\n", "edges_by_det = {}\n", "avg_traces_by_det = {}\n", "\n", "for det_name, det in remi['detector'].items():\n", " det_sourcekeys = remi.get_detector_sourcekeys(det_name)\n", " det_get_traces = remi.get_traces_getter(det_name)\n", " trace_len = dc[next(iter(det_sourcekeys))].entry_shape[0]\n", " \n", " edges = psh.alloc(shape=(num_pulses, 7, det['max_hits']),\n", " dtype=np.float64, fill=np.nan)\n", " avg_traces = psh.alloc_per_worker(shape=(7, trace_len), dtype=np.float64)\n", " \n", " def prepare_edge_worker(worker_id):\n", " correct_func = remi.get_baseline_corrector()\n", " discr_func, discr_params = remi.get_discriminator(det['channels'])\n", " \n", " source_name = remi['digitizer']['source']\n", " bl_start, bl_stop, _ = remi.get_baseline_limits(trace_len)\n", " bl_sym = remi['digitizer']['baseline_symmetry']\n", " time_cal = remi.get_time_calibration()\n", " \n", " traces_corr = np.empty((7, trace_len), dtype=np.float64)\n", " baselines = np.empty(bl_sym, dtype=np.float64)\n", " yield\n", " \n", " @psh.with_init(prepare_edge_worker)\n", " def find_edges(worker_id, index, train_id, data):\n", " try:\n", " data = det_get_traces(data[source_name])\n", " except KeyError:\n", " return\n", "\n", " for channel_idx in range(7):\n", " correct_func(data[channel_idx], traces_corr[channel_idx],\n", " baselines, bl_start, bl_stop)\n", "\n", " avg_traces[worker_id] += traces_corr\n", "\n", " pulses_slice = np.s_[pulse_offsets[index]:pulse_offsets[index]+pulse_counts[index]]\n", "\n", " for trigger, pulse_edges in zip(triggers[pulses_slice], edges[pulses_slice]):\n", " trigger_slice = np.s_[trigger['start']:trigger['stop']]\n", " \n", " for trace, channel_params, channel_edges in zip(traces_corr, discr_params, pulse_edges):\n", " discr_func(trace[trigger_slice], channel_edges, **channel_params)\n", "\n", " pulse_edges += trigger['offset']\n", " pulse_edges *= time_cal\n", " \n", " with timing(f'find_edges, {det_name}'):\n", " psh.map(find_edges, dc.select(det_sourcekeys))\n", " \n", " edges_by_det[det_name] = edges\n", " avg_traces_by_det[det_name] = avg_traces.sum(axis=0) / len(dc.train_ids)\n", " \n", " with np.printoptions(precision=2, suppress=True):\n", " print(edges[:5, :, :8])" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Global average of analog signals" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "for i, det_name in enumerate(remi['detector'].keys()):\n", " fig, axs = plt.subplots(num=10+i, nrows=7, figsize=(9.5, 8), clear=True,\n", " gridspec_kw=dict(left=0.1, right=0.98, top=0.98, bottom=0.1))\n", " fig.text(0.02, 0.98, det_name.upper(), rotation=90, ha='left', va='top', size='x-large')\n", "\n", " for edge_idx, edge_name in enumerate(['u1', 'u2', 'v1', 'v2', 'w1', 'w2', 'mcp']):\n", " axs[edge_idx].plot(avg_traces_by_det[det_name][edge_idx], lw=1)\n", " axs[edge_idx].tick_params(labelbottom=False)\n", " axs[edge_idx].set_ylabel(edge_name)\n", " \n", " axs[-1].tick_params(labelbottom=True)\n", "pass" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Sample for digitized traces" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "for i, det_name in enumerate(remi['detector'].keys()):\n", " edges = edges_by_det[det_name]\n", " \n", " fig = plt.figure(num=100+i, figsize=(9.5, 8))\n", " fig.clf()\n", " \n", " grid = fig.add_gridspec(ncols=2, nrows=4, left=0.1, right=0.98, top=0.98, bottom=0.1)\n", "\n", " fig.text(0.02, 0.98, det_name.upper(), rotation=90, ha='left', va='top', size='x-large')\n", "\n", " for signal_idx, signal_name in enumerate(['u1', 'u2', 'v1', 'v2', 'w1', 'w2', 'mcp']):\n", " row = (1 + signal_idx // 2) if signal_idx < 6 else 0\n", " col = (signal_idx % 2) if signal_idx < 6 else np.s_[:]\n", " ax = fig.add_subplot(grid[row, col])\n", " \n", " pulse_idx = np.where(np.isfinite(edges[:, signal_idx, 0]))[0][0]\n", " train_idx = np.where(pulse_idx >= pulse_offsets)[0][-1]\n", " trigger = triggers[pulse_idx]\n", " \n", " sourcekey = remi.get_channel_sourcekey(\n", " remi['detector'][det_name]['channels'][signal_idx])\n", " train_trace = dc[sourcekey].select_trains(np.s_[train_idx:train_idx+1]).ndarray()[0]\n", " corr_trace = np.zeros_like(train_trace, dtype=np.float64)\n", "\n", " remi.get_baseline_corrector()(\n", " train_trace, corr_trace,\n", " np.empty(remi['digitizer']['baseline_symmetry'], dtype=np.float64),\n", " *remi.get_baseline_limits(len(train_trace))[:2])\n", " \n", " pulse_trace = corr_trace[np.s_[trigger['start']:trigger['stop']]]\n", " \n", " x_time = remi.get_time_calibration() * (np.arange(len(pulse_trace) + trigger['offset']))\n", " \n", " ax.plot(x_time, pulse_trace, lw=1)\n", " ax.set_xlim(x_time[0], x_time[-1])\n", " ax.set_ylim(-200, pulse_trace.max()*1.1)\n", " ax.text(x_time[-1], pulse_trace.max(),\n", " f'T{train_idx} P{pulse_idx - pulse_offsets[train_idx]} ',\n", " va='top', ha='right')\n", " ax.tick_params(labelbottom=False)\n", " ax.set_ylabel(signal_name)\n", " \n", " ax.vlines(edges[pulse_idx, signal_idx, :], *ax.get_ylim(), color='red', linewidth=1)\n", "pass" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Digitized channel spectra" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "scrolled": false }, "outputs": [], "source": [ "for i, det_name in enumerate(remi['detector'].keys()):\n", " fig = plt.figure(num=20+i, figsize=(9.5, 6))\n", " fig.clf()\n", " \n", " edges = edges_by_det[det_name]\n", " \n", " min_edge = edges[np.isfinite(edges)].min()\n", " max_edge = edges[np.isfinite(edges)].max()\n", "\n", " grid = fig.add_gridspec(ncols=3, nrows=3, left=0.08, right=0.98, top=0.95, hspace=0.4)\n", "\n", " fig.text(0.02, 0.98, det_name.upper(), rotation=90, ha='left', va='top', size='x-large')\n", "\n", " numx = fig.add_subplot(grid[0, 0])\n", " numx.set_title('Edges per pulse')\n", "\n", " agg_window = num_pulses // 60\n", " max_num_edges = 0.0\n", " max_spectral_intensity = 0\n", " hist_axs = []\n", "\n", " for edge_idx, edge_name in enumerate(['u1', 'u2', 'v1', 'v2', 'w1', 'w2', 'mcp']):\n", " num_edges = np.isfinite(edges[:, edge_idx, :]).sum(axis=1)\n", " num_edges = num_edges[:((len(num_edges) // agg_window) * agg_window)]\n", " num_edges = num_edges.reshape(-1, agg_window).mean(axis=1)\n", "\n", " if edge_idx < 6:\n", " plot_kwargs = dict(c=f'C{edge_idx}', ls='solid', lw=1.0)\n", " else:\n", " plot_kwargs = dict(c='k', ls='dashed', lw=1.0)\n", "\n", " numx.plot(np.arange(len(num_edges)) * agg_window, num_edges, label=edge_name, **plot_kwargs)\n", " max_num_edges = max(max_num_edges, num_edges.max())\n", "\n", " cur_edges = edges[:, edge_idx, :].flatten()\n", "\n", " if edge_idx < 6:\n", " row = 1 + edge_idx % 2\n", " col = edge_idx // 2\n", " else:\n", " row = 0\n", " col = np.s_[1:3]\n", "\n", " ax = fig.add_subplot(grid[row, col])\n", " ax.set_title(f'TOF spectrum: {edge_name}')\n", " y, _, _ = ax.hist(cur_edges[np.isfinite(cur_edges)], bins=int((max_edge - min_edge) // 5),\n", " range=(min_edge, max_edge), color=plot_kwargs['c'], histtype='step', linewidth=1)\n", " hist_axs.append(ax)\n", "\n", " max_spectral_intensity = max(max_spectral_intensity, y.max())\n", "\n", " numx.tick_params(labelbottom=False)\n", " numx.set_ylim(0, 1.2*max_num_edges)\n", "\n", " for ax in hist_axs:\n", " ax.set_ylim(0, max_spectral_intensity*1.1)\n", " ax.ticklabel_format(axis='y', style='sci', scilimits=(0, 3))\n", "pass" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Detector diagnostics" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "scrolled": false }, "outputs": [], "source": [ "for i, det_name in enumerate(remi['detector'].keys()):\n", " edges = edges_by_det[det_name]\n", " \n", " sort = remi.get_dld_sorter(det_name)\n", " \n", " sum_shifts = sort.sum_shifts if sort.sum_shifts != (0.0, 0.0, 0.0) else None\n", " \n", " is_valid = remi.get_presort_mask(edges, edge_idx=0,\n", " sum_limit=max(sort.uncorrected_time_sum_half_widths),\n", " sum_shifts=sum_shifts)\n", " signals, sums = remi.get_signals_and_sums(edges, indices=sort.channel_indices, sum_shifts=sum_shifts,\n", " mask=is_valid)\n", " fig = plot_detector_diagnostics(signals=signals, sums=sums, fig_num=30+i, im_scale=1.5,\n", " sum_range=max(sort.uncorrected_time_sum_half_widths),\n", " sorter=sort)\n", " fig.text(0.02, 0.98, det_name.upper() + ' before corrections', rotation=90, ha='left', va='top', size='x-large')\n", " \n", " if remi['detector'][det_name]['use_sum_correction'] or remi['detector'][det_name]['use_pos_correction']:\n", " n_masked = is_valid.sum()\n", " signals = np.full((n_masked, 3), np.nan, dtype=np.float64)\n", " sums = np.full((n_masked, 3), np.nan, dtype=np.float64)\n", "\n", " sort.correct(edges[is_valid], signals, sums)\n", " fig = plot_detector_diagnostics(signals=signals, sums=sums, fig_num=40+i, im_scale=1.5,\n", " sum_range=max(sort.uncorrected_time_sum_half_widths),\n", " sorter=sort)\n", " fig.text(0.02, 0.98, det_name.upper() + ' after corrections', rotation=90, ha='left', va='top', size='x-large')\n", "pass" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Hit reconstruction" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "psh.set_default_context('processes', num_workers=remi.get_num_workers(mp_rec_hits))\n", "\n", "signals_by_det = {}\n", "hits_by_det = {}\n", "hit_counts_by_det = {}\n", "\n", "for det_name, det in remi['detector'].items():\n", " edges = edges_by_det[det_name]\n", " \n", " signals = psh.alloc(shape=(num_pulses, 50), dtype=signal_dt, fill=np.nan)\n", " hits = psh.alloc(shape=(num_pulses, 50), dtype=hit_dt, fill=(np.nan, np.nan, np.nan, -1))\n", " hit_counts = psh.alloc(shape=len(dc.train_ids), dtype=np.uint32)\n", "\n", " def prepare_hit_worker(worker_id):\n", " sort = remi.get_dld_sorter(det_name)\n", " yield\n", "\n", " @psh.with_init(prepare_hit_worker)\n", " def reconstruct_hits(worker_id, index, train_id):\n", " hit_counts[index] += sort.run_on_train(\n", " edges, signals, hits, pulse_offsets[index], pulse_offsets[index] + pulse_counts[index])\n", " \n", " with timing(f'rec_hits, {det_name}'):\n", " psh.map(reconstruct_hits, dc.train_ids)\n", " \n", " signals_by_det[det_name] = signals\n", " hits_by_det[det_name] = hits\n", " hit_counts_by_det[det_name] = hit_counts" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "fig, ax = plt.subplots(num=50+i, figsize=(9.5, 4), ncols=1, clear=True,\n", " gridspec_kw=dict(top=0.92, right=0.98, left=0.05, bottom=0.12))\n", " \n", "max_num_hits = 0.0\n", " \n", "for det_name in remi['detector'].keys():\n", " agg_window = num_pulses // 1000\n", " \n", " num_hits = np.isfinite(hits_by_det[det_name]['x']).sum(axis=1)\n", " num_hits = num_hits[:(len(num_hits) // agg_window) * agg_window]\n", " num_hits = num_hits.reshape(-1, agg_window).mean(axis=1)\n", " max_num_hits = max(max_num_hits, num_hits.max())\n", " \n", " ax.plot(np.arange(0, (num_pulses // agg_window) * agg_window, agg_window), num_hits,\n", " lw=1, label=det_name.upper())\n", " \n", "ax.set_title('Hits per pulse')\n", "ax.set_xlabel('Pulse index')\n", "ax.set_ylim(0, max_num_hits*1.1)\n", "ax.legend()\n", "pass" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Reconstruction methods\n", "Each hit may be reconstructed by one of 19 different methods. These differ by the number of real signals across the channels, which could be combined to form the hit. Each of these methods is designed by a number between `0` and `19` (with empty hits using `-1`), which can be found in the `m` key of a hit, e.g.:\n", "\n", "* `0`: All six anode signals and the corresponding MCP signal were found.\n", "* `4`: One signal on layer `u` is missing, all other signals for this event were found.\n", "* `18`: Only one anode signal on each layer was found and the MCP signal is missing. There is no way to check whether this combination of signals is actually valid.\n", "\n", "| Method | `u+v+w +mcp` |\n", "| - | - |\n", "| 0 | `2+2+2 +1` |\n", "| 1 | `0+2+2 +1` |\n", "| 2 | `2+0+2 +1` |\n", "| 3 | `2+2+0 +1` |\n", "| 4 | `1+2+2 +1` (2 permutations) |\n", "| 5 | `2+1+2 +1` (2 permutations) |\n", "| 6 | `2+2+1 +1` (2 permutations) |\n", "| 7 | `2+2+2 +0` |\n", "| 8 | `0+2+2 +0` |\n", "| 9 | `2+0+2 +0` |\n", "| 10 | `2+2+0 +0` |\n", "| 11 | `1+2+2 +0` (2 permutations) |\n", "| 12 | `2+1+2 +0` (2 permutations) |\n", "| 13 | `2+2+1 +0` (2 permutations) |\n", "| 14 | `2+1+1 +1` `1+2+1 +1` `1+1+2 +1` (12 permutations) |\n", "| 15 | `2+1+0 +1` `2+0+1 +1` `1+2+0 +1` `1+0+2 +1` `0+2+1 +1` `0+1+2 +1` (12 permutations) |\n", "| 16 | `1+1+1 +1` (8 permutations) |\n", "| 17 | `2+1+1 +0` `1+2+1 +0` `1+1+2 +0` (12 permutations) |\n", "| 18 | `1+1+1 +0` (8 permutations) |\n", "| 19 | `2+1+0 +0` `2+0+1 +0` `1+2+0 +0` `1+0+2 +0` `0+1+2 +0` `0+2+1 +0` (12 permutations) |\n", "\n", "* For hits reconstructed with method `> 10`, extra attention should be given to ensure they add meaningful signal.\n", "* Any method `> 14` has to considered risky, because no time sum can be checked nor the position check be used. If the scale factors and/or `w` shift are not correct, then the number of events reconstructed with the risky methods will increase. They will most likely be *ghost hits*, which do not correspond to actual impacts on the detector." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "scrolled": false }, "outputs": [], "source": [ "for i, det_name in enumerate(remi['detector'].keys()):\n", " hits = hits_by_det[det_name]\n", " \n", " fig, ax = plt.subplots(num=60+i, figsize=(9.5, 5), ncols=1, clear=True,\n", " gridspec_kw=dict(left=0.08, right=0.91, top=0.8))\n", " \n", " fig.text(0.02, 0.98, det_name.upper(), rotation=90, ha='left', va='top', size='x-large')\n", "\n", " method_bins = np.bincount(hits['m'][hits['m'] >= 0], minlength=20)\n", " ax.bar(np.arange(20), method_bins, width=0.5)\n", "\n", " ax.set_xlabel('Reconstruction method')\n", " ax.set_xlim(-0.5, 19.5)\n", " ax.set_xticks(np.arange(20))\n", "\n", " ax.set_ylabel('Number of hits')\n", " ax.set_ylim(0, method_bins.max()*1.05)\n", " ylims = ax.get_ylim()\n", "\n", " ax.tick_params(which='both', right=True, labelright=True)\n", "\n", " num_risky = method_bins[15:].sum()\n", " num_total = method_bins.sum()\n", "\n", " ax.text(14.2, method_bins.max(), f'{(100*(num_total-num_risky)/num_total):.2g}%',\n", " va='top', ha='right', color='black')\n", " ax.text(14.8, method_bins.max(), f'{(100*num_risky/num_total):.2g}%',\n", " va='top', ha='left', color='red')\n", "\n", " ax.fill([14.5, 19.5, 19.5, 14.5], [ylims[0], ylims[0], ylims[1], ylims[1]], c='r', alpha=0.2)\n", "\n", " labelx = ax.twiny()\n", " labelx.set_xlim(*ax.get_xlim())\n", " labelx.set_xticks(ax.get_xticks())\n", " labelx.set_xticklabels([\n", " '2+2+2 +1',\n", " '0+2+2 +1', '2+0+2 +1', '2+2+0 +1',\n", " '1+2+2 +1', '2+1+2 +1', '2+2+1 +1',\n", " '2+2+2 +0',\n", " '0+2+2 +0', '2+0+2 +0', '2+2+0 +0', '1+2+2 +0', '2+1+2 +0', '2+2+1 +0',\n", " '2+1+1 +1',\n", " '2+1+0 +1',\n", " '1+1+1 +1',\n", " '2+1+1 +0',\n", " '1+1+1 +0',\n", " '2+1+0 +0',\n", " ], rotation=90)\n", " \n", " min_rel_tick = np.ceil((ax.get_ylim()[0] / num_total) / 0.1) * 0.1\n", " max_rel_tick = np.floor((method_bins.max() / num_total) / 0.1) * 0.1\n", " \n", " rely = ax.twinx()\n", " rely.set_ylim(*ax.get_ylim())\n", " rely.set_yticks(np.arange(0.0, max_rel_tick+0.01, 0.1)*num_total)\n", " rely.set_yticks(np.arange(0.0, ylims[1]/num_total, 0.02)*num_total, minor=True)\n", " rely.set_yticklabels([f'{(y/num_total)*100:.0f}%' for y in rely.get_yticks()])\n", " rely.set_ylabel('Percentage of total hits')\n", "pass" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Detector image and fishes" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "scrolled": false }, "outputs": [], "source": [ "for i, det_name in enumerate(remi['detector'].keys()):\n", " flat_hits = hits_by_det[det_name].reshape(-1)\n", " flat_hits = flat_hits[np.isfinite(flat_hits[:]['x'])]\n", " flat_hits = flat_hits[flat_hits['m'] < 10]\n", "\n", " fig = plt.figure(num=70+i, figsize=(9, 13.5))\n", " fig.clf()\n", " \n", " fig.text(0.02, 0.98, det_name.upper(), rotation=90, ha='left', va='top', size='x-large')\n", " fig.text(0.02, 0.02, det_name.upper(), rotation=90, ha='left', va='bottom', size='x-large')\n", " \n", " imp = fig.add_axes([0.1 + 0.25/2, 0.56, 0.6, 0.4])\n", " txp = fig.add_axes([0.1, 0.28, 0.85, 0.22])\n", " typ = fig.add_axes([0.1, 0.04, 0.85, 0.22])\n", " \n", " im_radius = remi['detector'][det_name]['mcp_radius']*1.1\n", " \n", " min_tof = flat_hits['t'].min()\n", " max_tof = flat_hits['t'].max()\n", " \n", " imp.hist2d(flat_hits['x'], flat_hits['y'], bins=(256, 256),\n", " range=[[-im_radius, im_radius], [-im_radius, im_radius]], norm=LogNorm())\n", " imp.xaxis.set_label_position('top')\n", " imp.set_xlabel('X / mm')\n", " imp.set_ylabel('Y / mm')\n", " imp.tick_params(right=True, labelright=True, top=True, labeltop=True)\n", " imp.grid()\n", "\n", " for ax, dim_label in zip([txp, typ], ['x', 'y']):\n", " ax.hist2d(flat_hits['t'], flat_hits[dim_label], bins=(int((max_tof - min_tof) // 5), 256),\n", " range=[[min_tof, max_tof], [-im_radius, im_radius]], norm=LogNorm())\n", " ax.set_ylabel(f'{dim_label.upper()} / mm')\n", " \n", " typ.set_xlabel('Time-of-flight / ns')\n", " txp.tick_params(bottom=True, labelbottom=False, top=True, labeltop=True, right=True, labelright=True)\n", " typ.tick_params(right=True, labelright=True, top=True)\n", "pass" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Transformed data files" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "scrolled": false }, "outputs": [], "source": [ "inp_seq_len = len(dc.files[0].train_ids)\n", "seq_len = out_seq_len if out_seq_len > 0 else inp_seq_len\n", "\n", "# Take first field for detector data as sample for naming\n", "out_path = (Path(out_folder) / Path(out_filename)).resolve()\n", "seq_len = 5000\n", "\n", "t_write = timing('write_files')\n", "t_write.__enter__()\n", "\n", "out_path.parent.mkdir(parents=True, exist_ok=True)\n", "\n", "dataset_kwargs = {k[8:]: v for k, v in locals().items() if k.startswith('dataset_compression')}\n", "\n", "metadata = dc.run_metadata()\n", "daq_library_bytes = metadata['daqLibrary'].encode('ascii')\n", "karabo_framework_bytes = metadata['karaboFramework'].encode('ascii')\n", "proposal_number = int(proposal) if proposal else metadata['proposalNumber']\n", "\n", "print('Writing sequence files', flush=True, end='')\n", "\n", "for seq_id, start in enumerate(range(0, len(dc.train_ids), seq_len)):\n", " train_ids = dc.train_ids[start:start+seq_len]\n", " first_train_idx = start\n", " final_train_idx = start + len(train_ids) - 1\n", " \n", " train_sel = np.s_[first_train_idx:final_train_idx+1]\n", " pulse_sel = np.s_[pulse_offsets[first_train_idx]:(pulse_offsets[final_train_idx]+pulse_counts[final_train_idx])]\n", "\n", " with h5py.File(str(out_path).format(run=run, sequence=seq_id), 'w') as h5out:\n", " h5out.create_dataset('INDEX/trainId', data=train_ids, dtype=np.uint64)\n", " h5out.create_dataset('INDEX/timestamp', data=np.zeros_like(train_ids, dtype=np.uint64), dtype=np.uint64)\n", " h5out.create_dataset('INDEX/flag', data=np.ones_like(train_ids, dtype=np.int32))\n", " \n", " m_data_sources = []\n", " \n", " for det_name in remi['detector']:\n", " cur_device_id = det_device_id.format(karabo_id=karabo_id, det_name=det_name.upper())\n", " cur_fast_data = f\"{cur_device_id}:{det_output_key}\"\n", " \n", " pipeline_prefixes = set()\n", " \n", " h5out.create_group(f'CONTROL/{cur_device_id}')\n", " run_group = h5out.create_group(f'RUN/{cur_device_id}')\n", " remi.attach_detector_config(det_name, run_group)\n", " \n", " h5out.create_group(f'INDEX/{cur_device_id}')\n", " h5out.create_dataset(f'INDEX/{cur_device_id}/count',\n", " data=np.ones_like(train_ids), dtype=np.uint64)\n", " h5out.create_dataset(f'INDEX/{cur_device_id}/first',\n", " data=np.arange(len(train_ids)), dtype=np.uint64)\n", " \n", " m_data_sources.append(('CONTROL', cur_device_id))\n", " \n", " for flag, data, chunks, path in [\n", " (save_raw_triggers, triggers, chunks_triggers, 'raw/triggers'),\n", " (save_raw_edges, edges_by_det[det_name], chunks_edges, 'raw/edges'),\n", " (save_rec_signals, signals_by_det[det_name], chunks_signals, 'rec/signals'),\n", " (save_rec_hits, hits_by_det[det_name], chunks_hits, 'rec/hits')\n", " ]:\n", " if not flag:\n", " continue\n", " \n", " subdata = data[pulse_sel]\n", "\n", " kwargs = dict(data=subdata, chunks=tuple(chunks), **dataset_kwargs) \\\n", " if len(subdata) > 0 else dict(shape=(0,), dtype=data.dtype)\n", " \n", " h5out.create_dataset(f'INSTRUMENT/{cur_fast_data}/{path}', **kwargs)\n", " pipeline_prefixes.add(path[:path.find('/')])\n", " \n", " for pipeline_prefix in pipeline_prefixes:\n", " h5out.create_dataset(f'INDEX/{cur_fast_data}/{pipeline_prefix}/count',\n", " data=pulse_counts[train_sel])\n", " h5out.create_dataset(f'INDEX/{cur_fast_data}/{pipeline_prefix}/first',\n", " data=pulse_offsets[train_sel] - pulse_offsets[first_train_idx])\n", " \n", " m_data_sources.append(('INSTRUMENT', f'{cur_fast_data}/{pipeline_prefix}'))\n", " \n", " now_str = datetime.now().strftime('%Y%m%dT%H%M%SZ').encode('ascii')\n", " \n", " h5m = h5out.require_group('METADATA')\n", " h5m.create_dataset('creationDate', shape=(1,), data=now_str)\n", " h5m.create_dataset('daqLibrary', shape=(1,), data=daq_library_bytes)\n", " h5m.create_dataset('dataFormatVersion', shape=(1,), data=b'1.0')\n", " \n", " m_data_sources.sort(key=lambda x: f'{x[0]}/{x[1]}')\n", " h5m.create_dataset('dataSources/dataSourceId', shape=(len(m_data_sources),),\n", " data=[f'{x[0]}/{x[1]}'.encode('ascii') for x in m_data_sources])\n", " h5m.create_dataset('dataSources/deviceId', shape=(len(m_data_sources),),\n", " data=[x[1].encode('ascii') for x in m_data_sources])\n", " h5m.create_dataset('dataSources/root', shape=(len(m_data_sources),),\n", " data=[x[0].encode('ascii') for x in m_data_sources])\n", "\n", " h5m.create_dataset('karaboFramework', shape=(1,), data=karabo_framework_bytes)\n", " h5m.create_dataset('proposalNumber', shape=(1,), dtype=np.uint32, data=proposal_number)\n", " h5m.create_dataset('runNumber', shape=(1,), dtype=np.uint32, data=run)\n", " h5m.create_dataset('sequenceNumber', shape=(1,), dtype=np.uint32, data=seq_id)\n", " h5m.create_dataset('updateDate', shape=(1,), data=now_str)\n", " \n", " print('.', flush=True, end='')\n", " \n", "print('')\n", "t_write.__exit__()" ] } ], "metadata": { "kernelspec": { "display_name": "sqs-remi", "language": "python", "name": "sqs-remi" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.7.3" } }, "nbformat": 4, "nbformat_minor": 2 }