diff --git a/webservice/job_monitor.py b/webservice/job_monitor.py index b9a6b5fcbce62a4e4b3b862368b51a486bc1b528..ef521e6da67b3c3b9d3ef8347989362a5219b7da 100644 --- a/webservice/job_monitor.py +++ b/webservice/job_monitor.py @@ -30,6 +30,7 @@ log = logging.getLogger(__name__) STATES_FINISHED = { # https://slurm.schedmd.com/squeue.html#lbAG 'BOOT_FAIL', 'CANCELLED', 'COMPLETED', 'DEADLINE', 'FAILED', 'OUT_OF_MEMORY', 'SPECIAL_EXIT', 'TIMEOUT', + 'NA', # Unknown (used internally if job ID missing) } STATE_ABBREVS = { 'PENDING': 'PD', @@ -67,7 +68,7 @@ def slurm_status(filter_user=True): :return: a dictionary indexed by slurm jobid and containing a tuple of (status, run time) as values. """ - cmd = ["squeue", "--states=all", "--format=%i %T %M"] + cmd = ["squeue", "--states=all", "--format=%i %T %M %N"] if filter_user: cmd += ["--me"] res = run(cmd, stdout=PIPE, stderr=PIPE) @@ -76,9 +77,9 @@ def slurm_status(filter_user=True): statii = {} for r in rlines[1:]: try: - jobid, status, runtime = r.split() + jobid, status, runtime, *nodes = r.split() jobid = jobid.strip() - statii[jobid] = status, runtime + statii[jobid] = status, runtime, (nodes[0] if nodes else '') except ValueError: # not enough values to unpack in split pass return statii @@ -170,7 +171,7 @@ class JobsMonitor: Newly completed executions are present with an empty list. """ jobs_to_check = self.job_db.execute( - "SELECT job_id, exec_id FROM slurm_jobs WHERE finished = 0" + "SELECT job_id, exec_id, hostname FROM slurm_jobs WHERE finished = 0" ).fetchall() if not jobs_to_check: log.debug("No unfinished jobs to check") @@ -189,22 +190,28 @@ class JobsMonitor: execn_ongoing_jobs = ongoing_jobs_by_exn.setdefault(r['exec_id'], []) if str(r['job_id']) in statii: - # statii contains jobs which are still going (from squeue) - slstatus, runtime = statii[str(r['job_id'])] + # Jobs which are pending, running, or recently finished (from squeue) + # Jobs stay for >= 150s after finishing, so we should always see them.
+ slstatus, runtime, hostname = statii[str(r['job_id'])] else: - # These jobs have finished (successfully or otherwise) + # Fallback: get job info from sacct _, runtime, slstatus = slurm_job_status(r['job_id']) + # We *don't* take hostname from sacct; with some GPFS issues + # the job may not get recorded, and we don't want to overwrite + # a hostname we previously got from squeue. + hostname = r['hostname'] finished = slstatus in STATES_FINISHED if not finished: short_state = STATE_ABBREVS.get(slstatus, slstatus) execn_ongoing_jobs.append(f"{short_state}-{runtime}") - updates.append((finished, runtime, slstatus, r['job_id'])) + updates.append((finished, runtime, slstatus, hostname, r['job_id'])) with time_db_transaction(self.job_db, 'Update jobs'): self.job_db.executemany( - "UPDATE slurm_jobs SET finished=?, elapsed=?, status=? WHERE job_id = ?", + "UPDATE slurm_jobs SET " + "finished=?, elapsed=?, status=?, hostname=? WHERE job_id = ?", updates ) diff --git a/webservice/webservice.py b/webservice/webservice.py index 54e94ddd0810af7bbeb8e862f910f2f044b87706..0766b82cfe47a3f033521a21ff99330dcd8d18ef 100644 --- a/webservice/webservice.py +++ b/webservice/webservice.py @@ -90,7 +90,8 @@ def init_job_db(config): exec_id REFERENCES executions(exec_id), status, finished, - elapsed + elapsed, + hostname ); CREATE INDEX IF NOT EXISTS job_by_exec ON slurm_jobs(exec_id); CREATE INDEX IF NOT EXISTS job_by_finished ON slurm_jobs(finished); @@ -488,7 +489,7 @@ async def run_action(job_db, cmd, mode, proposal, run, exec_id) -> str: jobs.append((int(jobid.strip()), exec_id)) with time_db_transaction(job_db, 'Insert jobs'): job_db.executemany( - "INSERT INTO slurm_jobs VALUES (?, ?, 'PENDING', 0, '0')", + "INSERT INTO slurm_jobs VALUES (?, ?, 'PENDING', 0, '0', '')", jobs )