diff --git a/webservice/webservice.py b/webservice/webservice.py index 63d37e11fe3fa5270b935ef39f68a7bfb80f1a19..8a1d95739d247a11e593abd1336405765fe6ea64 100644 --- a/webservice/webservice.py +++ b/webservice/webservice.py @@ -526,7 +526,9 @@ async def run_action(job_db, cmd, mode, proposal, run, rid) -> str: async def wait_on_transfer( mdc, run: int, - proposal: int, + proposal: str, + cycle: int, + instrument: str, max_tries: int = 300, sleep_completion: int = 10, ) -> List[str]: @@ -571,10 +573,33 @@ async def wait_on_transfer( run_details, = response.json()['runs'] repositories = list(run_details['repositories'].keys()) + # List of locations where the data is stored, values are: + # {"XFEL_GPFS_ONLINE_SASE_X", "XFEL_GPFS_OFFLINE_RAW_CC", "DESY_DCACHE_RAW_CC"} + # + # where `X` is the SASE number + + proposal_dir = Path( + config['correct']['in-folder'].format( + instrument=instrument, cycle=cycle, proposal=proposal + ) + ) + + run_dir = (proposal_dir / f"{run:06d}").resolve() - for identifier in repositories: - if any(s in identifier for s in ('DCACHE', 'OFFLINE')): - return repositories + logging.debug(f"{run_dir=}, {repositories=}") + + gpfs_transfer_done = ( + run_dir.is_relative_to("/gpfs/exfel/d") + and "XFEL_GPFS" in repositories + ) + + dcache_transfer_done = ( + run_dir.is_relative_to("/pnfs") + and "DESY_DCACHE" in repositories + ) + + if gpfs_transfer_done or dcache_transfer_done: + return repositories logging.info(f"Proposal {proposal} run {run} not migrated yet. " f"Will try again ({iteration}/{max_tries})") @@ -587,7 +612,9 @@ async def wait_on_transfer( async def wait_transfers( mdc: MetadataClient, runs: List[int], - proposal: int + proposal: str, + cycle: int, + instrument: str, ) -> bool: """Wait for multiple runs to be transferred to Maxwell. @@ -597,7 +624,7 @@ async def wait_transfers( """ logging.debug(f"Waiting for: proposal {proposal}, runs {runs}") - coros = [wait_on_transfer(mdc, run, proposal) for run in runs] + coros = [wait_on_transfer(mdc, run, proposal, cycle, instrument) for run in runs] try: await asyncio.gather(*coros) except (MigrationError, ValueError): @@ -985,7 +1012,7 @@ class ActionsServer: await update_mdc_status(self.mdc, 'correct', rid, queued_msg) try: transfer_complete = await wait_transfers( - self.mdc, [runnr], proposal + self.mdc, [runnr], proposal, cycle, instrument ) if not transfer_complete: # Timed out @@ -1074,7 +1101,7 @@ class ActionsServer: pdus, karabo_das, wait_runs = ast.literal_eval(','.join(extra)) karabo_das = [val.strip() for val in karabo_das] - runs = [str(val) for val in wait_runs] + runs = [str(val) for val in wait_runs] # FIX: could this be int instead of str? proposal = self._normalise_proposal_num(proposal) @@ -1116,7 +1143,9 @@ class ActionsServer: async def _continue(): """Runs in the background after we reply to the 'dark_request' request""" await update_mdc_status(self.mdc, 'dark_request', rid, queued_msg) - transfer_complete = await wait_transfers(self.mdc, runs, proposal) + transfer_complete = await wait_transfers( + self.mdc, runs, proposal, cycle, instrument + ) if not transfer_complete: # Timed out await update_mdc_status(