Commit 1a73e080 authored by Cyril Danilevski

Merge branch 'feat/mymdc_run_check' into 'master'

[webservice] Check run migration status using MyMDC

See merge request detectors/pycalibration!553
parents 05986280 90d306f3
-import os
 import sys
 from pathlib import Path
 from unittest import mock
-from webservice.messages import MigrationError
 import pytest
+from testpath import MockCommand
 sys.path.insert(0, Path(__file__).parent / 'webservice')
 import webservice  # noqa: import not at top of file
+from webservice.messages import MigrationError  # noqa: import not at top
 from webservice.webservice import (  # noqa: import not at top of file
     check_files,
     merge,
@@ -79,58 +77,57 @@ def test_parse_config():
 @pytest.mark.asyncio
 @pytest.mark.parametrize(
-    "xattr_list, xattr_get, expected_result",
+    "mock_json, expected_result",
     [
-        (["user.status"], b"offline", True),
-        (["user.status"], b"tape", True),
-        (["user.status"], b"dCache", True),
+        ({'runs': [{'repositories': {'DESY_DCACHE_RAW_CC': 'whatever_its_a_dict'}}]},  # noqa
+         ['DESY_DCACHE_RAW_CC']),
+        ({'runs': [{'repositories': {'XFEL_GPFS_OFFLINE_RAW_CC': 'whatever_its_a_dict'}}]},  # noqa
+         ['XFEL_GPFS_OFFLINE_RAW_CC']),
     ],
 )
-async def test_wait_on_transfer(xattr_list, xattr_get, expected_result, tmp_path):
-    with mock.patch.object(os, "listxattr", lambda path: xattr_list):
-        with mock.patch.object(os, "getxattr", lambda path, attr: xattr_get):
-            res = await wait_on_transfer(
-                str(tmp_path),
-                max_tries_completion=1,
-                max_tries_attributes=1,
-                sleep_attributes=1,
-            )
-    assert res is expected_result
+async def test_wait_on_transfer(mock_json, expected_result):
+    response = mock.Mock()
+    response.status_code = 200
+    response.json = mock.Mock(return_value=mock_json)
+    client = mock.Mock()
+    client.get_runs_by_proposal_number_api = mock.Mock(
+        return_value=response)
+
+    res = await wait_on_transfer(client, 1, 1, 1, 1)
+    assert res == expected_result
 @pytest.mark.asyncio
 @pytest.mark.parametrize(
-    "xattr_list, xattr_get, exception_match",
+    "mock_json, status_code, exception_class, exception_match",
     [
-        ([], b"", r"FAILED:.*user.status.*"),
-        (["user.status"], b"notmigrated2d2", r"FAILED:.*notmigratedr2d2.*"),
-        (["user.status"], b"foobar", r"FAILED:.*unknown.*"),
+        ([],
+         200,
+         ValueError,
+         r"FAILED:.*empty response.*"),
+        ([],
+         404,
+         ValueError,
+         r"FAILED:.*MyMDC.*404.*"),
+        ({'runs': [{'repositories': {}}]},
+         200,
+         MigrationError,
+         r"FAILED:.*migration issue.*"),
     ],
 )
 async def test_wait_on_transfer_exceptions(
-    xattr_list, xattr_get, exception_match, tmp_path
+    mock_json, status_code, exception_class, exception_match
 ):
-    with mock.patch.object(os, "listxattr", lambda path: xattr_list):
-        with mock.patch.object(os, "getxattr", lambda path, attr: xattr_get):
-            with pytest.raises(MigrationError, match=exception_match):
-                await wait_on_transfer(
-                    str(tmp_path),
-                    max_tries_completion=1,
-                    max_tries_attributes=1,
-                    sleep_attributes=1,
-                )
+    response = mock.Mock()
+    response.status_code = status_code
+    response.json = mock.Mock(return_value=mock_json)
+    client = mock.Mock()
+    client.get_runs_by_proposal_number_api = mock.Mock(
+        return_value=response)
+
+    with pytest.raises(exception_class, match=exception_match):
+        await wait_on_transfer(client, 1, 1, 1, 1)
-@pytest.mark.asyncio
-async def test_wait_on_transfer_timeout(tmp_path):
-    with mock.patch.object(os, "listxattr", lambda path: ["user.status"]):
-        with mock.patch.object(os, "getxattr", lambda path, attr: b"migration_in_progress"):
-            with pytest.raises(MigrationError, match=r"FAILED:.*progress.*"):
-                await wait_on_transfer(
-                    str(tmp_path),
-                    max_tries_completion=1,
-                    max_tries_attributes=1,
-                    sleep_attributes=1,
-                )
 @pytest.mark.asyncio
...
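Both tests drive the new coroutine with five positional arguments. Read against the new wait_on_transfer signature further down the diff, wait_on_transfer(client, 1, 1, 1, 1) is equivalent to:

    wait_on_transfer(client, run=1, proposal=1, max_tries=1, sleep_completion=1)

so each test allows a single MyMDC query attempt with a one-second sleep between retries, which keeps the suite fast.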
webservice/messages.py
@@ -17,35 +17,15 @@ class Errors:
 class MigrationError(Exception):
-    @classmethod
-    def no_user_status_xattr(cls, run_path: str):
-        return cls(
-            f"FAILED: migration issue for run `{run_path}`: `user.status` xattr not "
-            f"present after 5 minutes, migration may have failed, manually restarting "
-            f"migration may help"
-        )
-
-    @classmethod
-    def migration_failed(cls, run_path: str):
-        return cls(
-            f"FAILED: migration issue for run `{run_path}`: migration marked as failed "
-            f"(`run user.status` set to `notmigratedr2d2`), manually restarting "
-            f"migration may help"
-        )
-
     @classmethod
-    def unknown_user_status(cls, run_path: str, status: str):
-        return cls(
-            f"FAILED: migration issue for run `{run_path}`: run has an unknown "
-            f"`user.status` xattr `{status}`, manually restarting migration may help"
-        )
-
-    @classmethod
-    def timeout(cls, run_path: str, time: str):
-        return cls(
-            f"FAILED: migration issue for run `{run_path}`: migration still marked as"
-            f"in progress after {time}, manually restarting migration may help"
-        )
+    def timeout(cls, run: int, proposal: int, time: str):
+        return cls(
+            f"FAILED: migration issue for proposal {proposal} run `{run}`: "
+            f"migration still incompleted after {time}, "
+            "manually restarting migration may help"
+        )
 class MDC:
     MIGRATION_TIMEOUT = "Timeout waiting for migration. Contact it-support@xfel.eu"
...
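Of the four MigrationError factory classmethods, only timeout survives, reparameterised from a run path to a (run, proposal) pair. A quick illustrative check of the message it builds, using made-up values and the import path already used by the test module:

    from webservice.messages import MigrationError

    # Made-up values: proposal 2222, run 100, after 300 tries of 10 s each.
    err = MigrationError.timeout(100, 2222, f"{300 * 10}s")
    assert str(err).startswith(
        "FAILED: migration issue for proposal 2222 run `100`"
    )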
webservice/webservice.py
@@ -30,12 +30,12 @@ from kafka import KafkaProducer
 from kafka.errors import KafkaError
 from metadata_client.metadata_client import MetadataClient

-from .config import webservice as config
-
 try:
-    from .messages import MDC, Errors, Success, MigrationError
+    from .config import webservice as config
+    from .messages import MDC, Errors, MigrationError, Success
 except ImportError:
-    from messages import MDC, Errors, Success, MigrationError
+    from config import webservice as config
+    from messages import MDC, Errors, MigrationError, Success
 def init_job_db(config):
...
@@ -534,108 +534,89 @@ async def run_action(job_db, cmd, mode, proposal, run, rid) -> str:
 async def wait_on_transfer(
-    run_path: str,
-    max_tries_completion: int = 300,
-    max_tries_attributes: int = 5,
+    mdc,
+    run: int,
+    proposal: int,
+    max_tries: int = 300,
     sleep_completion: int = 10,
-    sleep_attributes: int = 60,
-) -> bool:
-    """
-    Wait on data files to be transferred to Maxwell
-
-    :param run_path: Folder, which contains data files migrated to Maxwell
-    :param max_tries_completion: Maximum number of tries to check for migration completed attribute
-    :param max_tries_attributes: Maximum number of tries to check for migration attributes being present
-    :param sleep_completion: Sleep time between checks for migration completed attribute
-    :param sleep_attributes: Sleep time between checks for migration attributes being present
-    :return: True if files are transferred
-    """
-    # TODO: Make use of MyMDC to request whether the run has been copied.
-    # It is not sufficient to know that the files are on disk, but also to
-    # check the copy is finished (ie. that the files are complete).
-    if 'pnfs' in os.path.realpath(run_path):
-        return True
-
-    tries_for_completion = 0
-    tries_for_attributes = 0
-
-    # FIXME: if not kafka, then do event-driven, no sleep
-    # wait until folder gets created
-    while not os.path.exists(run_path):
-        if tries_for_completion > max_tries_completion:
-            return False
-        tries_for_completion += 1
-        await asyncio.sleep(sleep_completion)
-
-    # FIXME: if not kafka, then do event-driven, no sleep
-    # wait until files are migrated
-    while True:
-        # TODO: add test case for migration issues/missing attribute handling
-        if "user.status" not in os.listxattr(run_path):
-            if tries_for_attributes >= max_tries_attributes:
-                logging.critical(
-                    "`status` attribute missing after max tries for migration reached. "
-                    "Migration may have failed, try triggering migration manually again."
-                )
-                # TODO: automatically re-trigger migration...?
-                raise MigrationError.no_user_status_xattr(run_path)
-            tries_for_attributes += 1
-            logging.warning(
-                f"`status` attribute missing, migration may have failed, on attempt "
-                f"{tries_for_attributes}/{max_tries_attributes}"
-            )
-            # Wait for a longer time if the attributes are missing
-            await asyncio.sleep(sleep_attributes)
-            continue
-        user_status = os.getxattr(run_path, "user.status").decode().lower()
-        logging.debug(f"{run_path}: {user_status=}")  # TODO: use `trace` instead of `debug` w/ loguru
-        if user_status in ["dcache", "tape", "offline"]:
-            return True
-        elif user_status == "notmigrated2d2":
-            logging.critical(f"Migration failed for {run_path}")
-            raise MigrationError.migration_failed(run_path)
-        elif user_status != "migration_in_progress":
-            logging.critical("Unknown status: {user_status}")
-            if tries_for_completion > max_tries_completion:
-                raise MigrationError.unknown_user_status(run_path, user_status)
-        if tries_for_completion > max_tries_completion:
-            raise MigrationError.timeout(
-                run_path, f"{tries_for_completion*sleep_completion}s"
-            )
-        tries_for_completion += 1
-        await asyncio.sleep(sleep_completion)
+) -> List[str]:
+    """Query MyMDC to get run migration status.
+
+    This coro queries MyMDC to get the storage spaces where the data is
+    available from (known as repositories).
+
+    If the data is available on offline or dCache storage, this coro will
+    return. Else, it will loop until available.
+
+    If the data is not available after all retries, a MigrationError will be
+    raised.
+    A ValueError can also be raised if a repository is not supported in this
+    webservice.
+
+    :param run: the run id
+    :param proposal: the proposal id
+    :param max_tries: maximum number of tries to check for migration completed
+    :param sleep_completion: sleep time in seconds between checks
+    :return repos: a list of repositories in which the data is available
+    :raise MigrationError: when the data is not on offline or dCache within
+        the maximum max_tries * sleep_completion time
+    :raise ValueError: if the reply from MyMDC is not 200 ok or no data was
+        returned.
+    """
+    loop = get_event_loop()
+    for iteration in range(max_tries):
+        response = await loop.run_in_executor(
+            None,
+            mdc.get_runs_by_proposal_number_api,
+            proposal,
+            run
+        )
+
+        if response.status_code != 200:
+            raise ValueError("FAILED: MyMDC replied with "
+                             f"{response.status_code}: {response.reason}")
+
+        if not response.json():
+            raise ValueError("FAILED: MyMDC replied empty response for "
+                             f"proposal {proposal}, run {run}")
+
+        run_details, = response.json()['runs']
+        repositories = list(run_details['repositories'].keys())
+
+        for identifier in repositories:
+            if any(s in identifier for s in ('DCACHE', 'OFFLINE')):
+                return repositories
+
+        logging.info(f"Proposal {proposal} run {run} not migrated yet. "
+                     f"Will try again ({iteration}/{max_tries})")
+        await asyncio.sleep(sleep_completion)

+    raise MigrationError.timeout(run, proposal,
+                                 f"{max_tries*sleep_completion}s")
 async def wait_transfers(
-    wait_runs: List[str], in_folder: str, proposal: str
+    mdc: MetadataClient,
+    runs: List[int],
+    proposal: int
 ) -> bool:
     """Wait for multiple runs to be transferred to Maxwell.

-    :param wait_runs: Run numbers to wait for
-    :param in_folder: Proposal raw directory containing runs
+    :param runs: Run numbers to wait for
     :param proposal: Proposal number
     :return: True if all runs transferred, false on timeout
     """
-    logging.debug(f"Waiting for: proposal {proposal}, runs {wait_runs}")
-    # FIXME: this loop should be an asyncio.gather
-    for runnr in wait_runs:
-        rpath = "{}/r{:04d}/".format(in_folder, int(runnr))
-        transfer_complete = await wait_on_transfer(rpath)
-        if not transfer_complete:
-            logging.error(
-                Errors.TRANSFER_EVAL_FAILED.format(proposal, runnr)
-            )
-            return False
+    logging.debug(f"Waiting for: proposal {proposal}, runs {runs}")
+    coros = [wait_on_transfer(mdc, run, proposal) for run in runs]
+    try:
+        await asyncio.gather(*coros)
+    except (MigrationError, ValueError):
+        logging.error(
+            Errors.TRANSFER_EVAL_FAILED.format(proposal, runs), exc_info=True
+        )
+        return False

-    logging.info(
-        "Transfer complete: proposal %s, runs %s", proposal, wait_runs
-    )
+    logging.info("Transfer complete: proposal %s, runs %s", proposal, runs)
     return True
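The gather-based rewrite also resolves the old FIXME comment: the per-run polling loops now run concurrently, so the total wait is bounded by the slowest run rather than the sum over all runs. A self-contained sketch of that pattern, with a toy coroutine standing in for wait_on_transfer:

    import asyncio

    async def fake_wait(run: int, delay: float) -> int:
        # Stands in for one wait_on_transfer() polling loop.
        await asyncio.sleep(delay)
        return run

    async def main() -> None:
        runs = [1, 2, 3]
        # All three polls run concurrently: ~0.1 s total, not ~0.3 s.
        results = await asyncio.gather(*(fake_wait(r, 0.1) for r in runs))
        print(results)  # -> [1, 2, 3]

    asyncio.run(main())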
@@ -881,7 +862,7 @@ class ActionsServer:
         await update_mdc_status(self.mdc, 'correct', rid, queued_msg)
         try:
             transfer_complete = await wait_transfers(
-                [runnr], in_folder, proposal
+                self.mdc, [runnr], proposal
             )
             if not transfer_complete:
                 # Timed out
@@ -915,12 +896,8 @@ class ActionsServer:
thisconf["priority"] = str(priority) thisconf["priority"] = str(priority)
detectors[karabo_id] = thisconf detectors[karabo_id] = thisconf
copy_file_set = copy_file_list.difference(corr_file_list) # noqa copy_file_set = copy_file_list.difference(corr_file_list)
asyncio.ensure_future(copy_untouched_files(copy_file_set)) asyncio.ensure_future(copy_untouched_files(copy_file_set))
except MigrationError as e:
logging.error("Migration issue", exc_info=e)
await update_mdc_status(self.mdc, 'correct', rid, str(e))
return
except Exception as corr_e: except Exception as corr_e:
logging.error("Error during correction", exc_info=corr_e) logging.error("Error during correction", exc_info=corr_e)
await update_mdc_status(self.mdc, 'correct', rid, await update_mdc_status(self.mdc, 'correct', rid,
@@ -1008,19 +985,12 @@ class ActionsServer:
         async def _continue():
             """Runs in the background after we reply to the 'dark_request' request"""
             await update_mdc_status(self.mdc, 'dark_request', rid, queued_msg)
-            try:
-                transfer_complete = await wait_transfers(
-                    runs, in_folder, proposal
-                )
-                if not transfer_complete:
-                    # Timed out
-                    await update_mdc_status(
-                        self.mdc, 'dark_request', rid, MDC.MIGRATION_TIMEOUT
-                    )
-                    return
-            except MigrationError as e:
-                logging.error("Migration issue", exc_info=e)
-                await update_mdc_status(self.mdc, 'dark_request', rid, str(e))
+            transfer_complete = await wait_transfers(self.mdc, runs, proposal)
+            if not transfer_complete:
+                # Timed out
+                await update_mdc_status(
+                    self.mdc, 'dark_request', rid, MDC.MIGRATION_TIMEOUT
+                )
                 return

             # Notebooks require one or three runs, depending on the
...
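End to end, the handlers now pass self.mdc down, wait_transfers fans out one wait_on_transfer per run, and each coroutine polls get_runs_by_proposal_number_api until a DCACHE or OFFLINE repository appears. A standalone sketch of that polling behaviour against a stubbed client, in the same style as the tests above (assumes the webservice package from this repository is importable; all values are made up):

    import asyncio
    from unittest import mock

    from webservice.webservice import wait_on_transfer

    pending = {'runs': [{'repositories': {}}]}
    migrated = {'runs': [{'repositories': {'DESY_DCACHE_RAW_CC': {}}}]}

    response = mock.Mock()
    response.status_code = 200
    # wait_on_transfer() calls json() twice per iteration (emptiness check,
    # then payload), so each state is listed twice: one pending poll, then
    # one successful poll.
    response.json = mock.Mock(side_effect=[pending, pending, migrated, migrated])

    client = mock.Mock()
    client.get_runs_by_proposal_number_api = mock.Mock(return_value=response)

    repos = asyncio.run(
        wait_on_transfer(client, run=1, proposal=1, max_tries=5, sleep_completion=0)
    )
    print(repos)  # -> ['DESY_DCACHE_RAW_CC']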