Skip to content
Snippets Groups Projects
Commit fa1b0603 authored by Philipp Schmidt's avatar Philipp Schmidt
Browse files

Merge branch 'feat/monitor-broken-nodes' into 'master'

[job_monitor] Watch and warn of jobs failing (almost) instantly by host

See merge request !1134
parents 75d11966 465194ca
No related branches found
No related tags found
1 merge request!1134[job_monitor] Watch and warn of jobs failing (almost) instantly by host
import pytest
import time
from webservice.common import elapsed_to_seconds
from webservice.job_monitor import ExpiringEvents
def test_elapsed_to_seconds():
assert elapsed_to_seconds('32') == 32
assert elapsed_to_seconds('00:32') == 32
assert elapsed_to_seconds('02:03:04') == 2*3600 + 3*60 + 4
assert elapsed_to_seconds('2-03:04:05') == 2 * 86400 + 3*3600 + 4*60 + 5
def test_elapsed_events():
ev = ExpiringEvents(600)
assert ev.add() == 1
assert ev.add() == 2
assert ev.add(time.monotonic() - 800) == 2
assert ev.add() == 3
assert len(ev) == 3
......@@ -10,10 +10,25 @@ import logging
import os
import shutil
import socket
from datetime import time
from datetime import time, timedelta
from logging.handlers import TimedRotatingFileHandler
from pathlib import Path
def elapsed_to_seconds(text):
if '-' in text:
days, _, text = text.partition('-')
else:
days = 0
return int(days) * 24*60*60 + sum(
[60**i * int(x) for i, x in enumerate(text.split(':')[::-1])])
def elapsed_to_timedelta(text):
return timedelta(seconds=elapsed_to_seconds(text))
def notify(message):
if not message:
raise ValueError("notify() requires a message")
......
......@@ -7,6 +7,8 @@ import os.path
import shlex
import signal
import time
from bisect import insort_left
from collections import deque
from datetime import datetime, timezone
from pathlib import Path
from subprocess import run, PIPE
......@@ -15,12 +17,12 @@ from kafka import KafkaProducer
from kafka.errors import KafkaError
try:
from .common import notify_ready, file_and_stderr_logs
from .common import notify_ready, file_and_stderr_logs, elapsed_to_seconds
from .config import webservice as config
from .messages import MDC, Errors, MigrationError, Success
from .webservice import init_job_db, init_md_client, time_db_transaction
except ImportError:
from common import notify_ready, file_and_stderr_logs
from common import notify_ready, file_and_stderr_logs, elapsed_to_seconds
from config import webservice as config
from messages import MDC, Errors, MigrationError, Success
from webservice import init_job_db, init_md_client, time_db_transaction
......@@ -32,12 +34,40 @@ STATES_FINISHED = { # https://slurm.schedmd.com/squeue.html#lbAG
'OUT_OF_MEMORY', 'SPECIAL_EXIT', 'TIMEOUT',
'NA', # Unknown (used internally if job ID missing)
}
STATES_FAILED = {'FAILED'}
STATE_ABBREVS = {
'PENDING': 'PD',
'RUNNING': 'R',
}
class ExpiringEvents:
"""Track events occuring within a time window.
Args:
window (int, optional): Time window in seconds, 600 by default.
"""
def __init__(self, window=600):
self.window = window
self.events = deque()
def add(self, event=None):
now = time.monotonic()
insort_left(self.events, event or now)
cutoff = now - self.window
while self.events and self.events[0] < cutoff:
self.events.popleft()
return len(self.events)
def __len__(self):
return len(self.events)
class NoOpProducer:
"""Fills in for Kafka producer object when setting that up fails"""
def send(self, topic, value):
......@@ -115,6 +145,7 @@ class JobsMonitor:
self.kafka_prod = init_kafka_producer(config)
self.kafka_topic = config['kafka']['topic']
self.time_interval = int(config['web-service']['job-update-interval'])
self.instant_fails_by_host = defauldict(ExpiringEvents)
def __enter__(self):
return self
......@@ -205,6 +236,12 @@ class JobsMonitor:
if not finished:
short_state = STATE_ABBREVS.get(slstatus, slstatus)
execn_ongoing_jobs.append(f"{short_state}-{runtime}")
elif slstatus in STATES_FAILED and elapsed_to_seconds(runtime) < 2:
# Specific branch to catch potentially broken nodes.
num_fails = self.instant_fails_by_host(hostname).add()
log.warning(f"Job {r['job_id']} failed instantly on "
f"{hostname}, {num_fails} on this host within "
f"the last 10 minutes")
updates.append((finished, runtime, slstatus, hostname, r['job_id']))
......
......@@ -17,17 +17,13 @@ from jinja2 import Template
from xfel_calibrate.settings import free_nodes_cmd, preempt_nodes_cmd
try:
from .common import notify_ready
from .common import notify_ready, elapsed_to_timedelta
from .config import serve_overview as config
except:
from common import notify_ready
from common import notify_ready, elapsed_to_timedelta
from config import serve_overview as config
def elapsed_to_timedelta(text):
return timedelta(seconds=sum(
[60**i * int(x) for i, x in enumerate(text.split(':')[::-1])]))
def datetime_to_grafana(dt):
return int(dt.timestamp() * 1000)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment