Commit d5f8c5f8 authored by Karim Ahmed

Add no-cluster-job

- Use --no-cluster-job by default in the xfel-calibrate CLI (see the usage sketch below).
- Introduce --use-slurm to run xfel-calibrate through Slurm.
- Small refactors, e.g. isort.
- Separate code into functions to reduce the size of the main pytest function.
parent a606b97b
1 merge request: !610 "Add a pytest to run a dict of CALLAB test runs before releases"
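
A minimal usage sketch of the new behaviour, assuming the release tests are collected from a tests/ directory and selected by the manual_run marker shown in this diff; the module path, test key, and folder paths are placeholders, not values taken from the commit:

# Hypothetical invocation sketch; module path, test key, and folders are placeholders.
import pytest

common = [
    "tests/",                               # assumed location of the release tests
    "-m", "manual_run",                     # marker applied to test_xfel_calibrate below
    "--picked-test", "SOME-CALLAB-KEY",     # placeholder test key
    "--reference-folder", "/path/to/reference",
    "--out-folder", "/path/to/out",
]

# Default: the xfel-calibrate CLI is invoked with --no-cluster-job, i.e. no Slurm submission.
pytest.main(common)

# Opt in to Slurm submission with the new flag.
pytest.main(common + ["--use-slurm"])
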
@@ -36,8 +36,10 @@ def pytest_addoption(parser):
"--validation-only",
action="store_true",
default=False,
help="Skips tests for numerical validation for produced h5files.",
help=("Skips running xfel-calibrate CLI and "
"apply validation test on numerical data only."),
)
parser.addoption(
"--find-difference",
action="store_true",
@@ -46,6 +48,15 @@ def pytest_addoption(parser):
"In case of non numerical validation of h5file. "
"Find the different attribute and fail on the first one."),
)
parser.addoption(
"--use-slurm",
action="store_true",
default=False,
help=("Run the xfel-calibrate CLI without the "
"--no-cluster-job option and use Slurm."),
)
parser.addoption(
"--picked-test",
type=str,
@@ -62,6 +73,7 @@ def pytest_addoption(parser):
type=str,
)
@pytest.fixture
def release_test_config(request):
detectors = request.config.getoption("--detectors")
@@ -72,13 +84,15 @@ def release_test_config(request):
"--validation-only")
find_difference = request.config.getoption(
"--find-difference")
use_slurm = request.config.getoption(
"--use-slurm")
picked_test = request.config.getoption("--picked-test")
reference_folder = request.config.getoption("--reference-folder")
out_folder = request.config.getoption("--out-folder")
return (
detectors, calibration, picked_test,
skip_numerical_validation, validate_only,
find_difference, reference_folder,
find_difference, use_slurm, reference_folder,
out_folder,
)
......
import hashlib
import io
import logging
import multiprocessing
import pathlib
import tempfile
@@ -11,14 +12,12 @@ from subprocess import PIPE, run
from typing import Any, Dict, List, Tuple
import h5py
import pytest
import numpy as np
import pytest
import xfel_calibrate.calibrate as calibrate
import gc
from .callab_tests import automated_test_config
import logging
from .callab_tests import automated_test_config
LOGGER = logging.getLogger(__name__)
@@ -268,6 +267,118 @@ def parse_config(
return cmd
def validate_hdf5_files(
test_key: str,
out_folder: pathlib.Path,
reference_folder: pathlib.Path,
cal_type: str,
find_difference: bool
):
"""Apply HDF5 data validation.
Args:
test_key (str): The test name.
out_folder (pathlib.Path): The OUT folder for the tested data.
reference_folder (pathlib.Path): The reference folder holding
the reference data to validate against.
cal_type (str): The type of calibration processing,
e.g. dark or correct.
find_difference (bool): A flag indicating the need to find the
difference between two files if the tested data is
not identical to the reference data.
"""
# 3rd Check number of produced h5 files.
h5files = list(out_folder.glob("*.h5"))
expected_h5files = list(reference_folder.glob("*.h5"))
assert len(h5files) == len(expected_h5files), f"{test_key} failure, number of files is not as expected."  # noqa
LOGGER.info(f"{test_key}'s number of calibration h5files is as expected.")
non_valid_files = []
# Hard coded datasets to exclude from numerical validation.
# These datasets are known to be updated every time.
if cal_type.lower() == "correct":
exclude_attrs = ["METADATA/creationDate", "METADATA/updateDate"]
else:
exclude_attrs = ["report"]
# 4th check that test and reference h5files are identical.
_validate_files = partial(
validate_files,
reference_folder,
out_folder,
exclude_attrs,
)
with multiprocessing.pool.ThreadPool(processes=8) as executor:
result = executor.map(_validate_files, h5files)
# Collect non-valid files, if any, to display them in the error message.
for valid, file in result:
if not valid:
non_valid_files.append(file)
if len(non_valid_files) > 0:
if find_difference:
LOGGER.error(f"Found non valid files: {non_valid_files}. "
f"Checking differences for {non_valid_files[0]}")
find_differences(
non_valid_files[0],
reference_folder / non_valid_files[0].name,
exclude_attrs
)
LOGGER.info(f"No difference found for {non_valid_files[0]}")
else:
assert len(non_valid_files) == 0, f"{test_key} failure, while validating metadata for {non_valid_files}" # noqa
LOGGER.info(f"{test_key}'s calibration h5files"
" are validated successfully.")
def slurm_watcher(
test_key: str,
std_out: str
):
"""
Watch the submitted Slurm jobs and wait for them to finish.
After they finish, apply the first check and verify
that they all ended in the `COMPLETED` state.
Args:
test_key (str): Test name.
std_out (str): xfel-calibrate CLI stdout.
"""
slurm_watcher = True
LOGGER.info(f"{test_key} - xfel-calibrate std out: {std_out}")
for r in std_out.split("\n"):
if "Submitted the following SLURM jobs:" in r:
_, jobids = r.split(":")
# Wait briefly for the Slurm jobs to initialize (one second per job).
time.sleep(len(jobids.split(",")))
jobids = jobids.strip()
while slurm_watcher:
cmd = ["sacct", "-j", jobids, "--format=state"]
res = run(cmd, stdout=PIPE)
states = res.stdout.decode().split("\n")[2:-1]
if not any(s.strip() in [
"COMPLETING",
"RUNNING",
"CONFIGURING",
"PENDING",
] for s in states):
slurm_watcher = False
else:
time.sleep(2)
# 1st check that all jobs were COMPLETED without errors.
states = res.stdout.decode().split("\n")[2:-1]
assert all(s.strip() == "COMPLETED" for s in states), f"{test_key} failure, calibration jobs were not completed. {jobids}: {states}" # noqa
LOGGER.info(f"{test_key}'s jobs were COMPLETED")
time.sleep(1.0)
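# For reference (not part of this commit): the same sacct query can return the
# job states without header rows to slice, assuming the cluster's sacct
# supports the standard --noheader and --parsable2 flags.
def _example_sacct_states_sketch(jobids: str) -> List[str]:
    """Return the current state of each job step for a comma-separated
    job-id string. Illustrative only."""
    res = run(
        ["sacct", "-j", jobids, "--format=State", "--noheader", "--parsable2"],
        stdout=PIPE,
    )
    return [s.strip() for s in res.stdout.decode().splitlines() if s.strip()]

# Example polling loop built on the sketch above:
#     while any(s in {"PENDING", "CONFIGURING", "RUNNING", "COMPLETING"}
#               for s in _example_sacct_states_sketch(jobids)):
#         time.sleep(2)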
@pytest.mark.manual_run
@pytest.mark.parametrize(
"test_key, val_dict",
@@ -290,7 +401,7 @@ def test_xfel_calibrate(
(
detectors, calibration, picked_test,
skip_numerical_validation, only_validate, find_difference,
reference_dir_base, out_dir_base,
use_slurm, reference_dir_base, out_dir_base,
) = release_test_config
cal_type = val_dict["cal_type"]
@@ -312,6 +423,7 @@ def test_xfel_calibrate(
pytest.skip()
cmd = ["xfel-calibrate", det_type, cal_type]
cal_conf = val_dict["config"]
report_name = f"{test_key}_{datetime.now().strftime('%y%m%d_%H%M%S')}"
cal_conf["report-to"] = report_name
@@ -323,92 +435,28 @@ def test_xfel_calibrate(
reference_folder = pathlib.Path(val_dict["reference-folder"].format(
reference_dir_base, cal_conf["karabo-id"], test_key))
def validate_files_now():
# 3rd Check number of produced h5 files.
h5files = list(out_folder.glob("*.h5"))
expected_h5files = list(reference_folder.glob("*.h5"))
assert len(h5files) == len(expected_h5files), f"{test_key} failure, number of files are not as expected." # noqa
LOGGER.info(f"{test_key}'s calibration h5files numbers are as expected.")
non_valid_files = []
# Hard coded datasets to exclude from numerical validation.
# These datasets are know to be updated everytime.
if cal_type.lower() == "correct":
exclude_attrs = ["METADATA/creationDate", "METADATA/updateDate"]
else:
exclude_attrs = ["report"]
# 4th check that test and reference h5files are identical.
_validate_files = partial(
validate_files,
reference_folder,
out_folder,
exclude_attrs,
)
with multiprocessing.pool.ThreadPool(processes=8) as executor:
result = executor.map(_validate_files, h5files)
# Collect non-valid files, if any, to display them in the error message.
for valid, file in result:
if not valid:
non_valid_files.append(file)
if len(non_valid_files) > 0:
if find_difference:
LOGGER.error(f"Found non valid files: {non_valid_files}. "
f"Checking differences for {non_valid_files[0]}")
find_differences(
non_valid_files[0],
reference_folder / non_valid_files[0].name,
exclude_attrs
)
LOGGER.info(f"No difference found for {non_valid_files[0]}")
else:
assert len(non_valid_files) == 0, f"{test_key} failure, while validating metadata for {non_valid_files}" # noqa
LOGGER.info(f"{test_key}'s calibration h5files"
" are validated successfully.")
if only_validate:
validate_files_now()
validate_hdf5_files(
test_key,
out_folder,
reference_folder,
cal_type,
find_difference,
)
return
if not use_slurm: # e.g. for Gitlab CI.
cmd += ["--no-cluster-job"]
cmd += ["--slurm-name", test_key]
f = io.StringIO()
LOGGER.info(f"Submitting CL: {cmd}")
with redirect_stdout(f):
calibrate.run(cmd)
out_str = f.getvalue()
slurm_watcher = True
for r in out_str.split("\n"):
if "Submitted the following SLURM jobs:" in r:
_, jobids = r.split(":")
# Adding a sleep for the slurm jobs initialization
time.sleep(len(jobids.split(",")))
jobids = jobids.strip()
while slurm_watcher:
cmd = ["sacct", "-j", jobids, "--format=state"]
res = run(cmd, stdout=PIPE)
states = res.stdout.decode().split("\n")[2:-1]
if not any(s.strip() in [
"COMPLETING",
"RUNNING",
"CONFIGURING",
"PENDING",
] for s in states):
slurm_watcher = False
else:
time.sleep(2)
# 1st check that all jobs were COMPLETED without errors.
states = res.stdout.decode().split("\n")[2:-1]
assert all(s.strip() == "COMPLETED" for s in states), f"{test_key} failure, calibration jobs were not completed. {jobids}: {states}" # noqa
LOGGER.info(f"{test_key}'s jobs were COMPLETED")
time.sleep(1.0)
if use_slurm:
slurm_watcher(test_key, out_str)
# 2nd check for report availability.
report_file = out_folder / f"{report_name}.pdf"
@@ -417,4 +465,10 @@ def test_xfel_calibrate(
# Stop tests at this point, if desired.
if not skip_numerical_validation:
validate_files_now()
validate_hdf5_files(
test_key,
out_folder,
reference_folder,
cal_type,
find_difference,
)