diff --git a/xfel_calibrate/calibrate.py b/xfel_calibrate/calibrate.py index baa4cdff5f792bd5961e0407e8807d6983f8c333..67e79a995a1c52f1dafb1ae5cbee95ca0df2d58d 100755 --- a/xfel_calibrate/calibrate.py +++ b/xfel_calibrate/calibrate.py @@ -229,6 +229,10 @@ def has_parm(parms, name): return False +def flatten_list(l): + return "_".join([str(flatten_list(v)) for v in l]) if isinstance(l, list) else l + + def concurrent_run(temp_path, nb, nbname, args, cparm=None, cval=None, final_job=False, job_list=[], fmtcmd="", cluster_cores=8, sequential=False): @@ -238,7 +242,7 @@ def concurrent_run(temp_path, nb, nbname, args, cparm=None, cval=None, if cparm is not None: args[cparm] = cval - suffix = "_".join([str(v) for v in cval]) if isinstance(cval, list) else cval + suffix = flatten_list(cval) if "cluster_profile" in args: args["cluster_profile"] = "{}_{}".format(args["cluster_profile"], suffix) @@ -265,9 +269,7 @@ def concurrent_run(temp_path, nb, nbname, args, cparm=None, cval=None, os.chmod("{}/finalize.sh".format(temp_path), all_stats) # then run an sbatch job if not sequential: - srun_base = ["sbatch", "-p", slurm_partion, "-t", "24:00:00", - "--mem", "500G", "--mail-type", "END", "--requeue", - "--output", "{}/slurm-%j.out".format(temp_path)] + srun_base = launcher_command.format(temp_path=temp_path).split() else: srun_base = [] @@ -289,12 +291,42 @@ def concurrent_run(temp_path, nb, nbname, args, cparm=None, cval=None, output = check_output(srun_base).decode('utf8') jobid = None - for line in output.split("\n"): - if "Submitted batch job " in line: - jobid = line.split(" ")[3] - print("Submitted job: {}".format(jobid)) + if not sequential: + for line in output.split("\n"): + if "Submitted batch job " in line: + jobid = line.split(" ")[3] + print("Submitted job: {}".format(jobid)) return jobid + + +def get_concurrency_function(nb, fname): + import re + flines = [] + def_found = False + indent = None + for cell in nb.cells: + if cell.cell_type == 'code': + lines = cell.source.split("\n") + for line in lines: + if def_found: + lin = len(line) - len(line.lstrip()) + if indent is None: + if lin != 0: + indent = lin + flines.append(line) + elif lin >= indent: + flines.append(line) + else: + return "\n".join(flines) + + if re.search(r"def\s+{}\(.*\):\s*".format(fname), line): + print("Found {} in line {}".format(fname, line)) + # set this to indent level + def_found = True + flines.append(line) + return None + def run(): """ Run a calibration task with parser arguments """ @@ -309,6 +341,7 @@ def run(): try: notebook = notebooks[detector][caltype]["notebook"] + notebook = os.path.abspath("{}/{}".format(os.path.dirname(__file__), notebook)) concurrency = notebooks[detector][caltype].get("concurrency", None) version = notebooks[detector][caltype].get("version", "NA") author = notebooks[detector][caltype].get("author", "anonymous") @@ -380,16 +413,46 @@ def run(): cvals = args.get(cvar, None) cluster_cores = concurrency.get("cluster cores", 8) + con_func = concurrency.get("use function", None) + if cvals is None: - cvals = range(concurrency["default concurrency"]) + defcval = concurrency.get("default concurrency", None) + if not isinstance(defcval, (list, tuple)): + cvals = range(defcval) + else: + cvals = defcval + + if con_func: + func = get_concurrency_function(nb, con_func) + if func is None: + warnings.warn("Didn't find concurrency function {} in notebook".format(con_func), + RuntimeWarning) + + else: + df = {} + exec(func, df) + f = df[con_func] + import inspect + sig = inspect.signature(f) + callargs = [] + if cvals: + + args[cvar] = cvals # in case default needs to be used for function call + for arg in sig.parameters: + callargs.append(args[arg]) + cvals = f(*callargs) + print("Split concurrency into {}".format(cvals)) for cnum, cval in enumerate(cvals): jobid = concurrent_run(run_tmp_path, nb, notebook, args, - cvar, [cval,], cnum==len(list(cvals))-1, joblist, fmtcmd, + cvar, [cval,] if not isinstance(cval, list) else cval, + cnum==len(list(cvals))-1, joblist, fmtcmd, cluster_cores=cluster_cores, sequential=sequential) joblist.append(jobid) - - print("Submitted the following SLURM jobs: {}".format(",".join(joblist))) + + + if not all([j is None for j in joblist]): + print("Submitted the following SLURM jobs: {}".format(",".join(joblist))) if __name__ == "__main__": diff --git a/xfel_calibrate/notebooks.py b/xfel_calibrate/notebooks.py index 182fbffb6d5effc8f5b68285621838bf32896c32..972d6df66269b4596e5821b227a99492b8eb8298 100644 --- a/xfel_calibrate/notebooks.py +++ b/xfel_calibrate/notebooks.py @@ -12,7 +12,7 @@ notebooks = { "notebook": "notebooks/AGIPD/Chracterize_AGIPD_Gain_PC_NBC.ipynb", "concurrency": {"parameter": "modules", "default concurrency": 16, - "cluster cores": 16}, + "cluster cores": 32}, }, "FF": { "notebook": "notebooks/AGIPD/Characterize_AGIPD_Gain_FlatFields_NBC.ipynb", @@ -74,15 +74,6 @@ notebooks = { "default concurrency": None, "cluster cores": 32}, }, - }, - "GENERIC": { - "DBTOH5": { - "notebook": "notebooks/generic/DB_Constants_to_HDF5_NBC.ipynb", - "concurrency": {"parameter": None, - "default concurrency": None, - "cluster cores": 32}, - "extend parms": "extend_parms", - }, } } \ No newline at end of file