"""Manipulating notebooks & translating parameters to command-line options
"""
import argparse
import inspect
import os.path
import pprint
import re
import string
import sys
import warnings
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple

import nbformat
from nbparameterise import extract_parameters, Parameter

from .notebooks import notebooks

PKG_DIR = os.path.dirname(os.path.abspath(__file__))


# Add a class combining raw description formatting with
# Metavariable default outputs
class RawTypeFormatter(argparse.RawDescriptionHelpFormatter,
                       argparse.MetavarTypeHelpFormatter,
                       argparse.ArgumentDefaultsHelpFormatter):
    """Help formatter combining raw descriptions, type metavars and defaults."""
    pass


# The argument parser for calibrate.py, will be extended depending
# on the options given.
def make_initial_parser(**kwargs):
    """Create the base argument parser, before notebook options are added.

    :param kwargs: Passed through to argparse.ArgumentParser
        (this module uses ``add_help`` and ``epilog``).
    :return: argparse.ArgumentParser with the fixed options, plus an
        (initially empty) 'required arguments' group which
        add_args_from_nb() fills in.
    """
    parser = argparse.ArgumentParser(
        description="Main entry point for offline calibration",
        formatter_class=RawTypeFormatter,
        **kwargs
    )

    parser.add_argument('detector', metavar='DETECTOR', type=str,
                        help='The detector to calibrate: ' + ", ".join(notebooks))

    parser.add_argument('type', metavar='TYPE', type=str,
                        help='Type of calibration.')

    parser.add_argument('--no-cluster-job', action="store_true",
                        default=False,
                        help="Do not run as a cluster job")

    parser.add_argument('--prepare-only', action="store_true",
                        help="Prepare notebooks but don't run them")

    parser.add_argument('--report-to', type=str,
                        help='Filename (and optionally path) for output'
                             ' report')

    parser.add_argument('--not-reproducible', action='store_true',
                        help='Disable checks to allow the processing result '
                             'to not be reproducible based on its metadata.')

    parser.add_argument('--skip-report', action='store_true',
                        help='Skip report generation in finalize step.')

    parser.add_argument('--skip-env-freeze', action='store_true',
                        help='Skip recording the Python environment for '
                             'reproducibility purposes, requires '
                             '--not-reproducible to run.')

    # Trailing space keeps the two sentences apart in the rendered help.
    parser.add_argument('--concurrency-par', type=str,
                        help='Name of concurrency parameter. '
                             'If not given, it is taken from configuration.')

    parser.add_argument('--constants-from', type=str, help=(
        "Path to a calibration-metadata.yml file. If given, "
        "retrieved-constants will be copied to use for a new correction."
    ))

    parser.add_argument('--priority', type=int, default=2,
                        help="Priority of batch jobs. If priority<=1, reserved"
                             " nodes become available.")

    parser.add_argument('--vector-figs', action="store_true", default=False,
                        help="Use vector graphics for figures in the report.")

    parser.add_argument('--slurm-mem', type=int, default=500,
                        help="Requested node RAM in GB")

    parser.add_argument('--slurm-name', type=str, default='xfel_calibrate',
                        help='Name of slurm job')

    parser.add_argument('--slurm-scheduling', type=int, default=0,
                        help='Change scheduling priority for a slurm job '
                             '+- 2147483645 (negative value increases '
                             'priority)')

    parser.add_argument('--request-time', type=str, default='Now',
                        help='Time of request to process notebook. Iso format')

    # add_args_from_nb() looks this group up by position (index 2).
    parser.add_argument_group('required arguments')

    parser.add_argument('--slurm-partition', type=str, default="",
                        help="Submit jobs in this Slurm partition")

    parser.add_argument('--reservation', type=str, default="",
                        help="Submit jobs in this Slurm reservation, "
                             "overriding --slurm-partition if both are set")

    return parser


# Helper functions for parser extensions

def make_intelli_list(ltype):
    """ Parses a list from range and comma expressions.

    An expression of the form "1-5,6" will be parsed into the following
    list: [1,2,3,4,6]

    Note that the end of a range is exclusive.

    :param ltype: Callable applied to every parsed integer to produce the
        final list elements.
    :return: An argparse.Action subclass performing the parsing.
    """
    class IntelliListAction(argparse.Action):
        element_type = ltype

        def __call__(self, parser, namespace, values, option_string=None):
            # With nargs='+' argparse hands over a list of strings; merge
            # them into a single comma separated expression. A bare string
            # (option declared without nargs) already is such an expression
            # and must not be joined character by character.
            if not isinstance(values, str):
                values = ",".join(values)

            parsed_values = []
            for rcomp in values.split(","):
                if "-" in rcomp:
                    start, end = rcomp.split("-")
                    parsed_values += list(range(int(start), int(end)))
                else:
                    parsed_values += [int(rcomp)]

            parsed_values = [self.element_type(p) for p in parsed_values]
            print("Parsed input {} to {}".format(values, parsed_values))
            setattr(namespace, self.dest, parsed_values)

    return IntelliListAction


def consolize_name(name):
    """ Names of console parameters don't have underscores """
    return name.replace("_", "-")


def add_args_from_nb(parms, parser, cvar=None, no_required=False):
    """Add argparse arguments for parameters in the first cell of a notebook.

    Uses nbparameterise to extract the parameter information. Each foo_bar
    parameter gets a --foo-bar command line option.
    Boolean parameters get a pair of flags like --abc and --no-abc.

    :param parms: List of nbparameterise Parameter objects
    :param parser: argparse.ArgumentParser instance to modify
    :param str cvar: Name of the concurrency parameter.
    :param bool no_required: If True, none of the added options are required.
    """
    for p in parms:
        helpstr = ("Default: %(default)s" if not p.comment
                   else "{}. Default: %(default)s".format(
                       p.comment.replace("#", " ").strip()))
        # A parameter is required if its comment says so, unless required
        # options are disabled or it is the concurrency parameter.
        required = (p.comment is not None
                    and "required" in p.comment
                    and not no_required
                    and p.name != cvar)

        # This may be not a public API
        # May require reprogramming in case of argparse updates
        pars_group = parser._action_groups[2 if required else 1]

        default = p.value if (not required) else None

        if issubclass(p.type, list) or p.name == cvar:
            # List parameters take their element type from the first default
            # value; a scalar concurrency parameter keeps its own type.
            ltype = type(p.value[0]) if issubclass(p.type, list) else p.type
            range_allowed = "RANGE ALLOWED" in p.comment.upper() if p.comment else False
            pars_group.add_argument(f"--{consolize_name(p.name)}",
                                    nargs='+',
                                    type=ltype if not range_allowed else str,
                                    default=default,
                                    help=helpstr,
                                    required=required,
                                    action=make_intelli_list(ltype) if range_allowed else None)
        elif issubclass(p.type, bool):
            # For a boolean, make --XYZ and --no-XYZ options.
            alt_group = pars_group.add_mutually_exclusive_group(required=required)
            alt_group.add_argument(f"--{consolize_name(p.name)}",
                                   action="store_true",
                                   default=default,
                                   help=helpstr,
                                   dest=p.name)
            alt_group.add_argument(f"--no-{consolize_name(p.name)}",
                                   action="store_false",
                                   default=default,
                                   help=f"Opposite of --{consolize_name(p.name)}",
                                   dest=p.name)
        else:
            pars_group.add_argument(f"--{consolize_name(p.name)}",
                                    type=p.type,
                                    default=default,
                                    help=helpstr,
                                    required=required)


def get_cell_n(nb, cell_type, cell_n):
    """
    Return notebook cell with given number and given type

    :param nb: jupyter notebook
    :param cell_type: cell type, 'code' or 'markdown'
    :param cell_n: cell number (count from 0)
    :return: notebook cell, or None if there is no such cell
    """
    counter = 0
    for cell in nb.cells:
        if cell.cell_type == cell_type:
            if counter == cell_n:
                return cell
            counter += 1


def first_code_cell(nb):
    """ Return the first code cell of a notebook """
    return get_cell_n(nb, 'code', 0)


def first_markdown_cell(nb):
    """ Return the first markdown cell of a notebook """
    return get_cell_n(nb, 'markdown', 0)


def set_figure_format(nb, enable_vector_format):
    """Set svg format in inline backend for figures

    If parameter enable_vector_format is set to True, svg format will
    be used for figures in the notebook rendering. Subsequently vector
    graphics figures will be used for report.

    The configuration line is appended to the second code cell.
    """
    if enable_vector_format:
        cell = get_cell_n(nb, 'code', 1)
        cell.source += "\n%config InlineBackend.figure_formats = ['svg']\n"


def get_notebook_function(nb, fname):
    """Extract the source text of function `fname` from a notebook.

    Code cells are scanned line by line for ``def fname(...):``. After the
    definition, lines are collected for as long as they are indented at
    least as deep as the first indented body line; the first shallower line
    (which includes an empty line) ends the function.

    :param nb: jupyter notebook
    :param fname: name of the function to look for
    :return: the function source as a string, or None if it was not found
    """
    flines = []
    def_found = False
    indent = None
    for cell in nb.cells:
        if cell.cell_type == 'code':
            lines = cell.source.split("\n")
            for line in lines:

                if def_found:
                    lin = len(line) - len(line.lstrip())
                    if indent is None:
                        # The first indented line fixes the body indentation.
                        if lin != 0:
                            indent = lin
                            flines.append(line)
                    elif lin >= indent:
                        flines.append(line)
                    else:
                        return "\n".join(flines)

                if re.search(r"def\s+{}\(.*\):\s*".format(fname), line) and not def_found:
                    # print("Found {} in line {}".format(fname, line))
                    # set this to indent level
                    def_found = True
                    flines.append(line)

    # The notebook ended while still inside the function body, i.e. the
    # function is the last thing in the last code cell. Return what was
    # collected instead of reporting the function as missing.
    if def_found and indent is not None:
        return "\n".join(flines)
    return None


def make_epilog(nb, caltype=None):
    """ Make an epilog from the notebook to add to parser help

    The text is taken from the first markdown cell of the notebook.

    :param nb: jupyter notebook
    :param caltype: calibration type; if given, it prefixes the first line
        and the remaining lines are indented below the description.
    :return: the epilog text
    """
    msg = ""
    header_cell = first_markdown_cell(nb)
    lines = header_cell.source.split("\n")
    if caltype:
        # 15 columns of name plus two spaces: the description starts in
        # column 17, like the indented continuation lines below.
        msg += "{:<15}  {}".format(caltype, lines[0]) + "\n"
    else:
        msg += "{}".format(lines[0]) + "\n"
    pp = pprint.PrettyPrinter(indent=(17 if caltype else 0))
    if len(lines[1:]):
        # Use the pretty printer for indentation, then strip the list
        # brackets, quotes and commas of the repr again.
        plines = pp.pformat(lines[1:])[1:-1].split("\n")
        for line in plines:
            sline = line.replace("'", "", 1)
            sline = sline.replace("', '", " " * (17 if caltype else 0), 1)
            sline = sline[::-1].replace("'", "", 1)[::-1]
            sline = sline.replace(" ,", " ")
            if len(sline) > 1 and sline[0] == ",":
                sline = sline[1:]
            msg += sline + "\n"
    msg += "\n"
    return msg


def deconsolize_args(args):
    """ Variable names have underscores """
    return {k.replace("-", "_"): v for k, v in args.items()}


def extend_params(nb, extend_func_name, argv):
    """Add parameters in the first code cell by calling a function in the notebook

    :param nb: jupyter notebook, modified in place
    :param extend_func_name: name of the function defined in the notebook;
        its return value is appended to the first code cell's source.
    :param argv: command line, including the program name at index 0
    """
    func = get_notebook_function(nb, extend_func_name)

    if func is None:
        warnings.warn(
            f"Didn't find concurrency function {extend_func_name} in notebook",
            RuntimeWarning
        )
        return

    # Make a temporary parser that won't exit if it sees -h or --help
    pre_parser = make_initial_parser(add_help=False)
    # add_args_from_nb() needs the Parameter objects, not the notebook.
    params = extract_parameters(nb, lang='python')
    add_args_from_nb(params, pre_parser, no_required=True)
    known, _ = pre_parser.parse_known_args(argv[1:])
    args = deconsolize_args(vars(known))

    # NOTE: this executes source code taken from the notebook.
    df = {}
    exec(func, df)
    f = df[extend_func_name]
    sig = inspect.signature(f)

    # The function's parameter names select which arguments it receives.
    extension = f(*[args[p] for p in sig.parameters])
    fcc = first_code_cell(nb)
    fcc["source"] += "\n" + extension


@dataclass
class NBDetails:
    """Details of a notebook-based workflow to run"""
    detector: str  # e.g. AGIPD
    caltype: str  # e.g. CORRECT
    path: Path
    pre_paths: List[Path]  # Notebooks to run before the main notebook
    dep_paths: List[Path]  # Notebooks to run after the main notebooks
    contents: nbformat.NotebookNode
    default_params: List[Parameter]
    concurrency: Dict[str, Any]  # Contents as in notebooks.py
    user_venv: Optional[Path]


def parse_argv_and_load_nb(argv) -> Tuple[Dict, NBDetails]:
    """Parse command-line arguments for xfel-calibrate to run a notebook

    :param argv: command line, including the program name at index 0
    :return: tuple of the parsed arguments (dict keyed by parameter name)
        and an NBDetails describing the notebook to run.

    Exits the process when only help is requested, when too few arguments
    are given, when the detector/type is unknown, or when an option that
    prevents reproducibility is used without --not-reproducible.
    """
    # extend the parser according to user input
    # the first case is if a detector was given, but no calibration type
    if len(argv) == 3 and "-h" in argv[2]:
        detector = argv[1].upper()
        try:
            det_notebooks = notebooks[detector]
        except KeyError:
            # TODO: This should really go to stderr not stdout
            print("Not one of the known detectors: {}".format(notebooks.keys()))
            sys.exit(1)

        msg = "Options for detector {}\n".format(detector)
        msg += "*" * len(msg) + "\n\n"

        # basically, this creates help in the form of
        #
        # TYPE        some description that is
        #             indented for this type.
        #
        # The information is extracted from the first markdown cell of
        # the notebook.
        for caltype, notebook in det_notebooks.items():
            if notebook.get("notebook") is None:
                if notebook.get("user", {}).get("notebook") is None:
                    raise KeyError(
                        f"`{detector}` does not have a notebook path, for "
                        "notebooks that are stored in pycalibration set the "
                        "`notebook` key to a relative path or set the "
                        "`['user']['notebook']` key to an absolute path/path "
                        "pattern. Notebook configuration dictionary contains "
                        f"only: `{notebook}`"
                        ""
                    )
                # Everything should be indented by 17 spaces
                msg += caltype.ljust(17) + "User defined notebook, arguments may vary\n"
                msg += " "*17 + "User notebook expected to be at path:\n"
                msg += " "*17 + notebook["user"]["notebook"] + "\n"
            else:
                nbpath = os.path.join(PKG_DIR, notebook["notebook"])
                nb = nbformat.read(nbpath, as_version=4)
                msg += make_epilog(nb, caltype=caltype)

        make_initial_parser(epilog=msg).parse_args(argv[1:])
        sys.exit()  # parse_args should already exit for --help
    elif len(argv) <= 3:
        make_initial_parser().parse_args(argv[1:])
        sys.exit()  # parse_args should already exit - not enough args

    # A detector and type was given. We derive the arguments
    # from the corresponding notebook
    args, _ = make_initial_parser(add_help=False).parse_known_args(argv[1:])
    try:
        nb_info = notebooks[args.detector.upper()][args.type.upper()]
    except KeyError:
        print("Not one of the known calibrations or detectors")
        sys.exit(1)

    # Pick out any arguments that may prevent reproducibility from
    # working, sorted alphabetically and converted back to their
    # canonical representation.
    # `args` is an argparse.Namespace here, so options are read as
    # attributes (a Namespace cannot be subscripted).
    not_reproducible_args = sorted(
        ('--' + x.replace('_', '-') for x in ['skip_env_freeze']
         if getattr(args, x)))

    # If any of these arguments are set, present a warning.
    if not_reproducible_args:
        print('WARNING: One or more command line arguments ({}) may prevent '
              'this specific correction result from being reproducible based '
              'on its metadata. It may not be possible to restore identical '
              'output data files when they have been deleted or lost. Please '
              'ensure that the data retention policy of the chosen storage '
              'location is sufficient for your '
              'needs.'.format(', '.join(not_reproducible_args)))

        if not args.not_reproducible:
            # If not explicitly specified that reproducibility may be
            # broken, remind the user and exit.
            print('To proceed, you can explicitly allow reproducibility to '
                  'be broken by adding --not-reproducible')
            sys.exit(1)

    # .get() mirrors the lookup in the help branch above: an entry without
    # a "notebook" key is treated like one whose value is None.
    if nb_info.get("notebook"):
        notebook = os.path.join(PKG_DIR, nb_info["notebook"])
    else:
        # If `"notebook"` entry is None, then set it to the user provided
        # notebook TODO: This is a very hacky workaround, better implementation
        # is not really possible with the current state of this module
        user_notebook_path = nb_info["user"]["notebook"]
        # Pull out the variables in the templated path string, and get values
        # from command line args (e.g. --proposal 1234 -> {proposal})
        user_notebook_variables = [
            name
            for (_, name, _, _) in string.Formatter().parse(user_notebook_path)
            if name is not None
        ]

        user_notebook_parser = argparse.ArgumentParser(add_help=False)
        for var in user_notebook_variables:
            user_notebook_parser.add_argument(f"--{var}")

        user_notebook_args, _ = user_notebook_parser.parse_known_args(argv[1:])

        notebook = user_notebook_path.format(**vars(user_notebook_args))

    concurrency = nb_info.get("concurrency", {'parameter': None})

    nb = nbformat.read(notebook, as_version=4)

    # extend parameters if needed
    ext_func = nb_info.get("extend parms", None)
    if ext_func is not None:
        extend_params(nb, ext_func, argv)

    default_params = extract_parameters(nb, lang='python')

    parser = make_initial_parser()
    parser.description = make_epilog(nb)
    add_args_from_nb(default_params, parser, cvar=concurrency['parameter'])

    arg_dict = deconsolize_args(vars(parser.parse_args(argv[1:])))

    user_venv = nb_info.get("user", {}).get("venv")
    if user_venv is not None:
        # The venv path may be a template filled from the parsed arguments.
        user_venv = Path(user_venv.format(**arg_dict))

    return arg_dict, NBDetails(
        detector=args.detector.upper(),
        caltype=args.type.upper(),
        path=Path(notebook),
        pre_paths=[Path(PKG_DIR, p) for p in nb_info.get('pre_notebooks', [])],
        dep_paths=[Path(PKG_DIR, p) for p in nb_info.get('dep_notebooks', [])],
        contents=nb,
        default_params=default_params,
        concurrency=concurrency,
        user_venv=user_venv,
    )