diff --git a/webservice/job_monitor.py b/webservice/job_monitor.py
index eca1ce0f0d6228652de034608d06cff7827620df..2b0e6aabc5f2ec388eefd130075187d19a64602a 100644
--- a/webservice/job_monitor.py
+++ b/webservice/job_monitor.py
@@ -106,20 +106,6 @@ def slurm_job_status(jobid):
     return "NA", "NA", "NA"
 
 
-def parse_log_file(file_path):
-    results = []
-    with open(file_path, 'r') as file:
-        for line in file:
-            try:
-                log_entry = json.loads(line.strip())
-                error_message = log_entry.get('message', '')
-                error_class = log_entry.get('class', '')
-                results.append((error_message, error_class))
-            except json.JSONDecodeError:
-                log.error(f"Skipping invalid JSON: {line.strip()}")
-    return results
-
-
 def get_report_dir(command):
     args = shlex.split(command)
     try:
@@ -129,56 +115,30 @@ def get_report_dir(command):
     return
 
 
-def process_log_file(job_id, karabo_id, report_dir, karabo_id_log, file):
-    if file.exists():
-        with open(file, 'r') as f:
-            for line in f:
-                try:
-                    json_line = json.loads(line)
-                    if "default" not in json_line['class'].lower():
-                        message = json_line['message']
-                        if message:
-                            karabo_id_log.setdefault(
-                                message, []).append(job_id)
-                except json.JSONDecodeError:
-                    log.error(
-                        f"Invalid JSON in errors file {file}:"
-                        f" {line.strip()}")
-    return karabo_id_log
-
-
-def compress_job_ids(job_ids):
-    """Compress list of job IDs to a shorter representation.
-
-    Args:
-        job_ids (list): List of job IDs
-
-    Returns:
-        str: Compressed representation like "16 jobs (11498126-11498141)"
-            or "2 jobs (11498142, 11498143)" for non-sequential IDs
-    """
-    if not job_ids:
-        return "0 jobs"
-
-    # Convert to integers and sort
-    ids = sorted(int(id) for id in job_ids)
-
-    # Check if they're sequential
-    if len(ids) > 2 and ids[-1] - ids[0] + 1 == len(ids):
-        return f"{len(ids)} jobs ({ids[0]}-{ids[-1]})"
-
-    if len(ids) > 4:
-        return f"{len(ids)} jobs (e.g. {ids[0]}, {ids[1]}...)"
-
-    return f"{len(ids)} jobs ({', '.join(str(id) for id in ids)})"
+def process_log_file(job_id, karabo_id_log, file):
+    with open(file, 'r') as f:
+        for line in f:
+            try:
+                json_line = json.loads(line)
+                message = json_line['level'] + json_line['message']
+                log_class = json_line['log_class'].split(".")
+                if message:
+                    if not any("calibration" in c.lower() for c in log_class):
+                        message = "Error: Uncaught error!"
+                    karabo_id_log.setdefault(
+                        message, []).append(job_id)
+            except json.JSONDecodeError:
+                log.error(
+                    f"Invalid JSON in errors file {file}:"
+                    f" {line.strip()}")
 
 
-def format_log_message(errors):
-    """Format log messages with compressed job IDs."""
+def format_log_message(logs):
+    """Format log messages with per-message job counts."""
     formatted = {}
-    for karabo_id, messages in errors.items():
+    for karabo_id, messages in logs.items():
         formatted[karabo_id] = {
-            msg: compress_job_ids(job_ids)
+            msg: f"{len(job_ids)} jobs"
             for msg, job_ids in messages.items()
         }
     return formatted
@@ -390,27 +350,27 @@ class JobsMonitor:
 
         for job_id, status in job_statuses:
             if status == "COMPLETED":
-                continue
+                # Collect warnings only for jobs that completed successfully.
+                warning_file = report_dir / f"warnings_{job_id}.log"
+                if warning_file.exists():
+                    karabo_id_warn = warnings.setdefault(karabo_id, {})
+                    process_log_file(job_id, karabo_id_warn, warning_file)
+                continue  # No errors expected for COMPLETED jobs.
 
             if not exec_success:  # no errors expected for successful execution.
                 karabo_id_err = errors.setdefault(karabo_id, {})
                 if status == "FAILED":  # process error logs
                     error_file = report_dir / f"errors_{job_id}.log"
-                    process_log_file(
-                        job_id, karabo_id, report_dir, karabo_id_err, error_file)
+                    if error_file.exists():
+                        process_log_file(job_id, karabo_id_err, error_file)
                     if len(karabo_id_err) == 0:
                         log.warning(f"Job {job_id} failed but no error log/messages found.")
                         karabo_id_err.setdefault(
-                            "Job failed but no error logs found", []).append(job_id)
+                            "Error: Job failed but no error logs found", []).append(job_id)
-                else:  # Job unsucessefull with a status other than `FAILED`
+                else:  # Job unsuccessful with a status other than `FAILED`
                     karabo_id_err.setdefault(
-                        f"SLURM job terminated with status: {status}", []).append(job_id)
-
-            # Process warning logs
-            warning_file = report_dir / f"warnings_{job_id}.log"
-            process_log_file(
-                job_id, karabo_id, report_dir, karabo_id_err, warning_file)
+                        f"Error: SLURM job terminated with status: {status}", []).append(job_id)
 
         success = (not krb_ids_failed)
@@ -440,11 +400,11 @@ class JobsMonitor:
             msg = "Calibration jobs succeeded"
         else:
             # List success & failure by karabo_id
-            ok = ', '.join(sorted(krb_ids_success)) if krb_ids_success else 'none'
-            msg = (
-                f"Succeeded: {ok}; Failed: {', '.join(sorted(krb_ids_failed))} :"
-                f" {json.dumps(format_log_message(errors), indent=4)}"
-            )
+            ok = ', '.join(sorted(krb_ids_success)) if krb_ids_success else None
+            msg = ""
+            if ok:
+                msg += f"Succeeded: {ok}; "
+            msg += f"Failed: {json.dumps(format_log_message(errors), indent=4)}"
 
         log.debug("Update MDC for %s, %s: %s", r['action'], r['mymdc_id'], msg)
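
Note on the log format: the new process_log_file assumes one JSON object per
line with 'level', 'message' and 'log_class' fields. The field names are taken
from the code above; the sample values here are hypothetical:

    {"level": "ERROR", "message": "Failed to store constants", "log_class": "calibration.errors.StorageError"}

Entries whose dotted log_class contains no "calibration" component are
collapsed into the generic "Error: Uncaught error!" bucket, so only messages
raised by calibration code itself are reported verbatim.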