Newer
Older
import json
import logging
import os
import sys
import traceback
import warnings

import IPython
from pythonjsonlogger import jsonlogger
# Run context, read once from the environment at import time. Each value
# falls back to a placeholder string when its variable is not set.
NOTEBOOK_NAME = os.environ.get('CAL_NOTEBOOK_NAME', 'Unknown notebook')
CURRENT_WORKING_DIR = os.environ.get('CAL_WORKING_DIR', 'Unknown directory')
JOB_ID = os.environ.get('SLURM_JOB_ID', 'local')
class ContextFilter(logging.Filter):
    """Stamp every record with the notebook name, working directory and job ID.

    The filter never rejects a record; it exists only to add the
    ``notebook``, ``directory`` and ``job_id`` attributes that the log
    format refers to.
    """

    def filter(self, record):
        """Annotate *record* in place and let it through.

        Args:
            record: The ``logging.LogRecord`` being processed.

        Returns:
            Always ``True``. The logging machinery drops a record whenever a
            filter returns a falsy value, and the implicit ``None`` of a
            missing ``return`` counts as falsy.
        """
        record.notebook = NOTEBOOK_NAME
        record.directory = CURRENT_WORKING_DIR
        record.job_id = JOB_ID
        return True
class CustomJsonFormatter(jsonlogger.JsonFormatter):
    """JSON formatter that writes a fixed set of extra keys into each entry."""

    def add_fields(self, log_record, record, message_dict):
        """Fill *log_record* via the base class, then add this module's keys.

        Args:
            log_record: Mapping that is serialised as the JSON log entry.
            record: The ``logging.LogRecord`` being formatted.
            message_dict: Passed through unchanged to the base class.
        """
        super().add_fields(log_record, record, message_dict)
        log_record.update({
            'timestamp': self.formatTime(record, self.datefmt),
            'level': record.levelname,
            'filename': record.filename,
            'lineno': record.lineno,
            # 'class' is a Python keyword, so the attribute can only be
            # read with getattr(); it is present when supplied via `extra`.
            'class': getattr(record, 'class', 'DefaultClass'),
        })
        if not record.exc_info:
            return
        log_record['exc_info'] = self.formatException(record.exc_info)
# Configure the root logger (getLogger() without a name) at INFO level.
logger = logging.getLogger()
logger.setLevel(logging.INFO)

# Format string naming the fields handed to the JSON formatter.
_JSON_LOG_FORMAT = (
    '%(timestamp)s %(level)s %(filename)s %(lineno)d '
    '%(notebook)s %(directory)s %(job_id)s %(class)s %(message)s'
)
formatter = CustomJsonFormatter(_JSON_LOG_FORMAT)
def create_job_specific_handler(log_level, file_suffix):
    """Build a file handler that writes this job's JSON log.

    Args:
        log_level: Level set on the handler.
        file_suffix: Leading part of the file name; the log file is named
            ``<file_suffix>_<JOB_ID>.json``.

    Returns:
        A ``logging.FileHandler`` configured with the module-level
        ``formatter``.
    """
    job_handler = logging.FileHandler(
        f'{file_suffix}_{JOB_ID}.json',
        delay=True,  # defer opening the file until a record is emitted
    )
    job_handler.setFormatter(formatter)
    job_handler.setLevel(log_level)
    return job_handler
# One handler per output file: errors_<JOB_ID>.json, warnings_<JOB_ID>.json
# and info_<JOB_ID>.json.
error_handler = create_job_specific_handler(logging.ERROR, 'errors')
warning_handler = create_job_specific_handler(logging.WARNING, 'warnings')
# NOTE(review): this handler accepts DEBUG, but the logger above is set to
# INFO; confirm which threshold is intended.
info_handler = create_job_specific_handler(logging.DEBUG, 'info')

# Keep ERROR and above out of the warnings file.
warning_handler.addFilter(lambda rec: rec.levelno < logging.ERROR)

# All handlers share one context filter and are attached to the logger in
# the order errors, warnings, info.
context_filter = ContextFilter()
for _handler in (error_handler, warning_handler, info_handler):
    _handler.addFilter(context_filter)
    logger.addHandler(_handler)
del _handler
# Re-entrancy flag: True while safe_handle_error is busy logging an exception.
handling_error = False


def safe_handle_error(exc_type, exc_value, exc_traceback):
    """Log an exception through ``logger`` without ever raising itself.

    Takes the same three arguments as ``sys.excepthook``. If logging fails,
    or if the function is re-entered while it is already logging, the
    exception is printed to ``sys.stderr`` instead.

    Args:
        exc_type: Exception class, or ``None`` when there is no exception
            (for example an empty ``sys.exc_info()`` triple).
        exc_value: Exception instance, or ``None``.
        exc_traceback: Traceback object, or ``None``.
    """
    global handling_error
    if handling_error:  # Avoid infinite loop of errors.
        sys.stderr.write("Recursive error detected!\n")
        traceback.print_exception(
            exc_type, exc_value, exc_traceback, file=sys.stderr)
        return

    handling_error = True
    try:
        # Resolve the type name once, guarding against exc_type being None,
        # so the message and the 'class' field agree and an empty triple is
        # logged rather than diverted to the "Logging failed" branch.
        has_exception = exc_type is not None
        type_name = exc_type.__name__ if has_exception else 'DefaultErrorClass'

        # Log the error with the notebook name, job ID, and additional metadata
        logger.error(
            "An error occurred. Exception type: %s, Message: %s",
            type_name, str(exc_value),
            extra={
                'notebook': NOTEBOOK_NAME,
                'directory': CURRENT_WORKING_DIR,
                'job_id': JOB_ID,
                'class': type_name,
            },
            # Only attach traceback information when there is an exception.
            exc_info=(
                (exc_type, exc_value, exc_traceback) if has_exception else None
            ),
        )
    except Exception as log_error:
        # Logging is best-effort: report the failure and still show the
        # exception that was being handled.
        sys.stderr.write(f"Logging failed: {log_error}\n")
        traceback.print_exception(
            exc_type, exc_value, exc_traceback, file=sys.stderr)
    finally:
        handling_error = False
def handle_warning(message, category, filename, lineno, file=None, line=None):
    """Log a warning through ``logger`` instead of printing it.

    The signature matches ``warnings.showwarning``; ``file`` and ``line``
    are accepted but not used.

    Args:
        message: The warning message or instance.
        category: The warning class; may be falsy, in which case the
            ``class`` field is ``'DefaultWarningClass'``.
        filename: File the warning was raised from.
        lineno: Line number the warning was raised from.
        file: Unused.
        line: Unused.
    """
    try:
        if category:
            category_name = category.__name__
        else:
            category_name = 'DefaultWarningClass'
        context = {
            'notebook': NOTEBOOK_NAME,
            'directory': CURRENT_WORKING_DIR,
            'job_id': JOB_ID,
            'class': category_name,
        }
        logger.warning(
            "Warning occurred: %s, File: %s, Line: %d",
            message, filename, lineno,
            extra=context,
        )
    except Exception as log_error:
        # A failure to log a warning must never propagate to the caller.
        sys.stderr.write(f"Logging failed: {log_error}\n")
# Send warnings to handle_warning instead of the default display function.
warnings.showwarning = handle_warning
# Send uncaught exceptions to safe_handle_error instead of the default hook.
sys.excepthook = safe_handle_error
# Override IPython's exception handling
def custom_showtraceback(self, *args, **kwargs):
    """Replacement for ``InteractiveShell.showtraceback`` that logs the error.

    IPython may hand the exception over explicitly as ``exc_tuple`` (first
    positional argument or keyword). That triple is used when supplied;
    otherwise the exception currently being handled is read from
    ``sys.exc_info()``. All other arguments are accepted and ignored.

    Args:
        self: The shell instance (unused).
        *args: Positional arguments from IPython; ``args[0]`` is treated as
            ``exc_tuple`` when present.
        **kwargs: Keyword arguments from IPython; only ``exc_tuple`` is read.

    Returns:
        Whatever ``safe_handle_error`` returns.
    """
    exc_tuple = kwargs.get('exc_tuple', args[0] if args else None)
    if exc_tuple is None:
        # No explicit exception given: use the one being handled right now.
        exc_tuple = sys.exc_info()
    return safe_handle_error(*exc_tuple)
# Patch the class attribute, replacing IPython's own showtraceback method.
_interactive_shell_cls = IPython.core.interactiveshell.InteractiveShell
_interactive_shell_cls.showtraceback = custom_showtraceback