Skip to content
Snippets Groups Projects
Commit 09b48f21 authored by Thomas Kluyver's avatar Thomas Kluyver
Browse files

Use asyncio subprocess interface in webservice code

parent 233b8f7e
No related branches found
No related tags found
1 merge request!435Use asyncio subprocess interface in webservice code
......@@ -7,7 +7,6 @@ import json
import logging
import os
import sqlite3
import subprocess # FIXME: use asyncio.create_subprocess_*
import traceback
import urllib.parse
from asyncio import get_event_loop, shield
......@@ -206,6 +205,16 @@ async def change_config(socket, config, updated_config, karabo_id, instrument,
socket.send(yaml.dump(new_conf, default_flow_style=False).encode())
async def run_proc_async(cmd: List[str]) -> (int, bytes):
"""Run a subprocess to completion using asyncio, capturing stdout
Returns the numeric exit code and stdout (bytes)
"""
proc = await asyncio.create_subprocess_exec(*cmd, stdout=asyncio.subprocess.PIPE)
stdout, _ = proc.communicate()
return proc.returncode, stdout
async def slurm_status(filter_user=True):
""" Return the status of slurm jobs by calling squeue
......@@ -216,9 +225,9 @@ async def slurm_status(filter_user=True):
cmd = ["squeue"]
if filter_user:
cmd += ["-u", getpass.getuser()]
ret = subprocess.run(cmd, stdout=subprocess.PIPE) # FIXME: asyncio
if ret.returncode == 0:
rlines = ret.stdout.decode().split("\n")
retcode, stdout = run_proc_async(cmd)
if retcode == 0:
rlines = retcode.decode().split("\n")
statii = {}
for r in rlines[1:]:
try:
......@@ -239,9 +248,9 @@ async def slurm_job_status(jobid):
"""
cmd = ["sacct", "-j", str(jobid), "--format=JobID,Elapsed,state"]
ret = subprocess.run(cmd, stdout=subprocess.PIPE) # FIXME: asyncio
if ret.returncode == 0:
rlines = ret.stdout.decode().split("\n")
retcode, stdout = run_proc_async(cmd)
if retcode == 0:
rlines = stdout.decode().split("\n")
logging.debug("Job {} state {}".format(jobid, rlines[2].split()))
if len(rlines[2].split()) == 3:
......@@ -422,15 +431,15 @@ async def run_action(job_db, cmd, mode, proposal, run, rid):
# FIXME: this coro has too many returns that can be simplified
if mode == "prod":
logging.info(" ".join(cmd))
ret = subprocess.run(cmd, stdout=subprocess.PIPE) # FIXME: asyncio
if ret.returncode == 0:
retcode, stdout = run_proc_async(cmd)
if retcode == 0:
if "DARK" in cmd:
logging.info(Success.START_CHAR.format(proposal, run))
else:
logging.info(Success.START_CORRECTION.format(proposal, run))
# enter jobs in job db
c = job_db.cursor() # FIXME: asyncio
rstr = ret.stdout.decode()
rstr = stdout.decode()
query = "INSERT INTO jobs VALUES ('{rid}', '{jobid}', '{proposal}', '{run}', 'PD', '{now}', '{det}', '{act}')" # noqa
for r in rstr.split("\n"):
......@@ -447,8 +456,8 @@ async def run_action(job_db, cmd, mode, proposal, run, rid):
else:
return Success.START_CORRECTION.format(proposal, run)
else:
logging.error(Errors.JOB_LAUNCH_FAILED.format(cmd, ret.returncode))
return Errors.JOB_LAUNCH_FAILED.format(cmd, ret.returncode)
logging.error(Errors.JOB_LAUNCH_FAILED.format(cmd, retcode))
return Errors.JOB_LAUNCH_FAILED.format(cmd, retcode)
else:
if "DARK" in cmd:
......@@ -476,8 +485,6 @@ async def wait_on_transfer(rpath, max_tries=300):
# check the copy is finished (ie. that the files are complete).
if 'pnfs' in os.path.realpath(rpath):
return True
rstr = None
ret = None
tries = 0
# FIXME: if not kafka, then do event-driven, no sleep
......@@ -490,16 +497,14 @@ async def wait_on_transfer(rpath, max_tries=300):
# FIXME: if not kafka, then do event-driven, no sleep
# wait until files are migrated
while rstr is None or 'status="online"' in rstr or 'status="Online"' in rstr or ret.returncode != 0: # noqa
while True:
await asyncio.sleep(10)
# FIXME: make use of asyncio.subprocess.run
ret = subprocess.run(["getfattr", "-n", "user.status", rpath],
stdout=subprocess.PIPE)
rstr = ret.stdout.decode()
retcode, stdout = run_proc_async(["getfattr", "-n", "user.status", rpath])
if retcode == 0 and 'status="online"' not in stdout.decode().lower():
return True
if tries > max_tries:
return False
tries += 1
return ret.returncode == 0
def check_files(in_folder: str,
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment