Skip to content
Snippets Groups Projects
Commit adbbeff9 authored by Thomas Kluyver
Browse files

Decide to start ipcluster per-notebook, not per calibration type

parent 4f9f7f86
No related branches found
No related tags found
1 merge request: !357 "Only start ipcluster if the notebook uses cluster_profile"
...@@ -611,7 +611,7 @@ def remove_duplications(l): ...@@ -611,7 +611,7 @@ def remove_duplications(l):
def concurrent_run(temp_path, nb, nbname, args, cparm=None, cval=None, def concurrent_run(temp_path, nb, nbname, args, cparm=None, cval=None,
final_job=False, job_list=[], fmt_args={}, cluster_cores=8, final_job=False, job_list=[], fmt_args={}, cluster_cores=8,
sequential=False, dep_jids=[], sequential=False, dep_jids=[],
show_title=True, cluster_profile='NO_CLUSTER'): show_title=True):
""" Launch a concurrent job on the cluster via SLURM """ Launch a concurrent job on the cluster via SLURM
""" """
...@@ -619,12 +619,18 @@ def concurrent_run(temp_path, nb, nbname, args, cparm=None, cval=None, ...@@ -619,12 +619,18 @@ def concurrent_run(temp_path, nb, nbname, args, cparm=None, cval=None,
args[cparm] = cval args[cparm] = cval
suffix = flatten_list(cval) suffix = flatten_list(cval)
if "cluster_profile" in args:
args["cluster_profile"] = "{}_{}".format(cluster_profile, suffix)
# first convert the notebook # first convert the notebook
parms = extract_parameters(nb) parms = extract_parameters(nb)
if has_parm(parms, "cluster_profile"):
cluster_profile = f"{args['cluster_profile']}_{suffix}"
else:
# Don't start ipcluster if there's no cluster_profile parameter
cluster_profile = 'NO_CLUSTER'
params = parameter_values(parms, **args) params = parameter_values(parms, **args)
params = parameter_values(params, cluster_profile=cluster_profile)
new_nb = replace_definitions(nb, params, execute=False) new_nb = replace_definitions(nb, params, execute=False)
if not show_title: if not show_title:
first_markdown_cell(new_nb).source = '' first_markdown_cell(new_nb).source = ''
...@@ -651,7 +657,7 @@ def concurrent_run(temp_path, nb, nbname, args, cparm=None, cval=None, ...@@ -651,7 +657,7 @@ def concurrent_run(temp_path, nb, nbname, args, cparm=None, cval=None,
srun_base += [os.path.join(PKG_DIR, "bin", "slurm_calibrate.sh"), # path to helper sh srun_base += [os.path.join(PKG_DIR, "bin", "slurm_calibrate.sh"), # path to helper sh
os.path.abspath(nbpath), # path to notebook os.path.abspath(nbpath), # path to notebook
python_path, # path to python python_path, # path to python
args.get("cluster_profile", "NO_CLUSTER"), cluster_profile,
'"{}"'.format(base_name.upper()), '"{}"'.format(base_name.upper()),
'"{}"'.format(args["detector"].upper()), '"{}"'.format(args["detector"].upper()),
'"{}"'.format(args["type"].upper()), '"{}"'.format(args["type"].upper()),
...@@ -789,12 +795,9 @@ def run(): ...@@ -789,12 +795,9 @@ def run():
concurrency["parameter"]) concurrency["parameter"])
warnings.warn(msg, RuntimeWarning) warnings.warn(msg, RuntimeWarning)
cluster_profile = "NO_CLUSTER" # If not explicitly specified, use a new profile for ipcluster
if not has_parm(parms, "cluster_profile"): if args.get("cluster_profile") in {None, parser.get_default("cluster_profile")}:
warnings.warn("Notebook has no cluster_profile parameter, " + args['cluster_profile'] = "slurm_prof_{}".format(run_uuid)
"running on cluster will likely fail!", RuntimeWarning)
elif "cluster_profile" not in args or args["cluster_profile"] == parser.get_default('cluster_profile'):
cluster_profile = "slurm_prof_{}".format(run_uuid)
# create a temporary output directory to work in # create a temporary output directory to work in
run_tmp_path = os.path.join(temp_path, f"slurm_out_{detector}_{caltype}_{run_uuid}") run_tmp_path = os.path.join(temp_path, f"slurm_out_{detector}_{caltype}_{run_uuid}")
...@@ -854,7 +857,7 @@ def run(): ...@@ -854,7 +857,7 @@ def run():
job_list=joblist, fmt_args=fmt_args, job_list=joblist, fmt_args=fmt_args,
cluster_cores=cluster_cores, cluster_cores=cluster_cores,
sequential=sequential, sequential=sequential,
cluster_profile=cluster_profile) )
joblist.append(jobid) joblist.append(jobid)
if concurrency.get("parameter", None) is None: if concurrency.get("parameter", None) is None:
...@@ -865,7 +868,7 @@ def run(): ...@@ -865,7 +868,7 @@ def run():
cluster_cores=cluster_cores, cluster_cores=cluster_cores,
sequential=sequential, sequential=sequential,
dep_jids=joblist, dep_jids=joblist,
cluster_profile=cluster_profile) )
joblist.append(jobid) joblist.append(jobid)
else: else:
cvar = concurrency["parameter"] cvar = concurrency["parameter"]
...@@ -948,7 +951,7 @@ def run(): ...@@ -948,7 +951,7 @@ def run():
job_list=joblist, fmt_args=fmt_args, job_list=joblist, fmt_args=fmt_args,
cluster_cores=cluster_cores, cluster_cores=cluster_cores,
sequential=sequential, sequential=sequential,
cluster_profile=cluster_profile) )
joblist.append(jobid) joblist.append(jobid)
if not all([j is None for j in joblist]): if not all([j is None for j in joblist]):
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment