diff --git a/xfel_calibrate/calibrate.py b/xfel_calibrate/calibrate.py index b84de4027c6076fe7bd4bd31c232b12150f228c0..890e02588b758bc28debdcdc7bafff5eb5c75a6b 100755 --- a/xfel_calibrate/calibrate.py +++ b/xfel_calibrate/calibrate.py @@ -2,7 +2,7 @@ import argparse from datetime import datetime -import dateutil.parser +import inspect import nbconvert import nbformat from nbparameterise import ( @@ -11,7 +11,7 @@ from nbparameterise import ( import os import pprint import re -from subprocess import check_output +from subprocess import check_output, DEVNULL import sys import warnings from .settings import * @@ -22,6 +22,8 @@ import textwrap from .finalize import tex_escape +PKG_DIR = os.path.dirname(os.path.abspath(__file__)) + # Add a class combining raw description formatting with # Metavariable default outputs @@ -34,16 +36,18 @@ class RawTypeFormatter(argparse.RawDescriptionHelpFormatter, # The argument parser for calibrate.py, will be extended depending # on the options given. -def make_initial_parser(): - parser = argparse.ArgumentParser(description="Main entry point " - "for offline calibration", - formatter_class=RawTypeFormatter) +def make_initial_parser(**kwargs): + parser = argparse.ArgumentParser( + description="Main entry point for offline calibration", + formatter_class=RawTypeFormatter, + **kwargs + ) parser.add_argument('detector', metavar='DETECTOR', type=str, - help='The detector to calibrate') + help='The detector to calibrate: ' + ", ".join(notebooks)) parser.add_argument('type', metavar='TYPE', type=str, - help='Type of calibration: ' + ",".join(notebooks.keys())) + help='Type of calibration.') parser.add_argument('--no-cluster-job', action="store_true", @@ -86,9 +90,6 @@ def make_initial_parser(): return parser -parser = make_initial_parser() - - # Helper functions for parser extensions def make_intelli_list(ltype): @@ -99,6 +100,7 @@ def make_intelli_list(ltype): """ class IntelliListAction(argparse.Action): + element_type = ltype def __init__(self, *args, **kwargs): super(IntelliListAction, self).__init__(*args, **kwargs) @@ -107,24 +109,22 @@ def make_intelli_list(ltype): parsed_values = [] values = ",".join(values) - try: - if isinstance(values, str): - for rcomp in values.split(","): - if "-" in rcomp: - start, end = rcomp.split("-") - parsed_values += list(range(int(start), int(end))) - else: - parsed_values += [int(rcomp)] - elif isinstance(values, (list, tuple)): - parsed_values = values - else: - parsed_values = [values, ] - except Exception as e: - print('ERROR:', e) - parsed_values = [self.ltype(p) for p in parsed_values] + if isinstance(values, str): + for rcomp in values.split(","): + if "-" in rcomp: + start, end = rcomp.split("-") + parsed_values += list(range(int(start), int(end))) + else: + parsed_values += [int(rcomp)] + elif isinstance(values, (list, tuple)): + parsed_values = values + else: + parsed_values = [values, ] + + parsed_values = [self.element_type(p) for p in parsed_values] print("Parsed input {} to {}".format(values, parsed_values)) setattr(namespace, self.dest, parsed_values) - IntelliListAction.ltype = ltype + return IntelliListAction @@ -149,7 +149,7 @@ def extract_title_author_version(nb): first_md = first_markdown_cell(nb) source = first_md["source"] - title = re.findall(r'\#+\s*(.*)\s*\#+', source) + title = re.findall(r'#+\s*(.*)\s*#+', source) author = re.findall( r'author[\s]*[:][\s]*(.*?)\s*(?:[,?]|version)', source, flags=re.IGNORECASE) @@ -162,12 +162,10 @@ def extract_title_author_version(nb): # In case of a standard installation a version is stored # in the _version.py file try: - git_dir = '{}/../.git'.format(os.path.dirname(__file__)) - FNULL = open(os.devnull, 'w') - version = check_output(['git', - '--git-dir={}'.format(git_dir), - 'describe', '--tag'], - stderr=FNULL).decode('utf8') + git_dir = os.path.join(PKG_DIR, '..', '.git') + version = check_output([ + 'git', f'--git-dir={git_dir}', 'describe', '--tag', + ], stderr=DEVNULL).decode('utf8') version = version.replace("\n", "") except: from .VERSION import __version__ @@ -228,7 +226,6 @@ def make_epilog(nb, caltype=None): def get_notebook_function(nb, fname): - import re flines = [] def_found = False indent = None @@ -274,10 +271,9 @@ def balance_sequences(in_folder, run, sequences, sequences_per_node, :return: Balanced list of list of sequences """ import glob - import re import numpy as np if sequences[0] == -1: - path = "{}/r{:04d}/*{}-S*.h5".format(in_folder, run, path_inset) + path = os.path.join(in_folder, f"r{run:04d}", f"*{path_inset}-S*.h5") sequence_files = glob.glob(path) seq_nums = set() for sf in sequence_files: @@ -296,165 +292,155 @@ def balance_sequences(in_folder, run, sequences, sequences_per_node, l.size > 0] -# extend the parser according to user input -# the first case is if a detector was given, but no calibration type -if len(sys.argv) == 3 and "-h" in sys.argv[2]: - detector = sys.argv[1].upper() - try: - det_notebooks = notebooks[detector] - except KeyError: - print("Not one of the known detectors: {}".format(notebooks.keys())) - exit() - - msg = "Options for detector {}\n".format(detector) - msg += "*" * len(msg) + "\n\n" - - # basically, this creates help in the form of - # - # TYPE some description that is - # indented for this type. - # - # The information is extracted from the first markdown cell of - # the notebook. - for caltype, notebook in det_notebooks.items(): - nbpath = os.path.abspath( - "{}/{}".format(os.path.dirname(__file__), notebook["notebook"])) - with open(nbpath, "r") as f: - nb = nbformat.read(f, as_version=4) +def make_extended_parser() -> argparse.ArgumentParser: + """Create an ArgumentParser using information from the notebooks""" + + # extend the parser according to user input + # the first case is if a detector was given, but no calibration type + if len(sys.argv) == 3 and "-h" in sys.argv[2]: + detector = sys.argv[1].upper() + try: + det_notebooks = notebooks[detector] + except KeyError: + print("Not one of the known detectors: {}".format(notebooks.keys())) + sys.exit(1) + + msg = "Options for detector {}\n".format(detector) + msg += "*" * len(msg) + "\n\n" + + # basically, this creates help in the form of + # + # TYPE some description that is + # indented for this type. + # + # The information is extracted from the first markdown cell of + # the notebook. + for caltype, notebook in det_notebooks.items(): + nbpath = os.path.join(PKG_DIR, notebook["notebook"]) + nb = nbformat.read(nbpath, as_version=4) msg += make_epilog(nb, caltype=caltype) - parser.epilog = msg -# second case is if no detector was given either -elif len(sys.argv) == 2 and "-h" in sys.argv[1]: - epilog = "Available detectors are: {}".format( - ", ".join([k for k in notebooks.keys()])) - parser.epilog = epilog -# final case: a detector and type was given. We derive the arguments -# from the corresponding notebook -elif len(sys.argv) >= 3: - detector = sys.argv[1].upper() - caltype = sys.argv[2].upper() + return make_initial_parser(epilog=msg) + elif len(sys.argv) <= 3: + return make_initial_parser() + + # A detector and type was given. We derive the arguments + # from the corresponding notebook + args, _ = make_initial_parser(add_help=False).parse_known_args() try: - notebook = os.path.abspath( - "{}/{}".format(os.path.dirname(__file__), notebooks[detector][caltype]["notebook"])) - cvar = notebooks[detector][caltype].get("concurrency", - {"parameter": None, - "default concurrency": None, - "cluster cores": 8})["parameter"] + nb_info = notebooks[args.detector.upper()][args.type.upper()] except KeyError: print("Not one of the known calibrations or detectors") - exit() - with open(notebook, "r") as f: - nb = nbformat.read(f, as_version=4) - - ext_func = notebooks[detector][caltype].get("extend parms", None) - - - def do_parse(nb, parser, overwrite_reqs=False): - parser.description = make_epilog(nb) - parms = extract_parameters(nb) - - for p in parms: - - helpstr = ("Default: %(default)s" if not p.comment - else "{}. Default: %(default)s".format(p.comment.replace("#", " ").strip())) - required = (p.comment is not None - and "required" in p.comment - and not overwrite_reqs - and p.name != cvar) - - # This may be not a public API - # May require reprogramming in case of argparse updates - pars_group = parser._action_groups[2 if required else 1] - - default = p.value if (not required) else None - - if p.type == list or p.name == cvar: - if p.type is list: - try: - ltype = type(p.value[0]) - except: - print( - "List '{}' is empty. Parameter type can not be defined.".format(p.name)) - print("See first code cell in jupyter-notebook: '{}'".format( - notebooks[detector][caltype]["notebook"])) - exit() - else: - ltype = p.type - - range_allowed = "RANGE ALLOWED" in p.comment.upper() if p.comment else False - pars_group.add_argument("--{}".format(consolize_name(p.name)), - nargs='+', - type=ltype if not range_allowed else str, - default=default, - help=helpstr, - required=required, - action=make_intelli_list(ltype) if range_allowed else None) - elif p.type == bool: - # check if an input arg is given with an extra "-no" for - # forcing to convert a bool to False. - # Otherwise leave the default value from the notebook - # or convert to true if the bool arg is given. - if consolize_name("--no-{}".format(p.name)) in sys.argv: - pars_group.add_argument("--{}".format(consolize_name(p.name)), - action="store_false", - default=False, - help=helpstr, - required=required) - sys.argv.remove(consolize_name("--no-{}".format(p.name))) - else: - pars_group.add_argument("--{}".format(consolize_name(p.name)), - action="store_true", - default=default, - help=helpstr, - required=required) - else: - pars_group.add_argument("--{}".format(consolize_name(p.name)), - type=p.type, - default=default, - help=helpstr, - required=required) + sys.exit(1) - do_parse(nb, parser, True) + notebook = os.path.join(PKG_DIR, nb_info["notebook"]) + cvar = nb_info.get("concurrency", {}).get("parameter", None) - # extend parameters if needed - ext_func = notebooks[detector][caltype].get("extend parms", None) - if ext_func is not None: - func = get_notebook_function(nb, ext_func) + nb = nbformat.read(notebook, as_version=4) - if func is None: - warnings.warn(f"Didn't find concurrency function {ext_func} in notebook", - RuntimeWarning) + + # extend parameters if needed + ext_func = nb_info.get("extend parms", None) + if ext_func is not None: + extend_params(nb, ext_func) + + # No extend parms function - add statically defined parameters from the + # first code cell + parser = make_initial_parser() + add_args_from_nb(nb, parser, cvar=cvar) + return parser + +def add_args_from_nb(nb, parser, cvar=None, no_required=False): + """Add argparse arguments for parameters in the first cell of a notebook. + + Uses nbparameterise to extract the parameter information. Each foo_bar + parameter gets a --foo-bar command line option. + Boolean parameters get a pair of flags like --abc and --no-abc. + + :param nb: NotebookNode object representing a loaded .ipynb file + :param parser: argparse.ArgumentParser instance + :param str cvar: Name of the concurrency parameter. + :param bool no_required: If True, none of the added options are required. + """ + parser.description = make_epilog(nb) + parms = extract_parameters(nb) + + for p in parms: + + helpstr = ("Default: %(default)s" if not p.comment + else "{}. Default: %(default)s".format(p.comment.replace("#", " ").strip())) + required = (p.comment is not None + and "required" in p.comment + and not no_required + and p.name != cvar) + + # This may be not a public API + # May require reprogramming in case of argparse updates + pars_group = parser._action_groups[2 if required else 1] + + default = p.value if (not required) else None + + if p.type == list or p.name == cvar: + if p.type is list: + ltype = type(p.value[0]) else: - # remove help calls as they will cause the argument parser to exit - add_help = False - if "-h" in sys.argv: - sys.argv.remove("-h") - add_help = True - if "--help" in sys.argv: - sys.argv.remove("--help") - add_help = True - known, remaining = parser.parse_known_args() - if add_help: - sys.argv.append("--help") - args = deconsolize_args(vars(known)) + ltype = p.type + + range_allowed = "RANGE ALLOWED" in p.comment.upper() if p.comment else False + pars_group.add_argument(f"--{consolize_name(p.name)}", + nargs='+', + type=ltype if not range_allowed else str, + default=default, + help=helpstr, + required=required, + action=make_intelli_list(ltype) if range_allowed else None) + elif p.type == bool: + # For a boolean, make --XYZ and --no-XYZ options. + alt_group = pars_group.add_mutually_exclusive_group(required=required) + alt_group.add_argument(f"--{consolize_name(p.name)}", + action="store_true", + default=default, + help=helpstr, + dest=p.name) + alt_group.add_argument(f"--no-{consolize_name(p.name)}", + action="store_false", + default=default, + help=f"Opposite of --{consolize_name(p.name)}", + dest=p.name) + else: + pars_group.add_argument(f"--{consolize_name(p.name)}", + type=p.type, + default=default, + help=helpstr, + required=required) + +def extend_params(nb, extend_func_name): + """Add parameters in the first code cell by calling a function in the notebook + """ + func = get_notebook_function(nb, extend_func_name) - df = {} + if func is None: + warnings.warn( + f"Didn't find concurrency function {extend_func_name} in notebook", + RuntimeWarning + ) + return - exec(func, df) - f = df[ext_func] - import inspect - sig = inspect.signature(f) - callargs = [] - for arg in sig.parameters: - callargs.append(args[arg]) + # Make a temporary parser that won't exit if it sees -h or --help + pre_parser = make_initial_parser(add_help=False) + add_args_from_nb(nb, pre_parser, no_required=True) + known, _ = pre_parser.parse_known_args() + args = deconsolize_args(vars(known)) + + df = {} + exec(func, df) + f = df[extend_func_name] + sig = inspect.signature(f) - extention = f(*callargs) - fcc = first_code_cell(nb) - fcc["source"] += "\n" + extention - parser = make_initial_parser() - do_parse(nb, parser, False) + extension = f(*[args[p] for p in sig.parameters]) + fcc = first_code_cell(nb) + fcc["source"] += "\n" + extension def has_parm(parms, name): @@ -498,15 +484,11 @@ def flatten_list(l): def set_figure_format(nb, enable_vector_format): - """ - Set svg format in inline backend for figures + """Set svg format in inline backend for figures - If parameter 'vector_figs' is set to True svg format will + If parameter enable_vector_format is set to True, svg format will be used for figures in the notebook rendering. Subsequently vector graphics figures will be used for report. - - :param nb: jupyter notebook - :param param: value of corresponding parameter """ if enable_vector_format: @@ -543,7 +525,7 @@ def create_finalize_script(fmt_args, temp_path, job_list): ''') fmt_args['joblist'] = job_list - f_name = "{}/finalize.sh".format(temp_path) + f_name = os.path.join(temp_path, "finalize.sh") with open(f_name, "w") as finfile: finfile.write(textwrap.dedent(tmpl.render(**fmt_args))) @@ -561,7 +543,7 @@ def save_executed_command(run_tmp_path, version): :parm version: git version of the pycalibration package """ - f_name = "{}/run_calibrate.sh".format(run_tmp_path) + f_name = os.path.join(run_tmp_path, "run_calibrate.sh") with open(f_name, "w") as finfile: finfile.write(f'# pycalibration version: {version}\n') finfile.write(' '.join(sys.argv)) @@ -653,7 +635,7 @@ def concurrent_run(temp_path, nb, nbname, args, cparm=None, cval=None, new_name = "{}__{}__{}.ipynb".format( os.path.basename(base_name), cparm, suffix) - nbpath = "{}/{}".format(temp_path, new_name) + nbpath = os.path.join(temp_path, new_name) with open(nbpath, "w") as f: f.write(nbconvert.exporters.export( nbconvert.NotebookExporter, new_nb)[0]) @@ -668,14 +650,14 @@ def concurrent_run(temp_path, nb, nbname, args, cparm=None, cval=None, srun_base = get_launcher_command(args, temp_path, dep_jids) print(" ".join(srun_base)) - srun_base += [os.path.abspath("{}/bin/slurm_calibrate.sh".format(os.path.dirname(__file__))), # path to helper sh + srun_base += [os.path.join(PKG_DIR, "bin", "slurm_calibrate.sh"), # path to helper sh os.path.abspath(nbpath), # path to notebook python_path, # path to python ipython_path, # path to ipython jupyter_path, # path to jupyter ipcluster_path, # path to ipcluster # python activate path - activate_path if activate_path!="" else "{}/bin/activate.sh".format(os.path.dirname(__file__)), + activate_path if activate_path!="" else os.path.join(PKG_DIR, "bin", "activate.sh"), args.get("cluster_profile", "NO_CLUSTER"), '"{}"'.format(base_name.upper()), '"{}"'.format(args["detector"].upper()), @@ -762,7 +744,7 @@ def make_par_table(parms, run_tmp_path): def run(): """ Run a calibration task with parser arguments """ - + parser = make_extended_parser() args = deconsolize_args(vars(parser.parse_args())) detector = args["detector"].upper() caltype = args["type"].upper() @@ -772,236 +754,213 @@ def run(): print("Not running on cluster") try: - pre_notebooks = notebooks[detector][caltype].get("pre_notebooks", []) - notebook = notebooks[detector][caltype]["notebook"] - dep_notebooks = notebooks[detector][caltype].get("dep_notebooks", []) - notebook = os.path.abspath( - "{}/{}".format(os.path.dirname(__file__), notebook)) - concurrency = notebooks[detector][caltype].get("concurrency", None) - + nb_info = notebooks[detector][caltype] except KeyError: print("Not one of the known calibrations or detectors") - return + return 1 + + pre_notebooks = nb_info.get("pre_notebooks", []) + notebook = nb_info["notebook"] + dep_notebooks = nb_info.get("dep_notebooks", []) + concurrency = nb_info.get("concurrency", None) if args["concurrency_par"] is not None: concurrency["parameter"] = args["concurrency_par"] - with open(notebook, "r") as f: - nb = nbformat.read(f, as_version=4) + notebook_path = os.path.join(PKG_DIR, notebook) + nb = nbformat.read(notebook_path, as_version=4) + + # extend parameters if needed + ext_func = nb_info.get("extend parms", None) + if ext_func is not None: + extend_params(nb, ext_func) + + parms = extract_parameters(nb) + + title, author, version = extract_title_author_version(nb) + + if not title: + title = "{} {} Calibration".format(detector, caltype) + if not author: + author = "anonymous" + if not version: + version = "" + + title = title.rstrip() + + run_uuid = f"t{datetime.now().strftime('%y%m%d_%H%M%S')}" + + # check if concurrency parameter is given and we run concurrently + if not has_parm(parms, concurrency["parameter"]) and concurrency["parameter"] is not None: + msg = "Notebook cannot be run concurrently: no {} parameter".format( + concurrency["parameter"]) + warnings.warn(msg, RuntimeWarning) + + cluster_profile = "NO_CLUSTER" + if not has_parm(parms, "cluster_profile"): + warnings.warn("Notebook has no cluster_profile parameter, " + + "running on cluster will likely fail!", RuntimeWarning) + elif "cluster_profile" not in args or args["cluster_profile"] == parser.get_default('cluster_profile'): + cluster_profile = "slurm_prof_{}".format(run_uuid) + + # create a temporary output directory to work in + run_tmp_path = os.path.join(temp_path, f"slurm_out_{detector}_{caltype}_{run_uuid}") + os.makedirs(run_tmp_path) + + # Write all input parameters to rst file to be included to final report + parms = parameter_values(parms, **args) + make_par_table(parms, run_tmp_path) + save_executed_command(run_tmp_path, version) + + # wait on all jobs to run and then finalize the run by creating a report from the notebooks + out_path = os.path.join( + report_path, detector.upper(), caltype.upper(), datetime.now().isoformat() + ) + if try_report_to_output: + if "out_folder" in args: + out_path = os.path.abspath(args["out_folder"]) + else: + print("No 'out_folder' defined as argument, outputting to '{}' instead.".format( + out_path)) + + os.makedirs(out_path, exist_ok=True) - # extend parameters if needed - ext_func = notebooks[detector][caltype].get("extend parms", None) - if ext_func is not None: - func = get_notebook_function(nb, ext_func) + report_to = title.replace(" ", "") + if args["report_to"] is not None: + report_to = args["report_to"] + folder = get_par_attr(parms, 'in_folder', 'value', '') + + if args["request_time"] == "Now": + request_time = datetime.now().strftime('%Y-%m-%dT%H:%M:%S') + else: + request_time = args["request_time"] + + submission_time = datetime.now().strftime('%Y-%m-%dT%H:%M:%S') + fmt_args = {'run_path': run_tmp_path, + 'out_path': out_path, + 'project': title, + 'calibration': title, + 'author': author, + 'version': version, + 'report_to': report_to, + 'in_folder': folder, + 'request_time': request_time, + 'submission_time': submission_time + } + + joblist = [] + cluster_cores = concurrency.get("cluster cores", 8) + # Check if there are pre-notebooks + for pre_notebook in pre_notebooks: + pre_notebook_path = os.path.join(PKG_DIR, pre_notebook) + lead_nb = nbformat.read(pre_notebook_path, as_version=4) + jobid = concurrent_run(run_tmp_path, lead_nb, + os.path.basename(pre_notebook_path), + args, + job_list=joblist, fmt_args=fmt_args, + cluster_cores=cluster_cores, + sequential=sequential, + cluster_profile=cluster_profile) + joblist.append(jobid) + + if concurrency.get("parameter", None) is None: + jobid = concurrent_run(run_tmp_path, nb, + os.path.basename(notebook), args, + final_job=True, job_list=joblist, + fmt_args=fmt_args, + cluster_cores=cluster_cores, + sequential=sequential, + dep_jids=joblist, + cluster_profile=cluster_profile) + joblist.append(jobid) + else: + cvar = concurrency["parameter"] + cvals = args.get(cvar, None) + + con_func = concurrency.get("use function", None) + # Consider [-1] as None + if cvals is None or cvals == [-1]: + defcval = concurrency.get("default concurrency", None) + if defcval is not None: + print(f"Concurrency parameter '{cvar}' " + f"is taken from notebooks.py") + if not isinstance(defcval, (list, tuple)): + cvals = range(defcval) + else: + cvals = defcval + + if cvals is None: + defcval = get_par_attr(parms, cvar, 'value') + if defcval is not None: + print(f"Concurrency parameter '{cvar}' " + f"is taken from '{notebook}'") + if not isinstance(defcval, (list, tuple)): + cvals = [defcval] + else: + cvals = defcval + + if con_func: + func = get_notebook_function(nb, con_func) if func is None: - warnings.warn(f"Didn't find concurrency function {ext_func} in notebook", + warnings.warn(f"Didn't find concurrency function {con_func} in notebook", RuntimeWarning) else: - # remove help calls as they will cause the argument parser to exit - known, remaining = parser.parse_known_args() - args = deconsolize_args(vars(known)) df = {} exec(func, df) - f = df[ext_func] + f = df[con_func] import inspect sig = inspect.signature(f) callargs = [] + if cvals: + # in case default needs to be used for function call + args[cvar] = cvals for arg in sig.parameters: callargs.append(args[arg]) - - extention = f(*callargs) - fcc = first_code_cell(nb) - fcc["source"] += "\n" + extention - - parms = extract_parameters(nb) - - title, author, version = extract_title_author_version(nb) - - if not title: - title = "{} {} Calibration".format(detector, caltype) - if not author: - author = "anonymous" - if not version: - version = "" - - title = title.rstrip() - - run_uuid = f"t{datetime.now().strftime('%y%m%d_%H%M%S')}" - - # check if concurrency parameter is given and we run concurrently - if not has_parm(parms, concurrency["parameter"]) and concurrency["parameter"] is not None: - msg = "Notebook cannot be run concurrently: no {} parameter".format( - concurrency["parameter"]) - warnings.warn(msg, RuntimeWarning) - - cluster_profile = "NO_CLUSTER" - if not has_parm(parms, "cluster_profile"): - warnings.warn("Notebook has no cluster_profile parameter, " + - "running on cluster will likely fail!", RuntimeWarning) - elif "cluster_profile" not in args or args["cluster_profile"] == parser.get_default('cluster_profile'): - cluster_profile = "slurm_prof_{}".format(run_uuid) - - # create a temporary output directory to work in - run_tmp_path = "{}/slurm_out_{}_{}_{}".format(temp_path, detector, caltype, run_uuid) - os.makedirs(run_tmp_path) - - # Write all input parameters to rst file to be included to final report - parms = parameter_values(parms, **args) - make_par_table(parms, run_tmp_path) - save_executed_command(run_tmp_path, version) - - # wait on all jobs to run and then finalize the run by creating a report from the notebooks - out_path = "{}/{}/{}/{}".format(report_path, detector.upper(), - caltype.upper(), datetime.now().isoformat()) - if try_report_to_output: - if "out_folder" in args: - out_path = os.path.abspath(args["out_folder"]) - else: - print("No 'out_folder' defined as argument, outputting to '{}' instead.".format( - out_path)) - - os.makedirs(out_path, exist_ok=True) - - report_to = title.replace(" ", "") - if args["report_to"] is not None: - report_to = args["report_to"] - - folder = get_par_attr(parms, 'in_folder', 'value', '') - - if args["request_time"] == "Now": - request_time = datetime.now().strftime('%Y-%m-%dT%H:%M:%S') - else: - request_time = args["request_time"] - - submission_time = datetime.now().strftime('%Y-%m-%dT%H:%M:%S') - fmt_args = {'run_path': run_tmp_path, - 'out_path': out_path, - 'project': title, - 'calibration': title, - 'author': author, - 'version': version, - 'report_to': report_to, - 'in_folder': folder, - 'request_time': request_time, - 'submission_time': submission_time - } - - joblist = [] - cluster_cores = concurrency.get("cluster cores", 8) - # Check if there are pre-notebooks - for i, notebook in enumerate(pre_notebooks): - notebook_path = os.path.abspath( - "{}/{}".format(os.path.dirname(__file__), notebook)) - with open(notebook_path, "r") as f: - lead_nb = nbformat.read(f, as_version=4) - jobid = concurrent_run(run_tmp_path, lead_nb, - os.path.basename(notebook), - args, - job_list=joblist, fmt_args=fmt_args, - cluster_cores=cluster_cores, - sequential=sequential, - cluster_profile=cluster_profile) - joblist.append(jobid) - - if concurrency.get("parameter", None) is None: - jobid = concurrent_run(run_tmp_path, nb, - os.path.basename(notebook), args, - final_job=True, job_list=joblist, - fmt_args=fmt_args, + cvals = f(*callargs) + print(f"Split concurrency into {cvals}") + + # get expected type + cvtype = get_par_attr(parms, cvar, 'type', list) + cvals = remove_duplications(cvals) + + jlist = [] + for cnum, cval in enumerate(cvals): + show_title = cnum == 0 + # Job is not final if there are dependent notebooks + final_job = (cnum == len(list(cvals)) - 1 and len(dep_notebooks) == 0) + cval = [cval, ] if not isinstance(cval, list) and cvtype is list else cval + + jobid = concurrent_run(run_tmp_path, nb, notebook, args, + cvar, cval, final_job, + jlist, fmt_args, cluster_cores=cluster_cores, sequential=sequential, + show_title=show_title, dep_jids=joblist, cluster_profile=cluster_profile) - joblist.append(jobid) - else: - cvar = concurrency["parameter"] - cvals = args.get(cvar, None) - - con_func = concurrency.get("use function", None) - # Consider [-1] as None - if cvals is None or cvals == [-1]: - defcval = concurrency.get("default concurrency", None) - if defcval is not None: - print(f"Concurrency parameter '{cvar}' " - f"is taken from notebooks.py") - if not isinstance(defcval, (list, tuple)): - cvals = range(defcval) - else: - cvals = defcval - - if cvals is None: - defcval = get_par_attr(parms, cvar, 'value') - if defcval is not None: - print(f"Concurrency parameter '{cvar}' " - f"is taken from '{notebook}'") - if not isinstance(defcval, (list, tuple)): - cvals = [defcval] - else: - cvals = defcval - - if con_func: - func = get_notebook_function(nb, con_func) - if func is None: - warnings.warn(f"Didn't find concurrency function {con_func} in notebook", - RuntimeWarning) - else: - df = {} - exec(func, df) - f = df[con_func] - import inspect - sig = inspect.signature(f) - callargs = [] - if cvals: - # in case default needs to be used for function call - args[cvar] = cvals - for arg in sig.parameters: - callargs.append(args[arg]) - cvals = f(*callargs) - print(f"Split concurrency into {cvals}") - - # get expected type - cvtype = get_par_attr(parms, cvar, 'type', list) - cvals = remove_duplications(cvals) - - jlist = [] - for cnum, cval in enumerate(cvals): - show_title = cnum == 0 - # Job is not final if there are dependent notebooks - final_job = (cnum == len(list(cvals)) - 1 and len(dep_notebooks) == 0) - cval = [cval, ] if not isinstance(cval, list) and cvtype is list else cval - - jobid = concurrent_run(run_tmp_path, nb, notebook, args, - cvar, cval, final_job, - jlist, fmt_args, - cluster_cores=cluster_cores, - sequential=sequential, - show_title=show_title, - dep_jids=joblist, - cluster_profile=cluster_profile) - jlist.append(jobid) - joblist.extend(jlist) - # Run dependent notebooks - for i, notebook in enumerate(dep_notebooks): - notebook_path = os.path.abspath( - "{}/{}".format(os.path.dirname(__file__), notebook)) - with open(notebook_path, "r") as f: - nb = nbformat.read(f, as_version=4) - final_job = i == len(dep_notebooks) - 1 - jobid = concurrent_run(run_tmp_path, nb, - os.path.basename(notebook), - args, - dep_jids=joblist, - final_job=final_job, - job_list=joblist, fmt_args=fmt_args, - cluster_cores=cluster_cores, - sequential=sequential, - cluster_profile=cluster_profile) - joblist.append(jobid) - - if not all([j is None for j in joblist]): - print("Submitted the following SLURM jobs: {}".format(",".join(joblist))) + jlist.append(jobid) + joblist.extend(jlist) + + # Run dependent notebooks (e.g. summaries after correction) + for i, dep_notebook in enumerate(dep_notebooks): + dep_notebook_path = os.path.join(PKG_DIR, dep_notebook) + dep_nb = nbformat.read(dep_notebook_path, as_version=4) + final_job = i == len(dep_notebooks) - 1 + jobid = concurrent_run(run_tmp_path, dep_nb, + os.path.basename(dep_notebook_path), + args, + dep_jids=joblist, + final_job=final_job, + job_list=joblist, fmt_args=fmt_args, + cluster_cores=cluster_cores, + sequential=sequential, + cluster_profile=cluster_profile) + joblist.append(jobid) + + if not all([j is None for j in joblist]): + print("Submitted the following SLURM jobs: {}".format(",".join(joblist))) if __name__ == "__main__": - run() + sys.exit(run()) diff --git a/xfel_calibrate/finalize.py b/xfel_calibrate/finalize.py index 695eb293b710c4150934c6738a1c07201b3f6cf8..53bc72912ff3cbe105f9732fa0b7b78c8dd4d596 100644 --- a/xfel_calibrate/finalize.py +++ b/xfel_calibrate/finalize.py @@ -263,15 +263,18 @@ def make_report(run_path, tmp_path, out_path, project, author, version, lead_rstfiles = ['InputParameters.rst', 'timing_summary.rst'] # Order rst files based on the known order(lead_rstfiles). - for f in direntries: + # TODO: fix order somewhere else instead of munging filenames + def sort_key(f): if f in lead_rstfiles: - direntries.insert(lead_rstfiles.index(f), - direntries.pop(direntries.index(f))) - # Move summary to the top, if it is exists, - # after the known leading rst files. - if "summary" in f.lower() and f not in lead_rstfiles: - direntries.insert(len(lead_rstfiles), - direntries.pop(direntries.index(f))) + return lead_rstfiles.index(f), f + elif "summary" in f.lower(): + return len(lead_rstfiles), f + elif "precorrection" in f.lower(): + return len(lead_rstfiles) + 1, f + else: + return len(lead_rstfiles) + 2, f + direntries.sort(key=sort_key) + files_to_handle = [] for entry in direntries: if isfile("{}/{}".format(run_path, entry)): diff --git a/xfel_calibrate/settings.py b/xfel_calibrate/settings.py index fdf56f40ca519b4e7e16ec60a2558ba62de3aaf9..cbe4c21b66bca70ed8db0d5902a7ca4e15c5365b 100644 --- a/xfel_calibrate/settings.py +++ b/xfel_calibrate/settings.py @@ -1,7 +1,7 @@ import os # path into which temporary files from each run are placed -temp_path = "{}/temp/".format(os.getcwd()) +temp_path = os.path.abspath("temp/") # Path to use for calling Python. If the environment is correctly set, simply the command python_path = "python"