Skip to content
Snippets Groups Projects

Store CalCat requests/responses for reproducibility

Merged Thomas Kluyver requested to merge calparrot into master
Files
3
@@ -385,11 +385,14 @@ class JobArgs:
"""Run this job in a local process, return exit status"""
return call(self.format_cmd(python), cwd=work_dir)
def submit_job(self, work_dir, python, slurm_opts, after_ok=(), after_any=()):
def submit_job(
self, work_dir, python, slurm_opts, after_ok=(), after_any=(), env=None
):
"""Submit this job to Slurm, return its job ID"""
cmd = slurm_opts.get_launcher_command(work_dir, after_ok, after_any)
cmd += self.format_cmd(python)
output = check_output(cmd, cwd=work_dir).decode('utf-8')
# sbatch propagates environment variables into the job by default
output = check_output(cmd, cwd=work_dir, env=env).decode('utf-8')
return output.partition(';')[0].strip() # job ID
@@ -440,7 +443,7 @@ class JobChain:
'steps': [step.to_dict() for step in self.steps]
}, f, indent=2)
def submit_jobs(self, slurm_opts: SlurmOptions):
def submit_jobs(self, slurm_opts: SlurmOptions, env=None):
"""Submit these jobs to Slurm, return a list of job IDs
Slurm dependencies are used to manage the sequence of jobs.
@@ -453,7 +456,9 @@ class JobChain:
step_job_ids = []
kw = {('after_any' if step.after_error else 'after_ok'): dep_job_ids}
for job_desc in step.jobs:
jid = job_desc.submit_job(self.work_dir, self.python, slurm_opts, **kw)
jid = job_desc.submit_job(
self.work_dir, self.python, slurm_opts, env=env, **kw
)
step_job_ids.append(jid)
dep_job_ids = step_job_ids
all_job_ids.extend(step_job_ids)
Loading