
Use status AW in myMdC if correction failed for some detectors in a run

Merged Thomas Kluyver requested to merge fix/mymdc-status-AW into master
2 unresolved threads
1 file changed: +15 -5
@@ -27,6 +27,10 @@ STATES_FINISHED = { # https://slurm.schedmd.com/squeue.html#lbAG
     'BOOT_FAIL', 'CANCELLED', 'COMPLETED', 'DEADLINE', 'FAILED',
     'OUT_OF_MEMORY', 'SPECIAL_EXIT', 'TIMEOUT',
 }
 
+STATE_ABBREVS = {
+    'PENDING': 'PD',
+    'RUNNING': 'R',
+}
 
 class NoOpProducer:
@@ -55,7 +59,7 @@ def slurm_status(filter_user=True):
     :return: a dictionary indexed by slurm jobid and containing a tuple
         of (status, run time) as values.
     """
-    cmd = ["squeue", "--states=all"]
+    cmd = ["squeue", "--states=all", "--format=%i %T %M"]
     if filter_user:
         cmd += ["--me"]
     res = run(cmd, stdout=PIPE, stderr=PIPE)
@@ -64,7 +68,7 @@ def slurm_status(filter_user=True):
     statii = {}
     for r in rlines[1:]:
         try:
-            jobid, _, _, _, status, runtime, _, _ = r.split()
+            jobid, status, runtime = r.split()
             jobid = jobid.strip()
             statii[jobid] = status, runtime
         except ValueError:  # not enough values to unpack in split
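For context: %i, %T and %M are squeue format specifiers for job ID, state name and elapsed time, so each data row of the new output has exactly three whitespace-separated fields, and the three-name unpack replaces the old eight-column default layout. A minimal sketch of the parsing against made-up output (job IDs and times are invented):

    # Illustrative only: output rows in the shape "--format=%i %T %M" produces
    sample_lines = [
        "JOBID STATE TIME",       # header row from squeue
        "1234567 RUNNING 12:34",  # made-up job
        "1234568 PENDING 0:00",   # made-up job
    ]
    statii = {}
    for r in sample_lines[1:]:    # skip the header, as the real loop does
        try:
            jobid, status, runtime = r.split()
            statii[jobid.strip()] = status, runtime
        except ValueError:        # blank or malformed row: not enough values
            pass
    print(statii)
    # -> {'1234567': ('RUNNING', '12:34'), '1234568': ('PENDING', '0:00')}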
@@ -179,13 +183,14 @@ class JobsMonitor:
             if str(r['job_id']) in statii:
                 # statii contains jobs which are still going (from squeue)
                 slstatus, runtime = statii[str(r['job_id'])]
-                execn_ongoing_jobs.append(f"{slstatus}-{runtime}")
             else:
                 # These jobs have finished (successfully or otherwise)
                 _, runtime, slstatus = slurm_job_status(r['job_id'])
             finished = slstatus in STATES_FINISHED
 
+            if not finished:
+                short_state = STATE_ABBREVS.get(slstatus, slstatus)
    • This translates the most common states we'll see back to their short forms: PD & R. We have a .startswith('PD-') below to skip updates when all jobs are pending, so it's not entirely cosmetic.

 
execn_ongoing_jobs.append(f"{short_state}-{runtime}")
updates.append((finished, runtime, slstatus, r['job_id']))
updates.append((finished, runtime, slstatus, r['job_id']))
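The thread above mentions a .startswith('PD-') check further down the file, which skips the myMdC update when every job is still pending. A hypothetical sketch of that interplay; STATE_ABBREVS and the f-string come from this MR, while the helper name and the skip logic below are assumptions for illustration:

    STATE_ABBREVS = {'PENDING': 'PD', 'RUNNING': 'R'}

    def describe(slstatus, runtime):
        # Ongoing jobs render as e.g. 'PD-0:00' or 'R-12:34'
        return f"{STATE_ABBREVS.get(slstatus, slstatus)}-{runtime}"

    ongoing = [describe('PENDING', '0:00'), describe('PENDING', '0:01')]
    if all(part.startswith('PD-') for part in ongoing):
        print("all jobs still queued; skip the status update")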
@@ -295,7 +300,12 @@ class JobsMonitor:
             log.debug("Update MDC for %s, %s: %s", r['action'], r['mymdc_id'], msg)
             if r['action'] == 'CORRECT':
-                status = 'A' if success else 'E'  # Available/Error
+                if success:
+                    status = 'A'  # Available
+                elif set(krb_id_successes.values()) == {0, 1}:
+                    status = 'AW'  # Available with Warning (failed for some detectors)
+                else:
+                    status = 'E'  # Error
                 self.mymdc_update_run(r['mymdc_id'], msg, status)
             else:  # r['action'] == 'DARK'
                 status = 'F' if success else 'E'  # Finished/Error
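To make the new three-way branch concrete: krb_id_successes presumably maps each detector's Karabo ID to 1 if its correction job succeeded and 0 if it failed, so the set of its values separates partial success (AW) from total failure (E). A sketch under that assumption, with invented detector names:

    def mymdc_status(success, krb_id_successes):
        if success:
            return 'A'   # Available: the whole correction succeeded
        if set(krb_id_successes.values()) == {0, 1}:
            return 'AW'  # Available with Warning: mixed per-detector results
        return 'E'       # Error: nothing succeeded

    assert mymdc_status(True,  {'DET_A': 1, 'DET_B': 1}) == 'A'
    assert mymdc_status(False, {'DET_A': 1, 'DET_B': 0}) == 'AW'
    assert mymdc_status(False, {'DET_A': 0, 'DET_B': 0}) == 'E'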