From 6de29a7e51982a0e2f340c2d5025487a78ed8e80 Mon Sep 17 00:00:00 2001
From: ahmedk <karim.ahmed@xfel.eu>
Date: Tue, 29 Oct 2024 15:48:15 +0100
Subject: [PATCH] feat: First draft for processing calibration logs and adding
 better warning/error text to myMDC CAL feedback

---
 webservice/job_monitor.py | 247 +++++++++++++++++++++++++++++++++++---
 1 file changed, 229 insertions(+), 18 deletions(-)

diff --git a/webservice/job_monitor.py b/webservice/job_monitor.py
index b9a6b5fcb..eca1ce0f0 100644
--- a/webservice/job_monitor.py
+++ b/webservice/job_monitor.py
@@ -106,6 +106,133 @@ def slurm_job_status(jobid):
     return "NA", "NA", "NA"
 
 
+def parse_log_file(file_path):
+    """Read a JSON-lines log file into (message, class) pairs.
+
+    Blank lines are ignored. Lines that are not valid JSON or
+    do not hold a JSON object are logged and skipped.
+
+    Args:
+        file_path: Path of a log file with one JSON object per line
+
+    Returns:
+        list: (message, class) tuples, '' standing in for a missing key
+    """
+    results = []
+    with open(file_path, 'r') as file:
+        for line in file:
+            line = line.strip()
+            if not line:
+                continue
+            try:
+                log_entry = json.loads(line)
+            except json.JSONDecodeError:
+                log.error("Skipping invalid JSON in %s: %s", file_path, line)
+                continue
+            if not isinstance(log_entry, dict):
+                log.error("Skipping non-object entry in %s: %s", file_path, line)
+                continue
+            results.append(
+                (log_entry.get('message', ''), log_entry.get('class', '')))
+    return results
+
+
+def get_report_dir(command):
+    """Return the argument following ``--report-to`` in a command line.
+
+    Args:
+        command (str): Command line, split with shlex
+
+    Returns:
+        str or None: The argument after ``--report-to``; None (with an
+            error logged) if the option or its value is missing
+    """
+    args = shlex.split(command)
+    try:
+        return args[args.index("--report-to") + 1]
+    except (ValueError, IndexError):
+        log.error("Couldn't find report directory in %r", args)
+        return None
+
+
+def process_log_file(job_id, karabo_id, report_dir, karabo_id_log, file):
+    """Add the messages found in one job's log file to karabo_id_log.
+
+    Entries without a message, or whose class contains "default" (case
+    insensitive), are ignored. Nothing is read if the file does not exist.
+
+    Args:
+        job_id: Job ID recorded against every message found
+        karabo_id: Unused, kept so existing calls keep working
+        report_dir: Unused, kept so existing calls keep working
+        karabo_id_log (dict): {message: [Job IDs]}, updated in place
+        file (pathlib.Path): JSON-lines log file of the job
+
+    Returns:
+        dict: The karabo_id_log object that was passed in
+    """
+    if not file.exists():
+        return karabo_id_log
+
+    for message, log_class in parse_log_file(file):
+        if not message or "default" in str(log_class).lower():
+            continue
+        job_ids = karabo_id_log.setdefault(str(message), [])
+        # A message repeated within one log must not count the job twice
+        if job_id not in job_ids:
+            job_ids.append(job_id)
+    return karabo_id_log
+
+
+def compress_job_ids(job_ids):
+    """Compress list of job IDs to a shorter representation.
+
+    Duplicate IDs are counted once.
+
+    Args:
+        job_ids (list): List of job IDs (anything int() accepts)
+
+    Returns:
+        str: Compressed representation like "16 jobs (11498126-11498141)",
+            "2 jobs (11498142, 11498143)" for up to four non-sequential
+            IDs, or "5 jobs (e.g. 1, 3...)" when there are more
+    """
+    if not job_ids:
+        return "0 jobs"
+
+    # Convert to integers, drop duplicates and sort
+    ids = sorted({int(job_id) for job_id in job_ids})
+    n_jobs = len(ids)
+    label = f"{n_jobs} job" if n_jobs == 1 else f"{n_jobs} jobs"
+
+    # Unique sorted IDs are sequential iff they span n_jobs values
+    if n_jobs > 2 and ids[-1] - ids[0] + 1 == n_jobs:
+        return f"{label} ({ids[0]}-{ids[-1]})"
+
+    if n_jobs > 4:
+        return f"{label} (e.g. {ids[0]}, {ids[1]}...)"
+
+    return f"{label} ({', '.join(map(str, ids))})"
+
+
+def format_log_message(errors):
+    """Format log messages with compressed job IDs.
+
+    Args:
+        errors (dict): {karabo_id: {message: [Job IDs]}}
+
+    Returns:
+        dict: {karabo_id: {message: compressed job IDs string}}
+    """
+    return {
+        karabo_id: {
+            msg: compress_job_ids(job_ids)
+            for msg, job_ids in messages.items()
+        }
+        for karabo_id, messages in errors.items()
+    }
+
+
 class JobsMonitor:
     def __init__(self, config):
         log.info("Starting jobs monitor")
@@ -276,11 +403,88 @@ class JobsMonitor:
     def process_request_finished(self, req_id):
         """Send Kafka notifications and update myMDC that a request has
         finished."""
-        krb_id_successes = {r[0]: r[1] for r in self.job_db.execute(
-            "SELECT karabo_id, success FROM executions WHERE req_id = ?",
-            (req_id,)
-        ).fetchall()}
-        success = (set(krb_id_successes.values()) == {1})
+        execs = self.job_db.execute(
+            "SELECT karabo_id, success, command FROM executions WHERE req_id = ?",
+            (req_id,)).fetchall()
+
+        # both dicts will be structured as {karabo_id: {message: [Job IDs]}}
+        warnings = {}
+        errors = {}
+        krb_ids_failed = set()
+        krb_ids_success = set()
+
+        for karabo_id, exec_success, command in execs:
+            # Record the outcome first: a problem with the report directory
+            # must not hide a failed execution from the overall result.
+            if exec_success:
+                krb_ids_success.add(karabo_id)
+            else:
+                krb_ids_failed.add(karabo_id)
+
+            # Get status for all jobs in this execution
+            job_statuses = self.job_db.execute(
+                "SELECT job_id, status FROM slurm_jobs WHERE exec_id IN "
+                "(SELECT exec_id FROM executions WHERE karabo_id = ? AND req_id = ?)",
+                (karabo_id, req_id)
+            ).fetchall()
+
+            # Logs can only be read from an existing report directory
+            report_dir = get_report_dir(command)
+            if not report_dir:
+                log.error(
+                    "Could not get report directory from command: %s", command)
+                report_dir = None
+            else:
+                report_dir = Path(report_dir)
+                if not report_dir.exists():
+                    log.error(
+                        "Report directory does not exist: %s", report_dir)
+                    report_dir = None
+
+            karabo_id_warn = {}
+            karabo_id_err = {}
+            for job_id, status in job_statuses:
+                if report_dir is not None:
+                    # Warnings can come from any job, completed ones included
+                    process_log_file(
+                        job_id, karabo_id, report_dir, karabo_id_warn,
+                        report_dir / f"warnings_{job_id}.log")
+
+                # no errors expected for successful execution.
+                if exec_success or status == "COMPLETED":
+                    continue
+
+                if status != "FAILED":
+                    # Job unsuccessful with a status other than `FAILED`
+                    karabo_id_err.setdefault(
+                        f"SLURM job terminated with status: {status}",
+                        []).append(job_id)
+                    continue
+
+                # Collect per job, so a job without error messages is noticed
+                # even when other jobs of this detector did log some.
+                job_err = {}
+                if report_dir is not None:
+                    process_log_file(
+                        job_id, karabo_id, report_dir, job_err,
+                        report_dir / f"errors_{job_id}.log")
+                if not job_err:
+                    log.warning(
+                        "Job %s failed but no error log/messages found.",
+                        job_id)
+                    job_err["Job failed but no error logs found"] = [job_id]
+                for message, job_ids in job_err.items():
+                    karabo_id_err.setdefault(message, []).extend(job_ids)
+
+            # Keep only detectors with something to report, so an empty entry
+            # does not make `warnings` or `errors` look non-empty.
+            if karabo_id_warn:
+                warnings[karabo_id] = karabo_id_warn
+            if karabo_id_err:
+                errors[karabo_id] = karabo_id_err
+
+        # A request without any execution does not count as a success
+        success = bool(execs) and not krb_ids_failed
         r = self.job_db.execute(
             "SELECT * FROM requests WHERE req_id = ?",
             (req_id,)
@@ -302,25 +506,34 @@ class JobsMonitor:
                           exc_info=True)
 
         if success:
-            msg = "Calibration jobs succeeded"
+            if warnings:
+                msg = (
+                    "Calibration jobs succeeded with warnings: "
+                    f"{json.dumps(format_log_message(warnings), indent=4)}"
+                )
+            else:
+                msg = "Calibration jobs succeeded"
         else:
             # List success & failure by karabo_id
-            krb_ids_ok = {k for (k, v) in krb_id_successes.items() if v == 1}
-            ok = ', '.join(sorted(krb_ids_ok)) if krb_ids_ok else 'none'
-            krb_ids_failed = {k for (k, v) in krb_id_successes.items() if v == 0}
-            msg = f"Succeeded: {ok}; Failed: {', '.join(sorted(krb_ids_failed))}"
+            ok = ', '.join(sorted(krb_ids_success)) if krb_ids_success else 'none'
+            msg = (
+                f"Succeeded: {ok}; Failed: {', '.join(sorted(krb_ids_failed))} :"
+                f" {json.dumps(format_log_message(errors), indent=4)}"
+            )
 
         log.debug("Update MDC for %s, %s: %s", r['action'], r['mymdc_id'], msg)
 
         if r['action'] == 'CORRECT':
             if success:
                 status = 'A'  # Available
-            elif set(krb_id_successes.values()) == {0, 1}:
+            elif krb_ids_success and krb_ids_failed:
+                # TODO: do we keep only an error when one detector fails?
                 status = 'AW'  # Available with Warning (failed for some detectors)
             else:
                 status = 'E'  # Error
             self.mymdc_update_run(r['mymdc_id'], msg, status)
         else:  # r['action'] == 'DARK'
+            # TODO: add warning when available at myMDC.
             status = 'F' if success else 'E'  # Finished/Error
             self.mymdc_update_dark(r['mymdc_id'], msg, status)
 
@@ -348,13 +561,11 @@ class JobsMonitor:
         self, mymdc_run_id, command, karabo_id, success, request_time: str
     ):
         """Add report to MyMdC when a correction execution has finished"""
-        args = shlex.split(command)
-        try:
-            report_path = args[args.index("--report-to") + 1] + ".pdf"
-        except (ValueError, IndexError):
-            log.error("Couldn't find report path in %r", args)
-            return
-
+        report_dir = get_report_dir(command)
+        if report_dir is None:
+            return  # get_report_dir() already logged the problem
+        report_path = report_dir + ".pdf"
+
         if not os.path.isfile(report_path):
             log.error("Jobs finished, but report file %s missing", report_path)
             return
-- 
GitLab