Skip to content
Snippets Groups Projects
Commit 0f55c690 authored by Thomas Kluyver's avatar Thomas Kluyver
Browse files

Rethink what information goes in which file

parent c4be4941
No related branches found
No related tags found
1 merge request!544Reproducibility, step 1
...@@ -15,7 +15,7 @@ import textwrap ...@@ -15,7 +15,7 @@ import textwrap
import warnings import warnings
from datetime import datetime from datetime import datetime
from pathlib import Path from pathlib import Path
from subprocess import DEVNULL, check_output, call from subprocess import DEVNULL, check_output, check_call, call
from typing import List, Union from typing import List, Union
import nbformat import nbformat
...@@ -178,6 +178,7 @@ def extract_title_author(nb): ...@@ -178,6 +178,7 @@ def extract_title_author(nb):
author = author[0] if len(author) else None author = author[0] if len(author) else None
return title, author return title, author
def get_pycalib_version(): def get_pycalib_version():
# Try to get version from the git # Try to get version from the git
# Will work only in case of the development installation # Will work only in case of the development installation
...@@ -196,6 +197,13 @@ def get_pycalib_version(): ...@@ -196,6 +197,13 @@ def get_pycalib_version():
return version return version
def get_python_version(python_exe):
if python_exe == sys.executable:
return '{}.{}.{}'.format(*sys.version_info)
return check_output([python_exe, '--version']).decode('utf-8').split()[1]
def get_cell_n(nb, cell_type, cell_n): def get_cell_n(nb, cell_type, cell_n):
""" """
Return notebook cell with given number and given type Return notebook cell with given number and given type
...@@ -786,13 +794,13 @@ class JobArgs: ...@@ -786,13 +794,13 @@ class JobArgs:
python=python, pkg_dir=PKG_DIR python=python, pkg_dir=PKG_DIR
) for a in self.args] ) for a in self.args]
def run_direct(self, run_tmp_path, python) -> int: def run_direct(self, work_dir, python) -> int:
return call(self.format_cmd(python), cwd=run_tmp_path) return call(self.format_cmd(python), cwd=work_dir)
def submit_job(self, run_tmp_path, python, slurm_opts, after_ok=(), after_any=()): def submit_job(self, work_dir, python, slurm_opts, after_ok=(), after_any=()):
cmd = slurm_opts.get_launcher_command(run_tmp_path, after_ok, after_any) cmd = slurm_opts.get_launcher_command(work_dir, after_ok, after_any)
cmd += self.format_cmd(python) cmd += self.format_cmd(python)
output = check_output(cmd, cwd=run_tmp_path).decode('utf-8') output = check_output(cmd, cwd=work_dir).decode('utf-8')
return output.partition(';')[0].strip() # job ID return output.partition(';')[0].strip() # job ID
...@@ -813,23 +821,22 @@ class Step: ...@@ -813,23 +821,22 @@ class Step:
class JobGroup: # TODO: Naming - task? request? commission? class JobGroup: # TODO: Naming - task? request? commission?
def __init__(self, steps: List[Step], run_tmp_path, python): def __init__(self, steps: List[Step], work_dir: Path, python):
self.steps = [s for s in steps if s.jobs] self.steps = [s for s in steps if s.jobs]
self.run_tmp_path = run_tmp_path self.work_dir = work_dir
self.python = python self.python = python
def get_info(self): @classmethod
return { def from_dir(cls, work_dir: Path, python):
'steps': [step.to_dict() for step in self.steps], d = json.loads((work_dir / 'steps.json').read_text('utf-8'))
'environment': { steps = [Step.from_dict(sd) for sd in d['steps']]
'python_version': check_output([ return cls(steps, work_dir, python)
self.python, '--version'
]).decode('utf-8').split()[1], def save(self):
'packages': json.loads(check_output([ with (self.work_dir / 'steps.json').open('w', encoding='utf-8') as f:
self.python, '-m', 'pip', 'list', '--format', 'json' json.dump({
]).decode('utf-8')), 'steps': [step.to_dict() for step in self.steps]
} }, f, indent=2)
}
def submit_jobs(self, slurm_opts: SlurmOptions): def submit_jobs(self, slurm_opts: SlurmOptions):
all_job_ids = [] all_job_ids = []
...@@ -838,7 +845,7 @@ class JobGroup: # TODO: Naming - task? request? commission? ...@@ -838,7 +845,7 @@ class JobGroup: # TODO: Naming - task? request? commission?
step_job_ids = [] step_job_ids = []
kw = {('after_any' if step.after_error else 'after_ok'): dep_job_ids} kw = {('after_any' if step.after_error else 'after_ok'): dep_job_ids}
for job_desc in step.jobs: for job_desc in step.jobs:
jid = job_desc.submit_job(self.run_tmp_path, self.python, slurm_opts, **kw) jid = job_desc.submit_job(self.work_dir, self.python, slurm_opts, **kw)
step_job_ids.append(jid) step_job_ids.append(jid)
dep_job_ids = step_job_ids dep_job_ids = step_job_ids
all_job_ids.extend(step_job_ids) all_job_ids.extend(step_job_ids)
...@@ -852,7 +859,7 @@ class JobGroup: # TODO: Naming - task? request? commission? ...@@ -852,7 +859,7 @@ class JobGroup: # TODO: Naming - task? request? commission?
continue continue
exit_codes = [ exit_codes = [
j.run_direct(self.run_tmp_path, self.python) for j in step.jobs j.run_direct(self.work_dir, self.python) for j in step.jobs
] ]
if any([ec != 0 for ec in exit_codes]): if any([ec != 0 for ec in exit_codes]):
errors = True errors = True
...@@ -1025,6 +1032,13 @@ def run(): ...@@ -1025,6 +1032,13 @@ def run():
print(f"report_to path contained no path, saving report in '{out_path}'") print(f"report_to path contained no path, saving report in '{out_path}'")
report_to = out_path / report_to report_to = out_path / report_to
user_venv = nb_info.get("user", {}).get("venv")
if user_venv:
user_venv = Path(user_venv.format(**args))
python_exe = str(user_venv / 'bin' / 'python')
else:
python_exe = python_path
# Write metadata about calibration job to output folder # Write metadata about calibration job to output folder
metadata = cal_tools.tools.CalibrationMetadata(out_path) metadata = cal_tools.tools.CalibrationMetadata(out_path)
...@@ -1044,8 +1058,16 @@ def run(): ...@@ -1044,8 +1058,16 @@ def run():
'title': title, 'title': title,
'author': author, 'author': author,
} }
metadata['python-environment'] = {
'path': user_venv,
'python-version': get_python_version(python_exe),
}
metadata.save() metadata.save()
# Record installed Python packages for reproducing the environment
with open(os.path.join(run_tmp_path, 'requirements.txt'), 'wb') as f:
check_call([python_exe, '-m', 'pip', 'freeze'], stdout=f)
folder = get_par_attr(parms, 'in_folder', 'value', '') folder = get_par_attr(parms, 'in_folder', 'value', '')
if args["request_time"] == "Now": if args["request_time"] == "Now":
...@@ -1053,13 +1075,6 @@ def run(): ...@@ -1053,13 +1075,6 @@ def run():
else: else:
request_time = args["request_time"] request_time = args["request_time"]
user_venv = nb_info.get("user", {}).get("venv")
if user_venv:
user_venv = Path(user_venv.format(**args))
python_exe = str(user_venv / 'bin' / 'python')
else:
python_exe = python_path
pre_jobs = [] pre_jobs = []
cluster_cores = concurrency.get("cluster cores", 8) cluster_cores = concurrency.get("cluster cores", 8)
# Check if there are pre-notebooks # Check if there are pre-notebooks
...@@ -1143,13 +1158,10 @@ def run(): ...@@ -1143,13 +1158,10 @@ def run():
Step(pre_jobs), Step(pre_jobs),
Step(main_jobs), Step(main_jobs),
Step(dep_jobs, after_error=True) Step(dep_jobs, after_error=True)
], run_tmp_path, python_exe) ], Path(run_tmp_path), python_exe)
# Save information about jobs and environment for reproducibility # Save information about jobs for reproducibility
repro_info = job_group.get_info() job_group.save()
repro_info['pycalibration'] = {'version': version}
with open(os.path.join(run_tmp_path, 'exec_details.json'), 'w') as f:
json.dump(repro_info, f, indent=2)
submission_time = datetime.now().strftime('%Y-%m-%dT%H:%M:%S') submission_time = datetime.now().strftime('%Y-%m-%dT%H:%M:%S')
......
import argparse import argparse
import json
import shutil import shutil
import sys import sys
from datetime import datetime from datetime import datetime
...@@ -10,7 +9,7 @@ import yaml ...@@ -10,7 +9,7 @@ import yaml
from nbparameterise import extract_parameters, parameter_values, replace_definitions from nbparameterise import extract_parameters, parameter_values, replace_definitions
from .calibrate import ( from .calibrate import (
Step, JobGroup, SlurmOptions, run_finalize, get_pycalib_version, JobGroup, SlurmOptions, run_finalize, get_pycalib_version,
) )
from .settings import temp_path from .settings import temp_path
...@@ -34,7 +33,7 @@ def new_report_path(old_out_folder, old_report_path, new_out_folder): ...@@ -34,7 +33,7 @@ def new_report_path(old_out_folder, old_report_path, new_out_folder):
def main(argv=None): def main(argv=None):
ap = argparse.ArgumentParser() ap = argparse.ArgumentParser()
ap.add_argument("from_dir", help="A directory containing exec_details.json") ap.add_argument("from_dir", type=Path, help="A directory containing steps.json")
ap.add_argument("--python", help="Path to Python executable to run notebooks") ap.add_argument("--python", help="Path to Python executable to run notebooks")
ap.add_argument("--out-folder", help="Directory to put output data") ap.add_argument("--out-folder", help="Directory to put output data")
ap.add_argument("--slurm-partition", help="Submit jobs in this Slurm partition") ap.add_argument("--slurm-partition", help="Submit jobs in this Slurm partition")
...@@ -42,6 +41,9 @@ def main(argv=None): ...@@ -42,6 +41,9 @@ def main(argv=None):
help="Run notebooks here, not in cluster jobs") help="Run notebooks here, not in cluster jobs")
args = ap.parse_args(argv) args = ap.parse_args(argv)
if not (args.from_dir / 'steps.json').is_file():
sys.exit(f"Directory {args.from_dir} does not contain steps.json")
start_time = datetime.now() start_time = datetime.now()
run_uuid = f"t{start_time:%y%m%d_%H%M%S}" run_uuid = f"t{start_time:%y%m%d_%H%M%S}"
...@@ -51,7 +53,6 @@ def main(argv=None): ...@@ -51,7 +53,6 @@ def main(argv=None):
) )
print(f"New working directory: {working_dir}") print(f"New working directory: {working_dir}")
exec_details = json.loads((working_dir / 'exec_details.json').read_text('utf-8'))
cal_metadata = yaml.safe_load( cal_metadata = yaml.safe_load(
(working_dir / 'calibration_metadata.yml').read_text('utf-8') (working_dir / 'calibration_metadata.yml').read_text('utf-8')
) )
...@@ -68,10 +69,8 @@ def main(argv=None): ...@@ -68,10 +69,8 @@ def main(argv=None):
Path(out_folder).mkdir(parents=True, exist_ok=True) Path(out_folder).mkdir(parents=True, exist_ok=True)
shutil.copy(working_dir / 'calibration_metadata.yml', out_folder) shutil.copy(working_dir / 'calibration_metadata.yml', out_folder)
job_group = JobGroup( job_group = JobGroup.from_dir(
[Step.from_dict(d) for d in exec_details['steps']], working_dir, python=(args.python or sys.executable)
run_tmp_path=str(working_dir),
python=(args.python or sys.executable),
) )
if args.no_cluster_job: if args.no_cluster_job:
job_group.run_direct() job_group.run_direct()
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment