From 09b48f21184057dcf4734cc8ed91c3b2fcca7521 Mon Sep 17 00:00:00 2001
From: Thomas Kluyver <thomas@kluyver.me.uk>
Date: Wed, 24 Feb 2021 11:46:29 +0000
Subject: [PATCH] Use asyncio subprocess interface in webservice code

---
 webservice/webservice.py | 45 ++++++++++++++++++++++------------------
 1 file changed, 25 insertions(+), 20 deletions(-)

diff --git a/webservice/webservice.py b/webservice/webservice.py
index 0f2d28a23..29924e0ab 100644
--- a/webservice/webservice.py
+++ b/webservice/webservice.py
@@ -7,7 +7,6 @@ import json
 import logging
 import os
 import sqlite3
-import subprocess  # FIXME: use asyncio.create_subprocess_*
 import traceback
 import urllib.parse
 from asyncio import get_event_loop, shield
@@ -206,6 +205,16 @@ async def change_config(socket, config, updated_config, karabo_id, instrument,
     socket.send(yaml.dump(new_conf, default_flow_style=False).encode())
 
 
+async def run_proc_async(cmd: List[str]) -> (int, bytes):
+    """Run a subprocess to completion using asyncio, capturing stdout
+
+    Returns the numeric exit code and stdout (bytes)
+    """
+    proc = await asyncio.create_subprocess_exec(*cmd, stdout=asyncio.subprocess.PIPE)
+    stdout, _ = proc.communicate()
+    return proc.returncode, stdout
+
+
 async def slurm_status(filter_user=True):
     """ Return the status of slurm jobs by calling squeue
 
@@ -216,9 +225,9 @@ async def slurm_status(filter_user=True):
     cmd = ["squeue"]
     if filter_user:
         cmd += ["-u", getpass.getuser()]
-    ret = subprocess.run(cmd, stdout=subprocess.PIPE)  # FIXME: asyncio
-    if ret.returncode == 0:
-        rlines = ret.stdout.decode().split("\n")
+    retcode, stdout = run_proc_async(cmd)
+    if retcode == 0:
+        rlines = retcode.decode().split("\n")
         statii = {}
         for r in rlines[1:]:
             try:
@@ -239,9 +248,9 @@ async def slurm_job_status(jobid):
     """
     cmd = ["sacct", "-j", str(jobid), "--format=JobID,Elapsed,state"]
 
-    ret = subprocess.run(cmd, stdout=subprocess.PIPE)  # FIXME: asyncio
-    if ret.returncode == 0:
-        rlines = ret.stdout.decode().split("\n")
+    retcode, stdout = run_proc_async(cmd)
+    if retcode == 0:
+        rlines = stdout.decode().split("\n")
 
         logging.debug("Job {} state {}".format(jobid, rlines[2].split()))
         if len(rlines[2].split()) == 3:
@@ -422,15 +431,15 @@ async def run_action(job_db, cmd, mode, proposal, run, rid):
     # FIXME: this coro has too many returns that can be simplified
     if mode == "prod":
         logging.info(" ".join(cmd))
-        ret = subprocess.run(cmd, stdout=subprocess.PIPE)  # FIXME: asyncio
-        if ret.returncode == 0:
+        retcode, stdout = run_proc_async(cmd)
+        if retcode == 0:
             if "DARK" in cmd:
                 logging.info(Success.START_CHAR.format(proposal, run))
             else:
                 logging.info(Success.START_CORRECTION.format(proposal, run))
             # enter jobs in job db
             c = job_db.cursor()  # FIXME: asyncio
-            rstr = ret.stdout.decode()
+            rstr = stdout.decode()
 
             query = "INSERT INTO jobs VALUES ('{rid}', '{jobid}', '{proposal}', '{run}', 'PD', '{now}', '{det}', '{act}')"  # noqa
             for r in rstr.split("\n"):
@@ -447,8 +456,8 @@ async def run_action(job_db, cmd, mode, proposal, run, rid):
             else:
                 return Success.START_CORRECTION.format(proposal, run)
         else:
-            logging.error(Errors.JOB_LAUNCH_FAILED.format(cmd, ret.returncode))
-            return Errors.JOB_LAUNCH_FAILED.format(cmd, ret.returncode)
+            logging.error(Errors.JOB_LAUNCH_FAILED.format(cmd, retcode))
+            return Errors.JOB_LAUNCH_FAILED.format(cmd, retcode)
 
     else:
         if "DARK" in cmd:
@@ -476,8 +485,6 @@ async def wait_on_transfer(rpath, max_tries=300):
     # check the copy is finished (ie. that the files are complete).
     if 'pnfs' in os.path.realpath(rpath):
         return True
-    rstr = None
-    ret = None
     tries = 0
 
     # FIXME: if not kafka, then do event-driven, no sleep
@@ -490,16 +497,14 @@ async def wait_on_transfer(rpath, max_tries=300):
 
     # FIXME: if not kafka, then do event-driven, no sleep
     # wait until files are migrated
-    while rstr is None or 'status="online"' in rstr or 'status="Online"' in rstr or ret.returncode != 0:  # noqa
+    while True:
         await asyncio.sleep(10)
-        # FIXME: make use of asyncio.subprocess.run
-        ret = subprocess.run(["getfattr", "-n", "user.status", rpath],
-                             stdout=subprocess.PIPE)
-        rstr = ret.stdout.decode()
+        retcode, stdout = run_proc_async(["getfattr", "-n", "user.status", rpath])
+        if retcode == 0 and 'status="online"' not in stdout.decode().lower():
+            return True
         if tries > max_tries:
             return False
         tries += 1
-    return ret.returncode == 0
 
 
 def check_files(in_folder: str,
-- 
GitLab