Commit 4f3ef7f8 authored by Thomas Kluyver

Don't mark jobs as finished just because they disappear from squeue output

parent 9d3bde27
Merge request !833: [Webservice] Don't mark jobs as finished just because they disappear from squeue output
@@ -23,6 +23,11 @@ except ImportError:
 log = logging.getLogger(__name__)
 
+STATES_FINISHED = {  # https://slurm.schedmd.com/squeue.html#lbAG
+    'BOOT_FAIL', 'CANCELLED', 'COMPLETED', 'DEADLINE', 'FAILED',
+    'NODE_FAIL', 'OUT_OF_MEMORY', 'PREEMPTED', 'SPECIAL_EXIT', 'TIMEOUT',
+}
+
 
 class NoOpProducer:
     """Fills in for Kafka producer object when setting that up fails"""
@@ -53,7 +58,7 @@ def slurm_status(filter_user=True):
     cmd = ["squeue"]
     if filter_user:
         cmd += ["--me"]
-    res = run(cmd, stdout=PIPE)
+    res = run(cmd, stdout=PIPE, stderr=PIPE)
     if res.returncode == 0:
         rlines = res.stdout.decode().split("\n")
         statii = {}
@@ -65,6 +70,10 @@ def slurm_status(filter_user=True):
             except ValueError:  # not enough values to unpack in split
                 pass
         return statii
+    else:
+        log.error("Running squeue failed. stdout: %r, stderr: %r",
+                  res.stdout.decode(), res.stderr.decode())
+    return None
 
 
 def slurm_job_status(jobid):
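Note on the change above: slurm_status() now distinguishes "squeue ran and returned output" (a dict, possibly empty) from "squeue itself failed" (None), and the failure path logs both streams now that stderr is captured. A minimal self-contained sketch of that behaviour, assuming squeue's default column layout (JOBID PARTITION NAME USER ST TIME ...); the real parsing code in this module is not shown in the diff and may differ:

import logging
from subprocess import run, PIPE

log = logging.getLogger(__name__)


def slurm_status(filter_user=True):
    """Return {job_id: (state, runtime)} for jobs squeue lists, or None on failure."""
    cmd = ["squeue"]
    if filter_user:
        cmd += ["--me"]  # limit output to the current user's jobs
    res = run(cmd, stdout=PIPE, stderr=PIPE)
    if res.returncode == 0:
        statii = {}
        for line in res.stdout.decode().split("\n")[1:]:  # skip the header row
            fields = line.split()
            if len(fields) >= 6:  # assumed order: JOBID PARTITION NAME USER ST TIME ...
                jobid, _, _, _, state, runtime = fields[:6]
                statii[jobid] = (state, runtime)
        return statii
    # Non-zero exit: log what squeue said and return None so callers can tell
    # "squeue failed" apart from "no jobs in the queue" ({}).
    log.error("Running squeue failed. stdout: %r, stderr: %r",
              res.stdout.decode(), res.stderr.decode())
    return None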
@@ -148,15 +157,19 @@ class JobsMonitor:
         Newly completed executions are present with an empty list.
         """
+        jobs_to_check = self.job_db.execute(
+            "SELECT job_id, exec_id FROM slurm_jobs WHERE finished = 0"
+        ).fetchall()
+        if not jobs_to_check:
+            log.debug("No unfinished jobs to check")
+            return {}
+
         statii = slurm_status()
         # Check that slurm is giving proper feedback
         if statii is None:
             return {}
         log.debug(f"SLURM info {statii}")
-        jobs_to_check = self.job_db.execute(
-            "SELECT job_id, exec_id FROM slurm_jobs WHERE finished = 0"
-        ).fetchall()
         ongoing_jobs_by_exn = {}
         updates = []
         for r in jobs_to_check:
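A usage note on the reordering above: querying the database first means squeue is only run when there is something to reconcile, and the r['job_id'] access further down implies mapping-style rows. A small self-contained sketch of the new early return, assuming (purely for illustration) a sqlite3 connection with row_factory = sqlite3.Row and a minimal slurm_jobs table:

import sqlite3

# Hypothetical stand-in for the webservice's job database, just to exercise the query.
job_db = sqlite3.connect(":memory:")
job_db.row_factory = sqlite3.Row  # rows then support r['job_id'] / r['exec_id'] lookups
job_db.execute(
    "CREATE TABLE slurm_jobs (job_id INTEGER, exec_id INTEGER, finished INTEGER)"
)
job_db.execute("INSERT INTO slurm_jobs VALUES (12345, 1, 0)")

jobs_to_check = job_db.execute(
    "SELECT job_id, exec_id FROM slurm_jobs WHERE finished = 0"
).fetchall()
if not jobs_to_check:
    # Nothing unfinished: the monitor now returns here without ever calling squeue.
    print("No unfinished jobs to check")
else:
    print([dict(r) for r in jobs_to_check])  # [{'job_id': 12345, 'exec_id': 1}]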
@@ -166,13 +179,13 @@ class JobsMonitor:
             if str(r['job_id']) in statii:
                 # statii contains jobs which are still going (from squeue)
                 slstatus, runtime = statii[str(r['job_id'])]
+                finished = False
                 execn_ongoing_jobs.append(f"{slstatus}-{runtime}")
             else:
                 # These jobs have finished (successfully or otherwise)
                 _, runtime, slstatus = slurm_job_status(r['job_id'])
-                finished = True
+                finished = slstatus in STATES_FINISHED
             updates.append((finished, runtime, slstatus, r['job_id']))
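The behavioural fix is in this last hunk: a job that has vanished from squeue output is now only marked finished when the state reported for it by slurm_job_status (backed by Slurm's accounting) is one of the terminal states in STATES_FINISHED; a non-terminal state such as RUNNING or PENDING leaves the job unfinished, to be checked again on the next cycle. A minimal sketch of that decision:

STATES_FINISHED = {  # https://slurm.schedmd.com/squeue.html#lbAG
    'BOOT_FAIL', 'CANCELLED', 'COMPLETED', 'DEADLINE', 'FAILED',
    'NODE_FAIL', 'OUT_OF_MEMORY', 'PREEMPTED', 'SPECIAL_EXIT', 'TIMEOUT',
}


def is_finished(slstatus: str) -> bool:
    """A job missing from squeue only counts as finished in a terminal state."""
    return slstatus in STATES_FINISHED


# Previously, any job absent from squeue output was marked finished.
assert is_finished('COMPLETED')
assert is_finished('TIMEOUT')
assert not is_finished('RUNNING')   # e.g. squeue output momentarily incomplete
assert not is_finished('PENDING')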