#!/usr/bin/env python

import inspect
import json
import locale
import math
import os
import re
import shlex
import shutil
import stat
import sys
import textwrap
import warnings
from datetime import datetime
from pathlib import Path
from subprocess import CalledProcessError, DEVNULL, call, check_call, check_output
from typing import List, Union

import nbformat
import numpy as np
import yaml
from jinja2 import Template
from nbparameterise import extract_parameters, parameter_values, replace_definitions

import cal_tools.tools

from .finalize import tex_escape
from .nb_args import (
    consolize_name,
    first_markdown_cell,
    get_notebook_function,
    parse_argv_and_load_nb,
    set_figure_format,
)
from .settings import (
    default_report_path,
    finalize_time_limit,
    launcher_command,
    python_path,
    sprof,
    try_report_to_output,
)

PKG_DIR = os.path.dirname(os.path.abspath(__file__))


def extract_title_author(nb):
    """ Tries to extract title, author from markdown.

    The version is taken from git.
    """

    first_md = first_markdown_cell(nb)
    if first_md is None:
        return None, None
    source = first_md["source"]
    title = re.findall(r'#+\s*(.*)\s*#+', source)
    author = re.findall(
        r'author[\s]*[:][\s]*(.*?)\s*(?:[,?]|version)', source, flags=re.IGNORECASE)

    title = title[0] if len(title) else None
    author = author[0] if len(author) else None
    return title, author
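
# Illustrative example (hypothetical cell contents): for a first markdown cell
#
#     # AGIPD Dark Characterization #
#
#     Author: European XFEL Detector Group, Version: 1.0
#
# the regexes above yield the title "AGIPD Dark Characterization" (trailing
# whitespace is stripped later) and the author "European XFEL Detector Group".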


def get_pycalib_version():
    # Try to get the version from git. This works only for a development
    # installation; errors are suppressed. For a regular installation
    # the version is stored in the VERSION.py file.
    try:
        git_dir = os.path.join(PKG_DIR, '..', '..', '.git')
        version = check_output([
            'git', f'--git-dir={git_dir}', 'describe', '--tag',
        ], stderr=DEVNULL).decode('utf8')
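        # `git describe --tag` yields e.g. "3.8.1" on a tagged commit, or
        # "3.8.1-14-gabc1234" (tag, commits since tag, short hash) otherwise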
        version = version.replace("\n", "")
    except (CalledProcessError, FileNotFoundError):
        from .VERSION import __version__
        version = __version__
    return version


def get_python_version(python_exe):
    if python_exe == sys.executable:
        return '{}.{}.{}'.format(*sys.version_info)

    return check_output([python_exe, '--version']).decode('utf-8').split()[1]


def balance_sequences(in_folder: str, run: int, sequences: List[int],
                      sequences_per_node: int, karabo_da: Union[list, str],
                      max_nodes: int = 8):
    """Return balance list of sequences to be executed on slurm nodes
    Total list of sequences is splitted onto several nodes based on
    sequences_per_node. If the number of the required nodes is more than
    the max_nodes, the number of sequences_per_node is adjusted.

    :param in_folder: Path to the input raw data without the run number.
    :param run: Run number.
    :param sequences: List of sequences. [-1] for obtaining all.
    :param sequences_per_node: Number of sequences to process per a node.
    :param karabo_da: Karabo data aggregator used as data file inset.
    :param max_nodes: Maximum number of maxwell nodes to use.
    :return: Balanced list of sequences.
    """
    # TODO: some small detector notebooks have karabo_da as a list.
    # remove this str check after unifying the expected type across
    # correction notebooks.
    if isinstance(karabo_da, str):
        karabo_da = [karabo_da]
    elif not isinstance(karabo_da, list):
        raise TypeError("Balance sequences expects `karabo_da` as a string or list.")

    in_path = Path(in_folder, f"r{run:04d}")

    # TODO: remove ["-1"] after karabo_da refactor
    if karabo_da in [["-1"], ["all"]]:
        karabo_da = [""]

    # Get all possible sequences for the selected karabo_da
    sequence_files = []
    for k_da in karabo_da:
        sequence_files.extend(in_path.glob(f"*{k_da}-S*.h5"))

    # Extract sequences from input files.
    seq_nums = {int(sf.stem[-5:]) for sf in sequence_files}

    # Validate selected sequences with sequences in in_folder
    if sequences != [-1]:
        seq_nums = sorted(seq_nums.intersection(sequences))
        if len(seq_nums) == 0:
            raise ValueError(
                f"Selected sequences {sequences} are not "
                f"available in {in_path}"
            )

    # Validate required nodes with max_nodes
    nsplits = len(seq_nums) // sequences_per_node
    if nsplits > max_nodes:
        sequences_per_node = math.ceil(len(seq_nums)/max_nodes)
        nsplits = max_nodes
        print(f"Changed to {sequences_per_node} sequences per node")
        print(f"to have a maximum of {max_nodes} concurrent jobs")
    elif nsplits == 0:
        nsplits = 1

    return [chunk.tolist() for chunk in np.array_split(sorted(seq_nums), nsplits)
            if chunk.size > 0]
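
# Worked example (hypothetical numbers): with 20 available sequences,
# sequences_per_node=3 and max_nodes=8, nsplits = 20 // 3 = 6 <= 8, so
# np.array_split yields six chunks of 3-4 sequences each. With 30 sequences,
# 30 // 3 = 10 > 8, so sequences_per_node is raised to ceil(30 / 8) = 4 and
# the work is split across exactly 8 nodes.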


def get_par_attr(parms, key, attr, default=None):
    """
    Return the type of parameter with name key
    :param parms: List of parameters
    :param key: Name of the parameter to be considered
    :param attr: Name of the parameter attribute (e.g. value, type)
    :param default: Type to be returned if interested name is not found
    :return: The attribute of the parameter
    """
    for p in parms:
        if p.name == key:
            return getattr(p, attr, default)
    return default


def flatten_list(l):
    """
    Make a string representation of a list
    :param l: List or a string
    :return: Same string or string with first and last entry of a list
    """
    if not isinstance(l, list):
        return str(l)
    if len(l) > 1:
        return '{}-{}'.format(l[0], l[-1])
    elif len(l) == 1:
        return '{}'.format(l[0])
    else:
        return ''
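
# Examples:
#   flatten_list([0, 1, 2])   -> '0-2'
#   flatten_list([5])         -> '5'
#   flatten_list([])          -> ''
#   flatten_list("AGIPD00")   -> 'AGIPD00'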


def create_finalize_script(fmt_args, cal_work_dir, job_list) -> str:
    """
    Create a finalize script to produce the output report
    :param fmt_args: Dictionary of arguments for the script template
    :param cal_work_dir: Path to the temporary folder used for the Slurm jobs
    :param job_list: List of Slurm job IDs to wait for
    :return: The path of the created script
    """
    tmpl = Template("""\
                    #!/usr/bin/env python3
                    import os
                    from xfel_calibrate.finalize import finalize

                    finalize(joblist={{joblist}},
                             finaljob=os.environ.get('SLURM_JOB_ID', ''),
                             cal_work_dir='{{cal_work_dir}}',
                             out_path='{{out_path}}',
                             version='{{version}}',
                             title='{{title}}',
                             author='{{author}}',
                             report_to='{{report_to}}',
                             data_path='{{in_folder}}',
                             request_time='{{request_time}}',
                             submission_time='{{submission_time}}')

                    """)

    fmt_args['joblist'] = job_list
    f_name = os.path.join(cal_work_dir, "finalize.py")
    with open(f_name, "w") as finfile:
        finfile.write(textwrap.dedent(tmpl.render(**fmt_args)))

    # Change the file permissions: executable and writable for the user,
    # readable for user, group and others
    all_stats = stat.S_IXUSR | stat.S_IWUSR | stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
    os.chmod(f_name, all_stats)
    return f_name


def run_finalize(
    fmt_args, cal_work_dir, job_list, sequential=False, partition=None):
    if partition is None:
        partition = "exfel"
    finalize_script = create_finalize_script(fmt_args, cal_work_dir, job_list)

    cmd = []
    if not sequential:
        cmd = [
            'sbatch',
            '--parsable',
            '--requeue',
            '--output', f'{cal_work_dir}/slurm-%j.out',
            '--open-mode=append',  # So we can see if it's preempted & requeued
            '--job-name', 'xfel-cal-finalize',
            '--time', finalize_time_limit,
            '--partition', partition,
            "--dependency=afterany:" + ":".join(str(j) for j in job_list),
        ]
        print(" ".join(cmd))

    cmd += [
        os.path.join(PKG_DIR, "bin", "slurm_finalize.sh"),  # path to helper sh
        sys.executable,  # Python with calibration machinery installed
        cal_work_dir,
        finalize_script,
        fmt_args['report_to']
    ]

    output = check_output(cmd, input=b'').decode('utf8')
    jobid = None
    if not sequential:
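        # With --parsable, sbatch prints "<jobid>" or "<jobid>;<cluster>",
        # hence taking the part before ';' below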
        jobid = output.partition(';')[0].strip()
        print("Submitted finalize job: {}".format(jobid))
    return jobid


class SlurmOptions:
    def __init__(
            self, job_name=None, nice=None, mem=None, partition=None, reservation=None,
    ):
        self.job_name = job_name or 'xfel_calibrate'
        self.nice = nice
        self.mem = mem
        self.partition = partition
        self.reservation = reservation

    def get_partition_or_reservation(self) -> List[str]:
        """Return sbatch arguments to use a partition or reservation

        --reservation and --slurm-partition options have precedence.
        Otherwise, a default partition is used.
        """
        if self.reservation:
            return ['--reservation', self.reservation]
        return ['--partition', self.partition or sprof]

    def get_launcher_command(self, log_dir, after_ok=(), after_any=()) -> List[str]:
        """
        Return a slurm launcher command
        :param log_dir: Where Slurm .out log files should go
        :param after_ok: A list of jobs which must succeed first
        :param after_any: A list of jobs which must finish first, but may fail
        :return: List of commands and parameters to be used by subprocess
        """

        launcher_slurm = launcher_command.format(temp_path=log_dir).split()

        launcher_slurm += self.get_partition_or_reservation()

        launcher_slurm += ["--job-name", self.job_name]

        if self.nice:
            launcher_slurm.append(f"--nice={self.nice}")

        if self.mem:
            launcher_slurm.append(f"--mem={self.mem}G")

        deps = []
        if after_ok:
            deps.append("afterok:" + ":".join(str(j) for j in after_ok))
        if after_any:
            deps.append("afterany:" + ":".join(str(j) for j in after_any))
        if deps:
            launcher_slurm.append("--dependency=" + ",".join(deps))
        return launcher_slurm
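
    # Illustration (hypothetical values; launcher_command comes from
    # settings.py and is assumed to expand to an sbatch invocation): with
    # after_ok=['123'], mem=700 and no partition/reservation given, the
    # returned list ends in
    #   ['--partition', <sprof default>, '--job-name', 'xfel_calibrate',
    #    '--mem=700G', '--dependency=afterok:123']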


def remove_duplications(l) -> list:
    """
    Remove duplicate elements from a list, preserving order

    :param l: Input list
    :return: Output list of unique elements
    """
    unique_l = []
    for elem in l:
        if elem not in unique_l:
            unique_l.append(elem)
    if unique_l != l:
        print("Duplicated concurrency parameters were removed")
    return unique_l
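
# Example: remove_duplications([1, 2, 1, 3]) -> [1, 2, 3]; the order of first
# occurrence is preserved, and a notice is printed when duplicates are dropped.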


def prepare_job(
    cal_work_dir: Path, nb, nb_path: Path, args: dict, cparm=None, cval=None,
    cluster_cores=8, show_title=True,
) -> 'JobArgs':
    if cparm is not None:
        args[cparm] = cval

    suffix = flatten_list(cval)

    # first convert the notebook
    parms = extract_parameters(nb, lang='python')

    if any(p.name == "cluster_profile" for p in parms):
        cluster_profile = f"{args['cluster_profile']}_{suffix}"
    else:
        # Don't start ipcluster if there's no cluster_profile parameter
        cluster_profile = 'NO_CLUSTER'

    params = parameter_values(parms, **args)
    params = parameter_values(
        params, cluster_profile=cluster_profile, metadata_folder="."
    )
    new_nb = replace_definitions(nb, params, execute=False, lang='python')
    if not show_title:
        title_cell = first_markdown_cell(new_nb)
        if title_cell is not None:
            title_cell.source = ''
    set_figure_format(new_nb, args["vector_figs"])
    # In some cases the suffix can contain a `/`, e.g. LPDMini and GH2
    # have karabo_da values containing a `/`.
    new_name = f"{nb_path.stem}__{cparm}__{suffix}.ipynb".replace("/", "-")

    nbformat.write(new_nb, cal_work_dir / new_name)

    return JobArgs([
        "./pycalib-run-nb.sh",  # ./ allows this to run directly
        new_name,
        "{python}",
        cluster_profile,
        os.path.splitext(new_name)[0].upper(),
        args["detector"].upper(),
        args["type"].upper(),
        str(cluster_cores),
    ])


class JobArgs:
    """Command line arguments for running one calibration job"""
    def __init__(self, args: List[str]):
        self.args = args

    def __repr__(self):
        return f"JobArgs({self.args})"

    def __eq__(self, other):
        return isinstance(other, JobArgs) and (self.args == other.args)

    def format_cmd(self, python):
        return [a.format(python=python) for a in self.args]
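
    # For example (hypothetical values):
    #   JobArgs(["./run-nb.sh", "nb.ipynb", "{python}"]).format_cmd("/usr/bin/python3")
    #   -> ["./run-nb.sh", "nb.ipynb", "/usr/bin/python3"]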

    def run_direct(self, work_dir, python) -> int:
        """Run this job in a local process, return exit status"""
        return call(self.format_cmd(python), cwd=work_dir)

    def submit_job(
            self, work_dir, python, slurm_opts, after_ok=(), after_any=(), env=None
    ):
        """Submit this job to Slurm, return its job ID"""
        cmd = slurm_opts.get_launcher_command(work_dir, after_ok, after_any)
        cmd += self.format_cmd(python)
        # sbatch propagates environment variables into the job by default
        output = check_output(cmd, cwd=work_dir, env=env).decode('utf-8')
        return output.partition(';')[0].strip()  # job ID


class Step:
    """A group of jobs which may run in parallel

    If after_error is True, this step should run even if previous steps failed.
    Otherwise, it will only run if the previous steps succeed.
    """
    def __init__(self, jobs: List[JobArgs], after_error=False):
        self.jobs = jobs
        self.after_error = after_error

    def to_dict(self):
        return {
            'jobs': [j.args for j in self.jobs],
            'after_error': self.after_error,
        }

    @classmethod
    def from_dict(cls, d):
        return cls([JobArgs(args) for args in d['jobs']], d['after_error'])


class JobChain:
    """A collection of jobs to run for one call to xfel-calibrate

    This is a chain of steps, each of which may contain several jobs.
    It also holds the work directory, where the parameterised notebooks are
    saved, and the path to the Python interpreter to run the notebooks.
    """
    def __init__(self, steps: List[Step], work_dir: Path, python):
        self.steps = [s for s in steps if s.jobs]
        self.work_dir = work_dir
        self.python = python

    @classmethod
    def from_dir(cls, work_dir: Path, python):
        """Load a JobChain from a work directory containing steps.json"""
        d = json.loads((work_dir / 'steps.json').read_text('utf-8'))
        steps = [Step.from_dict(sd) for sd in d['steps']]
        return cls(steps, work_dir, python)

    def save(self):
        """Save the steps of this chain to steps.json in the work directory"""
        with (self.work_dir / 'steps.json').open('w', encoding='utf-8') as f:
            json.dump({
                'steps': [step.to_dict() for step in self.steps]
            }, f, indent=2)

    def submit_jobs(self, slurm_opts: SlurmOptions, env=None):
        """Submit these jobs to Slurm, return a list of job IDs

        Slurm dependencies are used to manage the sequence of jobs.
        """
        print("Submitting jobs with Slurm options:")
        print(" ".join(slurm_opts.get_launcher_command(self.work_dir)))
        all_job_ids = []
        dep_job_ids = ()  # Replaced after each step
        for step in self.steps:
            step_job_ids = []
            kw = {('after_any' if step.after_error else 'after_ok'): dep_job_ids}
            for job_desc in step.jobs:
                jid = job_desc.submit_job(
                    self.work_dir, self.python, slurm_opts, env=env, **kw
                )
                step_job_ids.append(jid)
            dep_job_ids = step_job_ids
            all_job_ids.extend(step_job_ids)
        return all_job_ids
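
    # submit_jobs illustration: for steps [pre, main, dep], the main jobs get
    # --dependency=afterok:<pre job IDs>, and because the dep step is created
    # with after_error=True it gets --dependency=afterany:<main job IDs>, so
    # Slurm runs the chain in order without this process waiting.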

    def run_direct(self) -> bool:
        """Run these jobs in local processes, return True if any failed"""
        errors = False
        for i, step in enumerate(self.steps):
            if errors and not step.after_error:
                print(f"Not running step {i}, previous step failed")
                continue

            exit_codes = [
                j.run_direct(self.work_dir, self.python) for j in step.jobs
            ]
            if any(ec != 0 for ec in exit_codes):
                errors = True

        return errors


def make_par_table(parms):
    """
    Create an RST table with the input parameters of the notebook

    :param parms: parameters of the notebook
    """

    # Insert break hints into long strings that contain none of the
    # line-breaking characters ` ,-/`, so LaTeX can wrap them
    def split_len(seq, length):
        r"""
        Splits a sequence into smaller segments of a specified length,
        concatenates them, and adds line-breaking characters
        to ensure proper line breaks in LaTeX.

        Args:
            seq (str): The sequence to be split.
            length (int): The desired length of each segment.

        Returns:
            str: The concatenated line with line-breaking characters.

        Examples:
            >>> split_len("slurm_prof_230711_095647.832671_0", 10)
            'slurm_prof\\-_230711_09\\-5647.83267\\-1_0\\-'
        """
        lbc = set(' ,-/')
        line = ''
        for i in range(0, len(seq), length):
            sub_line = seq[i:i + length]
            line += sub_line.replace('/', '/\\-')
            if not any(c in lbc for c in sub_line):
                line += '\\-'
        return line

    # Prepare strings and estimate their length
    l_parms = []
    len_parms = [0, 0]
    max_len = [20, 20]
    for p in parms:
        name = p.name.replace('_', '-')
        if len(name) > max_len[0]:
            len_parms[0] = max_len[0]
            name = split_len(name, max_len[0])

        value = str(p.value)
        if len(value) > max_len[1]:
            len_parms[1] = max_len[1]
            value = split_len(value, max_len[1])
        value = tex_escape(value)
        if issubclass(p.type, str):
            value = "``{}''".format(value)
        comment = tex_escape(str(p.comment)[1:])
        l_parms.append([name, value, comment])

    # Use a fixed column width where needed
    col_type = ['l', 'c', 'p{.3\\textwidth}']
    if len_parms[0] == max_len[0]:
        col_type[0] = col_type[2]
    if len_parms[1] == max_len[1]:
        col_type[1] = col_type[2]

    tmpl = Template('''
                    Input of the calibration pipeline
                    =================================

                    .. raw:: latex

                        \\begin{longtable}{ {% for k in p %}{{k}}{%- endfor %} }
                        \\hline
                        {% for k in lines %}
                        {{ k[0] }} & {{ k[1] }} & {{ k[2] }} \\\\
                        {%- endfor %}
                        \\hline
                        \\end{longtable}
                    ''')

    return textwrap.dedent(tmpl.render(p=col_type, lines=l_parms))


def run(argv=None):
    """ Run a calibration task with parser arguments """
    # Ensure files are opened as UTF-8 by default, regardless of environment.
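    # (Skipped on readthedocs, where this locale may not be available.)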
    if "readthedocs.org" not in sys.executable:
        locale.setlocale(locale.LC_CTYPE, ('en_US', 'UTF-8'))

    if argv is None:
        argv = sys.argv

    args, nb_details = parse_argv_and_load_nb(argv)

    concurrency = nb_details.concurrency
    concurrency_par = args["concurrency_par"] or concurrency['parameter']
    if concurrency_par == concurrency['parameter']:
        # Use the defaults from notebooks.py to split the work into several jobs
        concurrency_defval = concurrency.get('default concurrency', None)
        concurrency_func = concurrency.get('use function', None)
    else:
        # --concurrency-par specified something different from notebook.py:
        # don't use the associated settings from there.
        concurrency_defval = concurrency_func = None

    notebook_path = nb_details.path
    nb = nb_details.contents

    title, author = extract_title_author(nb)
    version = get_pycalib_version()

    if not title:
        title = f"{nb_details.detector} {nb_details.caltype} Calibration"
    if not author:
        author = "anonymous"
    if not version:
        version = ""

    title = title.rstrip()

    # request_time is in local timezone
    if args["request_time"] == "Now":
        request_time = datetime.now()
    else:
        request_time = datetime.fromisoformat(args["request_time"])

    # Warn if the requested concurrency parameter is not a notebook parameter
    if concurrency_par is not None and not any(
            p.name == concurrency_par for p in nb_details.default_params
    ):
        msg = f"Notebook cannot be run concurrently: no {concurrency_par} parameter"
        warnings.warn(msg, RuntimeWarning)

    # If not explicitly specified, use a new profile for ipcluster
    default_params_by_name = {p.name: p.value for p in nb_details.default_params}
    if 'cluster_profile' in default_params_by_name:
        if args.get("cluster_profile") == default_params_by_name["cluster_profile"]:
            args['cluster_profile'] = f"slurm_prof_{request_time:%y%m%d_%H%M%S.%f}"

    # The finalize job, submitted last, waits for all other jobs and then
    # builds the report from the executed notebooks. Decide where its
    # output should go:
    out_path = (Path(default_report_path) / nb_details.detector
                / nb_details.caltype / datetime.now().isoformat())
    if try_report_to_output:
        if "out_folder" in args:
            out_path = Path(args["out_folder"]).absolute()
        else:
            print(f"No 'out_folder' given, outputting to '{out_path}' instead.")

    out_path.mkdir(parents=True, exist_ok=True)

    # Use given report name, or automatic unique name if not specified
    det_name = args.get('karabo_id', nb_details.detector)
    unique_name = f"{det_name}-{nb_details.caltype}-{request_time:%y%m%d_%H%M%S.%f}"
    if args['skip_report']:
        report_to = ''
    elif args["report_to"] is None:
        report_to = out_path / f"{unique_name}.pdf"
        print(f"report_to not specified, will use {report_to}")
    else:
        report_to = Path(args["report_to"]).with_suffix('.pdf').absolute()

    if report_to:
        # Work dir matching report file but without .pdf
        cal_work_dir = report_to.with_suffix('')
    else:
        cal_work_dir = out_path / unique_name
    cal_work_dir.mkdir(parents=True)

    # Write all input parameters to an rst file to be included in the final report
    parms = parameter_values(nb_details.default_params, **args)
    (cal_work_dir / "InputParameters.rst").write_text(make_par_table(parms))
    # And save the invocation of this script itself
    (cal_work_dir / "run_calibrate.sh").write_text(
        f'# pycalibration version: {version}\n' + shlex.join(argv)
    )

    # Copy the bash script which will be used to run notebooks
    shutil.copy2(
        os.path.join(PKG_DIR, "bin", "slurm_calibrate.sh"),
        cal_work_dir / "pycalib-run-nb.sh"
    )

    if nb_details.user_venv:
        print("Using specified venv:", nb_details.user_venv)
        python_exe = str(nb_details.user_venv / 'bin' / 'python')
    else:
        python_exe = python_path

    # Write metadata about calibration job to output folder
    metadata = cal_tools.tools.CalibrationMetadata(cal_work_dir, new=True)

    parm_subdict = metadata.setdefault("calibration-configurations", {})
    for p in parms:
        name = consolize_name(p.name)
        parm_subdict[name] = p.value

    metadata["pycalibration-version"] = version
    metadata["report-path"] = str(report_to) if report_to \
        else '# REPORT SKIPPED #'
    metadata['reproducible'] = not args['not_reproducible']
    metadata["concurrency"] = {
        'parameter': concurrency_par,
        'default': concurrency_defval,
        'function': concurrency_func,
    }
    metadata["notebook"] = {
        'title': title,
        'author': author,
    }
    metadata['python-environment'] = {
        'path': python_exe,
        'python-version': get_python_version(python_exe),
    }
    if args["constants_from"]:
        with open(args["constants_from"], "r", encoding='utf-8') as f:
            d = yaml.safe_load(f)
        metadata["retrieved-constants"] = d["retrieved-constants"]
    metadata.save()

    # Record installed Python packages for reproducing the environment
    if not args['skip_env_freeze']:
        with (cal_work_dir / 'requirements.txt').open('wb') as f:
            check_call([python_exe, '-m', 'pip', 'freeze'], stdout=f)

    folder = get_par_attr(parms, 'in_folder', 'value', '')

    pre_jobs = []
    cluster_cores = concurrency.get("cluster cores", 8)
    # Check if there are pre-notebooks
    for pre_notebook_path in nb_details.pre_paths:
        lead_nb = nbformat.read(pre_notebook_path, as_version=4)
        pre_jobs.append(prepare_job(
            cal_work_dir, lead_nb, pre_notebook_path, args,
            cluster_cores=cluster_cores
        ))

    main_jobs = []
    if concurrency_par is None:
        main_jobs.append(prepare_job(
            cal_work_dir, nb,
            notebook_path, args,
            cluster_cores=cluster_cores,
        ))
    else:
        cvals = args.get(concurrency_par, None)

        # Consider [-1] as None
        if (cvals is None or cvals == [-1]) and concurrency_defval is not None:
            print(f"Concurrency parameter '{concurrency_par}' "
                  f"is taken from notebooks.py")
            cvals = (concurrency_defval
                     if isinstance(concurrency_defval, (list, tuple))
                     else range(concurrency_defval))

        if cvals is None:
            defcval = get_par_attr(parms, concurrency_par, 'value')
            if defcval is not None:
                print(f"Concurrency parameter '{concurrency_par}' "
                      f"is taken from '{notebook_path}'")
                cvals = defcval if isinstance(defcval, (list, tuple)) else [defcval]

        if concurrency_func:
            func = get_notebook_function(nb, concurrency_func)
            if func is None:
                warnings.warn(
                    f"Didn't find concurrency function {concurrency_func} in notebook",
                    RuntimeWarning
                )
            else:
                df = {}
                exec(func, df)
                f = df[concurrency_func]
                sig = inspect.signature(f)
                if cvals:
                    # in case default needs to be used for function call
                    args[concurrency_par] = cvals
                callargs = [args[arg] for arg in sig.parameters]
                cvals = f(*callargs)
                print(f"Split concurrency into {cvals}")

        if cvals is None:
            raise ValueError(
                f"No values found for {concurrency_par} (concurrency parameter)"
            )

        # get expected type
        cvtype = get_par_attr(parms, concurrency_par, 'type', list)
        cvals = remove_duplications(cvals)

        for cnum, cval in enumerate(cvals):
            show_title = cnum == 0
            if cvtype is list and not isinstance(cval, list):
                cval = [cval]

            main_jobs.append(prepare_job(
                cal_work_dir, nb, notebook_path, args,
                concurrency_par, cval,
                cluster_cores=cluster_cores,
                show_title=show_title,
            ))

    # Prepare dependent notebooks (e.g. summaries after correction)
    dep_jobs = []
    for i, dep_notebook_path in enumerate(nb_details.dep_paths):
        dep_nb = nbformat.read(dep_notebook_path, as_version=4)
        dep_jobs.append(prepare_job(
            cal_work_dir, dep_nb, dep_notebook_path, args,
            cluster_cores=cluster_cores,
        ))

    job_chain = JobChain([
        Step(pre_jobs),
        Step(main_jobs),
        Step(dep_jobs, after_error=True)
    ], Path(cal_work_dir), python_exe)

    # Save information about jobs for reproducibility
    job_chain.save()

    if args['prepare_only']:
        print("Files prepared, not executing now (--prepare-only option).")
        print("To execute the notebooks, run:")
        rpt_opts = ''
        if nb_details.user_venv is not None:
            rpt_opts = f'--python {python_exe}'
        print(f"  python -m xfel_calibrate.repeat {cal_work_dir} {rpt_opts}")
        return

    print("Calibration work directory (including Slurm .out files):")
    print(" ", cal_work_dir)

    submission_time = datetime.now().strftime('%Y-%m-%dT%H:%M:%S')

    # Launch the calibration work
    if args["no_cluster_job"]:
        print("Running notebooks directly, not via Slurm...")
        errors = job_chain.run_direct()
        joblist = []
    else:
        print("Submitting jobs to Slurm...")

        joblist = job_chain.submit_jobs(SlurmOptions(
            job_name=args.get('slurm_name', 'xfel_calibrate'),
            nice=args['slurm_scheduling'],
            mem=args['slurm_mem'],
            reservation=args['reservation'],
            partition=args['slurm_partition'],
        ))
        errors = False

    fmt_args = {'cal_work_dir': cal_work_dir,
                'out_path': out_path,
                'version': version,
                'title': title,
                'author': author,
                'report_to': report_to,
                'in_folder': folder,
                'request_time': request_time.strftime("%Y-%m-%dT%H:%M:%S"),
                'submission_time': submission_time,
                }

    joblist.append(run_finalize(
        fmt_args=fmt_args,
        cal_work_dir=cal_work_dir,
        job_list=joblist,
        sequential=args["no_cluster_job"],
        partition=args["slurm_partition"] or "exfel",
    ))

    if any(j is not None for j in joblist):
        print("Submitted the following SLURM jobs: {}".format(",".join(joblist)))

    return int(errors)


if __name__ == "__main__":
    sys.exit(run())