diff --git a/tests/test_webservice.py b/tests/test_webservice.py
index 29dbfb2ab6f6ddf8f89432391c57682daa2e5211..a61c7e12be7ac6515b7105fc1ee9699449f39afd 100644
--- tests/test_webservice.py
+++ tests/test_webservice.py
@@ -1,3 +1,4 @@
+import logging
 import sys
 from pathlib import Path
 from unittest import mock
@@ -78,27 +79,105 @@ def test_parse_config():
 
 
 @pytest.mark.asyncio
+@pytest.mark.parametrize("run_links_to", ["XFEL_GPFS_OFFLINE_RAW_CC", "DESY_DCACHE_RAW_CC"])
 @pytest.mark.parametrize(
-    "mock_json, expected_result",
-    [
-        ({'runs': [{'repositories': {'DESY_DCACHE_RAW_CC': 'whatever_its_a_dict'}}]},  # noqa
-         ['DESY_DCACHE_RAW_CC']),
+    "repositories",
+    {
+        (gpfs_online, gpfs_offline, dcache)
+        for gpfs_online in [None, None, "XFEL_GPFS_ONLINE_RAW_CC"]
+        for gpfs_offline in [None, None, "XFEL_GPFS_OFFLINE_RAW_CC"]
+        for dcache in [None, None, "DESY_DCACHE_RAW_CC"]
+    }
+)
+async def test_wait_on_transfer(
+    tmp_path: Path,
+    run_links_to: str,
+    repositories: list,
+):
+    tmp_root = tmp_path
+    inst = "SCS"
+    cycle = 202202
+    prop_no = 1111
+    run_no = 1
 
-        ({'runs': [{'repositories': {'XFEL_GPFS_OFFLINE_RAW_CC': 'whatever_its_a_dict'}}]},  # noqa
-         ['XFEL_GPFS_OFFLINE_RAW_CC']),
-    ],
+    repositories = list(filter(None, repositories))
+    expected_error = run_links_to not in repositories
+
+    gpfs_prefix = tmp_root / "gpfs/exfel/d"
+    gpfs_dir = gpfs_prefix / f"raw/{inst}/{cycle}/p{prop_no}"
+    gpfs_dir.mkdir(parents=True)
+
+    pnfs_prefix = tmp_root / "pnfs/xfel.eu/exfel/archive/XFEL"
+    pnfs_dir = pnfs_prefix / f"raw/{inst}/{cycle}/p{prop_no}"
+    pnfs_dir.mkdir(parents=True)
+
+    raw_path = tmp_root / f"gpfs/exfel/exp/{inst}/{cycle}/p{prop_no:06d}/raw"
+    raw_path.parent.mkdir(parents=True)
+    if run_links_to == "XFEL_GPFS_OFFLINE_RAW_CC":
+        raw_path.symlink_to(gpfs_dir)
+    else:
+        raw_path.symlink_to(pnfs_dir)
+
+    run_path = raw_path / f"r{run_no:04d}"
+    run_path.mkdir(parents=True)
+
+    response = mock.Mock()
+    response.status_code = 200
+    response.json = lambda: {"runs": [{"repositories": {r: None for r in repositories}}]}
+    client = mock.Mock()
+    client.get_runs_by_proposal_number_api = mock.Mock(return_value=response)
+
+    _run_path = mock.Mock(spec=run_path)
+    _run_path.resolve = lambda: Path("/") / run_path.resolve().relative_to(tmp_path)
+
+    try:
+        await wait_on_transfer(
+            client,
+            run=run_no,
+            proposal=str(prop_no),
+            run_dir=_run_path,
+            max_tries=1,
+            sleep_completion=0,
+        )
+    except MigrationError:
+        if not expected_error:
+            raise
+    else:
+        # A clean return is only valid when the linked storage was listed;
+        # otherwise the test would pass without the timeout ever firing.
+        assert not expected_error, "expected MigrationError was not raised"
+
+
+@pytest.mark.parametrize(
+    "repositories",
+    {
+        (gpfs_online, gpfs_offline, dcache)
+        for gpfs_online in [None, None, "XFEL_GPFS_ONLINE_RAW_CC"]
+        for gpfs_offline in [None, None, "XFEL_GPFS_OFFLINE_RAW_CC"]
+        for dcache in [None, None, "DESY_DCACHE_RAW_CC"]
+    }
 )
-async def test_wait_on_transfer(mock_json, expected_result):
+@pytest.mark.asyncio
+async def test_wait_on_transfer_invalid_run_dir(caplog, repositories):
     response = mock.Mock()
     response.status_code = 200
-    response.json = mock.Mock(return_value=mock_json)
+    response.json = lambda: {"runs": [{"repositories": {r: None for r in repositories}}]}
     client = mock.Mock()
-    client.get_runs_by_proposal_number_api = mock.Mock(
-        return_value=response)
+    client.get_runs_by_proposal_number_api = mock.Mock(return_value=response)
 
-    res = await wait_on_transfer(client, 1, 1, 1, 1)
-    assert res == expected_result
+    caplog.set_level(logging.WARNING)
+
+    res = await wait_on_transfer(
+        client,
+        run=0,
+        proposal=str(10),
+        run_dir=Path("/invalid/run/dir"),
+        max_tries=1,
+        sleep_completion=0,
+    )
+    assert res is None
+    assert "is not relative to" in caplog.text
 
 
 @pytest.mark.asyncio
 @pytest.mark.parametrize(
@@ -129,7 +208,14 @@ async def test_wait_on_transfer_exceptions(
         return_value=response)
 
     with pytest.raises(exception_class, match=exception_match):
-        await wait_on_transfer(client, 1, 1, 1, 1)
+        await wait_on_transfer(
+            client,
+            run=1,
+            proposal="1",
+            max_tries=1,
+            sleep_completion=0,
+            run_dir=Path("/gpfs/exfel/d"),  # valid path required to not exit early
+        )
 
 
 @pytest.mark.asyncio
@@ -148,7 +234,7 @@ async def test_run_action(mode, cmd, retcode, expected, monkeypatch):
     job_db = mock.Mock()
 
     async def mock_run_proc_async(*args):
-        return retcode, b'Submitted job: 42'
+        return retcode, b'Submitted job: 42', b''
 
     monkeypatch.setattr(
         webservice.webservice, 'run_proc_async', mock_run_proc_async
diff --git a/webservice/webservice.py b/webservice/webservice.py
index 63d37e11fe3fa5270b935ef39f68a7bfb80f1a19..6e89c1ebe9f7a0c166ab9db4c187eccd6c0963d1 100644
--- webservice/webservice.py
+++ webservice/webservice.py
@@ -238,14 +238,16 @@ def change_config(config, updated_config, instrument,
     return yaml.safe_dump(new_conf, default_flow_style=False).encode()
 
 
-async def run_proc_async(cmd: List[str]) -> Tuple[int, bytes]:
-    """Run a subprocess to completion using asyncio, capturing stdout
+async def run_proc_async(cmd: List[str]) -> Tuple[Optional[int], bytes, bytes]:
+    """Run a subprocess to completion using asyncio, capturing its output
 
-    Returns the numeric exit code and stdout (bytes)
+    Returns the numeric exit code, stdout and stderr (bytes)
     """
-    proc = await asyncio.create_subprocess_exec(*cmd, stdout=asyncio.subprocess.PIPE)
-    stdout, _ = await proc.communicate()
-    return proc.returncode, stdout
+    proc = await asyncio.create_subprocess_exec(
+        *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
+    )
+    stdout, stderr = await proc.communicate()
+    return proc.returncode, stdout, stderr
 
 
 def slurm_status(filter_user=True):
@@ -479,9 +481,11 @@ async def run_action(job_db, cmd, mode, proposal, run, rid) -> str:
     """
     if mode == "prod":
         logging.info(" ".join(cmd))
-        retcode, stdout = await run_proc_async(cmd)
+        retcode, stdout, stderr = await run_proc_async(cmd)
         if retcode != 0:
             logging.error(Errors.JOB_LAUNCH_FAILED.format(cmd, retcode))
+            logging.error("Failed process stdout:\n%s\nFailed process stderr:\n%s",
+                          stdout.decode(errors='replace'), stderr.decode(errors='replace'))
             return Errors.JOB_LAUNCH_FAILED.format(cmd, retcode)
 
         if "DARK" in cmd:
@@ -526,10 +530,11 @@ async def run_action(job_db, cmd, mode, proposal, run, rid) -> str:
 async def wait_on_transfer(
     mdc,
     run: int,
-    proposal: int,
+    proposal: str,
+    run_dir: Path,
     max_tries: int = 300,
     sleep_completion: int = 10,
-) -> List[str]:
+) -> Optional[List[str]]:
     """Query MyMDC to get run migration status.
 
     This coro queries MyMDC to get the storage spaces where the data is
@@ -562,32 +567,64 @@ async def wait_on_transfer(
         )
 
         if response.status_code != 200:
-            raise ValueError("FAILED: MyMDC replied with "
-                             f"{response.status_code}: {response.reason}")
+            raise ValueError(
+                "FAILED: MyMDC replied with "
+                f"{response.status_code}: {response.reason}"
+            )
 
         if not response.json():
-            raise ValueError("FAILED: MyMDC replied empty response for "
-                             f"proposal {proposal}, run {run}")
+            raise ValueError(
+                "FAILED: MyMDC replied empty response for "
+                f"proposal {proposal}, run {run}"
+            )
 
         run_details, = response.json()['runs']
         repositories = list(run_details['repositories'].keys())
+        # List of locations where the data is stored, values are:
+        # {"XFEL_GPFS_ONLINE_SASE_X", "XFEL_GPFS_OFFLINE_RAW_CC", "DESY_DCACHE_RAW_CC"}
+        #
+        # where `X` is the SASE number
+
+        logging.debug(f"{run_dir=}, {run_dir.resolve()=}, {repositories=}")
+
+        # Re-resolve the original path each try; the symlink target may change
+        resolved_dir = run_dir.resolve()
+
+        required_repository = None
+        if resolved_dir.parts[1:4] == ("pnfs", "xfel.eu", "exfel"):
+            required_repository = "DESY_DCACHE_RAW"
+        elif resolved_dir.parts[1:4] == ("gpfs", "exfel", "d"):
+            required_repository = "XFEL_GPFS_OFFLINE"
+        else:
+            logging.warning(
+                f"Proposal {proposal} run {run} resolved path is not relative "
+                f"to `/pnfs/xfel.eu/exfel` or `/gpfs/exfel/d`: run_dir={resolved_dir!r}"
+            )
+            return None
+
+        logging.debug(f"{required_repository=}")
 
-        for identifier in repositories:
-            if any(s in identifier for s in ('DCACHE', 'OFFLINE')):
-                return repositories
+        if any(required_repository in r for r in repositories):
+            return repositories
+
+        logging.info(
+            f"Proposal {proposal} run {run} not migrated yet. "
+            f"Will try again ({iteration}/{max_tries})"
+        )
 
-        logging.info(f"Proposal {proposal} run {run} not migrated yet. "
-                     f"Will try again ({iteration}/{max_tries})")
         await asyncio.sleep(sleep_completion)
 
-    raise MigrationError.timeout(run, proposal,
-                                 f"{max_tries*sleep_completion}s")
+    raise MigrationError.timeout(
+        run, proposal, f"{max_tries*sleep_completion}s"
+    )
 
 
 async def wait_transfers(
     mdc: MetadataClient,
-    runs: List[int],
-    proposal: int
+    runs: List[str],
+    proposal: str,
+    cycle: int,
+    instrument: str,
 ) -> bool:
     """Wait for multiple runs to be transferred to Maxwell.
 
@@ -597,7 +634,17 @@ async def wait_transfers(
     """
     logging.debug(f"Waiting for: proposal {proposal}, runs {runs}")
 
-    coros = [wait_on_transfer(mdc, run, proposal) for run in runs]
+    in_folder = Path(
+        config['correct']['in-folder'].format(
+            instrument=instrument, cycle=cycle, proposal=proposal
+        )
+    )
+
+    coros = []
+    for run in runs:
+        run_dir = (in_folder / f"r{int(run):04d}")
+        coros.append(wait_on_transfer(mdc, run, proposal, run_dir))
+
     try:
         await asyncio.gather(*coros)
     except (MigrationError, ValueError):
@@ -698,12 +745,13 @@ async def get_slurm_nice(partition: str, instrument: str,
         return 0  # Don't apply degressive priority on exfel.
 
     # List all names for jobs running in the specified partition.
-    returncode, job_names = await run_proc_async(
+    returncode, job_names, stderr = await run_proc_async(
         ['squeue', '-h', '-o', '%.20j', '-p', partition, '--me'])
 
     if returncode != 0:
         logging.error(f'Non-zero return code {returncode} from '
                       f'`squeue` upon counting number of jobs')
+        logging.warning(f"{stderr=}")
         return 0  # Fallback if something went wrong.
 
     # Base value depending on proposal type using cycle, assuming that
@@ -985,7 +1033,7 @@ class ActionsServer:
             await update_mdc_status(self.mdc, 'correct', rid, queued_msg)
             try:
                 transfer_complete = await wait_transfers(
-                    self.mdc, [runnr], proposal
+                    self.mdc, [runnr], proposal, cycle, instrument
                 )
                 if not transfer_complete:
                     # Timed out
@@ -1074,7 +1122,7 @@ class ActionsServer:
             pdus, karabo_das, wait_runs = ast.literal_eval(','.join(extra))
 
             karabo_das = [val.strip() for val in karabo_das]
-            runs = [str(val) for val in wait_runs]
+            runs = [str(val) for val in wait_runs]  # TODO: could this be int instead of str?
 
             proposal = self._normalise_proposal_num(proposal)
 
@@ -1116,7 +1164,9 @@ class ActionsServer:
         async def _continue():
             """Runs in the background after we reply to the 'dark_request' request"""
             await update_mdc_status(self.mdc, 'dark_request', rid, queued_msg)
-            transfer_complete = await wait_transfers(self.mdc, runs, proposal)
+            transfer_complete = await wait_transfers(
+                self.mdc, runs, proposal, cycle, instrument
+            )
             if not transfer_complete:
                 # Timed out
                 await update_mdc_status(