diff --git a/setup.py b/setup.py index 879da9c40515aa4ac4fea1d8b80b64c1ca0312d5..cfd9aaf32ebd94b6117163f02b30a367ca5a2e52 100644 --- a/setup.py +++ b/setup.py @@ -79,7 +79,7 @@ install_requires = [ "lxml==4.5.0", "markupsafe==2.0.1", "matplotlib==3.4.2", - "metadata_client==3.0.8", + "metadata_client==4.0.0", "nbclient==0.5.1", "nbconvert==5.6.1", "nbformat==5.0.7", diff --git a/webservice/add_reports.py b/webservice/add_reports.py new file mode 100644 index 0000000000000000000000000000000000000000..5dafdda744718544cda5c82f700e952ef934422e --- /dev/null +++ b/webservice/add_reports.py @@ -0,0 +1,79 @@ +"""Backfill reports to myMdC from job databases before we started injecting them""" +import argparse +import os.path +import re +import shlex +import sys +import sqlite3 + +from .config import webservice as config +from .webservice import init_md_client + +ap = argparse.ArgumentParser() +ap.add_argument("db_file") +ap.add_argument("--really", action="store_true") +args = ap.parse_args() + +db_file = sys.argv[1] +conn = sqlite3.connect(args.db_file) +conn.row_factory = sqlite3.Row + +mdc = init_md_client(config) +print("MyMdC API is:", mdc.base_api_url) + +rows = conn.execute( + "SELECT det_type, karabo_id, command, success, " + "req_id, proposal, run, action, mymdc_id, timestamp " + "FROM executions JOIN requests USING (req_id) " + "WHERE success IS NOT NULL" +).fetchall() + +nreports = 0 +for i, r in enumerate(rows): + cmd_args = shlex.split(r["command"]) + try: + report_path = cmd_args[cmd_args.index("--report-to") + 1] + ".pdf" + except (ValueError, IndexError): + print("Couldn't find report path in %r", cmd_args) + continue + + # Workaround: we retrospectively renamed folders like 'r67' to 'r0067'. + def runnr_sub(m: re.Match): + return f'/r{m[1].zfill(4)}/' + report_path = re.sub(r'/r(\d{1,3})/', runnr_sub, report_path, count=1) + + if not os.path.isfile(report_path): + print(f"Report file {report_path} missing (p{r['proposal']}, r{r['run']})") + continue + + desc = f"{r['karabo_id']} detector corrections" + if not r["success"]: + desc += " (errors occurred)" + + nreports += 1 + if not args.really: + continue + + response = mdc.create_report_api( + { + "name": os.path.basename(report_path), + "cal_report_path": os.path.dirname(report_path).rstrip("/") + "/", + "cal_report_at": r["timestamp"], + "run_id": r["mymdc_id"], + "description": desc, + } + ) + + if response.status_code >= 400: + print( + f"Failed to add report to MDC for run ID {r['mymdc_id']}: " + f"HTTP status {response.status_code}", + ) + if i % 20 == 0: + print(f"Done {i}") + +print(f"Found {nreports} reports from {len(rows)} executions") +if args.really: + print(f" Injected to myMdC") +else: + print(f" Re-run with --really to add to myMdC") diff --git a/webservice/job_monitor.py b/webservice/job_monitor.py index cde702c69c71e2935027490b4073ffd0955e4951..d30ed8230ea62c527db01351f4d5c50a14834ddf 100644 --- a/webservice/job_monitor.py +++ b/webservice/job_monitor.py @@ -3,6 +3,8 @@ import argparse import json import locale import logging +import os.path +import shlex import signal import time from datetime import datetime, timezone @@ -232,7 +234,8 @@ class JobsMonitor: ).fetchall()] success = set(statuses) == {'COMPLETED'} r = self.job_db.execute( - "SELECT det_type, karabo_id, req_id, proposal, run, action " + "SELECT det_type, karabo_id, command, " + "req_id, proposal, run, action, mymdc_id, timestamp " "FROM executions JOIN requests USING (req_id)" "WHERE exec_id = ?", (exec_id,) @@ -259,6 +262,10 @@ class JobsMonitor: log.warning("Error sending Kafka notification", exc_info=True) + self.record_correction_report( + r['mymdc_id'], r['command'], r['karabo_id'], success, r['timestamp'] + ) + return r['req_id'] def process_request_finished(self, req_id): @@ -331,9 +338,45 @@ class JobsMonitor: log.error("Failed to update MDC dark run id %s", dark_run_id) log.error(Errors.MDC_RESPONSE.format(response)) + def record_correction_report( + self, mymdc_run_id, command, karabo_id, success, request_time: str + ): + """Add report to MyMdC when a correction execution has finished""" + args = shlex.split(command) + try: + report_path = args[args.index("--report-to") + 1] + ".pdf" + except (ValueError, IndexError): + log.error("Couldn't find report path in %r", args) + return + + if not os.path.isfile(report_path): + log.error("Jobs finished, but report file %s missing", report_path) + return + + desc = f"{karabo_id} detector corrections" + if not success: + desc += " (errors occurred)" + + log.debug("Adding report file %s to MDC for run ID %s", + report_path, mymdc_run_id) + + response = self.mdc.create_report_api({ + "name": os.path.basename(report_path), + "cal_report_path": os.path.dirname(report_path).rstrip('/') + '/', + "cal_report_at": request_time, + "run_id": mymdc_run_id, + "description": desc, + }) + + if response.status_code >= 400: + log.error("Failed to add report to MDC for run ID %s: HTTP status %s", + mymdc_run_id, response.status_code) + + def interrupted(signum, frame): raise KeyboardInterrupt + def main(argv=None): # Ensure files are opened as UTF-8 by default, regardless of environment. locale.setlocale(locale.LC_CTYPE, ('en_US', 'UTF-8')) diff --git a/webservice/webservice.py b/webservice/webservice.py index 3379516d8d151947fb59424cb169bfd49531365d..d43b787f1066f54f6c4dd741e0cd961f721dd7bc 100644 --- a/webservice/webservice.py +++ b/webservice/webservice.py @@ -956,7 +956,7 @@ class ActionsServer: This will trigger a correction process to be launched for that run in the given cycle and proposal. """ - request_time = datetime.now() + request_time = datetime.now(tz=timezone.utc) try: runnr = runnr.strip('r') @@ -1093,7 +1093,7 @@ class ActionsServer: return queued_msg.encode() async def handle_recorrect(self, rid, instrument, cycle, proposal, runnr): - request_time = datetime.now() + request_time = datetime.now(tz=timezone.utc) try: if self.check_unfinished_correction(proposal, int(runnr)): @@ -1219,7 +1219,7 @@ class ActionsServer: :param runnr: is the run number in integer form, i.e. without leading "r" """ - request_time = datetime.now() + request_time = datetime.now(tz=timezone.utc) try: pdus, karabo_das, wait_runs = ast.literal_eval(','.join(extra))