Skip to content
Snippets Groups Projects
finalize.py 15.9 KiB
Newer Older
Cyril Danilevski's avatar
Cyril Danilevski committed
import re
Thomas Kluyver's avatar
Thomas Kluyver committed
import sys
from datetime import datetime, timezone
from importlib.machinery import SourceFileLoader
from os import chdir, listdir, path
from pathlib import Path
from shutil import copy, copytree, move, rmtree
from subprocess import CalledProcessError, check_call, check_output
from tempfile import TemporaryDirectory
from textwrap import dedent
from typing import Dict, List

import tabulate
from jinja2 import Template

import cal_tools.tools

from .settings import logo_path
Cyril Danilevski's avatar
Cyril Danilevski committed

def atoi(text):
    """
    Convert string to integer is possible

    :param text: string to be converted
    :return: integer value or input string
    """

    return int(text) if text.isdigit() else text


def natural_keys(text):
    """
    Decompose string to list of integers and sub-strings
    """
    return [atoi(c) for c in re.split(r'(\d+)', text)]


def combine_report(cal_work_dir):
    sphinx_path = Path(cal_work_dir, "sphinx_rep").resolve()
    # if the finalize job was preempted or requeued,
    # while building the report.
    sphinx_path.mkdir(parents=True)
    direntries.sort(key=natural_keys)

    for entry in direntries:
        if entry.suffix == '.rst' and entry.is_file():
            comps = entry.stem.split("__")
            if len(comps) >= 3:
                group, name_param, conc_param = "_".join(comps[:-2]), \
                                                comps[-2], comps[-1]
            else:
                group, name_param, conc_param = comps[0], "None", "None"

            rst_content = entry.read_text('utf-8')

            # If this is part of a group (same notebook run on different
            # sequences or modules), replace its title so each section has
            # a meaningful heading:
            if conc_param != 'None':
                # Match a title at the start of the file with === underline
                m = re.match(r"\n*(.{3,})\n={3,}\n\n", rst_content)
                if m:
                    old_title = m.group(1)
                    _, title_end = m.span()
                    rst_content = rst_content[title_end:]
                    old_title = 'Processing'  # Generic fallback
                title = f"{old_title}, {name_param} = {conc_param}"
                rst_content = '\n'.join([
                    title, '=' * len(title), '', rst_content
                ])

            with (sphinx_path / f"{group}.rst").open("a", encoding="utf-8") as f:
                f.write(rst_content)
                f.write("\n\n")

        elif entry.is_dir():
            copytree(entry, sphinx_path / entry.name)
def prepare_plots(cal_work_dir: Path, threshold=1_000_000):
    """
    Convert svg file to pdf or png to be used for latex

    Conversion of svg to vector graphics pdf is performed using svglib3.
    This procedure is CPU consuming. In order to speed up process
    large svg files are converted to png format.

    The links in the rst files are adapted accordingly to the
    converted image files.

    :param cal_work_dir: Run path of the slurm job
    :param threshold: Max svg file size (in bytes) to be converted to pdf
    """
    print('Convert svg to pdf and png')

    for rst_file in cal_work_dir.glob("*.rst"):
        for f_path in (cal_work_dir / f'{rst_file.stem}_files').glob('*.svg'):
            if f_path.stat().st_size < threshold:
                check_call(["svg2pdf", str(f_path)], shell=False)
                check_call(["convert", str(f_path),
                            str(f_path.with_suffix('.png'))], shell=False)
            check_call([
                "sed", "-i", f"s/{f_path.name}/{f_path.stem}.{new_ext}/g", rst_file
            ], shell=False)
def get_job_info(jobs: List[str], fmt: List[str]) -> List[List[str]]:
    """Returns list of job information from sacct

    :param jobs: List of job names
    :param fmt: List of fields to query for each job (passed to sacct)
    Result ordered according to order of jobs given
    Order of fields in inner lists follows fmt
    """
    if not jobs:
        return []  # Skip calling sacct if not using Slurm

    # will use JobID to match results to jobs (duplicate field in fmt is OK)
    fmt_query = ",".join(["JobID"] + fmt)
    sacct_out = check_output(["sacct", "--truncate", "--parsable2", "--noheader",
                              f"--jobs={','.join(jobs)}",
                              f"--format={fmt_query}"])
    lines = sacct_out.decode().split("\n")

    missing_info = ["not-found"] * len(fmt)
    job_info = {job: missing_info for job in jobs}
    for line in lines:
        parts = line.split("|")
        if parts[0] in job_info:
            job_info[parts[0]] = parts[1:]

    return [job_info[job] for job in jobs]


