
Reproducibility, step 1

Merged: Thomas Kluyver requested to merge feat/reproducibility into master
1 file changed: +78 −63
@@ -90,7 +90,7 @@ def make_initial_parser(**kwargs):
help="Use vector graphics for figures in the report.")
parser.add_argument('--slurm-mem', type=int, default=500,
help="Requestet node RAM in GB")
help="Requested node RAM in GB")
parser.add_argument('--slurm-name', type=str, default='xfel_calibrate',
help='Name of slurm job')
@@ -649,63 +649,70 @@ def save_executed_command(run_tmp_path, version):
         finfile.write(' '.join(sys.argv))
-def get_slurm_partition_or_reservation(args) -> List[str]:
-    """Return sbatch arguments to use a partition or reservation
-    --reservation and --slurm-partition options have precedence.
-    Otherwise, if --priority is <=1, it will use a configured reservation
-    depending on how many nodes are currently free.
-    """
-    ureservation = args['reservation']
-    upartition = args['slurm_partition']
-    priority = args['priority']
-    relevant_resv = reservation_char if priority <= 0 else reservation
-    if ureservation:
-        return ['--reservation', ureservation]
-    elif upartition:
-        return ['--partition', upartition]
-    elif (priority <= 1) and relevant_resv:
-        # Use a reservation if there aren't many general nodes available to us
-        free = int(check_output(free_nodes_cmd, shell=True).decode('utf8'))
-        preempt = int(check_output(preempt_nodes_cmd, shell=True).decode('utf8'))
-        if free + preempt < max_reserved:
-            return ['--reservation', relevant_resv]
-    # Fallback to using the configured partition (default: exfel)
-    return ['--partition', sprof]
-def get_launcher_command(args, temp_path, after_ok=(), after_any=()) -> List[str]:
-    """
-    Return a slurm launcher command
-    :param args: Command line arguments
-    :param temp_path: Temporary path to run job
-    :param after_ok: A list of jobs which must succeed first
-    :param after_any: A list of jobs which must finish first, but may fail
-    :return: List of commands and parameters to be used by subprocess
-    """
-    launcher_slurm = launcher_command.format(temp_path=temp_path).split()
-    launcher_slurm += get_slurm_partition_or_reservation(args)
-    job_name = args.get('slurm_name', 'xfel_calibrate')
-    launcher_slurm += ["--job-name", job_name]
-    if args.get('slurm_priority'):
-        launcher_slurm += ["--nice", args.get('slurm_priority')]
-    launcher_slurm.append("--mem={}G".format(args.get('slurm_mem', '500')))
-    deps = []
-    if after_ok:
-        deps.append("afterok:" + ":".join(str(j) for j in after_ok))
-    if after_any:
-        deps.append("afterany:" + ":".join(str(j) for j in after_any))
-    if deps:
-        launcher_slurm.append("--dependency=" + ",".join(deps))
-    return launcher_slurm
+class SlurmOptions:
+    def __init__(
+        self, job_name=None, nice=None, mem=None, partition=None, reservation=None,
+        priority_group=2,
+    ):
+        self.job_name = job_name or 'xfel_calibrate'
+        self.nice = nice
+        self.mem = mem
+        self.partition = partition
+        self.reservation = reservation
+        self.priority_group = priority_group
+    def get_partition_or_reservation(self) -> List[str]:
+        """Return sbatch arguments to use a partition or reservation
+        --reservation and --slurm-partition options have precedence.
+        Otherwise, if --priority is <=1, it will use a configured reservation
+        depending on how many nodes are currently free.
+        """
+        if self.reservation:
+            return ['--reservation', self.reservation]
+        elif self.partition:
+            return ['--partition', self.partition]
+        elif self.priority_group <= 1:
+            auto_resvn = reservation_char if self.priority_group <= 0 else reservation
+            # Use a reservation if there aren't many general nodes available to us
+            free = int(check_output(free_nodes_cmd, shell=True).decode('utf8'))
+            preempt = int(check_output(preempt_nodes_cmd, shell=True).decode('utf8'))
+            if free + preempt < max_reserved:
+                return ['--reservation', auto_resvn]
+        # Fallback to using the configured partition (default: exfel)
+        return ['--partition', sprof]
+    def get_launcher_command(self, temp_path, after_ok=(), after_any=()) -> List[str]:
+        """
+        Return a slurm launcher command
+        :param temp_path: Temporary path to run job
+        :param after_ok: A list of jobs which must succeed first
+        :param after_any: A list of jobs which must finish first, but may fail
+        :return: List of commands and parameters to be used by subprocess
+        """
+        launcher_slurm = launcher_command.format(temp_path=temp_path).split()
+        launcher_slurm += self.get_partition_or_reservation()
+        launcher_slurm += ["--job-name", self.job_name]
+        if self.nice:
+            launcher_slurm += ["--nice", self.nice]
+        if self.mem:
+            launcher_slurm.append(f"--mem={self.mem}G")
+        deps = []
+        if after_ok:
+            deps.append("afterok:" + ":".join(str(j) for j in after_ok))
+        if after_any:
+            deps.append("afterany:" + ":".join(str(j) for j in after_any))
+        if deps:
+            launcher_slurm.append("--dependency=" + ",".join(deps))
+        return launcher_slurm
 def remove_duplications(l) -> list:
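Not part of the diff, but useful context for review: a minimal, self-contained sketch of the selection order implemented by get_partition_or_reservation(). The node counts are passed in directly instead of being read via free_nodes_cmd/preempt_nodes_cmd, and the reservation names and the max_reserved threshold are placeholders rather than the configured values.

    from typing import List, Optional

    def pick_slurm_target(reservation: Optional[str], partition: Optional[str],
                          priority_group: int, free: int, preempt: int,
                          max_reserved: int = 8) -> List[str]:
        # An explicit --reservation or --slurm-partition always wins.
        if reservation:
            return ['--reservation', reservation]
        if partition:
            return ['--partition', partition]
        # Priority groups 0 and 1 may fall back to a configured reservation
        # when few general-purpose nodes are left (names here are placeholders).
        if priority_group <= 1:
            resv = 'resv-urgent' if priority_group <= 0 else 'resv-normal'
            if free + preempt < max_reserved:
                return ['--reservation', resv]
        # Otherwise use the default partition (the source comment says 'exfel').
        return ['--partition', 'exfel']

    # A priority-1 request with only 3 usable nodes left picks the reservation:
    print(pick_slurm_target(None, None, 1, free=2, preempt=1))
    # ['--reservation', 'resv-normal']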
@@ -783,8 +790,8 @@ class JobArgs:
     def run_direct(self, run_tmp_path, python) -> int:
         return call(self.format_cmd(python), cwd=run_tmp_path)
-    def submit_job(self, run_tmp_path, python, args, after_ok=(), after_any=()):
-        cmd = get_launcher_command(args, run_tmp_path, after_ok, after_any)
+    def submit_job(self, run_tmp_path, python, slurm_opts, after_ok=(), after_any=()):
+        cmd = slurm_opts.get_launcher_command(run_tmp_path, after_ok, after_any)
         cmd += self.format_cmd(python)
         output = check_output(cmd, cwd=run_tmp_path).decode('utf-8')
         return output.partition(';')[0].strip()  # job ID
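The job ID returned here is whatever sbatch prints before the first ';'; the partition(';') split suggests the launcher command asks for --parsable-style output ("<jobid>" or "<jobid>;<cluster>"). A tiny sketch of that parsing, with invented output strings:

    def parse_job_id(sbatch_output: str) -> str:
        # Everything before the first ';' (the cluster name, if present) is the job ID.
        return sbatch_output.partition(';')[0].strip()

    assert parse_job_id("4242;maxwell\n") == "4242"
    assert parse_job_id("4242\n") == "4242"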
@@ -825,14 +832,14 @@ class JobGroup: # TODO: Naming - task? request? commission?
             }
         }
-    def submit_jobs(self, args):
+    def submit_jobs(self, slurm_opts: SlurmOptions):
         all_job_ids = []
         dep_job_ids = []
         for step in self.steps:
             step_job_ids = []
             kw = {('after_any' if step.after_error else 'after_ok'): dep_job_ids}
             for job_desc in step.jobs:
-                jid = job_desc.submit_job(self.run_tmp_path, self.python, args, **kw)
+                jid = job_desc.submit_job(self.run_tmp_path, self.python, slurm_opts, **kw)
                 step_job_ids.append(jid)
             dep_job_ids = step_job_ids
             all_job_ids.extend(step_job_ids)
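submit_jobs chains the steps: every job in a step depends on all job IDs from the previous step, using afterok, or afterany when step.after_error is set. An illustrative, self-contained walk-through of that chaining with invented job names and IDs:

    steps = [['correct-r0001', 'correct-r0002'], ['report']]  # invented step layout
    next_id, dep_job_ids = 1000, []
    for step in steps:
        step_job_ids = []
        for job in step:
            next_id += 1
            deps = 'afterok:' + ':'.join(map(str, dep_job_ids)) if dep_job_ids else 'none'
            print(f"{job} -> job {next_id}, dependency: {deps}")
            step_job_ids.append(next_id)
        dep_job_ids = step_job_ids
    # correct-r0001 -> job 1001, dependency: none
    # correct-r0002 -> job 1002, dependency: none
    # report -> job 1003, dependency: afterok:1001:1002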
@@ -1107,7 +1114,7 @@
             if cvals:
                 # in case default needs to be used for function call
                 args[concurrency_par] = cvals
             callargs = [args[arg] for arg in sig.parameters]
             cvals = f(*callargs)
             print(f"Split concurrency into {cvals}")
@@ -1157,7 +1164,7 @@
         joblist = []
     else:
         print("Submitting jobs to Slurm...")
-        joblist = job_group.submit_jobs(args)
+        joblist = job_group.submit_jobs(SlurmOptions(
+            job_name=args.get('slurm_name', 'xfel_calibrate'),
+            nice=args['slurm_scheduling'],
+            mem=args['slurm_mem'],
+            reservation=args['reservation'],
+            partition=args['slurm_partition'],
+            priority_group=args['priority'],
+        ))
     errors = False
     fmt_args = {'run_path': run_tmp_path,
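As a sanity check on this call site (a sketch assuming it runs in the module where SlurmOptions and sprof are defined, with invented argument values): the parsed-argument dict maps onto SlurmOptions like this, including the job-name fallback.

    args = {'slurm_name': None, 'slurm_scheduling': 0, 'slurm_mem': 500,
            'reservation': '', 'slurm_partition': '', 'priority': 2}
    opts = SlurmOptions(
        job_name=args.get('slurm_name', 'xfel_calibrate'),
        nice=args['slurm_scheduling'],
        mem=args['slurm_mem'],
        reservation=args['reservation'],
        partition=args['slurm_partition'],
        priority_group=args['priority'],
    )
    assert opts.job_name == 'xfel_calibrate'  # the `or` fallback kicks in for None
    # With priority_group > 1 and no explicit choice, the default partition is used:
    assert opts.get_partition_or_reservation() == ['--partition', sprof]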