From 123f88d2e964a6b88f7bef2640d4bccaba3c28f3 Mon Sep 17 00:00:00 2001
From: Philipp Schmidt <philipp.schmidt@xfel.eu>
Date: Mon, 4 Apr 2022 16:39:00 +0200
Subject: [PATCH] Launch jobs with nice value scaling degressively with number
 of already running jobs

---
 tests/test_webservice.py | 35 ++++++++++++++++++++++++++++
 webservice/webservice.py | 49 +++++++++++++++++++++++++++++++++++++++-
 2 files changed, 83 insertions(+), 1 deletion(-)

diff --git a/tests/test_webservice.py b/tests/test_webservice.py
index b53b17f78..4c8caa7b1 100644
--- a/tests/test_webservice.py
+++ b/tests/test_webservice.py
@@ -14,6 +14,7 @@ from webservice.webservice import (  # noqa: import not at top of file
     run_action,
     wait_on_transfer,
     get_slurm_partition,
+    get_slurm_nice
 )
 
 
@@ -177,3 +178,37 @@ async def test_get_slurm_partition(proposal_number,
 
     ret = await get_slurm_partition(client, action, proposal_number)
     assert ret == expected_result
+
+@pytest.mark.asyncio
+@pytest.mark.parametrize(
+    'cycle, num_jobs, expected_result',
+    [
+        ('202201', 0, 0), ('202201', 10, 3*10**2),  # user proposal
+        ('202221', 0, 5), ('202221', 10, 5+3*10**2),  # commissioning
+    ]
+)
+async def test_get_slurm_nice_values(fp, cycle, num_jobs, expected_result):
+    """Check nice values for user and commissioning cycles."""
+
+    # Fake queue: only the SPB jobs match the instrument queried below.
+    spb_jobs = [f'correct_SPB_{i}' for i in range(num_jobs)]
+    fxe_jobs = [f'correct_FXE_{i}' for i in range(num_jobs*2)]
+    fp.register(
+        ['squeue', '-h', '-o', '%.20j', '-p', 'upex-higher', '--me'],
+        stdout='\n'.join(spb_jobs + fxe_jobs).encode('ascii'), returncode=0)
+
+    nice = await get_slurm_nice(
+        'upex-higher', 'SPB', cycle, job_penalty=3, commissioning_penalty=5)
+    assert nice == expected_result
+
+@pytest.mark.asyncio
+async def test_get_slurm_nice_fails(fp):
+    """Test the fallback of get_slurm_nice when squeue fails."""
+
+    # Simulate squeue exiting with a non-zero returncode.
+    squeue_cmd = ['squeue', '-h', '-o', '%.20j', '-p', 'upex-higher', '--me']
+    fp.register(squeue_cmd, stdout='', returncode=1)
+
+    nice = await get_slurm_nice('upex-higher', 'SPB', '202201')
+    assert nice == 0
+
diff --git a/webservice/webservice.py b/webservice/webservice.py
index aa1274969..5fde4f4bb 100644
--- a/webservice/webservice.py
+++ b/webservice/webservice.py
@@ -669,6 +669,52 @@ async def get_slurm_partition(mdc: MetadataClient,
     return partition
 
 
+async def get_slurm_nice(partition: str, instrument: str,
+                         cycle: Union[int, str], job_penalty: int = 2,
+                         commissioning_penalty: int = 1250) -> int:
+    """Compute priority adjustment based on cycle and number of own jobs.
+
+       The nice value is computed with
+           base_penalty + job_penalty * num_jobs**2
+
+       base_penalty is 0 for user proposals and commissioning_penalty
+       for commissioning proposals. The number of jobs is computed by
+       calling `squeue` and counting based on job name; this includes
+       pending jobs, as `squeue` is called without a state filter.
+
+       The default penalty values give commissioning proposals a
+       penalty of 25 jobs. If `squeue` fails, 0 is returned.
+
+       :param partition: Partition to run jobs in.
+       :param instrument: Instrument to run jobs for.
+       :param cycle: Cycle of proposal to run jobs for.
+       :param job_penalty: Scaling factor per job, 2 by default.
+       :param commissioning_penalty: Base penalty for commissioning,
+           1250 by default.
+       :return: Nice value to be passed to sbatch --nice
+    """
+
+    # List the names of all of our own (--me) jobs in the partition.
+    returncode, job_names = await run_proc_async(
+        ['squeue', '-h', '-o', '%.20j', '-p', partition, '--me'])
+
+    if returncode != 0:
+        logging.error(f'Non-zero return code {returncode} from '
+                      f'`squeue` upon counting number of jobs')
+        return 0  # Fallback if something went wrong.
+
+    # Base value depending on proposal type using cycle, assuming that
+    # user proposals follow the pattern xxxx0y, while others (xxxx2y,
+    # xxxx3y) are commissioning. Too short cycles get no base penalty.
+    base_nice = 0 if str(cycle)[4:5] in ('0', '') else commissioning_penalty
+
+    # Count jobs by name; undecodable bytes must not abort the launch.
+    num_jobs = sum(f'correct_{instrument}' in job for job in
+                   job_names.decode('ascii', errors='replace').splitlines())
+
+    return base_nice + num_jobs**2 * job_penalty
+
+
 async def update_darks_paths(mdc: MetadataClient, rid: int, in_path: str,
                              out_path: str, report_path: str):
     """Update data paths in MyMDC to provide Globus access
@@ -1209,6 +1255,7 @@ class ActionsServer:
         ret = []
 
         partition = await get_slurm_partition(self.mdc, action, proposal)
+        nice = await get_slurm_nice(partition, instrument, cycle)
 
         # run xfel_calibrate
         for karabo_id, dconfig in detectors.items():
@@ -1216,7 +1263,7 @@ class ActionsServer:
             del dconfig['detector-type']
             cmd = self.config[action]['cmd'].format(
                 detector=detector,
-                sched_prio=str(self.config[action]['sched-prio']),
+                sched_prio=nice,
                 partition=partition,
                 action=action, instrument=instrument,
                 cycle=cycle, proposal=proposal,
-- 
GitLab