Skip to content
Snippets Groups Projects
Commit 6de29a7e authored by Karim Ahmed's avatar Karim Ahmed
Browse files

feat: First draft for processing calibration logs and adding better...

feat: First draft for processing calibration logs and adding better warning/error text to myMDC CAL feedback
parent 349d27a3
No related branches found
No related tags found
No related merge requests found
......@@ -106,6 +106,84 @@ def slurm_job_status(jobid):
return "NA", "NA", "NA"
def parse_log_file(file_path):
results = []
with open(file_path, 'r') as file:
for line in file:
try:
log_entry = json.loads(line.strip())
error_message = log_entry.get('message', '')
error_class = log_entry.get('class', '')
results.append((error_message, error_class))
except json.JSONDecodeError:
log.error(f"Skipping invalid JSON: {line.strip()}")
return results
def get_report_dir(command):
args = shlex.split(command)
try:
return args[args.index("--report-to") + 1]
except (ValueError, IndexError):
log.error("Couldn't find report directory in %r", args)
return
def process_log_file(job_id, karabo_id, report_dir, karabo_id_log, file):
if file.exists():
with open(file, 'r') as f:
for line in f:
try:
json_line = json.loads(line)
if "default" not in json_line['class'].lower():
message = json_line['message']
if message:
karabo_id_log.setdefault(
message, []).append(job_id)
except json.JSONDecodeError:
log.error(
f"Invalid JSON in errors file {file}:"
f" {line.strip()}")
return karabo_id_log
def compress_job_ids(job_ids):
"""Compress list of job IDs to a shorter representation.
Args:
job_ids (list): List of job IDs
Returns:
str: Compressed representation like "16 jobs (11498126-11498141)"
or "2 jobs (11498142, 11498143)" for non-sequential IDs
"""
if not job_ids:
return "0 jobs"
# Convert to integers and sort
ids = sorted(int(id) for id in job_ids)
# Check if they're sequential
if len(ids) > 2 and ids[-1] - ids[0] + 1 == len(ids):
return f"{len(ids)} jobs ({ids[0]}-{ids[-1]})"
if len(ids) > 4:
return f"{len(ids)} jobs (e.g. {ids[0]}, {ids[1]}...)"
return f"{len(ids)} jobs ({', '.join(str(id) for id in ids)})"
def format_log_message(errors):
"""Format log messages with compressed job IDs."""
formatted = {}
for karabo_id, messages in errors.items():
formatted[karabo_id] = {
msg: compress_job_ids(job_ids)
for msg, job_ids in messages.items()
}
return formatted
class JobsMonitor:
def __init__(self, config):
log.info("Starting jobs monitor")
......@@ -276,11 +354,65 @@ class JobsMonitor:
def process_request_finished(self, req_id):
"""Send Kafka notifications and update myMDC that a request has finished."""
krb_id_successes = {r[0]: r[1] for r in self.job_db.execute(
"SELECT karabo_id, success FROM executions WHERE req_id = ?",
(req_id,)
).fetchall()}
success = (set(krb_id_successes.values()) == {1})
execs = self.job_db.execute(
"SELECT karabo_id, success, command FROM executions WHERE req_id = ?",
(req_id,)).fetchall()
# both dicts will be structured as {karabo_id: {message: [Job IDs]}}
warnings = {}
errors = {}
krb_ids_failed = []
krb_ids_success = []
for karabo_id, exec_success, command in execs:
# Get status for all jobs in this execution
job_statuses = self.job_db.execute(
"SELECT job_id, status FROM slurm_jobs WHERE exec_id IN "
"(SELECT exec_id FROM executions WHERE karabo_id = ? AND req_id = ?)",
(karabo_id, req_id)
).fetchall()
# Look at logs and check if there are warnings/errors
report_dir = get_report_dir(command)
if not report_dir:
log.error(f"Could not get report directory from command: {command}")
continue
report_dir = Path(report_dir)
if not report_dir.exists():
log.error(f"Report directory does not exist: {report_dir}")
continue
if exec_success:
krb_ids_success.append(karabo_id)
else:
krb_ids_failed.append(karabo_id)
for job_id, status in job_statuses:
if status == "COMPLETED":
continue
if not exec_success: # no errors expected for successful execution.
karabo_id_err = errors.setdefault(karabo_id, {})
if status == "FAILED": # process error logs
error_file = report_dir / f"errors_{job_id}.log"
process_log_file(
job_id, karabo_id, report_dir, karabo_id_err, error_file)
if len(karabo_id_err) == 0:
log.warning(f"Job {job_id} failed but no error log/messages found.")
karabo_id_err.setdefault(
"Job failed but no error logs found", []).append(job_id)
else: # Job unsucessefull with a status other than `FAILED`
karabo_id_err.setdefault(
f"SLURM job terminated with status: {status}", []).append(job_id)
# Process warning logs
warning_file = report_dir / f"warnings_{job_id}.log"
process_log_file(
job_id, karabo_id, report_dir, karabo_id_err, warning_file)
success = (not krb_ids_failed)
r = self.job_db.execute(
"SELECT * FROM requests WHERE req_id = ?", (req_id,)
......@@ -302,25 +434,31 @@ class JobsMonitor:
exc_info=True)
if success:
msg = "Calibration jobs succeeded"
if warnings:
msg = f"Calibration jobs succeeded with warnings: {json.dumps(format_log_message(warnings), indent=4)}"
else:
msg = "Calibration jobs succeeded"
else:
# List success & failure by karabo_id
krb_ids_ok = {k for (k, v) in krb_id_successes.items() if v == 1}
ok = ', '.join(sorted(krb_ids_ok)) if krb_ids_ok else 'none'
krb_ids_failed = {k for (k, v) in krb_id_successes.items() if v == 0}
msg = f"Succeeded: {ok}; Failed: {', '.join(sorted(krb_ids_failed))}"
ok = ', '.join(sorted(krb_ids_success)) if krb_ids_success else 'none'
msg = (
f"Succeeded: {ok}; Failed: {', '.join(sorted(krb_ids_failed))} :"
f" {json.dumps(format_log_message(errors), indent=4)}"
)
log.debug("Update MDC for %s, %s: %s", r['action'], r['mymdc_id'], msg)
if r['action'] == 'CORRECT':
if success:
status = 'A' # Available
elif set(krb_id_successes.values()) == {0, 1}:
elif (krb_ids_success and krb_ids_failed) or warnings:
# TODO: do we keep only an error when one detector fail?
status = 'AW' # Available with Warning (failed for some detectors)
else:
status = 'E' # Error
self.mymdc_update_run(r['mymdc_id'], msg, status)
else: # r['action'] == 'DARK'
# TODO: add warning when available at myMDC.
status = 'F' if success else 'E' # Finished/Error
self.mymdc_update_dark(r['mymdc_id'], msg, status)
......@@ -348,13 +486,7 @@ class JobsMonitor:
self, mymdc_run_id, command, karabo_id, success, request_time: str
):
"""Add report to MyMdC when a correction execution has finished"""
args = shlex.split(command)
try:
report_path = args[args.index("--report-to") + 1] + ".pdf"
except (ValueError, IndexError):
log.error("Couldn't find report path in %r", args)
return
report_path = get_report_dir(command) + ".pdf"
if not os.path.isfile(report_path):
log.error("Jobs finished, but report file %s missing", report_path)
return
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment