diff --git a/bin/slurm_calibrate.sh b/bin/slurm_calibrate.sh
index 0a9d2d7133af40623ff3ff2824d4bb8d1b75023e..6c2ecfcfa0dcd3317eb719d9d4e5d3341b124c48 100755
--- a/bin/slurm_calibrate.sh
+++ b/bin/slurm_calibrate.sh
@@ -3,7 +3,7 @@
 # set paths to use
 nb_path=$1
 python_path=$2
-uuid=$3
+ipcluster_profile=$3
 notebook=$4
 detector=$5
 caltype=$6
@@ -14,7 +14,7 @@ cluster_cores=$9
 echo "Running with the following parameters:"
 echo "Notebook path: $nb_path"
 echo "Python path: $python_path"
-echo "IP-Cluster profile: $uuid"
+echo "IP-Cluster profile: $ipcluster_profile"
 echo "notebook: $notebook"
 echo "detector: $detector"
 echo "caltype: $caltype"
@@ -37,10 +37,10 @@ echo "Starting influx feeder"
 export MPLBACKEND=AGG
 
 # start an ip cluster if requested
-if [ "${uuid}" != "NO_CLUSTER" ]
+if [ "${ipcluster_profile}" != "NO_CLUSTER" ]
 then
-    ${python_path} -m IPython profile create ${uuid} --parallel
-    ${python_path} -m ipyparallel.cluster start --n=${cluster_cores} --profile=${uuid} --daemonize &
+    ${python_path} -m IPython profile create ${ipcluster_profile} --parallel
+    ${python_path} -m ipyparallel.cluster start --n=${cluster_cores} --profile=${ipcluster_profile} --daemonize &
     sleep 15
 fi
 
@@ -49,10 +49,10 @@ echo "Running script"
 ${python_path} -m nbconvert --to rst --ExecutePreprocessor.timeout=36000 --ExecutePreprocessor.allow_errors=True --TemplateExporter.exclude_input=True --execute ${nb_path}
 
 # stop the cluster if requested
-if [ "${uuid}" != "NO_CLUSTER" ]
+if [ "${ipcluster_profile}" != "NO_CLUSTER" ]
 then
-    ${python_path} -m ipyparallel.cluster stop --profile=${uuid}
-    profile_path=$(${python_path} -m IPython locate profile ${uuid})
+    ${python_path} -m ipyparallel.cluster stop --profile=${ipcluster_profile}
+    profile_path=$(${python_path} -m IPython locate profile ${ipcluster_profile})
     echo "Removing cluster profile from: $profile_path"
     rm -rf $profile_path
 fi
