From 3a7339c9ac09943041feed63f7b4c99a03f93ffb Mon Sep 17 00:00:00 2001 From: Thomas Kluyver <thomas@kluyver.me.uk> Date: Tue, 24 May 2022 17:12:47 +0100 Subject: [PATCH] Row object doesn't offer attribute access --- webservice/job_monitor.py | 50 +++++++++++++++++++-------------------- webservice/webservice.py | 6 ++--- 2 files changed, 28 insertions(+), 28 deletions(-) diff --git a/webservice/job_monitor.py b/webservice/job_monitor.py index 195a9f737..289d909ba 100644 --- a/webservice/job_monitor.py +++ b/webservice/job_monitor.py @@ -114,7 +114,7 @@ class JobsMonitor: if running_jobs_info: req_id = self.job_db.execute( "SELECT req_id FROM executions WHERE exec_id = ?", (exec_id,) - ).fetchone().req_id + ).fetchone()[0] reqs_still_going.setdefault(req_id, []).extend(running_jobs_info) # For executions that have finished, send out notifications, and @@ -149,22 +149,22 @@ class JobsMonitor: ongoing_jobs_by_exn = {} for r in c.fetchall(): log.debug(f"DB info {r}") - execn_ongoing_jobs = ongoing_jobs_by_exn.setdefault(r.exec_id, []) + execn_ongoing_jobs = ongoing_jobs_by_exn.setdefault(r['exec_id'], []) - if r.job_id in statii: + if r['job_id'] in statii: # statii contains jobs which are still going (from squeue) - slstatus, runtime = statii[r.job_id] + slstatus, runtime = statii[r['job_id']] finished = False execn_ongoing_jobs.append(f"{slstatus}-{runtime}") else: # These jobs have finished (successfully or otherwise) - _, runtime, slstatus = slurm_job_status(r.job_id) + _, runtime, slstatus = slurm_job_status(r['job_id']) finished = True c.execute( "UPDATE slurm_jobs SET finished=?, elapsed=?, status=? WHERE job_id = ?", - (finished, runtime, slstatus, r.job_id) + (finished, runtime, slstatus, r['job_id']) ) self.job_db.commit() @@ -195,7 +195,7 @@ class JobsMonitor: def process_execution_finished(self, exec_id): """Send notification that one execution has finished""" - statuses = [r.status for r in self.job_db.execute( + statuses = [r[0] for r in self.job_db.execute( "SELECT status FROM slurm_jobs WHERE exec_id = ?", (exec_id,) ).fetchall()] success = set(statuses) == {'COMPLETED'} @@ -211,27 +211,27 @@ class JobsMonitor: (success, exec_id) ) log.info("Execution finished: %s for (p%s, r%s, %s), success=%s", - r.action, r.proposal, r.run, r.karabo_id, success) + r['action'], r['proposal'], r['run'], r['karabo_id'], success) - if r.action == 'CORRECT': + if r['action'] == 'CORRECT': try: self.kafka_prod.send(self.kafka_topic, { 'event': 'correction_complete', - 'proposal': r.proposal, - 'run': r.run, - 'detector': r.det_type, - 'karabo_id': r.karabo_id, + 'proposal': r['proposal'], + 'run': r['run'], + 'detector': r['det_type'], + 'karabo_id': r['karabo_id'], 'success': success, }) except KafkaError: log.warning("Error sending Kafka notification", exc_info=True) - return r.req_id + return r['req_id'] def process_request_finished(self, req_id): """Send notifications that a request has finished""" - exec_successes = {r.success for r in self.job_db.execute( + exec_successes = {r[0] for r in self.job_db.execute( "SELECT success FROM executions WHERE req_id = ?", (req_id,) ).fetchall()} success = (exec_successes == {1}) @@ -241,14 +241,14 @@ class JobsMonitor: ).fetchone() log.info( "Jobs finished - action: %s, myMdC id: %s, success: %s", - r.action, r.mymdc_id, success, + r['action'], r['mymdc_id'], success, ) - if r.action == 'CORRECT': + if r['action'] == 'CORRECT': try: self.kafka_prod.send(self.kafka_topic, { 'event': 'run_corrections_complete', - 'proposal': r.proposal, - 'run': r.run, + 'proposal': r['proposal'], + 'run': r['run'], 'success': success, }) except KafkaError: @@ -259,7 +259,7 @@ class JobsMonitor: msg = "Calibration jobs succeeded" else: # Count failed jobs - job_statuses = [r.status for r in self.job_db.execute( + job_statuses = [r[0] for r in self.job_db.execute( "SELECT slurm_jobs.status FROM slurm_jobs " "INNER JOIN executions USING (exec_id) " "INNER JOIN requests USING (req_id) " @@ -268,14 +268,14 @@ class JobsMonitor: n_failed = sum(s != 'COMPLETED' for s in job_statuses) msg = f"{n_failed}/{len(job_statuses)} calibration jobs failed" - log.debug("Update MDC for %s, %s: %s", r.action, r.mymdc_id, msg) + log.debug("Update MDC for %s, %s: %s", r['action'], r['mymdc_id'], msg) - if r.action == 'CORRECT': + if r['action'] == 'CORRECT': status = 'A' if success else 'NA' # Not-/Available - self.mymdc_update_run(r.mymdc_id, msg, status) - else: # r.action == 'DARK' + self.mymdc_update_run(r['mymdc_id'], msg, status) + else: # r['action'] == 'DARK' status = 'F' if success else 'E' # Finished/Error - self.mymdc_update_dark(r.mymdc_id, msg, status) + self.mymdc_update_dark(r['mymdc_id'], msg, status) def mymdc_update_run(self, run_id, msg, status='R'): data = {'flg_cal_data_status': status, diff --git a/webservice/webservice.py b/webservice/webservice.py index c9cf1a1d2..f77e52836 100644 --- a/webservice/webservice.py +++ b/webservice/webservice.py @@ -260,9 +260,9 @@ def query_rid(conn, rid) -> bytes: "WHERE mymdc_id = ?", (rid,) ) statuses = [] - for r in c.fetchall(): - logging.debug("Job %s has status %s", r.job_id, r.status) - statuses.append(r.status) + for job_id, status in c.fetchall(): + logging.debug("Job %s has status %s", job_id, status) + statuses.append(status) if statuses: return "\n".join(statuses).encode() -- GitLab