diff --git a/xfel_calibrate/finalize.py b/xfel_calibrate/finalize.py index f3b3ae552d161882305087bcae42f70b8f7db64f..9f3fd19d516c83e1df35921cad2b319e5a964f85 100644 --- a/xfel_calibrate/finalize.py +++ b/xfel_calibrate/finalize.py @@ -9,6 +9,7 @@ from shutil import copy, copytree, move, rmtree from subprocess import CalledProcessError, check_call, check_output from textwrap import dedent from time import sleep +from typing import Dict, List import cal_tools.tools import tabulate @@ -127,37 +128,47 @@ def prepare_plots(run_path, threshold=1000000): shell=False) -def make_timing_summary(run_path, joblist, request_time, submission_time): +def get_job_info(jobs: List[str], fmt: List[str]) -> List[List[str]]: + """Returns list of job information from sacct + + :param jobs: List of job names + :param fmt: List of fields to query for each job (passed to sacct) + Result ordered according to order of jobs given + Order of fields in inner lists follows fmt + """ + + # will use JobID to match results to jobs (duplicate field in fmt is OK) + fmt_query = ",".join(["JobID"] + fmt) + sacct_out = check_output(["sacct", "--truncate", "--parsable2", "--noheader", + f"--jobs={','.join(jobs)}", + f"--format={fmt_query}"]) + lines = sacct_out.decode().split("\n") + + missing_info = ["not-found"] * len(fmt) + job_info = {job: missing_info for job in jobs} + for line in lines: + parts = line.split("|") + if parts[0] in job_info: + job_info[parts[0]] = parts[1:] + + return [job_info[job] for job in jobs] + + +def make_timing_summary(run_path: Path, job_times: List[List[str]], + job_time_fmt: List[str], pipeline_times: Dict[str, str]): """ Create an rst file with timing summary of executed notebooks :param run_path: Run path of the slurm job - :param joblist: List of slurm jobs - :param request_time: Timestamp of notebook request - :param submission_time: Timestamp for slurm jobs being submitted + :param job_times: List of job information as returned by get_job_info + :param job_time_fmt: List of headers to use for job_times + :param pipeline_times: Dictionary of pipeline step -> timestamp """ print('Prepare timing summary') - run_path = path.abspath(run_path) - pars_vals = [] - pars = 'JobID,Start,End,Elapsed,Suspended,State' - pars_name = pars.split(',') - - for job in joblist: - out = check_output(['sacct', '-T', '-j', job, - '--format={}'.format(pars)], - shell=False) - lines = str(out).split('\\n') - - # loop over output lines, skip first two lines with header. - for line in lines[2:]: - s = line.split() - if len(s) == len(pars_name): - pars_vals.append(s) - break tmpl = Template(''' Runtime summary - ============== + =============== .. math:: {% for line in time_table %} @@ -171,27 +182,24 @@ def make_timing_summary(run_path, joblist, request_time, submission_time): ''') - time_vals = [['Time of Request', request_time], - ['Job submission', submission_time], - ['Report compilation', - datetime.now().strftime('%Y-%m-%dT%H:%M:%S')]] - - with open("{}/timing_summary.rst".format(run_path), "w+") as gfile: - - if len(pars_vals) > 0: - job_table = tabulate.tabulate(pars_vals, tablefmt='latex', - headers=pars_name) + time_vals = [ + ["Time of Request", pipeline_times["request-time"]], + ["Job submission", pipeline_times["submission-time"]], + ["Report compilation", pipeline_times["report-compilation-time"]], + ] - time_table = tabulate.tabulate(time_vals, tablefmt='latex', - headers=['Processing step', - 'Timestamp']) + with (run_path / "timing_summary.rst").open("w+") as fd: + job_table = tabulate.tabulate(job_times, tablefmt="latex", + headers=job_time_fmt) + time_table = tabulate.tabulate(time_vals, tablefmt="latex", + headers=["Processing step", "Timestamp"]) - gfile.write(dedent(tmpl.render(job_table=job_table.split('\n'), - time_table=time_table.split('\n')))) + fd.write(dedent(tmpl.render(job_table=job_table.split("\n"), + time_table=time_table.split("\n")))) -def make_report(run_path: str, tmp_path: str, out_path: str, project: str, - author: str, version: str, report_to: str): +def make_report(run_path: Path, tmp_path: Path, out_path: Path, project: str, + author: str, version: str, report_to: Path): """ Create calibration report (pdf file) @@ -207,11 +215,9 @@ def make_report(run_path: str, tmp_path: str, out_path: str, project: str, :param version: Version of the notebook :param report_to: report path tailed with report name """ - run_path = path.abspath(run_path) - report_path, report_name = path.split(report_to) - if not report_path: - report_path = out_path + report_name = report_to.stem + report_dir = report_to.parent try: check_call([sys.executable, "-m", "sphinx.cmd.quickstart", @@ -231,13 +237,14 @@ def make_report(run_path: str, tmp_path: str, out_path: str, project: str, "Generated simple index.rst instead") # quickbuild went well we need to edit the index.rst and conf.py files - module_path = "{}".format(path.abspath(path.dirname(__file__))) + module_path = Path(__file__).absolute().parent + conf_fn = run_path / "conf.py" + tmp_conf_fn = run_path / "conf.py.tmp" - conf = SourceFileLoader("conf", - "{}/conf.py".format(run_path)).load_module() + conf = SourceFileLoader("conf", str(conf_fn)).load_module() l_var = [v for v in dir(conf) if not v.startswith('__')] - with open("{}/conf.py.tmp".format(run_path), "w") as mf: + with tmp_conf_fn.open("w") as mf: latex_elements = {'extraclassoptions': ',openany, oneside', 'preamble': r'\usepackage{longtable}', 'maketitle': r'\input{titlepage.tex.txt}'} @@ -261,29 +268,29 @@ def make_report(run_path: str, tmp_path: str, out_path: str, project: str, mf.write(tmpl.format(var, v)) - remove("{}/conf.py".format(run_path)) - move("{}/conf.py.tmp".format(run_path), "{}/conf.py".format(run_path)) + conf_fn.unlink() + move(str(tmp_conf_fn), str(conf_fn)) - direntries = listdir(run_path) + direntries = list(run_path.iterdir()) lead_rstfiles = ['InputParameters.rst', 'timing_summary.rst'] # Order rst files based on the known order(lead_rstfiles). # TODO: fix order somewhere else instead of munging filenames def sort_key(f): - if f in lead_rstfiles: - return lead_rstfiles.index(f), f - elif "summary" in f.lower(): - return len(lead_rstfiles), f - elif "precorrection" in f.lower(): - return len(lead_rstfiles) + 1, f + if f.name in lead_rstfiles: + return lead_rstfiles.index(f.name), f.name + elif "summary" in f.name.lower(): + return len(lead_rstfiles), f.name + elif "precorrection" in f.name.lower(): + return len(lead_rstfiles) + 1, f.name else: - return len(lead_rstfiles) + 2, f + return len(lead_rstfiles) + 2, f.name direntries.sort(key=sort_key) files_to_handle = [] for entry in direntries: - if isfile("{}/{}".format(run_path, entry)): - name, ext = splitext("{}".format(entry)) + if entry.is_file(): + name, ext = entry.stem, entry.suffix if ext == ".rst" and "index" not in name: files_to_handle.append(name.strip()) @@ -298,7 +305,7 @@ def make_report(run_path: str, tmp_path: str, out_path: str, project: str, {%- endfor %} ''') - with open("{}/index.rst".format(run_path), "w+") as mf: + with (run_path / "index.rst").open("w+") as mf: mf.write(dedent(index_tmp.render(keys=files_to_handle))) # finally call the make scripts @@ -313,29 +320,21 @@ def make_report(run_path: str, tmp_path: str, out_path: str, project: str, f"can be inspected at: {run_path}") return - print(f"Moving report to final location: {report_path}") - makedirs(report_path, exist_ok=True) - copy(f'{run_path}/_build/latex/{report_name}.pdf', report_path) + print(f"Moving report to final location: {report_dir}") + report_dir.mkdir(parents=True, exist_ok=True) + copy(run_path / "_build" / "latex" / f"{report_name}.pdf", report_dir) - temp_dirs = glob(f'{tmp_path}/*/') # Remove folders with figures and sphinx files. - print(f"Removing directories [{temp_dirs}] in temp folder: {tmp_path}") - for dtmp in temp_dirs: - rmtree(f'{dtmp}/') - - # Archiving files in slurm_tmp - out_path = Path(out_path) - tmp_path = Path(tmp_path) - metadata = cal_tools.tools.CalibrationMetadata(out_path) - # TODO: add runtime summary - metadata.save_copy(tmp_path) + for tmp_subdir in tmp_path.iterdir(): + if tmp_subdir.is_dir(): + print(f"Removing temporary subdir: {tmp_subdir}") + rmtree(tmp_subdir) # Moving temporary files to out-folder after successful execution - # This helps in keeping elements needed for re-producibility. - print(f"Moving temporary files to final location" - f": {report_path}/{os.path.basename(tmp_path)} with name: " - f"slurm_out_{report_name}") - move(tmp_path, f"{report_path}/slurm_out_{report_name}") + # This helps in keeping elements needed for reproducibility. + slurm_archive_dir = report_dir / f"slurm_out_{report_name}" + print(f"Moving temporary files to final location: {slurm_archive_dir}") + move(str(tmp_path), str(slurm_archive_dir)) def make_titlepage(sphinx_path, project, data_path, version): @@ -387,6 +386,8 @@ def tex_escape(text): def finalize(joblist, finaljob, run_path, out_path, project, calibration, author, version, report_to, data_path='Unknown', request_time='', submission_time=''): + run_path = Path(run_path) + out_path = Path(out_path) print("Waiting on jobs to finish: {}".format(joblist)) while True: found_jobs = set() @@ -400,9 +401,38 @@ def finalize(joblist, finaljob, run_path, out_path, project, calibration, sleep(10) prepare_plots(run_path) - make_timing_summary(run_path, joblist + [str(finaljob)], request_time, - submission_time) + + # Archiving files in slurm_tmp + joblist.append(str(finaljob)) + metadata = cal_tools.tools.CalibrationMetadata(out_path) + job_time_fmt = 'JobID,Start,End,Elapsed,Suspended,State'.split(',') + job_time_summary = get_job_info(joblist, job_time_fmt) + pipeline_time_summary = { + "request-time": request_time, + "submission-time": submission_time, + "report-compilation-time": datetime.now().strftime("%Y-%m-%dT%H:%M:%S"), + } + make_timing_summary(run_path, job_time_summary, job_time_fmt, pipeline_time_summary) + metadata.update( + { + "runtime-summary": { + "calibration-jobs": [dict(zip(job_time_fmt, job_info)) + for job_info in job_time_summary], + "pipeline-steps": pipeline_time_summary, + } + } + ) + metadata.save() + metadata.save_copy(run_path) + sphinx_path = combine_report(run_path, calibration) make_titlepage(sphinx_path, project, data_path, version) - make_report(sphinx_path, run_path, out_path, project, author, version, - report_to) + make_report( + Path(sphinx_path), + run_path, + out_path, + project, + author, + version, + Path(report_to), + )