Skip to content
Snippets Groups Projects
Commit a4494f37 authored by Karim Ahmed's avatar Karim Ahmed
Browse files

[CORRECT] FIX: Consider list of karabo_da and test_balance_sequences

parent 335796a5
No related branches found
No related tags found
No related merge requests found
import pytest
from xfel_calibrate.calibrate import balance_sequences
def test_balance_sequences():
    """Exercise balance_sequences against run 9992 of the CALLAB proposal.

    Covers four cases: an explicit selection that is only partially
    present in the run folder, the "all sequences" marker ``[-1]`` for a
    single karabo_da, a selection where no sequence is available, and an
    invalid karabo_da type.

    NOTE(review): the expected values depend on the files stored under
    the raw folder below - confirm they still match the test run.
    """
    in_folder = "/gpfs/exfel/exp/CALLAB/202031/p900113/raw"
    run = 9992

    # Only the selected sequences that exist in the run are balanced.
    ret = balance_sequences(in_folder=in_folder,
                            run=run, sequences=[0, 2, 5, 10, 20, 50, 100],
                            sequences_per_node=1, karabo_da=["all"],
                            max_nodes=8)
    expected = [[0], [2]]
    assert expected == ret

    # [-1] selects every sequence found for the given karabo_da.
    ret = balance_sequences(in_folder=in_folder,
                            run=run, sequences=[-1],
                            sequences_per_node=1, karabo_da=["JNGFR01"],
                            max_nodes=3)
    expected = []
    assert expected == ret

    # None of the selected sequences is available in the run folder.
    with pytest.raises(ValueError) as e:
        balance_sequences(in_folder=in_folder,
                          run=run, sequences=[1991, 2021],
                          sequences_per_node=1, karabo_da=["all"],
                          max_nodes=3)
    # Checked after the `with` block: statements following the raising
    # call inside the block would never be executed.
    assert 'Selected sequences [1991, 2021]' in str(e.value)

    # karabo_da must be a string or a list.
    with pytest.raises(ValueError) as e:
        balance_sequences(in_folder=in_folder,
                          run=run, sequences=[1991, 2021],
                          sequences_per_node=1, karabo_da=-1,
                          max_nodes=3)
    assert 'karabo_da as a string or list' in str(e.value)
\ No newline at end of file
...@@ -2,6 +2,7 @@ ...@@ -2,6 +2,7 @@
import argparse import argparse
import inspect import inspect
import math
import os import os
import pprint import pprint
import re import re
...@@ -12,10 +13,12 @@ import warnings ...@@ -12,10 +13,12 @@ import warnings
from datetime import datetime from datetime import datetime
from pathlib import Path from pathlib import Path
from subprocess import DEVNULL, check_output from subprocess import DEVNULL, check_output
from typing import List, Union
import cal_tools.tools import cal_tools.tools
import nbconvert import nbconvert
import nbformat import nbformat
import numpy as np
from jinja2 import Template from jinja2 import Template
from nbparameterise import (extract_parameters, parameter_values, from nbparameterise import (extract_parameters, parameter_values,
replace_definitions) replace_definitions)
...@@ -255,44 +258,64 @@ def get_notebook_function(nb, fname): ...@@ -255,44 +258,64 @@ def get_notebook_function(nb, fname):
return None return None
def balance_sequences(in_folder: str, run: int, sequences: List[int],
                      sequences_per_node: int, karabo_da: Union[list, str],
                      max_nodes: int = 8):
    """Return a balanced list of sequences to be executed on slurm nodes.

    The available sequences are split over several nodes based on
    sequences_per_node. If more nodes than max_nodes would be required,
    sequences_per_node is increased so that max_nodes nodes are used.

    :param in_folder: Path to the input raw data without the run number.
    :param run: Run number.
    :param sequences: List of sequences. [-1] for obtaining all.
    :param sequences_per_node: Number of sequences to process per node.
    :param karabo_da: Karabo data aggregator(s) used as data file inset.
        "-1" or "all" select the files of every data aggregator.
    :param max_nodes: Maximum number of maxwell nodes to use.
    :return: Balanced list of lists of sequences, in ascending order.
    :raises ValueError: If karabo_da is neither a string nor a list, or
        if none of the explicitly selected sequences has a file in the
        run folder.
    """
    # TODO: some small detector notebooks have karabo_da as a list.
    # remove this str check after unifying the expected type across
    # correction notebooks.
    if isinstance(karabo_da, str):
        karabo_da = [karabo_da]
    elif not isinstance(karabo_da, list):
        raise ValueError("Balance sequences expects "
                         "karabo_da as a string or list.")

    in_path = Path(in_folder, f"r{run:04d}")

    # TODO: remove ["-1"] after karabo_da refactor
    if karabo_da in [["-1"], ["all"]]:
        # An empty inset makes the glob below match every aggregator.
        karabo_da = [""]

    # Get all possible sequences for the selected karabo_da
    sequence_files = []
    for k_da in karabo_da:
        sequence_files.extend(in_path.glob(f"*{k_da}-S*.h5"))

    # Extract sequences from input files: the file stem is expected to
    # end with a five-digit sequence number ("...-S00000").
    seq_nums = {int(sf.stem[-5:]) for sf in sequence_files}

    # Validate selected sequences with sequences in in_folder
    if list(sequences) != [-1]:
        seq_nums = seq_nums.intersection(sequences)
        if not seq_nums:
            raise ValueError(f"Selected sequences {sequences} are not "
                             f"available in {in_path}")

    # Sort in every case: iterating a set has no guaranteed order, which
    # would otherwise leak into the returned chunks for sequences=[-1].
    seq_nums = sorted(seq_nums)

    # Validate required nodes with max_nodes
    nsplits = len(seq_nums) // sequences_per_node
    if nsplits > max_nodes:
        sequences_per_node = math.ceil(len(seq_nums)/max_nodes)
        nsplits = max_nodes
        print(f"Changed to {sequences_per_node} sequences per node")
        print(f"to have a maximum of {max_nodes} concurrent jobs")
    elif nsplits == 0:
        # Fewer sequences than sequences_per_node: use a single node.
        nsplits = 1

    # array_split may produce empty chunks (e.g. no sequences at all);
    # those are dropped from the result.
    return [chunk.tolist() for chunk in np.array_split(seq_nums, nsplits)
            if chunk.size > 0]
def make_extended_parser() -> argparse.ArgumentParser: def make_extended_parser() -> argparse.ArgumentParser:
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment