diff --git a/tests/test_webservice.py b/tests/test_webservice.py
index a61c7e12be7ac6515b7105fc1ee9699449f39afd..97bb33f381f8effb9e72bfac497b34e4c72391fa 100644
--- a/tests/test_webservice.py
+++ b/tests/test_webservice.py
@@ -1,5 +1,7 @@
+from collections import namedtuple
 import logging
 import sys
+import datetime as dt
 
 from pathlib import Path
 from unittest import mock
@@ -15,7 +17,8 @@ from webservice.webservice import (  # noqa: import not at top of file
     run_action,
     wait_on_transfer,
     get_slurm_partition,
-    get_slurm_nice
+    get_slurm_nice,
+    config,
 )
@@ -263,6 +266,48 @@ async def test_get_slurm_partition(proposal_number,
     ret = await get_slurm_partition(client, action, proposal_number)
     assert ret == expected_result
 
+
+@pytest.mark.parametrize(
+    'commissioning, run_age_days, expected_partition',
+    [
+        (False, 0, "upex-middle"),
+        (True, 0, "upex-middle"),
+        (False, config["correct"]["commissioning-max-age-days"], "upex-middle"),
+        (True, config["correct"]["commissioning-max-age-days"], "upex-middle"),
+        (False, config["correct"]["commissioning-max-age-days"]+1, "upex-middle"),
+        (True, config["correct"]["commissioning-max-age-days"]+1, "exfel"),
+    ]
+)
+@pytest.mark.asyncio
+async def test_get_slurm_partition_run_age(
+    commissioning: bool,
+    run_age_days: int,
+    expected_partition: str,
+):
+    # NOTE: non-zero at cycle index 4 (`str(cycle)[4]`) indicates commissioning
+    cycle = 202231 if commissioning else 202201
+
+    now = dt.datetime.now().astimezone()
+    run_dt = now - dt.timedelta(days=run_age_days)
+    run_str = run_dt.strftime("%Y-%m-%dT%H:%M:%S.%f%z")
+
+    # Note that, as the fields are independent, this mocks both the
+    # response for `get_proposal_by_number_api` AND `get_run_by_id_api`
+    response = mock.Mock()
+    response.status_code = 200
+    response.json = lambda: {"flg_beamtime_status": "A", "begin_at": run_str}
+
+    client = mock.Mock()
+    client.get_proposal_by_number_api = mock.Mock(return_value=response)
+    client.get_run_by_id_api = mock.Mock(return_value=response)
+
+    ret = await get_slurm_partition(
+        client, "correct", 1, run_id=1, cycle=cycle
+    )
+
+    assert ret == expected_partition
+
+
 @pytest.mark.asyncio
 @pytest.mark.parametrize(
     'cycle, num_jobs, expected_result',
     [
@@ -303,3 +348,4 @@ async def test_get_slurm_nice_fails(fp):
         returncode=0)
 
     assert await get_slurm_nice('exfel', 'SPB', '202201') == 0
+
diff --git a/webservice/config/webservice.yaml b/webservice/config/webservice.yaml
index a992376fa75035d0179c55a435be8e321e7a2ecb..f57b6431f3c78aebb539f4b9727a458985b4eaa3 100644
--- a/webservice/config/webservice.yaml
+++ b/webservice/config/webservice.yaml
@@ -1,61 +1,63 @@
+---
 config-repo:
-    url: "@note add this to secrets file"
-    local-path: "@format {env[HOME]}/calibration_config"
+  url: "@note add this to secrets file"
+  local-path: "@format {env[HOME]}/calibration_config"
 
 web-service:
-    port: 5555
-    bind-to: tcp://*
-    allowed-ips:
-    job-db: "@format {this.webservice_dir}/webservice_jobs.sqlite"
-    job-update-interval: 60
-    job-timeout: 3600
+  port: 5555
+  bind-to: tcp://*
+  allowed-ips:
+  job-db: "@format {this.webservice_dir}/webservice_jobs.sqlite"
+  job-update-interval: 60
+  job-timeout: 3600
 
 metadata-client:
-    user-id: "@note add this to secrets file"
-    user-secret: "@note add this to secrets file"
-    user-email: "@note add this to secrets file"
-    metadata-web-app-url: "https://in.xfel.eu/metadata"
-    token-url: "https://in.xfel.eu/metadata/oauth/token"
-    refresh-url: "https://in.xfel.eu/metadata/oauth/token"
-    auth-url: "https://in.xfel.eu/metadata/oauth/authorize"
scope: "" - base-api-url: "https://in.xfel.eu/metadata/api/" + user-id: "@note add this to secrets file" + user-secret: "@note add this to secrets file" + user-email: "@note add this to secrets file" + metadata-web-app-url: "https://in.xfel.eu/metadata" + token-url: "https://in.xfel.eu/metadata/oauth/token" + refresh-url: "https://in.xfel.eu/metadata/oauth/token" + auth-url: "https://in.xfel.eu/metadata/oauth/authorize" + scope: "" + base-api-url: "https://in.xfel.eu/metadata/api/" kafka: - brokers: + brokers: - it-kafka-broker01.desy.de - it-kafka-broker02.desy.de - it-kafka-broker03.desy.de - topic: xfel-test-offline-cal + topic: xfel-test-offline-cal correct: - in-folder: /gpfs/exfel/exp/{instrument}/{cycle}/p{proposal}/raw - out-folder: /gpfs/exfel/d/proc/{instrument}/{cycle}/p{proposal}/{run} - reports-folder: /gpfs/exfel/exp/{instrument}/{cycle}/p{proposal}/usr/Reports/{runs}/ - commissioning-penalty: 1250 - job-penalty: 2 - cmd : >- - python -m xfel_calibrate.calibrate {detector} CORRECT - --slurm-scheduling {sched_prio} - --slurm-partition {partition} - --request-time {request_time} - --slurm-name {action}_{instrument}_{detector}_{cycle}_p{proposal}_{runs} - --report-to /gpfs/exfel/exp/{instrument}/{cycle}/p{proposal}/usr/Reports/{runs}/{det_instance}_{action}_{proposal}_{runs}_{time_stamp} - --cal-db-timeout 300000 - --cal-db-interface tcp://max-exfl016:8015#8044 + in-folder: /gpfs/exfel/exp/{instrument}/{cycle}/p{proposal}/raw + out-folder: /gpfs/exfel/d/proc/{instrument}/{cycle}/p{proposal}/{run} + reports-folder: /gpfs/exfel/exp/{instrument}/{cycle}/p{proposal}/usr/Reports/{runs}/ + commissioning-penalty: 1250 + commissioning-max-age-days: 3 + job-penalty: 2 + cmd: >- + python -m xfel_calibrate.calibrate {detector} CORRECT + --slurm-scheduling {sched_prio} + --slurm-partition {partition} + --request-time {request_time} + --slurm-name {action}_{instrument}_{detector}_{cycle}_p{proposal}_{runs} + --report-to /gpfs/exfel/exp/{instrument}/{cycle}/p{proposal}/usr/Reports/{runs}/{det_instance}_{action}_{proposal}_{runs}_{time_stamp} + --cal-db-timeout 300000 + --cal-db-interface tcp://max-exfl016:8015#8044 dark: - in-folder: /gpfs/exfel/exp/{instrument}/{cycle}/p{proposal}/raw - out-folder: /gpfs/exfel/u/usr/{instrument}/{cycle}/p{proposal}/dark/runs_{runs} - commissioning-penalty: 1250 - job-penalty: 2 - cmd: >- - python -m xfel_calibrate.calibrate {detector} DARK - --concurrency-par karabo_da - --slurm-scheduling {sched_prio} - --slurm-partition {partition} - --request-time {request_time} - --slurm-name {action}_{instrument}_{detector}_{cycle}_p{proposal}_{runs} - --report-to /gpfs/exfel/d/cal/caldb_store/xfel/reports/{instrument}/{det_instance}/{action}/{action}_{proposal}_{runs}_{time_stamp} - --cal-db-interface tcp://max-exfl016:8015#8044 - --db-output + in-folder: /gpfs/exfel/exp/{instrument}/{cycle}/p{proposal}/raw + out-folder: /gpfs/exfel/u/usr/{instrument}/{cycle}/p{proposal}/dark/runs_{runs} + commissioning-penalty: 1250 + job-penalty: 2 + cmd: >- + python -m xfel_calibrate.calibrate {detector} DARK + --concurrency-par karabo_da + --slurm-scheduling {sched_prio} + --slurm-partition {partition} + --request-time {request_time} + --slurm-name {action}_{instrument}_{detector}_{cycle}_p{proposal}_{runs} + --report-to /gpfs/exfel/d/cal/caldb_store/xfel/reports/{instrument}/{det_instance}/{action}/{action}_{proposal}_{runs}_{time_stamp} + --cal-db-interface tcp://max-exfl016:8015#8044 + --db-output diff --git a/webservice/webservice.py b/webservice/webservice.py index 
index b79128fbe4a7bfa08acd079fb84b847624d756e2..e727eb5d5407f2b6c8e97ba1111dfa7b5238b388 100644
--- a/webservice/webservice.py
+++ b/webservice/webservice.py
@@ -1,6 +1,7 @@
 import argparse
 import ast
 import asyncio
+import contextlib
 import copy
 import glob
 import inspect
@@ -505,9 +506,14 @@ def check_files(in_folder: str,
                 files_exists = False
     return files_exists
 
-async def get_slurm_partition(mdc: MetadataClient,
-                              action: str,
-                              proposal_number: Union[int, str]) -> str:
+
+async def get_slurm_partition(
+    mdc: MetadataClient,
+    action: str,
+    proposal_number: Union[int, str],
+    run_id: Optional[int] = None,
+    cycle: Optional[int] = None
+) -> str:
     """Check MyMDC for the proposal status and select the appropriate slurm
        partition.
 
@@ -524,24 +530,74 @@
     """
     # See
     # https://git.xfel.eu/ITDM/metadata_catalog/-/blob/develop/app/models/proposal.rb
-    # for possible proposals states.
+    # for possible proposals states. Relevant ones for us are:
+    # | Flag | Name         | Description                            |
+    # |------|--------------|----------------------------------------|
+    # | R    | Ready        | Users created, proposal structure      |
+    # |      |              | initialised on GPFS                    |
+    # |------|--------------|----------------------------------------|
+    # | A    | Active       | Data currently being acquired          |
+    # |------|--------------|----------------------------------------|
+    # | I    | Inactive     | Proposal still scheduled, data not     |
+    # |      |              | currently being acquired               |
+    # |------|--------------|----------------------------------------|
+    # | F    | Finished     | Beamtime is over, no more new runs     |
+    # |------|--------------|----------------------------------------|
+    # | FR   | Finished and | Local instrument contact reviewed      |
+    # |      | Reviewed     | details after initial finish flag set  |
+    # |------|--------------|----------------------------------------|
+    # | CLO  | Closed       | Data fully migrated to dCACHE          |
+    # |------|--------------|----------------------------------------|
     loop = get_event_loop()
-    response = await shield(loop.run_in_executor(None,
-                                                 mdc.get_proposal_by_number_api,
-                                                 proposal_number))
+
+    response = await shield(
+        loop.run_in_executor(
+            None,
+            mdc.get_proposal_by_number_api,
+            proposal_number
+        )
+    )
+
     if response.status_code != 200:
-        logging.error(f'Failed to check MDC for proposal "{proposal_number}" '
-                      'status. ASSUMING CLOSED')
+        logging.error(
+            f'Failed to check MDC for proposal "{proposal_number}" '
+            'status. ASSUMING CLOSED'
+        )
         logging.error(Errors.MDC_RESPONSE.format(response))
 
     partition = 'exfel'
-    status = response.json().get('flg_beamtime_status', 'whoopsie')
-    if status in ['R', 'A']:
+    status_beamtime = response.json().get('flg_beamtime_status', 'whoopsie')
+
+    if status_beamtime in ['R', 'A']:
         partition = 'upex-high' if action == 'dark' else 'upex-middle'
-    logging.debug(f"Using {partition} for {proposal_number} because {status}")
+        # NOTE: non-zero at cycle index 4 (`str(cycle)[4]`) indicates commissioning
+        if run_id and cycle and str(cycle)[4] != '0':
+            response_run = await shield(
+                loop.run_in_executor(
+                    None,
+                    mdc.get_run_by_id_api,
+                    run_id
+                )
+            )
+
+            if (begin_at := response_run.json().get("begin_at", None)):
+                with contextlib.suppress(ValueError):
+                    run_begin = datetime.strptime(begin_at, "%Y-%m-%dT%H:%M:%S.%f%z")
+                    run_age = (datetime.now().astimezone() - run_begin)
+                    max_age_days = config[action]["commissioning-max-age-days"]
+                    if run_age.days > max_age_days:
+                        partition = 'exfel'
+                        logging.debug(
+                            f"{run_age.days} > {max_age_days}, set partition "
+                            f"to {partition}"
+                        )
+
+    logging.debug(
+        f"Using {partition} for {proposal_number} because {status_beamtime}"
+    )
 
     return partition
@@ -913,22 +969,25 @@ class ActionsServer:
                     logging.warning(f'Skipping disabled detector {karabo_id}')
                    del detectors[karabo_id]
 
-            if len(detectors) == 0:
+            if not detectors:
                 msg = Errors.NOTHING_TO_DO.format(rpath)
                 logging.warning(msg)
                 await update_mdc_status(self.mdc, 'correct', rid, msg)
                 return
 
             ret, _ = await self.launch_jobs(
-                [runnr], req_id, detectors, 'correct', instrument, cycle, proposal,
-                request_time,
+                [runnr], req_id, detectors, 'correct', instrument, cycle,
+                proposal, request_time, rid
             )
             await update_mdc_status(self.mdc, 'correct', rid, ret)
 
             loop = get_event_loop()
             await loop.run_in_executor(
-                None, self.mdc.update_run_api,
-                rid, {'cal_last_begin_at': datetime.now(tz=timezone.utc).isoformat()}
+                None,
+                self.mdc.update_run_api,
+                rid,
+                {'cal_last_begin_at': datetime.now(tz=timezone.utc).isoformat()}
             )
+
         # END of part to run after sending reply
         asyncio.ensure_future(_continue())
@@ -1246,13 +1305,16 @@
             return yaml.load(f.read(), Loader=yaml.FullLoader)
 
     async def launch_jobs(
-        self, run_nrs, req_id, detectors, action, instrument, cycle, proposal,
-        request_time
+        self, run_nrs, req_id, detectors, action, instrument, cycle,
+        proposal, request_time, run_id: Optional[int] = None
     ) -> Tuple[str, List[str]]:
         report = []
         ret = []
 
-        partition = await get_slurm_partition(self.mdc, action, proposal)
+        partition = await get_slurm_partition(
+            self.mdc, action, proposal, run_id, cycle
+        )
+
         nice = await get_slurm_nice(
             partition, instrument, cycle,
            commissioning_penalty=self.config[action]['commissioning-penalty'],
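
For review convenience, a minimal standalone sketch of the rule the patch adds to get_slurm_partition. The pick_partition helper and the hard-coded constant below are illustrative stand-ins (not part of the patch) for get_slurm_partition and config[action]["commissioning-max-age-days"]:

# Illustrative sketch only -- mirrors the run-age fallback added in this diff.
import datetime as dt

COMMISSIONING_MAX_AGE_DAYS = 3  # stand-in for config[action]["commissioning-max-age-days"]

def pick_partition(cycle: int, begin_at: str, base_partition: str = "upex-middle") -> str:
    """Return the upex partition unless a commissioning run is older than the limit."""
    # A non-zero digit at index 4 of the cycle number (e.g. 202231) marks commissioning.
    is_commissioning = str(cycle)[4] != "0"
    run_begin = dt.datetime.strptime(begin_at, "%Y-%m-%dT%H:%M:%S.%f%z")
    run_age = dt.datetime.now().astimezone() - run_begin
    if is_commissioning and run_age.days > COMMISSIONING_MAX_AGE_DAYS:
        # Stale commissioning data is reprocessed on the general-purpose partition.
        return "exfel"
    return base_partition

# A week-old commissioning run falls back to "exfel"; a fresh one keeps "upex-middle".
old = (dt.datetime.now().astimezone() - dt.timedelta(days=7)).strftime("%Y-%m-%dT%H:%M:%S.%f%z")
print(pick_partition(202231, old))  # exfel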