diff --git a/tests/test_calibrate.py b/tests/test_calibrate.py new file mode 100644 index 0000000000000000000000000000000000000000..6b3d50b54588bbf48805cfad5619e06c54c5a240 --- /dev/null +++ b/tests/test_calibrate.py @@ -0,0 +1,35 @@ +import pytest + +from xfel_calibrate.calibrate import balance_sequences + + +def test_balance_sequences(): + + ret = balance_sequences(in_folder="/gpfs/exfel/exp/CALLAB/202031/p900113/raw", # noqa + run=9992, sequences=[0, 2, 5, 10, 20, 50, 100], + sequences_per_node=1, karabo_da=["all"], + max_nodes=8) + + expected = [[0], [2]] + assert expected == ret + + ret = balance_sequences(in_folder="/gpfs/exfel/exp/CALLAB/202031/p900113/raw", # noqa + run=9992, sequences=[-1], + sequences_per_node=1, karabo_da=["JNGFR01"], + max_nodes=3) + expected = [] + assert expected == ret + + with pytest.raises(ValueError) as e: + balance_sequences(in_folder="/gpfs/exfel/exp/CALLAB/202031/p900113/raw", # noqa + run=9992, sequences=[1991, 2021], + sequences_per_node=1, karabo_da=["all"], + max_nodes=3) + assert 'Selected sequences [1991, 2021]]' in e.value() + + with pytest.raises(ValueError) as e: + balance_sequences(in_folder="/gpfs/exfel/exp/CALLAB/202031/p900113/raw", # noqa + run=9992, sequences=[1991, 2021], + sequences_per_node=1, karabo_da=-1, + max_nodes=3) + assert 'karabo_da as a string or list' in e.value() \ No newline at end of file diff --git a/xfel_calibrate/calibrate.py b/xfel_calibrate/calibrate.py index 6289bdd514a72cc80493577665a28803e35b90f3..249556be9e61526bede5057ed3b1c330c104ee18 100755 --- a/xfel_calibrate/calibrate.py +++ b/xfel_calibrate/calibrate.py @@ -2,6 +2,7 @@ import argparse import inspect +import math import os import pprint import re @@ -12,10 +13,12 @@ import warnings from datetime import datetime from pathlib import Path from subprocess import DEVNULL, check_output +from typing import List, Union import cal_tools.tools import nbconvert import nbformat +import numpy as np from jinja2 import Template from nbparameterise import (extract_parameters, parameter_values, replace_definitions) @@ -255,44 +258,64 @@ def get_notebook_function(nb, fname): return None -def balance_sequences(in_folder, run, sequences, sequences_per_node, - path_inset, max_nodes=8): +def balance_sequences(in_folder: str, run: int, sequences: List[int], + sequences_per_node: int, karabo_da: Union[list, str], + max_nodes: int = 8): + """Return balance list of sequences to be executed on slurm nodes + Total list of sequences is splitted onto several nodes based on + sequences_per_node. If the number of the required nodes is more than + the max_nodes, the number of sequences_per_node is adjusted. + + :param in_folder: Path to the input raw data without the run number. + :param run: Run number. + :param sequences: List of sequences. [-1] for obtaining all. + :param sequences_per_node: Number of sequences to process per a node. + :param karabo_da: Karabo data aggregator used as data file inset. + :param max_nodes: Maximum number of maxwell nodes to use. + :return: Balanced list of sequences. """ - Return balance list of sequences to be executed on slurm nodes - Total list of sequences is splitted to several nodes, with a number of - sequences per node defined in the input parameter. if number - of required nodes is more than max_nodes, the number of sequences_per_node - will be increased to run on all on max_nodes. - - :param in_folder: Path to raw data - :param run: Run number - :param sequences: List of sequences - :param sequences_per_node: Number of sequences per node - :param path_inset: Inset of path to data file - :param max_nodes: Maximum number of Maxwell nodes to use - :return: Balanced list of list of sequences - """ - import glob - - import numpy as np - if sequences[0] == -1: - path = os.path.join(in_folder, f"r{run:04d}", f"*{path_inset}-S*.h5") - sequence_files = glob.glob(path) - seq_nums = set() - for sf in sequence_files: - seqnum = re.findall(r".*-S([0-9]*).h5", sf)[0] - seq_nums.add(int(seqnum)) - seq_nums -= set(sequences) - else: - seq_nums = set(sequences) - nsplits = len(seq_nums) // sequences_per_node + 1 - while nsplits > max_nodes: - sequences_per_node += 1 - nsplits = len(seq_nums) // sequences_per_node + 1 - print("Changed to {} sequences per node".format(sequences_per_node)) + # TODO: some small detector notebooks have karabo_da as a list. + # remove this str check after unifying the expected type across + # correction notebooks. + if isinstance(karabo_da, str): + karabo_da = [karabo_da] + elif not isinstance(karabo_da, list): + raise ValueError("Balance sequences expects " + "karabo_da as a string or list.") + + in_path = Path(in_folder, f"r{run:04d}") + + # TODO: remove ["-1"] after karabo_da refactor + if karabo_da in [["-1"], ["all"]]: + karabo_da = [""] + + # Get all possible sequences for the selected karabo_da + sequence_files = [] + for k_da in karabo_da: + sequence_files.extend(in_path.glob(f"*{k_da}-S*.h5")) + + # Extract sequences from input files. + seq_nums = set([int(sf.stem[-5:]) for sf in sequence_files]) + + # Validate selected sequences with sequences in in_folder + if sequences != [-1]: + seq_nums = sorted(seq_nums.intersection(sequences)) + if len(seq_nums) == 0: + raise ValueError(f"Selected sequences {sequences} are not " + f"available in {in_path}") + + # Validate required nodes with max_nodes + nsplits = len(seq_nums) // sequences_per_node + if nsplits > max_nodes: + sequences_per_node = math.ceil(len(seq_nums)/max_nodes) + nsplits = max_nodes + print(f"Changed to {sequences_per_node} sequences per node") print(f"to have a maximum of {max_nodes} concurrent jobs") - return [l.tolist() for l in np.array_split(list(seq_nums), nsplits) if - l.size > 0] + elif nsplits == 0: + nsplits = 1 + + return [l.tolist() for l in np.array_split(list(seq_nums), nsplits) + if l.size > 0] def make_extended_parser() -> argparse.ArgumentParser: