Skip to content
Snippets Groups Projects

[Webservice] Fix database lock timeouts

Merged Thomas Kluyver requested to merge fix/websvc-db-write-locking into master
1 unresolved thread
2 files
+54 −23
Compare changes
  • Side-by-side
  • Inline
Files
2
+13 −10
@@ -15,11 +15,11 @@ from kafka.errors import KafkaError
try:
from .config import webservice as config
from .messages import MDC, Errors, MigrationError, Success
from .webservice import init_job_db, init_md_client
from .webservice import init_job_db, init_md_client, time_db_transaction
except ImportError:
from config import webservice as config
from messages import MDC, Errors, MigrationError, Success
from webservice import init_job_db, init_md_client
from webservice import init_job_db, init_md_client, time_db_transaction
log = logging.getLogger(__name__)
@@ -148,17 +148,18 @@ class JobsMonitor:
Newly completed executions are present with an empty list.
"""
c = self.job_db.cursor()
c.execute("SELECT job_id, exec_id FROM slurm_jobs WHERE finished = 0")
statii = slurm_status()
# Check that slurm is giving proper feedback
if statii is None:
return {}
log.debug(f"SLURM info {statii}")
jobs_to_check = self.job_db.execute(
"SELECT job_id, exec_id FROM slurm_jobs WHERE finished = 0"
).fetchall()
ongoing_jobs_by_exn = {}
for r in c.fetchall():
updates = []
for r in jobs_to_check:
log.debug(f"Job in DB before update: %s", tuple(r))
execn_ongoing_jobs = ongoing_jobs_by_exn.setdefault(r['exec_id'], [])
@@ -173,12 +174,14 @@ class JobsMonitor:
_, runtime, slstatus = slurm_job_status(r['job_id'])
finished = True
c.execute(
updates.append((finished, runtime, slstatus, r['job_id']))
with time_db_transaction(self.job_db, 'Update jobs'):
self.job_db.executemany(
"UPDATE slurm_jobs SET finished=?, elapsed=?, status=? WHERE job_id = ?",
(finished, runtime, slstatus, r['job_id'])
updates
)
self.job_db.commit()
return ongoing_jobs_by_exn
def process_request_still_going(self, req_id, running_jobs_info):
@@ -216,7 +219,7 @@ class JobsMonitor:
"WHERE exec_id = ?",
(exec_id,)
).fetchone()
with self.job_db:
with time_db_transaction(self.job_db, 'Update execution'):
self.job_db.execute(
"UPDATE executions SET success = ? WHERE exec_id = ?",
(success, exec_id)
Loading