Structured Logs, Context, and Correlation IDs
Log structured data to enable queries, use correlation IDs to track requests, and maintain context across async boundaries.
TL;DR
Unstructured logs are hard to search and query. Log as JSON with consistent field names (e.g., timestamp, level, service, correlation_id, user_id, error). Correlation IDs uniquely identify user requests and flow through all services touched by that request. Propagate correlation IDs in HTTP headers, message queues, and async tasks. Use context management (request-scoped storage) to avoid passing context through function parameters. Log structured data with context automatically injected—this enables powerful queries: find all logs for a user, trace requests across services, identify error patterns. Structured logging plus correlation IDs transform logs from noise into a queryable audit trail.
Learning Objectives
- Understand why unstructured logs fail in distributed systems
- Design structured log formats for queryability
- Implement correlation IDs for request tracking
- Propagate context across service boundaries and async tasks
- Use context managers to inject contextual fields automatically
- Query structured logs to troubleshoot incidents
Motivating Scenario
A customer reports a payment failure. You search your logs for "payment" and get 50,000 results. You filter by timestamp and user ID, and finally find the relevant entries. But they reference calls to other services: fraud detection, inventory, accounting. Those logs live in separate systems, so you manually correlate timestamps and user IDs across three services to reconstruct the sequence. It takes two hours. A better approach: every request is assigned a unique correlation ID at entry. That ID flows through all services, and every log entry includes it. A single query across all services tells the complete story in seconds.
Core Concepts
Structured Logging
Logs should be machine-readable, queryable objects—typically JSON. Each log entry contains fields like timestamp, service, level, message, user_id, request_id. This enables filtering and aggregation: find all errors for a service, find all activity for a user, find all slow requests.
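As a rough illustration of what queryability means in practice, the sketch below parses a few JSON log lines and filters them by field. The sample entries, their values, and the `query` helper are made up for this example; a real log aggregation system would run the equivalent filter on the server side.

```python
import json
from collections import Counter

# Hypothetical log lines, one JSON object per line.
RAW_LOGS = """
{"timestamp": "2024-05-01T12:00:00Z", "level": "info", "service": "payment", "message": "Processing payment", "user_id": "u1", "request_id": "r1"}
{"timestamp": "2024-05-01T12:00:01Z", "level": "error", "service": "payment", "message": "Card charge failed", "user_id": "u1", "request_id": "r1"}
{"timestamp": "2024-05-01T12:00:02Z", "level": "error", "service": "inventory", "message": "Stock lookup timed out", "user_id": "u2", "request_id": "r2"}
"""

entries = [json.loads(line) for line in RAW_LOGS.strip().splitlines()]


def query(logs, **criteria):
    """Return the entries whose fields equal every key/value in criteria."""
    return [e for e in logs if all(e.get(k) == v for k, v in criteria.items())]


payment_errors = query(entries, service="payment", level="error")
user_activity = query(entries, user_id="u1")
errors_by_service = Counter(e["service"] for e in query(entries, level="error"))

print(payment_errors[0]["message"])  # Card charge failed
print(len(user_activity))            # 2
print(dict(errors_by_service))       # {'payment': 1, 'inventory': 1}
```

None of these filters needs a regular expression or knowledge of the message wording, which is the practical difference from grepping free-text logs.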
Correlation IDs
A correlation ID (or request ID, trace ID) uniquely identifies a user request from entry point to completion. It flows through all services that handle that request. Every log entry includes the correlation ID, creating an explicit chain of causality.
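One way to assign the ID at the entry point is sketched below: reuse a well-formed inbound ID so an existing chain continues, and generate a new one otherwise. The `resolve_correlation_id` helper and its validation pattern are assumptions made for this example; the header name follows the one used elsewhere in this article.

```python
import re
import uuid
from typing import Mapping

# Header name as used in this article's examples (compared case-insensitively).
HEADER = "x-correlation-id"
# Assumed policy: accept only short, simple IDs so a caller cannot inject
# arbitrary text (such as newlines) into the logs.
_VALID_ID = re.compile(r"[A-Za-z0-9._-]{8,64}")


def resolve_correlation_id(headers: Mapping[str, str]) -> str:
    """Reuse a well-formed inbound ID, otherwise start a new chain."""
    normalized = {key.lower(): value for key, value in headers.items()}
    inbound = normalized.get(HEADER, "")
    if _VALID_ID.fullmatch(inbound):
        return inbound
    return str(uuid.uuid4())


print(resolve_correlation_id({"X-Correlation-ID": "req-12345678"}))  # req-12345678
print(resolve_correlation_id({}))  # a freshly generated UUID
```

Validating the inbound value is a design choice rather than a requirement; without it, whatever a client sends ends up verbatim in every log entry for that request.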
Context Propagation
In synchronous request/response systems (HTTP), context travels in headers. In asynchronous systems (queues, scheduled tasks), nothing carries it implicitly, so it must be written into the message or task payload and restored by the consumer. Within a process, request-scoped storage (context variables, async-local storage) injects contextual fields (user_id, request_id, session_id) into every log entry without passing them through function parameters.
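The in-process half of this can be sketched with Python's `contextvars` and `asyncio`. The handler names below are illustrative only; the point is that a function deep in the call chain reads the ID without receiving it as a parameter, and that concurrent requests do not see each other's values.

```python
import asyncio
from contextvars import ContextVar

correlation_id_var: ContextVar[str] = ContextVar("correlation_id", default="")


async def call_downstream() -> str:
    # No correlation_id parameter: the value is read from the current context.
    await asyncio.sleep(0.01)
    return correlation_id_var.get()


async def handle_request(correlation_id: str) -> str:
    correlation_id_var.set(correlation_id)
    return await call_downstream()


async def main() -> list:
    # gather() wraps each coroutine in its own Task, and each Task runs in a
    # copy of the context, so the two requests keep separate IDs.
    return await asyncio.gather(handle_request("req-a"), handle_request("req-b"))


results = asyncio.run(main())
print(results)  # ['req-a', 'req-b']
```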
Contextual Fields
Beyond the standard fields (timestamp, level, message), add context: which user, which request, which transaction, which feature flag. These fields enable powerful queries and isolation of issues to specific users, requests, or features.
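A minimal sketch of attaching such fields with the standard `logging` module follows, assuming a filter copies request-scoped values onto each record and a formatter emits JSON. The class names, the logger name, and the `fields` key passed through `extra` are choices made for this example, not an established convention.

```python
import io
import json
import logging
from contextvars import ContextVar

correlation_id_var: ContextVar[str] = ContextVar("correlation_id", default="")
user_id_var: ContextVar[str] = ContextVar("user_id", default="")


class ContextFilter(logging.Filter):
    """Copy request-scoped values onto every record passing through the handler."""

    def filter(self, record: logging.LogRecord) -> bool:
        record.correlation_id = correlation_id_var.get()
        record.user_id = user_id_var.get()
        return True


class JsonLineFormatter(logging.Formatter):
    def format(self, record: logging.LogRecord) -> str:
        entry = {
            "timestamp": self.formatTime(record),
            "level": record.levelname.lower(),
            "message": record.getMessage(),
            "correlation_id": record.correlation_id,
            "user_id": record.user_id,
            # Per-call context passed as extra={"fields": {...}}
            **getattr(record, "fields", {}),
        }
        return json.dumps(entry)


stream = io.StringIO()  # stands in for stdout so the output can be inspected
handler = logging.StreamHandler(stream)
handler.addFilter(ContextFilter())
handler.setFormatter(JsonLineFormatter())

log = logging.getLogger("contextual_fields_demo")
log.setLevel(logging.INFO)
log.propagate = False
log.addHandler(handler)

correlation_id_var.set("req-42")
user_id_var.set("user-7")
log.info("Feature evaluated", extra={"fields": {"feature_flag": "new_checkout", "enabled": True}})

entry = json.loads(stream.getvalue())
print(entry["correlation_id"], entry["feature_flag"])  # req-42 new_checkout
```

The call site only names what is specific to that log line (the feature flag); the request and user identity arrive from the context.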
Practical Example
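The same example follows in Python, Node.js, and Go, in that order. Each shows a poor version first, then a structured one.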
# ❌ POOR - Unstructured logging, no correlation
import logging

# level=INFO is needed for the info() calls below to be emitted at all
logging.basicConfig(format='%(message)s', level=logging.INFO)
logger = logging.getLogger(__name__)

def process_payment(user_id, amount):
    logger.info(f"Processing payment for user {user_id}")
    fraud_check = check_fraud(user_id, amount)
    if not fraud_check:
        logger.error(f"Fraud detected for user {user_id}")
        return False
    logger.info(f"Payment of {amount} processed")
    return True

# Logs are unstructured strings - hard to query across services
# No correlation ID - can't link logs from different services
# User ID is in message text, not a field - can't filter efficiently
# ✅ EXCELLENT - Structured logging with context and correlation
import json
import logging
import uuid
from contextvars import ContextVar
from datetime import datetime, timezone
from typing import Optional

# Context variables for automatic injection
correlation_id_var: ContextVar[str] = ContextVar('correlation_id', default='')
user_id_var: ContextVar[Optional[str]] = ContextVar('user_id', default=None)
request_type_var: ContextVar[str] = ContextVar('request_type', default='')

class JsonFormatter(logging.Formatter):
    def format(self, record):
        # StructuredLogger has already serialized the entry to JSON; emit it unchanged
        return record.getMessage()

class StructuredLogger:
    def __init__(self, name: str):
        self.service = name
        self.logger = logging.getLogger(name)
        self.logger.setLevel(logging.DEBUG)
        # Attach the JSON handler once, even if this logger name is reused
        if not self.logger.handlers:
            handler = logging.StreamHandler()
            handler.setFormatter(JsonFormatter())
            self.logger.addHandler(handler)

    def _get_context(self) -> dict:
        """Extract contextual fields from context vars."""
        return {
            'correlation_id': correlation_id_var.get(),
            'user_id': user_id_var.get(),
            'request_type': request_type_var.get(),
        }

    def _log(self, level: int, message: str, fields: dict):
        self.logger.log(level, json.dumps({
            'timestamp': datetime.now(timezone.utc).isoformat(),
            'level': logging.getLevelName(level).lower(),
            'service': self.service,
            'message': message,
            **self._get_context(),
            **fields,
        }, default=str))  # default=str keeps non-JSON values from raising

    def info(self, message: str, **fields):
        self._log(logging.INFO, message, fields)

    def error(self, message: str, **fields):
        self._log(logging.ERROR, message, fields)

logger = StructuredLogger('payment_service')
class CorrelationContext:
    """Context manager for request handling."""

    def __init__(self, correlation_id: str, user_id: str, request_type: str):
        self.correlation_id = correlation_id
        self.user_id = user_id
        self.request_type = request_type
        self._tokens = []

    def __enter__(self):
        # Keep the tokens so __exit__ can restore whatever was set before
        self._tokens = [
            (correlation_id_var, correlation_id_var.set(self.correlation_id)),
            (user_id_var, user_id_var.set(self.user_id)),
            (request_type_var, request_type_var.set(self.request_type)),
        ]
        logger.info("Request started",
                    endpoint="/process-payment",
                    method="POST")
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        if exc_type:
            logger.error("Request failed",
                         error_type=exc_type.__name__,
                         error_message=str(exc_val))
        else:
            # A normal exit includes business failures that returned False
            logger.info("Request completed")
        # Restore the previous values rather than blanking them, so nesting works
        for var, token in reversed(self._tokens):
            var.reset(token)
        return False  # never swallow the exception
def process_payment(user_id: str, amount: float) -> bool:
    # Reuse the ID set at the entry point if there is one; otherwise start a new chain
    correlation_id = correlation_id_var.get() or str(uuid.uuid4())
    with CorrelationContext(correlation_id, user_id, 'payment_processing'):
        logger.info("Processing payment",
                    amount=amount,
                    currency='USD')

        # check_fraud is assumed to return a dict with 'passed', 'score', and 'reason'
        fraud_check = check_fraud(user_id, amount)
        if not fraud_check['passed']:
            logger.error("Fraud detected",
                         fraud_score=fraud_check['score'],
                         reason=fraud_check['reason'])
            return False

        # charge_card is assumed to return a dict with 'id' on success, None on failure
        charge_result = charge_card(user_id, amount)
        if not charge_result:
            logger.error("Card charge failed",
                         card_last_four='****1234',
                         processor_response='insufficient_funds')
            return False

        logger.info("Payment completed successfully",
                    transaction_id=charge_result['id'],
                    processor='stripe')
        return True
# Query examples (using a JSON log aggregation system):
# Find all payment processing logs for a user:
#   user_id="..." AND request_type="payment_processing"
#
# Find every log entry for one request, across services:
#   correlation_id="..."
#
# Find all fraud detections across all users:
#   level="error" AND message="Fraud detected"
#
# Find all requests with errors:
#   level="error" AND correlation_id!=""
// ❌ POOR - Unstructured logs, lost context across async boundaries
const logger = console;

async function processPayment(userId, amount) {
  logger.log(`Processing payment for user ${userId}`);
  const fraudResult = await checkFraud(userId, amount);
  if (!fraudResult) {
    logger.error(`Fraud detected for user ${userId}`);
    return false;
  }
  logger.log(`Payment of ${amount} processed`);
  return true;
}

// Problem: No correlation ID, hard to trace across services
// Async operations lose context (userId not available without passing)
// ✅ EXCELLENT - Structured logging with context propagation
// AsyncLocalStorage is the Node.js counterpart of Python's contextvars
const { AsyncLocalStorage } = require('node:async_hooks');
const { randomUUID } = require('node:crypto');

const requestContext = new AsyncLocalStorage();

class StructuredLogger {
  constructor(serviceName) {
    this.serviceName = serviceName;
  }

  getContext() {
    return requestContext.getStore() || {};
  }

  log(level, message, fields = {}) {
    const context = this.getContext();
    const logEntry = {
      timestamp: new Date().toISOString(),
      level,
      service: this.serviceName,
      message,
      correlation_id: context.correlationId,
      user_id: context.userId,
      request_type: context.requestType,
      ...fields
    };
    console.log(JSON.stringify(logEntry));
  }

  info(message, fields = {}) {
    this.log('info', message, fields);
  }

  error(message, fields = {}) {
    this.log('error', message, fields);
  }
}

const logger = new StructuredLogger('payment-service');

// Middleware for HTTP frameworks
function correlationMiddleware(req, res, next) {
  const startTime = Date.now();
  const store = {
    // Reuse the caller's ID when present, otherwise start a new chain
    correlationId: req.headers['x-correlation-id'] || randomUUID(),
    userId: req.user?.id,
    requestType: req.path,
  };

  requestContext.run(store, () => {
    res.setHeader('x-correlation-id', store.correlationId);
    logger.info('Request started', {
      method: req.method,
      path: req.path,
      ip: req.ip
    });

    res.on('finish', () => {
      // Re-enter the store: event listeners are not guaranteed to run in the
      // async context that registered them
      requestContext.run(store, () => {
        logger.info('Request completed', {
          status: res.statusCode,
          duration_ms: Date.now() - startTime
        });
      });
    });

    next();
  });
}
// Using structured logging with context
// The logger reads correlation data from requestContext, so nothing is passed in here
async function processPayment(userId, amount) {
  logger.info('Processing payment', {
    amount,
    currency: 'USD'
  });

  try {
    const fraudResult = await checkFraud(userId, amount);
    if (!fraudResult.passed) {
      logger.error('Fraud detected', {
        fraud_score: fraudResult.score,
        reason: fraudResult.reason
      });
      return false;
    }

    const chargeResult = await chargeCard(userId, amount);
    if (!chargeResult.success) {
      logger.error('Card charge failed', {
        card_last_four: '****1234',
        processor_error: chargeResult.error
      });
      return false;
    }

    logger.info('Payment completed', {
      transaction_id: chargeResult.id,
      processor: 'stripe'
    });
    return true;
  } catch (error) {
    logger.error('Payment processing error', {
      error_message: error.message,
      // stack can be missing on non-Error throwables; keep only the top frames
      error_stack: error.stack?.split('\n').slice(0, 3)
    });
    throw error;
  }
}
// For queue/async tasks - explicitly propagate context
async function publishPaymentEvent(userId, amount, correlationId) {
  return requestContext.run({
    correlationId,
    userId,
    requestType: 'async_payment_event'
  }, async () => {
    logger.info('Publishing payment event to queue', {
      queue: 'payment-events',
      topic: 'payments.processed'
    });

    await queue.publish('payments.processed', {
      userId,
      amount,
      correlationId, // Explicitly include for downstream services
      timestamp: new Date().toISOString()
    });
  });
}
// ❌ POOR - Unstructured logs, no context propagation
package payment

import "log"

func ProcessPayment(userID string, amount float64) bool {
	log.Printf("Processing payment for user %s", userID)
	fraudCheck := checkFraud(userID, amount)
	if !fraudCheck {
		log.Printf("Fraud detected for user %s", userID)
		return false
	}
	log.Printf("Payment of %.2f processed", amount)
	return true
}

// Problem: Unstructured, no correlation IDs
// ✅ EXCELLENT - Structured logging with correlation IDs
package payment

import (
	"context"
	"encoding/json"
	"log"
	"net/http"
	"os"
	"time"

	"github.com/google/uuid"
)

// Context keys for passing correlation data
type contextKey string

const (
	correlationIDKey contextKey = "correlation_id"
	userIDKey        contextKey = "user_id"
	requestTypeKey   contextKey = "request_type"
)

// jsonOut writes bare lines; the default logger would prefix each entry with
// a date and time, so the output would no longer be valid JSON
var jsonOut = log.New(os.Stdout, "", 0)

type StructuredLogger struct {
	serviceName string
}

type LogEntry struct {
	Timestamp     string                 `json:"timestamp"`
	Level         string                 `json:"level"`
	Service       string                 `json:"service"`
	Message       string                 `json:"message"`
	CorrelationID string                 `json:"correlation_id,omitempty"`
	UserID        string                 `json:"user_id,omitempty"`
	RequestType   string                 `json:"request_type,omitempty"`
	Fields        map[string]interface{} `json:"fields,omitempty"`
}

func NewStructuredLogger(serviceName string) *StructuredLogger {
	return &StructuredLogger{serviceName: serviceName}
}

func (sl *StructuredLogger) extractContext(ctx context.Context) map[string]string {
	result := make(map[string]string)
	// The comma-ok assertion skips values that are missing or not strings
	if corrID, ok := ctx.Value(correlationIDKey).(string); ok {
		result["correlation_id"] = corrID
	}
	if userID, ok := ctx.Value(userIDKey).(string); ok {
		result["user_id"] = userID
	}
	if reqType, ok := ctx.Value(requestTypeKey).(string); ok {
		result["request_type"] = reqType
	}
	return result
}

func (sl *StructuredLogger) Log(ctx context.Context, level string, message string, fields map[string]interface{}) {
	contextData := sl.extractContext(ctx)
	entry := LogEntry{
		Timestamp:     time.Now().UTC().Format(time.RFC3339),
		Level:         level,
		Service:       sl.serviceName,
		Message:       message,
		CorrelationID: contextData["correlation_id"],
		UserID:        contextData["user_id"],
		RequestType:   contextData["request_type"],
		Fields:        fields,
	}
	data, err := json.Marshal(entry)
	if err != nil {
		// Fields may hold values that cannot be marshaled (channels, funcs)
		log.Printf("failed to marshal log entry: %v", err)
		return
	}
	jsonOut.Println(string(data))
}

func (sl *StructuredLogger) Info(ctx context.Context, message string, fields map[string]interface{}) {
	sl.Log(ctx, "info", message, fields)
}

func (sl *StructuredLogger) Error(ctx context.Context, message string, fields map[string]interface{}) {
	sl.Log(ctx, "error", message, fields)
}
// HTTP middleware for correlation ID
func CorrelationMiddleware(next http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		correlationID := r.Header.Get("X-Correlation-ID")
		if correlationID == "" {
			correlationID = uuid.New().String()
		}
		// Assumes a trusted upstream (such as a gateway) sets X-User-ID after
		// authenticating the caller; do not trust it from arbitrary clients
		userID := r.Header.Get("X-User-ID")

		ctx := context.WithValue(r.Context(), correlationIDKey, correlationID)
		ctx = context.WithValue(ctx, userIDKey, userID)
		// Path without the query string keeps request_type low-cardinality
		ctx = context.WithValue(ctx, requestTypeKey, r.URL.Path)

		w.Header().Set("X-Correlation-ID", correlationID)

		logger := NewStructuredLogger("api-gateway")
		logger.Info(ctx, "Request started", map[string]interface{}{
			"method": r.Method,
			"path":   r.URL.Path,
		})

		next.ServeHTTP(w, r.WithContext(ctx))
	})
}
func ProcessPayment(ctx context.Context, userID string, amount float64) bool {
	logger := NewStructuredLogger("payment-service")
	logger.Info(ctx, "Processing payment", map[string]interface{}{
		"amount":   amount,
		"currency": "USD",
	})

	fraudCheck := checkFraud(ctx, userID, amount)
	if !fraudCheck {
		logger.Error(ctx, "Fraud detected", map[string]interface{}{
			"fraud_score": 0.85,
			"reason":      "velocity_check_failed",
		})
		return false
	}

	chargeResult := chargeCard(ctx, userID, amount)
	if !chargeResult {
		logger.Error(ctx, "Card charge failed", map[string]interface{}{
			"card_last_four":     "****1234",
			"processor_response": "insufficient_funds",
		})
		return false
	}

	logger.Info(ctx, "Payment completed", map[string]interface{}{
		"transaction_id": "txn_123abc",
		"processor":      "stripe",
	})
	return true
}
// Publishing to async system with correlation propagation
func PublishPaymentEvent(ctx context.Context, userID string, amount float64) error {
	// Comma-ok avoids a panic when the context carries no correlation ID
	correlationID, ok := ctx.Value(correlationIDKey).(string)
	if !ok || correlationID == "" {
		correlationID = uuid.New().String()
		ctx = context.WithValue(ctx, correlationIDKey, correlationID)
	}

	logger := NewStructuredLogger("event-publisher")
	logger.Info(ctx, "Publishing payment event", map[string]interface{}{
		"queue": "payment-events",
		"topic": "payments.processed",
	})

	event := map[string]interface{}{
		"user_id":        userID,
		"amount":         amount,
		"correlation_id": correlationID, // Explicit propagation for downstream
		"timestamp":      time.Now().Unix(),
	}
	return queue.Publish("payments.processed", event)
}
Propagation Patterns
HTTP Header Propagation
// Request side
const response = await fetch('https://api.example.com/charge', {
  method: 'POST',
  headers: {
    'X-Correlation-ID': correlationId,
    'X-User-ID': userId,
    'Content-Type': 'application/json'
  },
  body: JSON.stringify({ amount })
});

// Receiving side - extract and propagate
// (randomUUID comes from node:crypto)
app.use((req, res, next) => {
  // Start a new chain when the caller sent no ID, so logs never lack one
  const correlationId = req.headers['x-correlation-id'] || randomUUID();
  const userId = req.headers['x-user-id'];
  requestContext.run({ correlationId, userId }, () => {
    next();
  });
});
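A Python counterpart of the request side might look like the sketch below, which builds outbound headers from the current context so that callers never assemble them by hand. The `outbound_headers` helper is hypothetical; the header name matches the one used above.

```python
import uuid
from contextvars import ContextVar
from typing import Dict, Optional

correlation_id_var: ContextVar[str] = ContextVar("correlation_id", default="")


def outbound_headers(extra: Optional[Dict[str, str]] = None) -> Dict[str, str]:
    """Headers for a downstream call, carrying the current correlation ID."""
    # Start a new chain if this code runs outside any request context.
    correlation_id = correlation_id_var.get() or str(uuid.uuid4())
    headers = {"X-Correlation-ID": correlation_id, "Content-Type": "application/json"}
    headers.update(extra or {})
    return headers


correlation_id_var.set("req-42")
headers = outbound_headers({"Accept": "application/json"})
print(headers["X-Correlation-ID"])  # req-42
```

The returned dictionary can be handed to whichever HTTP client is in use, for example `urllib.request.Request(url, data=body, headers=outbound_headers(), method="POST")`.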
Message Queue Propagation
// Publishing
await queue.publish('payment.processed', {
  userId,
  amount,
  _context: {
    correlation_id: correlationId,
    user_id: userId,
    request_type: 'payment'
  }
});

// Consuming
queue.on('payment.processed', (message) => {
  const { correlation_id, user_id } = message._context ?? {};
  // Return the promise so a queue client that awaits handlers can see failures
  return requestContext.run({ correlationId: correlation_id, userId: user_id }, () =>
    processPayment(message.userId, message.amount)
  );
});
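The same envelope pattern can be sketched in Python without any particular broker. The `wrap` and `consume` helpers are hypothetical, and the `_context` key simply mirrors the example above; the message is passed directly instead of through a real queue.

```python
from contextvars import ContextVar, copy_context

correlation_id_var: ContextVar[str] = ContextVar("correlation_id", default="")
user_id_var: ContextVar[str] = ContextVar("user_id", default="")


def wrap(payload: dict) -> dict:
    """Producer side: attach the current context to the outgoing message."""
    return {
        **payload,
        "_context": {
            "correlation_id": correlation_id_var.get(),
            "user_id": user_id_var.get(),
        },
    }


def consume(message: dict, handler):
    """Consumer side: restore the context, then run the handler inside it."""
    context = message.get("_context", {})

    def run():
        correlation_id_var.set(context.get("correlation_id", ""))
        user_id_var.set(context.get("user_id", ""))
        return handler(message)

    # Running in a copied context keeps the restored values from leaking
    # into the handling of the next message.
    return copy_context().run(run)


def handle_payment(message: dict) -> str:
    return f"{correlation_id_var.get()} processed {message['amount']}"


# Producer, inside a request that already has a context.
correlation_id_var.set("req-42")
user_id_var.set("user-7")
message = wrap({"amount": 25})

# Consumer, typically a different process with no context of its own.
correlation_id_var.set("")
user_id_var.set("")
result = consume(message, handle_payment)
print(result)                          # req-42 processed 25
print(repr(correlation_id_var.get()))  # ''
```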
Async Task Propagation
# Scheduling with context
def schedule_job(user_id, job_type):
    # Explicitly pass context: context vars do not cross process boundaries
    task_queue.enqueue(
        run_job,
        user_id=user_id,
        job_type=job_type,
        correlation_id=correlation_id_var.get(),  # Explicit parameter
    )

# Executing with context restored
def run_job(user_id, job_type, correlation_id):
    correlation_id_var.set(correlation_id)
    user_id_var.set(user_id)
    # All logs now include correlation_id and user_id automatically
    logger.info("Executing job", job_type=job_type)
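When the task stays inside the same process, for instance on a thread pool, passing IDs as parameters can be avoided by snapshotting the context at submission time. This is a sketch under the assumption that worker threads do not reliably inherit the caller's context variables; `submit_with_context` is a hypothetical helper, not part of the standard library.

```python
from concurrent.futures import ThreadPoolExecutor
from contextvars import ContextVar, copy_context

correlation_id_var: ContextVar[str] = ContextVar("correlation_id", default="")


def run_job(job_type: str) -> str:
    # Reads the ID from the context instead of taking it as a parameter.
    return f"{job_type}:{correlation_id_var.get()}"


def submit_with_context(executor: ThreadPoolExecutor, fn, *args):
    """Snapshot the caller's context and run fn inside that snapshot on the worker."""
    snapshot = copy_context()
    return executor.submit(snapshot.run, fn, *args)


correlation_id_var.set("req-42")
with ThreadPoolExecutor(max_workers=1) as executor:
    future = submit_with_context(executor, run_job, "send_receipt")
    job_result = future.result()

print(job_result)  # send_receipt:req-42
```

The snapshot is a copy, so anything the job sets in its own context does not flow back to the submitting code.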
Design Review Checklist
- Are logs JSON-structured with consistent field names?
- Does every log entry include timestamp, level, service, and message?
- Is there a correlation ID or request ID that flows through all services?
- Are correlation IDs propagated in HTTP headers?
- Are correlation IDs propagated to async tasks and message queues?
- Does context management (request scopes, async-local storage) inject contextual fields?
- Can you query logs to find all activity for a single user, request, or transaction?
- Are sensitive fields (passwords, tokens, PII) excluded from logs?
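For the last checklist item, one possible safeguard is to mask sensitive fields before an entry is serialized. The sketch below assumes a simple deny-list of key names; the `SENSITIVE_KEYS` set and the `redact` helper are illustrative, and the right list depends on your data model.

```python
from typing import Any, Dict

# Hypothetical deny-list of field names that must never reach the logs.
SENSITIVE_KEYS = {"password", "token", "authorization", "card_number", "ssn"}


def redact(fields: Dict[str, Any]) -> Dict[str, Any]:
    """Return a copy with sensitive values masked, recursing into nested dicts."""
    cleaned: Dict[str, Any] = {}
    for key, value in fields.items():
        if key.lower() in SENSITIVE_KEYS:
            cleaned[key] = "[REDACTED]"
        elif isinstance(value, dict):
            cleaned[key] = redact(value)
        else:
            cleaned[key] = value
    return cleaned


safe = redact({
    "user_id": "user-7",
    "password": "hunter2",
    "card": {"card_number": "4111111111111111", "brand": "visa"},
})
print(safe)
# {'user_id': 'user-7', 'password': '[REDACTED]', 'card': {'card_number': '[REDACTED]', 'brand': 'visa'}}
```

A deny-list only catches names it knows about, so it complements, rather than replaces, reviewing what each call site logs.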
Self-Check
- Design a correlation ID generation and propagation strategy for a system with HTTP APIs, message queues, and scheduled tasks.
- How would you handle a situation where a request spawns multiple parallel async tasks? Should all tasks share the same correlation ID?
- Write a query to find all logs for a specific user across three services that all log to a centralized system.
Unstructured logs are noise. Structured JSON logs with correlation IDs enable investigation: a single query across all services reveals the complete sequence of events for a user request. Context managers inject correlation IDs and other contextual fields automatically, eliminating the need to thread context through function parameters. Invest in structured logging early—it pays dividends every time something breaks in production.
Next Steps
- Explore log levels and governance to control verbosity
- Learn retention and privacy for managing logs responsibly
- Study trace context propagation for deeper request visibility
- Review metrics for complementary observability