diff --git a/xfel_calibrate/calibrate.py b/xfel_calibrate/calibrate.py
index 77b3f5190d26a0391c99ab41dd6d3eac6b351e93..09bc6978ec39be88d822faffd2c186af53ca5e2c 100755
--- a/xfel_calibrate/calibrate.py
+++ b/xfel_calibrate/calibrate.py
@@ -60,6 +60,15 @@ def make_initial_parser():
     parser.add_argument('--vector-figs', action="store_true", default=False,
                         help="Use vector graphics for figures in the report.")
 
+    parser.add_argument('--slurm-mem', type=int, default=500,
+                        help="Requested node RAM in GB")
+
+    parser.add_argument('--slurm-name', type=str, default='xfel_calibrate',
+                        help='Name of the slurm job')
+
+    parser.add_argument('--slurm-priority', type=int, default=0,
+                        help='Change priority of the slurm job by +/- 2147483645 (a negative value increases priority)')
+
     parser.add_argument_group('required arguments')
 
     parser.add_argument('--reservation', type=str, default="")
@@ -467,10 +476,56 @@ def set_figure_format(nb, enable_vector_format):
             cell.source += "\n%config InlineBackend.figure_formats = ['svg']\n"
 
 
+def get_launcher_command(args, temp_path, dependent, job_list):
+    """
+    Return a slurm launcher command
+    :param args: Command line arguments
+    :param temp_path: Temporary path to run job
+    :param dependent: True if job is dependent
+    :param job_list: A list of dependent jobs
+    :return: List of commands and parameters to be used by subprocess
+    """
+
+    launcher_slurm = launcher_command.format(temp_path=temp_path)
+
+    # calculate number of general nodes available
+    free = int(check_output(free_nodes_cmd, shell=True).decode('utf8'))
+    preempt = int(check_output(preempt_nodes_cmd, shell=True).decode('utf8'))
+    ureservation = args['reservation']
+    priority = int(args['priority'])
+
+    if (ureservation == "" and
+            (free + preempt >= max_reserved or
+             priority > 1 or
+             reservation == "")):
+        launcher_slurm += " --partition {}".format(sprof)
+    else:
+        this_res = reservation if priority == 1 else reservation_char
+        if ureservation != "":
+            this_res = ureservation
+        launcher_slurm += " --reservation={}".format(this_res)
+
+    job_name = args.get('slurm_name', 'xfel_calibrate')
+    launcher_slurm += " --job-name {}".format(job_name)
+
+    if args.get('slurm_priority'):
+        launcher_slurm += " --nice={}".format(args.get('slurm_priority'))
+
+    launcher_slurm += " --mem {}G".format(args.get('slurm_mem', 500))
+
+    if dependent:
+        srun_dep = " --dependency=afterok"
+        for jobid in job_list:
+            srun_dep += ":{}".format(jobid)
+        launcher_slurm += srun_dep
+
+    return launcher_slurm.split()
+
+
 def concurrent_run(temp_path, nb, nbname, args, cparm=None, cval=None,
                    final_job=False, job_list=[], fmtcmd="", cluster_cores=8,
-                   sequential=False, priority=2, dependent=False,
-                   show_title=True, ureservation=""):
+                   sequential=False, dependent=False,
+                   show_title=True):
     """ Launch a concurrent job on the cluster via SLURM
     """
 
@@ -510,32 +565,12 @@ def concurrent_run(temp_path, nb, nbname, args, cparm=None, cval=None,
         "python3 -c {}\n".format(fmtcmd.format(joblist=job_list)))
     all_stats = stat.S_IXUSR | stat.S_IWUSR | stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
     os.chmod("{}/finalize.sh".format(temp_path), all_stats)
 
+    # then run an sbatch job
+    srun_base = []
     if not sequential:
-        # calculate number of general nodes available
-        free = int(check_output(free_nodes_cmd, shell=True).decode('utf8'))
-        preempt = int(check_output(
-            preempt_nodes_cmd, shell=True).decode('utf8'))
-        if ureservation == "" and (free + preempt >= max_reserved or priority > 1 or reservation == ""):
-            srun_base = launcher_command.format(
-                temp_path=temp_path) + " -p {}".format(sprof)
-            srun_base = srun_base.split()
-        else:
-            this_res = reservation if priority == 1 else reservation_char
-            if ureservation != "":
-                this_res = ureservation
-            srun_base = launcher_command.format(
-                temp_path=temp_path) + " --reservation={}".format(this_res)
-            srun_base = srun_base.split()
-        print(" ".join(srun_base))
-    else:
-        srun_base = []
-
-    if dependent:
-        srun_dep = "--dependency=afterok"
-        for jobid in job_list:
-            srun_dep += ":{}".format(jobid)
-        srun_base += [srun_dep]
+        srun_base = get_launcher_command(args, temp_path, dependent, job_list)
+        print(" ".join(srun_base))
 
     srun_base += [os.path.abspath("{}/bin/slurm_calibrate.sh".format(os.path.dirname(__file__))),  # path to helper sh
                   os.path.abspath(nbpath),  # path to notebook
@@ -637,8 +672,6 @@ def run():
     detector = args["detector"].upper()
     caltype = args["type"].upper()
     sequential = args["no_cluster_job"]
-    priority = int(args['priority'])
-    ureservation = args['reservation']
 
     if sequential:
         print("Not running on cluster")
@@ -757,8 +790,7 @@ def run():
         cluster_cores = concurrency.get("cluster cores", 8)
         jobid = concurrent_run(run_tmp_path, nb, os.path.basename(notebook),
                                args, final_job=True, job_list=joblist, fmtcmd=fmtcmd,
-                               cluster_cores=cluster_cores, sequential=sequential,
-                               priority=priority, ureservation=ureservation)
+                               cluster_cores=cluster_cores, sequential=sequential)
         joblist.append(jobid)
     else:
         cvar = concurrency["parameter"]
@@ -826,8 +858,8 @@ def run():
                                        cnum == len(list(cvals)) - 1 and len(dep_notebooks) == 0,
                                        joblist, fmtcmd,
-                                       cluster_cores=cluster_cores, sequential=sequential, priority=priority,
-                                       show_title=show_title, ureservation=ureservation)
+                                       cluster_cores=cluster_cores, sequential=sequential,
+                                       show_title=show_title)
                 joblist.append(jobid)
 
         # Run dependent notebooks
@@ -842,8 +874,8 @@ def run():
                                    final_job=final_job, job_list=joblist, fmtcmd=fmtcmd,
                                    cluster_cores=cluster_cores,
-                                   sequential=sequential, priority=priority,
-                                   dependent=True, ureservation=ureservation)
+                                   sequential=sequential,
+                                   dependent=True)
             joblist.append(jobid)
 
     if not all([j is None for j in joblist]):
diff --git a/xfel_calibrate/settings.py b/xfel_calibrate/settings.py
index b086c70d7722f268bfe94351eac3072c30ad891c..33924ac125467fd35cb93e882c0d68d619894af9 100644
--- a/xfel_calibrate/settings.py
+++ b/xfel_calibrate/settings.py
@@ -29,7 +29,7 @@ logo_path = "xfel.pdf"
 
 # the command to run this concurrently. It is prepended to the actual call
 sprof = os.environ.get("XFELCALSLURM", "exfel")
-launcher_command = "sbatch -t 24:00:00 --mem 500G --requeue --output {temp_path}/slurm-%j.out"
+launcher_command = "sbatch -t 24:00:00 --requeue --output {temp_path}/slurm-%j.out"
 free_nodes_cmd = "sinfo -p exfel -t idle -N --noheader | wc -l"
 preempt_nodes_cmd = "squeue -p all,grid --noheader | grep max-exfl | egrep -v 'max-exfl18[3-8]|max-exfl100|max-exflg' | wc -l"
 max_reserved = 8