diff --git a/webservice/README.md b/webservice/README.md index fef79ac81a6c2c76b30755fff4dd1d6a2bdfb485..a2921c4bcb2c2b70c30d889ac30f20f7c0638f1a 100644 --- a/webservice/README.md +++ b/webservice/README.md @@ -155,3 +155,37 @@ Use ``` to display a list of available options. + +Testing +------- + +There is a test environment on ``max-exfl017``, separate from the production +instance. + +```bash +ssh xcaltst@max-exfl017.desy.de +cd /home/xcaltst/pycalibration + +# Get the code you want to test +git pull +git checkout <branch-to-test> + +# Stop the already running server, start a new one with your code +ps aux | grep webservice/webservice.py | awk '{ print $2}' | xargs kill +source ~/pycalibration_venv/bin/activate +nohup ~/pycalibration_venv/bin/python ~/pycalibration/webservice/webservice.py \ + --mode prod --logging INFO --config-file ~/pycalibration/webservice/webservice.yaml & + +# Follow the logs +tail -f ~/pycalibration/web.log +``` + +The [test instance of myMdC](https://in.xfel.eu/test_metadata/) talks to this +test service. Using data in the [CALLAB proposal](https://in.xfel.eu/test_metadata/proposals/259) +there, request recalibration ([runs tab](https://in.xfel.eu/test_metadata/proposals/259#proposal-runs)) +and processing of dark runs ([calibration constants tab](https://in.xfel.eu/test_metadata/proposals/259#proposal-calibration)) +to send the 'correct' and 'dark_request' actions. The webservice logs and the +status in myMdC should update as the processing occurs. + +The command ``squeue -u xcaltst`` will show running & pending Slurm jobs started +by this test system. diff --git a/webservice/webservice.py b/webservice/webservice.py index 1563807952ec0923922c3e43a378b26abf245d61..43b0ecc6e6145361dca6d1d767d8135de1a8ec6d 100644 --- a/webservice/webservice.py +++ b/webservice/webservice.py @@ -9,10 +9,13 @@ import json import logging import os import sqlite3 +import time import urllib.parse from asyncio import get_event_loop, shield from datetime import datetime from pathlib import Path +from subprocess import run, PIPE +from threading import Thread from typing import Any, Dict, List, Optional import yaml @@ -28,7 +31,7 @@ except ImportError: from messages import MDC, Errors, Success -async def init_job_db(config): +def init_job_db(config): """ Initialize the sqlite job database A new database is created if no pre-existing one is present. A single @@ -39,8 +42,6 @@ async def init_job_db(config): :param config: the configuration parsed from the webservice YAML config :return: a sqlite3 connection instance to the database """ - # FIXME: sqlite3 is synchronous, it should be replaced with - # https://pypi.org/project/databases/ logging.info("Initializing database") conn = sqlite3.connect(config['web-service']['job-db']) conn.execute( @@ -50,31 +51,22 @@ async def init_job_db(config): return conn -async def init_md_client(config: Dict[str, Dict[str, str]]) -> MetadataClient: +def init_md_client(config: Dict[str, Dict[str, str]]) -> MetadataClient: """Initialize an MDC client connection. :param config: the configuration parsed from the webservice YAML config :return: an MDC client connection """ - # TODO: could the client be a global? This would recuce passing it around - - # During MetadataClient initialisation, this object requests authentication from MyMDC - # As such, it is needed to run the initialisation in a thread. - def _init_client(): - mdconf = config['metadata-client'] - client_conn = MetadataClient(client_id=mdconf['user-id'], - client_secret=mdconf['user-secret'], - user_email=mdconf['user-email'], - token_url=mdconf['token-url'], - refresh_url=mdconf['refresh-url'], - auth_url=mdconf['auth-url'], - scope=mdconf['scope'], - base_api_url=mdconf['base-api-url']) - return client_conn - - loop = get_event_loop() - client = await shield(loop.run_in_executor(None, _init_client)) - return client + mdconf = config['metadata-client'] + client_conn = MetadataClient(client_id=mdconf['user-id'], + client_secret=mdconf['user-secret'], + user_email=mdconf['user-email'], + token_url=mdconf['token-url'], + refresh_url=mdconf['refresh-url'], + auth_url=mdconf['auth-url'], + scope=mdconf['scope'], + base_api_url=mdconf['base-api-url']) + return client_conn def init_config_repo(config): @@ -218,7 +210,7 @@ async def run_proc_async(cmd: List[str]) -> (int, bytes): return proc.returncode, stdout -async def slurm_status(filter_user=True): +def slurm_status(filter_user=True): """ Return the status of slurm jobs by calling squeue :param filter_user: set to true to filter ony jobs from current user @@ -228,9 +220,9 @@ async def slurm_status(filter_user=True): cmd = ["squeue"] if filter_user: cmd += ["-u", getpass.getuser()] - retcode, stdout = await run_proc_async(cmd) - if retcode == 0: - rlines = stdout.decode().split("\n") + res = run(cmd, stdout=PIPE) + if res.returncode == 0: + rlines = res.stdout.decode().split("\n") statii = {} for r in rlines[1:]: try: @@ -243,7 +235,7 @@ async def slurm_status(filter_user=True): return None -async def slurm_job_status(jobid): +def slurm_job_status(jobid): """ Return the status of slurm job :param jobid: Slurm job Id @@ -251,9 +243,9 @@ async def slurm_job_status(jobid): """ cmd = ["sacct", "-j", str(jobid), "--format=JobID,Elapsed,state"] - retcode, stdout = await run_proc_async(cmd) - if retcode == 0: - rlines = stdout.decode().split("\n") + res = run(cmd, stdout=PIPE) + if res.returncode == 0: + rlines = res.stdout.decode().split("\n") logging.debug("Job {} state {}".format(jobid, rlines[2].split())) if len(rlines[2].split()) == 3: @@ -322,22 +314,22 @@ def parse_config(cmd: List[str], config: Dict[str, Any]) -> List[str]: return cmd -async def update_job_db(config): +def update_job_db(config): """ Update the job database and send out updates to MDC - This coro runs as background task + This runs in its own thread. :param config: configuration parsed from webservice YAML """ logging.info("Starting config db handling") - conn = await init_job_db(config) - mdc = await init_md_client(config) + conn = init_job_db(config) + mdc = init_md_client(config) time_interval = int(config['web-service']['job-update-interval']) while True: - statii = await slurm_status() + statii = slurm_status() # Check that slurm is giving proper feedback if statii is None: - await asyncio.sleep(time_interval) + time.sleep(time_interval) continue try: c = conn.cursor() @@ -346,7 +338,7 @@ async def update_job_db(config): logging.debug("SLURM info {}".format(statii)) for r in c.fetchall(): - rid, jobid, proposal, run, status, time, _, action = r + rid, jobid, _proposal, _run, status, _time, _det, action = r logging.debug("DB info {}".format(r)) cflg, cstatus = combined.get((rid, action), ([], [])) @@ -358,7 +350,7 @@ async def update_job_db(config): cflg.append('R') cstatus.append("{}-{}".format(slstatus, runtime)) else: - _, sltime, slstatus = await slurm_job_status(jobid) + _, sltime, slstatus = slurm_job_status(jobid) query = "UPDATE jobs SET status=? WHERE jobid LIKE ?" c.execute(query, (slstatus, jobid)) @@ -399,9 +391,9 @@ async def update_job_db(config): action, rid) logging.error(Errors.MDC_RESPONSE.format(response)) except Exception: - logging.error(f"Failure to update job DB", exc_info=True) + logging.error("Failure to update job DB", exc_info=True) - await asyncio.sleep(time_interval) + time.sleep(time_interval) async def copy_untouched_files(file_list, out_folder, run): @@ -634,11 +626,13 @@ async def update_mdc_status(mdc: MetadataClient, action: str, class ActionsServer: - def __init__(self, config, mode, job_db, mdc): + def __init__(self, config, mode): self.config = config self.mode = mode - self.job_db = job_db - self.mdc = mdc + + init_config_repo(config['config-repo']) + self.job_db = init_job_db(config) + self.mdc = init_md_client(config) # Set up a ZMQ socket to listen for requests self.zmq_ctx = zmq.asyncio.Context() @@ -652,19 +646,6 @@ class ActionsServer: self.socket.bind("{}:{}".format(config['web-service']['bind-to'], config['web-service']['port'])) - # __init__ can't be async - this is a workaround - @classmethod - async def ainit(cls, config, mode): - init_config_repo(config['config-repo']) - job_db = await init_job_db(config) - mdc = await init_md_client(config) - return cls(config, mode, job_db, mdc) - - @classmethod - async def launch(cls, config, mode): - server = await cls.ainit(config, mode) - return await server.run() - async def run(self): """The main server loop @@ -691,7 +672,7 @@ class ActionsServer: """Handle one request, and return the reply to be sent""" try: # protect against unparseable requests req = ast.literal_eval(raw_req.decode('utf-8')) - except SyntaxError as e: + except (SyntaxError, ValueError) as e: logging.error(str(e)) return Errors.REQUEST_FAILED.encode() @@ -824,7 +805,7 @@ class ActionsServer: out_folder, runnr)) except Exception as corr_e: - logging.error(f"Error during correction", exc_info=corr_e) + logging.error("Error during correction", exc_info=corr_e) await update_mdc_status(self.mdc, 'correct', rid, Errors.REQUEST_FAILED) return @@ -1191,9 +1172,17 @@ def main(): logging.basicConfig(filename=logfile, level=getattr(logging, args['logging']), format=fmt) + + # Update job statuses from Slurm in a separate thread + slurm_monitor_thread = Thread( + target=update_job_db, args=(config,), daemon=True + ) + slurm_monitor_thread.start() + + # Launch the ZMQ server to handle requests for calibration + server = ActionsServer(config, mode) loop = asyncio.get_event_loop() - loop.create_task(update_job_db(config)) - loop.run_until_complete(ActionsServer.launch(config, mode)) + loop.run_until_complete(server.run()) loop.close()