Skip to content
Snippets Groups Projects
Commit 9e031a8c authored by Robert Rosca's avatar Robert Rosca
Browse files

Use `os....xattr` instead of process calls, simplify,

prep for migration failed flag
parent f0a62dce
No related branches found
No related tags found
No related merge requests found
import os
import sys
from pathlib import Path
from unittest import mock
......@@ -76,14 +77,24 @@ def test_parse_config():
@pytest.mark.asyncio
async def test_wait_on_transfer(tmp_path):
mock_getfattr = MockCommand(
'getfattr',
content="""#!{}\nprint('user.status="dCache"')""".format(sys.executable)
)
with mock_getfattr:
res = await wait_on_transfer(str(tmp_path), max_tries=1)
assert res is True
@pytest.mark.parametrize(
"xattr_list, xattr_get, expected_result",
[
(["user.status"], b"offline", True),
(["user.status"], b"notmigrated2d2", False),
([], b"", False),
],
)
async def test_wait_on_transfer(xattr_list, xattr_get, expected_result, tmp_path):
with mock.patch.object(os, "listxattr", lambda path: xattr_list):
with mock.patch.object(os, "getxattr", lambda path, attr: xattr_get):
res = await wait_on_transfer(
str(tmp_path),
max_tries_completion=1,
max_tries_attributes=1,
sleep_attributes=1,
)
assert res is expected_result
@pytest.mark.asyncio
......
......@@ -523,41 +523,78 @@ async def run_action(job_db, cmd, mode, proposal, run, rid) -> str:
return message
async def wait_on_transfer(rpath, max_tries=300) -> bool:
async def wait_on_transfer(
run_path: str,
max_tries_completion: int = 300,
max_tries_attributes: int = 5,
sleep_completion: int = 10,
sleep_attributes: int = 60,
) -> bool:
"""
Wait on data files to be transferred to Maxwell
:param rpath: Folder, which contains data files migrated to Maxwell
:param max_tries: Maximum number of checks if files are transferred
:param run_path: Folder, which contains data files migrated to Maxwell
:param max_tries_completion: Maximum number of tries to check for migration completed attribute
:param max_tries_attributes: Maximum number of tries to check for migration attributes being present
:param sleep_completion: Sleep time between checks for migration completed attribute
:param sleep_attributes: Sleep time between checks for migration attributes being present
:return: True if files are transferred
"""
# TODO: Make use of MyMDC to request whether the run has been copied.
# It is not sufficient to know that the files are on disk, but also to
# check the copy is finished (ie. that the files are complete).
if 'pnfs' in os.path.realpath(rpath):
if 'pnfs' in os.path.realpath(run_path):
return True
tries = 0
tries_for_completion = 0
tries_for_attributes = 0
# FIXME: if not kafka, then do event-driven, no sleep
# wait until folder gets created
while not os.path.exists(rpath):
if tries > max_tries:
while not os.path.exists(run_path):
if tries_for_completion > max_tries_completion:
return False
tries += 1
await asyncio.sleep(10)
tries_for_completion += 1
await asyncio.sleep(sleep_completion)
# FIXME: if not kafka, then do event-driven, no sleep
# wait until files are migrated
while True:
retcode, stdout = await run_proc_async([
"getfattr", "-n", "user.status", rpath
])
if retcode == 0 and 'status="online"' not in stdout.decode().lower():
# TODO: add test case for migration issues/missing attribute handling
if "user.status" not in os.listxattr(run_path):
if tries_for_attributes >= max_tries_attributes:
logging.critical(
"`status` attribute missing after max tries for migration reached. "
"Migration may have failed, try triggering migration manually again."
)
# TODO: automatically re-trigger migration...?
return False
tries_for_attributes += 1
logging.warning(
f"`status` attribute missing, migration may have failed, on attempt "
f"{tries_for_attributes}/{max_tries_attributes}"
)
# Wait for a longer time if the attributes are missing
await asyncio.sleep(sleep_attributes)
continue
user_status = os.getxattr(run_path, "user.status").decode().lower()
logging.debug(f"{run_path}: {user_status=}") # TODO: use `trace` instead of `debug` w/ loguru
if user_status in ["dcache", "tape", "offline"]:
return True
if tries > max_tries:
elif user_status == "notmigrated2d2":
logging.critical(f"Migration failed for {run_path}")
return False
tries += 1
await asyncio.sleep(10)
elif user_status != "migration_in_progress":
logging.critical("Unknown status: {user_status}")
if tries_for_completion > max_tries_completion:
return False
tries_for_completion += 1
await asyncio.sleep(sleep_completion)
async def wait_transfers(
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment