From 4e26f4dffee067c49d2f3fb2b8869af0d4c3045e Mon Sep 17 00:00:00 2001
From: Thomas Kluyver <thomas@kluyver.me.uk>
Date: Wed, 18 Aug 2021 11:51:38 +0100
Subject: [PATCH] Make class to store Slurm related options

---
 src/xfel_calibrate/calibrate.py | 141 ++++++++++++++++++--------------
 1 file changed, 78 insertions(+), 63 deletions(-)

diff --git a/src/xfel_calibrate/calibrate.py b/src/xfel_calibrate/calibrate.py
index 1aa2896f1..24563c6c4 100755
--- a/src/xfel_calibrate/calibrate.py
+++ b/src/xfel_calibrate/calibrate.py
@@ -90,7 +90,7 @@ def make_initial_parser(**kwargs):
                         help="Use vector graphics for figures in the report.")
 
     parser.add_argument('--slurm-mem', type=int, default=500,
-                        help="Requestet node RAM in GB")
+                        help="Requested node RAM in GB")
 
     parser.add_argument('--slurm-name', type=str, default='xfel_calibrate',
                         help='Name of slurm job')
@@ -649,63 +649,70 @@ def save_executed_command(run_tmp_path, version):
         finfile.write(' '.join(sys.argv))
 
 
-def get_slurm_partition_or_reservation(args) -> List[str]:
-    """Return sbatch arguments to use a partition or reservation
-
-    --reservation and --slurm-partition options have precedence.
-    Otherwise, if --priority is <=1, it will use a configured reservation
-    depending on how many nodes are currently free.
-    """
-    ureservation = args['reservation']
-    upartition = args['slurm_partition']
-    priority = args['priority']
-    relevant_resv = reservation_char if priority <= 0 else reservation
-
-    if ureservation:
-        return ['--reservation', ureservation]
-    elif upartition:
-        return ['--partition', upartition]
-    elif (priority <= 1) and relevant_resv:
-        # Use a reservation if there aren't many general nodes available to us
-        free = int(check_output(free_nodes_cmd, shell=True).decode('utf8'))
-        preempt = int(check_output(preempt_nodes_cmd, shell=True).decode('utf8'))
-        if free + preempt < max_reserved:
-            return ['--reservation', relevant_resv]
-
-    # Fallback to using the configured partition (default: exfel)
-    return ['--partition', sprof]
-
-
-def get_launcher_command(args, temp_path, after_ok=(), after_any=()) -> List[str]:
-    """
-    Return a slurm launcher command
-    :param args: Command line arguments
-    :param temp_path: Temporary path to run job
-    :param after_ok: A list of jobs which must succeed first
-    :param after_any: A list of jobs which must finish first, but may fail
-    :return: List of commands and parameters to be used by subprocess
-    """
-
-    launcher_slurm = launcher_command.format(temp_path=temp_path).split()
-
-    launcher_slurm += get_slurm_partition_or_reservation(args)
-
-    job_name = args.get('slurm_name', 'xfel_calibrate')
-    launcher_slurm += ["--job-name", job_name]
-
-    if args.get('slurm_priority'):
-        launcher_slurm += ["--nice", args.get('slurm_priority')]
-
-    launcher_slurm.append("--mem={}G".format(args.get('slurm_mem', '500')))
-
-    deps = []
-    if after_ok:
-        deps.append("afterok:" + ":".join(str(j) for j in after_ok))
-    if after_any:
-        deps.append("afterany:" + ":".join(str(j) for j in after_any))
-    if deps:
-        launcher_slurm.append("--dependency=" + ",".join(deps))
-    return launcher_slurm
+class SlurmOptions:
+    def __init__(
+        self, job_name=None, nice=None, mem=None, partition=None, reservation=None,
+        priority_group=2,
+    ):
+        self.job_name = job_name or 'xfel_calibrate'
+        self.nice = nice
+        self.mem = mem
+        self.partition = partition
+        self.reservation = reservation
+        self.priority_group = priority_group
+
+    def get_partition_or_reservation(self) -> List[str]:
+        """Return sbatch arguments to use a partition or reservation
+
+        --reservation and --slurm-partition options have precedence.
+        Otherwise, if --priority is <=1, it will use a configured reservation
+        depending on how many nodes are currently free.
+        """
+        if self.reservation:
+            return ['--reservation', self.reservation]
+        elif self.partition:
+            return ['--partition', self.partition]
+        elif self.priority_group <= 1:
+            auto_resvn = reservation_char if self.priority_group <= 0 else reservation
+            # Use a reservation if there aren't many general nodes available to us
+            free = int(check_output(free_nodes_cmd, shell=True).decode('utf8'))
+            preempt = int(check_output(preempt_nodes_cmd, shell=True).decode('utf8'))
+            if free + preempt < max_reserved:
+                return ['--reservation', auto_resvn]
+
+        # Fallback to using the configured partition (default: exfel)
+        return ['--partition', sprof]
+
+    def get_launcher_command(self, temp_path, after_ok=(), after_any=()) -> List[str]:
+        """
+        Return a slurm launcher command
+        :param args: Command line arguments
+        :param temp_path: Temporary path to run job
+        :param after_ok: A list of jobs which must succeed first
+        :param after_any: A list of jobs which must finish first, but may fail
+        :return: List of commands and parameters to be used by subprocess
+        """
+
+        launcher_slurm = launcher_command.format(temp_path=temp_path).split()
+
+        launcher_slurm += self.get_partition_or_reservation()
+
+        launcher_slurm += ["--job-name", self.job_name]
+
+        if self.nice:
+            launcher_slurm += ["--nice", self.nice]
+
+        if self.mem:
+            launcher_slurm.append(f"--mem={self.mem}G")
+
+        deps = []
+        if after_ok:
+            deps.append("afterok:" + ":".join(str(j) for j in after_ok))
+        if after_any:
+            deps.append("afterany:" + ":".join(str(j) for j in after_any))
+        if deps:
+            launcher_slurm.append("--dependency=" + ",".join(deps))
+        return launcher_slurm
 
 
 def remove_duplications(l) -> list:
@@ -783,8 +790,8 @@ class JobArgs:
     def run_direct(self, run_tmp_path, python) -> int:
         return call(self.format_cmd(python), cwd=run_tmp_path)
 
-    def submit_job(self, run_tmp_path, python, args, after_ok=(), after_any=()):
-        cmd = get_launcher_command(args, run_tmp_path, after_ok, after_any)
+    def submit_job(self, run_tmp_path, python, slurm_opts, after_ok=(), after_any=()):
+        cmd = slurm_opts.get_launcher_command(run_tmp_path, after_ok, after_any)
         cmd += self.format_cmd(python)
         output = check_output(cmd, cwd=run_tmp_path).decode('utf-8')
         return output.partition(';')[0].strip()  # job ID
@@ -825,14 +832,14 @@ class JobGroup:  # TODO: Naming - task? request? commission?
             }
         }
 
-    def submit_jobs(self, args):
+    def submit_jobs(self, slurm_opts: SlurmOptions):
         all_job_ids = []
         dep_job_ids = []
         for step in self.steps:
             step_job_ids = []
             kw = {('after_any' if step.after_error else 'after_ok'): dep_job_ids}
             for job_desc in step.jobs:
-                jid = job_desc.submit_job(self.run_tmp_path, self.python, args, **kw)
+                jid = job_desc.submit_job(self.run_tmp_path, self.python, slurm_opts, **kw)
                 step_job_ids.append(jid)
             dep_job_ids = step_job_ids
             all_job_ids.extend(step_job_ids)
@@ -1157,7 +1164,15 @@ def run():
         joblist = []
     else:
         print("Submitting jobs to Slurm...")
-        joblist = job_group.submit_jobs(args)
+
+        joblist = job_group.submit_jobs(SlurmOptions(
+            job_name=args.get('slurm_name', 'xfel_calibrate'),
+            nice=args['slurm_scheduling'],
+            mem=args['slurm_mem'],
+            reservation=args['reservation'],
+            partition=args['slurm_partition'],
+            priority_group=args['priority'],
+        ))
 
     errors = False
     fmt_args = {'run_path': run_tmp_path,
--
GitLab
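
A minimal usage sketch of the new SlurmOptions class (not part of the patch): it shows how an sbatch launcher prefix is built once the options object exists. The option values, the temporary path and the job ID below are made up for illustration, and it assumes xfel_calibrate.calibrate is importable with its usual module-level Slurm configuration (launcher_command, sprof, etc.).

    # Illustration only: build an sbatch launcher prefix with the new class.
    # In the real code, run() fills these fields from the parsed command-line
    # arguments (see the last hunk above); the values here are hypothetical.
    from xfel_calibrate.calibrate import SlurmOptions

    opts = SlurmOptions(
        job_name='xfel_calibrate',  # falls back to 'xfel_calibrate' if None
        mem=700,                    # emitted as --mem=700G
        partition='exfel',          # an explicit partition skips the reservation logic
    )

    # sbatch prefix for a job run from a (hypothetical) temporary directory,
    # only starting after job 1234 has finished successfully.
    cmd = opts.get_launcher_command('/tmp/cal_run', after_ok=[1234])
    print(cmd)

Compared with passing the whole args dict through JobGroup.submit_jobs and JobArgs.submit_job, the explicit fields make it clear which Slurm settings the submission path actually depends on, and they let callers construct the options without an argparse namespace at all.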