#!/usr/bin/env python

import argparse
import inspect
import os
import pprint
import re
import stat
import sys
import textwrap
import warnings
from datetime import datetime
from subprocess import DEVNULL, check_output

import nbconvert
import nbformat
from jinja2 import Template
from nbparameterise import (
    extract_parameters, replace_definitions, parameter_values
)

from .settings import *
from .notebooks import notebooks
from .finalize import tex_escape

PKG_DIR = os.path.dirname(os.path.abspath(__file__))


# Add a class combining raw description formatting with
# Metavariable default outputs
class RawTypeFormatter(argparse.RawDescriptionHelpFormatter,
                       argparse.MetavarTypeHelpFormatter,
                       argparse.ArgumentDefaultsHelpFormatter):
    pass


# The argument parser for calibrate.py, will be extended depending
# on the options given.
def make_initial_parser(**kwargs):
    parser = argparse.ArgumentParser(
        description="Main entry point for offline calibration",
        formatter_class=RawTypeFormatter,
        **kwargs
    )

    parser.add_argument('detector', metavar='DETECTOR', type=str,
                        help='The detector to calibrate: ' + ", ".join(notebooks))

    parser.add_argument('type', metavar='TYPE', type=str,
                        help='Type of calibration.')

    parser.add_argument('--no-cluster-job',
                        action="store_true",
                        default=False,
                        help="Do not run as a cluster job")

    parser.add_argument('--report-to', type=str,
                        help='Filename (and optionally path) for output'
                             ' report')

    parser.add_argument('--concurrency-par', type=str,
                        help='Name of concurrency parameter. '
                             'If not given, it is taken from configuration.')

    parser.add_argument('--priority', type=int, default=2,
                        help="Priority of batch jobs. If priority<=1, reserved"
                             " nodes become available.")

    parser.add_argument('--vector-figs', action="store_true", default=False,
                        help="Use vector graphics for figures in the report.")

    parser.add_argument('--slurm-mem', type=int, default=500,
                        help="Requested node RAM in GB")

    parser.add_argument('--slurm-name', type=str, default='xfel_calibrate',
                        help='Name of slurm job')

    parser.add_argument('--slurm-scheduling', type=int, default=0,
                        help='Change scheduling priority for a slurm job '
                             '+- 2147483645 (negative value increases '
                             'priority)')

    parser.add_argument('--request-time', type=str, default='Now',
                        help='Time of request to process notebook. ISO format')

    # Group added so that required notebook parameters can be attached to it
    # later (see add_args_from_nb).
    parser.add_argument_group('required arguments')

    parser.add_argument('--reservation', type=str, default="")

    return parser
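
# Illustrative invocation (hypothetical values): the base parser only knows
# the positional detector/type pair and the generic cluster options, e.g.
#
#   xfel_calibrate AGIPD DARK --priority 1 --slurm-mem 700 --report-to my_report
#
# Detector- and type-specific options are attached later by add_args_from_nb().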


# Helper functions for parser extensions
def make_intelli_list(ltype):
    """ Parses a list from range and comma expressions.

    An expression of the form "1-5,6" will be parsed into the following
    list: [1,2,3,4,6]
    """
    class IntelliListAction(argparse.Action):
        element_type = ltype

        def __init__(self, *args, **kwargs):
            super(IntelliListAction, self).__init__(*args, **kwargs)

        def __call__(self, parser, namespace, values, option_string=None):
            parsed_values = []
            values = ",".join(values)
            if isinstance(values, str):
                for rcomp in values.split(","):
                    if "-" in rcomp:
                        start, end = rcomp.split("-")
                        parsed_values += list(range(int(start), int(end)))
                    else:
                        parsed_values += [int(rcomp)]
            elif isinstance(values, (list, tuple)):
                parsed_values = values
            else:
                parsed_values = [values, ]
            parsed_values = [self.element_type(p) for p in parsed_values]
            print("Parsed input {} to {}".format(values, parsed_values))
            setattr(namespace, self.dest, parsed_values)

    return IntelliListAction


def consolize_name(name):
    """ Names of console parameters don't have underscores """
    return name.replace("_", "-")


def deconsolize_args(args):
    """ Variable names have underscores """
    new_args = {}
    for k, v in args.items():
        new_args[k.replace("-", "_")] = v
    return new_args


def extract_title_author_version(nb):
    """ Tries to extract title, author from markdown.

    The version is taken from git.
    """
    first_md = first_markdown_cell(nb)
    source = first_md["source"]
    title = re.findall(r'#+\s*(.*)\s*#+', source)
    author = re.findall(
        r'author[\s]*[:][\s]*(.*?)\s*(?:[,?]|version)', source, flags=re.IGNORECASE)

    title = title[0] if len(title) else None
    author = author[0] if len(author) else None

    # Try to get version from the git
    # Will work only in case of the development installation
    # Suppress output errors
    # In case of a standard installation a version is stored
    # in the _version.py file
    try:
        git_dir = os.path.join(PKG_DIR, '..', '.git')
        version = check_output([
            'git', f'--git-dir={git_dir}', 'describe', '--tag',
        ], stderr=DEVNULL).decode('utf8')
        version = version.replace("\n", "")
    except:
        from .VERSION import __version__
        version = __version__

    return title, author, version


def get_cell_n(nb, cell_type, cell_n):
    """
    Return notebook cell with given number and given type

    :param nb: jupyter notebook
    :param cell_type: cell type, 'code' or 'markdown'
    :param cell_n: cell number (count from 0)
    :return: notebook cell
    """
    counter = 0
    for cell in nb.cells:
        if cell.cell_type == cell_type:
            if counter == cell_n:
                return cell
            counter += 1


def first_code_cell(nb):
    """ Return the first code cell of a notebook """
    return get_cell_n(nb, 'code', 0)


def first_markdown_cell(nb):
    """ Return the first markdown cell of a notebook """
    return get_cell_n(nb, 'markdown', 0)


def make_epilog(nb, caltype=None):
    """ Make an epilog from the notebook to add to parser help
    """
    msg = ""
    header_cell = first_markdown_cell(nb)
    lines = header_cell.source.split("\n")
    if caltype:
        msg += "{:<15} {}".format(caltype, lines[0]) + "\n"
    else:
        msg += "{}".format(lines[0]) + "\n"
    pp = pprint.PrettyPrinter(indent=(17 if caltype else 0))
    if len(lines[1:]):
        plines = pp.pformat(lines[1:])[1:-1].split("\n")
        for line in plines:
            sline = line.replace("'", "", 1)
            sline = sline.replace("', '", " " * (17 if caltype else 0), 1)
            sline = sline[::-1].replace("'", "", 1)[::-1]
            sline = sline.replace(" ,", " ")
            if len(sline) > 1 and sline[0] == ",":
                sline = sline[1:]
            msg += sline + "\n"
    msg += "\n"
    return msg
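
# Usage sketch (illustrative, not executed): the helpers above translate
# between notebook parameter names and command line options and expand range
# expressions; note that "1-5" expands via range(1, 5), i.e. the end value is
# exclusive:
#
#   consolize_name("sequences_per_node")        # -> "sequences-per-node"
#   deconsolize_args({"no-cluster-job": True})  # -> {"no_cluster_job": True}
#
#   p = argparse.ArgumentParser()
#   p.add_argument("--runs", nargs='+', type=str, action=make_intelli_list(int))
#   p.parse_args(["--runs", "1-5,6"]).runs      # -> [1, 2, 3, 4, 6]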


def get_notebook_function(nb, fname):
    flines = []
    def_found = False
    indent = None
    for cell in nb.cells:
        if cell.cell_type == 'code':
            lines = cell.source.split("\n")
            for line in lines:
                if def_found:
                    lin = len(line) - len(line.lstrip())
                    if indent is None:
                        if lin != 0:
                            indent = lin
                            flines.append(line)
                    elif lin >= indent:
                        flines.append(line)
                    else:
                        return "\n".join(flines)
                if re.search(r"def\s+{}\(.*\):\s*".format(fname), line) and not def_found:
                    # print("Found {} in line {}".format(fname, line))
                    # set this to indent level
                    def_found = True
                    flines.append(line)
    return None


def balance_sequences(in_folder, run, sequences, sequences_per_node,
                      path_inset, max_nodes=8):
    """Return a balanced list of sequences to be executed on slurm nodes

    The total list of sequences is split across several nodes, with the
    number of sequences per node defined by the input parameter. If the
    number of required nodes exceeds max_nodes, sequences_per_node is
    increased so that everything runs on at most max_nodes nodes.

    :param in_folder: Path to raw data
    :param run: Run number
    :param sequences: List of sequences
    :param sequences_per_node: Number of sequences per node
    :param path_inset: Inset of path to data file
    :param max_nodes: Maximum number of Maxwell nodes to use
    :return: Balanced list of lists of sequences
    """
    import glob

    import numpy as np

    if sequences[0] == -1:
        path = os.path.join(in_folder, f"r{run:04d}", f"*{path_inset}-S*.h5")
        sequence_files = glob.glob(path)
        seq_nums = set()
        for sf in sequence_files:
            seqnum = re.findall(r".*-S([0-9]*).h5", sf)[0]
            seq_nums.add(int(seqnum))
        seq_nums -= set(sequences)
    else:
        seq_nums = set(sequences)
    nsplits = len(seq_nums) // sequences_per_node + 1
    while nsplits > max_nodes:
        sequences_per_node += 1
        nsplits = len(seq_nums) // sequences_per_node + 1
        print("Changed to {} sequences per node".format(sequences_per_node))
        print(f"to have a maximum of {max_nodes} concurrent jobs")
    return [l.tolist() for l in np.array_split(list(seq_nums), nsplits)
            if l.size > 0]
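
# Example of the balancing logic (illustrative, hypothetical paths): with
# explicit sequences 0..9 and 4 sequences per node, 10 // 4 + 1 = 3 chunks
# are produced:
#
#   balance_sequences("/gpfs/raw", 100, list(range(10)), 4, "AGIPD00")
#   # -> [[0, 1, 2, 3], [4, 5, 6], [7, 8, 9]]
#
# With sequences=[-1] the sequence numbers are instead discovered from the
# *-S*.h5 files found under in_folder/r0100.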


def make_extended_parser() -> argparse.ArgumentParser:
    """Create an ArgumentParser using information from the notebooks"""

    # extend the parser according to user input
    # the first case is if a detector was given, but no calibration type
    if len(sys.argv) == 3 and "-h" in sys.argv[2]:
        detector = sys.argv[1].upper()
        try:
            det_notebooks = notebooks[detector]
        except KeyError:
            print("Not one of the known detectors: {}".format(notebooks.keys()))
            sys.exit(1)

        msg = "Options for detector {}\n".format(detector)
        msg += "*" * len(msg) + "\n\n"

        # basically, this creates help in the form of
        #
        # TYPE        some description that is
        #             indented for this type.
        #
        # The information is extracted from the first markdown cell of
        # the notebook.
        for caltype, notebook in det_notebooks.items():
            nbpath = os.path.join(PKG_DIR, notebook["notebook"])
            nb = nbformat.read(nbpath, as_version=4)
            msg += make_epilog(nb, caltype=caltype)

        return make_initial_parser(epilog=msg)
    elif len(sys.argv) <= 3:
        return make_initial_parser()

    # A detector and type was given. We derive the arguments
    # from the corresponding notebook
    args, _ = make_initial_parser(add_help=False).parse_known_args()
    try:
        nb_info = notebooks[args.detector.upper()][args.type.upper()]
    except KeyError:
        print("Not one of the known calibrations or detectors")
        sys.exit(1)

    notebook = os.path.join(PKG_DIR, nb_info["notebook"])
    cvar = nb_info.get("concurrency", {}).get("parameter", None)

    nb = nbformat.read(notebook, as_version=4)

    # extend parameters if needed
    ext_func = nb_info.get("extend parms", None)
    if ext_func is not None:
        extend_params(nb, ext_func)

    # No extend parms function - add statically defined parameters from the
    # first code cell
    parser = make_initial_parser()
    add_args_from_nb(nb, parser, cvar=cvar)
    return parser
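
# Dispatch summary (illustrative): "xfel_calibrate AGIPD -h" builds the epilog
# listing the calibration types known for that detector, a bare
# "xfel_calibrate -h" shows only the generic options, and
# "xfel_calibrate <DETECTOR> <TYPE> ..." loads the matching notebook and adds
# its first-cell parameters as extra command line options via add_args_from_nb().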


def add_args_from_nb(nb, parser, cvar=None, no_required=False):
    """Add argparse arguments for parameters in the first cell of a notebook.

    Uses nbparameterise to extract the parameter information. Each foo_bar
    parameter gets a --foo-bar command line option.
    Boolean parameters get a pair of flags like --abc and --no-abc.

    :param nb: NotebookNode object representing a loaded .ipynb file
    :param parser: argparse.ArgumentParser instance
    :param str cvar: Name of the concurrency parameter.
    :param bool no_required: If True, none of the added options are required.
    """
    parser.description = make_epilog(nb)
    parms = extract_parameters(nb)

    for p in parms:
        helpstr = ("Default: %(default)s" if not p.comment
                   else "{}. Default: %(default)s".format(p.comment.replace("#", " ").strip()))
        required = (p.comment is not None
                    and "required" in p.comment
                    and not no_required
                    and p.name != cvar)

        # This may be not a public API
        # May require reprogramming in case of argparse updates
        pars_group = parser._action_groups[2 if required else 1]

        default = p.value if (not required) else None

        if p.type == list or p.name == cvar:
            if p.type is list:
                ltype = type(p.value[0])
            else:
                ltype = p.type

            range_allowed = "RANGE ALLOWED" in p.comment.upper() if p.comment else False
            pars_group.add_argument(f"--{consolize_name(p.name)}",
                                    nargs='+', type=ltype if not range_allowed else str,
                                    default=default,
                                    help=helpstr,
                                    required=required,
                                    action=make_intelli_list(ltype) if range_allowed else None)
        elif p.type == bool:
            # For a boolean, make --XYZ and --no-XYZ options.
            alt_group = pars_group.add_mutually_exclusive_group(required=required)
            alt_group.add_argument(f"--{consolize_name(p.name)}",
                                   action="store_true",
                                   default=default,
                                   help=helpstr,
                                   dest=p.name)
            alt_group.add_argument(f"--no-{consolize_name(p.name)}",
                                   action="store_false",
                                   default=default,
                                   help=f"Opposite of --{consolize_name(p.name)}",
                                   dest=p.name)
        else:
            pars_group.add_argument(f"--{consolize_name(p.name)}",
                                    type=p.type,
                                    default=default,
                                    help=helpstr,
                                    required=required)


def extend_params(nb, extend_func_name):
    """Add parameters in the first code cell by calling a function in the notebook
    """
    func = get_notebook_function(nb, extend_func_name)

    if func is None:
        warnings.warn(
            f"Didn't find concurrency function {extend_func_name} in notebook",
            RuntimeWarning
        )
        return

    # Make a temporary parser that won't exit if it sees -h or --help
    pre_parser = make_initial_parser(add_help=False)
    add_args_from_nb(nb, pre_parser, no_required=True)
    known, _ = pre_parser.parse_known_args()
    args = deconsolize_args(vars(known))

    df = {}
    exec(func, df)
    f = df[extend_func_name]
    sig = inspect.signature(f)

    extension = f(*[args[p] for p in sig.parameters])
    fcc = first_code_cell(nb)
    fcc["source"] += "\n" + extension


def has_parm(parms, name):
    """ Check if a parameter of `name` exists in parms """
    for p in parms:
        if p.name == name:
            return True
    return False


def get_par_attr(parms, key, attr, default=None):
    """
    Return the given attribute of the parameter with name `key`

    :param parms: List of parameters
    :param key: Name of the parameter to be considered
    :param attr: Name of the parameter attribute (e.g. value, type)
    :param default: Value to return if the name is not found
    :return: The attribute of the parameter
    """
    for p in parms:
        if p.name == key:
            return getattr(p, attr, default)
    return default


def flatten_list(l):
    """
    Make a string representation of a list

    :param l: List or a string
    :return: Same string or string with first and last entry of a list
    """
    if isinstance(l, list):
        if len(l) > 1:
            return '{}-{}'.format(l[0], l[-1])
        elif len(l) == 1:
            return '{}'.format(l[0])
        else:
            return ''
    else:
        return str(l)
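
# Parameter mapping sketch (illustrative, hypothetical notebook lines): a
# first-cell assignment such as
#
#   sequences = [-1]   # sequences to correct, range allowed
#
# becomes a --sequences option accepting several values, with range
# expressions handled by make_intelli_list(int); a boolean such as
#
#   overwrite = True   # overwrite existing output
#
# becomes the pair --overwrite / --no-overwrite. flatten_list() is what turns
# list values into compact suffixes for job and file names:
#
#   flatten_list([0, 1, 2, 3])  # -> '0-3'
#   flatten_list([5])           # -> '5'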
""" if enable_vector_format: cell = get_cell_n(nb, 'code', 1) cell.source += "\n%config InlineBackend.figure_formats = ['svg']\n" def create_finalize_script(fmt_args, temp_path, job_list): """ Create a finalize script to produce output report :param fmt_args: Dictionary of fmt arguments :param temp_path: Path to temporary folder to run slurm job :param job_list: List of slurm jobs """ tmpl = Template(''' import sys from xfel_calibrate.finalize import finalize finalize(joblist={{joblist}}, finaljob=sys.argv[1], run_path='{{run_path}}', out_path='{{out_path}}', project='{{project}}', calibration='{{calibration}}', author='{{author}}', version='{{version}}', report_to='{{report_to}}', data_path='{{in_folder}}', request_time='{{request_time}}', submission_time='{{submission_time}}')" ''') fmt_args['joblist'] = job_list f_name = os.path.join(temp_path, "finalize.py") with open(f_name, "w") as finfile: finfile.write(textwrap.dedent(tmpl.render(**fmt_args))) # change rights of the file to be: # executed and writable for user, readable for user, group and others all_stats = stat.S_IXUSR | stat.S_IWUSR | stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH os.chmod(f_name, all_stats) def save_executed_command(run_tmp_path, version): """ Create a file with string used to execute `xfel_calibrate` :param run_tmp_path: path to temporary directory for running job outputs :parm version: git version of the pycalibration package """ f_name = os.path.join(run_tmp_path, "run_calibrate.sh") with open(f_name, "w") as finfile: finfile.write(f'# pycalibration version: {version}\n') finfile.write(' '.join(sys.argv)) def get_launcher_command(args, temp_path, dep_jids): """ Return a slurm launcher command :param args: Command line arguments :param temp_path: Temporary path to run job :param dep_jids: A list of dependent jobs :return: List of commands and parameters to be used by subprocess """ launcher_slurm = launcher_command.format(temp_path=temp_path) # calculate number of general nodes available free = int(check_output(free_nodes_cmd, shell=True).decode('utf8')) preempt = int(check_output(preempt_nodes_cmd, shell=True).decode('utf8')) ureservation = args['reservation'] priority = args['priority'] if (ureservation == "" and (free + preempt >= max_reserved or priority > 1 or reservation == "")): launcher_slurm += " --partition {}".format(sprof) else: this_res = reservation if priority == 1 else reservation_char if ureservation != "": this_res = ureservation launcher_slurm += " --reservation={}".format(this_res) job_name = args.get('slurm_name', 'xfel_calibrate') launcher_slurm += " --job-name {}".format(job_name) if args.get('slurm_priority'): launcher_slurm += " --nice={}".format(args.get('slurm_priority')) launcher_slurm += " --mem {}G".format(args.get('slurm_mem', '500')) if len(dep_jids): srun_dep = " --dependency=afterok" for jobid in dep_jids: srun_dep += ":{}".format(jobid) launcher_slurm += srun_dep return launcher_slurm.split() def remove_duplications(l): """ Remove duplicated elements in the list :param l: Input list :return: Output list of unique elements """ unique_l = [] for elem in l: if elem not in unique_l: unique_l.append(elem) if unique_l != l: print("Duplicated concurrency parameters were removed") return unique_l def concurrent_run(temp_path, nb, nbname, args, cparm=None, cval=None, final_job=False, job_list=[], fmt_args={}, cluster_cores=8, sequential=False, dep_jids=[], show_title=True, cluster_profile='NO_CLUSTER'): """ Launch a concurrent job on the cluster via SLURM """ if cparm is not None: 


def concurrent_run(temp_path, nb, nbname, args, cparm=None, cval=None,
                   final_job=False, job_list=[], fmt_args={},
                   cluster_cores=8, sequential=False, dep_jids=[],
                   show_title=True, cluster_profile='NO_CLUSTER'):
    """ Launch a concurrent job on the cluster via SLURM
    """
    if cparm is not None:
        args[cparm] = cval

    suffix = flatten_list(cval)

    if "cluster_profile" in args:
        args["cluster_profile"] = "{}_{}".format(cluster_profile, suffix)

    # first convert the notebook
    parms = extract_parameters(nb)
    params = parameter_values(parms, **args)
    new_nb = replace_definitions(nb, params, execute=False)
    if not show_title:
        first_markdown_cell(new_nb).source = ''
    set_figure_format(new_nb, args["vector_figs"])
    base_name = nbname.replace(".ipynb", "")
    new_name = "{}__{}__{}.ipynb".format(
        os.path.basename(base_name), cparm, suffix)

    nbpath = os.path.join(temp_path, new_name)
    with open(nbpath, "w") as f:
        f.write(nbconvert.exporters.export(
            nbconvert.NotebookExporter, new_nb)[0])

    # add finalization to the last job
    if final_job:
        create_finalize_script(fmt_args, temp_path, job_list)

    # then run an sbatch job
    srun_base = []
    if not sequential:
        srun_base = get_launcher_command(args, temp_path, dep_jids)
        print(" ".join(srun_base))

    srun_base += [os.path.join(PKG_DIR, "bin", "slurm_calibrate.sh"),  # path to helper sh
                  os.path.abspath(nbpath),  # path to notebook
                  python_path,  # path to python
                  args.get("cluster_profile", "NO_CLUSTER"),
                  '"{}"'.format(base_name.upper()),
                  '"{}"'.format(args["detector"].upper()),
                  '"{}"'.format(args["type"].upper()),
                  "FINAL" if final_job else "NONFINAL",
                  "{}/finalize.py".format(os.path.abspath(temp_path)),
                  str(cluster_cores)]

    output = check_output(srun_base).decode('utf8')
    jobid = None
    if not sequential:
        for line in output.split("\n"):
            if "Submitted batch job " in line:
                jobid = line.split(" ")[3]
        print("Submitted job: {}".format(jobid))
    return jobid
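
# Naming sketch (illustrative): a notebook "correct.ipynb", run for the
# concurrency parameter "sequences" with value [0, 1, 2, 3], is written to the
# temporary folder as "correct__sequences__0-3.ipynb" before being handed to
# bin/slurm_calibrate.sh for submission.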


def make_par_table(parms, run_tmp_path):
    """
    Create a table with input parameters of the notebook

    :param parms: parameters of the notebook
    :param run_tmp_path: path to temporary directory for running job outputs
    """

    # Add space in long strings without line breakers ` ,-/` to
    # wrap them in latex
    def split_len(seq, length):
        lbc = set(' ,-/')
        line = ''
        for i in range(0, len(seq), length):
            sub_line = seq[i:i + length]
            line += sub_line.replace('/', r'/\-')
            if not any(c in lbc for c in sub_line):
                line += r'\-'
        return line

    # Prepare strings and estimate their length
    l_parms = []
    len_parms = [0, 0]
    max_len = [20, 20]
    for p in parms:
        name = p.name.replace('_', '-')
        if len(name) > max_len[0]:
            len_parms[0] = max_len[0]
            name = split_len(name, max_len[0])

        value = tex_escape(str(p.value))
        if len(value) > max_len[1]:
            len_parms[1] = max_len[1]
            value = split_len(value, max_len[1])
        if p.type is str:
            value = "``{}''".format(value)

        comment = tex_escape(str(p.comment)[1:])
        l_parms.append([name, value, comment])

    # Fix column width if needed
    col_type = ['l', 'c', 'p{.3\\textwidth}']
    if len_parms[0] == max_len[0]:
        col_type[0] = col_type[2]
    if len_parms[1] == max_len[1]:
        col_type[1] = col_type[2]

    tmpl = Template('''
                    Input of the calibration pipeline
                    =================================

                    .. raw:: latex

                        \\begin{longtable}{ {% for k in p %}{{k}}{%- endfor %} }
                        \hline
                        {% for k in lines %}
                        {{ k[0] }} & {{ k[1] }} & {{ k[2] }} \\\\
                        {%- endfor %}
                        \hline
                        \end{longtable}
                    ''')

    f_name = "{}/InputParameters.rst".format(run_tmp_path)
    with open(f_name, "w") as finfile:
        finfile.write(textwrap.dedent(tmpl.render(p=col_type, lines=l_parms)))
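
# Output sketch (illustrative): InputParameters.rst embeds the parameter table
# as a raw LaTeX longtable, one row of name & value & comment per notebook
# parameter, e.g.
#
#   in-folder & ``/gpfs/.../raw'' & path to input data \\
#   modules   & [0, 1, 2, 3]      & modules to process \\
#
# Long names and values are soft-hyphenated by split_len() so LaTeX can break
# them across lines.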


def run():
    """ Run a calibration task with parser arguments """
    parser = make_extended_parser()
    args = deconsolize_args(vars(parser.parse_args()))
    detector = args["detector"].upper()
    caltype = args["type"].upper()
    sequential = args["no_cluster_job"]

    if sequential:
        print("Not running on cluster")

    try:
        nb_info = notebooks[detector][caltype]
    except KeyError:
        print("Not one of the known calibrations or detectors")
        return 1

    pre_notebooks = nb_info.get("pre_notebooks", [])
    notebook = nb_info["notebook"]
    dep_notebooks = nb_info.get("dep_notebooks", [])
    concurrency = nb_info.get("concurrency", None)

    if args["concurrency_par"] is not None:
        concurrency["parameter"] = args["concurrency_par"]

    notebook_path = os.path.join(PKG_DIR, notebook)
    nb = nbformat.read(notebook_path, as_version=4)

    # extend parameters if needed
    ext_func = nb_info.get("extend parms", None)
    if ext_func is not None:
        extend_params(nb, ext_func)

    parms = extract_parameters(nb)

    title, author, version = extract_title_author_version(nb)

    if not title:
        title = "{} {} Calibration".format(detector, caltype)
    if not author:
        author = "anonymous"
    if not version:
        version = ""

    title = title.rstrip()

    run_uuid = f"t{datetime.now().strftime('%y%m%d_%H%M%S')}"

    # check if concurrency parameter is given and we run concurrently
    if not has_parm(parms, concurrency["parameter"]) and concurrency["parameter"] is not None:
        msg = "Notebook cannot be run concurrently: no {} parameter".format(
            concurrency["parameter"])
        warnings.warn(msg, RuntimeWarning)

    cluster_profile = "NO_CLUSTER"
    if not has_parm(parms, "cluster_profile"):
        warnings.warn("Notebook has no cluster_profile parameter, " +
                      "running on cluster will likely fail!", RuntimeWarning)
    elif "cluster_profile" not in args or args["cluster_profile"] == parser.get_default('cluster_profile'):
        cluster_profile = "slurm_prof_{}".format(run_uuid)

    # create a temporary output directory to work in
    run_tmp_path = os.path.join(temp_path, f"slurm_out_{detector}_{caltype}_{run_uuid}")
    os.makedirs(run_tmp_path)

    # Write all input parameters to rst file to be included to final report
    parms = parameter_values(parms, **args)
    make_par_table(parms, run_tmp_path)
    save_executed_command(run_tmp_path, version)

    # wait on all jobs to run and then finalize the run by creating a report from the notebooks
    out_path = os.path.join(
        report_path, detector.upper(), caltype.upper(), datetime.now().isoformat()
    )
    if try_report_to_output:
        if "out_folder" in args:
            out_path = os.path.abspath(args["out_folder"])
        else:
            print("No 'out_folder' defined as argument, outputting to '{}' instead.".format(
                out_path))

    os.makedirs(out_path, exist_ok=True)

    report_to = title.replace(" ", "")
    if args["report_to"] is not None:
        report_to = args["report_to"]

    folder = get_par_attr(parms, 'in_folder', 'value', '')

    if args["request_time"] == "Now":
        request_time = datetime.now().strftime('%Y-%m-%dT%H:%M:%S')
    else:
        request_time = args["request_time"]

    submission_time = datetime.now().strftime('%Y-%m-%dT%H:%M:%S')

    fmt_args = {'run_path': run_tmp_path,
                'out_path': out_path,
                'project': title,
                'calibration': title,
                'author': author,
                'version': version,
                'report_to': report_to,
                'in_folder': folder,
                'request_time': request_time,
                'submission_time': submission_time
                }

    joblist = []
    cluster_cores = concurrency.get("cluster cores", 8)
    # Check if there are pre-notebooks
    for pre_notebook in pre_notebooks:
        pre_notebook_path = os.path.join(PKG_DIR, pre_notebook)
        lead_nb = nbformat.read(pre_notebook_path, as_version=4)
        jobid = concurrent_run(run_tmp_path, lead_nb,
                               os.path.basename(pre_notebook_path),
                               args,
                               job_list=joblist, fmt_args=fmt_args,
                               cluster_cores=cluster_cores,
                               sequential=sequential,
                               cluster_profile=cluster_profile)
        joblist.append(jobid)

    if concurrency.get("parameter", None) is None:
        jobid = concurrent_run(run_tmp_path, nb,
                               os.path.basename(notebook),
                               args,
                               final_job=True,
                               job_list=joblist, fmt_args=fmt_args,
                               cluster_cores=cluster_cores,
                               sequential=sequential,
                               dep_jids=joblist,
                               cluster_profile=cluster_profile)
        joblist.append(jobid)
    else:
        cvar = concurrency["parameter"]
        cvals = args.get(cvar, None)

        con_func = concurrency.get("use function", None)
        # Consider [-1] as None
        if cvals is None or cvals == [-1]:
            defcval = concurrency.get("default concurrency", None)
            if defcval is not None:
                print(f"Concurrency parameter '{cvar}' "
                      f"is taken from notebooks.py")
                if not isinstance(defcval, (list, tuple)):
                    cvals = range(defcval)
                else:
                    cvals = defcval

        if cvals is None:
            defcval = get_par_attr(parms, cvar, 'value')
            if defcval is not None:
                print(f"Concurrency parameter '{cvar}' "
                      f"is taken from '{notebook}'")
                if not isinstance(defcval, (list, tuple)):
                    cvals = [defcval]
                else:
                    cvals = defcval

        if con_func:
            func = get_notebook_function(nb, con_func)
            if func is None:
                warnings.warn(f"Didn't find concurrency function {con_func} in notebook",
                              RuntimeWarning)
            else:
                df = {}
                exec(func, df)
                f = df[con_func]
                sig = inspect.signature(f)
                callargs = []
                if cvals:
                    # in case default needs to be used for function call
                    args[cvar] = cvals
                for arg in sig.parameters:
                    callargs.append(args[arg])
                cvals = f(*callargs)
                print(f"Split concurrency into {cvals}")

        # get expected type
        cvtype = get_par_attr(parms, cvar, 'type', list)
        cvals = remove_duplications(cvals)

        jlist = []
        for cnum, cval in enumerate(cvals):
            show_title = cnum == 0
            # Job is not final if there are dependent notebooks
            final_job = (cnum == len(list(cvals)) - 1 and len(dep_notebooks) == 0)
            cval = [cval, ] if not isinstance(cval, list) and cvtype is list else cval

            jobid = concurrent_run(run_tmp_path, nb, notebook, args,
                                   cvar, cval, final_job,
                                   jlist, fmt_args,
                                   cluster_cores=cluster_cores,
                                   sequential=sequential,
                                   show_title=show_title,
                                   dep_jids=joblist,
                                   cluster_profile=cluster_profile)
            jlist.append(jobid)
        joblist.extend(jlist)

    # Run dependent notebooks (e.g. summaries after correction)
    for i, dep_notebook in enumerate(dep_notebooks):
        dep_notebook_path = os.path.join(PKG_DIR, dep_notebook)
        dep_nb = nbformat.read(dep_notebook_path, as_version=4)
        final_job = i == len(dep_notebooks) - 1
        jobid = concurrent_run(run_tmp_path, dep_nb,
                               os.path.basename(dep_notebook_path),
                               args,
                               dep_jids=joblist,
                               final_job=final_job,
                               job_list=joblist, fmt_args=fmt_args,
                               cluster_cores=cluster_cores,
                               sequential=sequential,
                               cluster_profile=cluster_profile)
        joblist.append(jobid)

    if not all([j is None for j in joblist]):
        print("Submitted the following SLURM jobs: {}".format(",".join(joblist)))


if __name__ == "__main__":
    sys.exit(run())
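
# Typical invocations (illustrative; detector and type names come from
# notebooks.py):
#
#   xfel_calibrate AGIPD -h                          # list calibration types
#   xfel_calibrate AGIPD DARK <notebook options>     # submit a calibration chain
#   xfel_calibrate AGIPD DARK ... --no-cluster-job   # run sequentially, no SLURM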