diff --git a/tests/test_webservice.py b/tests/test_webservice.py index b53b17f78c959ced708f1a4461da0819847f0c4a..4c8caa7b1431cfb6cb69c0807b411cbdc3c2cd6d 100644 --- a/tests/test_webservice.py +++ b/tests/test_webservice.py @@ -14,6 +14,7 @@ from webservice.webservice import ( # noqa: import not at top of file run_action, wait_on_transfer, get_slurm_partition, + get_slurm_nice ) @@ -177,3 +178,37 @@ async def test_get_slurm_partition(proposal_number, ret = await get_slurm_partition(client, action, proposal_number) assert ret == expected_result + +@pytest.mark.asyncio +@pytest.mark.parametrize( + 'cycle, num_jobs, expected_result', + [ + ('202201', 0, 0), ('202201', 10, 3*10**2), # user proposal + ('202221', 0, 5), ('202221', 10, 5+3*10**2), # commissioning + ] +) +async def test_get_slurm_nice_values(fp, cycle, num_jobs, expected_result): + """ Test get_slurm_nice values.""" + + fp.register( + ['squeue', '-h', '-o', '%.20j', '-p', 'upex-higher', '--me'], + stdout='\n'.join( + [f'correct_SPB_{i}' for i in range(num_jobs)] + + [f'correct_FXE_{i}' for i in range(num_jobs*2)]).encode('ascii'), + returncode=0) + + ret = await get_slurm_nice( + 'upex-higher', 'SPB', cycle, job_penalty=3, commissioning_penalty=5) + assert ret == expected_result + +@pytest.mark.asyncio +async def test_get_slurm_nice_fails(fp): + """Test corner cases for get_slurm_nice.""" + + # non-zero returncode + fp.register( + ['squeue', '-h', '-o', '%.20j', '-p', 'upex-higher', '--me'], + stdout='', returncode=1) + + assert await get_slurm_nice('upex-higher', 'SPB', '202201') == 0 + diff --git a/webservice/webservice.py b/webservice/webservice.py index aa1274969d75b9d46060472d49412abf9738c73c..5fde4f4bb75e3c508863b744f96bef2b6f59e1cb 100644 --- a/webservice/webservice.py +++ b/webservice/webservice.py @@ -669,6 +669,52 @@ async def get_slurm_partition(mdc: MetadataClient, return partition +async def get_slurm_nice(partition: str, instrument: str, + cycle: Union[int, str], job_penalty: int = 2, + commissioning_penalty: int = 1250) -> int: + """Compute priority adjustment based on cycle and number of running + jobs. + + The nice value is computed with + base_penalty + job_penalty * num_jobs**2 + + base_penalty is 0 for user proposals and commissioning_penalty + for commissioning proposals. The number of jobs is computed by + calling `squeue` and counting based on job name. + + The default penalty values give commissioning proposals a + penalty of 25 running jobs. + + :param partition: Partition to run jobs in. + :param instrument: Instrument to run jobs for. + :param cycle: Cycle of proposal to run jobs for. + :param job_penalty: Scaling factor per job, 2 by default. + :param commissioning_penalty: Base penalty for commissioning, + 1250 by default. + :return: Nice value to be passed to sbatch --nice + """ + + # List all names for jobs running in the specified partition. + returncode, job_names = await run_proc_async( + ['squeue', '-h', '-o', '%.20j', '-p', partition, '--me']) + + if returncode != 0: + logging.error(f'Non-zero return code {returncode} from ' + f'`squeue` upon counting number of jobs') + return 0 # Fallback if something went wrong. + + # Base value depending on proposal type using cycle, assuming that + # user proposals follow the pattern xxxx0y, while different kinds of + # commissioning proposals use xxxx2y or xxxx3y. + base_nice = 0 if str(cycle)[4] == '0' else commissioning_penalty + + # Count number of jobs + num_jobs = sum((1 for job in job_names.decode('ascii').split('\n') + if f'correct_{instrument}' in job)) + + return base_nice + num_jobs**2 * job_penalty + + async def update_darks_paths(mdc: MetadataClient, rid: int, in_path: str, out_path: str, report_path: str): """Update data paths in MyMDC to provide Globus access @@ -1209,6 +1255,7 @@ class ActionsServer: ret = [] partition = await get_slurm_partition(self.mdc, action, proposal) + nice = await get_slurm_nice(partition, instrument, cycle) # run xfel_calibrate for karabo_id, dconfig in detectors.items(): @@ -1216,7 +1263,7 @@ class ActionsServer: del dconfig['detector-type'] cmd = self.config[action]['cmd'].format( detector=detector, - sched_prio=str(self.config[action]['sched-prio']), + sched_prio=nice, partition=partition, action=action, instrument=instrument, cycle=cycle, proposal=proposal,