Skip to content
Snippets Groups Projects
Commit 8b6b4b44 authored by Thomas Kluyver
Browse files

Merge branch 'fix/jobmon-slurm-finish-states' into 'master'

[Webservice] Don't mark jobs as finished just because they disappear from squeue output

See merge request !833
parents f3027732 0d8f908d
No related branches found
No related tags found
1 merge request: !833 [Webservice] Don't mark jobs as finished just because they disappear from squeue output
......@@ -23,6 +23,11 @@ except ImportError:
log = logging.getLogger(__name__)
# Slurm job state names that the job monitor treats as "finished": a job is
# flagged as finished when its reported state is a member of this set.
# State codes reference: https://slurm.schedmd.com/squeue.html#lbAG
STATES_FINISHED = {
    'BOOT_FAIL',
    'CANCELLED',
    'COMPLETED',
    'DEADLINE',
    'FAILED',
    'OUT_OF_MEMORY',
    'SPECIAL_EXIT',
    'TIMEOUT',
}
class NoOpProducer:
"""Fills in for Kafka producer object when setting that up fails"""
......@@ -50,10 +55,10 @@ def slurm_status(filter_user=True):
:return: a dictionary indexed by slurm jobid and containing a tuple
of (status, run time) as values.
"""
cmd = ["squeue"]
cmd = ["squeue", "--states=all"]
if filter_user:
cmd += ["--me"]
res = run(cmd, stdout=PIPE)
res = run(cmd, stdout=PIPE, stderr=PIPE)
if res.returncode == 0:
rlines = res.stdout.decode().split("\n")
statii = {}
......@@ -65,6 +70,10 @@ def slurm_status(filter_user=True):
except ValueError: # not enough values to unpack in split
pass
return statii
else:
log.error("Running squeue failed. stdout: %r, stderr: %r",
res.stdout.decode(), res.stderr.decode())
return None
def slurm_job_status(jobid):
......@@ -148,15 +157,19 @@ class JobsMonitor:
Newly completed executions are present with an empty list.
"""
jobs_to_check = self.job_db.execute(
"SELECT job_id, exec_id FROM slurm_jobs WHERE finished = 0"
).fetchall()
if not jobs_to_check:
log.debug("No unfinished jobs to check")
return {}
statii = slurm_status()
# Check that slurm is giving proper feedback
if statii is None:
return {}
log.debug(f"SLURM info {statii}")
jobs_to_check = self.job_db.execute(
"SELECT job_id, exec_id FROM slurm_jobs WHERE finished = 0"
).fetchall()
ongoing_jobs_by_exn = {}
updates = []
for r in jobs_to_check:
......@@ -166,13 +179,13 @@ class JobsMonitor:
if str(r['job_id']) in statii:
# statii contains jobs which are still going (from squeue)
slstatus, runtime = statii[str(r['job_id'])]
finished = False
execn_ongoing_jobs.append(f"{slstatus}-{runtime}")
else:
# These jobs have finished (successfully or otherwise)
_, runtime, slstatus = slurm_job_status(r['job_id'])
finished = True
finished = slstatus in STATES_FINISHED
updates.append((finished, runtime, slstatus, r['job_id']))
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment