# test_cli.py
# pylint: disable=missing-class-docstring, missing-function-docstring, no-self-use
"""Tests for the CLI portion of `xfel_calibrate`

These tests cover the CLI interface which is called by the `xfel-calibrate ...`
entrypoint. Some sections of the `calibrate.py` file are still not covered by
the current test cases; this should be improved later on.
"""

import ast
import shlex
import sys
from datetime import date
from pathlib import Path
from unittest import mock

import nbformat
import pytest
from nbparameterise import extract_parameters

import xfel_calibrate.calibrate as calibrate
from tests.test_xfel_calibrate.conftest import (
    CalibrateCall,
    FakeProcessCalibrate,
    MockProposal,
)


class TestBasicCalls:
    """Tests which only call the command line utility `xfel-calibrate` and check
    that the expected output is present in stdout.
    """

    @staticmethod
    def _assert_generic_help(out):
        """Assert that `out` contains the generic argparse section headers.

        argparse titles the optional-argument group "optional arguments:" up to
        Python 3.9 and "options:" from Python 3.10 on. Either spelling is
        accepted so these tests do not depend on the interpreter version.
        """
        assert "positional arguments:" in out
        assert "options:" in out or "optional arguments:" in out

    def test_help(self, capsys):
        """Top-level `--help` exits and prints the generic help to stdout."""
        with pytest.raises(SystemExit):
            calibrate.run(["xfel-calibrate", "--help"])

        out, err = capsys.readouterr()

        #  Should always be present in these help outputs
        self._assert_generic_help(out)

        assert err == ""

    def test_help_detector(self, capsys):
        """Help for the `TEST` detector shows the test notebook description."""
        with pytest.raises(SystemExit):
            calibrate.run(["xfel-calibrate", "TEST", "-h"])

        out, err = capsys.readouterr()

        assert "Notebook for use with the unit and continuous integration" in out
        assert "tests." in out

        assert err == ""

    def test_help_user_notebook(self, capsys):
        """Help for the `TEST` detector lists the user notebook entry."""
        with pytest.raises(SystemExit):
            calibrate.run(["xfel-calibrate", "TEST", "-h"])

        out, err = capsys.readouterr()

        assert "TEST-USER-NB" in out
        assert "/{root}/test-cli.ipynb" in out

        assert err == ""

    def test_help_bad_config(self):
        """Requesting help for `TEST-RAISES-ERRORS` is expected to raise
        `KeyError` instead of exiting normally.
        """
        with pytest.raises(KeyError):
            calibrate.run(["xfel-calibrate", "TEST-RAISES-ERRORS", "--help"])

    def test_unknown_detector(self, capsys):
        """An unknown detector with a calibration type exits with code 1."""
        with pytest.raises(SystemExit) as exit_exception:
            calibrate.run(["xfel-calibrate", "NotADetector", "beep", "-h"])

        out, err = capsys.readouterr()

        assert exit_exception.value.code == 1

        assert "Not one of the known calibrations or detectors" in out

        assert err == ""

    def test_unknown_detector_h(self, capsys):
        """An unknown detector followed directly by `-h` exits with code 1."""
        with pytest.raises(SystemExit) as exit_exception:
            calibrate.run(["xfel-calibrate", "NotADetector", "-h"])

        out, err = capsys.readouterr()

        assert exit_exception.value.code == 1

        assert "Not one of the known detectors" in out

        assert err == ""

    def test_help_nb(self, capsys):
        """Notebook-level help contains the generic headers as well as the
        parameters defined in the notebook itself.
        """
        with pytest.raises(SystemExit):
            calibrate.run(["xfel-calibrate", "Tutorial", "TEST", "--help"])

        out, err = capsys.readouterr()

        #  Should always be present in these help outputs
        self._assert_generic_help(out)

        #  Defined in the test notebook, should be propagated to help output
        assert "sensor-size" in out
        assert "random-seed" in out

        assert err == ""


