Something went wrong on our end
-
Thomas Kluyver authoredThomas Kluyver authored
nb_args.py 18.66 KiB
"""Manipulating notebooks & translating parameters to command-line options
"""
import argparse
import inspect
import os.path
import pprint
import re
import string
import sys
import warnings
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
import nbformat
from nbparameterise import Parameter, extract_parameters
from .notebooks import notebooks
PKG_DIR = os.path.dirname(os.path.abspath(__file__))
class RawTypeFormatter(argparse.RawDescriptionHelpFormatter,
                       argparse.MetavarTypeHelpFormatter,
                       argparse.ArgumentDefaultsHelpFormatter):
    """Help formatter mixing three stock argparse formatters.

    Combines raw (unwrapped) description/epilog formatting with
    metavariables named after the argument type and with default values
    appended to the help strings.
    """
# The argument parser for calibrate.py, will be extended depending
# on the options given.
def make_initial_parser(**kwargs):
    """Create the base argument parser shared by all calibrations.

    The parser only knows the options that are independent of the notebook
    being run; notebook-specific options are added to it afterwards.

    :param kwargs: Extra keyword arguments forwarded unchanged to
        argparse.ArgumentParser (e.g. ``add_help=False`` or ``epilog=...``).
    :return: the configured argparse.ArgumentParser instance
    """
    parser = argparse.ArgumentParser(
        description="Main entry point for offline calibration",
        formatter_class=RawTypeFormatter,
        **kwargs
    )

    parser.add_argument('detector', metavar='DETECTOR', type=str,
                        help='The detector to calibrate: ' + ", ".join(notebooks))

    parser.add_argument('type', metavar='TYPE', type=str,
                        help='Type of calibration.')

    parser.add_argument('--no-cluster-job',
                        action="store_true",
                        default=False,
                        help="Do not run as a cluster job")

    parser.add_argument('--prepare-only', action="store_true",
                        help="Prepare notebooks but don't run them")

    parser.add_argument('--report-to', type=str,
                        help='Full path for the PDF report output')

    parser.add_argument('--not-reproducible', action='store_true',
                        help='Disable checks to allow the processing result '
                             'to not be reproducible based on its metadata.')

    parser.add_argument('--skip-report', action='store_true',
                        help='Skip report generation in finalize step.')

    parser.add_argument('--skip-env-freeze', action='store_true',
                        help='Skip recording the Python environment for '
                             'reproducibility purposes, requires '
                             '--not-reproducible to run.')

    # The two help fragments are joined by implicit concatenation, so the
    # separating space has to be part of one of them.
    parser.add_argument('--concurrency-par', type=str,
                        help='Name of concurrency parameter. '
                             'If not given, it is taken from configuration.')

    parser.add_argument('--constants-from', type=str, help=(
        "Path to a calibration-metadata.yml file. If given, "
        "retrieved-constants will be copied to use for a new correction."
    ))

    parser.add_argument('--vector-figs', action="store_true", default=False,
                        help="Use vector graphics for figures in the report.")

    parser.add_argument('--slurm-mem', type=int, default=500,
                        help="Requested node RAM in GB")

    parser.add_argument('--slurm-name', type=str, default='xfel_calibrate',
                        help='Name of slurm job')

    parser.add_argument('--slurm-scheduling', type=int, default=0,
                        help='Change scheduling priority for a slurm job '
                             '+- 2147483645 (negative value increases '
                             'priority)')

    parser.add_argument('--request-time', type=str, default='Now',
                        help='Time of request to process notebook. Iso format')

    # The return value is deliberately unused: add_args_from_nb() looks the
    # group up by position (parser._action_groups[2]), so the group must be
    # created here even though nothing is added to it in this function.
    parser.add_argument_group('required arguments')

    parser.add_argument('--slurm-partition', type=str, default="",
                        help="Submit jobs in this Slurm partition")

    parser.add_argument('--reservation', type=str, default="",
                        help="Submit jobs in this Slurm reservation, "
                             "overriding --slurm-partition if both are set")

    return parser
# Helper functions for parser extensions
def make_intelli_list(ltype):
    """Build an argparse action that parses range and comma expressions.

    An expression of the form "1-5,6" will be parsed into the following
    list: [1,2,3,4,6] (the end of a range is exclusive). Several command
    line tokens (``nargs='+'``) are treated as if they were joined by commas.

    :param ltype: Callable applied to every parsed integer to produce the
        final list elements (e.g. ``int`` or ``float``).
    :return: an argparse.Action subclass to pass as ``action=``
    :raises ValueError: (from the action, while parsing) if a component is
        not an integer or not a single ``start-end`` pair.
    """

    class IntelliListAction(argparse.Action):
        element_type = ltype

        def __call__(self, parser, namespace, values, option_string=None):
            # Normalise the input to one comma separated expression. A plain
            # string must not be passed through ",".join(), which would
            # split it into single characters.
            if isinstance(values, str):
                expression = values
            elif isinstance(values, (list, tuple)):
                expression = ",".join(str(v) for v in values)
            else:
                expression = str(values)

            numbers = []
            for rcomp in expression.split(","):
                if "-" in rcomp:
                    start, end = rcomp.split("-")
                    numbers.extend(range(int(start), int(end)))
                else:
                    numbers.append(int(rcomp))

            parsed_values = [self.element_type(n) for n in numbers]
            print("Parsed input {} to {}".format(expression, parsed_values))
            setattr(namespace, self.dest, parsed_values)

    return IntelliListAction
def consolize_name(name):
    """Turn a parameter name into its console form (underscores -> dashes)."""
    return "-".join(name.split("_"))
def add_args_from_nb(parms, parser, cvar=None, no_required=False):
    """Add argparse arguments for parameters in the first cell of a notebook.

    Every ``foo_bar`` parameter becomes a ``--foo-bar`` command line option.
    Boolean parameters become a pair of mutually exclusive flags, ``--abc``
    and ``--no-abc``. List parameters, and the concurrency parameter, accept
    one or more values. The ``metadata_folder`` parameter is skipped.

    :param parms: List of nbparameterise Parameter objects
    :param parser: argparse.ArgumentParser instance to modify
    :param str cvar: Name of the concurrency parameter.
    :param bool no_required: If True, none of the added options are required.
    """
    for param in parms:
        if param.name == 'metadata_folder':
            # Comes from xfel-calibrate machinery, can't be supplied
            continue

        flag = f"--{consolize_name(param.name)}"
        comment = param.comment

        if comment:
            cleaned = comment.replace("#", " ").strip()
            helpstr = "{}. Default: %(default)s".format(cleaned)
        else:
            helpstr = "Default: %(default)s"

        required = (
            comment is not None
            and "required" in comment
            and not no_required
            and param.name != cvar
        )

        # _action_groups is a private argparse attribute and may need
        # reworking after argparse updates. Index 1 is the parser's second
        # group, index 2 the third one.
        group = parser._action_groups[2 if required else 1]

        # Required options carry no default value.
        default = None if required else param.value

        is_list = issubclass(param.type, list)
        if is_list or param.name == cvar:
            ltype = type(param.value[0]) if is_list else param.type
            range_allowed = bool(comment) and "RANGE ALLOWED" in comment.upper()
            if range_allowed:
                # Keep the raw strings; the action expands the ranges.
                list_kwargs = dict(type=str, action=make_intelli_list(ltype))
            else:
                list_kwargs = dict(type=ltype, action=None)
            group.add_argument(flag,
                               nargs='+',
                               default=default,
                               help=helpstr,
                               required=required,
                               **list_kwargs)
        elif issubclass(param.type, bool):
            # For a boolean, make --XYZ and --no-XYZ options.
            toggle = group.add_mutually_exclusive_group(required=required)
            toggle.add_argument(flag,
                                action="store_true",
                                default=default,
                                help=helpstr,
                                dest=param.name)
            toggle.add_argument(f"--no-{consolize_name(param.name)}",
                                action="store_false",
                                default=default,
                                help=f"Opposite of {flag}",
                                dest=param.name)
        else:
            group.add_argument(flag,
                               type=param.type,
                               default=default,
                               help=helpstr,
                               required=required)
def get_cell_n(nb, cell_type, cell_n):
    """
    Return the notebook cell with the given number among cells of one type

    :param nb: jupyter notebook
    :param cell_type: cell type, 'code' or 'markdown'
    :param cell_n: cell number (count from 0), counting only cells of
        the requested type
    :return: notebook cell, or None if there is no such cell
    """
    of_type = (cell for cell in nb.cells if cell.cell_type == cell_type)
    for position, cell in enumerate(of_type):
        if position == cell_n:
            return cell
    return None
def first_code_cell(nb):
    """Return the first code cell of a notebook (None if it has none)"""
    return get_cell_n(nb, cell_type='code', cell_n=0)
def first_markdown_cell(nb):
    """Return the first markdown cell of a notebook (None if it has none)"""
    return get_cell_n(nb, cell_type='markdown', cell_n=0)
def set_figure_format(nb, enable_vector_format):
    """Set svg format in inline backend for figures

    When enable_vector_format is true, a ``%config`` line selecting the
    svg figure format is appended to the second code cell of the notebook,
    so that vector graphics figures end up in the report. Otherwise the
    notebook is left untouched.
    """
    if not enable_vector_format:
        return

    # Index 1: the second code cell of the notebook.
    target = get_cell_n(nb, 'code', 1)
    target.source = target.source + "\n%config InlineBackend.figure_formats = ['svg']\n"
def get_notebook_function(nb, fname):
    """Extract the source code of a function defined in a notebook.

    The code cells are scanned line by line for ``def <fname>(...):``. The
    function body is every following line of the same cell indented at
    least as far as the first body line. Blank lines and less-indented
    comment lines are kept when more body follows them, and dropped when
    they only trail the function.

    :param nb: notebook object whose ``cells`` have ``cell_type`` and
        ``source`` attributes
    :param str fname: name of the function to look for
    :return: the function source as a single string, or None if no code
        cell defines the function
    """
    pattern = re.compile(r"def\s+{}\(.*\):\s*".format(re.escape(fname)))

    for cell in nb.cells:
        if cell.cell_type != 'code':
            continue

        flines = []    # def line plus accepted body lines
        pending = []   # blank/comment lines, kept only if the body goes on
        indent = None  # indentation of the first body line

        for line in cell.source.split("\n"):
            if not flines:
                # Still looking for the def line.
                if pattern.search(line):
                    flines.append(line)
                continue

            stripped = line.strip()
            lin = len(line) - len(line.lstrip())

            if indent is None:
                if not stripped or (lin == 0 and stripped.startswith("#")):
                    # Nothing that starts the body yet.
                    continue
                if lin == 0:
                    # Unindented code right after the def line: the
                    # function was a one-liner and ends here.
                    break
                indent = lin
                flines.append(line)
            elif not stripped or (lin < indent and stripped.startswith("#")):
                # A blank line or dedented comment does not end a function.
                pending.append(line)
            elif lin >= indent:
                flines.extend(pending)
                pending = []
                flines.append(line)
            else:
                break

        if flines:
            # Also reached when the function runs to the end of the cell.
            return "\n".join(flines)

    return None
def make_epilog(nb, caltype=None):
    """ Make an epilog from the notebook to add to parser help

    The text is taken from the first markdown cell of the notebook. Its
    first line becomes the headline; the remaining lines are laid out
    with pprint and then stripped of the list/quote syntax.

    :param nb: notebook to take the first markdown cell from
    :param caltype: optional calibration type; if given it is put in front
        of the headline and the following lines are indented by 17 spaces
    :return: the epilog text, terminated by an empty line
    """
    msg = ""
    header_cell = first_markdown_cell(nb)
    # A notebook without markdown cell yields a single empty headline.
    lines = header_cell.source.split("\n") if header_cell is not None else ['']
    if caltype:
        msg += "{:<15} {}".format(caltype, lines[0]) + "\n"
    else:
        msg += "{}".format(lines[0]) + "\n"
    pp = pprint.PrettyPrinter(indent=(17 if caltype else 0))
    if len(lines[1:]):
        # pformat renders the remaining lines as a list literal; [1:-1]
        # drops its first and last character (the enclosing brackets).
        plines = pp.pformat(lines[1:])[1:-1].split("\n")
        for line in plines:
            # Drop the first quote character of the line.
            sline = line.replace("'", "", 1)
            # Replace the first "', '" separator by the indentation.
            sline = sline.replace("', '", " " * (17 if caltype else 0), 1)
            # Drop the last quote character (first one of the reversed line).
            sline = sline[::-1].replace("'", "", 1)[::-1]
            sline = sline.replace(" ,", " ")
            # Drop a comma left over at the very start of the line.
            if len(sline) > 1 and sline[0] == ",":
                sline = sline[1:]
            msg += sline + "\n"
    msg += "\n"
    return msg
def deconsolize_args(args):
    """Return a copy of ``args`` whose keys use underscores, not dashes.

    Variable names have underscores, console option names have dashes.
    """
    renamed = {}
    for option, value in args.items():
        renamed[option.replace("-", "_")] = value
    return renamed
def extend_params(nb, extend_func_name, argv):
    """Add parameters in the first code cell by calling a function in the notebook

    The function named ``extend_func_name`` is extracted from the notebook
    source and executed. It is called with the command line values of the
    options named like its parameters, and the string it returns is appended
    to the first code cell. If the notebook has no such function, a
    RuntimeWarning is issued and the notebook is left unchanged.

    :param nb: notebook to modify in place
    :param str extend_func_name: name of the function defined in the notebook
    :param argv: full command line, including the program name at index 0
    """
    func_source = get_notebook_function(nb, extend_func_name)
    if func_source is None:
        warnings.warn(
            f"Didn't find concurrency function {extend_func_name} in notebook",
            RuntimeWarning
        )
        return

    # Make a temporary parser that won't exit if it sees -h or --help
    pre_parser = make_initial_parser(add_help=False)
    add_args_from_nb(
        extract_parameters(nb, lang='python'), pre_parser, no_required=True
    )
    known_args = pre_parser.parse_known_args(argv[1:])[0]
    cli_values = deconsolize_args(vars(known_args))

    # NOTE: this executes code taken from the notebook in this process.
    scope = {}
    exec(func_source, scope)
    extend_func = scope[extend_func_name]

    # Positional call: one command line value per function parameter.
    call_args = [
        cli_values[name] for name in inspect.signature(extend_func).parameters
    ]
    extension = extend_func(*call_args)

    code_cell = first_code_cell(nb)
    if extension:
        code_cell["source"] += "\n" + extension
    else:
        code_cell["source"] += "\n"
@dataclass
class NBDetails:
    """Details of a notebook-based workflow to run

    Built by parse_argv_and_load_nb() from the notebook configuration entry
    selected by the detector and calibration type on the command line.
    """
    # Upper-cased detector name from the command line
    detector: str  # e.g. AGIPD
    # Upper-cased calibration type from the command line
    caltype: str  # e.g. CORRECT
    # Path of the main notebook
    path: Path
    pre_paths: List[Path]  # Notebooks to run before the main notebook
    dep_paths: List[Path]  # Notebooks to run after the main notebooks
    # The main notebook as read by nbformat
    contents: nbformat.NotebookNode
    # Parameters extracted from the notebook by nbparameterise
    default_params: List[Parameter]
    concurrency: Dict[str, Any]  # Contents as in notebooks.py
    # Path taken from the 'venv' entry of the user configuration, with the
    # command line arguments substituted in; None if there is no such entry
    user_venv: Optional[Path]
def _detector_help_epilog(detector, det_notebooks):
    """Build the help epilog describing every calibration type of a detector."""
    title = "Options for detector {}".format(detector)
    # Underline the title with exactly as many stars as it has characters.
    msg = title + "\n" + "*" * len(title) + "\n\n"

    # basically, this creates help in the form of
    #
    # TYPE        some description that is
    #             indented for this type.
    #
    # The information is extracted from the first markdown cell of
    # the notebook.
    for caltype, notebook in det_notebooks.items():
        if notebook.get("notebook") is None:
            if notebook.get("user", {}).get("notebook") is None:
                raise KeyError(
                    f"`{detector}` does not have a notebook path, for "
                    "notebooks that are stored in pycalibration set the "
                    "`notebook` key to a relative path or set the "
                    "`['user']['notebook']` key to an absolute path/path "
                    "pattern. Notebook configuration dictionary contains "
                    f"only: `{notebook}`"
                )
            # Everything should be indented by 17 spaces
            msg += caltype.ljust(17) + "User defined notebook, arguments may vary\n"
            msg += " "*17 + "User notebook expected to be at path:\n"
            msg += " "*17 + notebook["user"]["notebook"] + "\n"
        else:
            nbpath = os.path.join(PKG_DIR, notebook["notebook"])
            nb = nbformat.read(nbpath, as_version=4)
            msg += make_epilog(nb, caltype=caltype)
    return msg


def _resolve_user_notebook_path(path_template, argv):
    """Fill a templated user notebook path from command line options.

    Every ``{name}`` placeholder in the template takes the value of the
    ``--name`` option (e.g. ``--proposal 1234`` -> ``{proposal}``). Exits
    with status 1 if an option needed by the template was not given.
    """
    # dict.fromkeys() removes duplicates while keeping the order: a
    # placeholder used twice must only be registered as an option once,
    # argparse rejects conflicting option strings.
    variables = dict.fromkeys(
        name for (_, name, _, _) in string.Formatter().parse(path_template)
        if name is not None
    )

    template_parser = argparse.ArgumentParser(add_help=False)
    for var in variables:
        template_parser.add_argument(f"--{var}")
    values = vars(template_parser.parse_known_args(argv[1:])[0])

    missing = [name for name, value in values.items() if value is None]
    if missing:
        # Without this check, "None" would be formatted into the path.
        print(
            "Missing command line option(s) needed to locate the user "
            "notebook: {}".format(", ".join("--" + name for name in missing)),
            file=sys.stderr,
        )
        sys.exit(1)

    return path_template.format(**values)


def parse_argv_and_load_nb(argv) -> Tuple[Dict, NBDetails]:
    """Parse command-line arguments for xfel-calibrate to run a notebook

    With only a detector and a help flag, the help text listing the
    calibration types of that detector is printed and the process exits.
    With fewer arguments than a detector and a type, the basic parser
    reports the problem and the process exits. Otherwise the notebook
    selected by detector and type is loaded, its parameters are turned
    into command line options and the complete command line is parsed.

    :param argv: full command line, including the program name at index 0
    :return: tuple of the parsed arguments (dict keyed by variable name,
        i.e. with underscores) and the NBDetails of the selected notebook
    :raises KeyError: if a notebook configuration entry shown in the
        detector help has neither a packaged nor a user notebook path
    """
    # extend the parser according to user input
    # the first case is if a detector was given, but no calibration type
    if len(argv) == 3 and "-h" in argv[2]:
        detector = argv[1].upper()
        try:
            det_notebooks = notebooks[detector]
        except KeyError:
            print("Not one of the known detectors: {}".format(notebooks.keys()),
                  file=sys.stderr)
            sys.exit(1)

        msg = _detector_help_epilog(detector, det_notebooks)
        make_initial_parser(epilog=msg).parse_args(argv[1:])
        sys.exit()  # parse_args should already exit for --help
    elif len(argv) <= 3:
        make_initial_parser().parse_args(argv[1:])
        sys.exit()  # parse_args should already exit - not enough args

    # A detector and type was given. We derive the arguments
    # from the corresponding notebook
    args, _ = make_initial_parser(add_help=False).parse_known_args(argv[1:])
    try:
        nb_info = notebooks[args.detector.upper()][args.type.upper()]
    except KeyError:
        print("Not one of the known calibrations or detectors", file=sys.stderr)
        sys.exit(1)

    # Pick out any arguments that may prevent reproducibility from
    # working, sorted alphabetically and converted back to their
    # canonical representation.
    not_reproducible_args = sorted(
        ('--' + x.replace('_', '-')
         for x in ['skip_env_freeze']
         if getattr(args, x))
    )

    # If any of these arguments are set, present a warning.
    if not_reproducible_args:
        print('WARNING: One or more command line arguments ({}) may prevent '
              'this specific correction result from being reproducible based '
              'on its metadata. It may not be possible to restore identical '
              'output data files when they have been deleted or lost. Please '
              'ensure that the data retention policy of the chosen storage '
              'location is sufficient for your '
              'needs.'.format(', '.join(not_reproducible_args)))

        if not args.not_reproducible:
            # If not explicitly specified that reproducibility may be
            # broken, remind the user and exit.
            print('To proceed, you can explicitly allow reproducibility to '
                  'be broken by adding --not-reproducible')
            sys.exit(1)

    # .get(): a configuration entry may lack the "notebook" key entirely,
    # which is treated like an explicit None (same as in the detector help).
    if nb_info.get("notebook"):
        notebook = os.path.join(PKG_DIR, nb_info["notebook"])
    else:
        # If `"notebook"` entry is None, then set it to the user provided
        # notebook TODO: This is a very hacky workaround, better implementation
        # is not really possible with the current state of this module
        notebook = _resolve_user_notebook_path(nb_info["user"]["notebook"], argv)

    concurrency = nb_info.get("concurrency", {'parameter': None})

    nb = nbformat.read(notebook, as_version=4)

    # extend parameters if needed
    ext_func = nb_info.get("extend parms", None)
    if ext_func is not None:
        extend_params(nb, ext_func, argv)

    default_params = extract_parameters(nb, lang='python')

    parser = make_initial_parser()
    parser.description = make_epilog(nb)
    add_args_from_nb(default_params, parser, cvar=concurrency['parameter'])

    arg_dict = deconsolize_args(vars(parser.parse_args(argv[1:])))

    user_venv = nb_info.get("user", {}).get("venv")
    if user_venv is not None:
        # The venv path may contain {placeholders} for command line values.
        user_venv = Path(user_venv.format(**arg_dict))

    return arg_dict, NBDetails(
        detector=args.detector.upper(),
        caltype=args.type.upper(),
        path=Path(notebook),
        pre_paths=[Path(PKG_DIR, p) for p in nb_info.get('pre_notebooks', [])],
        dep_paths=[Path(PKG_DIR, p) for p in nb_info.get('dep_notebooks', [])],
        contents=nb,
        default_params=default_params,
        concurrency=concurrency,
        user_venv=user_venv,
    )