Something went wrong on our end
-
Karim Ahmed authored
calibrate.py 29.15 KiB
#!/usr/bin/env python
import json
import locale
import math
import os
import re
import shlex
import shutil
import stat
import sys
import textwrap
import warnings
from datetime import datetime
from pathlib import Path
from subprocess import DEVNULL, call, check_call, check_output
from typing import List, Union
import nbformat
import numpy as np
import yaml
from jinja2 import Template
from nbparameterise import extract_parameters, parameter_values, replace_definitions
import cal_tools.tools
from .finalize import tex_escape
from .nb_args import (
consolize_name,
first_markdown_cell,
get_notebook_function,
parse_argv_and_load_nb,
set_figure_format,
)
from .settings import (
default_report_path,
finalize_time_limit,
launcher_command,
python_path,
sprof,
try_report_to_output,
)
# Absolute path of the directory containing this module. Used below to locate
# the helper shell scripts in its "bin" sub-directory and the git checkout.
PKG_DIR = os.path.dirname(os.path.abspath(__file__))
def extract_title_author(nb):
    """Extract title and author from the first markdown cell of a notebook.

    :param nb: Notebook object, handed to ``first_markdown_cell``.
    :return: Tuple ``(title, author)``. Both are None if the notebook has no
        markdown cell; either one is None if its pattern does not match.
    """
    cell = first_markdown_cell(nb)
    if cell is None:
        return None, None

    text = cell["source"]

    # Title: the text enclosed between two runs of `#` characters.
    title_match = re.search(r'#+\s*(.*)\s*#+', text)
    # Author: the text following "author:" up to a `,`, a `?` or "version".
    author_match = re.search(
        r'author[\s]*[:][\s]*(.*?)\s*(?:[,?]|version)', text,
        flags=re.IGNORECASE,
    )

    title = title_match.group(1) if title_match else None
    author = author_match.group(1) if author_match else None
    return title, author
def get_pycalib_version():
    """Return the pycalibration version string.

    For a development installation the version is taken from
    ``git describe`` run against the ``.git`` directory two levels above
    this package. If that fails for any reason (e.g. not a git checkout or
    git is not installed), the version stored in the package's ``VERSION``
    module is returned instead.

    :return: The version as a string.
    """
    git_dir = os.path.join(PKG_DIR, '..', '..', '.git')
    try:
        # stderr is discarded so a missing repository doesn't print errors
        version = check_output([
            'git', f'--git-dir={git_dir}', 'describe', '--tag',
        ], stderr=DEVNULL).decode('utf8')
    except Exception:
        # Deliberately broad best-effort fallback, but unlike a bare
        # `except:` it lets KeyboardInterrupt and SystemExit propagate.
        from .VERSION import __version__
        return __version__
    return version.replace("\n", "")
def get_python_version(python_exe):
    """Return the ``major.minor.micro`` version of a Python interpreter.

    :param python_exe: Path of the Python executable to query.
    :return: Version string. It is read from ``sys.version_info`` when
        `python_exe` is the running interpreter, otherwise it is parsed
        from the output of ``<python_exe> --version``.
    """
    if python_exe != sys.executable:
        # Keep the second whitespace-separated word of the reported version
        reported = check_output([python_exe, '--version']).decode('utf-8')
        return reported.split()[1]

    major, minor, micro = sys.version_info[:3]
    return f'{major}.{minor}.{micro}'
def balance_sequences(in_folder: str, run: int, sequences: List[int],
                      sequences_per_node: int, karabo_da: Union[list, str],
                      max_nodes: int = 8):
    """Return balanced list of sequences to be executed on slurm nodes

    Total list of sequences is split onto several nodes based on
    sequences_per_node. If the number of the required nodes is more than
    the max_nodes, the number of sequences_per_node is adjusted.

    :param in_folder: Path to the input raw data without the run number.
    :param run: Run number.
    :param sequences: List of sequences. [-1] for obtaining all.
    :param sequences_per_node: Number of sequences to process per a node.
    :param karabo_da: Karabo data aggregator used as data file inset.
    :param max_nodes: Maximum number of maxwell nodes to use.
    :return: Balanced list of sequences: a list of lists of sequence
        numbers, in ascending order. Empty if all sequences were requested
        but no matching file was found.
    :raises TypeError: If `karabo_da` is neither a string nor a list.
    :raises ValueError: If `sequences_per_node` is smaller than 1, or if
        none of the explicitly selected sequences exists in the run folder.
    """
    # TODO: some small detector notebooks have karabo_da as a list.
    # remove this str check after unifying the expected type across
    # correction notebooks.
    if isinstance(karabo_da, str):
        karabo_da = [karabo_da]
    elif not isinstance(karabo_da, list):
        raise TypeError("Balance sequences expects `karabo_da` as a string or list.")

    if sequences_per_node < 1:
        raise ValueError(
            f"sequences_per_node must be at least 1, got {sequences_per_node}"
        )

    in_path = Path(in_folder, f"r{run:04d}")

    # TODO: remove ["-1"] after karabo_da refactor
    if karabo_da in [["-1"], ["all"]]:
        karabo_da = [""]

    # Sequence numbers (last 5 characters of the file stem) of all files
    # belonging to the selected karabo_da.
    seq_nums = {
        int(sf.stem[-5:])
        for k_da in karabo_da
        for sf in in_path.glob(f"*{k_da}-S*.h5")
    }

    # Validate selected sequences with sequences in in_folder
    if sequences != [-1]:
        seq_nums = seq_nums.intersection(sequences)
        if len(seq_nums) == 0:
            raise ValueError(
                f"Selected sequences {sequences} are not "
                f"available in {in_path}"
            )

    # Always sort: set iteration order is arbitrary, and the chunks handed
    # to the nodes should be contiguous and reproducible.
    seq_nums = sorted(seq_nums)

    # Validate required nodes with max_nodes
    nsplits = len(seq_nums) // sequences_per_node
    if nsplits > max_nodes:
        sequences_per_node = math.ceil(len(seq_nums)/max_nodes)
        nsplits = max_nodes
        print(f"Changed to {sequences_per_node} sequences per node")
        print(f"to have a maximum of {max_nodes} concurrent jobs")
    elif nsplits == 0:
        nsplits = 1

    return [chunk.tolist() for chunk in np.array_split(seq_nums, nsplits)
            if chunk.size > 0]
def get_par_attr(parms, key, attr, default=None):
    """
    Return one attribute of the parameter with name key

    :param parms: List of parameters
    :param key: Name of the parameter to be considered
    :param attr: Name of the parameter attribute (e.g. value, type)
    :param default: Returned if no parameter is named `key`, or if the
                    first such parameter has no attribute `attr`
    :return: The attribute of the parameter
    """
    missing = object()
    # Only the first parameter with a matching name is considered
    found = next((p for p in parms if p.name == key), missing)
    return default if found is missing else getattr(found, attr, default)
def flatten_list(l):
    """
    Make a short string representation of a list

    :param l: List or any other object
    :return: ``str(l)`` if `l` is not a list; otherwise an empty string for
             an empty list, the single entry for a one-element list, or
             the first and last entries joined by ``-``
    """
    if not isinstance(l, list):
        return str(l)
    if not l:
        return ''
    if len(l) == 1:
        return f'{l[0]}'
    return f'{l[0]}-{l[-1]}'
def create_finalize_script(fmt_args, cal_work_dir, job_list) -> str:
    """
    Create a finalize script to produce output report

    The script is written to ``finalize.py`` inside `cal_work_dir` and made
    executable for the user.

    :param fmt_args: Dictionary of fmt arguments. The key ``joblist`` is
                     set to `job_list` in this dictionary.
    :param cal_work_dir: Path to temporary folder to run slurm job
    :param job_list: List of slurm jobs
    :return: The path of the created script
    """
    def as_py_str(value):
        # Emit a valid Python string literal. Pasting the raw text between
        # hard-coded quotes produced a broken script whenever a value (e.g.
        # the notebook title) contained a quote or a backslash.
        return repr(str(value))

    tmpl = Template("""\
        #!/usr/bin/env python3
        import os
        from xfel_calibrate.finalize import finalize
        finalize(joblist={{joblist}},
                 finaljob=os.environ.get('SLURM_JOB_ID', ''),
                 cal_work_dir={{ as_py_str(cal_work_dir) }},
                 out_path={{ as_py_str(out_path) }},
                 version={{ as_py_str(version) }},
                 title={{ as_py_str(title) }},
                 author={{ as_py_str(author) }},
                 report_to={{ as_py_str(report_to) }},
                 data_path={{ as_py_str(in_folder) }},
                 request_time={{ as_py_str(request_time) }},
                 submission_time={{ as_py_str(submission_time) }})
    """)

    fmt_args['joblist'] = job_list
    f_name = os.path.join(cal_work_dir, "finalize.py")
    # Explicit encoding: Python reads source files as UTF-8 by default, so
    # the script must not depend on the locale of the submitting process.
    with open(f_name, "w", encoding="utf-8") as finfile:
        finfile.write(textwrap.dedent(
            tmpl.render({**fmt_args, 'as_py_str': as_py_str})
        ))

    # change rights of the file to be:
    # executed and writable for user, readable for user, group and others
    all_stats = stat.S_IXUSR | stat.S_IWUSR | stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
    os.chmod(f_name, all_stats)

    return f_name
