diff --git a/setup.py b/setup.py index 98ed04e9229610571fb2e6a351dda0bb6a683902..0fb4508df9e91921f3c1500f34c8ed23c948bb45 100644 --- a/setup.py +++ b/setup.py @@ -140,7 +140,7 @@ setup( "pre-commit", ], }, - python_requires=">=3.6", + python_requires=">=3.8", classifiers=[ "Development Status :: 5 - Production/Stable", "Environment :: Console", diff --git a/src/xfel_calibrate/calibrate.py b/src/xfel_calibrate/calibrate.py index b44252144dbc8d59654534c226d153a226668959..26e45f0fe1e7e439f85a166bb92c8e97df86a27b 100755 --- a/src/xfel_calibrate/calibrate.py +++ b/src/xfel_calibrate/calibrate.py @@ -1,34 +1,37 @@ #!/usr/bin/env python -import argparse -import inspect import json import locale import math import os -import pprint import re +import shlex import shutil import stat -import string import sys import textwrap import warnings from datetime import datetime from pathlib import Path -from subprocess import DEVNULL, check_output, check_call, call +from subprocess import DEVNULL, call, check_call, check_output from typing import List, Union import nbformat import numpy as np +import yaml from jinja2 import Template from nbparameterise import extract_parameters, parameter_values, replace_definitions -import yaml import cal_tools.tools from .finalize import tex_escape -from .notebooks import notebooks +from .nb_args import ( + consolize_name, + first_markdown_cell, + get_notebook_function, + parse_argv_and_load_nb, + set_figure_format, +) from .settings import ( default_report_path, finalize_time_limit, @@ -47,143 +50,6 @@ from .settings import ( PKG_DIR = os.path.dirname(os.path.abspath(__file__)) -# Add a class combining raw description formatting with -# Metavariable default outputs -class RawTypeFormatter(argparse.RawDescriptionHelpFormatter, - argparse.MetavarTypeHelpFormatter, - argparse.ArgumentDefaultsHelpFormatter): - pass - - -# The argument parser for calibrate.py, will be extended depending -# on the options given. 
- -def make_initial_parser(**kwargs): - parser = argparse.ArgumentParser( - description="Main entry point for offline calibration", - formatter_class=RawTypeFormatter, - **kwargs - ) - - parser.add_argument('detector', metavar='DETECTOR', type=str, - help='The detector to calibrate: ' + ", ".join(notebooks)) - - parser.add_argument('type', metavar='TYPE', type=str, - help='Type of calibration.') - - parser.add_argument('--no-cluster-job', - action="store_true", - default=False, - help="Do not run as a cluster job") - - parser.add_argument('--prepare-only', action="store_true", - help="Prepare notebooks but don't run them") - - parser.add_argument('--report-to', type=str, - help='Filename (and optionally path) for output' - ' report') - - parser.add_argument('--not-reproducible', action='store_true', - help='Disable checks to allow the processing result ' - 'to not be reproducible based on its metadata.') - - parser.add_argument('--skip-report', action='store_true', - help='Skip report generation in finalize step.') - - parser.add_argument('--skip-env-freeze', action='store_true', - help='Skip recording the Python environment for ' - 'reproducibility purposes, requires ' - '--not-reproducible to run.') - - parser.add_argument('--concurrency-par', type=str, - help='Name of concurrency parameter.' - 'If not given, it is taken from configuration.') - - parser.add_argument('--constants-from', type=str, help=( - "Path to a calibration-metadata.yml file. If given, " - "retrieved-constants will be copied to use for a new correction." - )) - - parser.add_argument('--priority', type=int, default=2, - help="Priority of batch jobs. 
If priority<=1, reserved" - " nodes become available.") - - parser.add_argument('--vector-figs', action="store_true", default=False, - help="Use vector graphics for figures in the report.") - - parser.add_argument('--slurm-mem', type=int, default=500, - help="Requested node RAM in GB") - - parser.add_argument('--slurm-name', type=str, default='xfel_calibrate', - help='Name of slurm job') - - parser.add_argument('--slurm-scheduling', type=int, default=0, - help='Change scheduling priority for a slurm job ' - '+- 2147483645 (negative value increases ' - 'priority)') - - parser.add_argument('--request-time', type=str, default='Now', - help='Time of request to process notebook. Iso format') - - parser.add_argument_group('required arguments') - - parser.add_argument('--slurm-partition', type=str, default="", - help="Submit jobs in this Slurm partition") - - parser.add_argument('--reservation', type=str, default="", - help="Submit jobs in this Slurm reservation, " - "overriding --slurm-partition if both are set") - - return parser - - -# Helper functions for parser extensions - -def make_intelli_list(ltype): - """ Parses a list from range and comma expressions. 
- - An expression of the form "1-5,6" will be parsed into the following - list: [1,2,3,4,6] - - """ - class IntelliListAction(argparse.Action): - element_type = ltype - - def __init__(self, *args, **kwargs): - super(IntelliListAction, self).__init__(*args, **kwargs) - - def __call__(self, parser, namespace, values, option_string=None): - parsed_values = [] - values = ",".join(values) - if isinstance(values, str): - for rcomp in values.split(","): - if "-" in rcomp: - start, end = rcomp.split("-") - parsed_values += list(range(int(start), int(end))) - else: - parsed_values += [int(rcomp)] - elif isinstance(values, (list, tuple)): - parsed_values = values - else: - parsed_values = [values, ] - - parsed_values = [self.element_type(p) for p in parsed_values] - print("Parsed input {} to {}".format(values, parsed_values)) - setattr(namespace, self.dest, parsed_values) - - return IntelliListAction - - -def consolize_name(name): - """ Names of console parameters don't have underscores """ - return name.replace("_", "-") - - -def deconsolize_args(args): - """ Variable names have underscores """ - return {k.replace("-", "_"): v for k, v in args.items()} - - def extract_title_author(nb): """ Tries to extract title, author from markdown. 
@@ -191,6 +57,8 @@ def extract_title_author(nb): """ first_md = first_markdown_cell(nb) + if first_md is None: + return None, None source = first_md["source"] title = re.findall(r'#+\s*(.*)\s*#+', source) author = re.findall( @@ -226,86 +94,6 @@ def get_python_version(python_exe): return check_output([python_exe, '--version']).decode('utf-8').split()[1] -def get_cell_n(nb, cell_type, cell_n): - """ - Return notebook cell with given number and given type - - :param nb: jupyter notebook - :param cell_type: cell type, 'code' or 'markdown' - :param cell_n: cell number (count from 0) - :return: notebook cell - """ - counter = 0 - for cell in nb.cells: - if cell.cell_type == cell_type: - if counter == cell_n: - return cell - counter += 1 - - -def first_code_cell(nb): - """ Return the first code cell of a notebook """ - return get_cell_n(nb, 'code', 0) - - -def first_markdown_cell(nb): - """ Return the first markdown cell of a notebook """ - return get_cell_n(nb, 'markdown', 0) - - -def make_epilog(nb, caltype=None): - """ Make an epilog from the notebook to add to parser help - """ - msg = "" - header_cell = first_markdown_cell(nb) - lines = header_cell.source.split("\n") - if caltype: - msg += "{:<15} {}".format(caltype, lines[0]) + "\n" - else: - msg += "{}".format(lines[0]) + "\n" - pp = pprint.PrettyPrinter(indent=(17 if caltype else 0)) - if len(lines[1:]): - plines = pp.pformat(lines[1:])[1:-1].split("\n") - for line in plines: - sline = line.replace("'", "", 1) - sline = sline.replace("', '", " " * (17 if caltype else 0), 1) - sline = sline[::-1].replace("'", "", 1)[::-1] - sline = sline.replace(" ,", " ") - if len(sline) > 1 and sline[0] == ",": - sline = sline[1:] - msg += sline + "\n" - msg += "\n" - return msg - - -def get_notebook_function(nb, fname): - flines = [] - def_found = False - indent = None - for cell in nb.cells: - if cell.cell_type == 'code': - lines = cell.source.split("\n") - for line in lines: - - if def_found: - lin = len(line) - 
len(line.lstrip()) - if indent is None: - if lin != 0: - indent = lin - flines.append(line) - elif lin >= indent: - flines.append(line) - else: - return "\n".join(flines) - - if re.search(r"def\s+{}\(.*\):\s*".format(fname), line) and not def_found: - # print("Found {} in line {}".format(fname, line)) - # set this to indent level - def_found = True - flines.append(line) - return None - - def balance_sequences(in_folder: str, run: int, sequences: List[int], sequences_per_node: int, karabo_da: Union[list, str], max_nodes: int = 8): @@ -367,189 +155,6 @@ def balance_sequences(in_folder: str, run: int, sequences: List[int], if l.size > 0] -def make_extended_parser() -> argparse.ArgumentParser: - """Create an ArgumentParser using information from the notebooks""" - - # extend the parser according to user input - # the first case is if a detector was given, but no calibration type - if len(sys.argv) == 3 and "-h" in sys.argv[2]: - detector = sys.argv[1].upper() - try: - det_notebooks = notebooks[detector] - except KeyError: - # TODO: This should really go to stderr not stdout - print("Not one of the known detectors: {}".format(notebooks.keys())) - sys.exit(1) - - msg = "Options for detector {}\n".format(detector) - msg += "*" * len(msg) + "\n\n" - - # basically, this creates help in the form of - # - # TYPE some description that is - # indented for this type. - # - # The information is extracted from the first markdown cell of - # the notebook. - for caltype, notebook in det_notebooks.items(): - if notebook.get("notebook") is None: - if notebook.get("user", {}).get("notebook") is None: - raise KeyError( - f"`{detector}` does not have a notebook path, for " - "notebooks that are stored in pycalibration set the " - "`notebook` key to a relative path or set the " - "`['user']['notebook']` key to an absolute path/path " - "pattern. 
Notebook configuration dictionary contains " - f"only: `{notebook}`" - "" - ) - # Everything should be indented by 17 spaces - msg += caltype.ljust(17) + "User defined notebook, arguments may vary\n" - msg += " "*17 + "User notebook expected to be at path:\n" - msg += " "*17 + notebook["user"]["notebook"] + "\n" - else: - nbpath = os.path.join(PKG_DIR, notebook["notebook"]) - nb = nbformat.read(nbpath, as_version=4) - msg += make_epilog(nb, caltype=caltype) - - return make_initial_parser(epilog=msg) - elif len(sys.argv) <= 3: - return make_initial_parser() - - # A detector and type was given. We derive the arguments - # from the corresponding notebook - args, _ = make_initial_parser(add_help=False).parse_known_args() - try: - nb_info = notebooks[args.detector.upper()][args.type.upper()] - except KeyError: - print("Not one of the known calibrations or detectors") - sys.exit(1) - - if nb_info["notebook"]: - notebook = os.path.join(PKG_DIR, nb_info["notebook"]) - else: - # If `"notebook"` entry is None, then set it to the user provided - # notebook TODO: This is a very hacky workaround, better implementation - # is not really possible with the current state of this module - user_notebook_path = nb_info["user"]["notebook"] - # Pull out the variables in the templated path string, and get values - # from command line args (e.g. 
--proposal 1234 -> {proposal}) - user_notebook_variables = [ - name for (_, name, _, _) in string.Formatter().parse(user_notebook_path) - if name is not None - ] - - user_notebook_parser = argparse.ArgumentParser(add_help=False) - for var in user_notebook_variables: - user_notebook_parser.add_argument(f"--{var}") - - user_notebook_args, _ = user_notebook_parser.parse_known_args() - - nb_info["notebook"] = user_notebook_path.format(**vars(user_notebook_args)) - notebook = nb_info["notebook"] - - cvar = nb_info.get("concurrency", {}).get("parameter", None) - - nb = nbformat.read(notebook, as_version=4) - - # extend parameters if needed - ext_func = nb_info.get("extend parms", None) - if ext_func is not None: - extend_params(nb, ext_func) - - # No extend parms function - add statically defined parameters from the - # first code cell - parser = make_initial_parser() - add_args_from_nb(nb, parser, cvar=cvar) - return parser - -def add_args_from_nb(nb, parser, cvar=None, no_required=False): - """Add argparse arguments for parameters in the first cell of a notebook. - - Uses nbparameterise to extract the parameter information. Each foo_bar - parameter gets a --foo-bar command line option. - Boolean parameters get a pair of flags like --abc and --no-abc. - - :param nb: NotebookNode object representing a loaded .ipynb file - :param parser: argparse.ArgumentParser instance - :param str cvar: Name of the concurrency parameter. - :param bool no_required: If True, none of the added options are required. - """ - parser.description = make_epilog(nb) - parms = extract_parameters(nb, lang='python') - - for p in parms: - helpstr = ("Default: %(default)s" if not p.comment - else "{}. 
Default: %(default)s".format(p.comment.replace("#", " ").strip())) - required = (p.comment is not None - and "required" in p.comment - and not no_required - and p.name != cvar) - - # This may be not a public API - # May require reprogramming in case of argparse updates - pars_group = parser._action_groups[2 if required else 1] - - default = p.value if (not required) else None - - if issubclass(p.type, list) or p.name == cvar: - ltype = type(p.value[0]) if issubclass(p.type, list) else p.type - range_allowed = "RANGE ALLOWED" in p.comment.upper() if p.comment else False - pars_group.add_argument(f"--{consolize_name(p.name)}", - nargs='+', - type=ltype if not range_allowed else str, - default=default, - help=helpstr, - required=required, - action=make_intelli_list(ltype) if range_allowed else None) - elif issubclass(p.type, bool): - # For a boolean, make --XYZ and --no-XYZ options. - alt_group = pars_group.add_mutually_exclusive_group(required=required) - alt_group.add_argument(f"--{consolize_name(p.name)}", - action="store_true", - default=default, - help=helpstr, - dest=p.name) - alt_group.add_argument(f"--no-{consolize_name(p.name)}", - action="store_false", - default=default, - help=f"Opposite of --{consolize_name(p.name)}", - dest=p.name) - else: - pars_group.add_argument(f"--{consolize_name(p.name)}", - type=p.type, - default=default, - help=helpstr, - required=required) - -def extend_params(nb, extend_func_name): - """Add parameters in the first code cell by calling a function in the notebook - """ - func = get_notebook_function(nb, extend_func_name) - - if func is None: - warnings.warn( - f"Didn't find concurrency function {extend_func_name} in notebook", - RuntimeWarning - ) - return - - # Make a temporary parser that won't exit if it sees -h or --help - pre_parser = make_initial_parser(add_help=False) - add_args_from_nb(nb, pre_parser, no_required=True) - known, _ = pre_parser.parse_known_args() - args = deconsolize_args(vars(known)) - - df = {} - 
exec(func, df) - f = df[extend_func_name] - sig = inspect.signature(f) - - extension = f(*[args[p] for p in sig.parameters]) - fcc = first_code_cell(nb) - fcc["source"] += "\n" + extension - - def get_par_attr(parms, key, attr, default=None): """ Return the type of parameter with name key @@ -581,19 +186,6 @@ def flatten_list(l): return '' -def set_figure_format(nb, enable_vector_format): - """Set svg format in inline backend for figures - - If parameter enable_vector_format is set to True, svg format will - be used for figures in the notebook rendering. Subsequently vector - graphics figures will be used for report. - """ - - if enable_vector_format: - cell = get_cell_n(nb, 'code', 1) - cell.source += "\n%config InlineBackend.figure_formats = ['svg']\n" - - def create_finalize_script(fmt_args, temp_path, job_list) -> str: """ Create a finalize script to produce output report @@ -667,7 +259,7 @@ def run_finalize(fmt_args, temp_path, job_list, sequential=False): return jobid -def save_executed_command(run_tmp_path, version): +def save_executed_command(run_tmp_path, version, argv): """ Create a file with string used to execute `xfel_calibrate` @@ -678,7 +270,7 @@ def save_executed_command(run_tmp_path, version): f_name = os.path.join(run_tmp_path, "run_calibrate.sh") with open(f_name, "w") as finfile: finfile.write(f'# pycalibration version: {version}\n') - finfile.write(' '.join(sys.argv)) + finfile.write(shlex.join(argv)) class SlurmOptions: @@ -785,7 +377,9 @@ def prepare_job( params = parameter_values(params, cluster_profile=cluster_profile) new_nb = replace_definitions(nb, params, execute=False, lang='python') if not show_title: - first_markdown_cell(new_nb).source = '' + title_cell = first_markdown_cell(new_nb) + if title_cell is not None: + title_cell.source = '' set_figure_format(new_nb, args["vector_figs"]) new_name = f"{nb_path.stem}__{cparm}__{suffix}.ipynb" @@ -979,57 +573,17 @@ def make_par_table(parms, run_tmp_path: str): 
finfile.write(textwrap.dedent(tmpl.render(p=col_type, lines=l_parms))) -def run(): +def run(argv=None): """ Run a calibration task with parser arguments """ # Ensure files are opened as UTF-8 by default, regardless of environment. locale.setlocale(locale.LC_CTYPE, ('en_US', 'UTF-8')) - parser = make_extended_parser() - args = deconsolize_args(vars(parser.parse_args())) - detector = args["detector"].upper() - caltype = args["type"].upper() - sequential = args["no_cluster_job"] - - # Pick out any arguments that may prevent reproducibility from - # working, sorted alphabetically and converted back to their - # canonical representation. - not_reproducible_args = sorted( - ('--' + x.replace('_', '-') - for x in ['skip_env_freeze'] - if args[x])) - - # If any of these arguments are set, present a warning. - if not_reproducible_args: - print('WARNING: One or more command line arguments ({}) may prevent ' - 'this specific correction result from being reproducible based ' - 'on its metadata. It may not be possible to restore identical ' - 'output data files when they have been deleted or lost. Please ' - 'ensure that the data retention policy of the chosen storage ' - 'location is sufficient for your ' - 'needs.'.format(', '.join(not_reproducible_args))) - - if not args['not_reproducible']: - # If not explicitly specified that reproducibility may be - # broken, remind the user and exit. 
- print('To proceed, you can explicitly allow reproducibility to ' - 'be broken by adding --not-reproducible') - sys.exit(1) - - reproducible = False - else: - reproducible = True - - try: - nb_info = notebooks[detector][caltype] - except KeyError: - print("Not one of the known calibrations or detectors") - return 1 + if argv is None: + argv = sys.argv - pre_notebooks = nb_info.get("pre_notebooks", []) - notebook = nb_info["notebook"] - dep_notebooks = nb_info.get("dep_notebooks", []) - concurrency = nb_info.get("concurrency", {'parameter': None}) + args, nb_details = parse_argv_and_load_nb(argv) + concurrency = nb_details.concurrency concurrency_par = args["concurrency_par"] or concurrency['parameter'] if concurrency_par == concurrency['parameter']: # Use the defaults from notebook.py to split the work into several jobs @@ -1040,22 +594,14 @@ def run(): # don't use the associated settings from there. concurrency_defval = concurrency_func = None - - notebook_path = Path(PKG_DIR, notebook) - nb = nbformat.read(notebook_path, as_version=4) - - # extend parameters if needed - ext_func = nb_info.get("extend parms", None) - if ext_func is not None: - extend_params(nb, ext_func) - - parms = extract_parameters(nb, lang='python') + notebook_path = nb_details.path + nb = nb_details.contents title, author = extract_title_author(nb) version = get_pycalib_version() if not title: - title = "{} {} Calibration".format(detector, caltype) + title = f"{nb_details.detector} {nb_details.caltype} Calibration" if not author: author = "anonymous" if not version: @@ -1066,23 +612,29 @@ def run(): run_uuid = f"t{datetime.now().strftime('%y%m%d_%H%M%S')}" # check if concurrency parameter is given and we run concurrently - if not any(p.name == "parameter" for p in parms) and concurrency_par is not None: + if concurrency_par is not None and not any( + p.name == concurrency_par for p in nb_details.default_params + ): msg = f"Notebook cannot be run concurrently: no {concurrency_par} parameter" 
warnings.warn(msg, RuntimeWarning) # If not explicitly specified, use a new profile for ipcluster - if args.get("cluster_profile") in {None, parser.get_default("cluster_profile")}: - args['cluster_profile'] = "slurm_prof_{}".format(run_uuid) + default_params_by_name = {p.name: p.value for p in nb_details.default_params} + if 'cluster_profile' in default_params_by_name: + if args.get("cluster_profile") == default_params_by_name["cluster_profile"]: + args['cluster_profile'] = "slurm_prof_{}".format(run_uuid) # create a temporary output directory to work in - run_tmp_path = os.path.join(temp_path, f"slurm_out_{detector}_{caltype}_{run_uuid}") + run_tmp_path = os.path.join( + temp_path, f"slurm_out_{nb_details.detector}_{nb_details.caltype}_{run_uuid}" + ) os.makedirs(run_tmp_path) # Write all input parameters to rst file to be included to final report - parms = parameter_values(parms, **args) + parms = parameter_values(nb_details.default_params, **args) make_par_table(parms, run_tmp_path) # And save the invocation of this script itself - save_executed_command(run_tmp_path, version) + save_executed_command(run_tmp_path, version, argv) # Copy the bash script which will be used to run notebooks shutil.copy2( @@ -1091,7 +643,7 @@ def run(): ) # wait on all jobs to run and then finalize the run by creating a report from the notebooks - out_path = Path(default_report_path) / detector.upper() / caltype.upper() / datetime.now().isoformat() + out_path = Path(default_report_path) / nb_details.detector / nb_details.caltype / datetime.now().isoformat() if try_report_to_output: if "out_folder" in args: out_path = Path(args["out_folder"]).absolute() @@ -1115,11 +667,9 @@ def run(): print(f"report_to path contained no path, saving report in '{out_path}'") report_to = out_path / report_to - user_venv = nb_info.get("user", {}).get("venv") - if user_venv: - user_venv = Path(user_venv.format(**args)) - print("Using specified venv:", user_venv) - python_exe = str(user_venv / 'bin' / 
'python') + if nb_details.user_venv: + print("Using specified venv:", nb_details.user_venv) + python_exe = str(nb_details.user_venv / 'bin' / 'python') else: python_exe = python_path @@ -1134,7 +684,7 @@ def run(): metadata["pycalibration-version"] = version metadata["report-path"] = f"{report_to}.pdf" if report_to \ else '# REPORT SKIPPED #' - metadata['reproducible'] = reproducible + metadata['reproducible'] = not args['not_reproducible'] metadata["concurrency"] = { 'parameter': concurrency_par, 'default': concurrency_defval, @@ -1169,8 +719,7 @@ def run(): pre_jobs = [] cluster_cores = concurrency.get("cluster cores", 8) # Check if there are pre-notebooks - for pre_notebook in pre_notebooks: - pre_notebook_path = Path(PKG_DIR, pre_notebook) + for pre_notebook_path in nb_details.pre_paths: lead_nb = nbformat.read(pre_notebook_path, as_version=4) pre_jobs.append(prepare_job( run_tmp_path, lead_nb, pre_notebook_path, args, @@ -1197,7 +746,7 @@ def run(): defcval = get_par_attr(parms, concurrency_par, 'value') if defcval is not None: print(f"Concurrency parameter '{concurrency_par}' " - f"is taken from '{notebook}'") + f"is taken from '{notebook_path}'") cvals = defcval if isinstance(defcval, (list, tuple)) else [defcval] if concurrency_func: @@ -1220,6 +769,11 @@ def run(): cvals = f(*callargs) print(f"Split concurrency into {cvals}") + if cvals is None: + raise ValueError( + f"No values found for {concurrency_par} (concurrency parameter)" + ) + # get expected type cvtype = get_par_attr(parms, concurrency_par, 'type', list) cvals = remove_duplications(cvals) @@ -1237,8 +791,7 @@ def run(): # Prepare dependent notebooks (e.g. 
summaries after correction) dep_jobs = [] - for i, dep_notebook in enumerate(dep_notebooks): - dep_notebook_path = Path(PKG_DIR, dep_notebook) + for i, dep_notebook_path in enumerate(nb_details.dep_paths): dep_nb = nbformat.read(dep_notebook_path, as_version=4) dep_jobs.append(prepare_job( run_tmp_path, dep_nb, dep_notebook_path, args, @@ -1262,7 +815,7 @@ def run(): print("Files prepared, not executing now (--prepare-only option).") print("To execute the notebooks, run:") rpt_opts = '' - if user_venv is not None: + if nb_details.user_venv is not None: rpt_opts = f'--python {python_exe}' print(f" python -m xfel_calibrate.repeat {run_tmp_path} {rpt_opts}") return @@ -1270,7 +823,7 @@ def run(): submission_time = datetime.now().strftime('%Y-%m-%dT%H:%M:%S') # Launch the calibration work - if sequential: + if args["no_cluster_job"]: print("Running notebooks directly, not via Slurm...") errors = job_chain.run_direct() joblist = [] @@ -1302,7 +855,7 @@ def run(): fmt_args=fmt_args, temp_path=run_tmp_path, job_list=joblist, - sequential=sequential, + sequential=args["no_cluster_job"], )) if any(j is not None for j in joblist): diff --git a/src/xfel_calibrate/nb_args.py b/src/xfel_calibrate/nb_args.py new file mode 100644 index 0000000000000000000000000000000000000000..511223ed69462c5cab82ad87025d9a848481c9b6 --- /dev/null +++ b/src/xfel_calibrate/nb_args.py @@ -0,0 +1,490 @@ +"""Manipulating notebooks & translating parameters to command-line options +""" +import argparse +import inspect +import os.path +import pprint +import re +import string +import sys +import warnings +from dataclasses import dataclass +from pathlib import Path +from typing import Any, Dict, List, Optional, Tuple + +import nbformat +from nbparameterise import Parameter, extract_parameters + +from .notebooks import notebooks + +PKG_DIR = os.path.dirname(os.path.abspath(__file__)) + + +# Add a class combining raw description formatting with +# Metavariable default outputs +class 
RawTypeFormatter(argparse.RawDescriptionHelpFormatter, + argparse.MetavarTypeHelpFormatter, + argparse.ArgumentDefaultsHelpFormatter): + pass + + +# The argument parser for calibrate.py, will be extended depending +# on the options given. + +def make_initial_parser(**kwargs): + parser = argparse.ArgumentParser( + description="Main entry point for offline calibration", + formatter_class=RawTypeFormatter, + **kwargs + ) + + parser.add_argument('detector', metavar='DETECTOR', type=str, + help='The detector to calibrate: ' + ", ".join(notebooks)) + + parser.add_argument('type', metavar='TYPE', type=str, + help='Type of calibration.') + + parser.add_argument('--no-cluster-job', + action="store_true", + default=False, + help="Do not run as a cluster job") + + parser.add_argument('--prepare-only', action="store_true", + help="Prepare notebooks but don't run them") + + parser.add_argument('--report-to', type=str, + help='Filename (and optionally path) for output' + ' report') + + parser.add_argument('--not-reproducible', action='store_true', + help='Disable checks to allow the processing result ' + 'to not be reproducible based on its metadata.') + + parser.add_argument('--skip-report', action='store_true', + help='Skip report generation in finalize step.') + + parser.add_argument('--skip-env-freeze', action='store_true', + help='Skip recording the Python environment for ' + 'reproducibility purposes, requires ' + '--not-reproducible to run.') + + parser.add_argument('--concurrency-par', type=str, + help='Name of concurrency parameter.' + 'If not given, it is taken from configuration.') + + parser.add_argument('--constants-from', type=str, help=( + "Path to a calibration-metadata.yml file. If given, " + "retrieved-constants will be copied to use for a new correction." + )) + + parser.add_argument('--priority', type=int, default=2, + help="Priority of batch jobs. 
If priority<=1, reserved" + " nodes become available.") + + parser.add_argument('--vector-figs', action="store_true", default=False, + help="Use vector graphics for figures in the report.") + + parser.add_argument('--slurm-mem', type=int, default=500, + help="Requested node RAM in GB") + + parser.add_argument('--slurm-name', type=str, default='xfel_calibrate', + help='Name of slurm job') + + parser.add_argument('--slurm-scheduling', type=int, default=0, + help='Change scheduling priority for a slurm job ' + '+- 2147483645 (negative value increases ' + 'priority)') + + parser.add_argument('--request-time', type=str, default='Now', + help='Time of request to process notebook. Iso format') + + parser.add_argument_group('required arguments') + + parser.add_argument('--slurm-partition', type=str, default="", + help="Submit jobs in this Slurm partition") + + parser.add_argument('--reservation', type=str, default="", + help="Submit jobs in this Slurm reservation, " + "overriding --slurm-partition if both are set") + + return parser + + +# Helper functions for parser extensions + +def make_intelli_list(ltype): + """ Parses a list from range and comma expressions. 
+ + An expression of the form "1-5,6" will be parsed into the following + list: [1,2,3,4,6] + + """ + class IntelliListAction(argparse.Action): + element_type = ltype + + def __init__(self, *args, **kwargs): + super(IntelliListAction, self).__init__(*args, **kwargs) + + def __call__(self, parser, namespace, values, option_string=None): + parsed_values = [] + values = ",".join(values) + if isinstance(values, str): + for rcomp in values.split(","): + if "-" in rcomp: + start, end = rcomp.split("-") + parsed_values += list(range(int(start), int(end))) + else: + parsed_values += [int(rcomp)] + elif isinstance(values, (list, tuple)): + parsed_values = values + else: + parsed_values = [values, ] + + parsed_values = [self.element_type(p) for p in parsed_values] + print("Parsed input {} to {}".format(values, parsed_values)) + setattr(namespace, self.dest, parsed_values) + + return IntelliListAction + + +def consolize_name(name): + """ Names of console parameters don't have underscores """ + return name.replace("_", "-") + + +def add_args_from_nb(parms, parser, cvar=None, no_required=False): + """Add argparse arguments for parameters in the first cell of a notebook. + + Uses nbparameterise to extract the parameter information. Each foo_bar + parameter gets a --foo-bar command line option. + Boolean parameters get a pair of flags like --abc and --no-abc. + + :param parms: List of nbparameterise Parameter objects + :param parser: argparse.ArgumentParser instance to modify + :param str cvar: Name of the concurrency parameter. + :param bool no_required: If True, none of the added options are required. + """ + + for p in parms: + helpstr = ("Default: %(default)s" if not p.comment + else "{}. 
Default: %(default)s".format(p.comment.replace("#", " ").strip())) + required = (p.comment is not None + and "required" in p.comment + and not no_required + and p.name != cvar) + + # This may be not a public API + # May require reprogramming in case of argparse updates + pars_group = parser._action_groups[2 if required else 1] + + default = p.value if (not required) else None + + if issubclass(p.type, list) or p.name == cvar: + ltype = type(p.value[0]) if issubclass(p.type, list) else p.type + range_allowed = "RANGE ALLOWED" in p.comment.upper() if p.comment else False + pars_group.add_argument(f"--{consolize_name(p.name)}", + nargs='+', + type=ltype if not range_allowed else str, + default=default, + help=helpstr, + required=required, + action=make_intelli_list(ltype) if range_allowed else None) + elif issubclass(p.type, bool): + # For a boolean, make --XYZ and --no-XYZ options. + alt_group = pars_group.add_mutually_exclusive_group(required=required) + alt_group.add_argument(f"--{consolize_name(p.name)}", + action="store_true", + default=default, + help=helpstr, + dest=p.name) + alt_group.add_argument(f"--no-{consolize_name(p.name)}", + action="store_false", + default=default, + help=f"Opposite of --{consolize_name(p.name)}", + dest=p.name) + else: + pars_group.add_argument(f"--{consolize_name(p.name)}", + type=p.type, + default=default, + help=helpstr, + required=required) + +def get_cell_n(nb, cell_type, cell_n): + """ + Return notebook cell with given number and given type + + :param nb: jupyter notebook + :param cell_type: cell type, 'code' or 'markdown' + :param cell_n: cell number (count from 0) + :return: notebook cell + """ + counter = 0 + for cell in nb.cells: + if cell.cell_type == cell_type: + if counter == cell_n: + return cell + counter += 1 + + +def first_code_cell(nb): + """ Return the first code cell of a notebook """ + return get_cell_n(nb, 'code', 0) + + +def first_markdown_cell(nb): + """ Return the first markdown cell of a notebook """ + return 
get_cell_n(nb, 'markdown', 0) + + +def set_figure_format(nb, enable_vector_format): + """Set svg format in inline backend for figures + + If parameter enable_vector_format is set to True, svg format will + be used for figures in the notebook rendering. Subsequently vector + graphics figures will be used for report. + """ + + if enable_vector_format: + cell = get_cell_n(nb, 'code', 1) + cell.source += "\n%config InlineBackend.figure_formats = ['svg']\n" + + +def get_notebook_function(nb, fname): + flines = [] + def_found = False + indent = None + for cell in nb.cells: + if cell.cell_type == 'code': + lines = cell.source.split("\n") + for line in lines: + + if def_found: + lin = len(line) - len(line.lstrip()) + if indent is None: + if lin != 0: + indent = lin + flines.append(line) + elif lin >= indent: + flines.append(line) + else: + return "\n".join(flines) + + if re.search(r"def\s+{}\(.*\):\s*".format(fname), line) and not def_found: + # print("Found {} in line {}".format(fname, line)) + # set this to indent level + def_found = True + flines.append(line) + return None + + +def make_epilog(nb, caltype=None): + """ Make an epilog from the notebook to add to parser help + """ + msg = "" + header_cell = first_markdown_cell(nb) + lines = header_cell.source.split("\n") if header_cell is not None else [''] + if caltype: + msg += "{:<15} {}".format(caltype, lines[0]) + "\n" + else: + msg += "{}".format(lines[0]) + "\n" + pp = pprint.PrettyPrinter(indent=(17 if caltype else 0)) + if len(lines[1:]): + plines = pp.pformat(lines[1:])[1:-1].split("\n") + for line in plines: + sline = line.replace("'", "", 1) + sline = sline.replace("', '", " " * (17 if caltype else 0), 1) + sline = sline[::-1].replace("'", "", 1)[::-1] + sline = sline.replace(" ,", " ") + if len(sline) > 1 and sline[0] == ",": + sline = sline[1:] + msg += sline + "\n" + msg += "\n" + return msg + + +def deconsolize_args(args): + """ Variable names have underscores """ + return {k.replace("-", "_"): v for k, v 
in args.items()} + + +def extend_params(nb, extend_func_name, argv): + """Add parameters in the first code cell by calling a function in the notebook + """ + func = get_notebook_function(nb, extend_func_name) + + if func is None: + warnings.warn( + f"Didn't find concurrency function {extend_func_name} in notebook", + RuntimeWarning + ) + return + + # Make a temporary parser that won't exit if it sees -h or --help + pre_parser = make_initial_parser(add_help=False) + add_args_from_nb(nb, pre_parser, no_required=True) + known, _ = pre_parser.parse_known_args(argv[1:]) + args = deconsolize_args(vars(known)) + + df = {} + exec(func, df) + f = df[extend_func_name] + sig = inspect.signature(f) + + extension = f(*[args[p] for p in sig.parameters]) + fcc = first_code_cell(nb) + fcc["source"] += "\n" + extension + + +@dataclass +class NBDetails: + """Details of a notebook-based workflow to run""" + detector: str # e.g. AGIPD + caltype: str # e.g. CORRECT + path: Path + pre_paths: List[Path] # Notebooks to run before the main notebook + dep_paths: List[Path] # Notebooks to run after the main notebooks + contents: nbformat.NotebookNode + default_params: List[Parameter] + concurrency: Dict[str, Any] # Contents as in notebooks.py + user_venv: Optional[Path] + + +def parse_argv_and_load_nb(argv) -> Tuple[Dict, NBDetails]: + """Parse command-line arguments for xfel-calibrate to run a notebook""" + # extend the parser according to user input + # the first case is if a detector was given, but no calibration type + if len(argv) == 3 and "-h" in argv[2]: + detector = argv[1].upper() + try: + det_notebooks = notebooks[detector] + except KeyError: + # TODO: This should really go to stderr not stdout + print("Not one of the known detectors: {}".format(notebooks.keys())) + sys.exit(1) + + msg = "Options for detector {}\n".format(detector) + msg += "*" * len(msg) + "\n\n" + + # basically, this creates help in the form of + # + # TYPE some description that is + # indented for this type. 
+ # + # The information is extracted from the first markdown cell of + # the notebook. + for caltype, notebook in det_notebooks.items(): + if notebook.get("notebook") is None: + if notebook.get("user", {}).get("notebook") is None: + raise KeyError( + f"`{detector}` does not have a notebook path, for " + "notebooks that are stored in pycalibration set the " + "`notebook` key to a relative path or set the " + "`['user']['notebook']` key to an absolute path/path " + "pattern. Notebook configuration dictionary contains " + f"only: `{notebook}`" + "" + ) + # Everything should be indented by 17 spaces + msg += caltype.ljust(17) + "User defined notebook, arguments may vary\n" + msg += " "*17 + "User notebook expected to be at path:\n" + msg += " "*17 + notebook["user"]["notebook"] + "\n" + else: + nbpath = os.path.join(PKG_DIR, notebook["notebook"]) + nb = nbformat.read(nbpath, as_version=4) + msg += make_epilog(nb, caltype=caltype) + + make_initial_parser(epilog=msg).parse_args(argv[1:]) + sys.exit() # parse_args should already exit for --help + elif len(argv) <= 3: + make_initial_parser().parse_args(argv[1:]) + sys.exit() # parse_args should already exit - not enough args + + # A detector and type was given. We derive the arguments + # from the corresponding notebook + args, _ = make_initial_parser(add_help=False).parse_known_args(argv[1:]) + try: + nb_info = notebooks[args.detector.upper()][args.type.upper()] + except KeyError: + print("Not one of the known calibrations or detectors") + sys.exit(1) + + # Pick out any arguments that may prevent reproducibility from + # working, sorted alphabetically and converted back to their + # canonical representation. + not_reproducible_args = sorted( + ('--' + x.replace('_', '-') + for x in ['skip_env_freeze'] + if getattr(args, x)) + ) + + # If any of these arguments are set, present a warning. 
+ if not_reproducible_args: + print('WARNING: One or more command line arguments ({}) may prevent ' + 'this specific correction result from being reproducible based ' + 'on its metadata. It may not be possible to restore identical ' + 'output data files when they have been deleted or lost. Please ' + 'ensure that the data retention policy of the chosen storage ' + 'location is sufficient for your ' + 'needs.'.format(', '.join(not_reproducible_args))) + + if not args.not_reproducible: + # If not explicitly specified that reproducibility may be + # broken, remind the user and exit. + print('To proceed, you can explicitly allow reproducibility to ' + 'be broken by adding --not-reproducible') + sys.exit(1) + + if nb_info["notebook"]: + notebook = os.path.join(PKG_DIR, nb_info["notebook"]) + else: + # If `"notebook"` entry is None, then set it to the user provided + # notebook TODO: This is a very hacky workaround, better implementation + # is not really possible with the current state of this module + user_notebook_path = nb_info["user"]["notebook"] + # Pull out the variables in the templated path string, and get values + # from command line args (e.g. 
--proposal 1234 -> {proposal}) + user_notebook_variables = [ + name for (_, name, _, _) in string.Formatter().parse(user_notebook_path) + if name is not None + ] + + user_notebook_parser = argparse.ArgumentParser(add_help=False) + for var in user_notebook_variables: + user_notebook_parser.add_argument(f"--{var}") + + user_notebook_args, _ = user_notebook_parser.parse_known_args(argv[1:]) + + notebook = user_notebook_path.format(**vars(user_notebook_args)) + + concurrency = nb_info.get("concurrency", {'parameter': None}) + + nb = nbformat.read(notebook, as_version=4) + + # extend parameters if needed + ext_func = nb_info.get("extend parms", None) + if ext_func is not None: + extend_params(nb, ext_func, argv) + + default_params = extract_parameters(nb, lang='python') + + parser = make_initial_parser() + parser.description = make_epilog(nb) + add_args_from_nb(default_params, parser, cvar=concurrency['parameter']) + + arg_dict = deconsolize_args(vars(parser.parse_args(argv[1:]))) + + user_venv = nb_info.get("user", {}).get("venv") + if user_venv is not None: + user_venv = Path(user_venv.format(**arg_dict)) + + return arg_dict, NBDetails( + detector=args.detector.upper(), + caltype=args.type.upper(), + path=Path(notebook), + pre_paths=[Path(PKG_DIR, p) for p in nb_info.get('pre_notebooks', [])], + dep_paths=[Path(PKG_DIR, p) for p in nb_info.get('dep_notebooks', [])], + contents=nb, + default_params=default_params, + concurrency=concurrency, + user_venv=user_venv, + ) diff --git a/tests/test_xfel_calibrate/conftest.py b/tests/test_xfel_calibrate/conftest.py index 739182a846b744a1c167623039f320ec5af55d88..95bc80f2e7fb0e71bbae94921df10514bc26c658 100644 --- a/tests/test_xfel_calibrate/conftest.py +++ b/tests/test_xfel_calibrate/conftest.py @@ -227,14 +227,13 @@ class CalibrateCall: if extra_args: self.args.extend(extra_args) - with mock.patch.object(sys, "argv", self.args): - with mock.patch.object(calibrate, "temp_path", str(tmp_path)): - calibrate.run() + with 
mock.patch.object(calibrate, "temp_path", str(tmp_path)): + calibrate.run(argv=self.args) - out, err = capsys.readouterr() + out, err = capsys.readouterr() - self.out: str = out - self.err: str = err + self.out: str = out + self.err: str = err Paths = NamedTuple( "Paths", diff --git a/tests/test_xfel_calibrate/test_cli.py b/tests/test_xfel_calibrate/test_cli.py index 8eeae450e17dd88225c6f080786f83208cdcdfec..1bb0c1a2794bfde1932f4b4702e63b18facdfa79 100644 --- a/tests/test_xfel_calibrate/test_cli.py +++ b/tests/test_xfel_calibrate/test_cli.py @@ -30,10 +30,9 @@ class TestBasicCalls: that the expected output is present in stdout """ - @mock.patch.object(sys, "argv", ["xfel-calibrate", "--help"]) def test_help(self, capsys): with pytest.raises(SystemExit): - calibrate.run() + calibrate.run(["xfel-calibrate", "--help"]) out, err = capsys.readouterr() @@ -43,10 +42,9 @@ class TestBasicCalls: assert err == "" - @mock.patch.object(sys, "argv", ["xfel-calibrate", "TEST", "-h"]) def test_help_detector(self, capsys): with pytest.raises(SystemExit): - calibrate.run() + calibrate.run(["xfel-calibrate", "TEST", "-h"]) out, err = capsys.readouterr() @@ -55,10 +53,9 @@ class TestBasicCalls: assert err == "" - @mock.patch.object(sys, "argv", ["xfel-calibrate", "TEST", "-h"]) def test_help_user_notebook(self, capsys): with pytest.raises(SystemExit): - calibrate.run() + calibrate.run(["xfel-calibrate", "TEST", "-h"]) out, err = capsys.readouterr() @@ -67,15 +64,13 @@ class TestBasicCalls: assert err == "" - @mock.patch.object(sys, "argv", ["xfel-calibrate", "TEST-RAISES-ERRORS", "--help"]) def test_help_bad_config(self): with pytest.raises(KeyError): - calibrate.run() + calibrate.run(["xfel-calibrate", "TEST-RAISES-ERRORS", "--help"]) - @mock.patch.object(sys, "argv", ["xfel-calibrate", "NotADetector", "beep", "-h"]) def test_unknown_detector(self, capsys): with pytest.raises(SystemExit) as exit_exception: - calibrate.run() + calibrate.run(["xfel-calibrate", "NotADetector", "beep", 
"-h"]) out, err = capsys.readouterr() @@ -85,10 +80,9 @@ class TestBasicCalls: assert err == "" - @mock.patch.object(sys, "argv", ["xfel-calibrate", "NotADetector", "-h"]) def test_unknown_detector_h(self, capsys): with pytest.raises(SystemExit) as exit_exception: - calibrate.run() + calibrate.run(["xfel-calibrate", "NotADetector", "-h"]) out, err = capsys.readouterr() @@ -98,10 +92,9 @@ class TestBasicCalls: assert err == "" - @mock.patch.object(sys, "argv", ["xfel-calibrate", "Tutorial", "TEST", "--help"]) def test_help_nb(self, capsys): with pytest.raises(SystemExit): - calibrate.run() + calibrate.run(["xfel-calibrate", "Tutorial", "TEST", "--help"]) out, err = capsys.readouterr() @@ -165,6 +158,7 @@ class TestTutorialNotebook: def test_expected_processes_called( self, + calibrate_call: CalibrateCall, fake_process_calibrate: FakeProcessCalibrate, ): process_calls = [ diff --git a/tests/test_xfel_calibrate/test_user_configs.py b/tests/test_xfel_calibrate/test_user_configs.py index 21feb55a995ec06e3deb1d71c64d6fe7abdc67cd..cbae495977b217409b64acc8c753facd63f36070 100644 --- a/tests/test_xfel_calibrate/test_user_configs.py +++ b/tests/test_xfel_calibrate/test_user_configs.py @@ -126,6 +126,7 @@ class TestUserVenv: def test_expected_processes_called( self, + calibrate_call: CalibrateCall, mock_proposal: MockProposal, fake_process_calibrate: FakeProcessCalibrate, ):