class TestTutorialNotebook:
    """Runs `xfel-calibrate` on the `Tutorial TEST` notebook and inspects both
    the captured stdout and the files generated by the call
    """

    @pytest.fixture(scope="class", autouse=True)
    def fake_process_calibrate(self):
        #  Kept active for the whole class; the yielded object exposes the
        #  recorded process calls through its `calls` attribute
        with FakeProcessCalibrate() as process_recorder:
            yield process_recorder

    @pytest.fixture(scope="class")
    def mock_proposal(self, tmp_path_factory):
        experiment_root = tmp_path_factory.mktemp("exp")
        return MockProposal(
            tmp_path=experiment_root,
            instrument="None",
            cycle="000000",
            proposal="p000000",
            runs=0,
            sequences=0,
        )

    @pytest.fixture(scope="function")
    def calibrate_call(
        self,
        mock_proposal: MockProposal,
        capsys,
        tmp_path,
    ):
        call_options = {
            "out_folder": mock_proposal.path_proc,
            "detector": "Tutorial",
            "cal_type": "TEST",
            "extra_args": ["--runs", "1000"],
        }
        return CalibrateCall(tmp_path, capsys, **call_options)

    def test_call(
        self,
        calibrate_call: CalibrateCall,
    ):
        stdout = calibrate_call.out

        assert "sbatch" in stdout
        assert "--job-name xfel_calibrate" in stdout
        assert str(calibrate_call.reports_dir) in stdout

        assert calibrate_call.err == ""

    def test_expected_processes_called(
        self,
        calibrate_call: CalibrateCall,
        fake_process_calibrate: FakeProcessCalibrate,
    ):
        #  Calls recorded as a single string are tokenised first, calls that
        #  are already sequences are used as they are
        tokenised_calls = []
        for recorded in fake_process_calibrate.calls:
            if isinstance(recorded, str):
                recorded = list(
                    shlex.shlex(recorded, posix=True, punctuation_chars=True)
                )
            tokenised_calls.append(recorded)

        #  First token of every call is the name of the process
        process_names = [tokens[0] for tokens in tokenised_calls]

        assert "sbatch" in process_names

    @pytest.mark.skip(reason="not implemented")
    def test_output_metadata_yml(self):
        # TODO: Finish this test later, not a priority
        # metadata_yml_path = list(self.tmp_path.glob("**/calibration_metadata.yml"))
        pass

    def test_output_ipynb(self, calibrate_call: CalibrateCall):
        generated = calibrate_call.paths.notebooks

        assert len(generated) == 1

        with generated[0].open() as handle:
            notebook = nbformat.read(handle, as_version=4)

        parameters = {}
        for parameter in extract_parameters(notebook):
            parameters[parameter.name] = parameter.value

        assert parameters["out_folder"] == str(calibrate_call.out_folder)
        assert parameters["sensor_size"] == [10, 30]
        assert parameters["random_seed"] == [2345]
        assert parameters["runs"] == 1000

    def test_output_finalize(
        self, mock_proposal: MockProposal, calibrate_call: CalibrateCall
    ):
        # TODO: Specify `feature_version` once we run on python 3.8+
        finalize_tree = ast.parse(calibrate_call.paths.finalize.read_text())

        today = str(date.today())
        reports_dir = str(calibrate_call.reports_dir)

        #  Keyword arguments which have to match exactly
        expected_equals = {
            "joblist": ["000000"],
            "version": "0.0.0",
            "data_path": "",
        }

        #  Keyword arguments which only have to contain the given text
        expected_contains = {
            "request_time": today,
            "submission_time": today,
            "cal_work_dir": reports_dir,
            # TODO: add a test checking that the out folder is correct
            # refer to: https://git.xfel.eu/gitlab/detectors/pycalibration/issues/52
            "out_path": str(mock_proposal.path_proc),
            "report_to": reports_dir,
        }

        #  Collect the keyword arguments of the finalize call from the AST.
        #  Only the keywords named in `expected_...` are evaluated, as some of
        #  the others cannot be read
        checked_names = set(expected_equals) | set(expected_contains)
        finalize_kwargs = {}
        for node in ast.walk(finalize_tree):
            if not isinstance(node, ast.keyword):
                continue
            if node.arg in checked_names:
                finalize_kwargs[node.arg] = ast.literal_eval(node.value)

        for name, expected in expected_equals.items():
            assert expected == finalize_kwargs[name]

        for name, fragment in expected_contains.items():
            assert fragment in finalize_kwargs[name]

    @pytest.mark.skip(reason="not implemented")
    def test_output_rst(self, calibrate_call: CalibrateCall):
        # TODO: Finish this test later, not a priority
        # rst_path = calibrate_call.paths.InputParameters
        pass

    def test_output_sh(self, calibrate_call: CalibrateCall):
        script_text = calibrate_call.paths.run_calibrate.read_text()
        tokens = list(shlex.shlex(script_text, posix=True, punctuation_chars=True))

        assert (
            tokens[0] == "xfel-calibrate"
        ), f"{calibrate_call.paths.run_calibrate} does not call `xfel-calibrate`"

        assert tokens[1:3] == ["Tutorial", "TEST"]

        #  Flags and their values only have to be present somewhere in the call
        for required in (
            {"--out-folder", str(calibrate_call.out_folder)},
            {"--runs", "1000"},
        ):
            assert required.issubset(tokens)


class TestIntelliList:
    """Checks how list-like command line arguments of the `TEST TEST-CLI` call
    are propagated into the generated notebooks
    """

    @pytest.fixture(scope="class", autouse=True)
    def fake_process_calibrate(self):
        with FakeProcessCalibrate() as fake_process:
            yield fake_process

    @pytest.fixture(scope="class")
    def mock_proposal(self, tmp_path_factory):
        #  `tmp_path_factory` already returns a `pathlib.Path`, which matches
        #  the fixture used by `TestTutorialNotebook`
        return MockProposal(
            tmp_path=tmp_path_factory.mktemp("exp"),
            instrument="AGIPD",
            cycle="202031",
            proposal="p900113",
            runs=1,
            sequences=1,
        )

    @pytest.fixture(scope="function")
    def calibrate_call(self, mock_proposal: MockProposal, capsys, tmp_path):
        return CalibrateCall(
            tmp_path,
            capsys,
            in_folder=mock_proposal.path_raw,
            out_folder=mock_proposal.path_proc,
            detector="TEST",
            cal_type="TEST-CLI",
            extra_args=[
                "--number",
                "10",
                "--list-normal",
                "1,2,10",
                "--list-intellilist",
                "1,2,5-8",
                "--concurrency-parameter",
                "0,1",
            ],
        )

    def test_intellilist(self, calibrate_call: CalibrateCall):
        """The arguments reach the call unchanged and every generated notebook
        holds the parsed values.
        """
        assert "--number" in calibrate_call.args
        assert "--list-intellilist" in calibrate_call.args
        assert "1,2,5-8" in calibrate_call.args

        #  Two notebooks are expected for `--concurrency-parameter 0,1`
        assert len(calibrate_call.paths.notebooks) == 2

        for i, notebook_path in enumerate(sorted(calibrate_call.paths.notebooks)):
            #  Only hold the file open while it is being read
            with notebook_path.open() as file:
                notebook = nbformat.read(file, as_version=4)

            parameters = {p.name: p.value for p in extract_parameters(notebook)}

            assert parameters["number"] == 10
            assert parameters["list_normal"] == [1, 2, 10]
            #  The range `5-8` is expected to expand without its upper bound
            assert parameters["list_intellilist"] == [1, 2, 5, 6, 7]
            #  In sorted order, notebook `i` is expected to carry value `i`
            assert parameters["concurrency_parameter"][0] == i


class TestAgipdNotebook:
    """Checks calling `xfel-calibrate AGIPD CORRECT` against a mock proposal

    All tests in this class are currently skipped placeholders.
    """

    @pytest.fixture(scope="class", autouse=True)
    def fake_process_calibrate(self):
        with FakeProcessCalibrate() as fake_process:
            yield fake_process

    @pytest.fixture(scope="function")
    def mock_proposal(self, tmp_path_factory):
        #  `tmp_path_factory` already returns a `pathlib.Path`, which matches
        #  the fixture used by `TestTutorialNotebook`
        return MockProposal(
            tmp_path=tmp_path_factory.mktemp("exp"),
            instrument="AGIPD",
            cycle="202031",
            proposal="p900113",
            runs=2,
            # TODO: update this once extra-data tests can have variable sequences
            # sequences=5,
        )

    @pytest.fixture(scope="function")
    def calibrate_call(self, mock_proposal: MockProposal, capsys, tmp_path):
        return CalibrateCall(
            tmp_path,
            capsys,
            in_folder=mock_proposal.path_raw,
            out_folder=mock_proposal.path_proc / "r0000",
            detector="AGIPD",
            cal_type="CORRECT",
            extra_args=[
                "--run",
                "0",
                "--sequences",
                "1-3",
                #  TODO: enable this when notebook execution tests are ready to be run
                # "--no-cluster-job",
            ],
        )

    @pytest.mark.skip(reason="not implemented")
    def test_out_folder_correct(self):
        #  TODO: add a test checking that the out folder is correct
        #  refer to: https://git.xfel.eu/gitlab/detectors/pycalibration/issues/52
        pass

    @pytest.mark.skip(reason="requires extra-data test file sequence options")
    def test_files_present(self, calibrate_call: CalibrateCall):
        """The pre, main and post notebooks are all generated."""
        #  There should be three notebooks: one pre, then the main, then one post
        assert len(calibrate_call.paths.notebooks) == 3
        #  This is pretty fragile, but the name of notebooks should not change
        #  (too) often
        root_nb_path = calibrate_call.paths.notebooks[0].parent
        notebooks = [
            root_nb_path / "AGIPD_Correct_and_Verify__sequences__1.ipynb",
            root_nb_path / "AGIPD_Correct_and_Verify_Summary_NBC__None__None.ipynb",
            root_nb_path / "AGIPD_Retrieve_Constants_Precorrection__None__None.ipynb",
        ]

        assert all(nb in calibrate_call.paths.notebooks for nb in notebooks)

    @pytest.mark.skip(reason="not implemented")
    def test_nb_sequences(self, calibrate_call: CalibrateCall):
        """Placeholder: reads the parameters of the main correction notebook."""
        notebook_path = (
            calibrate_call.paths.notebooks[0].parent
            / "AGIPD_Correct_and_Verify__sequences__1.ipynb"
        )

        with notebook_path.open() as file:
            notebook = nbformat.read(file, as_version=4)

        parameters = {p.name: p.value for p in extract_parameters(notebook)}
        #  TODO: add test cases for this notebook
        print(parameters)

    @pytest.mark.skip(reason="not implemented")
    def test_nb_summary(self, calibrate_call: CalibrateCall):
        """Placeholder: reads the parameters of the summary notebook."""
        notebook_path = (
            calibrate_call.paths.notebooks[0].parent
            / "AGIPD_Correct_and_Verify_Summary_NBC__None__None.ipynb"
        )

        with notebook_path.open() as file:
            notebook = nbformat.read(file, as_version=4)

        parameters = {p.name: p.value for p in extract_parameters(notebook)}
        #  TODO: add test cases for this notebook
        print(parameters)

    @pytest.mark.skip(reason="not implemented")
    def test_nb_precorrection(self, calibrate_call: CalibrateCall):
        """Placeholder: reads the parameters of the precorrection notebook."""
        notebook_path = (
            calibrate_call.paths.notebooks[0].parent
            / "AGIPD_Retrieve_Constants_Precorrection__None__None.ipynb"
        )

        with notebook_path.open() as file:
            notebook = nbformat.read(file, as_version=4)
        #  TODO: add test cases for this notebook
        parameters = {p.name: p.value for p in extract_parameters(notebook)}

        print(parameters)