diff --git a/docs/source/configuration.rst b/docs/source/configuration.rst index 26d0d8a3c7b313d5dc44cab144859957030f9d29..5d7fc8eac9eb334b9d7845fb83bd0fa34791e51f 100644 --- a/docs/source/configuration.rst +++ b/docs/source/configuration.rst @@ -65,6 +65,12 @@ The configuration is to be given in form of a python directory:: "default concurrency": 16, "cluster cores": 16}, }, + "CORRECT": { + "notebook": "notebooks/AGIPD/AGIPD_Correct_and_Verify.ipynb", + "concurrency": {"parameter": "sequences", + "use function": "balance_sequences", + "default concurrency": [-1], + "cluster cores": 32}, ... } } @@ -83,3 +89,42 @@ be derived e.g. by profiling memory usage per core, run times, etc. It is good practice to name command line enabled notebooks with an `_NBC` suffix as shown in the above example. + +The `CORRECT` notebook (last notebook in the example) makes use of a concurrency generating function +by setting the `use function` parameter. This function must be defined in a code cell in the notebook, +its parameters should be named like other exposed parameters. It should return a list of parameters +to be inserted into the concurrently run notebooks. The example given e.g. defines the `balance_sequences` +function:: + + def balance_sequences(in_folder, run, sequences, sequences_per_node): + import glob + import re + import numpy as np + if sequences_per_node != 0: + sequence_files = glob.glob("{}/r{:04d}/*-S*.h5".format(in_folder, run)) + seq_nums = set() + for sf in sequence_files: + seqnum = re.findall(r".*-S([0-9]*).h5", sf)[0] + seq_nums.add(int(seqnum)) + seq_nums -= set(sequences) + return [l.tolist() for l in np.array_split(list(seq_nums), + len(seq_nums)//sequences_per_node+1)] + else: + return sequences + + +.. note:: + + Note how imports are inlined in the definition. This is necessary, as only the function code, + not the entire notebook is executed. + +which requires as exposed parameters e.g. 
:: + + in_folder = "/gpfs/exfel/exp/SPB/201701/p002038/raw/" # the folder to read data from, required + run = 239 # runs to process, required + sequences = [-1] # sequences to correct, set to -1 for all, range allowed + sequences_per_node = 2 # number of sequence files per cluster node if run as slurm job, set to 0 to not run SLURM parallel + +.. note:: + + The function only needs to be defined, but not executed within the notebook context itself. diff --git a/notebooks/AGIPD/AGIPD_Correct_and_Verify.ipynb b/notebooks/AGIPD/AGIPD_Correct_and_Verify.ipynb index 00e25888b555d5d0c9345933be705005285fabd5..28cc07ebf869b4f669cb4d8c45baf196e02ac09f 100644 --- a/notebooks/AGIPD/AGIPD_Correct_and_Verify.ipynb +++ b/notebooks/AGIPD/AGIPD_Correct_and_Verify.ipynb @@ -37,7 +37,25 @@ "zero_below_nsigma = 3. # zero data below n sigma noise for higher compression, set to 0 to not zero data\n", "compress_data = \"lzf\" # compression algorithm to use for data compression, set to \"None\" to disable\n", "compress_metadata = \"lzf\" # compression algorithm to use for metadata compression, set to \"None\" to disable\n", - "use_dir_creation_date = True # use the creation data of the input dir for database queries" + "use_dir_creation_date = True # use the creation data of the input dir for database queries\n", + "sequences_per_node = 2 # number of sequence files per cluster node if run as slurm job, set to 0 to not run SLURM parallel\n", + "\n", + "def balance_sequences(in_folder, run, sequences, sequences_per_node):\n", + " import glob\n", + " import re\n", + " import numpy as np\n", + " if sequences_per_node != 0:\n", + " sequence_files = glob.glob(\"{}/r{:04d}/*-S*.h5\".format(in_folder, run))\n", + " seq_nums = set()\n", + " for sf in sequence_files:\n", + " seqnum = re.findall(r\".*-S([0-9]*).h5\", sf)[0]\n", + " seq_nums.add(int(seqnum))\n", + " seq_nums -= set(sequences)\n", + " return [l.tolist() for l in np.array_split(list(seq_nums),\n", + " 
len(seq_nums)//sequences_per_node+1)]\n", + " else:\n", + " return sequences\n", + " " ] }, { diff --git a/xfel_calibrate/calibrate.py b/xfel_calibrate/calibrate.py index baa4cdff5f792bd5961e0407e8807d6983f8c333..c5d7f673bd77d0a420c042023893c60dc8b27f91 100755 --- a/xfel_calibrate/calibrate.py +++ b/xfel_calibrate/calibrate.py @@ -229,6 +229,10 @@ def has_parm(parms, name): return False +def flatten_list(l): + return "_".join([str(flatten_list(v)) for v in l]) if isinstance(l, list) else l + + def concurrent_run(temp_path, nb, nbname, args, cparm=None, cval=None, final_job=False, job_list=[], fmtcmd="", cluster_cores=8, sequential=False): @@ -238,7 +242,7 @@ def concurrent_run(temp_path, nb, nbname, args, cparm=None, cval=None, if cparm is not None: args[cparm] = cval - suffix = "_".join([str(v) for v in cval]) if isinstance(cval, list) else cval + suffix = flatten_list(cval) if "cluster_profile" in args: args["cluster_profile"] = "{}_{}".format(args["cluster_profile"], suffix) @@ -294,7 +298,36 @@ def concurrent_run(temp_path, nb, nbname, args, cparm=None, cval=None, jobid = line.split(" ")[3] print("Submitted job: {}".format(jobid)) return jobid + + +def get_concurrency_function(nb, fname): + import re + flines = [] + def_found = False + indent = None + for cell in nb.cells: + if cell.cell_type == 'code': + lines = cell.source.split("\n") + for line in lines: + if def_found: + lin = len(line) - len(line.lstrip()) + if indent is None: + if lin != 0: + indent = lin + flines.append(line) + elif lin >= indent: + flines.append(line) + else: + return "\n".join(flines) + + if re.search(r"def\s+{}\(.*\):\s*".format(fname), line): + print("Found {} in line {}".format(fname, line)) + # set this to indent level + def_found = True + flines.append(line) + return None + def run(): """ Run a calibration task with parser arguments """ @@ -380,14 +413,44 @@ def run(): cvals = args.get(cvar, None) cluster_cores = concurrency.get("cluster cores", 8) + con_func = 
concurrency.get("use function", None) + if cvals is None: - cvals = range(concurrency["default concurrency"]) + defcval = concurrency.get("default concurrency", None) + if not isinstance(defcval, (list, tuple)): + cvals = range(defcval) + else: + cvals = defcval + + if con_func: + func = get_concurrency_function(nb, con_func) + if func is None: + warnings.warn("Didn't find concurrency function {} in notebook".format(con_func), + RuntimeWarning) + + else: + df = {} + exec(func, df) + f = df[con_func] + import inspect + sig = inspect.signature(f) + callargs = [] + if cvals: + + args[cvar] = cvals # in case default needs to be used for function call + for arg in sig.parameters: + callargs.append(args[arg]) + cvals = f(*callargs) + print("Split concurrency into {}".format(cvals)) for cnum, cval in enumerate(cvals): jobid = concurrent_run(run_tmp_path, nb, notebook, args, - cvar, [cval,], cnum==len(list(cvals))-1, joblist, fmtcmd, + cvar, [cval,] if not isinstance(cval, list) else cval, + cnum==len(list(cvals))-1, joblist, fmtcmd, cluster_cores=cluster_cores, sequential=sequential) joblist.append(jobid) + + print("Submitted the following SLURM jobs: {}".format(",".join(joblist))) diff --git a/xfel_calibrate/notebooks.py b/xfel_calibrate/notebooks.py index 8ef1d0d06497c76b5db3eacf292aadf4b95849bf..1542762b561742224893e265a8c1dce02573f2e8 100644 --- a/xfel_calibrate/notebooks.py +++ b/xfel_calibrate/notebooks.py @@ -22,8 +22,9 @@ notebooks = { }, "CORRECT": { "notebook": "notebooks/AGIPD/AGIPD_Correct_and_Verify.ipynb", - "concurrency": {"parameter": None, - "default concurrency": None, + "concurrency": {"parameter": "sequences", + "use function": "balance_sequences", + "default concurrency": [-1], "cluster cores": 32}, }, "COMBINE": {