From 83dbff435c7e9400968af9a6780acb595a2dbcc4 Mon Sep 17 00:00:00 2001 From: Thomas Kluyver <thomas@kluyver.me.uk> Date: Mon, 17 Apr 2023 09:37:50 +0100 Subject: [PATCH] Hopefully fix webservice database lock timeouts --- webservice/job_monitor.py | 23 +++++++++++-------- webservice/webservice.py | 48 +++++++++++++++++++++++++++++---------- 2 files changed, 49 insertions(+), 22 deletions(-) diff --git a/webservice/job_monitor.py b/webservice/job_monitor.py index d0662a203..f86a6c771 100644 --- a/webservice/job_monitor.py +++ b/webservice/job_monitor.py @@ -15,11 +15,11 @@ from kafka.errors import KafkaError try: from .config import webservice as config from .messages import MDC, Errors, MigrationError, Success - from .webservice import init_job_db, init_md_client + from .webservice import init_job_db, init_md_client, time_db_transaction except ImportError: from config import webservice as config from messages import MDC, Errors, MigrationError, Success - from webservice import init_job_db, init_md_client + from webservice import init_job_db, init_md_client, time_db_transaction log = logging.getLogger(__name__) @@ -148,17 +148,18 @@ class JobsMonitor: Newly completed executions are present with an empty list. """ - c = self.job_db.cursor() - c.execute("SELECT job_id, exec_id FROM slurm_jobs WHERE finished = 0") - statii = slurm_status() # Check that slurm is giving proper feedback if statii is None: return {} log.debug(f"SLURM info {statii}") + jobs_to_check = self.job_db.execute( + "SELECT job_id, exec_id FROM slurm_jobs WHERE finished = 0" + ).fetchall() ongoing_jobs_by_exn = {} - for r in c.fetchall(): + updates = [] + for r in jobs_to_check: log.debug(f"Job in DB before update: %s", tuple(r)) execn_ongoing_jobs = ongoing_jobs_by_exn.setdefault(r['exec_id'], []) @@ -173,12 +174,14 @@ class JobsMonitor: _, runtime, slstatus = slurm_job_status(r['job_id']) finished = True - c.execute( + updates.append((finished, runtime, slstatus, r['job_id'])) + + with time_db_transaction(self.job_db, 'Update jobs'): + self.job_db.executemany( "UPDATE slurm_jobs SET finished=?, elapsed=?, status=? WHERE job_id = ?", - (finished, runtime, slstatus, r['job_id']) + updates ) - self.job_db.commit() return ongoing_jobs_by_exn def process_request_still_going(self, req_id, running_jobs_info): @@ -216,7 +219,7 @@ class JobsMonitor: "WHERE exec_id = ?", (exec_id,) ).fetchone() - with self.job_db: + with time_db_transaction(self.job_db, 'Update execution'): self.job_db.execute( "UPDATE executions SET success = ? WHERE exec_id = ?", (success, exec_id) diff --git a/webservice/webservice.py b/webservice/webservice.py index 335cacdbe..a0f9d5679 100644 --- a/webservice/webservice.py +++ b/webservice/webservice.py @@ -1,3 +1,5 @@ +import time + import argparse import ast import asyncio @@ -301,6 +303,28 @@ def parse_config(cmd: List[str], config: Dict[str, Any]) -> List[str]: return cmd +class time_db_transaction: + t_start = 0 + + def __init__(self, conn: sqlite3.Connection, label: str): + self.conn = conn + self.label = label + + def __enter__(self): + self.conn.__enter__() + self.t_start = time.perf_counter() + return + + def __exit__(self, exc_type, exc_val, exc_tb): + t1 = time.perf_counter() + self.conn.__exit__(exc_type, exc_val, exc_tb) + t2 = time.perf_counter() + t_open = (t1 - self.t_start) * 1000 + t_finish = (t2 - t1) * 1000 + op = 'commit' if exc_val is None else 'rollback' + logging.debug("DB change (%s): %.1f ms in transaction, %.1f ms %s", + self.label, t_open, t_finish, op) + return False async def run_action(job_db, cmd, mode, proposal, run, exec_id) -> str: @@ -342,11 +366,11 @@ async def run_action(job_db, cmd, mode, proposal, run, exec_id) -> str: jobs = [] for jobid in jobids.split(','): jobs.append((int(jobid.strip()), exec_id)) - c.executemany( - "INSERT INTO slurm_jobs VALUES (?, ?, 'PD', 0, 0)", - jobs - ) - job_db.commit() + with time_db_transaction(job_db, 'Insert jobs'): + job_db.executemany( + "INSERT INTO slurm_jobs VALUES (?, ?, 'PD', 0, 0)", + jobs + ) else: # mode == "sim" if "DARK" in cmd: @@ -934,8 +958,8 @@ class ActionsServer: proposal = self._normalise_proposal_num(proposal) pconf_full = self.load_proposal_config(cycle, proposal) - with self.job_db: - cur = self.job_db.execute( + with time_db_transaction(self.job_db, 'Insert request'): + cur = self.job_db.execute( # 2 "INSERT INTO requests VALUES (NULL, ?, ?, ?, 'CORRECT', ?)", (rid, proposal, int(runnr), request_time) ) @@ -1054,7 +1078,7 @@ class ActionsServer: request_time = datetime.now() try: - with self.job_db: + with time_db_transaction(self.job_db, 'Insert request'): cur = self.job_db.execute( "INSERT INTO requests VALUES (NULL, ?, ?, ?, 'CORRECT', ?)", (rid, proposal, int(runnr), request_time.strftime('%Y-%m-%dT%H:%M:%S')) @@ -1125,7 +1149,7 @@ class ActionsServer: f'{reports_dir}/{karabo_id}_RECORRECT_{request_time:%y%m%d_%H%M%S}' ] - with self.job_db: + with time_db_transaction(self.job_db, 'Insert execution'): cur = self.job_db.execute( "INSERT INTO executions VALUES (NULL, ?, ?, NULL, ?, NULL)", (req_id, shlex.join(cmd), karabo_id) @@ -1185,7 +1209,7 @@ class ActionsServer: karabo_id=karabo_id, ) - with self.job_db: + with time_db_transaction(self.job_db, 'Insert request'): cur = self.job_db.execute( "INSERT INTO requests VALUES (NULL, ?, ?, ?, 'DARK', ?)", (rid, proposal, int(wait_runs[-1]), request_time) @@ -1393,8 +1417,8 @@ class ActionsServer: ).split() cmd = parse_config(cmd, dconfig) - with self.job_db: - cur = self.job_db.execute( + with time_db_transaction(self.job_db, 'Insert execution'): + cur = self.job_db.execute( # 2 "INSERT INTO executions VALUES (NULL, ?, ?, ?, ?, NULL)", (req_id, shlex.join(cmd), detector, karabo_id) ) -- GitLab