diff --git a/notebooks/REMI/REMI_Digitize_and_Transform.ipynb b/notebooks/REMI/REMI_Digitize_and_Transform.ipynb
index 527af9375ba5c21f54a34f65f7007f551e42aa0d..34dfdee8d1c4b2a4eed74fcc39f1937799f7ec19 100644
--- a/notebooks/REMI/REMI_Digitize_and_Transform.ipynb
+++ b/notebooks/REMI/REMI_Digitize_and_Transform.ipynb
@@ -22,7 +22,7 @@
     "# Output parameters.\n",
     "karabo_id = 'SQS_REMI_DLD6'  # Karabo device ID root for virtual output device.\n",
     "proposal = ''  # Proposal, leave empty for auto detection based on in_folder\n",
-    "out_filename = 'CORR-R{run:04d}-REMI01-S{sequence:05d}.h5'  # Pattern for output filenames.\n",
+    "out_aggregator = 'REMI01'  # Aggregator name for output files.\n",
     "out_seq_len = 5000  # Number of trains per sequence file in output.\n",
     "det_device_id = '{karabo_id}/DET/{det_name}'  # Karabo device ID for virtual output device.\n",
     "det_output_key = 'output'  # Pipeline name for fast data output.\n",
@@ -37,8 +37,15 @@
     "dataset_compression = 'gzip'  # HDF compression method.\n",
     "dataset_compression_opts = 3  # HDF GZIP compression level.\n",
     "\n",
+    "# Trigger parameters.\n",
+    "ppt_source = 'SQS_RR_UTC/TSYS/TIMESERVER:outputBunchPattern'\n",
+    "ppl_offset = 0  # In units of the PPT.\n",
+    "laser_ppt_mask = -1  # Bit mask for used laser, negative to auto-detect from instrument. \n",
+    "instrument_sase = 3\n",
+    "first_pulse_offset = 1000\n",
+    "single_pulse_length = 25000\n",
+    "\n",
     "# Parallelization parameters.\n",
-    "mp_pulse_info = 8  # Parallelization for pulse statistics.\n",
     "mp_find_triggers = 0.5  # Parallelization for finding triggers.\n",
     "mp_find_edges = 0.5  # Parallelization for digitizing analog signal.\n",
     "mt_avg_trace = 2  # Parallelization for trace averaging.\n",
@@ -62,11 +69,13 @@
     "from datetime import datetime\n",
     "\n",
     "import pasha as psh\n",
+    "from euxfel_bunch_pattern import indices_at_sase, indices_at_laser\n",
     "from extra_data import RunDirectory\n",
     "from extra_remi import Analysis, trigger_dt\n",
     "from extra_remi.util import timing\n",
     "from extra_remi.plots import plot_detector_diagnostics\n",
     "from extra_remi.rd_resort import signal_dt, hit_dt\n",
+    "from extra_remi.files import DataFile, sequence_pulses\n",
     "\n",
     "%matplotlib inline"
    ]
@@ -90,7 +99,7 @@
     "remi = Analysis(calib_config_path)\n",
     "\n",
     "with timing('open_run'):\n",
-    "    dc = remi.prepare_dc(RunDirectory(Path(in_folder) / f'r{run:04d}', inc_suspect_trains=True, _use_voview=False))"
+    "    dc = remi.prepare_dc(RunDirectory(Path(in_folder) / f'r{run:04d}', inc_suspect_trains=True))"
    ]
   },
   {
@@ -143,8 +152,54 @@
     "# * `pulse_offsets [int32: len(dc.train_ids)]` containing the global offset for the first pulse of each train.\n",
     "# * `num_pulses = pulse_counts.sum(axis=0)`\n",
     "\n",
+    "ppt_data = dc[ppt_source, 'data.bunchPatternTable']\n",
+    "\n",
+    "def get_pulse_positions(ppt, sase, laser, ppl_offset):\n",
+    "    # Combine FEL and PPL positions.\n",
+    "\n",
+    "    fel_pos = indices_at_sase(ppt, sase)\n",
+    "\n",
+    "    if len(fel_pos) > 0:\n",
+    "        ppl_pos = indices_at_laser(ppt, laser) + fel_pos[0] + ppl_offset\n",
+    "    else:\n",
+    "        # Just take them as they are\n",
+    "        ppl_pos = indices_at_laser(ppt, laser)\n",
+    "\n",
+    "    return np.union1d(fel_pos, ppl_pos), fel_pos, ppl_pos\n",
+    "\n",
+    "if laser_ppt_mask < 0:\n",
+    "    # If laser PPT mask is not specified, try to figure it out from device IDs.\n",
+    "    from euxfel_bunch_pattern import PPL_BITS\n",
+    "    \n",
+    "    instrument = karabo_id[:karabo_id.index('_')]\n",
+    "    \n",
+    "    try:\n",
+    "        laser_ppt_mask = PPL_BITS[f'LP_{instrument}']\n",
+    "    except KeyError:\n",
+    "        raise ValueError(f'Laser PPT mask unknown for instrument `{instrument}`')\n",
+    "\n",
     "with timing('pulse_info'):\n",
-    "    pulse_counts, pulse_offsets, num_pulses = remi.get_pulse_info(dc, mp_pulse_info)"
+    "    psh.set_default_context('processes', num_workers=remi.get_num_workers(mp_find_triggers))\n",
+    "    \n",
+    "    # Build the pulse index\n",
+    "    pulse_counts = psh.alloc(shape=len(dc.train_ids), dtype=np.uint64)\n",
+    "    has_ppt = psh.alloc(shape=len(dc.train_ids), dtype=bool, fill=False)\n",
+    "    \n",
+    "    def count_pulses(wid, index, tid, ppt):\n",
+    "        pulse_counts[index] = len(get_pulse_positions(ppt, instrument_sase, laser_ppt_mask, ppl_offset)[0])\n",
+    "        has_ppt[index] = True\n",
+    "    \n",
+    "    psh.map(count_pulses, ppt_data)\n",
+    "\n",
+    "    # Fill any missing values with the highest.\n",
+    "    pulse_counts[has_ppt == False] = pulse_counts.max()\n",
+    "\n",
+    "    # Compute offsets based on pulse counts.\n",
+    "    pulse_offsets = np.zeros_like(pulse_counts)\n",
+    "    np.cumsum(pulse_counts[:-1], out=pulse_offsets[1:])\n",
+    "\n",
+    "    # Total number of pulses.\n",
+    "    num_pulses = int(pulse_counts.sum())"
    ]
   },
   {
@@ -183,132 +238,55 @@
    "source": [
     "# A trigger defines the boundary of a pulse on the digitizer trace stored per train. This cell creates a\n",
     "# global variable:\n",
-    "# * `triggers [(start: int32, stop: int32, offset: float64): num_pulses]` containing the triggers for each\n",
-    "#   pulse.\n",
-    "# \n",
-    "# Triggers may be obtained through two different methods:\n",
+    "# * `triggers [(start: int32, stop: int32, offset: float64, fel: bool, ppl: bool): num_pulses]`\n",
+    "#   containing the triggers for each pulse.\n",
     "# \n",
-    "# * `ppt` uses the pulse puttern table to locate the pulse positions on the trace. Only number of pulses and\n",
-    "#   their distance can be drawn this way, leaving the absolute offset for the very first pulse to be\n",
-    "#   configured via `trigger/ppt/first_pulse_offset`.\n",
-    "# \n",
-    "# * `edge` uses the digitizer channel `trigger/edge/channel` and builds triggers around the edges found on it.\n",
-    "#   The boundaries relative to this edge may be configured with the `group_start`, `group_end` and `dead_time`\n",
-    "#   parameters.\n",
+    "# This uses the pulse puttern table to locate the pulse positions on the trace. Only number of pulses and\n",
+    "# their distance can be drawn this way, leaving the absolute offset for the very first pulse to be\n",
+    "# configured via `trigger/ppt/first_pulse_offset`. If a PPL is used, it will be included in the trigger\n",
+    "# pattern. The ppt_offset parameter allows taking into account an offset betwen PPL and FEL.\n",
     "\n",
     "psh.set_default_context('processes', num_workers=remi.get_num_workers(mp_find_triggers))\n",
     "\n",
-    "triggers = psh.alloc(shape=(num_pulses,), dtype=trigger_dt, fill=(-1, -1, np.nan))\n",
+    "triggers = psh.alloc(shape=(num_pulses,), dtype=trigger_dt, fill=(-1, -1, np.nan, 0, 0))\n",
     "\n",
-    "if remi['trigger']['method'] == 'ppt':\n",
-    "    from euxfel_bunch_pattern import indices_at_sase\n",
-    "    \n",
-    "    pptc = remi['trigger']['ppt']\n",
-    "    \n",
-    "    keydata = dc[remi.get_ppt_sourcekey()]\n",
-    "    sase = remi['instrument']['timeserver']['sase']\n",
-    "    first_pulse_offset = pptc['first_pulse_offset']\n",
-    "    single_pulse_length = pptc['single_pulse_length']\n",
-    "    clock_factor = remi['digitizer']['clock_factor']\n",
-    "    \n",
-    "    def trigger_by_ppt(worker_id, index, train_id, ppt):\n",
-    "        abs_pos = indices_at_sase(ppt, sase)\n",
-    "        num_pulses = len(abs_pos)\n",
-    "        \n",
-    "        if num_pulses > 1:\n",
-    "            rel_pos = (abs_pos - abs_pos[0])\n",
-    "            pulse_len = rel_pos[1] - rel_pos[0]\n",
-    "        elif num_pulses == 1:\n",
-    "            rel_pos = np.zeros(1)\n",
-    "            pulse_len = single_pulse_length\n",
-    "        elif num_pulses == 0:\n",
-    "            return\n",
-    "        \n",
-    "        pulse_offset = pulse_offsets[index]\n",
-    "        pulse_count = pulse_counts[index]\n",
-    "        train_triggers = triggers[pulse_offset:pulse_offset+pulse_count]\n",
-    "        \n",
-    "        start_frac = first_pulse_offset + rel_pos * 2 * clock_factor\n",
-    "        start = start_frac.astype(int)\n",
-    "        \n",
-    "        if start.shape != train_triggers.shape:\n",
-    "            print(f'pulse number mismatch in train {index} / {train_id}, SKIPPING')\n",
-    "            return\n",
-    "        \n",
-    "        train_triggers['start'] = start\n",
-    "        train_triggers['stop'] = start + int(pulse_len * 2 * clock_factor) - 1\n",
-    "        train_triggers['offset'] = start_frac - start\n",
-    "    \n",
-    "    with timing('find_triggers'):\n",
-    "        psh.map(trigger_by_ppt, keydata)\n",
-    "    \n",
-    "elif remi['trigger']['method'] == 'edge':\n",
-    "    edgec = remi['trigger']['edge']\n",
-    "    keydata = dc[remi.get_channel_sourcekey(edgec['channel'])]\n",
+    "clock_factor = remi['digitizer']['clock_factor']\n",
     "\n",
-    "    trace_len = keydata.entry_shape[0]\n",
-    "    group_start = edgec['group_start']\n",
-    "    group_end = edgec['group_end']\n",
-    "    dead_time = edgec['dead_time']\n",
+    "def trigger_by_ppt(worker_id, index, train_id, ppt):\n",
+    "    all_pos, fel_pos, ppl_pos = get_pulse_positions(ppt, instrument_sase, laser_ppt_mask, ppl_offset)\n",
+    "    num_pulses = len(all_pos)\n",
     "    \n",
-    "    def prepare_trigger_edge_worker(worker_id):\n",
-    "        correct_func = remi.get_baseline_corrector()\n",
-    "        discr_func, discr_params = remi.get_discriminator([edgec['channel']])\n",
-    "\n",
-    "        bl_start, bl_stop, _ = remi.get_baseline_limits(trace_len)\n",
-    "        bl_sym = remi['digitizer']['baseline_symmetry']\n",
-    "        \n",
-    "        edge_pos = np.empty(10000, dtype=np.float64)\n",
-    "        trace_corr = np.empty(trace_len, dtype=np.float64)\n",
-    "        baselines = np.empty(bl_sym, dtype=np.float64)\n",
-    "        yield\n",
-    "        \n",
-    "    def group_boundaries(trigger_edges):\n",
-    "        cur_edge = trigger_edges[0]\n",
+    "    if num_pulses == 0:\n",
+    "        return\n",
     "\n",
-    "        for i in range(1, len(trigger_edges)):\n",
-    "            next_edge = trigger_edges[i]\n",
-    "            edge_diff = int(next_edge) - int(cur_edge)\n",
+    "    rel_pos = all_pos - all_pos[0]\n",
     "\n",
-    "            if edge_diff <= dead_time:\n",
-    "                pass\n",
+    "    if num_pulses > 1:\n",
+    "        pulse_lengths = np.unique(rel_pos[1:] - rel_pos[:-1])\n",
     "\n",
-    "            elif edge_diff > dead_time and edge_diff >= group_end:\n",
-    "                yield cur_edge, int(cur_edge) + group_start, int(cur_edge) + group_end\n",
-    "                cur_edge = trigger_edges[i]\n",
+    "        if len(pulse_lengths) > 1:\n",
+    "            print('WARNING: Differing pulse lengths encountered, minimum is used!')\n",
     "\n",
-    "            elif edge_diff > dead_time and edge_diff < group_end:\n",
-    "                yield cur_edge, int(cur_edge) + group_start, int(next_edge)\n",
-    "                cur_edge = trigger_edges[i]\n",
+    "        pulse_len = pulse_lengths.min()\n",
     "\n",
-    "            elif edge_diff < group_end:\n",
-    "                pass\n",
+    "    elif num_pulses == 1:\n",
+    "        pulse_len = single_pulse_length\n",
     "\n",
-    "        yield cur_edge, int(cur_edge) + group_start, int(cur_edge) + group_end\n",
-    "    \n",
-    "    @psh.with_init(prepare_trigger_edge_worker)\n",
-    "    def trigger_by_edge(worker_id, index, train_id, trace_raw):\n",
-    "        correct_func(trace_raw, trace_corr, baselines, bl_start, bl_stop)\n",
+    "    start_frac = first_pulse_offset + rel_pos * 2 * clock_factor\n",
+    "    start_int = start_frac.astype(int)\n",
     "\n",
-    "        pulse_offset = pulse_offsets[index]\n",
-    "        pulse_count = pulse_counts[index]\n",
-    "        num_triggers = discr_func(trace_corr, edge_pos, **discr_params[0])\n",
-    "\n",
-    "        groups = group_boundaries(edge_pos[:num_triggers])\n",
-    "        train_triggers = triggers[pulse_offset:pulse_offset+pulse_count]\n",
+    "    pulse_offset = pulse_offsets[index]\n",
+    "    pulse_count = pulse_counts[index]\n",
     "        \n",
-    "        if num_triggers == 0 or num_triggers != pulse_count:\n",
-    "            print(f'index={index}, train_id={train_id}: Unexpected '\n",
-    "                  f'num_triggers={num_triggers} for pulse_count={pulse_count}')\n",
-    "            return\n",
+    "    train_triggers = triggers[pulse_offset:pulse_offset+pulse_count]\n",
+    "    train_triggers['start'] = start_int\n",
+    "    train_triggers['stop'] = start_int + int(pulse_len * 2 * clock_factor) - 1\n",
+    "    train_triggers['offset'] = start_frac - start_int\n",
+    "    train_triggers['fel'] = [pos in fel_pos for pos in all_pos]\n",
+    "    train_triggers['ppl'] = [pos in ppl_pos for pos in all_pos]\n",
     "\n",
-    "        for (edge, start, stop), pulse_trigger in zip(groups, train_triggers):\n",
-    "            pulse_trigger['start'] = start\n",
-    "            pulse_trigger['stop'] = stop\n",
-    "            pulse_trigger['offset'] = start - edge\n",
-    "    \n",
-    "    with timing('find_triggers'):\n",
-    "        psh.map(trigger_by_edge, keydata)"
+    "with timing('find_triggers'):\n",
+    "    psh.map(trigger_by_ppt, ppt_data)"
    ]
   },
   {
@@ -318,22 +296,35 @@
    "outputs": [],
    "source": [
     "fig, (lx, rx) = plt.subplots(num=2, ncols=2, nrows=1, figsize=(9, 4), clear=True,\n",
-    "                       gridspec_kw=dict(top=0.75))\n",
+    "                             gridspec_kw=dict(top=0.75))\n",
     "\n",
     "# Display ~400 pulses or 10 trains, whatever is lower\n",
-    "n_trains = max(abs(pulse_offsets - 400).argmin(), 10)\n",
+    "n_trains = max(abs(pulse_offsets - 200).argmin(), 5)\n",
+    "\n",
+    "visible_triggers = triggers[:pulse_offsets[n_trains]] \n",
+    "\n",
+    "pulse_index = np.arange(len(visible_triggers))\n",
+    "pumped = visible_triggers['fel'] & visible_triggers['ppl']\n",
+    "fel_only = visible_triggers['fel'] & ~pumped\n",
+    "ppl_only = visible_triggers['ppl'] & ~pumped\n",
+    "\n",
+    "lx.plot(pulse_index[pumped], visible_triggers[pumped]['start'], ' .', ms=3, c='C0', label='FEL+PPL')\n",
+    "lx.plot(pulse_index[fel_only], visible_triggers[fel_only]['start'], '.', ms=3, c='C1', label='FEL-only')\n",
+    "lx.plot(pulse_index[ppl_only], visible_triggers[ppl_only]['start'], '.', ms=2, c='C2', label='PPL-only')\n",
     "\n",
-    "visible_trigger_starts = triggers['start'][:pulse_offsets[n_trains]]\n",
+    "max_start = visible_triggers['start'].max()\n",
     "\n",
-    "lx.plot(visible_trigger_starts, '.', ms=2)\n",
-    "lx.vlines(pulse_offsets[:n_trains], 0, visible_trigger_starts.max(), color='grey', linewidth=1, alpha=0.2)\n",
+    "lx.vlines(pulse_offsets[:n_trains], 0, max_start, color='grey', linewidth=1, alpha=0.2)\n",
     "lx.tick_params(right=True)\n",
+    "lx.set_ylim(-1, max_start+1)\n",
     "\n",
     "lx.set_xlabel('Pulse index')\n",
     "lx.set_xlim(-15, pulse_offsets[n_trains]+15)\n",
     "\n",
     "lx.set_ylabel('Trigger position')\n",
-    "lx.set_ylim(0, visible_trigger_starts.max())\n",
+    "lx.set_ylim(0, max_start)\n",
+    "\n",
+    "lx.legend(fontsize='small', loc='lower right')\n",
     "\n",
     "train_lx = lx.twiny()\n",
     "train_lx.set_xlabel('Train ID', labelpad=8)\n",
@@ -343,7 +334,10 @@
     "                         rotation=-45, fontsize='x-small')\n",
     "\n",
     "rx.plot(triggers['start'], lw=0.2)\n",
+    "\n",
+    "rx.set_xlabel('Pulse index')\n",
     "rx.tick_params(left=False, labelleft=False, right=True, labelright=True)\n",
+    "\n",
     "pass"
    ]
   },
@@ -879,109 +873,53 @@
   {
    "cell_type": "code",
    "execution_count": null,
-   "metadata": {
-    "scrolled": false
-   },
+   "metadata": {},
    "outputs": [],
    "source": [
-    "inp_seq_len = len(dc.files[0].train_ids)\n",
-    "seq_len = out_seq_len if out_seq_len > 0 else inp_seq_len\n",
+    "seq_len = out_seq_len if out_seq_len > 0 else len(dc.files[0].train_ids)\n",
+    "dataset_kwargs = {k[8:]: v for k, v in locals().items() if k.startswith('dataset_compression')}\n",
     "\n",
-    "# Take first field for detector data as sample for naming\n",
-    "out_path = (Path(out_folder) / Path(out_filename)).resolve()\n",
+    "Path(out_folder).mkdir(parents=True, exist_ok=True)\n",
+    "print('Writing sequence files', flush=True, end='')\n",
     "\n",
     "t_write = timing('write_files')\n",
     "t_write.__enter__()\n",
     "\n",
-    "out_path.parent.mkdir(parents=True, exist_ok=True)\n",
+    "for seq_id, train_mask, pulse_mask in sequence_pulses(dc.train_ids, pulse_counts, pulse_offsets, seq_len):\n",
+    "    seq_train_ids = dc.train_ids[train_mask]\n",
     "\n",
-    "dataset_kwargs = {k[8:]: v for k, v in locals().items() if k.startswith('dataset_compression')}\n",
-    "\n",
-    "metadata = dc.run_metadata()\n",
-    "daq_library_bytes = metadata.get('daqLibrary', '0.0').encode('ascii')\n",
-    "karabo_framework_bytes = metadata.get('karaboFramework', '0.0').encode('ascii')\n",
-    "proposal_number = int(proposal) if proposal else metadata.get('proposalNumber', -1)\n",
-    "\n",
-    "print('Writing sequence files', flush=True, end='')\n",
-    "\n",
-    "for seq_id, start in enumerate(range(0, len(dc.train_ids), seq_len)):\n",
-    "    train_ids = dc.train_ids[start:start+seq_len]\n",
-    "    first_train_idx = start\n",
-    "    final_train_idx = start + len(train_ids) - 1\n",
-    "    \n",
-    "    train_sel = np.s_[first_train_idx:final_train_idx+1]\n",
-    "    pulse_sel = np.s_[pulse_offsets[first_train_idx]:(pulse_offsets[final_train_idx]+pulse_counts[final_train_idx])]\n",
-    "\n",
-    "    with h5py.File(str(out_path).format(run=run, sequence=seq_id), 'w') as h5out:\n",
-    "        h5out.create_dataset('INDEX/trainId', data=train_ids, dtype=np.uint64)\n",
-    "        h5out.create_dataset('INDEX/timestamp', data=np.zeros_like(train_ids, dtype=np.uint64), dtype=np.uint64)\n",
-    "        h5out.create_dataset('INDEX/flag', data=np.ones_like(train_ids, dtype=np.int32))\n",
-    "        \n",
-    "        m_data_sources = []\n",
+    "    with DataFile.from_details(out_folder, out_aggregator, run, seq_id) as outp:\n",
+    "        outp.create_index(seq_train_ids)\n",
     "        \n",
     "        for det_name in remi['detector']:\n",
     "            cur_device_id = det_device_id.format(karabo_id=karabo_id, det_name=det_name.upper())\n",
-    "            cur_fast_data = f\"{cur_device_id}:{det_output_key}\"\n",
     "                \n",
-    "            pipeline_prefixes = set()\n",
-    "            \n",
-    "            h5out.create_group(f'CONTROL/{cur_device_id}')\n",
-    "            run_group = h5out.create_group(f'RUN/{cur_device_id}')\n",
-    "            remi.attach_detector_config(det_name, run_group)\n",
+    "            cur_control_data = outp.create_control_source(cur_device_id)\n",
+    "            # Manually manipulate the file here, still creates the index properly.\n",
+    "            remi.attach_detector_config(det_name, cur_control_data.get_run_group())\n",
+    "            cur_control_data.create_index(len(seq_train_ids))\n",
     "            \n",
-    "            h5out.create_group(f'INDEX/{cur_device_id}')\n",
-    "            h5out.create_dataset(f'INDEX/{cur_device_id}/count',\n",
-    "                                 data=np.ones_like(train_ids), dtype=np.uint64)\n",
-    "            h5out.create_dataset(f'INDEX/{cur_device_id}/first',\n",
-    "                                 data=np.arange(len(train_ids)), dtype=np.uint64)\n",
+    "            cur_fast_data = outp.create_instrument_source(f'{cur_device_id}:{det_output_key}')\n",
     "            \n",
-    "            m_data_sources.append(('CONTROL', cur_device_id))\n",
-    "            \n",
-    "            for flag, data, chunks, path in [\n",
-    "                (save_raw_triggers, triggers, chunks_triggers, 'raw/triggers'),\n",
-    "                (save_raw_edges, edges_by_det[det_name], chunks_edges, 'raw/edges'),\n",
-    "                (save_rec_signals, signals_by_det[det_name], chunks_signals, 'rec/signals'),\n",
-    "                (save_rec_hits, hits_by_det[det_name], chunks_hits, 'rec/hits')\n",
-    "            ]:\n",
-    "                if not flag:\n",
-    "                    continue\n",
-    "                    \n",
-    "                subdata = data[pulse_sel]\n",
-    "\n",
-    "                kwargs = dict(data=subdata, chunks=tuple(chunks), **dataset_kwargs) \\\n",
-    "                    if len(subdata) > 0 else dict(shape=(0,), dtype=data.dtype)\n",
+    "            if save_raw_triggers:\n",
+    "                cur_fast_data.create_key('raw.triggers', triggers[pulse_mask],\n",
+    "                                         chunks=tuple(chunks_triggers), **dataset_kwargs)\n",
     "                \n",
-    "                h5out.create_dataset(f'INSTRUMENT/{cur_fast_data}/{path}', **kwargs)\n",
-    "                pipeline_prefixes.add(path[:path.find('/')])\n",
-    "            \n",
-    "            for pipeline_prefix in pipeline_prefixes:\n",
-    "                h5out.create_dataset(f'INDEX/{cur_fast_data}/{pipeline_prefix}/count',\n",
-    "                                     data=pulse_counts[train_sel])\n",
-    "                h5out.create_dataset(f'INDEX/{cur_fast_data}/{pipeline_prefix}/first',\n",
-    "                                     data=pulse_offsets[train_sel] - pulse_offsets[first_train_idx])\n",
+    "            if save_raw_edges:\n",
+    "                cur_fast_data.create_key('raw.edges', edges_by_det[det_name][pulse_mask],\n",
+    "                                         chunks=tuple(chunks_edges), **dataset_kwargs)\n",
     "                \n",
-    "                m_data_sources.append(('INSTRUMENT', f'{cur_fast_data}/{pipeline_prefix}'))\n",
-    "        \n",
-    "        now_str = datetime.now().strftime('%Y%m%dT%H%M%SZ').encode('ascii')\n",
-    "    \n",
-    "        h5m = h5out.require_group('METADATA')\n",
-    "        h5m.create_dataset('creationDate', shape=(1,), data=now_str)\n",
-    "        h5m.create_dataset('daqLibrary', shape=(1,), data=daq_library_bytes)\n",
-    "        h5m.create_dataset('dataFormatVersion', shape=(1,), data=b'1.0')\n",
-    "        \n",
-    "        m_data_sources.sort(key=lambda x: f'{x[0]}/{x[1]}')\n",
-    "        h5m.create_dataset('dataSources/dataSourceId', shape=(len(m_data_sources),),\n",
-    "                           data=[f'{x[0]}/{x[1]}'.encode('ascii') for x in m_data_sources])\n",
-    "        h5m.create_dataset('dataSources/deviceId', shape=(len(m_data_sources),),\n",
-    "                           data=[x[1].encode('ascii') for x in m_data_sources])\n",
-    "        h5m.create_dataset('dataSources/root', shape=(len(m_data_sources),),\n",
-    "                           data=[x[0].encode('ascii') for x in m_data_sources])\n",
-    "\n",
-    "        h5m.create_dataset('karaboFramework', shape=(1,), data=karabo_framework_bytes)\n",
-    "        h5m.create_dataset('proposalNumber', shape=(1,), dtype=np.uint32, data=proposal_number)\n",
-    "        h5m.create_dataset('runNumber', shape=(1,), dtype=np.uint32, data=run)\n",
-    "        h5m.create_dataset('sequenceNumber', shape=(1,), dtype=np.uint32, data=seq_id)\n",
-    "        h5m.create_dataset('updateDate', shape=(1,), data=now_str)\n",
+    "            if save_rec_signals:\n",
+    "                cur_fast_data.create_key('rec.signals', signals_by_det[det_name][pulse_mask],\n",
+    "                                         chunks=tuple(chunks_signals), **dataset_kwargs)\n",
+    "                \n",
+    "            if save_rec_hits:\n",
+    "                cur_fast_data.create_key('rec.hits', hits_by_det[det_name][pulse_mask],\n",
+    "                                         chunks=tuple(chunks_hits), **dataset_kwargs)\n",
+    "                \n",
+    "            cur_fast_data.create_index(raw=pulse_counts[train_mask], rec=pulse_counts[train_mask])\n",
+    "                \n",
+    "        outp.create_metadata(like=dc)\n",
     "        \n",
     "    print('.', flush=True, end='')\n",
     "    \n",