Skip to content
Snippets Groups Projects
Commit 5392117d authored by Mikhail Karnevskiy's avatar Mikhail Karnevskiy
Browse files

Merge branch 'feat/expose_slurm' into 'master'

Feat: Expose some of slurm settings to the command line

See merge request detectors/pycalibration!211
parents 32c7a428 c7c883ca
No related branches found
No related tags found
1 merge request!211Feat: Expose some of slurm settings to the command line
...@@ -60,6 +60,15 @@ def make_initial_parser(): ...@@ -60,6 +60,15 @@ def make_initial_parser():
parser.add_argument('--vector-figs', action="store_true", default=False, parser.add_argument('--vector-figs', action="store_true", default=False,
help="Use vector graphics for figures in the report.") help="Use vector graphics for figures in the report.")
parser.add_argument('--slurm-mem', type=int, default=500,
help="Requestet node RAM in GB")
parser.add_argument('--slurm-name', type=str, default='xfel_calibrate',
help='Name of slurm job')
parser.add_argument('--slurm-priority', type=int, default=0,
help='Change priority of srurm job +- 2147483645 (negative value increases priority)')
parser.add_argument_group('required arguments') parser.add_argument_group('required arguments')
parser.add_argument('--reservation', type=str, default="") parser.add_argument('--reservation', type=str, default="")
...@@ -467,10 +476,56 @@ def set_figure_format(nb, enable_vector_format): ...@@ -467,10 +476,56 @@ def set_figure_format(nb, enable_vector_format):
cell.source += "\n%config InlineBackend.figure_formats = ['svg']\n" cell.source += "\n%config InlineBackend.figure_formats = ['svg']\n"
def get_launcher_command(args, temp_path, dependent, job_list):
    """
    Return a slurm launcher command

    :param args: Command line arguments (dict-like; reads 'reservation',
        'priority', and optionally 'slurm_name', 'slurm_priority', 'slurm_mem')
    :param temp_path: Temporary path to run job
    :param dependent: True if job is dependent on the jobs in job_list
    :param job_list: A list of dependent job ids
    :return: List of commands and parameters to be used by subprocess
    """
    launcher_slurm = launcher_command.format(temp_path=temp_path)

    # calculate number of general nodes available
    free = int(check_output(free_nodes_cmd, shell=True).decode('utf8'))
    preempt = int(check_output(preempt_nodes_cmd, shell=True).decode('utf8'))

    ureservation = args['reservation']
    priority = args['priority']

    # Use the general partition when no user reservation was requested and
    # either enough nodes are free, the job is low priority, or no default
    # reservation is configured; otherwise fall back to a reservation.
    if (ureservation == "" and
            (free + preempt >= max_reserved or
             priority > 1 or
             reservation == "")):
        launcher_slurm += " --partition {}".format(sprof)
    else:
        this_res = reservation if priority == 1 else reservation_char
        if ureservation != "":
            this_res = ureservation
        launcher_slurm += " --reservation={}".format(this_res)

    job_name = args.get('slurm_name', 'xfel_calibrate')
    launcher_slurm += " --job-name {}".format(job_name)

    # --nice shifts scheduling priority; 0/absent means "leave default",
    # which the truthiness test below also covers.
    if args.get('slurm_priority'):
        launcher_slurm += " --nice={}".format(args.get('slurm_priority'))

    launcher_slurm += " --mem {}G".format(args.get('slurm_mem', '500'))

    if dependent:
        # BUG FIX: the original did `launcher_slurm += [srun_dep]`, adding a
        # list to a str -> TypeError for every dependent job. Concatenate the
        # option as a string (with a leading space so .split() separates it).
        srun_dep = " --dependency=afterok"
        for jobid in job_list:
            srun_dep += ":{}".format(jobid)
        launcher_slurm += srun_dep

    return launcher_slurm.split()
def concurrent_run(temp_path, nb, nbname, args, cparm=None, cval=None, def concurrent_run(temp_path, nb, nbname, args, cparm=None, cval=None,
final_job=False, job_list=[], fmtcmd="", cluster_cores=8, final_job=False, job_list=[], fmtcmd="", cluster_cores=8,
sequential=False, priority=2, dependent=False, sequential=False, dependent=False,
show_title=True, ureservation=""): show_title=True):
""" Launch a concurrent job on the cluster via SLURM """ Launch a concurrent job on the cluster via SLURM
""" """
...@@ -510,32 +565,12 @@ def concurrent_run(temp_path, nb, nbname, args, cparm=None, cval=None, ...@@ -510,32 +565,12 @@ def concurrent_run(temp_path, nb, nbname, args, cparm=None, cval=None,
"python3 -c {}\n".format(fmtcmd.format(joblist=job_list))) "python3 -c {}\n".format(fmtcmd.format(joblist=job_list)))
all_stats = stat.S_IXUSR | stat.S_IWUSR | stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH all_stats = stat.S_IXUSR | stat.S_IWUSR | stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
os.chmod("{}/finalize.sh".format(temp_path), all_stats) os.chmod("{}/finalize.sh".format(temp_path), all_stats)
# then run an sbatch job # then run an sbatch job
srun_base = []
if not sequential: if not sequential:
# calculate number of general nodes available srun_base = get_launcher_command(args, temp_path, dependent, job_list)
free = int(check_output(free_nodes_cmd, shell=True).decode('utf8')) print(" ".join(srun_base))
preempt = int(check_output(
preempt_nodes_cmd, shell=True).decode('utf8'))
if ureservation == "" and (free + preempt >= max_reserved or priority > 1 or reservation == ""):
srun_base = launcher_command.format(
temp_path=temp_path) + " -p {}".format(sprof)
srun_base = srun_base.split()
else:
this_res = reservation if priority == 1 else reservation_char
if ureservation != "":
this_res = ureservation
srun_base = launcher_command.format(
temp_path=temp_path) + " --reservation={}".format(this_res)
srun_base = srun_base.split()
print(" ".join(srun_base))
else:
srun_base = []
if dependent:
srun_dep = "--dependency=afterok"
for jobid in job_list:
srun_dep += ":{}".format(jobid)
srun_base += [srun_dep]
srun_base += [os.path.abspath("{}/bin/slurm_calibrate.sh".format(os.path.dirname(__file__))), # path to helper sh srun_base += [os.path.abspath("{}/bin/slurm_calibrate.sh".format(os.path.dirname(__file__))), # path to helper sh
os.path.abspath(nbpath), # path to notebook os.path.abspath(nbpath), # path to notebook
...@@ -637,8 +672,6 @@ def run(): ...@@ -637,8 +672,6 @@ def run():
detector = args["detector"].upper() detector = args["detector"].upper()
caltype = args["type"].upper() caltype = args["type"].upper()
sequential = args["no_cluster_job"] sequential = args["no_cluster_job"]
priority = int(args['priority'])
ureservation = args['reservation']
if sequential: if sequential:
print("Not running on cluster") print("Not running on cluster")
...@@ -757,8 +790,7 @@ def run(): ...@@ -757,8 +790,7 @@ def run():
cluster_cores = concurrency.get("cluster cores", 8) cluster_cores = concurrency.get("cluster cores", 8)
jobid = concurrent_run(run_tmp_path, nb, os.path.basename(notebook), args, jobid = concurrent_run(run_tmp_path, nb, os.path.basename(notebook), args,
final_job=True, job_list=joblist, fmtcmd=fmtcmd, final_job=True, job_list=joblist, fmtcmd=fmtcmd,
cluster_cores=cluster_cores, sequential=sequential, cluster_cores=cluster_cores, sequential=sequential)
priority=priority, ureservation=ureservation)
joblist.append(jobid) joblist.append(jobid)
else: else:
cvar = concurrency["parameter"] cvar = concurrency["parameter"]
...@@ -826,8 +858,8 @@ def run(): ...@@ -826,8 +858,8 @@ def run():
cnum == len(list(cvals)) - cnum == len(list(cvals)) -
1 and len(dep_notebooks) == 0, 1 and len(dep_notebooks) == 0,
joblist, fmtcmd, joblist, fmtcmd,
cluster_cores=cluster_cores, sequential=sequential, priority=priority, cluster_cores=cluster_cores, sequential=sequential,
show_title=show_title, ureservation=ureservation) show_title=show_title)
joblist.append(jobid) joblist.append(jobid)
# Run dependent notebooks # Run dependent notebooks
...@@ -842,8 +874,8 @@ def run(): ...@@ -842,8 +874,8 @@ def run():
final_job=final_job, final_job=final_job,
job_list=joblist, fmtcmd=fmtcmd, job_list=joblist, fmtcmd=fmtcmd,
cluster_cores=cluster_cores, cluster_cores=cluster_cores,
sequential=sequential, priority=priority, sequential=sequential,
dependent=True, ureservation=ureservation) dependent=True)
joblist.append(jobid) joblist.append(jobid)
if not all([j is None for j in joblist]): if not all([j is None for j in joblist]):
......
...@@ -29,7 +29,7 @@ logo_path = "xfel.pdf" ...@@ -29,7 +29,7 @@ logo_path = "xfel.pdf"
# the command to run this concurrently. It is prepended to the actual call # the command to run this concurrently. It is prepended to the actual call
sprof = os.environ.get("XFELCALSLURM", "exfel") sprof = os.environ.get("XFELCALSLURM", "exfel")
launcher_command = "sbatch -t 24:00:00 --mem 500G --requeue --output {temp_path}/slurm-%j.out" launcher_command = "sbatch -t 24:00:00 --requeue --output {temp_path}/slurm-%j.out"
free_nodes_cmd = "sinfo -p exfel -t idle -N --noheader | wc -l" free_nodes_cmd = "sinfo -p exfel -t idle -N --noheader | wc -l"
preempt_nodes_cmd = "squeue -p all,grid --noheader | grep max-exfl | egrep -v 'max-exfl18[3-8]|max-exfl100|max-exflg' | wc -l" preempt_nodes_cmd = "squeue -p all,grid --noheader | grep max-exfl | egrep -v 'max-exfl18[3-8]|max-exfl100|max-exflg' | wc -l"
max_reserved = 8 max_reserved = 8
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment