From adbbeff9b8f9b6fe4f9651ede405e1beea1b83b7 Mon Sep 17 00:00:00 2001 From: Thomas Kluyver <thomas@kluyver.me.uk> Date: Mon, 21 Sep 2020 11:56:35 +0100 Subject: [PATCH] Decide to start ipcluster per-notebook, not per calibration type --- xfel_calibrate/calibrate.py | 29 ++++++++++++++++------------- 1 file changed, 16 insertions(+), 13 deletions(-) diff --git a/xfel_calibrate/calibrate.py b/xfel_calibrate/calibrate.py index 1f5453f0b..a3a94db0a 100755 --- a/xfel_calibrate/calibrate.py +++ b/xfel_calibrate/calibrate.py @@ -611,7 +611,7 @@ def remove_duplications(l): def concurrent_run(temp_path, nb, nbname, args, cparm=None, cval=None, final_job=False, job_list=[], fmt_args={}, cluster_cores=8, sequential=False, dep_jids=[], - show_title=True, cluster_profile='NO_CLUSTER'): + show_title=True): """ Launch a concurrent job on the cluster via SLURM """ @@ -619,12 +619,18 @@ def concurrent_run(temp_path, nb, nbname, args, cparm=None, cval=None, args[cparm] = cval suffix = flatten_list(cval) - if "cluster_profile" in args: - args["cluster_profile"] = "{}_{}".format(cluster_profile, suffix) # first convert the notebook parms = extract_parameters(nb) + + if has_parm(parms, "cluster_profile"): + cluster_profile = f"{args['cluster_profile']}_{suffix}" + else: + # Don't start ipcluster if there's no cluster_profile parameter + cluster_profile = 'NO_CLUSTER' + params = parameter_values(parms, **args) + params = parameter_values(params, cluster_profile=cluster_profile) new_nb = replace_definitions(nb, params, execute=False) if not show_title: first_markdown_cell(new_nb).source = '' @@ -651,7 +657,7 @@ def concurrent_run(temp_path, nb, nbname, args, cparm=None, cval=None, srun_base += [os.path.join(PKG_DIR, "bin", "slurm_calibrate.sh"), # path to helper sh os.path.abspath(nbpath), # path to notebook python_path, # path to python - args.get("cluster_profile", "NO_CLUSTER"), + cluster_profile, '"{}"'.format(base_name.upper()), '"{}"'.format(args["detector"].upper()), '"{}"'.format(args["type"].upper()), @@ -789,12 +795,9 @@ def run(): concurrency["parameter"]) warnings.warn(msg, RuntimeWarning) - cluster_profile = "NO_CLUSTER" - if not has_parm(parms, "cluster_profile"): - warnings.warn("Notebook has no cluster_profile parameter, " + - "running on cluster will likely fail!", RuntimeWarning) - elif "cluster_profile" not in args or args["cluster_profile"] == parser.get_default('cluster_profile'): - cluster_profile = "slurm_prof_{}".format(run_uuid) + # If not explicitly specified, use a new profile for ipcluster + if args.get("cluster_profile") in {None, parser.get_default("cluster_profile")}: + args['cluster_profile'] = "slurm_prof_{}".format(run_uuid) # create a temporary output directory to work in run_tmp_path = os.path.join(temp_path, f"slurm_out_{detector}_{caltype}_{run_uuid}") @@ -854,7 +857,7 @@ def run(): job_list=joblist, fmt_args=fmt_args, cluster_cores=cluster_cores, sequential=sequential, - cluster_profile=cluster_profile) + ) joblist.append(jobid) if concurrency.get("parameter", None) is None: @@ -865,7 +868,7 @@ def run(): cluster_cores=cluster_cores, sequential=sequential, dep_jids=joblist, - cluster_profile=cluster_profile) + ) joblist.append(jobid) else: cvar = concurrency["parameter"] @@ -948,7 +951,7 @@ def run(): job_list=joblist, fmt_args=fmt_args, cluster_cores=cluster_cores, sequential=sequential, - cluster_profile=cluster_profile) + ) joblist.append(jobid) if not all([j is None for j in joblist]): -- GitLab