Skip to content
Snippets Groups Projects
Commit 4e64ee3d authored by Thomas Kluyver
Browse files

Rename JobGroup -> JobChain, add docstrings

parent 57cc19e0
No related branches found
No related tags found
1 merge request: !544 "Reproducibility, step 1"
......@@ -790,6 +790,7 @@ def prepare_job(
class JobArgs:
"""Command line arguments for running one calibration job"""
def __init__(self, args: List[str]):
self.args = args
......@@ -803,9 +804,11 @@ class JobArgs:
return [a.format(python=python) for a in self.args]
def run_direct(self, work_dir, python) -> int:
"""Run this job in a local process, return exit status"""
return call(self.format_cmd(python), cwd=work_dir)
def submit_job(self, work_dir, python, slurm_opts, after_ok=(), after_any=()):
"""Submit this job to Slurm, return its job ID"""
cmd = slurm_opts.get_launcher_command(work_dir, after_ok, after_any)
cmd += self.format_cmd(python)
output = check_output(cmd, cwd=work_dir).decode('utf-8')
......@@ -813,6 +816,11 @@ class JobArgs:
class Step:
"""A group of jobs which may run in parallel
If after_error is True, this step should run even if previous steps failed.
Otherwise, it will only run if the previous steps succeed.
"""
def __init__(self, jobs: List[JobArgs], after_error=False):
self.jobs = jobs
self.after_error = after_error
......@@ -828,7 +836,13 @@ class Step:
return cls([JobArgs(args) for args in d['jobs']], d['after_error'])
class JobGroup: # TODO: Naming - task? request? commission?
class JobChain:
"""A collection of jobs to run for one call to xfel-calibrate
This is a chain of steps, each of which may contain several jobs.
It also holds the work directory, where the parameterised notebooks are
saved, and the path to the Python interpreter to run the notebooks.
"""
def __init__(self, steps: List[Step], work_dir: Path, python):
self.steps = [s for s in steps if s.jobs]
self.work_dir = work_dir
......@@ -836,17 +850,23 @@ class JobGroup: # TODO: Naming - task? request? commission?
@classmethod
def from_dir(cls, work_dir: Path, python):
"""Load a JobChain from a work directory containing steps.json"""
d = json.loads((work_dir / 'steps.json').read_text('utf-8'))
steps = [Step.from_dict(sd) for sd in d['steps']]
return cls(steps, work_dir, python)
def save(self):
"""Save the steps of this chain to steps.json in the work directory"""
with (self.work_dir / 'steps.json').open('w', encoding='utf-8') as f:
json.dump({
'steps': [step.to_dict() for step in self.steps]
}, f, indent=2)
def submit_jobs(self, slurm_opts: SlurmOptions):
"""Submit these jobs to Slurm, return a list of job IDs
Slurm dependencies are used to manage the sequence of jobs.
"""
print("Submitting jobs with Slurm options:")
print(" ".join(slurm_opts.get_launcher_command(self.work_dir)))
all_job_ids = []
......@@ -862,6 +882,7 @@ class JobGroup: # TODO: Naming - task? request? commission?
return all_job_ids
def run_direct(self) -> bool:
"""Run these jobs in local processes, return True if any failed"""
errors = False
for i, step in enumerate(self.steps):
if errors and not step.after_error:
......@@ -1175,14 +1196,14 @@ def run():
cluster_cores=cluster_cores,
))
job_group = JobGroup([
job_chain = JobChain([
Step(pre_jobs),
Step(main_jobs),
Step(dep_jobs, after_error=True)
], Path(run_tmp_path), python_exe)
# Save information about jobs for reproducibility
job_group.save()
job_chain.save()
if args['prepare_only']:
# FIXME: Clean up where this file goes when.
......@@ -1202,12 +1223,12 @@ def run():
# Launch the calibration work
if sequential:
print("Running notebooks directly, not via Slurm...")
errors = job_group.run_direct()
errors = job_chain.run_direct()
joblist = []
else:
print("Submitting jobs to Slurm...")
joblist = job_group.submit_jobs(SlurmOptions(
joblist = job_chain.submit_jobs(SlurmOptions(
job_name=args.get('slurm_name', 'xfel_calibrate'),
nice=args['slurm_scheduling'],
mem=args['slurm_mem'],
......
......@@ -9,7 +9,7 @@ from nbparameterise import extract_parameters, parameter_values, replace_definit
from cal_tools.tools import CalibrationMetadata
from .calibrate import (
JobGroup, SlurmOptions, run_finalize, get_pycalib_version,
JobChain, SlurmOptions, run_finalize, get_pycalib_version,
)
from .settings import temp_path
......@@ -68,14 +68,14 @@ def main(argv=None):
Path(out_folder).mkdir(parents=True, exist_ok=True)
shutil.copy(working_dir / 'calibration_metadata.yml', out_folder)
job_group = JobGroup.from_dir(
job_chain = JobChain.from_dir(
working_dir, python=(args.python or sys.executable)
)
if args.no_cluster_job:
job_group.run_direct()
job_chain.run_direct()
joblist = []
else:
joblist = job_group.submit_jobs(SlurmOptions(
joblist = job_chain.submit_jobs(SlurmOptions(
partition=args.slurm_partition,
))
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment