Skip to content
Snippets Groups Projects

Feat/dss cimprove master rebasing

Closed Andrey Samartsev requested to merge feat/DSSCimproveMasterRebasing into feat/DSSCdarksImprove
2 files
+ 10
9
Compare changes
  • Side-by-side
  • Inline
Files
2
+ 53
24
#!/usr/bin/env python
import argparse
import copy
from datetime import datetime
import dateutil.parser
import nbconvert
import nbformat
from nbparameterise import (
extract_parameters, replace_definitions, parameter_values
)
import os
from os import chdir
import pprint
import re
from subprocess import Popen, PIPE, check_output
from subprocess import check_output
import sys
from uuid import uuid4
import warnings
from .settings import *
from .notebooks import notebooks
@@ -78,6 +76,9 @@ def make_initial_parser():
'+- 2147483645 (negative value increases '
'priority)')
parser.add_argument('--request-time', type=str, default='Now',
help='Time of request to process notebook. Iso format')
parser.add_argument_group('required arguments')
parser.add_argument('--reservation', type=str, default="")
@@ -526,9 +527,18 @@ def create_finalize_script(fmt_args, temp_path, job_list):
module load texlive
echo 'Running finalize script'
python3 -c "from xfel_calibrate.finalize import finalize;
finalize({{joblist}}, $1, '{{run_path}}', '{{out_path}}',
'{{project}}', '{{calibration}}', '{{author}}',
'{{version}}', '{{report_to}}', '{{in_folder}}' )"
finalize(joblist={{joblist}},
finaljob=$1,
run_path='{{run_path}}',
out_path='{{out_path}}',
project='{{project}}',
calibration='{{calibration}}',
author='{{author}}',
version='{{version}}',
report_to='{{report_to}}',
data_path='{{in_folder}}',
request_time='{{request_time}}',
submission_time='{{submission_time}}')"
''')
@@ -543,6 +553,20 @@ def create_finalize_script(fmt_args, temp_path, job_list):
os.chmod(f_name, all_stats)
def save_executed_command(run_tmp_path, version):
"""
Create a file with string used to execute `xfel_calibrate`
:param run_tmp_path: path to temporary directory for running job outputs
:parm version: git version of the pycalibration package
"""
f_name = "{}/run_calibrate.sh".format(run_tmp_path)
with open(f_name, "w") as finfile:
finfile.write(f'# pycalibration version: {version}\n')
finfile.write(' '.join(sys.argv))
def get_launcher_command(args, temp_path, dependent, job_list):
"""
Return a slurm launcher command
@@ -671,13 +695,12 @@ def concurrent_run(temp_path, nb, nbname, args, cparm=None, cval=None,
return jobid
def make_par_table(parms, temp_path, run_uuid):
def make_par_table(parms, run_tmp_path):
"""
Create a table with input parameters if the notebook
Create a table with input parameters of the notebook
:param parms: parameters of the notebook
:param temp_path: path to temporary directory for job outputs
:param run_uuid: inset of folder name containing job output
:param run_tmp_path: path to temporary directory for running job outputs
"""
# Add space in long strings without line breakers ` ,-/` to
@@ -733,7 +756,7 @@ def make_par_table(parms, temp_path, run_uuid):
\end{longtable}
''')
f_name = "{}/slurm_tmp_{}/InputParameters.rst".format(temp_path, run_uuid)
f_name = "{}/InputParameters.rst".format(run_tmp_path)
with open(f_name, "w") as finfile:
finfile.write(textwrap.dedent(tmpl.render(p=col_type, lines=l_parms)))
@@ -803,8 +826,10 @@ def run():
version = ""
title = title.rstrip()
if detector not in title:
title = f"{detector} {title}"
run_uuid = uuid4()
run_uuid = f"t{datetime.now().strftime('%y%m%d_%H%M%S')}"
# check if concurrency parameter is given and we run concurrently
if not has_parm(parms, concurrency["parameter"]) and concurrency["parameter"] is not None:
@@ -820,24 +845,19 @@ def run():
cluster_profile = "slurm_prof_{}".format(run_uuid)
# create a temporary output directory to work in
run_tmp_path = "{}/slurm_tmp_{}".format(temp_path, run_uuid)
run_tmp_path = "{}/slurm_out_{}_{}_{}".format(temp_path, detector, caltype, run_uuid)
os.makedirs(run_tmp_path)
# Write all input parameters to rst file to be included to final report
parms = parameter_values(parms, **args)
make_par_table(parms, temp_path, run_uuid)
make_par_table(parms, run_tmp_path)
save_executed_command(run_tmp_path, version)
# wait on all jobs to run and then finalize the run by creating a report from the notebooks
out_path = "{}/{}/{}/{}".format(report_path, detector.upper(),
caltype.upper(), datetime.now().isoformat())
if try_report_to_output:
if "out_folder" in args:
if "run" in args and str(args["run"]) not in args["out_folder"]:
rr = args["run"]
# Only put run, when it is not in the out_folder
if isinstance(rr, int):
rr = "r{:04d}".format(rr)
args["out_folder"] = "{}/{}/".format(args["out_folder"], rr)
out_path = os.path.abspath(args["out_folder"])
else:
print("No 'out_folder' defined as argument, outputting to '{}' instead.".format(
@@ -851,6 +871,12 @@ def run():
folder = get_par_attr(parms, 'in_folder', 'value', '')
if args["request_time"] == "Now":
request_time = datetime.now().strftime('%Y-%m-%dT%H:%M:%S')
else:
request_time = args["request_time"]
submission_time = datetime.now().strftime('%Y-%m-%dT%H:%M:%S')
fmt_args = {'run_path': run_tmp_path,
'out_path': out_path,
'project': title,
@@ -858,7 +884,10 @@ def run():
'author': author,
'version': version,
'report_to': report_to,
'in_folder': folder}
'in_folder': folder,
'request_time': request_time,
'submission_time': submission_time
}
joblist = []
if concurrency.get("parameter", None) is None:
@@ -879,8 +908,8 @@ def run():
cluster_cores = concurrency.get("cluster cores", 8)
con_func = concurrency.get("use function", None)
if cvals is None:
# Consider [-1] as None
if cvals is None or cvals == [-1]:
defcval = concurrency.get("default concurrency", None)
if defcval is not None:
print("Concurrency parameter '{}' is taken from notebooks.py".format(cvar))
Loading