Skip to content
Snippets Groups Projects

[AGIPD][TESTs]test_agipdlib AGIPDCtrl and get_bias_voltage for AGIPD1M and AGIPD500K

Merged Karim Ahmed requested to merge test/test_agipdlib into master
All threads resolved!
1 file
+ 38
9
Compare changes
  • Side-by-side
  • Inline
+ 38
9
@@ -526,7 +526,9 @@ async def run_action(job_db, cmd, mode, proposal, run, rid) -> str:
async def wait_on_transfer(
mdc,
run: int,
proposal: int,
proposal: str,
cycle: int,
instrument: str,
max_tries: int = 300,
sleep_completion: int = 10,
) -> List[str]:
@@ -571,10 +573,33 @@ async def wait_on_transfer(
run_details, = response.json()['runs']
repositories = list(run_details['repositories'].keys())
# List of locations where the data is stored, values are:
# {"XFEL_GPFS_ONLINE_SASE_X", "XFEL_GPFS_OFFLINE_RAW_CC", "DESY_DCACHE_RAW_CC"}
#
# where `X` is the SASE number
proposal_dir = Path(
config['correct']['in-folder'].format(
instrument=instrument, cycle=cycle, proposal=proposal
)
)
run_dir = (proposal_dir / f"{run:06d}").resolve()
for identifier in repositories:
if any(s in identifier for s in ('DCACHE', 'OFFLINE')):
return repositories
logging.debug(f"{run_dir=}, {repositories=}")
gpfs_transfer_done = (
run_dir.is_relative_to("/gpfs/exfel/d")
and "XFEL_GPFS" in repositories
)
dcache_transfer_done = (
run_dir.is_relative_to("/pnfs")
and "DESY_DCACHE" in repositories
)
if gpfs_transfer_done or dcache_transfer_done:
return repositories
logging.info(f"Proposal {proposal} run {run} not migrated yet. "
f"Will try again ({iteration}/{max_tries})")
@@ -587,7 +612,9 @@ async def wait_on_transfer(
async def wait_transfers(
mdc: MetadataClient,
runs: List[int],
proposal: int
proposal: str,
cycle: int,
instrument: str,
) -> bool:
"""Wait for multiple runs to be transferred to Maxwell.
@@ -597,7 +624,7 @@ async def wait_transfers(
"""
logging.debug(f"Waiting for: proposal {proposal}, runs {runs}")
coros = [wait_on_transfer(mdc, run, proposal) for run in runs]
coros = [wait_on_transfer(mdc, run, proposal, cycle, instrument) for run in runs]
try:
await asyncio.gather(*coros)
except (MigrationError, ValueError):
@@ -985,7 +1012,7 @@ class ActionsServer:
await update_mdc_status(self.mdc, 'correct', rid, queued_msg)
try:
transfer_complete = await wait_transfers(
self.mdc, [runnr], proposal
self.mdc, [runnr], proposal, cycle, instrument
)
if not transfer_complete:
# Timed out
@@ -1074,7 +1101,7 @@ class ActionsServer:
pdus, karabo_das, wait_runs = ast.literal_eval(','.join(extra))
karabo_das = [val.strip() for val in karabo_das]
runs = [str(val) for val in wait_runs]
runs = [str(val) for val in wait_runs] # FIX: could this be int instead of str?
proposal = self._normalise_proposal_num(proposal)
@@ -1116,7 +1143,9 @@ class ActionsServer:
async def _continue():
"""Runs in the background after we reply to the 'dark_request' request"""
await update_mdc_status(self.mdc, 'dark_request', rid, queued_msg)
transfer_complete = await wait_transfers(self.mdc, runs, proposal)
transfer_complete = await wait_transfers(
self.mdc, runs, proposal, cycle, instrument
)
if not transfer_complete:
# Timed out
await update_mdc_status(
Loading