diff --git a/notebooks/AGIPD/AGIPD_Correct_and_Verify.ipynb b/notebooks/AGIPD/AGIPD_Correct_and_Verify.ipynb
index e38e3245e24c1af6486941f4c6120fcb6efa8dfd..7f4df8a20a170bed676111c2c8d232bb659a506c 100644
--- a/notebooks/AGIPD/AGIPD_Correct_and_Verify.ipynb
+++ b/notebooks/AGIPD/AGIPD_Correct_and_Verify.ipynb
@@ -21,6 +21,7 @@
     "out_folder = \"/gpfs/exfel/data/scratch/esobolev/pycal_litfrm/p002834/r0225\"  # the folder to output to, required\n",
     "metadata_folder = \"\"  # Directory containing calibration_metadata.yml when run by xfel-calibrate\n",
     "sequences = [-1] # sequences to correct, set to -1 for all, range allowed\n",
+    "overwrite = False  # IGNORED, NEEDED FOR COMPATIBILITY.\n",
     "modules = [-1] # modules to correct, set to -1 for all, range allowed\n",
     "train_ids = [-1] # train IDs to correct, set to -1 for all, range allowed\n",
     "run = 225 # runs to process, required\n",
@@ -852,7 +853,12 @@
     "    :param tid: Train Id to be loaded. First train is considered if None is given\n",
     "    :param path: Path to find image data inside h5 file\n",
     "    \"\"\"\n",
-    "    run_data = RunDirectory(data_folder, include)\n",
+    "    try:\n",
+    "        run_data = RunDirectory(data_folder, include)\n",
+    "    except FileNotFoundError:\n",
+    "        warning(f'No corrected files for {include}. Skipping plots.')\n",
+    "        import sys\n",
+    "        sys.exit(0)\n",
     "    if tid is not None:\n",
     "        tid, data = run_data.select(\n",
     "            f'{detector_id}/DET/*', source).train_from_id(tid, keep_dims=True)\n",
diff --git a/notebooks/AGIPD/AGIPD_Retrieve_Constants_Precorrection.ipynb b/notebooks/AGIPD/AGIPD_Retrieve_Constants_Precorrection.ipynb
index bbb32e928c2784a85ea2c1b28b58a1c39cb8388f..f05b3cf1bec5a565338c512c42518b643c220b14 100644
--- a/notebooks/AGIPD/AGIPD_Retrieve_Constants_Precorrection.ipynb
+++ b/notebooks/AGIPD/AGIPD_Retrieve_Constants_Precorrection.ipynb
@@ -176,12 +176,16 @@
    "outputs": [],
    "source": [
     "run_dc = RunDirectory(in_folder / f\"r{run:04d}\")\n",
-    "\n",
     "instrument_src = instrument_source_template.format(karabo_id, receiver_template)\n",
     "\n",
-    "instr_dc = run_dc.select(instrument_src.format(\"*\"), require_all=True)\n",
+    "instr_dc = run_dc.select(instrument_src.format(\"*\"))\n",
+    "\n",
+    "for m in modules:\n",
+    "    # Remove empty sources from `instr_dc`\n",
+    "    if instr_dc[instrument_src.format(m), 'image.data'].shape[0] == 0:\n",
+    "        instr_dc = instr_dc.deselect(instrument_src.format(m))\n",
     "\n",
-    "if not instr_dc.train_ids:\n",
+    "if not instr_dc.all_sources:\n",
     "    raise ValueError(f\"No images found for {in_folder / f'r{run:04d}'}\")"
    ]
   },
diff --git a/notebooks/DSSC/Characterize_DSSC_Darks_NBC.ipynb b/notebooks/DSSC/Characterize_DSSC_Darks_NBC.ipynb
index b065ef7681ce6de7261c389e3a0691e7dce1875c..b8f152e7a425bb6c644cc53322bd1a435e2dab94 100644
--- a/notebooks/DSSC/Characterize_DSSC_Darks_NBC.ipynb
+++ b/notebooks/DSSC/Characterize_DSSC_Darks_NBC.ipynb
@@ -54,7 +54,7 @@
     "offset_numpy_algorithm = \"mean\"\n",
     "\n",
     "high_res_badpix_3d = False # set this to True if you need high-resolution 3d bad pixel plots. Runtime: ~ 1h\n",
-    "slow_data_aggregators = [1,2,3,4]  # quadrant/aggregator\n",
+    "slow_data_aggregators = [1,1,1,1]  # quadrant/aggregator\n",
     "slow_data_path = 'SQS_NQS_DSSC/FPGA/PPT_Q'\n",
     "operation_mode = ''  # Detector operation mode, optional"
    ]
diff --git a/notebooks/Jungfrau/Jungfrau_Gain_Correct_and_Verify_NBC.ipynb b/notebooks/Jungfrau/Jungfrau_Gain_Correct_and_Verify_NBC.ipynb
index 1c1cfae94189b2ef9b910933bdb6c83f495bc0cd..928f120d8e8c24f527558a6da8dc96d88c6c3224 100644
--- a/notebooks/Jungfrau/Jungfrau_Gain_Correct_and_Verify_NBC.ipynb
+++ b/notebooks/Jungfrau/Jungfrau_Gain_Correct_and_Verify_NBC.ipynb
@@ -72,6 +72,7 @@
    "metadata": {},
    "outputs": [],
    "source": [
+    "import fnmatch\n",
     "import multiprocessing\n",
     "import sys\n",
     "import warnings\n",
@@ -86,7 +87,7 @@
     "import pasha as psh\n",
     "import tabulate\n",
     "from IPython.display import Latex, Markdown, display\n",
-    "from extra_data import H5File, RunDirectory, by_id, components\n",
+    "from extra_data import DataCollection, H5File, RunDirectory, by_id, components\n",
     "from extra_geom import JUNGFRAUGeometry\n",
     "from matplotlib.colors import LogNorm\n",
     "\n",
@@ -479,6 +480,7 @@
    "source": [
     "# Loop over modules\n",
     "empty_seq = 0\n",
+    "corrected_files = []\n",
     "for local_karabo_da, mapped_files_module in mapped_files.items():\n",
     "    instrument_src_kda = instrument_src.format(int(local_karabo_da[-2:]))\n",
     "\n",
@@ -487,6 +489,7 @@
     "        # of corresponding raw sequence file.\n",
     "        ofile_name = sequence_file.name.replace(\"RAW\", \"CORR\")\n",
     "        out_file = out_folder / ofile_name\n",
+    "        corrected_files.append(ofile_name)\n",
     "\n",
     "        # Load sequence file data collection, data.adc keydata,\n",
     "        # the shape for data to later created arrays of the same shape,\n",
@@ -674,7 +677,10 @@
    "source": [
     "first_seq = 0 if sequences == [-1] else sequences[0]\n",
     "\n",
-    "with RunDirectory(out_folder, f\"*{run}*S{first_seq:05d}*\") as corr_dc:\n",
+    "corrected_files = [\n",
+    "    out_folder / f for f in fnmatch.filter(corrected_files, f\"*{run}*S{first_seq:05d}*\")\n",
+    "]\n",
+    "with DataCollection.from_paths(corrected_files) as corr_dc:\n",
     "    # Reading CORR data for plotting.\n",
     "    jf_corr = components.JUNGFRAU(\n",
     "        corr_dc,\n",
diff --git a/notebooks/REMI/REMI_Digitize_and_Transform.ipynb b/notebooks/REMI/REMI_Digitize_and_Transform.ipynb
index 022f851dbf4260fb0067924ec05bfe83e4db1ef5..55515eb7185ecd3666b5ef6b440b53e9685064be 100644
--- a/notebooks/REMI/REMI_Digitize_and_Transform.ipynb
+++ b/notebooks/REMI/REMI_Digitize_and_Transform.ipynb
@@ -46,9 +46,17 @@
     "ignore_ppl = False  # Ignore any PPL entries in the PPT.\n",
     "ppl_offset = 0  # In units of the PPT.\n",
     "laser_ppt_mask = -1  # Bit mask for used laser, negative to auto-detect from instrument. \n",
-    "instrument_sase = 3\n",
-    "first_pulse_offset = 1000\n",
-    "single_pulse_length = 25000\n",
+    "instrument_sase = 3  # Which SASE we're running at for PPT decoding.\n",
+    "first_pulse_offset = 10000  # Sample position where the first pulse begins, ignored when PPT is reconstructed.\n",
+    "single_pulse_length = 25000  # How many samples if there's only one pulse.\n",
+    "pulse_start_offset = 0  # Signal offset at the start of each pulse.\n",
+    "pulse_end_offset = 0  # Signal offset at the end of each pulse.\n",
+    "\n",
+    "# PPT reconstruction parameters.\n",
+    "reconstruct_ppt = False  # Reconstruct PPT from some trigger edges.\n",
+    "trigger_edge_channel = '4_D'  # Channel to use for triggering.\n",
+    "trigger_edge_offset = 0  # Offset to apply to the first trigger edge position to compute first pulse offset.\n",
+    "fake_ppt_offset = 0  # Offset in reconstructed PPT for pulses.\n",
     "\n",
     "# Parallelization parameters.\n",
     "mp_find_triggers = 0.5  # Parallelization for finding triggers.\n",
@@ -108,7 +116,8 @@
     "remi = Analysis(calib_config_path, use_hex=not quad_anode)\n",
     "\n",
     "with timing('open_run'):\n",
-    "    dc = remi.prepare_dc(RunDirectory(Path(in_folder) / f'r{run:04d}', inc_suspect_trains=True))"
+    "    dc = remi.prepare_dc(RunDirectory(Path(in_folder) / f'r{run:04d}', inc_suspect_trains=True),\n",
+    "                         require_ppt=not reconstruct_ppt)"
    ]
   },
   {
@@ -142,6 +151,98 @@
     "# Pulse and trigger information"
    ]
   },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "### Read PPT from file or reconstruct PPT for older data"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "if reconstruct_ppt:\n",
+    "    # Take up to the first hundred trains for now.\n",
+    "    # Could be done for each train individually, but likely not necessary for now.\n",
+    "    trigger_trace = dc[remi['digitizer']['source'], remi['digitizer']['key_pattern'].format(trigger_edge_channel)] \\\n",
+    "        [:100].ndarray().mean(axis=0).astype(np.float64)\n",
+    "    trigger_trace -= trigger_trace[0]  # Use simple offset correction.\n",
+    "\n",
+    "    fake_ppt = np.zeros(2700, dtype=np.uint32)\n",
+    "    \n",
+    "    discr_func, discr_params = remi.get_discriminator([trigger_edge_channel])\n",
+    "\n",
+    "    edges = np.zeros(1000, dtype=np.float64)\n",
+    "    num_pulses = discr_func(trigger_trace, edges, **discr_params[0])\n",
+    "    edges = edges[:num_pulses]\n",
+    "\n",
+    "    first_edge = edges[0]\n",
+    "    rel_edges = np.round(edges - first_edge)\n",
+    "    edge_diff = rel_edges[1] - rel_edges[0]\n",
+    "\n",
+    "    if not np.allclose(rel_edges[1:] - rel_edges[:-1], edge_diff):\n",
+    "        raise ValueError('PPT reconstruction for unstable edge intervals not supported')\n",
+    "\n",
+    "    pulse_spacing = edge_diff / (2 * remi['digitizer']['clock_factor'])  # In units of PPT\n",
+    "\n",
+    "    if not float.is_integer(pulse_spacing):\n",
+    "        raise ValueError('PPT reconstruction encountered non-integer pulse spacing')\n",
+    "\n",
+    "    pulse_spacing = int(pulse_spacing)\n",
+    "\n",
+    "    # Taken from euxfel_bunch_pattern/__init__.py\n",
+    "    from euxfel_bunch_pattern import DESTINATION_T4D, DESTINATION_T5D, PHOTON_LINE_DEFLECTION\n",
+    "    if instrument_sase == 1:\n",
+    "        flag = DESTINATION_T4D\n",
+    "    elif instrument_sase == 2:\n",
+    "        flag = DESTINATION_T5D\n",
+    "    elif instrument_sase == 3:\n",
+    "        flag = DESTINATION_T4D | PHOTON_LINE_DEFLECTION\n",
+    "\n",
+    "    first_pulse_offset = int(first_edge + trigger_edge_offset)  # Overwrite notebook argument.\n",
+    "    fake_ppt[fake_ppt_offset:fake_ppt_offset + (pulse_spacing * num_pulses):pulse_spacing] = flag\n",
+    "\n",
+    "    from pasha.functor import Functor, gen_split_slices\n",
+    "    class FakeKeyDataFunctor(Functor):\n",
+    "        \"\"\"Functor appearing KeyData-like with constant data.\n",
+    "        \n",
+    "        This functor serves a constant data row for a given number\n",
+    "        of train IDs the same way a KeyData object would.\n",
+    "        \"\"\"\n",
+    "        \n",
+    "        def __init__(self, row, train_ids):\n",
+    "            self.row = row\n",
+    "            self.train_ids = train_ids\n",
+    "        \n",
+    "        def split(self, num_workers):\n",
+    "            return gen_split_slices(len(self.train_ids), n_parts=num_workers)\n",
+    "\n",
+    "        def iterate(self, share):\n",
+    "            it = zip(range(*share.indices(len(self.train_ids))), self.train_ids)\n",
+    "\n",
+    "            for index, train_id in it:\n",
+    "                yield index, train_id, self.row\n",
+    "    \n",
+    "    ppt_data = FakeKeyDataFunctor(fake_ppt, dc.train_ids)\n",
+    "    \n",
+    "    fig, ax = plt.subplots(num=99, figsize=(9, 6), clear=True, ncols=1, nrows=1)\n",
+    "\n",
+    "    ax.set_title('Edge trigger signal')\n",
+    "    ax.plot(trigger_trace, lw=1, label=f'Mean {trigger_edge_channel} trace')\n",
+    "    ax.vlines(edges, trigger_trace.min()*1.1, trigger_trace.max()*1.1,\n",
+    "              color='red', linewidth=3, alpha=0.3, label='Edge positions')\n",
+    "    \n",
+    "    ax.set_xlabel('Samples')\n",
+    "    ax.set_ylabel('Intensity / ADU')\n",
+    "    ax.legend()\n",
+    "    \n",
+    "else:\n",
+    "    ppt_data = dc[ppt_source, 'data.bunchPatternTable']"
+   ]
+  },
   {
    "cell_type": "markdown",
    "metadata": {},
@@ -161,8 +262,6 @@
     "# * `pulse_offsets [int32: len(dc.train_ids)]` containing the global offset for the first pulse of each train.\n",
     "# * `num_pulses = pulse_counts.sum(axis=0)`\n",
     "\n",
-    "ppt_data = dc[ppt_source, 'data.bunchPatternTable']\n",
-    "\n",
     "def get_pulse_positions(ppt, sase, laser, ppl_offset):\n",
     "    # Combine FEL and PPL positions.\n",
     "\n",
@@ -290,8 +389,8 @@
     "    pulse_count = pulse_counts[index]\n",
     "        \n",
     "    train_triggers = triggers[pulse_offset:pulse_offset+pulse_count]\n",
-    "    train_triggers['start'] = start_int\n",
-    "    train_triggers['stop'] = start_int + int(pulse_len * 2 * clock_factor) - 1\n",
+    "    train_triggers['start'] = start_int + pulse_start_offset\n",
+    "    train_triggers['stop'] = start_int + int(pulse_len * 2 * clock_factor) - 1 + pulse_end_offset\n",
     "    train_triggers['offset'] = start_frac - start_int\n",
     "    train_triggers['pulse'] = all_pos.astype(np.int16)\n",
     "    train_triggers['fel'] = [pos in fel_pos for pos in all_pos]\n",
@@ -911,9 +1010,36 @@
    "metadata": {},
    "outputs": [],
    "source": [
+    "# Try to figure out proposal number from in_folder to work with older files.\n",
+    "m = re.match(r'p(\\d{6})', Path(in_folder).parts[-2])\n",
+    "if not proposal and m is not None:\n",
+    "    proposal = int(m[1])\n",
+    "\n",
     "seq_len = out_seq_len if out_seq_len > 0 else len(dc.files[0].train_ids)\n",
     "dataset_kwargs = {k[8:]: v for k, v in locals().items() if k.startswith('dataset_compression')}\n",
     "\n",
+    "control_sources = [det_device_id.format(karabo_id=karabo_id, det_name=det_name.upper())\n",
+    "                   for det_name in remi['detector']]\n",
+    "\n",
+    "channels = []\n",
+    "if save_raw_triggers or save_raw_edges:\n",
+    "    channels.append('raw')\n",
+    "if save_rec_signals or save_rec_hits:\n",
+    "    channels.append('rec')\n",
+    "    \n",
+    "instrument_channels = [\n",
+    "    f'{device_id}:{det_output_key}/{channel}'\n",
+    "    for device_id in control_sources\n",
+    "    for channel in channels\n",
+    "]"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
     "Path(out_folder).mkdir(parents=True, exist_ok=True)\n",
     "print('Writing sequence files', flush=True, end='')\n",
     "\n",
@@ -924,6 +1050,8 @@
     "    seq_train_ids = dc.train_ids[train_mask]\n",
     "\n",
     "    with DataFile.from_details(out_folder, out_aggregator, run, seq_id) as outp:\n",
+    "        outp.create_metadata(like=dc, proposal=proposal, run=run, sequence=seq_id,\n",
+    "                             control_sources=control_sources, instrument_channels=instrument_channels)\n",
     "        outp.create_index(seq_train_ids)\n",
     "        \n",
     "        for det_name in remi['detector']:\n",
@@ -953,8 +1081,6 @@
     "                                         chunks=tuple(chunks_hits), **dataset_kwargs)\n",
     "                \n",
     "            cur_fast_data.create_index(raw=pulse_counts[train_mask], rec=pulse_counts[train_mask])\n",
-    "                \n",
-    "        outp.create_metadata(like=dc)\n",
     "        \n",
     "    print('.', flush=True, end='')\n",
     "    \n",
diff --git a/setup.py b/setup.py
index a449a583854adc1d70656c49a714319530330368..320b5926c3bf775fba9ce6b6c6a6567fa9fcdf5e 100644
--- a/setup.py
+++ b/setup.py
@@ -102,6 +102,8 @@ install_requires = [
         "traitlets==4.3.3",
         "xarray==2022.3.0",
         "EXtra-redu==0.0.8",
+        "rich==12.6.0",
+        "httpx==0.23.0",
 ]
 
 if "readthedocs.org" not in sys.executable:
diff --git a/src/cal_tools/agipdlib.py b/src/cal_tools/agipdlib.py
index 8d2ae746d6a32b5d101420a4234cff22d5cc2fb9..011905554fbdc24453ccfb552ec845c517ff8ad4 100644
--- a/src/cal_tools/agipdlib.py
+++ b/src/cal_tools/agipdlib.py
@@ -53,12 +53,19 @@ class AgipdCtrl:
         self.ctrl_src = ctrl_src
         self.raise_error = raise_error
 
-    def get_num_cells(self) -> Optional[int]:
-        """Read number of memory cells from fast data.
+    def _get_num_cells_ctrl(self) -> Optional[int]:
+        """Get number of cells from CONTROL source."""
+        # Attempt to look for number of cells in slow data
+        ncell_src = (
+            self.ctrl_src, "bunchStructure.nPulses.value")
+        if (
+            ncell_src[0] in self.run_dc.all_sources and
+            ncell_src[1] in self.run_dc.keys_for_source(ncell_src[0])
+        ):
+            return int(self.run_dc[ncell_src].as_single_value(reduce_by='max'))
 
-        :return mem_cells: Number of memory cells
-        return None, if no data available.
-        """
+    def _get_num_cells_instr(self) -> Optional[int]:
+        """Get number of cells from INSTRUMENT source."""
         cells = np.squeeze(
             self.run_dc[
                 self.image_src, "image.cellId"].drop_empty_trains().ndarray()
@@ -70,6 +77,20 @@ class AgipdCtrl:
         dists = [abs(o - maxcell) for o in options]
         return options[np.argmin(dists)]
 
+    def get_num_cells(self) -> Optional[int]:
+        """Read number of memory cells from fast data.
+
+        :return mem_cells: Number of memory cells,
+            or None if no data is available.
+        """
+        ncell = self._get_num_cells_ctrl()
+        if ncell is not None:
+            return ncell
+        # Fall back to the INSTRUMENT source. That approach does not suit
+        # filtered data: if the DAQ filters the data and the last cell is
+        # removed, it returns a wrong value.
+        return self._get_num_cells_instr()
+
     def _get_acq_rate_ctrl(self) -> Optional[float]:
         """Get acquisition (repetition) rate from CONTROL source."""
         # Attempt to look for acquisition rate in slow data
@@ -83,9 +104,6 @@ class AgipdCtrl:
             # about bucketing the rate for managing meta-data.
             return round(float(self.run_dc[rep_rate_src].as_single_value()), 1)
 
-    def _get_acq_rate_instr(self) -> Optional[float]:
-        """Get acquisition (repetition rate) from INSTRUMENT source."""
-
     def _get_acq_rate_instr(self) -> Optional[float]:
         """Get acquisition (repetition rate) from INSTRUMENT source."""
 
diff --git a/webservice/README.md b/webservice/README.md
index 1bced8a95363b6f49d8c60b6f338fb9723da4f3b..90a0bdc0985baaac329d64b37def61f91e2bdf20 100644
--- a/webservice/README.md
+++ b/webservice/README.md
@@ -189,3 +189,34 @@ status in myMdC should update as the processing occurs.
 
 The command ``squeue -u xcaltst`` will show running & pending Slurm jobs started
 by this test system.
+
+Manually Submitting Jobs
+------------------------
+
+A script `manual_launch.py` is provided to manually submit jobs to the service.
+
+```bash
+usage: manual_launch.py [-h] --proposal PROPOSAL [--delay DELAY] [--noconfirm] [--really] slices [slices ...]
+
+Manually submit calibration jobs.
+
+positional arguments:
+  slices               slices (or single numbers) of runs to process, inclusive range, starting at 1 (e.g. 1:3 parsed to {1, 2, 3}, 10 parsed to {10}, :10
+                       parsed to {1, 2, ..., 10})
+
+optional arguments:
+  -h, --help           show this help message and exit
+  --proposal PROPOSAL  proposal number
+  --delay DELAY        delay in seconds between submissions
+  --noconfirm          skip confirmation
+  --really             actually submit jobs instead of just printing them
+
+To run in the background use `nohup PYTHONUNBUFFERED=1 python manual_launch.py ... &` followed by `disown`.
+```
+
+Slices are inclusive, so `1:10` means runs 1 to 10, including both 1 and 10. A
+'slice' can also be a single number.
+
+For example, `python3 ./manual_launch.py 1 10:12 160:-1 --delay 60
+--proposal 2222 --really` submits runs 1, 10 to 12, and 160 onwards for
+calibration of proposal 2222, with a 60-second delay between submissions.
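+
+As a rough sketch of these inclusive semantics (illustrative only; the
+hypothetical `expand` helper below is not the parsing code used by
+`manual_launch.py`, and `max_run` stands in for the proposal's highest run
+number):
+
+```python
+# Illustrative sketch of the inclusive slice semantics described above;
+# the expand() helper is hypothetical, not part of manual_launch.py.
+def expand(spec: str, max_run: int) -> set:
+    parts = spec.split(":")
+    if len(parts) == 1:                       # "10" selects just run 10
+        return {int(parts[0])}
+    start = int(parts[0]) if parts[0] else 1  # ":10" starts at run 1
+    stop = max_run if parts[1] in ("", "-1") else int(parts[1])
+    return set(range(start, stop + 1))        # both ends included
+
+print(expand("1:3", 200))     # {1, 2, 3}
+print(expand("160:-1", 200))  # runs 160 through 200
+```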
diff --git a/webservice/manual_launch.py b/webservice/manual_launch.py
index 4753bbdafc2797cfc259355c92b8c1bf9598acf8..154e190b652c699a000f8fdca80aa8ef50db9ba3 100644
--- a/webservice/manual_launch.py
+++ b/webservice/manual_launch.py
@@ -1,26 +1,291 @@
+from __future__ import annotations
+
+import argparse
+import datetime as dt
+import time
+from contextlib import contextmanager
+from pathlib import Path
+from typing import Generator, Optional
+
 import zmq
+from config import webservice as config
+from httpx import Client, Response
+from rich import print
+from rich.progress import (
+    MofNCompleteColumn,
+    Progress,
+    SpinnerColumn,
+    TextColumn,
+    TimeElapsedColumn,
+)
+from rich.prompt import Confirm
+
+parser = argparse.ArgumentParser(
+    description="Manually submit calibration jobs.",
+    epilog="""To run in the background use `nohup PYTHONUNBUFFERED=1 python
+    manual_launch.py ... &` followed by `disown`.""",
+)
+
+parser.add_argument(
+    "slices",
+    type=str,
+    nargs="+",
+    help="""slices (or single numbers) of runs to process, inclusive range, starting at
+    1 (e.g. 1:3 parsed to {1, 2, 3}, 10 parsed to {10}, :10 parsed to {1, 2, ...,
+    10})""",
+)
+parser.add_argument(
+    "--proposal",
+    type=int,
+    help="proposal number",
+    required=True,
+)
+parser.add_argument(
+    "--delay",
+    default=30,
+    type=int,
+    help="delay in seconds between submissions",
+    required=False,
+)
+parser.add_argument(
+    "--noconfirm",
+    action="store_true",
+    help="skip confirmation",
+)
+parser.add_argument(
+    "--really",
+    action="store_true",
+    help="actually submit jobs instead of just printing them",
+)
+
+
+BEARER = {
+    "access_token": "",
+    "expires_at": dt.datetime.now(),
+}
+
+
+def pre_checks():
+    # Fail fast if we don't have the required configs set
+    required_keys = ["token-url", "user-id", "user-secret", "user-email"]
+    for k in required_keys:
+        if config["metadata-client"][k] is None:
+            print(
+                f"Missing key [bold red]`{k}`[/bold red] in metadata client configuration"
+            )
+            print("[bold red]Aborted[/bold red]")
+            exit(1)
+
+
+def get_bearer_token() -> str:
+    if BEARER["access_token"] and BEARER["expires_at"] > dt.datetime.now():
+        return BEARER["access_token"]
+
+    with Client() as client:
+        response = client.post(
+            f"{config['metadata-client']['token-url']}",
+            data={
+                "grant_type": "client_credentials",
+                "client_id": config["metadata-client"]["user-id"],
+                "client_secret": config["metadata-client"]["user-secret"],
+            },
+        )
+
+    data = response.json()
+
+    if any(k not in data for k in ["access_token", "expires_in"]):
+        print(
+            "Response from MyMdC missing required fields, check webservice `user-id`"
+            f"and `user-secret`. Response: {data=}",
+        )
+        raise ValueError("Invalid response from MyMdC")
+
+    expires_in = dt.timedelta(seconds=data["expires_in"])
+
+    BEARER["access_token"] = data["access_token"]
+    BEARER["expires_at"] = dt.datetime.now() + expires_in
+
+    return BEARER["access_token"]
+
+
+@contextmanager
+def get_client() -> Generator[Client, None, None]:
+    bearer_token = get_bearer_token()
+
+    with Client() as client:
+        headers = {
+            "accept": "application/json; version=1",
+            "X-User-Email": config["metadata-client"]["user-email"],
+            "Authorization": f"Bearer {bearer_token}",
+        }
+
+        client.headers.update(headers)
+
+        yield client
+
+
+def _get_runs_by_proposal(number: int, client: Client, page: int = 1) -> Response:
+    return client.get(
+        f"{config['metadata-client']['base-api-url']}/runs/runs_by_proposal",
+        params={"proposal_number": number, "page": page},
+        timeout=10,
+    )
+
+
+def get_runs_by_proposal_all(number: int) -> list[dict]:
+    with get_client() as client:
+        res = _get_runs_by_proposal(number, client, 1)
+        if res.status_code != 200:
+            raise ValueError(res.url, res.text)
+        runs = res.json()
+        for page in range(2, int(res.headers.get("x-total-pages", 1)) + 1):
+            res = _get_runs_by_proposal(number, client, page)
+            runs.extend(res.json())
+
+    return runs
+
+
+def main(
+    proposal_no: int,
+    slices: list[slice],
+    delay: int,
+    noconfirm: Optional[bool] = False,
+    really: Optional[bool] = False,
+):
+    with Progress(transient=True) as progress:
+        task_submission = progress.add_task(
+            "[yellow]Querying FS for proposal information", total=None
+        )
+        exp = Path("/gpfs/exfel/exp")
+        proposal_paths = list(exp.glob(f"*/*/p{proposal_no:06d}"))
+        if len(proposal_paths) != 1:
+            raise ValueError(f"Proposal {proposal_no} not found")
+
+        proposal_path = proposal_paths[0]
+        instrument = proposal_path.parts[4]
+        cycle = proposal_path.parts[5]
+
+        progress.update(task_submission, description="[yellow]Querying MyMdC for runs")
+
+        all_runs = get_runs_by_proposal_all(proposal_no)
+
+    run_no_id_map = {run["run_number"]: run["id"] for run in all_runs}
+    max_run_no = max(run_no_id_map.keys())
+    # Use max_run_no + 1 so that the highest run number itself can be selected
+    requested_ranges = [range(*s.indices(max_run_no + 1)) for s in slices]
+    requested_run_nos = {run_no for r in requested_ranges for run_no in r}
+
+    requests = dict(
+        sorted(
+            {
+                run_no: run_no_id_map[run_no]
+                for run_no in requested_run_nos
+                if run_no in run_no_id_map
+            }.items()
+        )
+    )
+
+    if missing_run_ids := set(requested_run_nos) - set(run_no_id_map.keys()):
+        print(
+            f"[bold red]Missing run IDs for run number(s) {missing_run_ids}[/bold red]"
+        )
+
+    if not really:
+        print("[yellow]`--really` flag missing, not submitting jobs")
+
+    if not noconfirm and not Confirm.ask(
+        f"Submit [red bold]{len(requests)}[/red bold] jobs for proposal "
+        f"[bold]{proposal_no}[/bold]?",
+        default=False,
+    ):
+        print("[bold red]Aborted[/bold red]")
+        exit(1)
+
+    with Progress(
+        SpinnerColumn(),
+        TextColumn("[progress.description]{task.description}"),
+        MofNCompleteColumn(),
+        TimeElapsedColumn(),
+    ) as progress:
+        description = f"[green]Submitted request for p{proposal_no:05d}/{{run_str}} "
+        task_submission = progress.add_task(
+            f"{description}r---[------]", total=len(requests)
+        )
+        con = zmq.Context()
+        socket = con.socket(zmq.REQ)
+        socket.connect("tcp://max-exfl016:5555")
+
+        if not really:
+            #  Fake socket for testing, just logs what would have been sent via ZMQ
+            socket = lambda: None
+            socket.send = lambda x: progress.console.log(
+                f"mock `zmq.REQ` socket send: {x}"
+            )
+            socket.recv = lambda: "mock `zmq.REQ` socket response"
+
+        last_run_no = list(requests.keys())[-1]
+
+        for run_no, run_id in requests.items():
+            args = (
+                "correct",
+                str(run_id),
+                "_",
+                str(instrument),
+                str(cycle),
+                f"{proposal_no:06d}",
+                str(run_no),
+                "-",
+            )
+            msg = f"""['{"','".join(args)}']""".encode()
+            progress.console.log(args)
+            socket.send(msg)
+
+            progress.update(
+                task_submission,
+                advance=1,
+                description=description.format(
+                    run_str=f"[bold yellow]r{run_no:03d}[{run_id:06d}]"
+                ),
+            )
+
+            res = socket.recv()
+            progress.console.log(res)
+
+            if run_no != last_run_no:
+                progress.console.log(f"sleeping for {delay}s")
+                time.sleep(delay)
+            else:
+                progress.update(task_submission, description="[green]Done")
+
+
+if __name__ == "__main__":
+    args = vars(parser.parse_args())
+
+    slices = []
+    for s in args["slices"]:
+        slice_split = tuple(int(x) if x else None for x in s.split(":"))
+        step = None
+        if len(slice_split) == 1:
+            start, stop = slice_split[0], slice_split[0]
+        elif len(slice_split) == 2:
+            start, stop = slice_split
+        else:
+            start, stop, step = slice_split
+
+        # Run numbers are 1-based, so a missing or zero start means run 1
+        if start is None or start == 0:
+            start = 1
+
+        # A stop of -1 means "up to the last run"; otherwise make it inclusive
+        if stop is not None:
+            stop = None if stop == -1 else stop + 1
+
+        slices.append(slice(start, stop, step))
+
+    pre_checks()
 
-con = zmq.Context()
-socket = con.socket(zmq.REQ)
-con = socket.connect("tcp://max-exfl017:5555")
-
-action = 'dark_request'
-dark_run_id = '258'
-sase = 'sase1'
-instrument = 'CALLAB'
-cycle = '202031'
-proposal = '900113'
-detector_id = 'SPB_DET_AGIPD1M-1'
-pdu_physical_names = '["AGIPD00 (Q1M1)"', '"AGIPD01 (Q1M2)"', '"AGIPD02 (Q1M3)"', '"AGIPD03 (Q1M4)"', '"AGIPD04 (Q2M1)"', '"AGIPD05 (Q2M2)"', '"AGIPD06 (Q2M3)"', '"AGIPD07 (Q2M4)"', '"AGIPD08 (Q3M1)"', '"AGIPD09 (Q3M2)"', '"AGIPD10 (Q3M3)"', '"AGIPD11 (Q3M4)"', '"AGIPD12 (Q4M1)"', '"AGIPD13 (Q4M2)"', '"AGIPD14 (Q4M3)"', '"AGIPD15 (Q4M4)"]'  # noqa
-pdu_karabo_das = '["AGIPD00"', ' "AGIPD01"', ' "AGIPD02"', ' "AGIPD03"', ' "AGIPD04"', ' "AGIPD05"', ' "AGIPD06"', ' "AGIPD07"', ' "AGIPD08"', ' "AGIPD09"', ' "AGIPD10"', ' "AGIPD11"', ' "AGIPD12"', ' "AGIPD13"', ' "AGIPD14"', ' "AGIPD15"]'  # noqa
-operation_mode = 'FIXED_GAIN'
-run_numbers = '[9985,]'
-
-
-data = [action, dark_run_id, sase, instrument, cycle, proposal, detector_id,
-        operation_mode, *pdu_physical_names, *pdu_karabo_das, run_numbers]
-stuff = [action, dark_run_id, sase, instrument, cycle, proposal, 'SPB_DET_AGIPD1M-1', 'ADAPTIVE_GAIN', '["AGIPD00 (Q1M1)"', '"AGIPD01 (Q1M2)"', '"AGIPD02 (Q1M3)"', '"AGIPD03 (Q1M4)"', '"AGIPD04 (Q2M1)"', '"AGIPD05 (Q2M2)"', '"AGIPD06 (Q2M3)"', '"AGIPD07 (Q2M4)"', '"AGIPD08 (Q3M1)"', '"AGIPD09 (Q3M2)"', '"AGIPD10 (Q3M3)"', '"AGIPD11 (Q3M4)"', '"AGIPD12 (Q4M1)"', '"AGIPD13 (Q4M2)"', '"AGIPD14 (Q4M3)"', '"AGIPD15 (Q4M4)"]', '["AGIPD00"', ' "AGIPD01"', ' "AGIPD02"', ' "AGIPD03"', ' "AGIPD04"', ' "AGIPD05"', ' "AGIPD06"', ' "AGIPD07"', ' "AGIPD08"', ' "AGIPD09"', ' "AGIPD10"', ' "AGIPD11"', ' "AGIPD12"', ' "AGIPD13"', ' "AGIPD14"', ' "AGIPD15"]', '[9992', ' 9991', ' 9990]']
-
-socket.send(str(stuff).encode())
-resp = socket.recv_multipart()[0]
-print(resp.decode())
+    main(
+        args["proposal"],
+        slices,
+        args["delay"],
+        args["noconfirm"],
+        args["really"],
+    )