def run_finalize(
    fmt_args, cal_work_dir, job_list, sequential=False, partition=None):
    """Create the finalize script, then submit it to Slurm or run it directly

    :param fmt_args: Dictionary of arguments for the finalize script; its
        ``report_to`` entry is also passed to the helper shell script
    :param cal_work_dir: Work directory in which the script is created
    :param job_list: Slurm job IDs which must finish before finalizing
    :param sequential: If True, run the helper script in a local process
        instead of submitting it with sbatch
    :param partition: Slurm partition for the finalize job; "exfel" if None
    :return: ID of the submitted Slurm job, or None if `sequential`
    """
    script_path = create_finalize_script(fmt_args, cal_work_dir, job_list)

    if sequential:
        sbatch_prefix = []
    else:
        sbatch_prefix = [
            'sbatch',
            '--parsable',
            '--requeue',
            '--output', f'{cal_work_dir}/slurm-%j.out',
            '--open-mode=append',  # So we can see if it's preempted & requeued
            '--job-name', 'xfel-cal-finalize',
            '--time', finalize_time_limit,
            '--partition', "exfel" if partition is None else partition,
            "--dependency=afterany:" + ":".join(str(j) for j in job_list),
        ]
    print(" ".join(sbatch_prefix))

    helper_cmd = [
        os.path.join(PKG_DIR, "bin", "slurm_finalize.sh"),  # path to helper sh
        sys.executable,  # Python with calibration machinery installed
        cal_work_dir,
        script_path,
        fmt_args['report_to']
    ]

    output = check_output(sbatch_prefix + helper_cmd, input=b'').decode('utf8')

    if sequential:
        return None

    # Keep the part of the sbatch output before the first `;` as the job ID
    jobid = output.partition(';')[0].strip()
    print("Submitted finalize job: {}".format(jobid))
    return jobid
class SlurmOptions:
    """Options used to build the sbatch command line for calibration jobs

    :param job_name: Slurm job name; 'xfel_calibrate' if not given
    :param nice: Value for ``--nice``; the option is omitted if falsy
    :param mem: Value for ``--mem``, suffixed with ``G``; omitted if falsy
    :param partition: Slurm partition; the default from settings is used
        if neither this nor `reservation` is given
    :param reservation: Slurm reservation; takes precedence over partition
    """
    def __init__(
            self, job_name=None, nice=None, mem=None, partition=None, reservation=None,
    ):
        self.job_name = job_name or 'xfel_calibrate'
        self.nice = nice
        self.mem = mem
        self.partition = partition
        self.reservation = reservation

    def get_partition_or_reservation(self) -> List[str]:
        """Return sbatch arguments to use a partition or reservation

        --reservation and --slurm-partition options have precedence.
        Otherwise, a default partition is used.
        """
        if self.reservation:
            return ['--reservation', self.reservation]
        return ['--partition', self.partition or sprof]

    def get_launcher_command(self, log_dir, after_ok=(), after_any=()) -> List[str]:
        """
        Return a slurm launcher command

        :param log_dir: Where Slurm .out log files should go
        :param after_ok: A list of jobs which must succeed first
        :param after_any: A list of jobs which must finish first, but may fail
        :return: List of commands and parameters to be used by subprocess
        """
        # Split the command template first and fill in {temp_path} for each
        # token afterwards, so that a log_dir containing whitespace stays
        # within a single argument.
        launcher_slurm = [
            token.format(temp_path=log_dir)
            for token in launcher_command.split()
        ]

        launcher_slurm += self.get_partition_or_reservation()

        launcher_slurm += ["--job-name", self.job_name]

        if self.nice:
            launcher_slurm.append(f"--nice={self.nice}")

        if self.mem:
            launcher_slurm.append(f"--mem={self.mem}G")

        # Both kinds of dependency go into one comma-separated option
        deps = []
        if after_ok:
            deps.append("afterok:" + ":".join(str(j) for j in after_ok))
        if after_any:
            deps.append("afterany:" + ":".join(str(j) for j in after_any))
        if deps:
            launcher_slurm.append("--dependency=" + ",".join(deps))

        return launcher_slurm
def remove_duplications(l) -> list:
    """
    Remove duplicated elements in the list

    The order of first occurrences is kept. A message is printed only if
    at least one element was actually dropped.

    :param l: Input list (any iterable is accepted, e.g. a range or tuple)
    :return: Output list of unique elements
    """
    unique_l = []
    n_elements = 0
    for elem in l:
        n_elements += 1
        # Membership test on a list rather than a set: elements may be
        # unhashable (e.g. lists).
        if elem not in unique_l:
            unique_l.append(elem)

    # Compare counts rather than `unique_l != l`, which is always true for
    # a non-list input such as a range, even without duplicates.
    if len(unique_l) != n_elements:
        print("Duplicated concurrency parameters were removed")
    return unique_l
def prepare_job(
    cal_work_dir: Path, nb, nb_path: Path, args: dict, cparm=None, cval=None,
    cluster_cores=8, show_title=True,
) -> 'JobArgs':
    """Write a parameterised copy of a notebook and describe the job to run it

    :param cal_work_dir: Work directory in which the notebook is written
    :param nb: Notebook contents to parameterise
    :param nb_path: Path of the original notebook; its stem is used in the
        name of the new notebook
    :param args: Parameter values; if `cparm` is given, ``args[cparm]`` is
        set to `cval` in place
    :param cparm: Name of the concurrency parameter, if any
    :param cval: Value of the concurrency parameter for this job
    :param cluster_cores: Passed on as the last job argument
    :param show_title: If False, the first markdown cell is emptied
    :return: Arguments for running the new notebook
    """
    if cparm is not None:
        args[cparm] = cval

    suffix = flatten_list(cval)

    # first convert the notebook
    nb_params = extract_parameters(nb, lang='python')

    if "cluster_profile" in {p.name for p in nb_params}:
        cluster_profile = f"{args['cluster_profile']}_{suffix}"
    else:
        # Don't start ipcluster if there's no cluster_profile parameter
        cluster_profile = 'NO_CLUSTER'

    filled = parameter_values(nb_params, **args)
    filled = parameter_values(
        filled, cluster_profile=cluster_profile, metadata_folder="."
    )
    job_nb = replace_definitions(nb, filled, execute=False, lang='python')

    if not show_title:
        first_md = first_markdown_cell(job_nb)
        if first_md is not None:
            first_md.source = ''

    set_figure_format(job_nb, args["vector_figs"])

    # In some cases a suffix for example can have `/`. e.g. LPDMini and GH2
    # have karabo_da with a `/`.
    raw_name = f"{nb_path.stem}__{cparm}__{suffix}.ipynb"
    new_name = raw_name.replace("/", "-")

    nbformat.write(job_nb, cal_work_dir / new_name)

    job_label, _ = os.path.splitext(new_name)
    return JobArgs([
        "./pycalib-run-nb.sh",  # ./ allows this to run directly
        new_name,
        "{python}",
        cluster_profile,
        job_label.upper(),
        args["detector"].upper(),
        args["type"].upper(),
        str(cluster_cores),
    ])
class JobArgs:
    """Command line arguments for running one calibration job

    Arguments may contain the placeholder ``{python}``, which is filled in
    with the interpreter path when the command is formatted.
    """
    def __init__(self, args: List[str]):
        self.args = args

    def __repr__(self):
        return "JobArgs({})".format(self.args)

    def __eq__(self, other):
        if not isinstance(other, JobArgs):
            return False
        return self.args == other.args

    def format_cmd(self, python):
        """Return the argument list with `python` substituted in"""
        command = []
        for arg in self.args:
            command.append(arg.format(python=python))
        return command

    def run_direct(self, work_dir, python) -> int:
        """Run this job in a local process, return exit status"""
        command = self.format_cmd(python)
        return call(command, cwd=work_dir)

    def submit_job(
            self, work_dir, python, slurm_opts, after_ok=(), after_any=(), env=None
    ):
        """Submit this job to Slurm, return its job ID"""
        command = slurm_opts.get_launcher_command(work_dir, after_ok, after_any)
        command += self.format_cmd(python)
        # sbatch propagates environment variables into the job by default
        raw_output = check_output(command, cwd=work_dir, env=env)
        job_id, _, _ = raw_output.decode('utf-8').partition(';')
        return job_id.strip()
class Step:
    """A group of jobs which may run in parallel

    If after_error is True, this step should run even if previous steps failed.
    Otherwise, it will only run if the previous steps succeed.
    """
    def __init__(self, jobs: List[JobArgs], after_error=False):
        self.jobs = jobs
        self.after_error = after_error

    def to_dict(self):
        """Return a plain dict describing this step (see `from_dict`)"""
        job_arg_lists = [job.args for job in self.jobs]
        return dict(jobs=job_arg_lists, after_error=self.after_error)

    @classmethod
    def from_dict(cls, d):
        """Create a Step from a dict as produced by `to_dict`"""
        jobs = [JobArgs(arg_list) for arg_list in d['jobs']]
        return cls(jobs, d['after_error'])
class JobChain:
    """A collection of jobs to run for one call to xfel-calibrate

    This is a chain of steps, each of which may contain several jobs.
    It also holds the work directory, where the parameterised notebooks are
    saved, and the path to the Python interpreter to run the notebooks.
    Steps without any jobs are dropped on construction.
    """
    def __init__(self, steps: List[Step], work_dir: Path, python):
        self.steps = list(filter(lambda step: step.jobs, steps))
        self.work_dir = work_dir
        self.python = python

    @classmethod
    def from_dir(cls, work_dir: Path, python):
        """Load a JobChain from a work directory containing steps.json"""
        steps_file = work_dir / 'steps.json'
        content = json.loads(steps_file.read_text('utf-8'))
        return cls(
            [Step.from_dict(step_dict) for step_dict in content['steps']],
            work_dir,
            python,
        )

    def save(self):
        """Save the steps of this chain to steps.json in the work directory"""
        content = {'steps': [step.to_dict() for step in self.steps]}
        with (self.work_dir / 'steps.json').open('w', encoding='utf-8') as f:
            json.dump(content, f, indent=2)

    def submit_jobs(self, slurm_opts: SlurmOptions, env=None):
        """Submit these jobs to Slurm, return a list of job IDs

        Slurm dependencies are used to manage the sequence of jobs.
        """
        print("Submitting jobs with Slurm options:")
        print(" ".join(slurm_opts.get_launcher_command(self.work_dir)))

        submitted = []
        previous_step_ids = ()  # Replaced after each step
        for step in self.steps:
            # Each job of a step depends on all jobs of the step before it
            dep_kind = 'after_any' if step.after_error else 'after_ok'
            step_ids = [
                job.submit_job(
                    self.work_dir, self.python, slurm_opts, env=env,
                    **{dep_kind: previous_step_ids}
                )
                for job in step.jobs
            ]
            previous_step_ids = step_ids
            submitted.extend(step_ids)
        return submitted

    def run_direct(self) -> bool:
        """Run these jobs in local processes, return True if any failed"""
        any_failed = False
        for step_no, step in enumerate(self.steps):
            if any_failed and not step.after_error:
                print(f"Not running step {step_no}, previous step failed")
                continue

            # Every job of the step is run, even after one of them failed
            for job in step.jobs:
                if job.run_direct(self.work_dir, self.python) != 0:
                    any_failed = True
        return any_failed
def make_par_table(parms):
    """
    Create a RST table with input parameters of the notebook

    :param parms: parameters of the notebook; the `name`, `value`, `type`
                  and `comment` attributes of each one are used
    :return: RST text containing a raw LaTeX ``longtable`` with one row
             (name, value, comment) per parameter
    """

    # Add space in long strings without line breakers ` ,-/` to
    # wrap them in latex
    def split_len(seq, length):
        r"""
        Splits a sequence into smaller segments of a specified length,
        concatenates them, and adds line-breaking characters
        to ensure proper line breaks in LaTeX.

        A ``\-`` is added after every ``/``, and after every segment which
        contains none of the characters `` ,-/``.

        Args:
            seq (str): The sequence to be split.
            length (int): The desired length of each segment.

        Returns:
            str: The concatenated line with line-breaking characters.

        Examples:
            >>> split_len("slurm_prof_230711_095647.832671_0", 10)
            'slurm_prof\\-_230711_09\\-5647.83267\\-1_0\\-'
        """
        lbc = set(' ,-/')
        pieces = []
        for i in range(0, len(seq), length):
            sub_line = seq[i:i + length]
            pieces.append(sub_line.replace('/', '/\\-'))
            if lbc.isdisjoint(sub_line):
                pieces.append('\\-')
        return ''.join(pieces)

    # Prepare strings and note whether any of them had to be wrapped
    l_parms = []
    max_len = [20, 20]
    wrapped = [False, False]  # [any long name, any long value]
    for p in parms:
        name = p.name.replace('_', '-')
        if len(name) > max_len[0]:
            wrapped[0] = True
            name = split_len(name, max_len[0])

        value = str(p.value)
        if len(value) > max_len[1]:
            wrapped[1] = True
            value = split_len(value, max_len[1])
        value = tex_escape(value)
        if issubclass(p.type, str):
            value = "``{}''".format(value)

        # The first character of the comment is dropped. A missing comment
        # gives an empty cell instead of the text 'one' (`str(None)[1:]`).
        raw_comment = '' if p.comment is None else str(p.comment)[1:]
        comment = tex_escape(raw_comment)

        l_parms.append([name, value, comment])

    # Use a fixed-width column wherever a string was wrapped
    col_type = ['l', 'c', 'p{.3\\textwidth}']
    for col, was_wrapped in enumerate(wrapped):
        if was_wrapped:
            col_type[col] = col_type[2]

    # Raw string, so the LaTeX backslashes need no escaping. The blank lines
    # and the indentation below `.. raw:: latex` are required by RST.
    tmpl = Template(r'''
        Input of the calibration pipeline
        =================================

        .. raw:: latex

            \begin{longtable}{ {% for k in p %}{{k}}{%- endfor %} }
                \hline
                {% for k in lines %}
                {{ k[0] }} & {{ k[1] }} & {{ k[2] }} \\
                {%- endfor %}
                \hline
            \end{longtable}
        ''')
    return textwrap.dedent(tmpl.render(p=col_type, lines=l_parms))
