When I started developing applications for production environments, I quickly learned that proper logging is not just a “nice-to-have” feature—it’s essential for understanding what happens when things go wrong. Over the years, I’ve refined my approach to Python logging to make troubleshooting easier and more efficient.
Understanding Python Logging Fundamentals
Python’s built-in logging module provides a flexible framework for emitting log messages. The basic setup is straightforward:
import logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler("app.log"),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)
logger.info("Application started")
However, this simple approach falls short for production applications. Let’s explore more sophisticated practices.
Implement Structured Logging
Traditional string-based logging makes parsing and analyzing logs difficult. Structured logging solves this by formatting logs as data objects, typically in JSON format.
import json
import logging
from datetime import datetime
class StructuredLogRecord(logging.LogRecord):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.timestamp = datetime.utcnow().isoformat()

class StructuredLogger(logging.Logger):
    def _log(self, level, msg, args, exc_info=None, extra=None, stack_info=False, **kwargs):
        # Serialize dict messages to JSON so downstream tools can parse them
        if isinstance(msg, dict):
            msg = json.dumps(msg)
        super()._log(level, msg, args, exc_info, extra, stack_info, **kwargs)

# Register the custom classes before any loggers are created
logging.setLogRecordFactory(StructuredLogRecord)
logging.setLoggerClass(StructuredLogger)
logger = logging.getLogger("app")
logger.info({
    "event": "user_login",
    "user_id": 12345,
    "ip_address": "192.168.1.1",
    "success": True
})
This approach creates machine-readable logs that can be easily indexed and queried in log management systems like Elasticsearch, Splunk, or CloudWatch.
Use Appropriate Log Levels
I’ve found that using appropriate log levels significantly improves log management:
# DEBUG: Detailed information for debugging
logger.debug("Database query executed in 25ms")
# INFO: Confirmation that things are working as expected
logger.info("User successfully registered")
# WARNING: Indication that something unexpected happened
logger.warning("Rate limit approaching for IP 192.168.1.1")
# ERROR: Due to a more serious problem, the software couldn't perform some function
logger.error("Payment gateway timeout after 30s")
# CRITICAL: A very serious error, indicating program may be unable to continue
logger.critical("Database connection pool exhausted")
For production environments, I typically set the default log level to INFO or WARNING to reduce noise while capturing important events.
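One simple way to keep this flexible is to read the level from an environment variable so each deployment can tune verbosity without a code change. A minimal sketch (the LOG_LEVEL variable name is just an example, not a standard):

import logging
import os

# Resolve the desired level from an environment variable (LOG_LEVEL is an
# assumed name), falling back to INFO if it is missing or unrecognized.
level_name = os.environ.get("LOG_LEVEL", "INFO").upper()
logging.getLogger("app").setLevel(getattr(logging, level_name, logging.INFO))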
Implement Contextual Logging
Context is crucial for understanding log messages. Adding request IDs, user IDs, and other contextual information makes troubleshooting much easier.
import logging
import uuid
from contextvars import ContextVar
request_id_var = ContextVar('request_id', default=None)
class ContextFilter(logging.Filter):
    def filter(self, record):
        # Attach the current request ID (or None) so a formatter can
        # reference it as %(request_id)s
        record.request_id = request_id_var.get()
        return True

def get_logger():
    logger = logging.getLogger("app")
    # Avoid stacking duplicate filters on repeated calls
    if not any(isinstance(f, ContextFilter) for f in logger.filters):
        logger.addFilter(ContextFilter())
    return logger

def process_request():
    # Generate a unique ID for this request
    request_id = str(uuid.uuid4())
    request_id_var.set(request_id)
    logger = get_logger()
    logger.info("Processing request")
    # Deeper function calls will automatically include the request_id
    validate_user()

def validate_user():
    logger = get_logger()
    logger.info("Validating user credentials")
For web applications, I often extend this approach with middleware that automatically assigns a request ID and makes it available to all handlers.
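In Flask, for example, a before_request hook can populate the same request_id_var context variable defined in the snippet above. A rough sketch, assuming the fronting proxy may pass an X-Request-ID header (that header name is an assumption about your infrastructure):

from flask import Flask, request
import uuid

app = Flask(__name__)

@app.before_request
def assign_request_id():
    # Reuse an ID supplied upstream if present (X-Request-ID is an assumed
    # header name); otherwise generate a fresh one for this request.
    incoming_id = request.headers.get("X-Request-ID")
    request_id_var.set(incoming_id or str(uuid.uuid4()))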
Configure Log Rotation
Log files can grow rapidly and consume disk space. Setting up log rotation is essential for production environments:
import logging
from logging.handlers import RotatingFileHandler
handler = RotatingFileHandler(
    "app.log",
    maxBytes=10_000_000,  # 10MB
    backupCount=5         # Keep 5 backup files
)
handler.setFormatter(logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s'))
logger = logging.getLogger("app")
logger.setLevel(logging.INFO)
logger.addHandler(handler)
For more complex scenarios, I use the TimedRotatingFileHandler to rotate logs based on time intervals rather than size:
from logging.handlers import TimedRotatingFileHandler
handler = TimedRotatingFileHandler(
    "app.log",
    when="midnight",  # Rotate at midnight
    interval=1,       # Daily rotation
    backupCount=30    # Keep logs for 30 days
)
Handle Sensitive Data Properly
Logging sensitive information is a common security mistake. I implement redaction to ensure passwords, tokens, and personal data never appear in logs:
import logging
import re
class SensitiveDataFilter(logging.Filter):
    def __init__(self):
        super().__init__()
        # Patterns for sensitive data and their redacted replacements
        self.patterns = [
            (re.compile(r'password["\']?\s*[:=]\s*["\']?([^"\']+)["\']?', re.IGNORECASE), 'password: "REDACTED"'),
            (re.compile(r'token["\']?\s*[:=]\s*["\']?([^"\']+)["\']?', re.IGNORECASE), 'token: "REDACTED"'),
            (re.compile(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b'), 'EMAIL_REDACTED'),
            (re.compile(r'\b\d{4}[- ]?\d{4}[- ]?\d{4}[- ]?\d{4}\b'), 'CARD_REDACTED'),
        ]

    def filter(self, record):
        if isinstance(record.msg, str):
            for pattern, replacement in self.patterns:
                record.msg = pattern.sub(replacement, record.msg)
        return True
logger = logging.getLogger("app")
logger.addFilter(SensitiveDataFilter())
# This sensitive data will be redacted
logger.info('User provided password: "Secret123"')
logger.info('API token: "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9"')
For applications handling highly sensitive data, I consider more comprehensive solutions like specialized logging libraries with built-in PII detection.
Include Exception Context
When exceptions occur, capturing the full context is vital for debugging:
import logging
import traceback
logger = logging.getLogger("app")
def process_transaction(user_id, amount):
    try:
        if amount <= 0:
            raise ValueError("Amount must be positive")
        # Process payment (payment_gateway is a placeholder for your client)
        result = payment_gateway.charge(user_id, amount)
        logger.info({
            "event": "payment_processed",
            "user_id": user_id,
            "amount": amount,
            "transaction_id": result.transaction_id
        })
        return result
    except Exception as e:
        logger.error({
            "event": "payment_failed",
            "user_id": user_id,
            "amount": amount,
            "error": str(e),
            "traceback": traceback.format_exc()
        })
        raise
I also create custom exception handlers for frameworks like Flask or Django to ensure consistent error logging across the application.
from flask import Flask, request, jsonify
app = Flask(__name__)
logger = logging.getLogger("app")
@app.errorhandler(Exception)
def handle_exception(e):
    logger.error({
        "event": "unhandled_exception",
        "path": request.path,
        "method": request.method,
        "error": str(e),
        "traceback": traceback.format_exc()
    })
    return jsonify({"error": "Internal server error"}), 500
Set Up Centralized Logging
For distributed applications, centralized logging is a must. I typically use ELK Stack (Elasticsearch, Logstash, Kibana) or a managed service like Datadog, New Relic, or AWS CloudWatch.
import logging
import watchtower
import boto3
logger = logging.getLogger("app")
logger.setLevel(logging.INFO)
# Local logging
file_handler = logging.FileHandler("app.log")
logger.addHandler(file_handler)
# CloudWatch logging
cloudwatch_handler = watchtower.CloudWatchLogHandler(
    log_group="MyApp",
    stream_name="production",
    boto3_session=boto3.Session(
        region_name="us-west-2",
    )
)
logger.addHandler(cloudwatch_handler)
For larger applications, I set up asynchronous handlers to prevent logging from blocking the main application thread:
import logging
import threading
import queue
from logging.handlers import QueueHandler, QueueListener
# Create queue and handlers
log_queue = queue.Queue(-1)
file_handler = logging.FileHandler("app.log")
stream_handler = logging.StreamHandler()
# Configure queue handler and listener
queue_handler = QueueHandler(log_queue)
listener = QueueListener(log_queue, file_handler, stream_handler)
# Set up logger
logger = logging.getLogger("app")
logger.setLevel(logging.INFO)
logger.addHandler(queue_handler)
# Start listener in background thread
listener.start()
# Log messages now go through the queue
logger.info("Application started")
# Stop listener when application exits
listener.stop()
Create Custom Log Formatters
I often create custom formatters to make logs more readable or to add extra information:
import logging
import json
import socket
from datetime import datetime
class CustomJsonFormatter(logging.Formatter):
    def format(self, record):
        log_record = {
            "timestamp": datetime.utcnow().isoformat(),
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
            "module": record.module,
            "function": record.funcName,
            "line": record.lineno,
            "hostname": socket.gethostname(),
            "process_id": record.process,
            "thread_id": record.thread,
        }
        # Add exception info if available
        if record.exc_info:
            log_record["exception"] = {
                "type": record.exc_info[0].__name__,
                "message": str(record.exc_info[1]),
                "traceback": self.formatException(record.exc_info)
            }
        # Add extra attributes
        if hasattr(record, "request_id"):
            log_record["request_id"] = record.request_id
        return json.dumps(log_record)
# Set up handler with custom formatter
handler = logging.StreamHandler()
handler.setFormatter(CustomJsonFormatter())
logger = logging.getLogger("app")
logger.addHandler(handler)
Implement Performance Metrics in Logs
I like to include performance metrics in my logs to monitor application health:
import logging
import time
import functools
logger = logging.getLogger("app")
def log_execution_time(func):
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        start_time = time.time()
        result = func(*args, **kwargs)
        execution_time = time.time() - start_time
        logger.info({
            "event": "function_execution",
            "function": func.__name__,
            "execution_time_ms": round(execution_time * 1000, 2)
        })
        return result
    return wrapper

@log_execution_time
def process_data(items):
    # Processing logic goes here; return the transformed items
    return items
This approach provides valuable data for spotting performance bottlenecks in production.
Monitor and Alert on Log Patterns
Setting up monitoring for specific log patterns helps catch issues early:
import logging
import smtplib
import time
from email.message import EmailMessage
class AlertHandler(logging.Handler):
    def __init__(self, threshold=logging.ERROR, email_to=None):
        super().__init__()
        self.threshold = threshold
        self.email_to = email_to or ["sysadmin@example.com"]

    def emit(self, record):
        if record.levelno >= self.threshold:
            self.send_alert(record)

    def send_alert(self, record):
        msg = EmailMessage()
        msg.set_content(f"""
Alert Level: {record.levelname}
Time: {time.ctime(record.created)}
Logger: {record.name}
Message: {record.getMessage()}
""")
        msg['Subject'] = f"[ALERT] {record.levelname} in application"
        msg['From'] = "alerts@example.com"
        msg['To'] = ", ".join(self.email_to)
        # Send the email
        # Implementation depends on your environment
        # This is a simple example using smtplib
        with smtplib.SMTP('smtp.example.com') as server:
            server.send_message(msg)
# Add the alert handler
alert_handler = AlertHandler()
alert_handler.setLevel(logging.ERROR)
logger = logging.getLogger("app")
logger.addHandler(alert_handler)
For production systems, I replace this simple implementation with integration to tools like PagerDuty, OpsGenie, or Slack.
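As an illustration of how small that swap can be, a Slack incoming webhook only needs an HTTP POST with a JSON "text" field, so a handler along these lines would do (a rough sketch, not a hardened client; the webhook URL is a placeholder you supply):

import json
import logging
import urllib.request

class SlackAlertHandler(logging.Handler):
    """Posts ERROR-and-above records to a Slack incoming webhook (sketch)."""

    def __init__(self, webhook_url):
        super().__init__(level=logging.ERROR)
        self.webhook_url = webhook_url  # placeholder: your incoming-webhook URL

    def emit(self, record):
        payload = {"text": f"[{record.levelname}] {record.name}: {record.getMessage()}"}
        req = urllib.request.Request(
            self.webhook_url,
            data=json.dumps(payload).encode("utf-8"),
            headers={"Content-Type": "application/json"},
        )
        try:
            urllib.request.urlopen(req, timeout=5)
        except OSError:
            # Never let alerting failures crash the application
            self.handleError(record)

It attaches to the logger exactly like the email handler above.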
Adapt Logging for Serverless Environments
In serverless environments like AWS Lambda, logging requires special consideration:
import logging
import json

# Lambda automatically captures anything written to stdout/stderr
logger = logging.getLogger()
logger.setLevel(logging.INFO)
# Remove any existing handlers to avoid duplicate logs
for handler in list(logger.handlers):
    logger.removeHandler(handler)
# Add a stream handler that formats logs as JSON
handler = logging.StreamHandler()
handler.setFormatter(logging.Formatter('%(message)s'))
logger.addHandler(handler)
def lambda_handler(event, context):
    request_id = context.aws_request_id
    logger.info(json.dumps({
        "event": "function_invoked",
        "request_id": request_id,
        "function_name": context.function_name,
        "function_version": context.function_version,
        "remaining_time_ms": context.get_remaining_time_in_millis()
    }))
    try:
        # Function logic (process_event is a placeholder for your own code)
        result = process_event(event)
        logger.info(json.dumps({
            "event": "function_succeeded",
            "request_id": request_id,
            "execution_result": "success"
        }))
        return result
    except Exception as e:
        logger.error(json.dumps({
            "event": "function_failed",
            "request_id": request_id,
            "error": str(e),
            "error_type": type(e).__name__
        }))
        raise
Configure Logging Through External Files
For applications with multiple environments, I configure logging through external files:
import logging.config
import yaml
import os
def setup_logging():
    # Determine environment
    env = os.environ.get('ENVIRONMENT', 'development')
    # Load appropriate config file
    config_path = f"config/logging_{env}.yaml"
    if os.path.exists(config_path):
        with open(config_path, 'rt') as f:
            config = yaml.safe_load(f.read())
        logging.config.dictConfig(config)
    else:
        # Fallback configuration
        logging.basicConfig(level=logging.INFO)
    return logging.getLogger("app")
logger = setup_logging()
A sample YAML configuration file might look like:
version: 1
formatters:
  simple:
    format: '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
  json:
    (): my_app.logging.CustomJsonFormatter
handlers:
  console:
    class: logging.StreamHandler
    level: INFO
    formatter: simple
    stream: ext://sys.stdout
  file:
    class: logging.handlers.RotatingFileHandler
    level: INFO
    formatter: json
    filename: app.log
    maxBytes: 10485760  # 10MB
    backupCount: 5
loggers:
  app:
    level: INFO
    handlers: [console, file]
    propagate: no
This approach makes it easy to configure different logging behaviors for development, testing, and production environments.
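For instance, a development variant of the same file (a hypothetical config/logging_development.yaml) might drop the JSON file handler and turn the verbosity up:

version: 1
formatters:
  simple:
    format: '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
handlers:
  console:
    class: logging.StreamHandler
    level: DEBUG
    formatter: simple
    stream: ext://sys.stdout
loggers:
  app:
    level: DEBUG
    handlers: [console]
    propagate: no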
By implementing these practices, I’ve been able to create robust logging systems that provide timely, actionable information when issues arise in production. While setting up proper logging requires an upfront investment, the time saved during troubleshooting more than compensates for the effort.
Remember that effective logging is a balance—too little information leaves you blind when problems occur, while too much creates noise that obscures important signals. Finding the right balance for your application is key to maintaining a reliable production system.