def make_timing_summary(cal_work_dir: Path, job_times: List[List[str]],
                        job_time_fmt: List[str], pipeline_times: Dict[str, datetime]):
    Create an rst file with timing summary of executed notebooks
    :param cal_work_dir: Run path of the slurm job
    :param job_times: List of job information as returned by get_job_info
    :param job_time_fmt: List of headers to use for job_times
    :param pipeline_times: Dictionary of pipeline step -> timestamp
    """
    print('Prepare timing summary')

    tmpl = Template('''
                    Runtime summary
                    ===============
                    All timestamps are shown in local (Hamburg) time.

                        {% for line in time_table %}
                        {{ line }}
                        {%- endfor %}
                    ''')

    job_tbl_tmpl = Template('''
                    .. math::
                        {% for line in job_table %}
                        {{ line }}
                        {%- endfor %}
    time_vals = [
        [title, pipeline_times[k].astimezone(None).strftime("%Y-%m-%d %H:%M:%S")]
        for (title, k) in [
            ["Time of Request", "request-time"],
            ["Job submission", "submission-time"],
            ["Report compilation", "report-compilation-time"],
    ]]
    with (cal_work_dir / "timing_summary.rst").open("w+") as fd:
        time_table = tabulate.tabulate(time_vals, tablefmt="latex",
                                       headers=["Processing step", "Timestamp"])
        fd.write(dedent(tmpl.render(time_table=time_table.split("\n"))))

        if job_times:
            job_table = tabulate.tabulate(job_times, tablefmt="latex",
                                          headers=job_time_fmt)
            fd.write(dedent(job_tbl_tmpl.render(job_table=job_table.split("\n"))))
def make_report(run_path: Path, cal_work_dir: Path, project: str,
                author: str, version: str, report_to: Path):
    """
    Create calibration report (pdf file)

    Automatically generated report document results, produced by
    jupyter-notebooks.

    :param run_path: Path to sphinx run directory
    :param cal_work_dir: Run path of the slurm job
    :param project: Project title
    :param author: Author of the notebook
    :param version: Version of the notebook
    :param report_to: report path tailed with report name
    report_name = report_to.stem
    report_dir = report_to.parent
        check_call([sys.executable, "-m", "sphinx.cmd.quickstart",
                    "--quiet",
                    "--project='{}'".format(project),
                    "--author='{}'".format(author),
                    "-v", str(version),
                    "--suffix=.rst",
                    "--master=index",
                    "--ext-intersphinx",
                    "--ext-mathjax",
                    "--makefile",
                    "--no-batchfile", run_path])

    except CalledProcessError:
        raise Exception("Failed to run sphinx-quickstart. Is sphinx installed?"
                        "Generated simple index.rst instead")

    # quickbuild went well we need to edit the index.rst and conf.py files
    module_path = Path(__file__).absolute().parent
    conf_fn = run_path / "conf.py"
    tmp_conf_fn = run_path / "conf.py.tmp"
    conf = SourceFileLoader("conf", str(conf_fn)).load_module()
    l_var = [v for v in dir(conf) if not v.startswith('__')]

    with tmp_conf_fn.open("w") as mf:
        latex_elements = {'extraclassoptions': ',openany, oneside',
                          'preamble': r'\usepackage{longtable}',
                          'maketitle': r'\input{titlepage.tex.txt}'}
        mf.write("latex_elements = {}\n".format(latex_elements))
        mf.write("latex_logo = '{}/{}'\n".format(module_path,
                                                 logo_path))
        mf.write("latex_additional_files = ['titlepage.tex.txt']\n")

        # Set name of the .tex file & thus also the .pdf output
        # https://www.sphinx-doc.org/en/master/usage/configuration.html#confval-latex_documents
        latex_doc = ('index', f'{report_name}.tex', tex_escape(project),
                     tex_escape(author), 'manual', False)
        mf.write(f"latex_documents = [{latex_doc!r}]\n")

        for var in l_var:
            if var in ['latex_elements', 'latex_logo',
                       'latex_additional_files', 'latex_documents']:
            mf.write(f'{var} = {getattr(conf, var, None)!r}\n')
    conf_fn.unlink()
    move(str(tmp_conf_fn), str(conf_fn))
    direntries = list(run_path.iterdir())
    lead_rstfiles = ['InputParameters.rst', 'timing_summary.rst']

    # Order rst files based on the known order(lead_rstfiles).
    # TODO: fix order somewhere else instead of munging filenames
    def sort_key(f):
        if f.name in lead_rstfiles:
            return lead_rstfiles.index(f.name), f.name
        elif "summary" in f.name.lower():
            return len(lead_rstfiles), f.name
        elif "precorrection" in f.name.lower():
            return len(lead_rstfiles) + 1, f.name
            return len(lead_rstfiles) + 2, f.name
    direntries.sort(key=sort_key)
    files_to_handle = []
    for entry in direntries:
        if entry.is_file():
            name, ext = entry.stem, entry.suffix
            if ext == ".rst" and "index" not in name:
                files_to_handle.append(name.strip())

    index_tmp = Template('''
                        Calibration report
                        ==================
                        .. toctree::
                           :maxdepth: 2
                           {% for k in keys %}
                           {{ k }}
                           {%- endfor %}
                        ''')

    with (run_path / "index.rst").open("w+") as mf:
        mf.write(dedent(index_tmp.render(keys=files_to_handle)))

    # finally call the make scripts
    chdir(run_path)
    try:
        check_call(["make", f"SPHINXBUILD={sys.executable} -m sphinx",

    except CalledProcessError:
        print("Failed to make pdf documentation")
        print("Temp files will not be deleted and "
              f"can be inspected at: {run_path}")
    print(f"Moving report to final location: {report_dir}")
    report_dir.mkdir(parents=True, exist_ok=True)
    copy(run_path / "_build" / "latex" / f"{report_name}.pdf", report_dir)

    # Remove folders with figures and sphinx files.
    for tmp_subdir in cal_work_dir.iterdir():
        if tmp_subdir.is_dir():
            print(f"Removing temporary subdir: {tmp_subdir}")
            rmtree(tmp_subdir)
Karim Ahmed's avatar
Karim Ahmed committed


def make_titlepage(sphinx_path, project, data_path, version):
    """
    Create title page for report using template

    :param sphinx_path: path to sphinx run directory
    :param project: title of the project
    :param data_path: path to input data sample used for notebook
    :param version: Version of the pycalibration tool
    """
    module_path = "{}".format(path.abspath(path.dirname(__file__)))
    with open('{}/titlepage.tmpl'.format(module_path)) as file_:
        title_tmp = Template(file_.read())

    with open("{}/titlepage.tex.txt".format(sphinx_path), "w+") as mf:
        mf.write(dedent(title_tmp.render(project=tex_escape(project),
                                         data_path=tex_escape(data_path),
                                         version=tex_escape(version))))


def tex_escape(text):
    """
    Escape latex special characters found in the text

    :param text: a plain text message
    :return: the message escaped to appear correctly in LaTeX
    """
    conv = {
        '&': r'\&',
        '%': r'\%',
        '$': r'\$',
        '#': r'\#',
        '_': r'\_',
        '{': r'\{',
        '}': r'\}',
        '~': r'\textasciitilde{}',
        '^': r'\^{}',
        '\\': r'\textbackslash{}',
        '<': r'\textless{}',
        '>': r'\textgreater{}',
        '': r'\textemdash',
    }

    key_list = sorted(conv.keys(), key=lambda item: - len(item))
    regex = re.compile('|'.join(re.escape(str(key)) for key in key_list))
    return regex.sub(lambda match: conv[match.group()], text)


def finalize(joblist, finaljob, cal_work_dir, out_path, version, title, author, report_to, data_path='Unknown',
             request_time='', submission_time=''):
    # Archiving files in slurm_tmp
    if finaljob:
        joblist.append(str(finaljob))
    metadata = cal_tools.tools.CalibrationMetadata(cal_work_dir)
    metadata.gather_fragments()
    job_time_fmt = 'JobID,Start,End,Elapsed,Suspended,State'.split(',')
    job_time_summary = get_job_info(joblist, job_time_fmt)
    pipeline_time_summary = {
        "request-time": datetime.fromisoformat(request_time),
        "submission-time": datetime.fromisoformat(submission_time),
        "report-compilation-time": datetime.now(timezone.utc),
    make_timing_summary(cal_work_dir, job_time_summary, job_time_fmt, pipeline_time_summary)
    metadata.update(
        {
            "runtime-summary": {
                "calibration-jobs": [dict(zip(job_time_fmt, job_info))
                                     for job_info in job_time_summary],
                "pipeline-steps": pipeline_time_summary,
            }
        }
    )
    metadata.save()
        report_to = Path(report_to)
        sphinx_path = combine_report(cal_work_dir)
        make_titlepage(sphinx_path, title, data_path, version)
        make_report(
            Path(sphinx_path),
        det = metadata['calibration-configurations'].get('karabo-id', report_to.stem)
        try:
            det = metadata['calibration-configurations']['karabo-id']
        except KeyError:
            # Generate a hash based on job IDs to uniquely identify this
            # detector.
            from hashlib import sha1
            det = sha1(''.join(joblist).encode('ascii')).hexdigest()[:8]

    md_path = cal_work_dir / "calibration_metadata.yml"

    # Notebooks should have a karabo_id parameter, which we'll use to make a
    # unique name like calibration_metadata_MID_DET_AGIPD1M-1.yml in the output
    # folder. In case they don't, fall back to a name like the report.
    # To avoid interleaved writes, we'll copy it to a temp folder, then rename.
    with TemporaryDirectory(dir=out_path) as td:
        tmp_path = Path(td, f"calibration_metadata_{det}.yml")
        copy(md_path, tmp_path)
        tmp_path.replace(out_path / f"calibration_metadata_{det}.yml")

        # For continuity, we'll also expose it as $out/calibration_metadata.yml.
        # This is only useful if there's 1 calibration run per output folder,
        # otherwise the last one to create the file wins.
        tmp_path = Path(td, f"calibration_metadata.yml")
        copy(md_path, tmp_path)
        tmp_path.replace(out_path / "calibration_metadata.yml")