diff --git a/tests/test_webservice.py b/tests/test_webservice.py index b53b17f78c959ced708f1a4461da0819847f0c4a..29dbfb2ab6f6ddf8f89432391c57682daa2e5211 100644 --- a/tests/test_webservice.py +++ b/tests/test_webservice.py @@ -14,6 +14,7 @@ from webservice.webservice import ( # noqa: import not at top of file run_action, wait_on_transfer, get_slurm_partition, + get_slurm_nice ) @@ -143,13 +144,15 @@ async def test_wait_on_transfer_exceptions( ('sim', ['DARK', '1', '2', '3', '4'], 1, "success: simulated"), ], ) -async def test_run_action(mode, cmd, retcode, expected): +async def test_run_action(mode, cmd, retcode, expected, monkeypatch): job_db = mock.Mock() async def mock_run_proc_async(*args): return retcode, b'Submitted job: 42' - webservice.webservice.run_proc_async = mock_run_proc_async + monkeypatch.setattr( + webservice.webservice, 'run_proc_async', mock_run_proc_async + ) ret = await run_action(job_db, cmd, mode, 1, 1, 1) assert ret.lower().startswith(expected) @@ -177,3 +180,44 @@ async def test_get_slurm_partition(proposal_number, ret = await get_slurm_partition(client, action, proposal_number) assert ret == expected_result + +@pytest.mark.asyncio +@pytest.mark.parametrize( + 'cycle, num_jobs, expected_result', + [ + ('202201', 0, 0), ('202201', 10, 3*10**2), # user proposal + ('202221', 0, 5), ('202221', 10, 5+3*10**2), # commissioning + ] +) +async def test_get_slurm_nice_values(fp, cycle, num_jobs, expected_result): + """ Test get_slurm_nice values.""" + + fp.register( + ['squeue', '-h', '-o', '%.20j', '-p', 'upex-higher', '--me'], + stdout='\n'.join( + [f'correct_SPB_{i}' for i in range(num_jobs)] + + [f'correct_FXE_{i}' for i in range(num_jobs*2)]).encode('ascii'), + returncode=0) + + ret = await get_slurm_nice( + 'upex-higher', 'SPB', cycle, job_penalty=3, commissioning_penalty=5) + assert ret == expected_result + +@pytest.mark.asyncio +async def test_get_slurm_nice_fails(fp): + """Test corner cases for get_slurm_nice.""" + + # non-zero returncode + fp.register( + ['squeue', '-h', '-o', '%.20j', '-p', 'upex-higher', '--me'], + stdout='', returncode=1) + + assert await get_slurm_nice('upex-higher', 'SPB', '202201') == 0 + + # exfel is special + fp.register( + ['squeue', '-h', '-o', '%.20j', '-p', 'exfel', '--me'], + stdout='\n'.join([f'correct_SPB_{i}' for i in range(10)]), + returncode=0) + + assert await get_slurm_nice('exfel', 'SPB', '202201') == 0 diff --git a/webservice/config/webservice.yaml b/webservice/config/webservice.yaml index 7a764b0fad1807ae1cd3dcd72a41c1962a47e32e..c91b4b923f70df2982f996d0206d367c0a70c37e 100644 --- a/webservice/config/webservice.yaml +++ b/webservice/config/webservice.yaml @@ -31,7 +31,8 @@ kafka: correct: in-folder: /gpfs/exfel/exp/{instrument}/{cycle}/p{proposal}/raw out-folder: /gpfs/exfel/d/proc/{instrument}/{cycle}/p{proposal}/{run} - sched-prio: 80 + commissioning-penalty: 1250 + job-penalty: 2 cmd : >- python -m xfel_calibrate.calibrate {detector} CORRECT --slurm-scheduling {sched_prio} @@ -45,7 +46,8 @@ correct: dark: in-folder: /gpfs/exfel/exp/{instrument}/{cycle}/p{proposal}/raw out-folder: /gpfs/exfel/u/usr/{instrument}/{cycle}/p{proposal}/dark/runs_{runs} - sched-prio: 10 + commissioning-penalty: 1250 + job-penalty: 2 cmd: >- python -m xfel_calibrate.calibrate {detector} DARK --concurrency-par karabo_da diff --git a/webservice/webservice.py b/webservice/webservice.py index aa1274969d75b9d46060472d49412abf9738c73c..21e28df6a323525f19340e4be611e8c46e854dc8 100644 --- a/webservice/webservice.py +++ b/webservice/webservice.py @@ -669,6 +669,55 @@ async def get_slurm_partition(mdc: MetadataClient, return partition +async def get_slurm_nice(partition: str, instrument: str, + cycle: Union[int, str], job_penalty: int = 2, + commissioning_penalty: int = 1250) -> int: + """Compute priority adjustment based on cycle and number of running + jobs. + + The nice value is computed with + base_penalty + job_penalty * num_jobs**2 + + base_penalty is 0 for user proposals and commissioning_penalty + for commissioning proposals. The number of jobs is computed by + calling `squeue` and counting based on job name. + + The default penalty values give commissioning proposals a + penalty of 25 running jobs. + + :param partition: Partition to run jobs in. + :param instrument: Instrument to run jobs for. + :param cycle: Cycle of proposal to run jobs for. + :param job_penalty: Scaling factor per job, 2 by default. + :param commissioning_penalty: Base penalty for commissioning, + 1250 by default. + :return: Nice value to be passed to sbatch --nice + """ + + if partition == 'exfel': + return 0 # Don't apply degressive priority on exfel. + + # List all names for jobs running in the specified partition. + returncode, job_names = await run_proc_async( + ['squeue', '-h', '-o', '%.20j', '-p', partition, '--me']) + + if returncode != 0: + logging.error(f'Non-zero return code {returncode} from ' + f'`squeue` upon counting number of jobs') + return 0 # Fallback if something went wrong. + + # Base value depending on proposal type using cycle, assuming that + # user proposals follow the pattern xxxx0y, while different kinds of + # commissioning proposals use xxxx2y or xxxx3y. + base_nice = 0 if str(cycle)[4] == '0' else commissioning_penalty + + # Count number of jobs + num_jobs = sum((1 for job in job_names.decode('ascii').split('\n') + if f'correct_{instrument}' in job)) + + return base_nice + num_jobs**2 * job_penalty + + async def update_darks_paths(mdc: MetadataClient, rid: int, in_path: str, out_path: str, report_path: str): """Update data paths in MyMDC to provide Globus access @@ -1209,6 +1258,10 @@ class ActionsServer: ret = [] partition = await get_slurm_partition(self.mdc, action, proposal) + nice = await get_slurm_nice( + partition, instrument, cycle, + commissioning_penalty=self.config[action]['commissioning-penalty'], + job_penalty=self.config[action]['job-penalty']) # run xfel_calibrate for karabo_id, dconfig in detectors.items(): @@ -1216,7 +1269,7 @@ class ActionsServer: del dconfig['detector-type'] cmd = self.config[action]['cmd'].format( detector=detector, - sched_prio=str(self.config[action]['sched-prio']), + sched_prio=nice, partition=partition, action=action, instrument=instrument, cycle=cycle, proposal=proposal,