diff --git a/webservice/webservice.py b/webservice/webservice.py index 1563807952ec0923922c3e43a378b26abf245d61..5770aba730fdb1e0988f3bbd087d56d872a31f90 100644 --- a/webservice/webservice.py +++ b/webservice/webservice.py @@ -9,10 +9,13 @@ import json import logging import os import sqlite3 +import time import urllib.parse from asyncio import get_event_loop, shield from datetime import datetime from pathlib import Path +from subprocess import run, PIPE +from threading import Thread from typing import Any, Dict, List, Optional import yaml @@ -28,7 +31,7 @@ except ImportError: from messages import MDC, Errors, Success -async def init_job_db(config): +def init_job_db(config): """ Initialize the sqlite job database A new database is created if no pre-existing one is present. A single @@ -39,8 +42,6 @@ async def init_job_db(config): :param config: the configuration parsed from the webservice YAML config :return: a sqlite3 connection instance to the database """ - # FIXME: sqlite3 is synchronous, it should be replaced with - # https://pypi.org/project/databases/ logging.info("Initializing database") conn = sqlite3.connect(config['web-service']['job-db']) conn.execute( @@ -50,31 +51,22 @@ async def init_job_db(config): return conn -async def init_md_client(config: Dict[str, Dict[str, str]]) -> MetadataClient: +def init_md_client(config: Dict[str, Dict[str, str]]) -> MetadataClient: """Initialize an MDC client connection. :param config: the configuration parsed from the webservice YAML config :return: an MDC client connection """ - # TODO: could the client be a global? This would recuce passing it around - - # During MetadataClient initialisation, this object requests authentication from MyMDC - # As such, it is needed to run the initialisation in a thread. - def _init_client(): - mdconf = config['metadata-client'] - client_conn = MetadataClient(client_id=mdconf['user-id'], - client_secret=mdconf['user-secret'], - user_email=mdconf['user-email'], - token_url=mdconf['token-url'], - refresh_url=mdconf['refresh-url'], - auth_url=mdconf['auth-url'], - scope=mdconf['scope'], - base_api_url=mdconf['base-api-url']) - return client_conn - - loop = get_event_loop() - client = await shield(loop.run_in_executor(None, _init_client)) - return client + mdconf = config['metadata-client'] + client_conn = MetadataClient(client_id=mdconf['user-id'], + client_secret=mdconf['user-secret'], + user_email=mdconf['user-email'], + token_url=mdconf['token-url'], + refresh_url=mdconf['refresh-url'], + auth_url=mdconf['auth-url'], + scope=mdconf['scope'], + base_api_url=mdconf['base-api-url']) + return client_conn def init_config_repo(config): @@ -218,7 +210,7 @@ async def run_proc_async(cmd: List[str]) -> (int, bytes): return proc.returncode, stdout -async def slurm_status(filter_user=True): +def slurm_status(filter_user=True): """ Return the status of slurm jobs by calling squeue :param filter_user: set to true to filter ony jobs from current user @@ -228,9 +220,9 @@ async def slurm_status(filter_user=True): cmd = ["squeue"] if filter_user: cmd += ["-u", getpass.getuser()] - retcode, stdout = await run_proc_async(cmd) - if retcode == 0: - rlines = stdout.decode().split("\n") + res = run(cmd, stdout=PIPE) + if res.returncode == 0: + rlines = res.stdout.decode().split("\n") statii = {} for r in rlines[1:]: try: @@ -243,7 +235,7 @@ async def slurm_status(filter_user=True): return None -async def slurm_job_status(jobid): +def slurm_job_status(jobid): """ Return the status of slurm job :param jobid: Slurm job Id @@ -251,9 +243,9 @@ async def slurm_job_status(jobid): """ cmd = ["sacct", "-j", str(jobid), "--format=JobID,Elapsed,state"] - retcode, stdout = await run_proc_async(cmd) - if retcode == 0: - rlines = stdout.decode().split("\n") + res = run(cmd, stdout=PIPE) + if res.returncode == 0: + rlines = res.stdout.decode().split("\n") logging.debug("Job {} state {}".format(jobid, rlines[2].split())) if len(rlines[2].split()) == 3: @@ -322,22 +314,22 @@ def parse_config(cmd: List[str], config: Dict[str, Any]) -> List[str]: return cmd -async def update_job_db(config): +def update_job_db(config): """ Update the job database and send out updates to MDC - This coro runs as background task + This runs in its own thread. :param config: configuration parsed from webservice YAML """ logging.info("Starting config db handling") - conn = await init_job_db(config) - mdc = await init_md_client(config) + conn = init_job_db(config) + mdc = init_md_client(config) time_interval = int(config['web-service']['job-update-interval']) while True: - statii = await slurm_status() + statii = slurm_status() # Check that slurm is giving proper feedback if statii is None: - await asyncio.sleep(time_interval) + time.sleep(time_interval) continue try: c = conn.cursor() @@ -346,7 +338,7 @@ async def update_job_db(config): logging.debug("SLURM info {}".format(statii)) for r in c.fetchall(): - rid, jobid, proposal, run, status, time, _, action = r + rid, jobid, _proposal, _run, status, _time, _det, action = r logging.debug("DB info {}".format(r)) cflg, cstatus = combined.get((rid, action), ([], [])) @@ -358,7 +350,7 @@ async def update_job_db(config): cflg.append('R') cstatus.append("{}-{}".format(slstatus, runtime)) else: - _, sltime, slstatus = await slurm_job_status(jobid) + _, sltime, slstatus = slurm_job_status(jobid) query = "UPDATE jobs SET status=? WHERE jobid LIKE ?" c.execute(query, (slstatus, jobid)) @@ -401,7 +393,7 @@ async def update_job_db(config): except Exception: logging.error(f"Failure to update job DB", exc_info=True) - await asyncio.sleep(time_interval) + time.sleep(time_interval) async def copy_untouched_files(file_list, out_folder, run): @@ -656,8 +648,8 @@ class ActionsServer: @classmethod async def ainit(cls, config, mode): init_config_repo(config['config-repo']) - job_db = await init_job_db(config) - mdc = await init_md_client(config) + job_db = init_job_db(config) + mdc = init_md_client(config) return cls(config, mode, job_db, mdc) @classmethod @@ -1191,8 +1183,15 @@ def main(): logging.basicConfig(filename=logfile, level=getattr(logging, args['logging']), format=fmt) + + # Update job statuses from Slurm in a separate thread + slurm_monitor_thread = Thread( + target=update_job_db, args=(config,), daemon=True + ) + slurm_monitor_thread.start() + + # Launch the ZMQ server to handle requests for calibration loop = asyncio.get_event_loop() - loop.create_task(update_job_db(config)) loop.run_until_complete(ActionsServer.launch(config, mode)) loop.close()