Skip to content
Snippets Groups Projects
Commit cf3b3274 authored by Robert Rosca's avatar Robert Rosca
Browse files

Implement, raise, and catch, migration errors to send mdc messages

parent f88c96d9
No related branches found
No related tags found
No related merge requests found
......@@ -2,6 +2,7 @@ import os
import sys
from pathlib import Path
from unittest import mock
from webservice.messages import MigrationError
import pytest
from testpath import MockCommand
......@@ -81,8 +82,8 @@ def test_parse_config():
"xattr_list, xattr_get, expected_result",
[
(["user.status"], b"offline", True),
(["user.status"], b"notmigrated2d2", False),
([], b"", False),
(["user.status"], b"tape", True),
(["user.status"], b"dCache", True),
],
)
async def test_wait_on_transfer(xattr_list, xattr_get, expected_result, tmp_path):
......@@ -96,6 +97,41 @@ async def test_wait_on_transfer(xattr_list, xattr_get, expected_result, tmp_path
)
assert res is expected_result
@pytest.mark.asyncio
@pytest.mark.parametrize(
"xattr_list, xattr_get, exception_match",
[
([], b"", r"FAILED:.*user.status.*"),
(["user.status"], b"notmigrated2d2", r"FAILED:.*notmigratedr2d2.*"),
(["user.status"], b"foobar", r"FAILED:.*unknown.*"),
],
)
async def test_wait_on_transfer_exceptions(
xattr_list, xattr_get, exception_match, tmp_path
):
with mock.patch.object(os, "listxattr", lambda path: xattr_list):
with mock.patch.object(os, "getxattr", lambda path, attr: xattr_get):
with pytest.raises(MigrationError, match=exception_match):
await wait_on_transfer(
str(tmp_path),
max_tries_completion=1,
max_tries_attributes=1,
sleep_attributes=1,
)
@pytest.mark.asyncio
async def test_wait_on_transfer_timeout(tmp_path):
with mock.patch.object(os, "listxattr", lambda path: ["user.status"]):
with mock.patch.object(os, "getxattr", lambda path, attr: b"migration_in_progress"):
with pytest.raises(MigrationError, match=r"FAILED:.*progress.*"):
await wait_on_transfer(
str(tmp_path),
max_tries_completion=1,
max_tries_attributes=1,
sleep_attributes=1,
)
@pytest.mark.asyncio
@pytest.mark.parametrize(
......
......@@ -16,6 +16,37 @@ class Errors:
OTHER_ERROR = "FAILED: Error {}, please contact det-support@xfel.eu"
class MigrationError(Exception):
@classmethod
def no_user_status_xattr(cls, run_path: str):
return cls(
f"FAILED: migration issue for run `{run_path}`: `user.status` xattr not "
f"present after 5 minutes, migration may have failed, manually restarting "
f"migration may help"
)
@classmethod
def migration_failed(cls, run_path: str):
return cls(
f"FAILED: migration issue for run `{run_path}`: migration marked as failed "
f"(`run user.status` set to `notmigratedr2d2`), manually restarting "
f"migration may help"
)
@classmethod
def unknown_user_status(cls, run_path: str, status: str):
return cls(
f"FAILED: migration issue for run `{run_path}`: run has an unknown "
f"`user.status` xattr `{status}`, manually restarting migration may help"
)
@classmethod
def timeout(cls, run_path: str, time: str):
return cls(
f"FAILED: migration issue for run `{run_path}`: migration still marked as"
f"in progress after {time}, manually restarting migration may help"
)
class MDC:
MIGRATION_TIMEOUT = "Timeout waiting for migration. Contact it-support@xfel.eu"
NOTHING_TO_DO = "Nothing to calibrate for this run, copied raw data only"
......
......@@ -33,9 +33,9 @@ from metadata_client.metadata_client import MetadataClient
from .config import webservice as config
try:
from .messages import MDC, Errors, Success
from .messages import MDC, Errors, Success, MigrationError
except ImportError:
from messages import MDC, Errors, Success
from messages import MDC, Errors, Success, MigrationError
def init_job_db(config):
......@@ -568,7 +568,7 @@ async def wait_on_transfer(
"Migration may have failed, try triggering migration manually again."
)
# TODO: automatically re-trigger migration...?
return False
raise MigrationError.no_user_status_xattr(run_path)
tries_for_attributes += 1
logging.warning(
......@@ -586,12 +586,16 @@ async def wait_on_transfer(
return True
elif user_status == "notmigrated2d2":
logging.critical(f"Migration failed for {run_path}")
return False
raise MigrationError.migration_failed(run_path)
elif user_status != "migration_in_progress":
logging.critical("Unknown status: {user_status}")
if tries_for_completion > max_tries_completion:
raise MigrationError.unknown_user_status(run_path, user_status)
if tries_for_completion > max_tries_completion:
return False
raise MigrationError.timeout(
run_path, f"{tries_for_completion*sleep_completion}s"
)
tries_for_completion += 1
await asyncio.sleep(sleep_completion)
......@@ -903,6 +907,10 @@ class ActionsServer:
detectors[karabo_id] = thisconf
copy_file_list = copy_file_list.difference(corr_file_list)
asyncio.ensure_future(copy_untouched_files(copy_file_list))
except MigrationError as e:
logging.error("Migration issue", exc_info=e)
await update_mdc_status(self.mdc, 'correct', rid, str(e))
return
except Exception as corr_e:
logging.error("Error during correction", exc_info=corr_e)
await update_mdc_status(self.mdc, 'correct', rid,
......@@ -985,15 +993,19 @@ class ActionsServer:
async def _continue():
"""Runs in the background after we reply to the 'dark_request' request"""
await update_mdc_status(self.mdc, 'dark_request', rid, queued_msg)
transfer_complete = await wait_transfers(
runs, in_folder, proposal
)
if not transfer_complete:
# Timed out
await update_mdc_status(
self.mdc, 'dark_request', rid, MDC.MIGRATION_TIMEOUT
try:
transfer_complete = await wait_transfers(
runs, in_folder, proposal
)
if not transfer_complete:
# Timed out
await update_mdc_status(
self.mdc, 'dark_request', rid, MDC.MIGRATION_TIMEOUT
)
return
except MigrationError as e:
logging.error("Migration issue", exc_info=e)
await update_mdc_status(self.mdc, 'dark_request', rid, str(e))
return
# Notebooks require one or three runs, depending on the
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment