Skip to content
Snippets Groups Projects
Commit 0e3d814b authored by Robert Rosca
Browse files

Merge branch 'fix/user.status-fattr-missing' into 'master'

Fix/user.status fattr missing

See merge request detectors/pycalibration!542
parents f0a62dce 9e031a8c
No related branches found
No related tags found
1 merge request: !542 "Fix/user.status fattr missing"
import os
import sys import sys
from pathlib import Path from pathlib import Path
from unittest import mock from unittest import mock
...@@ -76,14 +77,24 @@ def test_parse_config(): ...@@ -76,14 +77,24 @@ def test_parse_config():
@pytest.mark.asyncio
@pytest.mark.parametrize(
    "xattr_list, xattr_get, expected_result",
    [
        # Attribute present, data migrated -> success
        (["user.status"], b"offline", True),
        # Attribute present, migration reported as failed -> failure
        (["user.status"], b"notmigrated2d2", False),
        # Attribute never shows up -> failure after the attribute retries
        ([], b"", False),
    ],
)
async def test_wait_on_transfer(xattr_list, xattr_get, expected_result, tmp_path):
    """Check `wait_on_transfer` against mocked extended attributes.

    `os.listxattr` and `os.getxattr` are patched so that the outcome depends
    only on the parametrized attribute list and attribute value, not on the
    filesystem backing `tmp_path`.
    """
    with mock.patch.object(os, "listxattr", lambda path: xattr_list):
        with mock.patch.object(os, "getxattr", lambda path, attr: xattr_get):
            res = await wait_on_transfer(
                str(tmp_path),
                max_tries_completion=1,
                max_tries_attributes=1,
                # Zero sleep times: the retry logic is still exercised, but
                # the test never actually waits.
                sleep_completion=0,
                sleep_attributes=0,
            )

    assert res is expected_result
@pytest.mark.asyncio @pytest.mark.asyncio
......
...@@ -523,41 +523,78 @@ async def run_action(job_db, cmd, mode, proposal, run, rid) -> str: ...@@ -523,41 +523,78 @@ async def run_action(job_db, cmd, mode, proposal, run, rid) -> str:
return message return message
async def wait_on_transfer(
    run_path: str,
    max_tries_completion: int = 300,
    max_tries_attributes: int = 5,
    sleep_completion: int = 10,
    sleep_attributes: int = 60,
) -> bool:
    """
    Wait on data files to be transferred to Maxwell

    The function first waits for `run_path` to exist, then polls the
    `user.status` extended attribute of the folder:

    - `dcache`, `tape` or `offline` (case-insensitive): transfer finished.
    - `notmigrated2d2`: migration failed.
    - `migration_in_progress`: keep polling.
    - anything else: logged as critical, then polled again.

    The counter limited by `max_tries_completion` is shared between waiting
    for the folder to appear and waiting for the status to change.

    :param run_path: Folder, which contains data files migrated to Maxwell
    :param max_tries_completion: Maximum number of tries to check for migration completed attribute
    :param max_tries_attributes: Maximum number of tries to check for migration attributes being present
    :param sleep_completion: Sleep time between checks for migration completed attribute
    :param sleep_attributes: Sleep time between checks for migration attributes being present
    :return: True if files are transferred
    """
    # TODO: Make use of MyMDC to request whether the run has been copied.
    # It is not sufficient to know that the files are on disk, but also to
    # check the copy is finished (ie. that the files are complete).

    # Paths resolving to something containing 'pnfs' are accepted right away.
    if 'pnfs' in os.path.realpath(run_path):
        return True

    tries_for_completion = 0
    tries_for_attributes = 0

    # FIXME: if not kafka, then do event-driven, no sleep
    # wait until folder gets created
    while not os.path.exists(run_path):
        if tries_for_completion > max_tries_completion:
            return False
        tries_for_completion += 1
        await asyncio.sleep(sleep_completion)

    # FIXME: if not kafka, then do event-driven, no sleep
    # wait until files are migrated
    while True:
        # TODO: add test case for migration issues/missing attribute handling
        if "user.status" not in os.listxattr(run_path):
            if tries_for_attributes >= max_tries_attributes:
                logging.critical(
                    "`status` attribute missing after max tries for migration reached. "
                    "Migration may have failed, try triggering migration manually again."
                )
                # TODO: automatically re-trigger migration...?
                return False

            tries_for_attributes += 1
            logging.warning(
                f"`status` attribute missing, migration may have failed, on attempt "
                f"{tries_for_attributes}/{max_tries_attributes}"
            )
            # Wait for a longer time if the attributes are missing
            await asyncio.sleep(sleep_attributes)
            continue

        user_status = os.getxattr(run_path, "user.status").decode().lower()
        logging.debug(f"{run_path}: {user_status=}")  # TODO: use `trace` instead of `debug` w/ loguru

        if user_status in ["dcache", "tape", "offline"]:
            return True
        elif user_status == "notmigrated2d2":
            logging.critical(f"Migration failed for {run_path}")
            return False
        elif user_status != "migration_in_progress":
            # The f-prefix is required here, otherwise the literal text
            # "{user_status}" is logged instead of the actual value.
            logging.critical(f"Unknown status: {user_status}")

        if tries_for_completion > max_tries_completion:
            return False

        tries_for_completion += 1
        await asyncio.sleep(sleep_completion)
async def wait_transfers( async def wait_transfers(
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment