import re from datetime import datetime from glob import glob from importlib.machinery import SourceFileLoader from os import chdir, listdir, makedirs, path, stat from os.path import isdir, isfile, splitext from pathlib import Path from shutil import copy, copytree, move, rmtree from subprocess import CalledProcessError, check_call, check_output from textwrap import dedent from time import sleep from typing import Dict, List import tabulate from jinja2 import Template import cal_tools.tools from .settings import logo_path def atoi(text): """ Convert string to integer is possible :param text: string to be converted :return: integer value or input string """ return int(text) if text.isdigit() else text def natural_keys(text): """ Decompose string to list of integers and sub-strings """ return [atoi(c) for c in re.split(r'(\d+)', text)] def combine_report(run_path, calibration): sphinx_path = "{}/sphinx_rep".format(path.abspath(run_path)) makedirs(sphinx_path) direntries = listdir(run_path) direntries.sort(key=natural_keys) for entry in direntries: if isfile("{}/{}".format(run_path, entry)): name, ext = splitext("{}".format(entry)) if ext == ".rst": comps = name.split("__") if len(comps) >= 3: group, name_param, conc_param = "_".join(comps[:-2]), \ comps[-2], comps[-1] else: group, name_param, conc_param = comps[0], "None", "None" with open("{}/{}.rst".format(sphinx_path, group), "a") as gfile: if conc_param != "None": title = "{}, {} = {}".format(calibration, name_param, conc_param) gfile.write(title + "\n") gfile.write("=" * len(title) + "\n") gfile.write("\n") with open("{}/{}".format(run_path, entry), "r") as ifile: skip_next = False for line in ifile.readlines(): if skip_next: skip_next = False continue if conc_param == "None": gfile.write(line) elif " ".join(calibration.split()) != " ".join( line.split()): gfile.write(line) else: skip_next = True gfile.write("\n\n") if isdir("{}/{}".format(run_path, entry)): copytree("{}/{}".format(run_path, entry), "{}/{}".format(sphinx_path, entry)) return sphinx_path def prepare_plots(run_path, threshold=1000000): """ Convert svg file to pdf or png to be used for latex Conversion of svg to vector graphics pdf is performed using svglib3. This procedure is CPU consuming. In order to speed up process large svg files are converted to png format. The links in the rst files are adapted accordingly to the converted image files. :param run_path: Run path of the slurm job :param threshold: Max svg file size (in bytes) to be converted to pdf """ print('Convert svg to pdf and png') run_path = path.abspath(run_path) rst_files = glob('{}/*rst'.format(run_path)) for rst_file in rst_files: rst_file_name = path.basename(rst_file) rst_file_name = path.splitext(rst_file_name)[0] svg_files = glob( '{}/{}_files/*svg'.format(run_path, rst_file_name)) for f_path in svg_files: f_name = path.basename(f_path) f_name = path.splitext(f_name)[0] if (stat(f_path)).st_size < threshold: check_call(["svg2pdf", "{}".format(f_path)], shell=False) new_ext = 'pdf' else: check_call(["convert", "{}".format(f_path), "{}.png".format(f_name)], shell=False) new_ext = 'png' check_call(["sed", "-i", "s/{}.svg/{}.{}/g".format(f_name, f_name, new_ext), rst_file], shell=False) def get_job_info(jobs: List[str], fmt: List[str]) -> List[List[str]]: """Returns list of job information from sacct :param jobs: List of job names :param fmt: List of fields to query for each job (passed to sacct) Result ordered according to order of jobs given Order of fields in inner lists follows fmt """ # will use JobID to match results to jobs (duplicate field in fmt is OK) fmt_query = ",".join(["JobID"] + fmt) sacct_out = check_output(["sacct", "--truncate", "--parsable2", "--noheader", f"--jobs={','.join(jobs)}", f"--format={fmt_query}"]) lines = sacct_out.decode().split("\n") missing_info = ["not-found"] * len(fmt) job_info = {job: missing_info for job in jobs} for line in lines: parts = line.split("|") if parts[0] in job_info: job_info[parts[0]] = parts[1:] return [job_info[job] for job in jobs] def make_timing_summary(run_path: Path, job_times: List[List[str]], job_time_fmt: List[str], pipeline_times: Dict[str, str]): """ Create an rst file with timing summary of executed notebooks :param run_path: Run path of the slurm job :param job_times: List of job information as returned by get_job_info :param job_time_fmt: List of headers to use for job_times :param pipeline_times: Dictionary of pipeline step -> timestamp """ print('Prepare timing summary') tmpl = Template(''' Runtime summary =============== .. math:: {% for line in time_table %} {{ line }} {%- endfor %} .. math:: {% for line in job_table %} {{ line }} {%- endfor %} ''') time_vals = [ ["Time of Request", pipeline_times["request-time"]], ["Job submission", pipeline_times["submission-time"]], ["Report compilation", pipeline_times["report-compilation-time"]], ] with (run_path / "timing_summary.rst").open("w+") as fd: job_table = tabulate.tabulate(job_times, tablefmt="latex", headers=job_time_fmt) time_table = tabulate.tabulate(time_vals, tablefmt="latex", headers=["Processing step", "Timestamp"]) fd.write(dedent(tmpl.render(job_table=job_table.split("\n"), time_table=time_table.split("\n")))) def make_report(run_path: Path, tmp_path: Path, out_path: Path, project: str, author: str, version: str, report_to: Path): """ Create calibration report (pdf file) Automatically generated report document results, produced by jupyter-notebooks. :param run_path: Path to sphinx run directory :param tmp_path: Run path of the slurm job :param out_path: Output directory for report. Overwritten if path to report is given in `report_to` :param project: Project title :param author: Author of the notebook :param version: Version of the notebook :param report_to: report path tailed with report name """ report_name = report_to.stem report_dir = report_to.parent try: check_call([sys.executable, "-m", "sphinx.cmd.quickstart", "--quiet", "--project='{}'".format(project), "--author='{}'".format(author), "-v", str(version), "--suffix=.rst", "--master=index", "--ext-intersphinx", "--ext-mathjax", "--makefile", "--no-batchfile", run_path]) except CalledProcessError: raise Exception("Failed to run sphinx-quickstart. Is sphinx installed?" "Generated simple index.rst instead") # quickbuild went well we need to edit the index.rst and conf.py files module_path = Path(__file__).absolute().parent conf_fn = run_path / "conf.py" tmp_conf_fn = run_path / "conf.py.tmp" conf = SourceFileLoader("conf", str(conf_fn)).load_module() l_var = [v for v in dir(conf) if not v.startswith('__')] with tmp_conf_fn.open("w") as mf: latex_elements = {'extraclassoptions': ',openany, oneside', 'preamble': r'\usepackage{longtable}', 'maketitle': r'\input{titlepage.tex.txt}'} mf.write("latex_elements = {}\n".format(latex_elements)) mf.write("latex_logo = '{}/{}'\n".format(module_path, logo_path)) mf.write("latex_additional_files = ['titlepage.tex.txt']\n") for var in l_var: if var in ['latex_elements', 'latex_logo', 'latex_additional_files']: continue tmpl = '{} = {}\n' v = getattr(conf, var, None) if isinstance(v, str): tmpl = '{} = "{}"\n' # Set name of the latex document if var == 'latex_documents' and len(v[0]) > 1: v[0] = v[0][:1] + ('{}.tex'.format(report_name), ) + v[0][2:] mf.write(tmpl.format(var, v)) conf_fn.unlink() move(str(tmp_conf_fn), str(conf_fn)) direntries = list(run_path.iterdir()) lead_rstfiles = ['InputParameters.rst', 'timing_summary.rst'] # Order rst files based on the known order(lead_rstfiles). # TODO: fix order somewhere else instead of munging filenames def sort_key(f): if f.name in lead_rstfiles: return lead_rstfiles.index(f.name), f.name elif "summary" in f.name.lower(): return len(lead_rstfiles), f.name elif "precorrection" in f.name.lower(): return len(lead_rstfiles) + 1, f.name else: return len(lead_rstfiles) + 2, f.name direntries.sort(key=sort_key) files_to_handle = [] for entry in direntries: if entry.is_file(): name, ext = entry.stem, entry.suffix if ext == ".rst" and "index" not in name: files_to_handle.append(name.strip()) index_tmp = Template(''' Calibration report ================== .. toctree:: :maxdepth: 2 {% for k in keys %} {{ k }} {%- endfor %} ''') with (run_path / "index.rst").open("w+") as mf: mf.write(dedent(index_tmp.render(keys=files_to_handle))) # finally call the make scripts chdir(run_path) try: check_call(["make", f"SPHINXBUILD={sys.executable} -m sphinx", "latexpdf"]) except CalledProcessError: print("Failed to make pdf documentation") print("Temp files will not be deleted and " f"can be inspected at: {run_path}") return print(f"Moving report to final location: {report_dir}") report_dir.mkdir(parents=True, exist_ok=True) copy(run_path / "_build" / "latex" / f"{report_name}.pdf", report_dir) # Remove folders with figures and sphinx files. for tmp_subdir in tmp_path.iterdir(): if tmp_subdir.is_dir(): print(f"Removing temporary subdir: {tmp_subdir}") rmtree(tmp_subdir) # Moving temporary files to out-folder after successful execution # This helps in keeping elements needed for reproducibility. slurm_archive_dir = report_dir / f"slurm_out_{report_name}" print(f"Moving temporary files to final location: {slurm_archive_dir}") move(str(tmp_path), str(slurm_archive_dir)) def make_titlepage(sphinx_path, project, data_path, version): """ Create title page for report using template :param sphinx_path: path to sphinx run directory :param project: title of the project :param data_path: path to input data sample used for notebook :param version: Version of the pycalibration tool """ module_path = "{}".format(path.abspath(path.dirname(__file__))) with open('{}/titlepage.tmpl'.format(module_path)) as file_: title_tmp = Template(file_.read()) with open("{}/titlepage.tex.txt".format(sphinx_path), "w+") as mf: mf.write(dedent(title_tmp.render(project=tex_escape(project), data_path=tex_escape(data_path), version=tex_escape(version)))) def tex_escape(text): """ Escape latex special characters found in the text :param text: a plain text message :return: the message escaped to appear correctly in LaTeX """ conv = { '&': r'\&', '%': r'\%', '$': r'\$', '#': r'\#', '_': r'\_', '{': r'\{', '}': r'\}', '~': r'\textasciitilde{}', '^': r'\^{}', '\\': r'\textbackslash{}', '<': r'\textless{}', '>': r'\textgreater{}', } key_list = sorted(conv.keys(), key=lambda item: - len(item)) regex = re.compile('|'.join(re.escape(str(key)) for key in key_list)) return regex.sub(lambda match: conv[match.group()], text) def finalize(joblist, finaljob, run_path, out_path, project, calibration, author, version, report_to, data_path='Unknown', request_time='', submission_time=''): run_path = Path(run_path) out_path = Path(out_path) print("Waiting on jobs to finish: {}".format(joblist)) while True: found_jobs = set() output = check_output(['squeue']).decode('utf8') for line in output.split("\n"): for job in joblist: if str(job) in line: found_jobs.add(job) if len(found_jobs) == 0: break sleep(10) prepare_plots(run_path) # Archiving files in slurm_tmp if finaljob: joblist.append(str(finaljob)) metadata = cal_tools.tools.CalibrationMetadata(out_path) job_time_fmt = 'JobID,Start,End,Elapsed,Suspended,State'.split(',') job_time_summary = get_job_info(joblist, job_time_fmt) pipeline_time_summary = { "request-time": request_time, "submission-time": submission_time, "report-compilation-time": datetime.now().strftime("%Y-%m-%dT%H:%M:%S"), } make_timing_summary(run_path, job_time_summary, job_time_fmt, pipeline_time_summary) metadata.update( { "runtime-summary": { "calibration-jobs": [dict(zip(job_time_fmt, job_info)) for job_info in job_time_summary], "pipeline-steps": pipeline_time_summary, } } ) metadata.save() metadata.save_copy(run_path) sphinx_path = combine_report(run_path, calibration) make_titlepage(sphinx_path, project, data_path, version) make_report( Path(sphinx_path), run_path, out_path, project, author, version, Path(report_to), )