Skip to content
Snippets Groups Projects
Commit e1d107d9 authored by Philipp Schmidt's avatar Philipp Schmidt
Browse files

Launch jobs with nice value scaling degressively with number of already running jobs

parent ce4b0607
No related branches found
No related tags found
No related merge requests found
...@@ -14,6 +14,7 @@ from webservice.webservice import ( # noqa: import not at top of file ...@@ -14,6 +14,7 @@ from webservice.webservice import ( # noqa: import not at top of file
run_action, run_action,
wait_on_transfer, wait_on_transfer,
get_slurm_partition, get_slurm_partition,
get_slurm_nice
) )
...@@ -177,3 +178,23 @@ async def test_get_slurm_partition(proposal_number, ...@@ -177,3 +178,23 @@ async def test_get_slurm_partition(proposal_number,
ret = await get_slurm_partition(client, action, proposal_number) ret = await get_slurm_partition(client, action, proposal_number)
assert ret == expected_result assert ret == expected_result
@pytest.mark.asyncio
@pytest.mark.parametrize(
'cycle, num_jobs, expected_result',
[
('202201', 0, 0), ('202201', 10, 3*10**2), # user proposal
('202221', 0, 5), ('202221', 10, 5+3*10**2), # commissioning
]
)
async def test_get_slurm_nice(fake_process, cycle, num_jobs, expected_result):
fake_process.register(
['squeue', '-h', '-o', '%.20j', '-p', 'exfel', '--me'],
stdout=b'\n'.join(
[f'correct_SPB_{i}'.encode('ascii') for i in range(num_jobs)] +
[f'correct_FXE_{i}'.encode('ascii') for i in range(num_jobs*2)]))
ret = await get_slurm_nice(
'exfel', 'SPB', cycle, job_penalty=3, commissioning_penalty=5)
assert ret == expected_result
...@@ -669,6 +669,50 @@ async def get_slurm_partition(mdc: MetadataClient, ...@@ -669,6 +669,50 @@ async def get_slurm_partition(mdc: MetadataClient,
return partition return partition
async def get_slurm_nice(partition: str, instrument: str,
cycle: Union[int, str], job_penalty: int = 2,
commissioning_penalty: int = 1250) -> int:
"""Compute priority adjustment based on cycle and number of running
jobs.
The nice value is computed with
base_penalty + job_penalty * num_jobs**2
base_penalty is 0 for user proposals and commissioning_penalty
for commissioning proposals. The number of jobs is computed by
calling `squeue` and counting based on job name.
The default penalty values give commissioning proposals a
penalty of 25 running jobs.
:param partition: Partition to run jobs in.
:param instrument: Instrument to run jobs for.
:param cycle: Cycle of proposal to run jobs for.
:param job_penalty: Scaling factor per job, 2 by default.
:param commissioning_penalty: Base penalty for commissioning,
1250 by default.
:return: Nice value to be passed to sbatch --nice
"""
# List all names for jobs running in the specified partition.
code, job_names = await run_proc_async(
['squeue', '-h', '-o', '%.20j', '-p', partition, '--me'])
if code != 0:
return 0 # Fallback if something went wrong.
# Base value depending on proposal type using cycle, assuming that
# user proposals follow the pattern xxxx0y, while different kinds of
# commissioning proposals use xxxx2y or xxxx3y.
base_nice = 0 if str(cycle)[4] == '0' else commissioning_penalty
# Count number of jobs
num_jobs = sum((1 for job in job_names.decode('ascii').split('\n')
if f'correct_{instrument}' in job))
return base_nice + num_jobs**2 * job_penalty
async def update_darks_paths(mdc: MetadataClient, rid: int, in_path: str, async def update_darks_paths(mdc: MetadataClient, rid: int, in_path: str,
out_path: str, report_path: str): out_path: str, report_path: str):
"""Update data paths in MyMDC to provide Globus access """Update data paths in MyMDC to provide Globus access
...@@ -1209,6 +1253,7 @@ class ActionsServer: ...@@ -1209,6 +1253,7 @@ class ActionsServer:
ret = [] ret = []
partition = await get_slurm_partition(self.mdc, action, proposal) partition = await get_slurm_partition(self.mdc, action, proposal)
nice = await get_slurm_nice(partition, instrument, cycle)
# run xfel_calibrate # run xfel_calibrate
for karabo_id, dconfig in detectors.items(): for karabo_id, dconfig in detectors.items():
...@@ -1216,7 +1261,7 @@ class ActionsServer: ...@@ -1216,7 +1261,7 @@ class ActionsServer:
del dconfig['detector-type'] del dconfig['detector-type']
cmd = self.config[action]['cmd'].format( cmd = self.config[action]['cmd'].format(
detector=detector, detector=detector,
sched_prio=str(self.config[action]['sched-prio']), sched_prio=nice,
partition=partition, partition=partition,
action=action, instrument=instrument, action=action, instrument=instrument,
cycle=cycle, proposal=proposal, cycle=cycle, proposal=proposal,
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment