From 41b5164aa33348d5eb0fd94d0baf141749d50978 Mon Sep 17 00:00:00 2001
From: Robert Rosca <robert.rosca@xfel.eu>
Date: Thu, 16 Mar 2023 10:26:39 +0100
Subject: [PATCH] Add script to submit slices of runs for a proposal

---
 setup.py                    |   2 +
 webservice/README.md        |  32 +++
 webservice/manual_launch.py | 407 +++++++++++++++++++++++++++++++++---
 3 files changed, 417 insertions(+), 24 deletions(-)

diff --git a/setup.py b/setup.py
index 3a6d9f43c..b2b0f3f66 100644
--- a/setup.py
+++ b/setup.py
@@ -102,6 +102,8 @@ install_requires = [
     "traitlets==4.3.3",
     "xarray==2022.3.0",
     "EXtra-redu==0.0.7",
+    "rich==12.6.0",
+    "httpx==0.23.0",
 ]
 
 if "readthedocs.org" not in sys.executable:
diff --git a/webservice/README.md b/webservice/README.md
index 1bced8a95..90a0bdc09 100644
--- a/webservice/README.md
+++ b/webservice/README.md
@@ -189,3 +189,35 @@ status in myMdC should update as the processing occurs.
 
 The command ``squeue -u xcaltst`` will show running & pending Slurm jobs
 started by this test system.
+
+Manually Submitting Jobs
+------------------------
+
+A script `manual_launch.py` is provided to manually submit jobs to the service.
+
+```bash
+usage: manual_launch.py [-h] --proposal PROPOSAL [--delay DELAY] [--noconfirm] [--really] slices [slices ...]
+
+Manually submit calibration jobs.
+
+positional arguments:
+  slices               slices (or single numbers) of runs to process, inclusive range, starting at 1 (e.g. 1:3 parsed to {1, 2, 3}, 10 parsed to {10}, :10
+                       parsed to {1, 2, ..., 10}, 160: or 160:-1 parsed to {160, ..., last run})
+
+optional arguments:
+  -h, --help           show this help message and exit
+  --proposal PROPOSAL  proposal number
+  --delay DELAY        delay in seconds between submissions
+  --noconfirm          skip confirmation
+  --really             actually submit jobs instead of just printing them
+
+To run in the background use `PYTHONUNBUFFERED=1 nohup python manual_launch.py ... &` followed by `disown`.
+```
+
+Slices are inclusive, so `1:10` means runs 1 to 10, including both 1 and 10. A
+'slice' can also be a single number, and a stop of `-1` (or no stop at all)
+selects everything up to the last run.
+
+Example of usage would be `python3 ./manual_launch.py 1 10:12 160:-1 --delay 60
+--proposal 2222 --really` to submit runs 1, 10 to 12, and 160+ for calibration,
+for proposal 2222, with a 60 second delay between submissions.
diff --git a/webservice/manual_launch.py b/webservice/manual_launch.py
index 4753bbdaf..154e190b6 100644
--- a/webservice/manual_launch.py
+++ b/webservice/manual_launch.py
@@ -1,26 +1,385 @@
+"""Manually submit correction requests for selected runs of a proposal.
+
+Run numbers are resolved to MyMdC run IDs and one `correct` request per run is
+sent to the calibration webservice over ZMQ. Unless `--really` is given, the
+requests are only logged.
+"""
+from __future__ import annotations
+
+import argparse
+import datetime as dt
+import sys
+import time
+from contextlib import contextmanager
+from pathlib import Path
+from types import SimpleNamespace
+from typing import Callable, Generator, Optional
+
 import zmq
+from config import webservice as config
+from httpx import Client, Response
+from rich import print
+from rich.progress import (
+    MofNCompleteColumn,
+    Progress,
+    SpinnerColumn,
+    TextColumn,
+    TimeElapsedColumn,
+)
+from rich.prompt import Prompt
+
+# Address of the calibration webservice's ZMQ request socket
+WEBSERVICE_ADDRESS = "tcp://max-exfl016:5555"
+
+parser = argparse.ArgumentParser(
+    description="Manually submit calibration jobs.",
+    epilog="""To run in the background use `PYTHONUNBUFFERED=1 nohup python
+    manual_launch.py ... &` followed by `disown`.""",
+)
+
+parser.add_argument(
+    "slices",
+    type=str,
+    nargs="+",
+    help="""slices (or single numbers) of runs to process, inclusive range, starting at
+    1 (e.g. 1:3 parsed to {1, 2, 3}, 10 parsed to {10}, :10 parsed to {1, 2, ...,
+    10}, 160: or 160:-1 parsed to {160, ..., last run})""",
+)
+parser.add_argument(
+    "--proposal",
+    type=int,
+    help="proposal number",
+    required=True,
+)
+parser.add_argument(
+    "--delay",
+    default=30,
+    type=int,
+    help="delay in seconds between submissions",
+    required=False,
+)
+parser.add_argument(
+    "--noconfirm",
+    action="store_true",
+    help="skip confirmation",
+)
+parser.add_argument(
+    "--really",
+    action="store_true",
+    help="actually submit jobs instead of just printing them",
+)
+
+
+# Cached MyMdC access token; `get_bearer_token` replaces it once it has expired
+BEARER = {
+    "access_token": "",
+    "expires_at": dt.datetime.now(),
+}
+
+
+def pre_checks():
+    """Exit with status 1 if a required metadata client setting is unset."""
+    # Fail fast if we don't have the required configs set
+    required_keys = [
+        "token-url",
+        "base-api-url",
+        "user-id",
+        "user-secret",
+        "user-email",
+    ]
+    for k in required_keys:
+        if config["metadata-client"][k] is None:
+            print(
+                f"Missing key [bold red]`{k}`[/bold red] in metadata client configuration"
+            )
+            print("[bold red]Aborted[/bold red]")
+            sys.exit(1)
+
+
+def get_bearer_token() -> str:
+    """Return a MyMdC access token, requesting a new one if the cached one expired.
+
+    Raises:
+        ValueError: if the token response lacks `access_token` or `expires_in`.
+    """
+    if BEARER["access_token"] and BEARER["expires_at"] > dt.datetime.now():
+        return BEARER["access_token"]
+
+    with Client() as client:
+        response = client.post(
+            f"{config['metadata-client']['token-url']}",
+            data={
+                "grant_type": "client_credentials",
+                "client_id": config["metadata-client"]["user-id"],
+                "client_secret": config["metadata-client"]["user-secret"],
+            },
+        )
+
+    data = response.json()
+
+    if any(k not in data for k in ["access_token", "expires_in"]):
+        print(
+            "Response from MyMdC missing required fields, check webservice `user-id` "
+            f"and `user-secret`. Response: {data=}",
+        )
+        raise ValueError("Invalid response from MyMdC")
+
+    expires_in = dt.timedelta(seconds=data["expires_in"])
+
+    BEARER["access_token"] = data["access_token"]
+    BEARER["expires_at"] = dt.datetime.now() + expires_in
+
+    return BEARER["access_token"]
+
+
+@contextmanager
+def get_client() -> Generator[Client, None, None]:
+    """Yield an httpx client carrying the MyMdC authentication headers."""
+    bearer_token = get_bearer_token()
+
+    with Client() as client:
+        headers = {
+            "accept": "application/json; version=1",
+            "X-User-Email": config["metadata-client"]["user-email"],
+            "Authorization": f"Bearer {bearer_token}",
+        }
+
+        client.headers.update(headers)
+
+        yield client
+
+
+def _get_runs_by_proposal(number: int, client: Client, page: int = 1) -> Response:
+    """Request one page of the runs of proposal `number` from MyMdC."""
+    return client.get(
+        f"{config['metadata-client']['base-api-url']}/runs/runs_by_proposal",
+        params={"proposal_number": number, "page": page},
+        timeout=10,
+    )
+
+
+def get_runs_by_proposal_all(number: int) -> list[dict]:
+    """Return all runs of proposal `number` from MyMdC, following pagination.
+
+    Raises:
+        ValueError: if any page is answered with a status other than 200.
+    """
+    with get_client() as client:
+
+        def fetch_page(page: int) -> Response:
+            res = _get_runs_by_proposal(number, client, page)
+            if res.status_code != 200:
+                raise ValueError(res.url, res.text)
+            return res
+
+        res = fetch_page(1)
+        runs = res.json()
+        # The number of pages is reported in the response headers
+        for page in range(2, int(res.headers.get("x-total-pages", 1)) + 1):
+            runs.extend(fetch_page(page).json())
+
+    return runs
+
+
+def parse_run_slice(text: str) -> slice:
+    """Convert a `start:stop[:step]` run selection into a Python slice.
+
+    Run numbers start at 1 and `stop` is inclusive, so it is shifted by one to fit
+    Python's exclusive slices. A missing stop or a stop of -1 selects everything up
+    to the last run, and a bare number selects that single run.
+
+    Raises:
+        ValueError: if `text` has more than three fields or a non-integer field.
+    """
+    parts = [int(p) if p else None for p in text.split(":")]
+    if len(parts) > 3:
+        raise ValueError(f"invalid run slice {text!r}, expected start:stop[:step]")
+
+    if len(parts) == 1:
+        # A bare number selects exactly that run
+        parts.append(parts[0])
+    start, stop, step = (*parts, None)[:3]
+
+    # Python slice indices are 0-based, but we want to be 1-based
+    if start is None or start == 0:
+        start = 1
+
+    if stop == -1:
+        # Open-ended: `slice.indices` later clips this to the last run
+        stop = None
+    elif stop is not None:
+        stop += 1
+
+    return slice(start, stop, step)
+
+
+@contextmanager
+def _request_socket(really: bool, log: Callable) -> Generator:
+    """Yield an object whose `send`/`recv` talk to the calibration webservice.
+
+    Without `really` nothing is connected: `send` only passes the message to `log`
+    and `recv` returns a canned reply.
+    """
+    if not really:
+        # Fake socket for testing, just logs what would have been sent via ZMQ
+        yield SimpleNamespace(
+            send=lambda msg: log(f"mock `zmq.REQ` socket send: {msg}"),
+            recv=lambda: "mock `zmq.REQ` socket response",
+        )
+        return
+
+    context = zmq.Context()
+    socket = context.socket(zmq.REQ)
+    try:
+        socket.connect(WEBSERVICE_ADDRESS)
+        yield socket
+    finally:
+        # Discard anything unsent so that terminating the context cannot hang
+        socket.close(linger=0)
+        context.term()
+
+
+def main(
+    proposal_no: int,
+    slices: list[slice],
+    delay: int,
+    noconfirm: Optional[bool] = False,
+    really: Optional[bool] = False,
+):
+    """Submit a `correct` request for every selected run of a proposal.
+
+    Args:
+        proposal_no: proposal number, used to find the proposal directory and runs.
+        slices: run selections as produced by `parse_run_slice`.
+        delay: seconds to sleep between two submissions.
+        noconfirm: skip the interactive confirmation.
+        really: send the requests instead of only logging them.
+
+    Raises:
+        ValueError: if the proposal directory is not found exactly once, or MyMdC
+            returns no runs for the proposal.
+    """
+    with Progress(transient=True) as progress:
+        task_submission = progress.add_task(
+            "[yellow]Querying FS for proposal information", total=None
+        )
+        exp = Path("/gpfs/exfel/exp")
+        proposal_paths = list(exp.glob(f"*/*/p{proposal_no:06d}"))
+        if len(proposal_paths) != 1:
+            raise ValueError(f"Proposal {proposal_no} not found")
+
+        proposal_path = proposal_paths[0]
+        instrument = proposal_path.parts[4]
+        cycle = proposal_path.parts[5]
+
+        progress.update(task_submission, description="[yellow]Querying MyMdC for runs")
+
+        all_runs = get_runs_by_proposal_all(proposal_no)
+
+    run_no_id_map = {run["run_number"]: run["id"] for run in all_runs}
+    if not run_no_id_map:
+        raise ValueError(f"No runs found in MyMdC for proposal {proposal_no}")
+
+    max_run_no = max(run_no_id_map)
+    # `slice.indices(n)` clips to indices below `n`, so pass one more than the
+    # highest run number to keep that run selectable
+    requested_run_nos = {
+        run_no
+        for s in slices
+        for run_no in range(*s.indices(max_run_no + 1))
+        if run_no > 0
+    }
+
+    requests = {
+        run_no: run_no_id_map[run_no]
+        for run_no in sorted(requested_run_nos)
+        if run_no in run_no_id_map
+    }
+
+    if missing_run_ids := requested_run_nos - run_no_id_map.keys():
+        print(
+            f"[bold red]Missing run IDs for run number(s) {missing_run_ids}[/bold red]"
+        )
+
+    if not requests:
+        print("[bold red]No runs to submit[/bold red]")
+        return
+
+    if not really:
+        print("[yellow]`--really` flag missing, not submitting jobs")
+
+    if not noconfirm:
+        # `Prompt.ask` returns the entered text, so compare it explicitly
+        answer = Prompt.ask(
+            f"Submit [red bold]{len(requests)}[/red bold] jobs for proposal "
+            f"[bold]{proposal_no}[/bold]?",
+            choices=["y", "n"],
+            default="n",
+        )
+        if answer != "y":
+            print("[bold red]Aborted[/bold red]")
+            sys.exit(1)
+
+    with Progress(
+        SpinnerColumn(),
+        TextColumn("[progress.description]{task.description}"),
+        MofNCompleteColumn(),
+        TimeElapsedColumn(),
+    ) as progress:
+        description = f"[green]Submitted request for p{proposal_no:06d}/{{run_str}} "
+        task_submission = progress.add_task(
+            description.format(run_str="r---[------]"), total=len(requests)
+        )
+
+        with _request_socket(really, progress.console.log) as socket:
+            for i, (run_no, run_id) in enumerate(requests.items(), start=1):
+                args = (
+                    "correct",
+                    str(run_id),
+                    "_",
+                    str(instrument),
+                    str(cycle),
+                    f"{proposal_no:06d}",
+                    str(run_no),
+                    "-",
+                )
+                msg = f"""['{"','".join(args)}']""".encode()
+                progress.console.log(args)
+                socket.send(msg)
+
+                progress.update(
+                    task_submission,
+                    advance=1,
+                    description=description.format(
+                        run_str=f"[bold yellow]r{run_no:03d}[{run_id:06d}]"
+                    ),
+                )
+
+                res = socket.recv()
+                progress.console.log(res)
+
+                # No need to wait after the final submission
+                if i < len(requests):
+                    progress.console.log(f"sleeping for {delay}s")
+                    time.sleep(delay)
+
+        progress.update(task_submission, description="[green]Done")
+
+
+if __name__ == "__main__":
+    args = vars(parser.parse_args())
+
+    try:
+        slices = [parse_run_slice(s) for s in args["slices"]]
+    except ValueError as err:
+        parser.error(str(err))
+
+    pre_checks()
 
