From bb3a6a259c4bcea3092b1b8609bb08a549004554 Mon Sep 17 00:00:00 2001 From: Thomas Kluyver <thomas@kluyver.me.uk> Date: Wed, 18 Aug 2021 14:46:54 +0100 Subject: [PATCH] Run finalize step after repeating task --- src/xfel_calibrate/calibrate.py | 49 ++++++++++++++------------------- src/xfel_calibrate/finalize.py | 19 ++++++------- src/xfel_calibrate/repeat.py | 39 +++++++++++++++++++++++--- 3 files changed, 65 insertions(+), 42 deletions(-) diff --git a/src/xfel_calibrate/calibrate.py b/src/xfel_calibrate/calibrate.py index 24563c6c4..a5cf5c135 100755 --- a/src/xfel_calibrate/calibrate.py +++ b/src/xfel_calibrate/calibrate.py @@ -162,7 +162,7 @@ def deconsolize_args(args): return {k.replace("-", "_"): v for k, v in args.items()} -def extract_title_author_version(nb): +def extract_title_author(nb): """ Tries to extract title, author from markdown. The version is taken from git. @@ -176,7 +176,9 @@ def extract_title_author_version(nb): title = title[0] if len(title) else None author = author[0] if len(author) else None + return title, author +def get_pycalib_version(): # Try to get version from the git # Will work only in case of the development installation # Suppress output errors @@ -191,7 +193,7 @@ def extract_title_author_version(nb): except: from .VERSION import __version__ version = __version__ - return title, author, version + return version def get_cell_n(nb, cell_type, cell_n): @@ -579,9 +581,6 @@ def create_finalize_script(fmt_args, temp_path, job_list) -> str: finaljob=os.environ.get('SLURM_JOB_ID', ''), run_path='{{run_path}}', out_path='{{out_path}}', - project='{{project}}', - calibration='{{calibration}}', - author='{{author}}', version='{{version}}', report_to='{{report_to}}', data_path='{{in_folder}}', @@ -927,23 +926,6 @@ def make_par_table(parms, run_tmp_path: str): finfile.write(textwrap.dedent(tmpl.render(p=col_type, lines=l_parms))) -def make_pipeline_yaml(parms, version, concurrency, report_path, output_dir): - """Adds information from arguments to metadata file""" - - metadata = cal_tools.tools.CalibrationMetadata(output_dir) - - parm_subdict = metadata.setdefault("calibration-configurations", {}) - for p in parms: - name = consolize_name(p.name) - parm_subdict[name] = p.value - - metadata["pycalibration-version"] = version - metadata["report-path"] = f"{report_path}.pdf" - metadata["concurrency"] = concurrency - - metadata.save() - - def run(): """ Run a calibration task with parser arguments """ # Ensure files are opened as UTF-8 by default, regardless of environment. @@ -987,7 +969,8 @@ def run(): parms = extract_parameters(nb, lang='python') - title, author, version = extract_title_author_version(nb) + title, author = extract_title_author(nb) + version = get_pycalib_version() if not title: title = "{} {} Calibration".format(detector, caltype) @@ -1043,12 +1026,25 @@ def run(): report_to = out_path / report_to # Write metadata about calibration job to output folder - concurr_details = { + metadata = cal_tools.tools.CalibrationMetadata(out_path) + + parm_subdict = metadata.setdefault("calibration-configurations", {}) + for p in parms: + name = consolize_name(p.name) + parm_subdict[name] = p.value + + metadata["pycalibration-version"] = version + metadata["report-path"] = f"{report_to}.pdf" + metadata["concurrency"] = { 'parameter': concurrency_par, 'default': concurrency_defval, 'function': concurrency_func, } - make_pipeline_yaml(parms, version, concurr_details, report_to, out_path) + metadata["notebook"] = { + 'title': title, + 'author': author, + } + metadata.save() folder = get_par_attr(parms, 'in_folder', 'value', '') @@ -1177,9 +1173,6 @@ def run(): fmt_args = {'run_path': run_tmp_path, 'out_path': out_path, - 'project': title, - 'calibration': title, - 'author': author, 'version': version, 'report_to': report_to, 'in_folder': folder, diff --git a/src/xfel_calibrate/finalize.py b/src/xfel_calibrate/finalize.py index c9c44a57a..30d3b8b9f 100644 --- a/src/xfel_calibrate/finalize.py +++ b/src/xfel_calibrate/finalize.py @@ -203,7 +203,7 @@ def make_timing_summary(run_path: Path, job_times: List[List[str]], time_table=time_table.split("\n")))) -def make_report(run_path: Path, tmp_path: Path, out_path: Path, project: str, +def make_report(run_path: Path, tmp_path: Path, project: str, author: str, version: str, report_to: Path): """ Create calibration report (pdf file) @@ -213,8 +213,6 @@ def make_report(run_path: Path, tmp_path: Path, out_path: Path, project: str, :param run_path: Path to sphinx run directory :param tmp_path: Run path of the slurm job - :param out_path: Output directory for report. - Overwritten if path to report is given in `report_to` :param project: Project title :param author: Author of the notebook :param version: Version of the notebook @@ -388,11 +386,9 @@ def tex_escape(text): return regex.sub(lambda match: conv[match.group()], text) -def finalize(joblist, finaljob, run_path, out_path, project, calibration, - author, version, report_to, data_path='Unknown', +def finalize(joblist, finaljob, run_path, out_path, version, report_to, data_path='Unknown', request_time='', submission_time=''): run_path = Path(run_path) - out_path = Path(out_path) print("Waiting on jobs to finish: {}".format(joblist)) while True: found_jobs = set() @@ -411,6 +407,10 @@ def finalize(joblist, finaljob, run_path, out_path, project, calibration, if finaljob: joblist.append(str(finaljob)) metadata = cal_tools.tools.CalibrationMetadata(out_path) + nb_info = metadata.get('notebook', {}) + title = nb_info.get('title', 'Unknown calibration') + author = nb_info.get('author', 'anonymous') + job_time_fmt = 'JobID,Start,End,Elapsed,Suspended,State'.split(',') job_time_summary = get_job_info(joblist, job_time_fmt) pipeline_time_summary = { @@ -431,13 +431,12 @@ def finalize(joblist, finaljob, run_path, out_path, project, calibration, metadata.save() metadata.save_copy(run_path) - sphinx_path = combine_report(run_path, calibration) - make_titlepage(sphinx_path, project, data_path, version) + sphinx_path = combine_report(run_path, title) + make_titlepage(sphinx_path, title, data_path, version) make_report( Path(sphinx_path), run_path, - out_path, - project, + title, author, version, Path(report_to), diff --git a/src/xfel_calibrate/repeat.py b/src/xfel_calibrate/repeat.py index 580b1d3e8..3a7dcf55a 100644 --- a/src/xfel_calibrate/repeat.py +++ b/src/xfel_calibrate/repeat.py @@ -5,7 +5,11 @@ from datetime import datetime from pathlib import Path from shutil import copytree -from .calibrate import Step, JobGroup, SlurmOptions +import yaml + +from .calibrate import ( + Step, JobGroup, SlurmOptions, run_finalize, get_pycalib_version, +) from .settings import temp_path def main(argv=None): @@ -17,12 +21,18 @@ def main(argv=None): help="Run notebooks here, not in cluster jobs") args = ap.parse_args(argv) - run_uuid = f"t{datetime.now():%y%m%d_%H%M%S}" + start_time = datetime.now() + run_uuid = f"t{start_time:%y%m%d_%H%M%S}" working_dir = Path(temp_path, f'slurm_out_repeat_{run_uuid}') copytree(args.from_dir, working_dir) + print(f"New working directory: {working_dir}") - exec_details = json.loads((working_dir / 'exec_details.json').read_text('utf-8')) + exec_details = json.loads((working_dir / 'exec_details.json').read_text('utf-8')) + cal_metadata = yaml.safe_load( + (working_dir / 'calibration_metadata.yml').read_text('utf-8') + ) + prev_parameters = cal_metadata['calibration-configurations'] job_group = JobGroup( [Step.from_dict(d) for d in exec_details['steps']], @@ -31,10 +41,31 @@ def main(argv=None): ) if args.no_cluster_job: job_group.run_direct() + joblist = [] else: - job_group.submit_jobs(SlurmOptions( + joblist = job_group.submit_jobs(SlurmOptions( partition=args.slurm_partition, )) + fmt_args = {'run_path': working_dir, + 'out_path': prev_parameters['out-folder'], + 'version': get_pycalib_version(), + 'report_to': prev_parameters['report-path'], + 'in_folder': prev_parameters['in-folder'], + 'request_time': start_time.strftime('%Y-%m-%dT%H:%M:%S'), + 'submission_time': start_time.strftime('%Y-%m-%dT%H:%M:%S'), + } + + joblist.append(run_finalize( + fmt_args=fmt_args, + temp_path=working_dir, + job_list=joblist, + sequential=args.no_cluster_job, + )) + + if any(j is not None for j in joblist): + print("Submitted the following SLURM jobs: {}".format(",".join(joblist))) + + if __name__ == '__main__': sys.exit(main()) -- GitLab