diff --git a/xfel_calibrate/calibrate.py b/xfel_calibrate/calibrate.py index b8d5a496e8d8e130ba0f525a6eb61f1e71dc8c43..c6f9983ea378ac9c34d11f56b391a4be6de8b8e7 100755 --- a/xfel_calibrate/calibrate.py +++ b/xfel_calibrate/calibrate.py @@ -22,6 +22,8 @@ import textwrap from .finalize import tex_escape +PKG_DIR = os.path.dirname(os.path.abspath(__file__)) + # Add a class combining raw description formatting with # Metavariable default outputs @@ -775,8 +777,7 @@ def run(): pre_notebooks = notebooks[detector][caltype].get("pre_notebooks", []) notebook = notebooks[detector][caltype]["notebook"] dep_notebooks = notebooks[detector][caltype].get("dep_notebooks", []) - notebook = os.path.abspath( - "{}/{}".format(os.path.dirname(__file__), notebook)) + notebook_path = os.path.join(PKG_DIR, notebook) concurrency = notebooks[detector][caltype].get("concurrency", None) except KeyError: @@ -786,221 +787,217 @@ def run(): if args["concurrency_par"] is not None: concurrency["parameter"] = args["concurrency_par"] - with open(notebook, "r") as f: - nb = nbformat.read(f, as_version=4) + nb = nbformat.read(notebook_path, as_version=4) - # extend parameters if needed - ext_func = notebooks[detector][caltype].get("extend parms", None) - if ext_func is not None: - func = get_notebook_function(nb, ext_func) + # extend parameters if needed + ext_func = notebooks[detector][caltype].get("extend parms", None) + if ext_func is not None: + func = get_notebook_function(nb, ext_func) + + if func is None: + warnings.warn(f"Didn't find concurrency function {ext_func} in notebook", + RuntimeWarning) + else: + # remove help calls as they will cause the argument parser to exit + known, remaining = parser.parse_known_args() + args = deconsolize_args(vars(known)) + df = {} + exec(func, df) + f = df[ext_func] + import inspect + sig = inspect.signature(f) + callargs = [] + for arg in sig.parameters: + callargs.append(args[arg]) + + extention = f(*callargs) + fcc = first_code_cell(nb) + fcc["source"] += "\n" + extention + + parms = extract_parameters(nb) + + title, author, version = extract_title_author_version(nb) + + if not title: + title = "{} {} Calibration".format(detector, caltype) + if not author: + author = "anonymous" + if not version: + version = "" + + title = title.rstrip() + + run_uuid = f"t{datetime.now().strftime('%y%m%d_%H%M%S')}" + + # check if concurrency parameter is given and we run concurrently + if not has_parm(parms, concurrency["parameter"]) and concurrency["parameter"] is not None: + msg = "Notebook cannot be run concurrently: no {} parameter".format( + concurrency["parameter"]) + warnings.warn(msg, RuntimeWarning) + + cluster_profile = "NO_CLUSTER" + if not has_parm(parms, "cluster_profile"): + warnings.warn("Notebook has no cluster_profile parameter, " + + "running on cluster will likely fail!", RuntimeWarning) + elif "cluster_profile" not in args or args["cluster_profile"] == parser.get_default('cluster_profile'): + cluster_profile = "slurm_prof_{}".format(run_uuid) + + # create a temporary output directory to work in + run_tmp_path = "{}/slurm_out_{}_{}_{}".format(temp_path, detector, caltype, run_uuid) + os.makedirs(run_tmp_path) + + # Write all input parameters to rst file to be included to final report + parms = parameter_values(parms, **args) + make_par_table(parms, run_tmp_path) + save_executed_command(run_tmp_path, version) + + # wait on all jobs to run and then finalize the run by creating a report from the notebooks + out_path = "{}/{}/{}/{}".format(report_path, detector.upper(), + caltype.upper(), datetime.now().isoformat()) + if try_report_to_output: + if "out_folder" in args: + out_path = os.path.abspath(args["out_folder"]) + else: + print("No 'out_folder' defined as argument, outputting to '{}' instead.".format( + out_path)) + + os.makedirs(out_path, exist_ok=True) + + report_to = title.replace(" ", "") + if args["report_to"] is not None: + report_to = args["report_to"] + + folder = get_par_attr(parms, 'in_folder', 'value', '') + + if args["request_time"] == "Now": + request_time = datetime.now().strftime('%Y-%m-%dT%H:%M:%S') + else: + request_time = args["request_time"] + + submission_time = datetime.now().strftime('%Y-%m-%dT%H:%M:%S') + fmt_args = {'run_path': run_tmp_path, + 'out_path': out_path, + 'project': title, + 'calibration': title, + 'author': author, + 'version': version, + 'report_to': report_to, + 'in_folder': folder, + 'request_time': request_time, + 'submission_time': submission_time + } + + joblist = [] + cluster_cores = concurrency.get("cluster cores", 8) + # Check if there are pre-notebooks + for pre_notebook in pre_notebooks: + pre_notebook_path = os.path.join(PKG_DIR, pre_notebook) + lead_nb = nbformat.read(pre_notebook_path, as_version=4) + jobid = concurrent_run(run_tmp_path, lead_nb, + os.path.basename(pre_notebook_path), + args, + job_list=joblist, fmt_args=fmt_args, + cluster_cores=cluster_cores, + sequential=sequential, + cluster_profile=cluster_profile) + joblist.append(jobid) + + if concurrency.get("parameter", None) is None: + jobid = concurrent_run(run_tmp_path, nb, + os.path.basename(notebook), args, + final_job=True, job_list=joblist, + fmt_args=fmt_args, + cluster_cores=cluster_cores, + sequential=sequential, + dep_jids=joblist, + cluster_profile=cluster_profile) + joblist.append(jobid) + else: + cvar = concurrency["parameter"] + cvals = args.get(cvar, None) + + con_func = concurrency.get("use function", None) + # Consider [-1] as None + if cvals is None or cvals == [-1]: + defcval = concurrency.get("default concurrency", None) + if defcval is not None: + print(f"Concurrency parameter '{cvar}' " + f"is taken from notebooks.py") + if not isinstance(defcval, (list, tuple)): + cvals = range(defcval) + else: + cvals = defcval + + if cvals is None: + defcval = get_par_attr(parms, cvar, 'value') + if defcval is not None: + print(f"Concurrency parameter '{cvar}' " + f"is taken from '{notebook}'") + if not isinstance(defcval, (list, tuple)): + cvals = [defcval] + else: + cvals = defcval + if con_func: + func = get_notebook_function(nb, con_func) if func is None: - warnings.warn(f"Didn't find concurrency function {ext_func} in notebook", + warnings.warn(f"Didn't find concurrency function {con_func} in notebook", RuntimeWarning) else: - # remove help calls as they will cause the argument parser to exit - known, remaining = parser.parse_known_args() - args = deconsolize_args(vars(known)) df = {} exec(func, df) - f = df[ext_func] + f = df[con_func] import inspect sig = inspect.signature(f) callargs = [] + if cvals: + # in case default needs to be used for function call + args[cvar] = cvals for arg in sig.parameters: callargs.append(args[arg]) - - extention = f(*callargs) - fcc = first_code_cell(nb) - fcc["source"] += "\n" + extention - - parms = extract_parameters(nb) - - title, author, version = extract_title_author_version(nb) - - if not title: - title = "{} {} Calibration".format(detector, caltype) - if not author: - author = "anonymous" - if not version: - version = "" - - title = title.rstrip() - - run_uuid = f"t{datetime.now().strftime('%y%m%d_%H%M%S')}" - - # check if concurrency parameter is given and we run concurrently - if not has_parm(parms, concurrency["parameter"]) and concurrency["parameter"] is not None: - msg = "Notebook cannot be run concurrently: no {} parameter".format( - concurrency["parameter"]) - warnings.warn(msg, RuntimeWarning) - - cluster_profile = "NO_CLUSTER" - if not has_parm(parms, "cluster_profile"): - warnings.warn("Notebook has no cluster_profile parameter, " + - "running on cluster will likely fail!", RuntimeWarning) - elif "cluster_profile" not in args or args["cluster_profile"] == parser.get_default('cluster_profile'): - cluster_profile = "slurm_prof_{}".format(run_uuid) - - # create a temporary output directory to work in - run_tmp_path = "{}/slurm_out_{}_{}_{}".format(temp_path, detector, caltype, run_uuid) - os.makedirs(run_tmp_path) - - # Write all input parameters to rst file to be included to final report - parms = parameter_values(parms, **args) - make_par_table(parms, run_tmp_path) - save_executed_command(run_tmp_path, version) - - # wait on all jobs to run and then finalize the run by creating a report from the notebooks - out_path = "{}/{}/{}/{}".format(report_path, detector.upper(), - caltype.upper(), datetime.now().isoformat()) - if try_report_to_output: - if "out_folder" in args: - out_path = os.path.abspath(args["out_folder"]) - else: - print("No 'out_folder' defined as argument, outputting to '{}' instead.".format( - out_path)) - - os.makedirs(out_path, exist_ok=True) - - report_to = title.replace(" ", "") - if args["report_to"] is not None: - report_to = args["report_to"] - - folder = get_par_attr(parms, 'in_folder', 'value', '') - - if args["request_time"] == "Now": - request_time = datetime.now().strftime('%Y-%m-%dT%H:%M:%S') - else: - request_time = args["request_time"] - - submission_time = datetime.now().strftime('%Y-%m-%dT%H:%M:%S') - fmt_args = {'run_path': run_tmp_path, - 'out_path': out_path, - 'project': title, - 'calibration': title, - 'author': author, - 'version': version, - 'report_to': report_to, - 'in_folder': folder, - 'request_time': request_time, - 'submission_time': submission_time - } - - joblist = [] - cluster_cores = concurrency.get("cluster cores", 8) - # Check if there are pre-notebooks - for i, rel_path in enumerate(pre_notebooks): - notebook_path = os.path.abspath( - "{}/{}".format(os.path.dirname(__file__), rel_path)) - with open(notebook_path, "r") as f: - lead_nb = nbformat.read(f, as_version=4) - jobid = concurrent_run(run_tmp_path, lead_nb, - os.path.basename(notebook_path), - args, - job_list=joblist, fmt_args=fmt_args, - cluster_cores=cluster_cores, - sequential=sequential, - cluster_profile=cluster_profile) - joblist.append(jobid) - - if concurrency.get("parameter", None) is None: - jobid = concurrent_run(run_tmp_path, nb, - os.path.basename(notebook), args, - final_job=True, job_list=joblist, - fmt_args=fmt_args, + cvals = f(*callargs) + print(f"Split concurrency into {cvals}") + + # get expected type + cvtype = get_par_attr(parms, cvar, 'type', list) + cvals = remove_duplications(cvals) + + jlist = [] + for cnum, cval in enumerate(cvals): + show_title = cnum == 0 + # Job is not final if there are dependent notebooks + final_job = (cnum == len(list(cvals)) - 1 and len(dep_notebooks) == 0) + cval = [cval, ] if not isinstance(cval, list) and cvtype is list else cval + + jobid = concurrent_run(run_tmp_path, nb, notebook, args, + cvar, cval, final_job, + jlist, fmt_args, cluster_cores=cluster_cores, sequential=sequential, + show_title=show_title, dep_jids=joblist, cluster_profile=cluster_profile) - joblist.append(jobid) - else: - cvar = concurrency["parameter"] - cvals = args.get(cvar, None) - - con_func = concurrency.get("use function", None) - # Consider [-1] as None - if cvals is None or cvals == [-1]: - defcval = concurrency.get("default concurrency", None) - if defcval is not None: - print(f"Concurrency parameter '{cvar}' " - f"is taken from notebooks.py") - if not isinstance(defcval, (list, tuple)): - cvals = range(defcval) - else: - cvals = defcval - - if cvals is None: - defcval = get_par_attr(parms, cvar, 'value') - if defcval is not None: - print(f"Concurrency parameter '{cvar}' " - f"is taken from '{notebook}'") - if not isinstance(defcval, (list, tuple)): - cvals = [defcval] - else: - cvals = defcval - - if con_func: - func = get_notebook_function(nb, con_func) - if func is None: - warnings.warn(f"Didn't find concurrency function {con_func} in notebook", - RuntimeWarning) - else: - df = {} - exec(func, df) - f = df[con_func] - import inspect - sig = inspect.signature(f) - callargs = [] - if cvals: - # in case default needs to be used for function call - args[cvar] = cvals - for arg in sig.parameters: - callargs.append(args[arg]) - cvals = f(*callargs) - print(f"Split concurrency into {cvals}") - - # get expected type - cvtype = get_par_attr(parms, cvar, 'type', list) - cvals = remove_duplications(cvals) - - jlist = [] - for cnum, cval in enumerate(cvals): - show_title = cnum == 0 - # Job is not final if there are dependent notebooks - final_job = (cnum == len(list(cvals)) - 1 and len(dep_notebooks) == 0) - cval = [cval, ] if not isinstance(cval, list) and cvtype is list else cval - - jobid = concurrent_run(run_tmp_path, nb, notebook, args, - cvar, cval, final_job, - jlist, fmt_args, - cluster_cores=cluster_cores, - sequential=sequential, - show_title=show_title, - dep_jids=joblist, - cluster_profile=cluster_profile) - jlist.append(jobid) - joblist.extend(jlist) - # Run dependent notebooks - for i, notebook in enumerate(dep_notebooks): - notebook_path = os.path.abspath( - "{}/{}".format(os.path.dirname(__file__), notebook)) - with open(notebook_path, "r") as f: - nb = nbformat.read(f, as_version=4) - final_job = i == len(dep_notebooks) - 1 - jobid = concurrent_run(run_tmp_path, nb, - os.path.basename(notebook), - args, - dep_jids=joblist, - final_job=final_job, - job_list=joblist, fmt_args=fmt_args, - cluster_cores=cluster_cores, - sequential=sequential, - cluster_profile=cluster_profile) - joblist.append(jobid) - - if not all([j is None for j in joblist]): - print("Submitted the following SLURM jobs: {}".format(",".join(joblist))) + jlist.append(jobid) + joblist.extend(jlist) + + # Run dependent notebooks (e.g. summaries after correction) + for i, dep_notebook in enumerate(dep_notebooks): + dep_notebook_path = os.path.join(PKG_DIR, dep_notebook) + dep_nb = nbformat.read(dep_notebook_path, as_version=4) + final_job = i == len(dep_notebooks) - 1 + jobid = concurrent_run(run_tmp_path, dep_nb, + os.path.basename(dep_notebook_path), + args, + dep_jids=joblist, + final_job=final_job, + job_list=joblist, fmt_args=fmt_args, + cluster_cores=cluster_cores, + sequential=sequential, + cluster_profile=cluster_profile) + joblist.append(jobid) + + if not all([j is None for j in joblist]): + print("Submitted the following SLURM jobs: {}".format(",".join(joblist))) if __name__ == "__main__":