-con = zmq.Context()
-socket = con.socket(zmq.REQ)
-con = socket.connect("tcp://max-exfl017:5555")
-
-action = 'dark_request'
-dark_run_id = '258'
-sase = 'sase1'
-instrument = 'CALLAB'
-cycle = '202031'
-proposal = '900113'
-detector_id = 'SPB_DET_AGIPD1M-1'
-pdu_physical_names = '["AGIPD00 (Q1M1)"', '"AGIPD01 (Q1M2)"', '"AGIPD02 (Q1M3)"', '"AGIPD03 (Q1M4)"', '"AGIPD04 (Q2M1)"', '"AGIPD05 (Q2M2)"', '"AGIPD06 (Q2M3)"', '"AGIPD07 (Q2M4)"', '"AGIPD08 (Q3M1)"', '"AGIPD09 (Q3M2)"', '"AGIPD10 (Q3M3)"', '"AGIPD11 (Q3M4)"', '"AGIPD12 (Q4M1)"', '"AGIPD13 (Q4M2)"', '"AGIPD14 (Q4M3)"', '"AGIPD15 (Q4M4)"]'  # noqa
-pdu_karabo_das = '["AGIPD00"', ' "AGIPD01"', ' "AGIPD02"', ' "AGIPD03"', ' "AGIPD04"', ' "AGIPD05"', ' "AGIPD06"', ' "AGIPD07"', ' "AGIPD08"', ' "AGIPD09"', ' "AGIPD10"', ' "AGIPD11"', ' "AGIPD12"', ' "AGIPD13"', ' "AGIPD14"', ' "AGIPD15"]'  # noqa
-operation_mode = 'FIXED_GAIN'
-run_numbers = '[9985,]'
-
-
-data = [action, dark_run_id, sase, instrument, cycle, proposal, detector_id,
-        operation_mode, *pdu_physical_names, *pdu_karabo_das, run_numbers]
-stuff = [action, dark_run_id, sase, instrument, cycle, proposal, 'SPB_DET_AGIPD1M-1', 'ADAPTIVE_GAIN', '["AGIPD00 (Q1M1)"', '"AGIPD01 (Q1M2)"', '"AGIPD02 (Q1M3)"', '"AGIPD03 (Q1M4)"', '"AGIPD04 (Q2M1)"', '"AGIPD05 (Q2M2)"', '"AGIPD06 (Q2M3)"', '"AGIPD07 (Q2M4)"', '"AGIPD08 (Q3M1)"', '"AGIPD09 (Q3M2)"', '"AGIPD10 (Q3M3)"', '"AGIPD11 (Q3M4)"', '"AGIPD12 (Q4M1)"', '"AGIPD13 (Q4M2)"', '"AGIPD14 (Q4M3)"', '"AGIPD15 (Q4M4)"]', '["AGIPD00"', ' "AGIPD01"', ' "AGIPD02"', ' "AGIPD03"', ' "AGIPD04"', ' "AGIPD05"', ' "AGIPD06"', ' "AGIPD07"', ' "AGIPD08"', ' "AGIPD09"', ' "AGIPD10"', ' "AGIPD11"', ' "AGIPD12"', ' "AGIPD13"', ' "AGIPD14"', ' "AGIPD15"]', '[9992', ' 9991', ' 9990]']
-
-socket.send(str(stuff).encode())
-resp = socket.recv_multipart()[0]
-print(resp.decode())
+    main(
+        args["proposal"],
+        slices,
+        args["delay"],
+        args["noconfirm"],
+        args["really"],
+    )
-- 
GitLab