diff --git a/webservice/config/serve_overview.yaml b/webservice/config/serve_overview.yaml
index a1e6b6b3ec82aa017bd37e030cddc2ea082c81e2..10dea021628f61ba79facd76e7f44d63dbbc7f78 100644
--- a/webservice/config/serve_overview.yaml
+++ b/webservice/config/serve_overview.yaml
@@ -12,6 +12,7 @@ shell-commands:
   total-jobs: "sinfo -p exfel -o %A --noheader"
   tail-log: "tail -5000 web.log"
   cat-log: "cat web.log"
+  tail-log-monitor: "tail -5000 monitor.log"

 run-candidates:
     - "--run-high"
diff --git a/webservice/job_monitor.py b/webservice/job_monitor.py
new file mode 100644
index 0000000000000000000000000000000000000000..802d74596d365fb6f98d3c88efe3a809c1299ac7
--- /dev/null
+++ b/webservice/job_monitor.py
@@ -0,0 +1,225 @@
+"""Monitor calibration jobs in Slurm and send status updates"""
+import argparse
+import json
+import locale
+import logging
+import time
+from datetime import datetime, timezone
+from pathlib import Path
+from subprocess import run, PIPE
+
+from kafka import KafkaProducer
+from kafka.errors import KafkaError
+
+try:
+    from .config import webservice as config
+    from .messages import MDC, Errors, MigrationError, Success
+    from .webservice import init_job_db, init_md_client
+except ImportError:
+    from config import webservice as config
+    from messages import MDC, Errors, MigrationError, Success
+    from webservice import init_job_db, init_md_client
+
+log = logging.getLogger(__name__)
+
+
+class NoOpProducer:
+    """Fills in for Kafka producer object when setting that up fails"""
+    def send(self, topic, value):
+        pass
+
+
+def init_kafka_producer(config):
+    try:
+        return KafkaProducer(
+            bootstrap_servers=config['kafka']['brokers'],
+            value_serializer=lambda d: json.dumps(d).encode('utf-8'),
+            max_block_ms=2000,  # Don't get stuck trying to send Kafka messages
+        )
+    except KafkaError:
+        log.warning("Problem initialising Kafka producer; "
+                    "Kafka notifications will not be sent.", exc_info=True)
+        return NoOpProducer()
+
+
+def slurm_status(filter_user=True):
+    """ Return the status of slurm jobs by calling squeue
+
+    :param filter_user: set to true to filter only jobs from the current user
+    :return: a dictionary indexed by slurm jobid and containing a tuple
+        of (status, run time) as values.
+    """
+    cmd = ["squeue"]
+    if filter_user:
+        cmd += ["--me"]
+    res = run(cmd, stdout=PIPE)
+    if res.returncode == 0:
+        rlines = res.stdout.decode().split("\n")
+        statii = {}
+        for r in rlines[1:]:
+            try:
+                jobid, _, _, _, status, runtime, _, _ = r.split()
+                jobid = jobid.strip()
+                statii[jobid] = status, runtime
+            except ValueError:  # not enough values to unpack in split
+                pass
+        return statii
+
+
+def slurm_job_status(jobid):
+    """ Return the status of a slurm job
+
+    :param jobid: Slurm job Id
+    :return: (JobID, Elapsed, State) as reported by sacct, or ("NA", "NA", "NA")
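+
+    Illustrative sacct output: a header line, a divider, then the job line
+    that rlines[2] picks out below:
+
+        JobID      Elapsed    State
+        ---------- ---------- ----------
+        1234567    00:05:14   COMPLETED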
+    """
+    cmd = ["sacct", "-j", str(jobid), "--format=JobID,Elapsed,state"]
+
+    res = run(cmd, stdout=PIPE)
+    if res.returncode == 0:
+        rlines = res.stdout.decode().split("\n")
+
+        log.debug(f"Job {jobid} state {rlines[2].split()}")
+        if len(rlines[2].split()) == 3:
+            return rlines[2].replace("+", "").split()
+    return "NA", "NA", "NA"
+
+
+def update_job_db(config):
+    """ Update the job database and send out updates to MDC
+
+    :param config: configuration parsed from webservice YAML
+    """
+    log.info("Starting jobs monitor")
+    conn = init_job_db(config)
+    mdc = init_md_client(config)
+    kafka_prod = init_kafka_producer(config)
+    kafka_topic = config['kafka']['topic']
+    time_interval = int(config['web-service']['job-update-interval'])
+
+    while True:
+        statii = slurm_status()
+        # Check that slurm is giving proper feedback
+        if statii is None:
+            time.sleep(time_interval)
+            continue
+        try:
+            c = conn.cursor()
+            c.execute("SELECT * FROM jobs WHERE status IN ('R', 'PD', 'CG') ")
+            combined = {}
+            log.debug(f"SLURM info {statii}")
+
+            for r in c.fetchall():
+                rid, jobid, proposal, run, status, _time, det, action = r
+                log.debug(f"DB info {r}")
+
+                cflg, cstatus, *_ = combined.setdefault((rid, action), (
+                    [], [], proposal, run, det
+                ))
+                if jobid in statii:
+                    slstatus, runtime = statii[jobid]
+                    query = "UPDATE jobs SET status=?, time=? WHERE jobid LIKE ?"
+                    c.execute(query, (slstatus, runtime, jobid))
+
+                    cflg.append('R')
+                    cstatus.append(f"{slstatus}-{runtime}")
+                else:
+                    _, sltime, slstatus = slurm_job_status(jobid)
+                    query = "UPDATE jobs SET status=? WHERE jobid LIKE ?"
+                    c.execute(query, (slstatus, jobid))
+
+                    if slstatus == 'COMPLETED':
+                        cflg.append("A")
+                    else:
+                        cflg.append("NA")
+                    cstatus.append(slstatus)
+            conn.commit()
+
+            flg_order = {"R": 2, "A": 1, "NA": 0}
+            dark_flags = {'NA': 'E', 'R': 'IP', 'A': 'F'}
+            for rid, action in combined:
+                if int(rid) == 0:  # this job was not submitted from MyMDC
+                    continue
+                flgs, statii, proposal, run, det = combined[rid, action]
+                # sort by least done status
+                flg = max(flgs, key=lambda i: flg_order[i])
+                if flg != 'R':
+                    log.info(
+                        "Jobs finished - action: %s, run id: %s, status: %s",
+                        action, rid, flg,
+                    )
+                    if action == 'CORRECT':
+                        try:
+                            kafka_prod.send(kafka_topic, {
+                                'event': 'correction_complete',
+                                'proposal': proposal,
+                                'run': run,
+                                'detector': det,
+                                'success': (flg == 'A'),  # A for Available
+                            })
+                        except KafkaError:
+                            log.warning("Error sending Kafka notification",
+                                        exc_info=True)
+
+                if all(s.startswith('PD-') for s in statii):
+                    # Avoid swamping myMdC with updates for jobs still pending.
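+                    # (Entries in statii are built above as
+                    # f"{slstatus}-{runtime}", so the 'PD-' prefix matches
+                    # jobs that squeue still lists as pending.)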
+                    log.debug(
+                        "No update for action %s, rid %s: jobs pending",
+                        action, rid
+                    )
+                    continue
+
+                msg = "\n".join(statii)
+                msg_debug = f"Update MDC {rid}, {msg}"
+                log.debug(msg_debug.replace('\n', ', '))
+
+                if action == 'CORRECT':
+                    data = {'flg_cal_data_status': flg,
+                            'cal_pipeline_reply': msg}
+                    if flg != 'R':
+                        data['cal_last_end_at'] = datetime.now(tz=timezone.utc).isoformat()
+                    response = mdc.update_run_api(rid, data)
+
+                else:  # action == 'DARK' (submitted via a dark_request)
+                    data = {'dark_run': {'flg_status': dark_flags[flg],
+                                         'calcat_feedback': msg}}
+                    response = mdc.update_dark_run_api(rid, data)
+
+                if response.status_code != 200:
+                    log.error("Failed to update MDC for action %s, rid %s",
+                              action, rid)
+                    log.error(Errors.MDC_RESPONSE.format(response))
+        except Exception:
+            log.error("Failure to update job DB", exc_info=True)
+
+        time.sleep(time_interval)
+
+
+def main(argv=None):
+    # Ensure files are opened as UTF-8 by default, regardless of environment.
+    locale.setlocale(locale.LC_CTYPE, ('en_US', 'UTF-8'))
+
+    parser = argparse.ArgumentParser(
+        description='Monitor calibration jobs in Slurm and send status updates'
+    )
+    parser.add_argument('--config-file', type=str, default=None)
+    parser.add_argument('--log-file', type=str, default='./monitor.log')
+    parser.add_argument(
+        '--log-level', type=str, default="INFO", choices=['INFO', 'DEBUG', 'ERROR']  # noqa
+    )
+    args = parser.parse_args(argv)
+
+    if args.config_file is not None:
+        config.configure(includes_for_dynaconf=[Path(args.config_file).absolute()])
+
+    fmt = '%(asctime)s - %(name)s - %(levelname)s - [%(filename)s:%(lineno)d] %(message)s'  # noqa
+    logging.basicConfig(
+        filename=args.log_file,
+        level=getattr(logging, args.log_level),
+        format=fmt
+    )
+    update_job_db(config)
+
+
+if __name__ == "__main__":
+    main()
diff --git a/webservice/serve_overview.py b/webservice/serve_overview.py
index ded99ef5897692c578cd24974e8564d1ebc94810..2fdf7abeed59176fb1c86b080d2b51b0c1fb46c2 100644
--- a/webservice/serve_overview.py
+++ b/webservice/serve_overview.py
@@ -147,7 +147,16 @@ class RequestHandler(BaseHTTPRequestHandler):
                          if ("Response error from MDC" not in l
                              and "DEBUG" not in l)]
         tmpl = Template(self.templates["log-output"])
-        log_output_r = tmpl.render(logout="<br>".join(last_n_lines[::-1]))
+        log_output_r = tmpl.render(service="Webservice", lines=last_n_lines)
+
+        last_n_lines_monitor = [l for l in check_output(
+            config["shell-commands"]["tail-log-monitor"], shell=True
+        ).decode('utf8').split("\n")
+            if "DEBUG" not in l
+        ]
+        log_output_monitor_r = tmpl.render(
+            service="Job monitor", lines=last_n_lines_monitor
+        )

         last_n_lines = check_output(self.cat_log_cmd,
                                     shell=True).decode('utf8').split("\n")[
@@ -280,6 +289,7 @@ class RequestHandler(BaseHTTPRequestHandler):
         tmpl = Template(self.templates["main-doc"])
         message = tmpl.render(maxwell_status=maxwell_status_r,
                               log_output=log_output_r,
+                              log_output_monitor=log_output_monitor_r,
                               last_characterizations=last_characterizations_r,
                               last_correction=last_correction_r,
                               running_jobs=running_jobs_r)
diff --git a/webservice/templates/log_output.html b/webservice/templates/log_output.html
index 8d70880e92045c981e7d8768a7bd21122a3b1f46..59b5c4661de4d4ad650e48a596963b840f8c1de0 100644
--- a/webservice/templates/log_output.html
+++ b/webservice/templates/log_output.html
@@ -1,6 +1,6 @@
 <div class="block">
-    <h2> Webservice log </h2>
+    <h2> {{ service }} log </h2>
     <div class="log-out">
-        {{ logout }}
+        {{ lines | reverse | join("<br>") }}
     </div>
 </div>
diff --git a/webservice/templates/main_doc.html b/webservice/templates/main_doc.html
index 9537848a90f6daa3acd0a4e42e2c589bef982a27..6c7042a2516d75f898630ee7bfb236e0328c6bd3 100644
--- a/webservice/templates/main_doc.html
+++ b/webservice/templates/main_doc.html
@@ -10,5 +10,6 @@
 {{ last_characterizations }}
 {{ last_correction }}
 {{ log_output }}
+{{ log_output_monitor }}
 </body>
 </html>
diff --git a/webservice/webservice.py b/webservice/webservice.py
index 63d37e11fe3fa5270b935ef39f68a7bfb80f1a19..8785e0ece0d138fe2c95a2f0d945f0c017689817 100644
--- a/webservice/webservice.py
+++ b/webservice/webservice.py
@@ -2,23 +2,18 @@ import argparse
 import ast
 import asyncio
 import copy
-import getpass
 import glob
 import inspect
 import json
 import locale
 import logging
 import os
-import re
 import sqlite3
 import sys
-import time
 import urllib.parse
 from asyncio import get_event_loop, shield
 from datetime import datetime, timezone
 from pathlib import Path
-from subprocess import PIPE, run
-from threading import Thread
 from typing import Any, Dict, List, Optional, Tuple, Union

 import requests
@@ -27,8 +22,6 @@ import zmq
 import zmq.asyncio
 import zmq.auth.thread
 from git import InvalidGitRepositoryError, Repo
-from kafka import KafkaProducer
-from kafka.errors import KafkaError
 from metadata_client.metadata_client import MetadataClient

 try:
@@ -96,23 +89,6 @@ def init_config_repo(config):
     logging.info("Config repo is initialized")


-def init_kafka_producer(config):
-    try:
-        return KafkaProducer(
-            bootstrap_servers=config['kafka']['brokers'],
-            value_serializer=lambda d: json.dumps(d).encode('utf-8'),
-            max_block_ms=2000,  # Don't get stuck trying to send Kafka messages
-        )
-    except KafkaError:
-        logging.warning("Problem initialising Kafka producer; "
-                        "Kafka notifications will not be sent.", exc_info=True)
-        return NoOpProducer()
-
-
-class NoOpProducer:
-    """Fills in for Kafka producer object when setting that up fails"""
-    def send(self, topic, value):
-        pass


 async def upload_config(config, yaml, instrument, cycle, proposal) -> bytes:
@@ -248,48 +224,6 @@ async def run_proc_async(cmd: List[str]) -> Tuple[int, bytes]:
     return proc.returncode, stdout


-def slurm_status(filter_user=True):
-    """ Return the status of slurm jobs by calling squeue
-
-    :param filter_user: set to true to filter ony jobs from current user
-    :return: a dictionary indexed by slurm jobid and containing a tuple
-        of (status, run time) as values.
-    """
-    cmd = ["squeue"]
-    if filter_user:
-        cmd += ["-u", getpass.getuser()]
-    res = run(cmd, stdout=PIPE)
-    if res.returncode == 0:
-        rlines = res.stdout.decode().split("\n")
-        statii = {}
-        for r in rlines[1:]:
-            try:
-                jobid, _, _, _, status, runtime, _, _ = r.split()
-                jobid = jobid.strip()
-                statii[jobid] = status, runtime
-            except ValueError:  # not enough values to unpack in split
-                pass
-        return statii
-
-
-def slurm_job_status(jobid):
-    """ Return the status of slurm job
-
-    :param jobid: Slurm job Id
-    :return: Slurm state, Elapsed.
-    """
-    cmd = ["sacct", "-j", str(jobid), "--format=JobID,Elapsed,state"]
-
-    res = run(cmd, stdout=PIPE)
-    if res.returncode == 0:
-        rlines = res.stdout.decode().split("\n")
-
-        logging.debug("Job {} state {}".format(jobid, rlines[2].split()))
-        if len(rlines[2].split()) == 3:
-            return rlines[2].replace("+", "").split()
-    return "NA", "NA", "NA"
-
-
 def query_rid(conn, rid) -> bytes:
     c = conn.cursor()
     c.execute("SELECT * FROM jobs WHERE rid LIKE ?", (rid,))
@@ -351,116 +285,6 @@ def parse_config(cmd: List[str], config: Dict[str, Any]) -> List[str]:
     return cmd


-def update_job_db(config):
-    """ Update the job database and send out updates to MDC
-
-    This runs in its own thread.
-
-    :param config: configuration parsed from webservice YAML
-    """
-    logging.info("Starting config db handling")
-    conn = init_job_db(config)
-    mdc = init_md_client(config)
-    kafka_prod = init_kafka_producer(config)
-    kafka_topic = config['kafka']['topic']
-    time_interval = int(config['web-service']['job-update-interval'])
-
-    while True:
-        statii = slurm_status()
-        # Check that slurm is giving proper feedback
-        if statii is None:
-            time.sleep(time_interval)
-            continue
-        try:
-            c = conn.cursor()
-            c.execute("SELECT * FROM jobs WHERE status IN ('R', 'PD', 'CG') ")
-            combined = {}
-            logging.debug("SLURM info {}".format(statii))
-
-            for r in c.fetchall():
-                rid, jobid, proposal, run, status, _time, det, action = r
-                logging.debug("DB info {}".format(r))
-
-                cflg, cstatus, *_ = combined.setdefault((rid, action), (
-                    [], [], proposal, run, det
-                ))
-                if jobid in statii:
-                    slstatus, runtime = statii[jobid]
-                    query = "UPDATE jobs SET status=?, time=? WHERE jobid LIKE ?"
-                    c.execute(query, (slstatus, runtime, jobid))
-
-                    cflg.append('R')
-                    cstatus.append("{}-{}".format(slstatus, runtime))
-                else:
-                    _, sltime, slstatus = slurm_job_status(jobid)
-                    query = "UPDATE jobs SET status=? WHERE jobid LIKE ?"
-                    c.execute(query, (slstatus, jobid))
-
-                    if slstatus == 'COMPLETED':
-                        cflg.append("A")
-                    else:
-                        cflg.append("NA")
-                    cstatus.append(slstatus)
-            conn.commit()
-
-            flg_order = {"R": 2, "A": 1, "NA": 0}
-            dark_flags = {'NA': 'E', 'R': 'IP', 'A': 'F'}
-            for rid, action in combined:
-                if int(rid) == 0:  # this job was not submitted from MyMDC
-                    continue
-                flgs, statii, proposal, run, det = combined[rid, action]
-                # sort by least done status
-                flg = max(flgs, key=lambda i: flg_order[i])
-                if flg != 'R':
-                    logging.info(
-                        "Jobs finished - action: %s, run id: %s, status: %s",
-                        action, rid, flg,
-                    )
-                    if action == 'CORRECT':
-                        try:
-                            kafka_prod.send(kafka_topic, {
-                                'event': 'correction_complete',
-                                'proposal': proposal,
-                                'run': run,
-                                'detector': det,
-                                'success': (flg == 'A'),  # A for Available
-                            })
-                        except KafkaError:
-                            logging.warning("Error sending Kafka notification",
-                                            exc_info=True)
-
-                if all(s.startswith('PD-') for s in statii):
-                    # Avoid swamping myMdC with updates for jobs still pending.
-                    logging.debug(
-                        "No update for action %s, rid %s: jobs pending",
-                        action, rid
-                    )
-                    continue
-
-                msg = "\n".join(statii)
-                msg_debug = f"Update MDC {rid}, {msg}"
-                logging.debug(msg_debug.replace('\n', ', '))
-
-                if action == 'CORRECT':
-                    data = {'flg_cal_data_status': flg,
-                            'cal_pipeline_reply': msg}
-                    if flg != 'R':
-                        data['cal_last_end_at'] = datetime.now(tz=timezone.utc).isoformat()
-                    response = mdc.update_run_api(rid, data)
-
-                else:  # action == 'DARK' but it's dark_request
-                    data = {'dark_run': {'flg_status': dark_flags[flg],
-                                         'calcat_feedback': msg}}
-                    response = mdc.update_dark_run_api(rid, data)
-
-                if response.status_code != 200:
-                    logging.error("Failed to update MDC for action %s, rid %s",
-                                  action, rid)
-                    logging.error(Errors.MDC_RESPONSE.format(response))
-        except Exception:
-            logging.error("Failure to update job DB", exc_info=True)
-
-        time.sleep(time_interval)


 async def run_action(job_db, cmd, mode, proposal, run, rid) -> str:
@@ -1327,12 +1151,6 @@ def main(argv: Optional[List[str]] = None):
         format=fmt
     )

-    # Update job statuses from Slurm in a separate thread
-    slurm_monitor_thread = Thread(
-        target=update_job_db, args=(config,), daemon=True
-    )
-    slurm_monitor_thread.start()
-
     # Launch the ZMQ server to handle requests for calibration
     server = ActionsServer(config, mode)
     loop = asyncio.get_event_loop()
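
A minimal sketch of a consumer for the 'correction_complete' events the
monitor publishes, using the same kafka-python client as above. The topic
name and broker address are placeholders; in practice they come from the
'kafka' section of the webservice YAML config.

    import json

    from kafka import KafkaConsumer

    # Placeholder broker and topic; substitute the values configured for
    # the job monitor ('kafka: brokers' and 'kafka: topic').
    consumer = KafkaConsumer(
        'calibration-events',
        bootstrap_servers=['broker1:9092'],
        value_deserializer=lambda m: json.loads(m.decode('utf-8')),
    )

    for msg in consumer:
        event = msg.value
        if event.get('event') == 'correction_complete':
            # 'success' mirrors the 'A' (Available) flag computed by the
            # monitor for the run's finished jobs.
            print(event['proposal'], event['run'], event['detector'],
                  'succeeded' if event['success'] else 'failed')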