#!/usr/bin/env python

import json
import locale
import math
import os
import re
import shlex
import shutil
import stat
import sys
import textwrap
import warnings
from datetime import datetime
from pathlib import Path
from subprocess import DEVNULL, call, check_call, check_output
from typing import List, Union

import nbformat
import numpy as np
import yaml
from jinja2 import Template
from nbparameterise import extract_parameters, parameter_values, replace_definitions

import cal_tools.tools

from .finalize import tex_escape
from .nb_args import (
    consolize_name,
    first_markdown_cell,
    get_notebook_function,
    parse_argv_and_load_nb,
    set_figure_format,
)
from .settings import (
    default_report_path,
    finalize_time_limit,
    launcher_command,
    python_path,
    sprof,
    try_report_to_output,
)

PKG_DIR = os.path.dirname(os.path.abspath(__file__))


def extract_title_author(nb):
    """Try to extract the title and author from the first markdown cell.

    The version is taken from git (see get_pycalib_version).
    """
    first_md = first_markdown_cell(nb)
    if first_md is None:
        return None, None
    source = first_md["source"]
    title = re.findall(r'#+\s*(.*)\s*#+', source)
    author = re.findall(
        r'author[\s]*[:][\s]*(.*?)\s*(?:[,?]|version)', source, flags=re.IGNORECASE)
    title = title[0] if len(title) else None
    author = author[0] if len(author) else None
    return title, author


def get_pycalib_version():
    # Try to get the version from git.
    # This only works for a development installation; errors are suppressed.
    # For a standard installation, the version is stored in the _version.py file.
    try:
        git_dir = os.path.join(PKG_DIR, '..', '..', '.git')
        version = check_output([
            'git', f'--git-dir={git_dir}', 'describe', '--tag',
        ], stderr=DEVNULL).decode('utf8')
        version = version.replace("\n", "")
    except Exception:
        from .VERSION import __version__
        version = __version__
    return version


def get_python_version(python_exe):
    if python_exe == sys.executable:
        return '{}.{}.{}'.format(*sys.version_info)
    return check_output([python_exe, '--version']).decode('utf-8').split()[1]


def balance_sequences(in_folder: str, run: int, sequences: List[int],
                      sequences_per_node: int, karabo_da: Union[list, str],
                      max_nodes: int = 8):
    """Return a balanced list of sequences to be executed on Slurm nodes.

    The total list of sequences is split across several nodes based on
    sequences_per_node. If more nodes than max_nodes would be required,
    sequences_per_node is adjusted instead.

    :param in_folder: Path to the input raw data, without the run number.
    :param run: Run number.
    :param sequences: List of sequences. [-1] to obtain all.
    :param sequences_per_node: Number of sequences to process per node.
    :param karabo_da: Karabo data aggregator used as data file inset.
    :param max_nodes: Maximum number of Maxwell nodes to use.
    :return: Balanced list of sequences.
    """
    # TODO: some small detector notebooks have karabo_da as a list.
    # remove this str check after unifying the expected type across
    # correction notebooks.
    if isinstance(karabo_da, str):
        karabo_da = [karabo_da]
    elif not isinstance(karabo_da, list):
        raise TypeError("Balance sequences expects `karabo_da` as a string or list.")

    in_path = Path(in_folder, f"r{run:04d}")

    # TODO: remove ["-1"] after karabo_da refactor
    if karabo_da in [["-1"], ["all"]]:
        karabo_da = [""]

    # Get all possible sequences for the selected karabo_da
    sequence_files = []
    for k_da in karabo_da:
        sequence_files.extend(in_path.glob(f"*{k_da}-S*.h5"))

    # Extract sequences from input files.
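    # The sequence number is taken from the last five characters of the file
    # stem, assumed to be zero-padded digits, e.g. "...-S00012" -> 12.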
    seq_nums = {int(sf.stem[-5:]) for sf in sequence_files}

    # Validate selected sequences against the sequences found in in_folder
    if sequences != [-1]:
        seq_nums = sorted(seq_nums.intersection(sequences))
        if len(seq_nums) == 0:
            raise ValueError(
                f"Selected sequences {sequences} are not "
                f"available in {in_path}"
            )

    # Validate the required number of nodes against max_nodes
    nsplits = len(seq_nums) // sequences_per_node
    if nsplits > max_nodes:
        sequences_per_node = math.ceil(len(seq_nums)/max_nodes)
        nsplits = max_nodes
        print(f"Changed to {sequences_per_node} sequences per node")
        print(f"to have a maximum of {max_nodes} concurrent jobs")
    elif nsplits == 0:
        nsplits = 1

    return [l.tolist() for l in np.array_split(list(seq_nums), nsplits)
            if l.size > 0]


def get_par_attr(parms, key, attr, default=None):
    """Return an attribute of the parameter named `key`.

    :param parms: List of parameters
    :param key: Name of the parameter to look up
    :param attr: Name of the parameter attribute (e.g. value, type)
    :param default: Value to return if the parameter or attribute is not found
    :return: The attribute of the parameter
    """
    for p in parms:
        if p.name == key:
            return getattr(p, attr, default)
    return default


def flatten_list(l):
    """Make a string representation of a list.

    :param l: List or a string
    :return: The string itself, or a string built from the first and last
        entries of the list
    """
    if not isinstance(l, list):
        return str(l)
    if len(l) > 1:
        return '{}-{}'.format(l[0], l[-1])
    elif len(l) == 1:
        return '{}'.format(l[0])
    else:
        return ''


def create_finalize_script(fmt_args, cal_work_dir, job_list) -> str:
    """Create a finalize script to produce the output report.

    :param fmt_args: Dictionary of fmt arguments
    :param cal_work_dir: Path to the temporary folder used to run Slurm jobs
    :param job_list: List of Slurm jobs
    :return: The path of the created script
    """
    tmpl = Template("""\
#!/usr/bin/env python3
import os
from xfel_calibrate.finalize import finalize

finalize(joblist={{joblist}},
         finaljob=os.environ.get('SLURM_JOB_ID', ''),
         cal_work_dir='{{cal_work_dir}}',
         out_path='{{out_path}}',
         version='{{version}}',
         title='{{title}}',
         author='{{author}}',
         report_to='{{report_to}}',
         data_path='{{in_folder}}',
         request_time='{{request_time}}',
         submission_time='{{submission_time}}')
""")
    fmt_args['joblist'] = job_list
    f_name = os.path.join(cal_work_dir, "finalize.py")
    with open(f_name, "w") as finfile:
        finfile.write(textwrap.dedent(tmpl.render(**fmt_args)))

    # Change the file permissions to be:
    # executable and writable for the user, readable for user, group and others
    all_stats = stat.S_IXUSR | stat.S_IWUSR | stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
    os.chmod(f_name, all_stats)
    return f_name


def run_finalize(
        fmt_args, cal_work_dir, job_list, sequential=False, partition=None):
    if partition is None:
        partition = "exfel"
    finalize_script = create_finalize_script(fmt_args, cal_work_dir, job_list)

    cmd = []
    if not sequential:
        cmd = [
            'sbatch',
            '--parsable',
            '--requeue',
            '--output', f'{cal_work_dir}/slurm-%j.out',
            '--open-mode=append',  # So we can see if it's preempted & requeued
            '--job-name', 'xfel-cal-finalize',
            '--time', finalize_time_limit,
            '--partition', partition,
            "--dependency=afterany:" + ":".join(str(j) for j in job_list),
        ]
        print(" ".join(cmd))

    cmd += [
        os.path.join(PKG_DIR, "bin", "slurm_finalize.sh"),  # path to helper sh
        sys.executable,  # Python with calibration machinery installed
        cal_work_dir,
        finalize_script,
        fmt_args['report_to']
    ]

    output = check_output(cmd, input=b'').decode('utf8')

    jobid = None
    if not sequential:
        jobid = output.partition(';')[0].strip()
        print("Submitted finalize job: {}".format(jobid))
    return jobid


class SlurmOptions:
    def __init__(
            self, job_name=None, nice=None, mem=None, partition=None,
            reservation=None,
    ):
        self.job_name = job_name or 'xfel_calibrate'
        self.nice = nice
        self.mem = mem
        self.partition = partition
        self.reservation = reservation

    def get_partition_or_reservation(self) -> List[str]:
        """Return sbatch arguments to use a partition or reservation.

        The --reservation and --slurm-partition options have precedence.
        Otherwise, a default partition is used.
        """
        if self.reservation:
            return ['--reservation', self.reservation]
        return ['--partition', self.partition or sprof]

    def get_launcher_command(self, log_dir, after_ok=(), after_any=()) -> List[str]:
        """Return a Slurm launcher command.

        :param log_dir: Where Slurm .out log files should go
        :param after_ok: A list of jobs which must succeed first
        :param after_any: A list of jobs which must finish first, but may fail
        :return: List of commands and parameters to be used by subprocess
        """
        launcher_slurm = launcher_command.format(temp_path=log_dir).split()

        launcher_slurm += self.get_partition_or_reservation()

        launcher_slurm += ["--job-name", self.job_name]

        if self.nice:
            launcher_slurm.append(f"--nice={self.nice}")

        if self.mem:
            launcher_slurm.append(f"--mem={self.mem}G")

        deps = []
        if after_ok:
            deps.append("afterok:" + ":".join(str(j) for j in after_ok))
        if after_any:
            deps.append("afterany:" + ":".join(str(j) for j in after_any))
        if deps:
            launcher_slurm.append("--dependency=" + ",".join(deps))

        return launcher_slurm


def remove_duplications(l) -> list:
    """Remove duplicated elements from a list, preserving order.

    :param l: Input list
    :return: Output list of unique elements
    """
    unique_l = []
    for elem in l:
        if elem not in unique_l:
            unique_l.append(elem)
    if unique_l != l:
        print("Duplicated concurrency parameters were removed")
    return unique_l


def prepare_job(
        cal_work_dir: Path, nb, nb_path: Path, args: dict, cparm=None, cval=None,
        cluster_cores=8, show_title=True,
) -> 'JobArgs':
    if cparm is not None:
        args[cparm] = cval
    suffix = flatten_list(cval)

    # first convert the notebook
    parms = extract_parameters(nb, lang='python')

    if any(p.name == "cluster_profile" for p in parms):
        cluster_profile = f"{args['cluster_profile']}_{suffix}"
    else:
        # Don't start ipcluster if there's no cluster_profile parameter
        cluster_profile = 'NO_CLUSTER'

    params = parameter_values(parms, **args)
    params = parameter_values(
        params, cluster_profile=cluster_profile, metadata_folder="."
    )
    new_nb = replace_definitions(nb, params, execute=False, lang='python')
    if not show_title:
        title_cell = first_markdown_cell(new_nb)
        if title_cell is not None:
            title_cell.source = ''
    set_figure_format(new_nb, args["vector_figs"])

    # In some cases the suffix can contain `/`, e.g. LPDMini and GH2
    # have karabo_da values with a `/`.
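    # The parameterised copy is named "<stem>__<cparm>__<suffix>.ipynb", with
    # any `/` replaced by `-`; e.g. (hypothetical values) a notebook stem
    # "correct" with cparm "sequences" and suffix "0-5" becomes
    # "correct__sequences__0-5.ipynb".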
    new_name = f"{nb_path.stem}__{cparm}__{suffix}.ipynb".replace("/", "-")
    nbformat.write(new_nb, cal_work_dir / new_name)

    return JobArgs([
        "./pycalib-run-nb.sh",  # ./ allows this to run directly
        new_name,
        "{python}",
        cluster_profile,
        os.path.splitext(new_name)[0].upper(),
        args["detector"].upper(),
        args["type"].upper(),
        str(cluster_cores),
    ])


class JobArgs:
    """Command line arguments for running one calibration job"""
    def __init__(self, args: List[str]):
        self.args = args

    def __repr__(self):
        return f"JobArgs({self.args})"

    def __eq__(self, other):
        return isinstance(other, JobArgs) and (self.args == other.args)

    def format_cmd(self, python):
        return [a.format(python=python) for a in self.args]

    def run_direct(self, work_dir, python) -> int:
        """Run this job in a local process, return exit status"""
        return call(self.format_cmd(python), cwd=work_dir)

    def submit_job(
            self, work_dir, python, slurm_opts, after_ok=(), after_any=(), env=None
    ):
        """Submit this job to Slurm, return its job ID"""
        cmd = slurm_opts.get_launcher_command(work_dir, after_ok, after_any)
        cmd += self.format_cmd(python)
        # sbatch propagates environment variables into the job by default
        output = check_output(cmd, cwd=work_dir, env=env).decode('utf-8')
        return output.partition(';')[0].strip()  # job ID


class Step:
    """A group of jobs which may run in parallel

    If after_error is True, this step should run even if previous steps failed.
    Otherwise, it will only run if the previous steps succeed.
    """
    def __init__(self, jobs: List[JobArgs], after_error=False):
        self.jobs = jobs
        self.after_error = after_error

    def to_dict(self):
        return {
            'jobs': [j.args for j in self.jobs],
            'after_error': self.after_error,
        }

    @classmethod
    def from_dict(cls, d):
        return cls([JobArgs(args) for args in d['jobs']], d['after_error'])


class JobChain:
    """A collection of jobs to run for one call to xfel-calibrate

    This is a chain of steps, each of which may contain several jobs.
    It also holds the work directory, where the parameterised notebooks
    are saved, and the path to the Python interpreter to run the notebooks.
    """
    def __init__(self, steps: List[Step], work_dir: Path, python):
        self.steps = [s for s in steps if s.jobs]
        self.work_dir = work_dir
        self.python = python

    @classmethod
    def from_dir(cls, work_dir: Path, python):
        """Load a JobChain from a work directory containing steps.json"""
        d = json.loads((work_dir / 'steps.json').read_text('utf-8'))
        steps = [Step.from_dict(sd) for sd in d['steps']]
        return cls(steps, work_dir, python)

    def save(self):
        """Save the steps of this chain to steps.json in the work directory"""
        with (self.work_dir / 'steps.json').open('w', encoding='utf-8') as f:
            json.dump({
                'steps': [step.to_dict() for step in self.steps]
            }, f, indent=2)

    def submit_jobs(self, slurm_opts: SlurmOptions, env=None):
        """Submit these jobs to Slurm, return a list of job IDs

        Slurm dependencies are used to manage the sequence of jobs.
""" print("Submitting jobs with Slurm options:") print(" ".join(slurm_opts.get_launcher_command(self.work_dir))) all_job_ids = [] dep_job_ids = () # Replaced after each step for step in self.steps: step_job_ids = [] kw = {('after_any' if step.after_error else 'after_ok'): dep_job_ids} for job_desc in step.jobs: jid = job_desc.submit_job( self.work_dir, self.python, slurm_opts, env=env, **kw ) step_job_ids.append(jid) dep_job_ids = step_job_ids all_job_ids.extend(step_job_ids) return all_job_ids def run_direct(self) -> bool: """Run these jobs in local processes, return True if any failed""" errors = False for i, step in enumerate(self.steps): if errors and not step.after_error: print(f"Not running step {i}, previous step failed") continue exit_codes = [ j.run_direct(self.work_dir, self.python) for j in step.jobs ] if any(ec != 0 for ec in exit_codes): errors = True return errors def make_par_table(parms): """ Create a RST table with input parameters of the notebook :param parms: parameters of the notebook """ # Add space in long strings without line breakers ` ,-/` to # wrap them in latex def split_len(seq, length): """ Splits a sequence into smaller segments of a specified length, concatenates them, and adds line-breaking characters to ensure proper line breaks in LaTeX. Args: seq (str): The sequence to be split. length (int): The desired length of each segment. Returns: str: The concatenated line with line-breaking characters. Examples: >>> split_len("slurm_prof_230711_095647.832671_0", 10) 'slurm\_prof\_230711\\-\_095647.832671\_0\-' """ lbc = set(' ,-/') line = '' for i in range(0, len(seq), length): sub_line = seq[i:i + length] line += sub_line.replace('/', '/\-') if not any(c in lbc for c in sub_line): line += '\-' return line # Prepare strings and estimate their length l_parms = [] len_parms = [0, 0] max_len = [20, 20] for p in parms: name = p.name.replace('_', '-') if len(name) > max_len[0]: len_parms[0] = max_len[0] name = split_len(name, max_len[0]) value = str(p.value) if len(value) > max_len[1]: len_parms[1] = max_len[1] value = split_len(value, max_len[1]) value = tex_escape(value) if issubclass(p.type, str): value = "``{}''".format(value) comment = tex_escape(str(p.comment)[1:]) l_parms.append([name, value, comment]) # Fix column width is needed col_type = ['l', 'c', 'p{.3\\textwidth}'] if len_parms[0] == max_len[0]: col_type[0] = col_type[2] if len_parms[1] == max_len[1]: col_type[1] = col_type[2] tmpl = Template(''' Input of the calibration pipeline ================================= .. raw:: latex \\begin{longtable}{ {% for k in p %}{{k}}{%- endfor %} } \hline {% for k in lines %} {{ k[0] }} & {{ k[1] }} & {{ k[2] }} \\\\ {%- endfor %} \hline \end{longtable} ''') return textwrap.dedent(tmpl.render(p=col_type, lines=l_parms)) def run(argv=None): """ Run a calibration task with parser arguments """ # Ensure files are opened as UTF-8 by default, regardless of environment. 
if "readthedocs.org" not in sys.executable: locale.setlocale(locale.LC_CTYPE, ('en_US', 'UTF-8')) if argv is None: argv = sys.argv args, nb_details = parse_argv_and_load_nb(argv) concurrency = nb_details.concurrency concurrency_par = args["concurrency_par"] or concurrency['parameter'] if concurrency_par == concurrency['parameter']: # Use the defaults from notebook.py to split the work into several jobs concurrency_defval = concurrency.get('default concurrency', None) concurrency_func = concurrency.get('use function', None) else: # --concurrency-par specified something different from notebook.py: # don't use the associated settings from there. concurrency_defval = concurrency_func = None notebook_path = nb_details.path nb = nb_details.contents title, author = extract_title_author(nb) version = get_pycalib_version() if not title: title = f"{nb_details.detector} {nb_details.caltype} Calibration" if not author: author = "anonymous" if not version: version = "" title = title.rstrip() # request_time is in local timezone if args["request_time"] == "Now": request_time = datetime.now() else: request_time = datetime.fromisoformat(args["request_time"]) # check if concurrency parameter is given and we run concurrently if concurrency_par is not None and not any( p.name == concurrency_par for p in nb_details.default_params ): msg = f"Notebook cannot be run concurrently: no {concurrency_par} parameter" warnings.warn(msg, RuntimeWarning) # If not explicitly specified, use a new profile for ipcluster default_params_by_name = {p.name: p.value for p in nb_details.default_params} if 'cluster_profile' in default_params_by_name: if args.get("cluster_profile") == default_params_by_name["cluster_profile"]: args['cluster_profile'] = f"slurm_prof_{request_time:%y%m%d_%H%M%S.%f}" # wait on all jobs to run and then finalize the run by creating a report from the notebooks out_path = Path(default_report_path) / nb_details.detector / nb_details.caltype / datetime.now().isoformat() if try_report_to_output: if "out_folder" in args: out_path = Path(args["out_folder"]).absolute() else: print(f"No 'out_folder' given, outputting to '{out_path}' instead.") out_path.mkdir(parents=True, exist_ok=True) # Use given report name, or automatic unique name if not specified det_name = args.get('karabo_id', nb_details.detector) unique_name = f"{det_name}-{nb_details.caltype}-{request_time:%y%m%d_%H%M%S.%f}" if args['skip_report']: report_to = '' elif args["report_to"] is None: report_to = out_path / f"{unique_name}.pdf" print(f"report_to not specified, will use {report_to}") else: report_to = Path(args["report_to"]).with_suffix('.pdf').absolute() if report_to: # Work dir matching report file but without .pdf cal_work_dir = report_to.with_suffix('') else: cal_work_dir = out_path / unique_name cal_work_dir.mkdir(parents=True) # Write all input parameters to rst file to be included to final report parms = parameter_values(nb_details.default_params, **args) (cal_work_dir / "InputParameters.rst").write_text(make_par_table(parms)) # And save the invocation of this script itself (cal_work_dir / "run_calibrate.sh").write_text( f'# pycalibration version: {version}\n' + shlex.join(argv) ) # Copy the bash script which will be used to run notebooks shutil.copy2( os.path.join(PKG_DIR, "bin", "slurm_calibrate.sh"), cal_work_dir / "pycalib-run-nb.sh" ) if nb_details.user_venv: print("Using specified venv:", nb_details.user_venv) python_exe = str(nb_details.user_venv / 'bin' / 'python') else: python_exe = python_path # Write metadata about calibration 
    metadata = cal_tools.tools.CalibrationMetadata(cal_work_dir, new=True)

    parm_subdict = metadata.setdefault("calibration-configurations", {})
    for p in parms:
        name = consolize_name(p.name)
        parm_subdict[name] = p.value

    metadata["pycalibration-version"] = version
    metadata["report-path"] = str(report_to) if report_to \
        else '# REPORT SKIPPED #'
    metadata['reproducible'] = not args['not_reproducible']
    metadata["concurrency"] = {
        'parameter': concurrency_par,
        'default': concurrency_defval,
        'function': concurrency_func,
    }
    metadata["notebook"] = {
        'title': title,
        'author': author,
    }
    metadata['python-environment'] = {
        'path': python_exe,
        'python-version': get_python_version(python_exe),
    }

    if args["constants_from"]:
        with open(args["constants_from"], "r", encoding='utf-8') as f:
            d = yaml.safe_load(f)
        metadata["retrieved-constants"] = d["retrieved-constants"]

    metadata.save()

    # Record installed Python packages for reproducing the environment
    if not args['skip_env_freeze']:
        with (cal_work_dir / 'requirements.txt').open('wb') as f:
            check_call([python_exe, '-m', 'pip', 'freeze'], stdout=f)

    folder = get_par_attr(parms, 'in_folder', 'value', '')

    pre_jobs = []
    cluster_cores = concurrency.get("cluster cores", 8)

    # Check if there are pre-notebooks
    for pre_notebook_path in nb_details.pre_paths:
        lead_nb = nbformat.read(pre_notebook_path, as_version=4)
        pre_jobs.append(prepare_job(
            cal_work_dir, lead_nb, pre_notebook_path, args,
            cluster_cores=cluster_cores
        ))

    main_jobs = []
    if concurrency_par is None:
        main_jobs.append(prepare_job(
            cal_work_dir, nb, notebook_path, args,
            cluster_cores=cluster_cores,
        ))
    else:
        cvals = args.get(concurrency_par, None)
        # Consider [-1] as None
        if (cvals is None or cvals == [-1]) and concurrency_defval is not None:
            print(f"Concurrency parameter '{concurrency_par}' "
                  f"is taken from notebooks.py")
            cvals = concurrency_defval if isinstance(concurrency_defval, (list, tuple)) \
                else range(concurrency_defval)

        if cvals is None:
            defcval = get_par_attr(parms, concurrency_par, 'value')
            if defcval is not None:
                print(f"Concurrency parameter '{concurrency_par}' "
                      f"is taken from '{notebook_path}'")
                cvals = defcval if isinstance(defcval, (list, tuple)) else [defcval]

        if concurrency_func:
            func = get_notebook_function(nb, concurrency_func)
            if func is None:
                warnings.warn(
                    f"Didn't find concurrency function {concurrency_func} in notebook",
                    RuntimeWarning
                )
            else:
                df = {}
                exec(func, df)
                f = df[concurrency_func]
                import inspect
                sig = inspect.signature(f)
                if cvals:
                    # in case the default needs to be used for the function call
                    args[concurrency_par] = cvals
                callargs = [args[arg] for arg in sig.parameters]
                cvals = f(*callargs)
                print(f"Split concurrency into {cvals}")

        if cvals is None:
            raise ValueError(
                f"No values found for {concurrency_par} (concurrency parameter)"
            )

        # get the expected type
        cvtype = get_par_attr(parms, concurrency_par, 'type', list)
        cvals = remove_duplications(cvals)

        for cnum, cval in enumerate(cvals):
            show_title = cnum == 0
            cval = [cval, ] if not isinstance(cval, list) and cvtype is list else cval

            main_jobs.append(prepare_job(
                cal_work_dir, nb, notebook_path, args,
                concurrency_par, cval,
                cluster_cores=cluster_cores,
                show_title=show_title,
            ))

    # Prepare dependent notebooks (e.g. summaries after correction)
    dep_jobs = []
    for i, dep_notebook_path in enumerate(nb_details.dep_paths):
        dep_nb = nbformat.read(dep_notebook_path, as_version=4)
        dep_jobs.append(prepare_job(
            cal_work_dir, dep_nb, dep_notebook_path, args,
            cluster_cores=cluster_cores,
        ))

    job_chain = JobChain([
        Step(pre_jobs),
        Step(main_jobs),
        Step(dep_jobs, after_error=True)
    ], Path(cal_work_dir), python_exe)

    # Save information about jobs for reproducibility
    job_chain.save()

    if args['prepare_only']:
        print("Files prepared, not executing now (--prepare-only option).")
        print("To execute the notebooks, run:")
        rpt_opts = ''
        if nb_details.user_venv is not None:
            rpt_opts = f'--python {python_exe}'
        print(f" python -m xfel_calibrate.repeat {cal_work_dir} {rpt_opts}")
        return

    print("Calibration work directory (including Slurm .out files):")
    print(" ", cal_work_dir)
    submission_time = datetime.now().strftime('%Y-%m-%dT%H:%M:%S')

    # Launch the calibration work
    if args["no_cluster_job"]:
        print("Running notebooks directly, not via Slurm...")
        errors = job_chain.run_direct()
        joblist = []
    else:
        print("Submitting jobs to Slurm...")
        joblist = job_chain.submit_jobs(SlurmOptions(
            job_name=args.get('slurm_name', 'xfel_calibrate'),
            nice=args['slurm_scheduling'],
            mem=args['slurm_mem'],
            reservation=args['reservation'],
            partition=args['slurm_partition'],
        ))
        errors = False

    fmt_args = {'cal_work_dir': cal_work_dir,
                'out_path': out_path,
                'version': version,
                'title': title,
                'author': author,
                'report_to': report_to,
                'in_folder': folder,
                'request_time': request_time.strftime("%Y-%m-%dT%H:%M:%S"),
                'submission_time': submission_time,
                }

    joblist.append(run_finalize(
        fmt_args=fmt_args,
        cal_work_dir=cal_work_dir,
        job_list=joblist,
        sequential=args["no_cluster_job"],
        partition=args["slurm_partition"] or "exfel",
    ))

    if any(j is not None for j in joblist):
        print("Submitted the following SLURM jobs: {}".format(",".join(joblist)))

    return int(errors)


if __name__ == "__main__":
    sys.exit(run())
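# Example invocation (hypothetical paths and detector; the available detectors,
# calibration types and parameters are defined in notebooks.py and in the
# notebooks themselves):
#
#   xfel-calibrate AGIPD CORRECT \
#       --in-folder /gpfs/exfel/exp/MID/202301/p001234/raw \
#       --out-folder /gpfs/exfel/exp/MID/202301/p001234/proc \
#       --run 42
#
# Notebook parameter names are typically exposed on the command line with
# underscores replaced by dashes (see consolize_name), e.g. in_folder -> --in-folder.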