def run(argv=None):
    """ Run a calibration task with parser arguments

    Parses `argv`, creates a work directory holding the parameterised
    notebooks, the input parameter table and the job metadata, then either
    runs the notebooks in local processes or submits them to Slurm, and
    finally runs or submits the finalize step.

    :param argv: Command line arguments; ``sys.argv`` is used if None.
    :return: 1 if the notebooks were run locally and a step failed,
        otherwise 0. None is returned if only preparing the files
        (``prepare_only``).
    """
    # Ensure files are opened as UTF-8 by default, regardless of environment.
    if "readthedocs.org" not in sys.executable:
        locale.setlocale(locale.LC_CTYPE, ('en_US', 'UTF-8'))

    if argv is None:
        argv = sys.argv

    args, nb_details = parse_argv_and_load_nb(argv)

    concurrency = nb_details.concurrency
    concurrency_par = args["concurrency_par"] or concurrency['parameter']
    if concurrency_par == concurrency['parameter']:
        # Use the defaults from notebook.py to split the work into several jobs
        concurrency_defval = concurrency.get('default concurrency', None)
        concurrency_func = concurrency.get('use function', None)
    else:
        # --concurrency-par specified something different from notebook.py:
        # don't use the associated settings from there.
        concurrency_defval = concurrency_func = None

    notebook_path = nb_details.path
    nb = nb_details.contents

    title, author = extract_title_author(nb)
    version = get_pycalib_version()

    # Fall back to generic values for anything that could not be determined
    if not title:
        title = f"{nb_details.detector} {nb_details.caltype} Calibration"
    if not author:
        author = "anonymous"
    if not version:
        version = ""

    title = title.rstrip()

    # request_time is in local timezone
    if args["request_time"] == "Now":
        request_time = datetime.now()
    else:
        request_time = datetime.fromisoformat(args["request_time"])

    # check if concurrency parameter is given and we run concurrently
    # (this only warns; execution continues)
    if concurrency_par is not None and not any(
        p.name == concurrency_par for p in nb_details.default_params
    ):
        msg = f"Notebook cannot be run concurrently: no {concurrency_par} parameter"
        warnings.warn(msg, RuntimeWarning)

    # If not explicitly specified, use a new profile for ipcluster
    default_params_by_name = {p.name: p.value for p in nb_details.default_params}
    if 'cluster_profile' in default_params_by_name:
        if args.get("cluster_profile") == default_params_by_name["cluster_profile"]:
            args['cluster_profile'] = f"slurm_prof_{request_time:%y%m%d_%H%M%S.%f}"

    # wait on all jobs to run and then finalize the run by creating a report from the notebooks
    out_path = Path(default_report_path) / nb_details.detector / nb_details.caltype / datetime.now().isoformat()
    if try_report_to_output:
        if "out_folder" in args:
            out_path = Path(args["out_folder"]).absolute()
        else:
            print(f"No 'out_folder' given, outputting to '{out_path}' instead.")

    out_path.mkdir(parents=True, exist_ok=True)

    # Use given report name, or automatic unique name if not specified
    det_name = args.get('karabo_id', nb_details.detector)
    unique_name = f"{det_name}-{nb_details.caltype}-{request_time:%y%m%d_%H%M%S.%f}"
    if args['skip_report']:
        report_to = ''
    elif args["report_to"] is None:
        report_to = out_path / f"{unique_name}.pdf"
        print(f"report_to not specified, will use {report_to}")
    else:
        report_to = Path(args["report_to"]).with_suffix('.pdf').absolute()

    if report_to:
        # Work dir matching report file but without .pdf
        cal_work_dir = report_to.with_suffix('')
    else:
        cal_work_dir = out_path / unique_name
    # No exist_ok here: this raises if the work directory already exists
    cal_work_dir.mkdir(parents=True)

    # Write all input parameters to rst file to be included to final report
    parms = parameter_values(nb_details.default_params, **args)
    (cal_work_dir / "InputParameters.rst").write_text(make_par_table(parms))
    # And save the invocation of this script itself
    (cal_work_dir / "run_calibrate.sh").write_text(
        f'# pycalibration version: {version}\n' + shlex.join(argv)
    )

    # Copy the bash script which will be used to run notebooks
    shutil.copy2(
        os.path.join(PKG_DIR, "bin", "slurm_calibrate.sh"),
        cal_work_dir / "pycalib-run-nb.sh"
    )

    # Interpreter used to run the notebooks
    if nb_details.user_venv:
        print("Using specified venv:", nb_details.user_venv)
        python_exe = str(nb_details.user_venv / 'bin' / 'python')
    else:
        python_exe = python_path

    # Write metadata about calibration job to output folder
    metadata = cal_tools.tools.CalibrationMetadata(cal_work_dir, new=True)

    parm_subdict = metadata.setdefault("calibration-configurations", {})
    for p in parms:
        name = consolize_name(p.name)
        parm_subdict[name] = p.value

    metadata["pycalibration-version"] = version
    metadata["report-path"] = str(report_to) if report_to \
        else '# REPORT SKIPPED #'
    metadata['reproducible'] = not args['not_reproducible']
    metadata["concurrency"] = {
        'parameter': concurrency_par,
        'default': concurrency_defval,
        'function': concurrency_func,
    }
    metadata["notebook"] = {
        'title': title,
        'author': author,
    }
    metadata['python-environment'] = {
        'path': python_exe,
        'python-version': get_python_version(python_exe),
    }
    if args["constants_from"]:
        # Take over the "retrieved-constants" section of the given YAML file
        with open(args["constants_from"], "r", encoding='utf-8') as f:
            d = yaml.safe_load(f)
        metadata["retrieved-constants"] = d["retrieved-constants"]

    metadata.save()

    # Record installed Python packages for reproducing the environment
    if not args['skip_env_freeze']:
        with (cal_work_dir / 'requirements.txt').open('wb') as f:
            check_call([python_exe, '-m', 'pip', 'freeze'], stdout=f)

    folder = get_par_attr(parms, 'in_folder', 'value', '')

    pre_jobs = []
    cluster_cores = concurrency.get("cluster cores", 8)
    # Check if there are pre-notebooks
    for pre_notebook_path in nb_details.pre_paths:
        lead_nb = nbformat.read(pre_notebook_path, as_version=4)
        pre_jobs.append(prepare_job(
            cal_work_dir, lead_nb, pre_notebook_path, args,
            cluster_cores=cluster_cores
        ))

    main_jobs = []
    if concurrency_par is None:
        # No concurrency parameter: a single job for the main notebook
        main_jobs.append(prepare_job(
            cal_work_dir, nb,
            notebook_path, args,
            cluster_cores=cluster_cores,
        ))
    else:
        cvals = args.get(concurrency_par, None)

        # Consider [-1] as None
        if (cvals is None or cvals == [-1]) and concurrency_defval is not None:
            print(f"Concurrency parameter '{concurrency_par}' "
                  f"is taken from notebooks.py")
            cvals = concurrency_defval if isinstance(concurrency_defval, (list, tuple)) else range(concurrency_defval)

        if cvals is None:
            defcval = get_par_attr(parms, concurrency_par, 'value')
            if defcval is not None:
                print(f"Concurrency parameter '{concurrency_par}' "
                      f"is taken from '{notebook_path}'")
                cvals = defcval if isinstance(defcval, (list, tuple)) else [defcval]

        if concurrency_func:
            func = get_notebook_function(nb, concurrency_func)
            if func is None:
                warnings.warn(
                    f"Didn't find concurrency function {concurrency_func} in notebook",
                    RuntimeWarning
                )
            else:
                # NOTE(review): this executes source code obtained from the
                # notebook; only use with trusted notebooks.
                df = {}
                exec(func, df)
                f = df[concurrency_func]
                import inspect
                sig = inspect.signature(f)

                if cvals:
                    # in case default needs to be used for function call
                    args[concurrency_par] = cvals
                # The function is called with the values from `args` named
                # like its parameters, in signature order
                callargs = [args[arg] for arg in sig.parameters]
                cvals = f(*callargs)
                print(f"Split concurrency into {cvals}")

        if cvals is None:
            raise ValueError(
                f"No values found for {concurrency_par} (concurrency parameter)"
            )

        # get expected type
        cvtype = get_par_attr(parms, concurrency_par, 'type', list)
        cvals = remove_duplications(cvals)

        # One job per concurrency value; only the first one keeps its title
        for cnum, cval in enumerate(cvals):
            show_title = cnum == 0
            cval = [cval, ] if not isinstance(cval, list) and cvtype is list else cval

            main_jobs.append(prepare_job(
                cal_work_dir, nb, notebook_path, args,
                concurrency_par, cval,
                cluster_cores=cluster_cores,
                show_title=show_title,
            ))

    # Prepare dependent notebooks (e.g. summaries after correction)
    dep_jobs = []
    for i, dep_notebook_path in enumerate(nb_details.dep_paths):
        dep_nb = nbformat.read(dep_notebook_path, as_version=4)
        dep_jobs.append(prepare_job(
            cal_work_dir, dep_nb, dep_notebook_path, args,
            cluster_cores=cluster_cores,
        ))

    # The dependent notebooks run even if an earlier step failed
    job_chain = JobChain([
        Step(pre_jobs),
        Step(main_jobs),
        Step(dep_jobs, after_error=True)
    ], Path(cal_work_dir), python_exe)

    # Save information about jobs for reproducibility
    job_chain.save()

    if args['prepare_only']:
        print("Files prepared, not executing now (--prepare-only option).")
        print("To execute the notebooks, run:")
        rpt_opts = ''
        if nb_details.user_venv is not None:
            rpt_opts = f'--python {python_exe}'
        print(f"  python -m xfel_calibrate.repeat {cal_work_dir} {rpt_opts}")
        return

    print("Calibration work directory (including Slurm .out files):")
    print("  ", cal_work_dir)

    submission_time = datetime.now().strftime('%Y-%m-%dT%H:%M:%S')

    # Launch the calibration work
    if args["no_cluster_job"]:
        print("Running notebooks directly, not via Slurm...")
        errors = job_chain.run_direct()
        joblist = []
    else:
        print("Submitting jobs to Slurm...")

        joblist = job_chain.submit_jobs(SlurmOptions(
            job_name=args.get('slurm_name', 'xfel_calibrate'),
            nice=args['slurm_scheduling'],
            mem=args['slurm_mem'],
            reservation=args['reservation'],
            partition=args['slurm_partition'],
        ))
        errors = False

    fmt_args = {'cal_work_dir': cal_work_dir,
                'out_path': out_path,
                'version': version,
                'title': title,
                'author': author,
                'report_to': report_to,
                'in_folder': folder,
                'request_time': request_time.strftime("%Y-%m-%dT%H:%M:%S"),
                'submission_time': submission_time,
                }

    # The finalize step gets the jobs from above as its job list; its own
    # job ID (None when run without Slurm) is appended afterwards.
    joblist.append(run_finalize(
        fmt_args=fmt_args,
        cal_work_dir=cal_work_dir,
        job_list=joblist,
        sequential=args["no_cluster_job"],
        partition=args["slurm_partition"] or "exfel",
    ))

    # Without Slurm, joblist only holds None and nothing is printed
    if any(j is not None for j in joblist):
        print("Submitted the following SLURM jobs: {}".format(",".join(joblist)))

    return int(errors)
# When executed as a script, the return value of run() is passed to
# sys.exit() and so determines the exit status of the process.
if __name__ == "__main__":
    sys.exit(run())