diff --git a/tests/test_webservice.py b/tests/test_webservice.py
index 4da6d42528e5012946a2d78be94dc1be43fb2469..dfa154e30885c16e2ab9550f4a82c3952b06cf8e 100644
--- a/tests/test_webservice.py
+++ b/tests/test_webservice.py
@@ -1,3 +1,4 @@
+import os
 import sys
 from pathlib import Path
 from unittest import mock
@@ -76,14 +77,24 @@ def test_parse_config():
 
 
 @pytest.mark.asyncio
-async def test_wait_on_transfer(tmp_path):
-    mock_getfattr = MockCommand(
-        'getfattr',
-        content="""#!{}\nprint('user.status="dCache"')""".format(sys.executable)
-    )
-    with mock_getfattr:
-        res = await wait_on_transfer(str(tmp_path), max_tries=1)
-    assert res is True
+@pytest.mark.parametrize(
+    "xattr_list, xattr_get, expected_result",
+    [
+        (["user.status"], b"offline", True),
+        (["user.status"], b"notmigrated2d2", False),
+        ([], b"", False),
+    ],
+)
+async def test_wait_on_transfer(xattr_list, xattr_get, expected_result, tmp_path):
+    with mock.patch.object(os, "listxattr", lambda path: xattr_list):
+        with mock.patch.object(os, "getxattr", lambda path, attr: xattr_get):
+            res = await wait_on_transfer(
+                str(tmp_path),
+                max_tries_completion=1,
+                max_tries_attributes=1,
+                sleep_attributes=0,
+            )
+    assert res is expected_result
 
 
 @pytest.mark.asyncio
diff --git a/webservice/webservice.py b/webservice/webservice.py
index c2907d30d18ba216f651d3a247ed65df4a5ea767..9a468e12a4e06660884dde497743f81f09a98f4a 100644
--- a/webservice/webservice.py
+++ b/webservice/webservice.py
@@ -523,41 +523,78 @@ async def run_action(job_db, cmd, mode, proposal, run, rid) -> str:
     return message
 
 
-async def wait_on_transfer(rpath, max_tries=300) -> bool:
+async def wait_on_transfer(
+    run_path: str,
+    max_tries_completion: int = 300,
+    max_tries_attributes: int = 5,
+    sleep_completion: int = 10,
+    sleep_attributes: int = 60,
+) -> bool:
     """
     Wait on data files to be transferred to Maxwell
 
-    :param rpath: Folder, which contains data files migrated to Maxwell
-    :param max_tries: Maximum number of checks if files are transferred
+    :param run_path: Folder, which contains data files migrated to Maxwell
+    :param max_tries_completion: Maximum number of tries to check for migration completed attribute
+    :param max_tries_attributes: Maximum number of tries to check for migration attributes being present
+    :param sleep_completion: Sleep time between checks for migration completed attribute
+    :param sleep_attributes: Sleep time between checks for migration attributes being present
     :return: True if files are transferred
     """
     # TODO: Make use of MyMDC to request whether the run has been copied.
     # It is not sufficient to know that the files are on disk, but also to
     # check the copy is finished (ie. that the files are complete).
-    if 'pnfs' in os.path.realpath(rpath):
+    if 'pnfs' in os.path.realpath(run_path):
         return True
-    tries = 0
+
+    tries_for_completion = 0
+    tries_for_attributes = 0
 
     # FIXME: if not kafka, then do event-driven, no sleep
     # wait until folder gets created
-    while not os.path.exists(rpath):
-        if tries > max_tries:
+    while not os.path.exists(run_path):
+        if tries_for_completion > max_tries_completion:
             return False
-        tries += 1
-        await asyncio.sleep(10)
+        tries_for_completion += 1
+        await asyncio.sleep(sleep_completion)
 
     # FIXME: if not kafka, then do event-driven, no sleep
     # wait until files are migrated
     while True:
-        retcode, stdout = await run_proc_async([
-            "getfattr", "-n", "user.status", rpath
-        ])
-        if retcode == 0 and 'status="online"' not in stdout.decode().lower():
+        # TODO: add test case for migration issues/missing attribute handling
+        if "user.status" not in os.listxattr(run_path):
+            if tries_for_attributes >= max_tries_attributes:
+                logging.critical(
+                    "`status` attribute missing after max tries for migration reached. "
+                    "Migration may have failed, try triggering migration manually again."
+                )
+                # TODO: automatically re-trigger migration...?
+                return False
+
+            tries_for_attributes += 1
+            logging.warning(
+                f"`status` attribute missing, migration may have failed, on attempt "
+                f"{tries_for_attributes}/{max_tries_attributes}"
+            )
+
+            # Wait for a longer time if the attributes are missing
+            await asyncio.sleep(sleep_attributes)
+            continue
+
+        user_status = os.getxattr(run_path, "user.status").decode().lower()
+        logging.debug(f"{run_path}: {user_status=}")  # TODO: use `trace` instead of `debug` w/ loguru
+        if user_status in ["dcache", "tape", "offline"]:
             return True
-        if tries > max_tries:
+        elif user_status == "notmigrated2d2":
+            logging.critical(f"Migration failed for {run_path}")
             return False
-        tries += 1
-        await asyncio.sleep(10)
+        elif user_status != "migration_in_progress":
+            logging.critical(f"Unknown status: {user_status}")
+
+        if tries_for_completion > max_tries_completion:
+            return False
+
+        tries_for_completion += 1
+        await asyncio.sleep(sleep_completion)
 
 
 async def wait_transfers(