Skip to content
Snippets Groups Projects
Commit 63b1c17f authored by Thomas Kluyver's avatar Thomas Kluyver
Browse files

Break up code into smaller functions

parent 74af7abf
No related branches found
No related tags found
1 merge request!331Some cleanup in calibrate.py
......@@ -3,6 +3,7 @@
import argparse
from datetime import datetime
import dateutil.parser
import inspect
import nbconvert
import nbformat
from nbparameterise import (
......@@ -36,10 +37,12 @@ class RawTypeFormatter(argparse.RawDescriptionHelpFormatter,
# The argument parser for calibrate.py, will be extended depending
# on the options given.
def make_initial_parser():
parser = argparse.ArgumentParser(description="Main entry point "
"for offline calibration",
formatter_class=RawTypeFormatter)
def make_initial_parser(**kwargs):
parser = argparse.ArgumentParser(
description="Main entry point for offline calibration",
formatter_class=RawTypeFormatter,
**kwargs
)
parser.add_argument('detector', metavar='DETECTOR', type=str,
help='The detector to calibrate: ' + ", ".join(notebooks))
......@@ -88,9 +91,6 @@ def make_initial_parser():
return parser
parser = make_initial_parser()
# Helper functions for parser extensions
def make_intelli_list(ltype):
......@@ -298,165 +298,151 @@ def balance_sequences(in_folder, run, sequences, sequences_per_node,
l.size > 0]
# extend the parser according to user input
# the first case is if a detector was given, but no calibration type
if len(sys.argv) == 3 and "-h" in sys.argv[2]:
detector = sys.argv[1].upper()
def make_extended_parser() -> argparse.ArgumentParser:
"""Create an ArgumentParser using information from the notebooks"""
# extend the parser according to user input
# the first case is if a detector was given, but no calibration type
if len(sys.argv) == 3 and "-h" in sys.argv[2]:
detector = sys.argv[1].upper()
try:
det_notebooks = notebooks[detector]
except KeyError:
print("Not one of the known detectors: {}".format(notebooks.keys()))
sys.exit(1)
msg = "Options for detector {}\n".format(detector)
msg += "*" * len(msg) + "\n\n"
# basically, this creates help in the form of
#
# TYPE some description that is
# indented for this type.
#
# The information is extracted from the first markdown cell of
# the notebook.
for caltype, notebook in det_notebooks.items():
nbpath = os.path.abspath(
"{}/{}".format(os.path.dirname(__file__), notebook["notebook"]))
with open(nbpath, "r") as f:
nb = nbformat.read(f, as_version=4)
msg += make_epilog(nb, caltype=caltype)
return make_initial_parser(epilog=msg)
elif len(sys.argv) <= 3:
return make_initial_parser()
# A detector and type was given. We derive the arguments
# from the corresponding notebook
args, _ = make_initial_parser(add_help=False).parse_known_args()
try:
det_notebooks = notebooks[detector]
except KeyError:
print("Not one of the known detectors: {}".format(notebooks.keys()))
exit()
msg = "Options for detector {}\n".format(detector)
msg += "*" * len(msg) + "\n\n"
# basically, this creates help in the form of
#
# TYPE some description that is
# indented for this type.
#
# The information is extracted from the first markdown cell of
# the notebook.
for caltype, notebook in det_notebooks.items():
nbpath = os.path.abspath(
"{}/{}".format(os.path.dirname(__file__), notebook["notebook"]))
with open(nbpath, "r") as f:
nb = nbformat.read(f, as_version=4)
msg += make_epilog(nb, caltype=caltype)
parser.epilog = msg
# second case is if no detector was given either
elif len(sys.argv) == 2 and "-h" in sys.argv[1]:
epilog = "Available detectors are: {}".format(
", ".join([k for k in notebooks.keys()]))
parser.epilog = epilog
# final case: a detector and type was given. We derive the arguments
# from the corresponding notebook
elif len(sys.argv) >= 3:
detector = sys.argv[1].upper()
caltype = sys.argv[2].upper()
try:
notebook = os.path.abspath(
"{}/{}".format(os.path.dirname(__file__), notebooks[detector][caltype]["notebook"]))
cvar = notebooks[detector][caltype].get("concurrency",
{"parameter": None,
"default concurrency": None,
"cluster cores": 8})["parameter"]
nb_info = notebooks[args.detector.upper()][args.type.upper()]
except KeyError:
print("Not one of the known calibrations or detectors")
exit()
with open(notebook, "r") as f:
nb = nbformat.read(f, as_version=4)
ext_func = notebooks[detector][caltype].get("extend parms", None)
def do_parse(nb, parser, overwrite_reqs=False):
parser.description = make_epilog(nb)
parms = extract_parameters(nb)
for p in parms:
helpstr = ("Default: %(default)s" if not p.comment
else "{}. Default: %(default)s".format(p.comment.replace("#", " ").strip()))
required = (p.comment is not None
and "required" in p.comment
and not overwrite_reqs
and p.name != cvar)
# This may be not a public API
# May require reprogramming in case of argparse updates
pars_group = parser._action_groups[2 if required else 1]
default = p.value if (not required) else None
if p.type == list or p.name == cvar:
if p.type is list:
try:
ltype = type(p.value[0])
except:
print(
"List '{}' is empty. Parameter type can not be defined.".format(p.name))
print("See first code cell in jupyter-notebook: '{}'".format(
notebooks[detector][caltype]["notebook"]))
exit()
else:
ltype = p.type
range_allowed = "RANGE ALLOWED" in p.comment.upper() if p.comment else False
pars_group.add_argument("--{}".format(consolize_name(p.name)),
nargs='+',
type=ltype if not range_allowed else str,
default=default,
help=helpstr,
required=required,
action=make_intelli_list(ltype) if range_allowed else None)
elif p.type == bool:
# check if an input arg is given with an extra "-no" for
# forcing to convert a bool to False.
# Otherwise leave the default value from the notebook
# or convert to true if the bool arg is given.
if consolize_name("--no-{}".format(p.name)) in sys.argv:
pars_group.add_argument("--{}".format(consolize_name(p.name)),
action="store_false",
default=False,
help=helpstr,
required=required)
sys.argv.remove(consolize_name("--no-{}".format(p.name)))
else:
pars_group.add_argument("--{}".format(consolize_name(p.name)),
action="store_true",
default=default,
help=helpstr,
required=required)
else:
pars_group.add_argument("--{}".format(consolize_name(p.name)),
type=p.type,
default=default,
help=helpstr,
required=required)
sys.exit(1)
do_parse(nb, parser, True)
notebook = os.path.join(PKG_DIR, nb_info["notebook"])
cvar = nb_info.get("concurrency", {}).get("parameter", None)
nb = nbformat.read(notebook, as_version=4)
# extend parameters if needed
ext_func = notebooks[detector][caltype].get("extend parms", None)
if ext_func is not None:
func = get_notebook_function(nb, ext_func)
if func is None:
warnings.warn(f"Didn't find concurrency function {ext_func} in notebook",
RuntimeWarning)
# extend parameters if needed
ext_func = nb_info.get("extend parms", None)
if ext_func is not None:
extend_params(nb, ext_func)
# No extend parms function - add statically defined parameters from the
# first code cell
parser = make_initial_parser()
add_args_from_nb(nb, parser, cvar=cvar)
return parser
def add_args_from_nb(nb, parser, cvar=None, overwrite_reqs=False):
parser.description = make_epilog(nb)
parms = extract_parameters(nb)
for p in parms:
helpstr = ("Default: %(default)s" if not p.comment
else "{}. Default: %(default)s".format(p.comment.replace("#", " ").strip()))
required = (p.comment is not None
and "required" in p.comment
and not overwrite_reqs
and p.name != cvar)
# This may be not a public API
# May require reprogramming in case of argparse updates
pars_group = parser._action_groups[2 if required else 1]
default = p.value if (not required) else None
if p.type == list or p.name == cvar:
if p.type is list:
ltype = type(p.value[0])
else:
ltype = p.type
range_allowed = "RANGE ALLOWED" in p.comment.upper() if p.comment else False
pars_group.add_argument("--{}".format(consolize_name(p.name)),
nargs='+',
type=ltype if not range_allowed else str,
default=default,
help=helpstr,
required=required,
action=make_intelli_list(ltype) if range_allowed else None)
elif p.type == bool:
# check if an input arg is given with an extra "-no" for
# forcing to convert a bool to False.
# Otherwise leave the default value from the notebook
# or convert to true if the bool arg is given.
if consolize_name("--no-{}".format(p.name)) in sys.argv:
pars_group.add_argument("--{}".format(consolize_name(p.name)),
action="store_false",
default=False,
help=helpstr,
required=required)
sys.argv.remove(consolize_name("--no-{}".format(p.name)))
else:
# remove help calls as they will cause the argument parser to exit
add_help = False
if "-h" in sys.argv:
sys.argv.remove("-h")
add_help = True
if "--help" in sys.argv:
sys.argv.remove("--help")
add_help = True
known, remaining = parser.parse_known_args()
if add_help:
sys.argv.append("--help")
args = deconsolize_args(vars(known))
pars_group.add_argument("--{}".format(consolize_name(p.name)),
action="store_true",
default=default,
help=helpstr,
required=required)
else:
pars_group.add_argument("--{}".format(consolize_name(p.name)),
type=p.type,
default=default,
help=helpstr,
required=required)
def extend_params(nb, extend_func_name):
"""Add parameters in the first code cell by calling a function in the notebook
"""
func = get_notebook_function(nb, extend_func_name)
df = {}
if func is None:
warnings.warn(
f"Didn't find concurrency function {extend_func_name} in notebook",
RuntimeWarning
)
return
exec(func, df)
f = df[ext_func]
import inspect
sig = inspect.signature(f)
callargs = []
for arg in sig.parameters:
callargs.append(args[arg])
# Make a temporary parser that won't exit if it sees -h or --help
pre_parser = make_initial_parser(add_help=False)
add_args_from_nb(nb, pre_parser, overwrite_reqs=True)
known, _ = pre_parser.parse_known_args()
args = deconsolize_args(vars(known))
extention = f(*callargs)
fcc = first_code_cell(nb)
fcc["source"] += "\n" + extention
parser = make_initial_parser()
do_parse(nb, parser, False)
df = {}
exec(func, df)
f = df[extend_func_name]
sig = inspect.signature(f)
extension = f(*[args[p] for p in sig.parameters])
fcc = first_code_cell(nb)
fcc["source"] += "\n" + extension
def has_parm(parms, name):
......@@ -764,7 +750,7 @@ def make_par_table(parms, run_tmp_path):
def run():
""" Run a calibration task with parser arguments """
parser = make_extended_parser()
args = deconsolize_args(vars(parser.parse_args()))
detector = args["detector"].upper()
caltype = args["type"].upper()
......@@ -793,27 +779,7 @@ def run():
# extend parameters if needed
ext_func = nb_info.get("extend parms", None)
if ext_func is not None:
func = get_notebook_function(nb, ext_func)
if func is None:
warnings.warn(f"Didn't find concurrency function {ext_func} in notebook",
RuntimeWarning)
else:
# remove help calls as they will cause the argument parser to exit
known, remaining = parser.parse_known_args()
args = deconsolize_args(vars(known))
df = {}
exec(func, df)
f = df[ext_func]
import inspect
sig = inspect.signature(f)
callargs = []
for arg in sig.parameters:
callargs.append(args[arg])
extention = f(*callargs)
fcc = first_code_cell(nb)
fcc["source"] += "\n" + extention
extend_params(nb, ext_func)
parms = extract_parameters(nb)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment