From 1deabcfba124a26c91faaffbd20ffdee50a21118 Mon Sep 17 00:00:00 2001 From: Thomas Kluyver <thomas@kluyver.me.uk> Date: Thu, 25 Aug 2022 18:16:53 +0100 Subject: [PATCH] Write directly to 'archive' location instead of moving files there later --- src/xfel_calibrate/calibrate.py | 114 +++++++++++++------------------- src/xfel_calibrate/finalize.py | 81 +++++++++-------------- src/xfel_calibrate/repeat.py | 18 ++--- 3 files changed, 86 insertions(+), 127 deletions(-) diff --git a/src/xfel_calibrate/calibrate.py b/src/xfel_calibrate/calibrate.py index bd03fd8ca..aaf27e46f 100755 --- a/src/xfel_calibrate/calibrate.py +++ b/src/xfel_calibrate/calibrate.py @@ -38,7 +38,6 @@ from .settings import ( launcher_command, python_path, sprof, - temp_path, try_report_to_output, ) @@ -181,11 +180,11 @@ def flatten_list(l): return '' -def create_finalize_script(fmt_args, temp_path, job_list) -> str: +def create_finalize_script(fmt_args, cal_work_dir, job_list) -> str: """ Create a finalize script to produce output report :param fmt_args: Dictionary of fmt arguments - :param temp_path: Path to temporary folder to run slurm job + :param cal_work_dir: Path to temporary folder to run slurm job :param job_list: List of slurm jobs :return: The path of the created script """ @@ -196,7 +195,7 @@ def create_finalize_script(fmt_args, temp_path, job_list) -> str: finalize(joblist={{joblist}}, finaljob=os.environ.get('SLURM_JOB_ID', ''), - run_path='{{run_path}}', + cal_work_dir='{{cal_work_dir}}', out_path='{{out_path}}', version='{{version}}', title='{{title}}', @@ -209,7 +208,7 @@ def create_finalize_script(fmt_args, temp_path, job_list) -> str: """) fmt_args['joblist'] = job_list - f_name = os.path.join(temp_path, "finalize.py") + f_name = os.path.join(cal_work_dir, "finalize.py") with open(f_name, "w") as finfile: finfile.write(textwrap.dedent(tmpl.render(**fmt_args))) @@ -221,10 +220,10 @@ def create_finalize_script(fmt_args, temp_path, job_list) -> str: def run_finalize( - fmt_args, 
temp_path, job_list, sequential=False, partition=None): + fmt_args, cal_work_dir, job_list, sequential=False, partition=None): if partition is None: partition = "exfel" - finalize_script = create_finalize_script(fmt_args, temp_path, job_list) + finalize_script = create_finalize_script(fmt_args, cal_work_dir, job_list) cmd = [] if not sequential: @@ -232,7 +231,7 @@ def run_finalize( 'sbatch', '--parsable', '--requeue', - '--output', f'{temp_path}/slurm-%j.out', + '--output', f'{cal_work_dir}/slurm-%j.out', '--open-mode=append', # So we can see if it's preempted & requeued '--job-name', 'xfel-cal-finalize', '--time', finalize_time_limit, @@ -244,7 +243,7 @@ def run_finalize( cmd += [ os.path.join(PKG_DIR, "bin", "slurm_finalize.sh"), # path to helper sh sys.executable, # Python with calibration machinery installed - temp_path, + cal_work_dir, finalize_script, fmt_args['report_to'] ] @@ -257,20 +256,6 @@ def run_finalize( return jobid -def save_executed_command(run_tmp_path, version, argv): - """ - Create a file with string used to execute `xfel_calibrate` - - :param run_tmp_path: path to temporary directory for running job outputs - :param version: git version of the pycalibration package - """ - - f_name = os.path.join(run_tmp_path, "run_calibrate.sh") - with open(f_name, "w") as finfile: - finfile.write(f'# pycalibration version: {version}\n') - finfile.write(shlex.join(argv)) - - class SlurmOptions: def __init__( self, job_name=None, nice=None, mem=None, partition=None, reservation=None, @@ -291,17 +276,16 @@ class SlurmOptions: return ['--reservation', self.reservation] return ['--partition', self.partition or sprof] - def get_launcher_command(self, temp_path, after_ok=(), after_any=()) -> List[str]: + def get_launcher_command(self, log_dir, after_ok=(), after_any=()) -> List[str]: """ Return a slurm launcher command - :param args: Command line arguments - :param temp_path: Temporary path to run job + :param log_dir: Where Slurm .out log files should go :param 
after_ok: A list of jobs which must succeed first :param after_any: A list of jobs which must finish first, but may fail :return: List of commands and parameters to be used by subprocess """ - launcher_slurm = launcher_command.format(temp_path=temp_path).split() + launcher_slurm = launcher_command.format(temp_path=log_dir).split() launcher_slurm += self.get_partition_or_reservation() @@ -340,7 +324,7 @@ def remove_duplications(l) -> list: def prepare_job( - temp_path: str, nb, nb_path: Path, args: dict, cparm=None, cval=None, + cal_work_dir: Path, nb, nb_path: Path, args: dict, cparm=None, cval=None, cluster_cores=8, show_title=True, ) -> 'JobArgs': if cparm is not None: @@ -369,8 +353,7 @@ def prepare_job( set_figure_format(new_nb, args["vector_figs"]) new_name = f"{nb_path.stem}__{cparm}__{suffix}.ipynb" - nbpath = os.path.join(temp_path, new_name) - nbformat.write(new_nb, nbpath) + nbformat.write(new_nb, cal_work_dir / new_name) return JobArgs([ "./pycalib-run-nb.sh", # ./ allows this to run directly @@ -493,12 +476,11 @@ class JobChain: return errors -def make_par_table(parms, run_tmp_path: str): +def make_par_table(parms): """ - Create a table with input parameters of the notebook + Create a RST table with input parameters of the notebook :param parms: parameters of the notebook - :param run_tmp_path: path to temporary directory for running job outputs """ # Add space in long strings without line breakers ` ,-/` to @@ -554,9 +536,7 @@ def make_par_table(parms, run_tmp_path: str): \end{longtable} ''') - f_name = os.path.join(run_tmp_path, "InputParameters.rst") - with open(f_name, "w") as finfile: - finfile.write(textwrap.dedent(tmpl.render(p=col_type, lines=l_parms))) + return textwrap.dedent(tmpl.render(p=col_type, lines=l_parms)) def run(argv=None): @@ -611,23 +591,6 @@ def run(argv=None): if args.get("cluster_profile") == default_params_by_name["cluster_profile"]: args['cluster_profile'] = "slurm_prof_{}".format(run_uuid) - # create a temporary output 
directory to work in - run_tmp_path = os.path.join( - temp_path, f"slurm_out_{nb_details.detector}_{nb_details.caltype}_{run_uuid}" - ) - os.makedirs(run_tmp_path) - - # Write all input parameters to rst file to be included to final report - parms = parameter_values(nb_details.default_params, **args) - make_par_table(parms, run_tmp_path) - # And save the invocation of this script itself - save_executed_command(run_tmp_path, version, argv) - - # Copy the bash script which will be used to run notebooks - shutil.copy2( - os.path.join(PKG_DIR, "bin", "slurm_calibrate.sh"), - os.path.join(run_tmp_path, "pycalib-run-nb.sh") - ) # wait on all jobs to run and then finalize the run by creating a report from the notebooks out_path = Path(default_report_path) / nb_details.detector / nb_details.caltype / datetime.now().isoformat() @@ -654,6 +617,27 @@ def run(argv=None): print(f"report_to path contained no path, saving report in '{out_path}'") report_to = out_path / report_to + workdir_name = f"slurm_out_{nb_details.detector}_{nb_details.caltype}_{run_uuid}" + if report_to: + cal_work_dir = report_to / workdir_name + else: + cal_work_dir = out_path / workdir_name + cal_work_dir.mkdir(parents=True) + + # Write all input parameters to rst file to be included to final report + parms = parameter_values(nb_details.default_params, **args) + (cal_work_dir / "InputParameters.rst").write_text(make_par_table(parms)) + # And save the invocation of this script itself + (cal_work_dir / "run_calibrate.sh").write_text( + f'# pycalibration version: {version}\n' + shlex.join(argv) + ) + + # Copy the bash script which will be used to run notebooks + shutil.copy2( + os.path.join(PKG_DIR, "bin", "slurm_calibrate.sh"), + cal_work_dir / "pycalib-run-nb.sh" + ) + if nb_details.user_venv: print("Using specified venv:", nb_details.user_venv) python_exe = str(nb_details.user_venv / 'bin' / 'python') @@ -661,7 +645,7 @@ def run(argv=None): python_exe = python_path # Write metadata about calibration job 
to output folder - metadata = cal_tools.tools.CalibrationMetadata(run_tmp_path, new=True) + metadata = cal_tools.tools.CalibrationMetadata(cal_work_dir, new=True) parm_subdict = metadata.setdefault("calibration-configurations", {}) for p in parms: @@ -693,7 +677,7 @@ def run(argv=None): # Record installed Python packages for reproducing the environment if not args['skip_env_freeze']: - with open(os.path.join(run_tmp_path, 'requirements.txt'), 'wb') as f: + with (cal_work_dir / 'requirements.txt').open('wb') as f: check_call([python_exe, '-m', 'pip', 'freeze'], stdout=f) folder = get_par_attr(parms, 'in_folder', 'value', '') @@ -709,14 +693,14 @@ def run(argv=None): for pre_notebook_path in nb_details.pre_paths: lead_nb = nbformat.read(pre_notebook_path, as_version=4) pre_jobs.append(prepare_job( - run_tmp_path, lead_nb, pre_notebook_path, args, + cal_work_dir, lead_nb, pre_notebook_path, args, cluster_cores=cluster_cores )) main_jobs = [] if concurrency_par is None: main_jobs.append(prepare_job( - run_tmp_path, nb, + cal_work_dir, nb, notebook_path, args, cluster_cores=cluster_cores, )) @@ -770,7 +754,7 @@ def run(argv=None): cval = [cval, ] if not isinstance(cval, list) and cvtype is list else cval main_jobs.append(prepare_job( - run_tmp_path, nb, notebook_path, args, + cal_work_dir, nb, notebook_path, args, concurrency_par, cval, cluster_cores=cluster_cores, show_title=show_title, @@ -781,7 +765,7 @@ def run(argv=None): for i, dep_notebook_path in enumerate(nb_details.dep_paths): dep_nb = nbformat.read(dep_notebook_path, as_version=4) dep_jobs.append(prepare_job( - run_tmp_path, dep_nb, dep_notebook_path, args, + cal_work_dir, dep_nb, dep_notebook_path, args, cluster_cores=cluster_cores, )) @@ -789,22 +773,18 @@ def run(argv=None): Step(pre_jobs), Step(main_jobs), Step(dep_jobs, after_error=True) - ], Path(run_tmp_path), python_exe) + ], Path(cal_work_dir), python_exe) # Save information about jobs for reproducibility job_chain.save() if args['prepare_only']: - # 
FIXME: Clean up where this file goes when. - # When the jobs run, it is copied by finalize.py - metadata.save_copy(Path(run_tmp_path)) - print("Files prepared, not executing now (--prepare-only option).") print("To execute the notebooks, run:") rpt_opts = '' if nb_details.user_venv is not None: rpt_opts = f'--python {python_exe}' - print(f" python -m xfel_calibrate.repeat {run_tmp_path} {rpt_opts}") + print(f" python -m xfel_calibrate.repeat {cal_work_dir} {rpt_opts}") return submission_time = datetime.now().strftime('%Y-%m-%dT%H:%M:%S') @@ -826,7 +806,7 @@ def run(argv=None): )) errors = False - fmt_args = {'run_path': run_tmp_path, + fmt_args = {'cal_work_dir': cal_work_dir, 'out_path': out_path, 'version': version, 'title': title, @@ -839,7 +819,7 @@ def run(argv=None): joblist.append(run_finalize( fmt_args=fmt_args, - temp_path=run_tmp_path, + cal_work_dir=cal_work_dir, job_list=joblist, sequential=args["no_cluster_job"], partition=args["slurm_partition"] or "exfel", diff --git a/src/xfel_calibrate/finalize.py b/src/xfel_calibrate/finalize.py index 67f6fbddd..0911f0afb 100644 --- a/src/xfel_calibrate/finalize.py +++ b/src/xfel_calibrate/finalize.py @@ -1,9 +1,8 @@ import re import sys from datetime import datetime -from glob import glob from importlib.machinery import SourceFileLoader -from os import chdir, listdir, path, stat +from os import chdir, listdir, path from pathlib import Path from shutil import copy, copytree, move, rmtree from subprocess import CalledProcessError, check_call, check_output @@ -37,18 +36,18 @@ def natural_keys(text): return [atoi(c) for c in re.split(r'(\d+)', text)] -def combine_report(run_path): - sphinx_path = Path(run_path, "sphinx_rep").resolve() +def combine_report(cal_work_dir): + sphinx_path = Path(cal_work_dir, "sphinx_rep").resolve() # if the finalize job was preempted or requeued, # while building the report. 
if sphinx_path.is_dir(): rmtree(sphinx_path) sphinx_path.mkdir(parents=True) - direntries = listdir(run_path) + direntries = listdir(cal_work_dir) direntries.sort(key=natural_keys) for entry in direntries: - entry = Path(run_path, entry) + entry = Path(cal_work_dir, entry) if entry.suffix == '.rst' and entry.is_file(): comps = entry.stem.split("__") if len(comps) >= 3: @@ -85,7 +84,7 @@ def combine_report(run_path): return sphinx_path -def prepare_plots(run_path, threshold=1000000): +def prepare_plots(cal_work_dir: Path, threshold=1_000_000): """ Convert svg file to pdf or png to be used for latex @@ -96,36 +95,25 @@ def prepare_plots(run_path, threshold=1000000): The links in the rst files are adapted accordingly to the converted image files. - :param run_path: Run path of the slurm job + :param cal_work_dir: Run path of the slurm job :param threshold: Max svg file size (in bytes) to be converted to pdf """ print('Convert svg to pdf and png') - run_path = path.abspath(run_path) - rst_files = glob('{}/*rst'.format(run_path)) - for rst_file in rst_files: - rst_file_name = path.basename(rst_file) - rst_file_name = path.splitext(rst_file_name)[0] + for rst_file in cal_work_dir.glob("*.rst"): - svg_files = glob( - '{}/{}_files/*svg'.format(run_path, rst_file_name)) - for f_path in svg_files: - f_name = path.basename(f_path) - f_name = path.splitext(f_name)[0] - - if (stat(f_path)).st_size < threshold: - check_call(["svg2pdf", "{}".format(f_path)], shell=False) + for f_path in (cal_work_dir / f'{rst_file.stem}_files').glob('*.svg'): + if f_path.stat().st_size < threshold: + check_call(["svg2pdf", str(f_path)], shell=False) new_ext = 'pdf' else: - check_call(["convert", "{}".format(f_path), - "{}.png".format(f_name)], shell=False) + check_call(["convert", str(f_path), + str(f_path.with_suffix('.png'))], shell=False) new_ext = 'png' - check_call(["sed", - "-i", - "s/{}.svg/{}.{}/g".format(f_name, f_name, new_ext), - rst_file], - shell=False) + check_call([ + "sed", "-i", 
f"s/{f_path.name}/{f_path.stem}.{new_ext}/g", rst_file + ], shell=False) def get_job_info(jobs: List[str], fmt: List[str]) -> List[List[str]]: @@ -156,12 +144,12 @@ def get_job_info(jobs: List[str], fmt: List[str]) -> List[List[str]]: return [job_info[job] for job in jobs] -def make_timing_summary(run_path: Path, job_times: List[List[str]], +def make_timing_summary(cal_work_dir: Path, job_times: List[List[str]], job_time_fmt: List[str], pipeline_times: Dict[str, str]): """ Create an rst file with timing summary of executed notebooks - :param run_path: Run path of the slurm job + :param cal_work_dir: Run path of the slurm job :param job_times: List of job information as returned by get_job_info :param job_time_fmt: List of headers to use for job_times :param pipeline_times: Dictionary of pipeline step -> timestamp @@ -193,7 +181,7 @@ def make_timing_summary(run_path: Path, job_times: List[List[str]], ["Report compilation", pipeline_times["report-compilation-time"]], ] - with (run_path / "timing_summary.rst").open("w+") as fd: + with (cal_work_dir / "timing_summary.rst").open("w+") as fd: time_table = tabulate.tabulate(time_vals, tablefmt="latex", headers=["Processing step", "Timestamp"]) @@ -205,7 +193,7 @@ def make_timing_summary(run_path: Path, job_times: List[List[str]], fd.write(dedent(job_tbl_tmpl.render(job_table=job_table.split("\n")))) -def make_report(run_path: Path, tmp_path: Path, project: str, +def make_report(run_path: Path, cal_work_dir: Path, project: str, author: str, version: str, report_to: Path): """ Create calibration report (pdf file) @@ -214,7 +202,7 @@ def make_report(run_path: Path, tmp_path: Path, project: str, jupyter-notebooks. 
:param run_path: Path to sphinx run directory - :param tmp_path: Run path of the slurm job + :param cal_work_dir: Run path of the slurm job :param project: Project title :param author: Author of the notebook :param version: Version of the notebook @@ -330,7 +318,7 @@ def make_report(run_path: Path, tmp_path: Path, project: str, copy(run_path / "_build" / "latex" / f"{report_name}.pdf", report_dir) # Remove folders with figures and sphinx files. - for tmp_subdir in tmp_path.iterdir(): + for tmp_subdir in cal_work_dir.iterdir(): if tmp_subdir.is_dir(): print(f"Removing temporary subdir: {tmp_subdir}") rmtree(tmp_subdir) @@ -382,15 +370,15 @@ def tex_escape(text): return regex.sub(lambda match: conv[match.group()], text) -def finalize(joblist, finaljob, run_path, out_path, version, title, author, report_to, data_path='Unknown', +def finalize(joblist, finaljob, cal_work_dir, out_path, version, title, author, report_to, data_path='Unknown', request_time='', submission_time=''): - run_path = Path(run_path) + cal_work_dir = Path(cal_work_dir) out_path = Path(out_path) # Archiving files in slurm_tmp if finaljob: joblist.append(str(finaljob)) - metadata = cal_tools.tools.CalibrationMetadata(run_path) + metadata = cal_tools.tools.CalibrationMetadata(cal_work_dir) job_time_fmt = 'JobID,Start,End,Elapsed,Suspended,State'.split(',') job_time_summary = get_job_info(joblist, job_time_fmt) @@ -399,7 +387,7 @@ def finalize(joblist, finaljob, run_path, out_path, version, title, author, repo "submission-time": submission_time, "report-compilation-time": datetime.now().strftime("%Y-%m-%dT%H:%M:%S"), } - make_timing_summary(run_path, job_time_summary, job_time_fmt, pipeline_time_summary) + make_timing_summary(cal_work_dir, job_time_summary, job_time_fmt, pipeline_time_summary) metadata.update( { "runtime-summary": { @@ -413,21 +401,18 @@ def finalize(joblist, finaljob, run_path, out_path, version, title, author, repo if report_to: report_to = Path(report_to) - prepare_plots(run_path) + 
prepare_plots(cal_work_dir) - sphinx_path = combine_report(run_path) + sphinx_path = combine_report(cal_work_dir) make_titlepage(sphinx_path, title, data_path, version) make_report( Path(sphinx_path), - run_path, + cal_work_dir, title, author, version, report_to, ) - # Store the contents of the Slurm working directory next to the report. - # This contains elements needed for reproducibility. - slurm_archive_dir = report_to.parent / f"slurm_out_{report_to.name}" det = metadata['calibration-configurations'].get('karabo-id', report_to.name) else: try: @@ -438,14 +423,8 @@ def finalize(joblist, finaljob, run_path, out_path, version, title, author, repo from hashlib import sha1 det = sha1(''.join(joblist).encode('ascii')).hexdigest()[:8] - # If there is no report location, simply move the slurm_out_ - # directory to the output. - slurm_archive_dir = Path(out_path) / f"slurm_out_{det}" - - print(f"Moving temporary files to final location: {slurm_archive_dir}") - move(str(run_path), str(slurm_archive_dir)) # Needs str until Python 3.9 + md_path = cal_work_dir / "calibration_metadata.yml" - md_path = slurm_archive_dir / "calibration_metadata.yml" # Notebooks should have a karabo_id parameter, which we'll use to make a # unique name like calibration_metadata_MID_DET_AGIPD1M-1.yml in the output # folder. In case they don't, fall back to a name like the report. 
diff --git a/src/xfel_calibrate/repeat.py b/src/xfel_calibrate/repeat.py index 9a25487e2..9992ac59b 100644 --- a/src/xfel_calibrate/repeat.py +++ b/src/xfel_calibrate/repeat.py @@ -123,13 +123,13 @@ def main(argv=None): start_time = datetime.now() run_uuid = f"t{start_time:%y%m%d_%H%M%S}" - working_dir = Path(temp_path, f'slurm_out_repeat_{run_uuid}') + cal_work_dir = Path(temp_path, f'slurm_out_repeat_{run_uuid}') copytree_no_metadata( - args.from_dir, working_dir, ignore=shutil.ignore_patterns('slurm-*.out') + args.from_dir, cal_work_dir, ignore=shutil.ignore_patterns('slurm-*.out') ) - print(f"New working directory: {working_dir}") + print(f"New working directory: {cal_work_dir}") - cal_metadata = CalibrationMetadata(working_dir) + cal_metadata = CalibrationMetadata(cal_work_dir) parameters = cal_metadata['calibration-configurations'] out_folder = parameters['out-folder'] @@ -137,7 +137,7 @@ def main(argv=None): if args.out_folder: out_folder = parameters['out-folder'] = args.out_folder params_to_set['out_folder'] = out_folder - update_notebooks_params(working_dir, params_to_set) + update_notebooks_params(cal_work_dir, params_to_set) if args.report_to: report_to = args.report_to @@ -149,11 +149,11 @@ def main(argv=None): # finalize & some notebooks expect yaml metadata in the output folder Path(out_folder).mkdir(parents=True, exist_ok=True) - shutil.copy(working_dir / 'calibration_metadata.yml', out_folder) + shutil.copy(cal_work_dir / 'calibration_metadata.yml', out_folder) py_version = cal_metadata.get('python-environment', {}).get('python-version') job_chain = JobChain.from_dir( - working_dir, python=get_python(args, py_version) + cal_work_dir, python=get_python(args, py_version) ) if args.no_cluster_job: job_chain.run_direct() @@ -164,7 +164,7 @@ def main(argv=None): mem=args.slurm_mem, )) - fmt_args = {'run_path': working_dir, + fmt_args = {'cal_work_dir': cal_work_dir, 'out_path': out_folder, 'version': get_pycalib_version(), 'report_to': report_to, @@ -175,7 
+175,7 @@ def main(argv=None): joblist.append(run_finalize( fmt_args=fmt_args, - temp_path=working_dir, + cal_work_dir=cal_work_dir, job_list=joblist, sequential=args.no_cluster_job, partition=args.slurm_partition -- GitLab