Skip to content
Snippets Groups Projects
Commit 41b5164a authored by Robert Rosca's avatar Robert Rosca
Browse files

Add script to submit slices of runs for a proposal

parent 4e31209f
No related branches found
No related tags found
1 merge request!756Add script to submit slices of runs for a proposal
......@@ -102,6 +102,8 @@ install_requires = [
"traitlets==4.3.3",
"xarray==2022.3.0",
"EXtra-redu==0.0.7",
"rich==12.6.0",
"httpx==0.23.0",
]
if "readthedocs.org" not in sys.executable:
......
......@@ -189,3 +189,34 @@ status in myMdC should update as the processing occurs.
The command ``squeue -u xcaltst`` will show running & pending Slurm jobs started
by this test system.
Manually Submitting Jobs
------------------------
A script `manual_launch.py` is provided to manually submit jobs to the service.
```bash
usage: manual_launch.py [-h] --proposal PROPOSAL [--delay DELAY] [--noconfirm] [--really] slices [slices ...]
Manually submit calibration jobs.
positional arguments:
slices slices (or single numbers) of runs to process, inclusive range, starting at 1 (e.g. 1:3 parsed to {1, 2, 3}, 10 parsed to {10}, :10
parsed to {1, 2, ..., 10})
optional arguments:
-h, --help show this help message and exit
--proposal PROPOSAL proposal number
--delay DELAY delay in seconds between submissions
--noconfirm skip confirmation
--really actually submit jobs instead of just printing them
To run in the background use `nohup PYTHONUNBUFFERED=1 python manual_launch.py ... &` followed by `disown`.
```
Slices inclusive, so `1:10` would mean runs 1 to 10 inclusive of 1 and 10. The
'slice' can also be a single number.
Example of usage would be `python3 ./manual_launch.py 1 10:12 160:-1 --delay 60
--proposal 2222 --really` to submit runs 1, 10 to 12, and 160+ for calibration,
for proposal 2222, with a 60 second delay between submissions.
from __future__ import annotations
import argparse
import datetime as dt
import time
from contextlib import contextmanager
from pathlib import Path
from typing import Generator, Optional
import zmq
from config import webservice as config
from httpx import Client, Response
from rich import print
from rich.progress import (
MofNCompleteColumn,
Progress,
SpinnerColumn,
TextColumn,
TimeElapsedColumn,
)
from rich.prompt import Prompt
parser = argparse.ArgumentParser(
description="Manually submit calibration jobs.",
epilog="""To run in the background use `nohup PYTHONUNBUFFERED=1 python
manual_launch.py ... &` followed by `disown`.""",
)
parser.add_argument(
"slices",
type=str,
nargs="+",
help="""slices (or single numbers) of runs to process, inclusive range, starting at
1 (e.g. 1:3 parsed to {1, 2, 3}, 10 parsed to {10}, :10 parsed to {1, 2, ...,
10})""",
)
parser.add_argument(
"--proposal",
type=int,
help="proposal number",
required=True,
)
parser.add_argument(
"--delay",
default=30,
type=int,
help="delay in seconds between submissions",
required=False,
)
parser.add_argument(
"--noconfirm",
action="store_true",
help="skip confirmation",
)
parser.add_argument(
"--really",
action="store_true",
help="actually submit jobs instead of just printing them",
)
BEARER = {
"access_token": "",
"expires_at": dt.datetime.now(),
}
def pre_checks():
# Fail fast if we don't have the required configs set
required_keys = ["token-url", "user-id", "user-secret", "user-email"]
for k in required_keys:
if config["metadata-client"][k] is None:
print(
f"Missing key [bold red]`{k}`[/bold red] in metadata client configuration"
)
print("[bold red]Aborted[/bold red]")
exit(1)
def get_bearer_token() -> str:
if BEARER["access_token"] and BEARER["expires_at"] > dt.datetime.now():
return BEARER["access_token"]
with Client() as client:
response = client.post(
f"{config['metadata-client']['token-url']}",
data={
"grant_type": "client_credentials",
"client_id": config["metadata-client"]["user-id"],
"client_secret": config["metadata-client"]["user-secret"],
},
)
data = response.json()
if any(k not in data for k in ["access_token", "expires_in"]):
print(
"Response from MyMdC missing required fields, check webservice `user-id`"
f"and `user-secret`. Response: {data=}",
)
raise ValueError("Invalid response from MyMdC")
expires_in = dt.timedelta(seconds=data["expires_in"])
BEARER["access_token"] = data["access_token"]
BEARER["expires_at"] = dt.datetime.now() + expires_in
return BEARER["access_token"]
@contextmanager
def get_client() -> Generator[Client, None, None]:
bearer_token = get_bearer_token()
with Client() as client:
headers = {
"accept": "application/json; version=1",
"X-User-Email": config["metadata-client"]["user-email"],
"Authorization": f"Bearer {bearer_token}",
}
client.headers.update(headers)
yield client
def _get_runs_by_proposal(number: int, client: Client, page: int = 1) -> Response:
return client.get(
f"{config['metadata-client']['base-api-url']}/runs/runs_by_proposal",
params={"proposal_number": number, "page": page},
timeout=10,
)
def get_runs_by_proposal_all(number: int) -> list[dict]:
with get_client() as client:
res = _get_runs_by_proposal(number, client, 1)
if res.status_code != 200:
raise ValueError(res.url, res.text)
runs = res.json()
for page in range(2, int(res.headers.get("x-total-pages", 1)) + 1):
_ = _get_runs_by_proposal(number, client, page)
runs.extend(_.json())
return runs
def main(
proposal_no: int,
slices: list[slice],
delay: int,
noconfirm: Optional[bool] = False,
really: Optional[bool] = False,
):
with Progress(transient=True) as progress:
task_submission = progress.add_task(
"[yellow]Querying FS for proposal information", total=None
)
exp = Path("/gpfs/exfel/exp")
proposal_paths = list(exp.glob(f"*/*/p{proposal_no:06d}"))
if len(proposal_paths) != 1:
raise ValueError(f"Proposal {proposal_no} not found")
proposal_path = proposal_paths[0]
instrument = proposal_path.parts[4]
cycle = proposal_path.parts[5]
progress.update(task_submission, description="[yellow]Querying MyMdC for runs")
all_runs = get_runs_by_proposal_all(proposal_no)
run_no_id_map = {run["run_number"]: run["id"] for run in all_runs}
max_run_no = max(run_no_id_map.keys())
requested_ranges = [range(*s.indices(max_run_no)) for s in slices]
requested_run_nos = {run_no for r in requested_ranges for run_no in r}
requests = dict(
sorted(
{
run_no: run_no_id_map[run_no]
for run_no in requested_run_nos
if run_no in run_no_id_map
}.items()
)
)
if missing_run_ids := set(requested_run_nos) - set(run_no_id_map.keys()):
print(
f"[bold red]Missing run IDs for run number(s) {missing_run_ids}[/bold red]"
)
if not really:
print("[yellow]`--really` flag missing, not submitting jobs")
if not noconfirm and not Prompt.ask(
f"Submit [red bold]{len(requests)}[/red bold] jobs for proposal "
f"[bold]{proposal_no}[/bold]? [y/[bold]n[/bold]]",
default=False,
):
print("[bold red]Aborted[/bold red]")
exit(1)
with Progress(
SpinnerColumn(),
TextColumn("[progress.description]{task.description}"),
MofNCompleteColumn(),
TimeElapsedColumn(),
) as progress:
description = f"[green]Submitted request for p{proposal_no:05d}/{{run_str}} "
task_submission = progress.add_task(
f"{description}r---[------]", total=len(requests)
)
con = zmq.Context()
socket = con.socket(zmq.REQ)
con = socket.connect("tcp://max-exfl016:5555")
if not really:
# Fake socket for testing, just logs what would have been sent via ZMQ
socket = lambda: None
socket.send = lambda x: progress.console.log(
f"mock `zmq.REQ` socket send: {x}"
)
socket.recv = lambda: "mock `zmq.REQ` socket response"
last_run_no = list(requests.keys())[-1]
for run_no, run_id in requests.items():
args = (
"correct",
str(run_id),
"_",
str(instrument),
str(cycle),
f"{proposal_no:06d}",
str(run_no),
"-",
)
msg = f"""['{"','".join(args)}']""".encode()
progress.console.log(args)
socket.send(msg)
progress.update(
task_submission,
advance=1,
description=description.format(
run_str=f"[bold yellow]r{run_no:03d}[{run_id:06d}]"
),
)
res = socket.recv()
progress.console.log(res)
if run_no != last_run_no:
progress.console.log(f"sleeping for {delay}s")
time.sleep(delay)
else:
progress.update(task_submission, description="[green]Done")
if __name__ == "__main__":
args = vars(parser.parse_args())
slices = []
for s in args["slices"]:
slice_split = tuple(map(lambda x: int(x) if x else None, s.split(":")))
sep = None
if len(slice_split) == 1:
start, stop = slice_split[0], slice_split[0]
elif len(slice_split) == 2:
start, stop = slice_split
else:
start, stop, sep = slice_split
# Python slice indices are 0-based, but we want to be 1-based
if start is None or start == 0:
start = 1
if stop:
stop = stop + 1 if stop != -1 else stop
slices.append(slice(start, stop, sep))
pre_checks()
con = zmq.Context()
socket = con.socket(zmq.REQ)
con = socket.connect("tcp://max-exfl017:5555")
action = 'dark_request'
dark_run_id = '258'
sase = 'sase1'
instrument = 'CALLAB'
cycle = '202031'
proposal = '900113'
detector_id = 'SPB_DET_AGIPD1M-1'
pdu_physical_names = '["AGIPD00 (Q1M1)"', '"AGIPD01 (Q1M2)"', '"AGIPD02 (Q1M3)"', '"AGIPD03 (Q1M4)"', '"AGIPD04 (Q2M1)"', '"AGIPD05 (Q2M2)"', '"AGIPD06 (Q2M3)"', '"AGIPD07 (Q2M4)"', '"AGIPD08 (Q3M1)"', '"AGIPD09 (Q3M2)"', '"AGIPD10 (Q3M3)"', '"AGIPD11 (Q3M4)"', '"AGIPD12 (Q4M1)"', '"AGIPD13 (Q4M2)"', '"AGIPD14 (Q4M3)"', '"AGIPD15 (Q4M4)"]' # noqa
pdu_karabo_das = '["AGIPD00"', ' "AGIPD01"', ' "AGIPD02"', ' "AGIPD03"', ' "AGIPD04"', ' "AGIPD05"', ' "AGIPD06"', ' "AGIPD07"', ' "AGIPD08"', ' "AGIPD09"', ' "AGIPD10"', ' "AGIPD11"', ' "AGIPD12"', ' "AGIPD13"', ' "AGIPD14"', ' "AGIPD15"]' # noqa
operation_mode = 'FIXED_GAIN'
run_numbers = '[9985,]'
data = [action, dark_run_id, sase, instrument, cycle, proposal, detector_id,
operation_mode, *pdu_physical_names, *pdu_karabo_das, run_numbers]
stuff = [action, dark_run_id, sase, instrument, cycle, proposal, 'SPB_DET_AGIPD1M-1', 'ADAPTIVE_GAIN', '["AGIPD00 (Q1M1)"', '"AGIPD01 (Q1M2)"', '"AGIPD02 (Q1M3)"', '"AGIPD03 (Q1M4)"', '"AGIPD04 (Q2M1)"', '"AGIPD05 (Q2M2)"', '"AGIPD06 (Q2M3)"', '"AGIPD07 (Q2M4)"', '"AGIPD08 (Q3M1)"', '"AGIPD09 (Q3M2)"', '"AGIPD10 (Q3M3)"', '"AGIPD11 (Q3M4)"', '"AGIPD12 (Q4M1)"', '"AGIPD13 (Q4M2)"', '"AGIPD14 (Q4M3)"', '"AGIPD15 (Q4M4)"]', '["AGIPD00"', ' "AGIPD01"', ' "AGIPD02"', ' "AGIPD03"', ' "AGIPD04"', ' "AGIPD05"', ' "AGIPD06"', ' "AGIPD07"', ' "AGIPD08"', ' "AGIPD09"', ' "AGIPD10"', ' "AGIPD11"', ' "AGIPD12"', ' "AGIPD13"', ' "AGIPD14"', ' "AGIPD15"]', '[9992', ' 9991', ' 9990]']
socket.send(str(stuff).encode())
resp = socket.recv_multipart()[0]
print(resp.decode())
main(
args["proposal"],
slices,
args["delay"],
args["noconfirm"],
args["really"],
)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment