diff --git a/tests/test_webservice.py b/tests/test_webservice.py index 6e1a335a634462ce548c05c2358de62609597ff9..b53b17f78c959ced708f1a4461da0819847f0c4a 100644 --- a/tests/test_webservice.py +++ b/tests/test_webservice.py @@ -13,6 +13,7 @@ from webservice.webservice import ( # noqa: import not at top of file parse_config, run_action, wait_on_transfer, + get_slurm_partition, ) @@ -151,3 +152,28 @@ async def test_run_action(mode, cmd, retcode, expected): webservice.webservice.run_proc_async = mock_run_proc_async ret = await run_action(job_db, cmd, mode, 1, 1, 1) assert ret.lower().startswith(expected) + +@pytest.mark.asyncio +@pytest.mark.parametrize( + 'proposal_number, action, mock_proposal_status, expected_result', + [ + (42, 'correct', 'A', 'upex-middle'), # active + ('42', 'dark', 'R', 'upex-high'), # active + (404, 'correct', 'FR', 'exfel'), # finished and reviewed + (404, 'dark', 'CLO', 'exfel'), # closed + ], +) +async def test_get_slurm_partition(proposal_number, + action, + mock_proposal_status, + expected_result): + + response = mock.Mock() + response.status_code = 200 + response.json = mock.Mock(return_value={'flg_beamtime_status': mock_proposal_status}) + client = mock.Mock() + client.get_proposal_by_number_api = mock.Mock( + return_value=response) + + ret = await get_slurm_partition(client, action, proposal_number) + assert ret == expected_result diff --git a/webservice/config/webservice.yaml b/webservice/config/webservice.yaml index 26d74e07c8bc2643b9f179afe7c2deec48e8946d..7a764b0fad1807ae1cd3dcd72a41c1962a47e32e 100644 --- a/webservice/config/webservice.yaml +++ b/webservice/config/webservice.yaml @@ -35,7 +35,7 @@ correct: cmd : >- python -m xfel_calibrate.calibrate {detector} CORRECT --slurm-scheduling {sched_prio} - --slurm-partition upex-middle + --slurm-partition {partition} --request-time {request_time} --slurm-name {action}_{instrument}_{detector}_{cycle}_p{proposal}_{runs} --report-to /gpfs/exfel/exp/{instrument}/{cycle}/p{proposal}/usr/Reports/{runs}/{det_instance}_{action}_{proposal}_{runs}_{time_stamp} @@ -50,7 +50,7 @@ dark: python -m xfel_calibrate.calibrate {detector} DARK --concurrency-par karabo_da --slurm-scheduling {sched_prio} - --slurm-partition upex-high + --slurm-partition {partition} --request-time {request_time} --slurm-name {action}_{instrument}_{detector}_{cycle}_p{proposal}_{runs} --report-to /gpfs/exfel/d/cal/caldb_store/xfel/reports/{instrument}/{det_instance}/{action}/{action}_{proposal}_{runs}_{time_stamp} diff --git a/webservice/webservice.py b/webservice/webservice.py index b67f5dc48637b4a01e9eb5a0a880170a94fed99a..16ab4efa45dd37cfe6a5ea6d94747788378e3ee6 100644 --- a/webservice/webservice.py +++ b/webservice/webservice.py @@ -19,7 +19,7 @@ from datetime import datetime, timezone from pathlib import Path from subprocess import PIPE, run from threading import Thread -from typing import Any, Dict, List, Optional, Tuple +from typing import Any, Dict, List, Optional, Tuple, Union import requests import yaml @@ -628,6 +628,46 @@ def check_files(in_folder: str, files_exists = False return files_exists +async def get_slurm_partition(mdc: MetadataClient, + action: str, + proposal_number: Union[int, str]) -> str: + """Check MyMDC for the proposal status and select the appropriate slurm + partition. + + The partition is either upex-high (for darks) or upex-middle (for + corrections) if the proposal is 'R'eady or 'A'ctive. + In other cases, the jobs default to the exfel partition. + + :param mdc: an authenticated MyMDC client + :param action: the type of action ('correct' or 'dark') + :param proposal_number: the proposal number + :return: 'exfel' on closed proposals, + 'upex-high' if dark request on active proposals, + 'upex-middle' if correct request on active proposals. + """ + # See + # https://git.xfel.eu/ITDM/metadata_catalog/-/blob/develop/app/models/proposal.rb + # for possible proposals states. + + loop = get_event_loop() + response = await shield(loop.run_in_executor(None, + mdc.get_proposal_by_number_api, + proposal_number)) + if response.status_code != 200: + logging.error(f'Failed to check MDC for proposal "{proposal_number}" ' + 'status. ASSUMING CLOSED') + logging.error(Errors.MDC_RESPONSE.format(response)) + + partition = 'exfel' + status = response.json().get('flg_beamtime_status', 'whoopsie') + + if status in ['R', 'A']: + partition = 'upex-high' if action == 'dark' else 'upex-middle' + + logging.debug(f"Using {partition} for {proposal_number} because {status}") + + return partition + async def update_darks_paths(mdc: MetadataClient, rid: int, in_path: str, out_path: str, report_path: str): @@ -1159,6 +1199,9 @@ class ActionsServer: ) -> Tuple[str, List[str]]: report = [] ret = [] + + partition = await get_slurm_partition(self.mdc, action, proposal) + # run xfel_calibrate for karabo_id, dconfig in detectors.items(): detector = dconfig['detector-type'] @@ -1166,6 +1209,7 @@ class ActionsServer: cmd = self.config[action]['cmd'].format( detector=detector, sched_prio=str(self.config[action]['sched-prio']), + partition=partition, action=action, instrument=instrument, cycle=cycle, proposal=proposal, runs="_".join([f"r{r}" for r in run_nrs]),