"""Tests for the webservice helpers.

Covers ``check_files``, ``merge``, ``parse_config``, ``wait_on_transfer``
and ``run_action`` from ``webservice.webservice``.
"""
import os
import sys
from pathlib import Path
from unittest import mock

import pytest
from testpath import MockCommand

from webservice.messages import MigrationError

# NOTE(review): sys.path entries are expected to be strings; the import
# system ignores other types, so this Path entry likely has no effect (the
# `webservice.messages` import above already succeeds without it). Confirm
# before converting it to `str(...)` or removing it.
sys.path.insert(0, Path(__file__).parent / 'webservice')
import webservice  # noqa: import not at top of file
from webservice.webservice import (  # noqa: import not at top of file
    check_files,
    merge,
    parse_config,
    run_action,
    wait_on_transfer,
)


@pytest.mark.requires_gpfs
def test_check_files():
    """check_files is truthy only when files exist for the runs/aggregators.

    Needs access to GPFS (see the ``requires_gpfs`` marker). An unreadable
    folder is expected to raise ``PermissionError``.
    """
    in_folder = '/gpfs/exfel/exp/CALLAB/202031/p900113/raw'
    runs = [9985, 9984]
    karabo_das = ['AGIPD06', 'AGIPD07']
    assert check_files(in_folder, runs, karabo_das)

    # Same runs, but data aggregators that are not expected in this folder.
    karabo_das = ['LPD06', 'LPD07']
    assert not check_files(in_folder, runs, karabo_das)

    # Runs that are not expected in this folder.
    runs = [1, 2]
    assert not check_files(in_folder, runs, karabo_das)

    in_folder = '/gpfs/maxwell/home/achilles'  # arbitrarily chosen
    with pytest.raises(PermissionError):
        check_files(in_folder, runs, karabo_das)


def test_merge():
    """merge combines nested dicts; on a conflicting leaf the first wins."""
    a = {'some': {'key': {'akey': 'avalue',
                          'number': 1}}}
    b = {'some': {'key': {'anotherkey': 'anothervalue',
                          'number': 5}},
         'completely': 'different'}

    ret = merge(a, b)
    # 'number' is present in both inputs: the value from `a` is kept.
    expected = {'some': {'key': {'akey': 'avalue',
                                 'anotherkey': 'anothervalue',
                                 'number': 1}},
                'completely': 'different'}
    assert ret == expected


def test_parse_config():
    """parse_config appends command-line arguments built from a config dict.

    Checked here: ``True`` becomes a bare flag, ``False`` is omitted, lists
    are expanded into separate items, quoted-empty strings become ``''``,
    and the two configurations at the end raise ``ValueError``.
    """
    cmd = ['whatever']
    config = {'somebool': True,
              'notsomebool': False,
              'alist': [1, 2, 3],
              'some_empty_key': '""',
              'other_empty_key': "''",
              'brian': 'scone'}

    expected = ['whatever', '--somebool', '--alist', '1', '2', '3',
                '--some_empty_key', '', '--other_empty_key', '',
                '--brian', 'scone']

    config = parse_config(cmd, config)

    assert config == expected
    assert '--notsomebool' not in config

    # Presumably rejected because the key contains a space.
    config = {'some key': 'value'}
    with pytest.raises(ValueError):
        config = parse_config(cmd, config)

    # Presumably rejected because the value contains a space.
    config = {'somekey': 'a value'}
    with pytest.raises(ValueError):
        config = parse_config(cmd, config)


@pytest.mark.asyncio
@pytest.mark.parametrize(
    "xattr_list, xattr_get, expected_result",
    [
        (["user.status"], b"offline", True),
        (["user.status"], b"tape", True),
        (["user.status"], b"dCache", True),
    ],
)
async def test_wait_on_transfer(xattr_list, xattr_get, expected_result,
                                tmp_path):
    """wait_on_transfer returns True for these ``user.status`` values.

    ``os.listxattr`` / ``os.getxattr`` are patched so that no real extended
    attributes are needed on ``tmp_path``.
    """
    with mock.patch.object(os, "listxattr", lambda path: xattr_list):
        with mock.patch.object(os, "getxattr", lambda path, attr: xattr_get):
            res = await wait_on_transfer(
                str(tmp_path),
                max_tries_completion=1,
                max_tries_attributes=1,
                sleep_attributes=1,
            )
            assert res is expected_result


# NOTE(review): in the second case the xattr value and the regex spell the
# status differently ('notmigrated2d2' vs 'notmigratedr2d2'); presumably the
# regex mirrors the MigrationError message text - confirm.
@pytest.mark.asyncio
@pytest.mark.parametrize(
    "xattr_list, xattr_get, exception_match",
    [
        ([], b"", r"FAILED:.*user.status.*"),
        (["user.status"], b"notmigrated2d2", r"FAILED:.*notmigratedr2d2.*"),
        (["user.status"], b"foobar", r"FAILED:.*unknown.*"),
    ],
)
async def test_wait_on_transfer_exceptions(
    xattr_list, xattr_get, exception_match, tmp_path
):
    """wait_on_transfer raises MigrationError for these xattr states.

    Cases: no ``user.status`` attribute listed, the ``notmigrated2d2``
    status, and a status value matched by the ``unknown`` message.
    """
    with mock.patch.object(os, "listxattr", lambda path: xattr_list):
        with mock.patch.object(os, "getxattr", lambda path, attr: xattr_get):
            with pytest.raises(MigrationError, match=exception_match):
                await wait_on_transfer(
                    str(tmp_path),
                    max_tries_completion=1,
                    max_tries_attributes=1,
                    sleep_attributes=1,
                )


@pytest.mark.asyncio
async def test_wait_on_transfer_timeout(tmp_path):
    """A status stuck at ``migration_in_progress`` raises MigrationError.

    Only a single completion try is allowed (``max_tries_completion=1``).
    """
    with mock.patch.object(os, "listxattr", lambda path: ["user.status"]):
        with mock.patch.object(os, "getxattr",
                               lambda path, attr: b"migration_in_progress"):
            with pytest.raises(MigrationError, match=r"FAILED:.*progress.*"):
                await wait_on_transfer(
                    str(tmp_path),
                    max_tries_completion=1,
                    max_tries_attributes=1,
                    sleep_attributes=1,
                )


@pytest.mark.asyncio
@pytest.mark.parametrize(
    'mode, cmd, retcode, expected',
    [
        ('prod', ['CORRECT', '1', '2', '3', '4'], 1, "failed: failed executing"),  # noqa
        ('prod', ['DARK', '1', '2', '3', '4'], 1, "failed: failed executing"),
        ('prod', ['CORRECT', '1', '2', '3', '4'], 0, "success: started correction"),  # noqa
        ('prod', ['DARK', '1', '2', '3', '4'], 0, "success: started dark characterization"),  # noqa
        ('sim', ['CORRECT', '1', '2', '3', '4'], 1, "success: simulated"),
        ('sim', ['DARK', '1', '2', '3', '4'], 1, "success: simulated"),
    ],
)
async def test_run_action(mode, cmd, retcode, expected):
    """run_action reports success/failure according to mode and return code.

    ``run_proc_async`` is replaced by a coroutine returning the parametrized
    return code, so no process is launched.
    """
    job_db = mock.Mock()

    async def mock_run_proc_async(*args):
        return retcode, b'Submitted job: 42'

    # Patch through a context manager so the real `run_proc_async` is
    # restored afterwards; a plain attribute assignment would leak the fake
    # into every test that runs later in the same session.
    with mock.patch.object(
        webservice.webservice, 'run_proc_async', mock_run_proc_async
    ):
        ret = await run_action(job_db, cmd, mode, 1, 1, 1)

    assert ret.lower().startswith(expected)