Newer
Older
import os
import sys
import traceback
import warnings
import IPython
import json
from pythonjsonlogger import jsonlogger
NOTEBOOK_NAME = os.getenv('CAL_NOTEBOOK_NAME', 'Unknown notebook')
CURRENT_WORKING_DIR = os.getenv(
'CAL_WORKING_DIR', 'Unknown directory')
JOB_ID = os.getenv('SLURM_JOB_ID', 'local')
class ContextFilter(logging.Filter):
def filter(self, record):
record.notebook = NOTEBOOK_NAME
record.directory = CURRENT_WORKING_DIR
record.job_id = JOB_ID
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
class CustomJsonFormatter(jsonlogger.JsonFormatter):
def add_fields(self, log_record, record, message_dict):
super(CustomJsonFormatter, self).add_fields(
log_record, record, message_dict)
log_record['timestamp'] = self.formatTime(record, self.datefmt)
log_record['level'] = record.levelname
log_record['filename'] = record.filename
log_record['lineno'] = record.lineno
log_record['class'] = getattr(record, 'class', 'DefaultClass')
if record.exc_info:
log_record['exc_info'] = self.formatException(record.exc_info)
class JSONArrayHandler(logging.FileHandler):
def __init__(self, filename, mode='a', encoding=None, delay=False):
super().__init__(filename, mode, encoding, delay)
self.file_empty = True
def emit(self, record):
try:
msg = self.format(record)
json_msg = json.loads(msg)
with self.lock:
if self.stream is None:
self.stream = self._open()
if self.file_empty:
self.stream.write('[\n')
self.file_empty = False
else:
self.stream.write(',\n')
json.dump(json_msg, self.stream, indent=2)
self.stream.flush()
except Exception:
self.handleError(record)
def close(self):
if self.stream is not None:
with self.lock:
if not self.file_empty:
self.stream.write('\n]')
super().close()
# Create a logger
logger = logging.getLogger()
logger.setLevel(logging.INFO)
# Define a custom JSON format
formatter = CustomJsonFormatter(
'%(timestamp)s %(level)s %(filename)s %(lineno)d '
'%(notebook)s %(directory)s %(job_id)s %(class)s %(message)s')
# Function to create a file handler with job-specific JSON log file
def create_job_specific_handler(log_level, file_suffix):
log_file = f'{file_suffix}_{JOB_ID}.json'
handler = JSONArrayHandler(log_file, mode='a+', delay=True)
handler.setLevel(log_level)
handler.setFormatter(formatter)
return handler
# Create job-specific file handlers
error_handler = create_job_specific_handler(logging.ERROR, 'errors')
warning_handler = create_job_specific_handler(logging.WARNING, 'warnings')
info_handler = create_job_specific_handler(logging.DEBUG, 'info')
# Avoid errors being logged in warnings.json
warning_handler.addFilter(lambda record: record.levelno < logging.ERROR)
# Add the custom filter to handlers
context_filter = ContextFilter()
error_handler.addFilter(context_filter)
warning_handler.addFilter(context_filter)
info_handler.addFilter(context_filter)
# Add handlers to logger
logger.addHandler(error_handler)
logger.addHandler(warning_handler)
logger.addHandler(info_handler)
handling_error = False
def safe_handle_error(exc_type, exc_value, exc_traceback):
global handling_error
if handling_error: # Avoid infinite loop of errors.
sys.stderr.write("Recursive error detected!\n")
traceback.print_exception(
exc_type, exc_value, exc_traceback, file=sys.stderr)
return
handling_error = True
try:
# Log the error with the notebook name, job ID, and additional metadata
logger.error(
"An error occurred. Exception type: %s, Message: %s",
exc_type.__name__, str(exc_value),
extra={
'notebook': NOTEBOOK_NAME,
'directory': CURRENT_WORKING_DIR,
'job_id': JOB_ID,
'class': exc_type.__name__ if exc_type else 'DefaultErrorClass', # noqa
},
exc_info=(exc_type, exc_value, exc_traceback)
)
except Exception as log_error:
sys.stderr.write(f"Logging failed: {log_error}\n")
traceback.print_exception(
exc_type, exc_value, exc_traceback, file=sys.stderr)
finally:
handling_error = False
def handle_warning(message, category, filename, lineno, file=None, line=None):
try:
logger.warning(
"Warning occurred: %s, File: %s, Line: %d",
message, filename, lineno,
extra={
'notebook': NOTEBOOK_NAME,
'directory': CURRENT_WORKING_DIR,
'job_id': JOB_ID,
'class': category.__name__ if category else 'DefaultWarningClass', # noqa
}
)
except Exception as log_error:
sys.stderr.write(f"Logging failed: {log_error}\n")
# Replace the handlers with our custom ones
sys.excepthook = safe_handle_error
warnings.showwarning = handle_warning
# Override IPython's exception handling
def custom_showtraceback(self, *args, **kwargs):
return safe_handle_error(*sys.exc_info())
IPython.core.interactiveshell.InteractiveShell.showtraceback = custom_showtraceback # noqa