diff --git a/notebooks/AGIPD/AGIPD_Correct_and_Verify.ipynb b/notebooks/AGIPD/AGIPD_Correct_and_Verify.ipynb
index e906d9921aa33d3f733bc70e47e11b0cb61f442e..e59571212b7333893e076ec126fd7b9bff84251d 100644
--- a/notebooks/AGIPD/AGIPD_Correct_and_Verify.ipynb
+++ b/notebooks/AGIPD/AGIPD_Correct_and_Verify.ipynb
@@ -22,7 +22,6 @@
    },
    "outputs": [],
    "source": [
-    "cluster_profile = \"noDB\"\n",
     "in_folder = \"/gpfs/exfel/exp/MID/202030/p900137/raw\" # the folder to read data from, required\n",
     "out_folder = \"/gpfs/exfel/exp/MID/202030/p900137/scratch/karnem/r449_v06\" # the folder to output to, required\n",
     "sequences = [-1] # sequences to correct, set to -1 for all, range allowed\n",
diff --git a/notebooks/AGIPD/AGIPD_Correct_and_Verify_Summary_NBC.ipynb b/notebooks/AGIPD/AGIPD_Correct_and_Verify_Summary_NBC.ipynb
index b38add62968d46d7a31145f6d160ca5ba0315618..8565e8db22fe54fad5457d5e54025791dfabd2a2 100644
--- a/notebooks/AGIPD/AGIPD_Correct_and_Verify_Summary_NBC.ipynb
+++ b/notebooks/AGIPD/AGIPD_Correct_and_Verify_Summary_NBC.ipynb
@@ -13,7 +13,6 @@
    "metadata": {},
    "outputs": [],
    "source": [
-    "cluster_profile = \"noDB\" # The ipcluster profile to use\n",
     "run = 11 # runs to process, required\n",
     "out_folder = \"/gpfs/exfel/data/scratch/ahmedk/test/AGIPD_Corr\" # path to output to, required\n",
     "modules = [-1]"
diff --git a/xfel_calibrate/calibrate.py b/xfel_calibrate/calibrate.py
index 1f5453f0b5aa7b53d07ca52788ed624d3fb8bcbf..9319c304644e67686962232c75154deb7d8d8de7 100755
--- a/xfel_calibrate/calibrate.py
+++ b/xfel_calibrate/calibrate.py
@@ -611,7 +611,7 @@ def remove_duplications(l):
 def concurrent_run(temp_path, nb, nbname, args, cparm=None, cval=None,
                    final_job=False, job_list=[], fmt_args={},
                    cluster_cores=8, sequential=False, dep_jids=[],
-                   show_title=True, cluster_profile='NO_CLUSTER'):
+                   show_title=True):
     """ Launch a concurrent job on the cluster via SLURM
     """
 
@@ -619,12 +619,18 @@ def concurrent_run(temp_path, nb, nbname, args, cparm=None, cval=None,
         args[cparm] = cval
 
     suffix = flatten_list(cval)
-    if "cluster_profile" in args:
-        args["cluster_profile"] = "{}_{}".format(cluster_profile, suffix)
 
     # first convert the notebook
     parms = extract_parameters(nb)
+
+    if has_parm(parms, "cluster_profile"):
+        cluster_profile = f"{args['cluster_profile']}_{suffix}"
+    else:
+        # Don't start ipcluster if there's no cluster_profile parameter
+        cluster_profile = 'NO_CLUSTER'
+
     params = parameter_values(parms, **args)
+    params = parameter_values(params, cluster_profile=cluster_profile)
     new_nb = replace_definitions(nb, params, execute=False)
     if not show_title:
         first_markdown_cell(new_nb).source = ''
@@ -651,7 +657,7 @@ def concurrent_run(temp_path, nb, nbname, args, cparm=None, cval=None,
     srun_base += [os.path.join(PKG_DIR, "bin", "slurm_calibrate.sh"), # path to helper sh
                   os.path.abspath(nbpath), # path to notebook
                   python_path, # path to python
-                  args.get("cluster_profile", "NO_CLUSTER"),
+                  cluster_profile,
                   '"{}"'.format(base_name.upper()),
                   '"{}"'.format(args["detector"].upper()),
                   '"{}"'.format(args["type"].upper()),
@@ -789,12 +795,9 @@ def run():
                           concurrency["parameter"])
             warnings.warn(msg, RuntimeWarning)
 
-    cluster_profile = "NO_CLUSTER"
-    if not has_parm(parms, "cluster_profile"):
-        warnings.warn("Notebook has no cluster_profile parameter, " +
-                      "running on cluster will likely fail!", RuntimeWarning)
-    elif "cluster_profile" not in args or args["cluster_profile"] == parser.get_default('cluster_profile'):
-        cluster_profile = "slurm_prof_{}".format(run_uuid)
+    # If not explicitly specified, use a new profile for ipcluster
+    if args.get("cluster_profile") in {None, parser.get_default("cluster_profile")}:
+        args['cluster_profile'] = "slurm_prof_{}".format(run_uuid)
 
     # create a temporary output directory to work in
     run_tmp_path = os.path.join(temp_path, f"slurm_out_{detector}_{caltype}_{run_uuid}")
@@ -854,7 +857,7 @@ def run():
                                    job_list=joblist, fmt_args=fmt_args,
                                    cluster_cores=cluster_cores,
                                    sequential=sequential,
-                                   cluster_profile=cluster_profile)
+                                   )
             joblist.append(jobid)
 
     if concurrency.get("parameter", None) is None:
@@ -865,7 +868,7 @@ def run():
                                cluster_cores=cluster_cores,
                                sequential=sequential,
                                dep_jids=joblist,
-                               cluster_profile=cluster_profile)
+                               )
         joblist.append(jobid)
     else:
         cvar = concurrency["parameter"]
@@ -931,7 +934,7 @@ def run():
                                        sequential=sequential,
                                        show_title=show_title,
                                        dep_jids=joblist,
-                                       cluster_profile=cluster_profile)
+                                       )
             jlist.append(jobid)
 
         joblist.extend(jlist)
@@ -948,7 +951,7 @@ def run():
                               job_list=joblist, fmt_args=fmt_args,
                               cluster_cores=cluster_cores,
                               sequential=sequential,
-                              cluster_profile=cluster_profile)
+                              )
         joblist.append(jobid)
 
     if not all([j is None for j in joblist]):