
Streaming and Event Processing

Exactly-once semantics, windowing, watermarks, checkpointing, and backpressure in unbounded data

TL;DR

Stream processing handles unbounded, continuous data from many sources (sensors, logs, user actions). Three delivery semantics: at-most-once (fastest, may lose data), at-least-once (possible duplicates; pair with deduplication or idempotent sinks for effectively-once results), and exactly-once (most expensive; requires checkpointing plus transactional or idempotent writes). Kafka provides a durable, replicated message log. Flink and Spark offer windowing, watermarks, and stateful processing. Watermarks reconcile event time with processing time. Backpressure prevents overwhelming downstream systems.

Learning Objectives

  • Understand exactly-once vs at-least-once vs at-most-once semantics
  • Design checkpoint-based fault tolerance
  • Implement windowing and watermarks
  • Handle late/out-of-order events
  • Manage backpressure and flow control
  • Choose streaming platform (Kafka, Flink, Spark)

Motivating Scenario

You're building a real-time fraud detection system. Credit card transactions flow in from thousands of merchants. The system must detect patterns (rapid purchases in distant cities, unusual amounts) and flag fraudulent transactions within 200 milliseconds. Key challenges: transactions can arrive out of order (network delays), arrive late, or fail mid-processing. If a transaction is processed twice, the fraud score is corrupted, so exactly-once semantics are non-negotiable for financial accuracy.

Core Concepts

Stream processing handles unbounded, continuous data with emphasis on delivery guarantees and time semantics:

At-Most-Once: Event processed ≤1 time. Fastest, but events can be lost. Acceptable for sampled metrics.

At-Least-Once: Event processed ≥1 time. Some duplication. Requires idempotent operations or deduplication.

Exactly-Once: Event processed exactly 1 time. Most expensive; achieved via transactions and state snapshots.
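An at-least-once source can deliver effectively-once results if the consumer deduplicates on a unique event ID before applying updates. A minimal sketch, assuming each event carries a stable `event_id`; the in-memory set is illustrative (production systems use a bounded, persisted dedup store):

```python
class DedupSink:
    """Applies each event at most once by tracking seen event IDs."""

    def __init__(self):
        self.seen_ids = set()   # in production: bounded and persisted
        self.total = 0.0

    def apply(self, event_id: str, value: float) -> bool:
        if event_id in self.seen_ids:
            return False        # duplicate delivery: ignore
        self.seen_ids.add(event_id)
        self.total += value
        return True

sink = DedupSink()
sink.apply("tx1", 100.0)
sink.apply("tx1", 100.0)   # redelivered by an at-least-once source
print(sink.total)          # 100.0, not 200.0
```

The duplicate delivery is absorbed by the sink, so downstream totals stay correct even though the transport made no exactly-once promise.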

Event Time: Timestamp when event actually occurred (transaction time). Key for correctness.

Processing Time: When system processes event. Affected by queuing, network delays.

Watermark: Marker indicating "all events before this timestamp have arrived" (approximately).

Window: Time-bounded aggregation (e.g., count transactions per 5-minute window).
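Tumbling windows partition the timeline into fixed, non-overlapping buckets, while sliding windows let one event belong to several overlapping buckets. A sketch of the assignment arithmetic (window sizes and slide intervals here are illustrative):

```python
def tumbling_window(event_time: int, size: int) -> int:
    """Start of the single tumbling window containing event_time."""
    return (event_time // size) * size

def sliding_windows(event_time: int, size: int, slide: int) -> list:
    """Starts of every sliding window of length `size`, advancing by
    `slide`, that contains event_time."""
    starts = []
    w = (event_time // slide) * slide   # latest window that can contain it
    while w > event_time - size:        # walk back while the window still covers it
        starts.append(w)
        w -= slide
    return sorted(starts)

print(tumbling_window(12, size=5))            # 10
print(sliding_windows(12, size=10, slide=5))  # [5, 10]
```

An event at t=12 lands in exactly one 5-second tumbling bucket, but in two overlapping 10-second sliding buckets, which is why sliding windows cost proportionally more state.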

Checkpoint: Snapshot of operator state + offset. Enables recovery without data loss.

Figure: Streaming architecture (event time vs processing time, watermarks, windowing).

Key Concepts

Idempotency: Same update applied twice = same result. Essential for at-least-once.
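The distinction matters in practice: an increment is not idempotent (replaying it doubles the effect), but overwriting a key with a deterministically recomputed value is. A small sketch contrasting the two (names are illustrative):

```python
state = {}

def increment(key: str, delta: float):
    """Not idempotent: replaying the same update doubles its effect."""
    state[key] = state.get(key, 0.0) + delta

def overwrite(key: str, window_events: list):
    """Idempotent: recompute the aggregate, then overwrite the key."""
    state[key] = sum(window_events)

overwrite("w0", [100.0, 150.0])
overwrite("w0", [100.0, 150.0])   # duplicate application: state unchanged
print(state["w0"])                # 250.0
```

This is why many streaming sinks are designed as keyed upserts rather than counters: a replay after failure rewrites the same value instead of corrupting it.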

State Backend: Where operator state lives (in-memory, RocksDB, external store).

Savepoint: Manual snapshot for code updates or debugging.

Kafka Consumer Group: Multiple consumers share a topic's partitions, with offsets tracked per group.

Exactly-Once via 2-Phase Commit: Coordinated commit between Kafka offset and sink.
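The coordinated-commit idea can be sketched as a processor that only advances its consumed offset if the sink's staged writes also commit; on any failure, neither side changes, so recovery replays from the old offset. This is a toy in-memory model of the pattern, not the actual Kafka transactional protocol:

```python
class TwoPhaseCommit:
    """Toy model: offset and sink state commit together or not at all."""

    def __init__(self):
        self.committed_offset = 0
        self.sink = {}   # stands in for a durable, transactional sink

    def process_batch(self, events, fail_before_commit=False) -> bool:
        # Phase 1 (prepare): stage sink writes and the new offset privately
        staged = dict(self.sink)
        for e in events:
            staged[e["id"]] = e["value"]
        new_offset = self.committed_offset + len(events)
        if fail_before_commit:
            return False   # simulated crash: nothing published, offset unchanged
        # Phase 2 (commit): publish both atomically
        self.sink, self.committed_offset = staged, new_offset
        return True

tpc = TwoPhaseCommit()
batch = [{"id": "tx1", "value": 100}, {"id": "tx2", "value": 150}]
tpc.process_batch(batch, fail_before_commit=True)   # crash mid-processing
tpc.process_batch(batch)                            # replay from old offset
print(tpc.committed_offset, tpc.sink)
```

Because the failed attempt published nothing, the replay is not a duplicate: the batch commits exactly once from the sink's point of view.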

Practical Example

from collections import defaultdict
from dataclasses import dataclass
from typing import Dict, List, Optional
import time

@dataclass
class Event:
    event_id: str
    event_time: int                        # seconds since epoch
    value: float
    processing_time: Optional[int] = None  # set when received

class StreamProcessor:
    """Simplified streaming processor with windowing and watermarks."""

    def __init__(self, window_size_sec: int = 5, watermark_lag_sec: int = 2):
        self.window_size = window_size_sec
        self.watermark_lag = watermark_lag_sec
        self.events: List[Event] = []      # buffer of unprocessed events
        self.state: Dict[int, float] = defaultdict(float)
        self.offset = 0
        self.checkpoint = {"offset": 0, "state": {}}

    def ingest(self, event: Event):
        """Receive an event and record its processing time."""
        event.processing_time = int(time.time())
        self.events.append(event)

    def watermark(self) -> int:
        """Current watermark: max observed event_time minus the allowed lag."""
        if not self.events:
            return 0
        max_event_time = max(e.event_time for e in self.events)
        return max_event_time - self.watermark_lag

    def window_key(self, event_time: int) -> int:
        """Map event_time to the start of its tumbling window."""
        return (event_time // self.window_size) * self.window_size

    def process(self):
        """Process all buffered events older than the watermark."""
        current_watermark = self.watermark()

        events_to_process = [e for e in self.events if e.event_time < current_watermark]
        events_to_process.sort(key=lambda e: e.event_time)

        for event in events_to_process:
            # Aggregate into the event's window; offset tracks consumed events
            window = self.window_key(event.event_time)
            self.state[window] += event.value
            self.offset += 1

        # Keep only events at or after the watermark
        self.events = [e for e in self.events if e.event_time >= current_watermark]

        # Checkpoint: save offset and state together
        self.checkpoint["offset"] = self.offset
        self.checkpoint["state"] = dict(self.state)

    def get_windowed_results(self) -> Dict[int, float]:
        """Return the aggregated windows."""
        return dict(self.state)

    def recover_from_checkpoint(self):
        """Restore offset and state from the last checkpoint after a failure."""
        self.offset = self.checkpoint["offset"]
        self.state = defaultdict(float, self.checkpoint["state"])
        print(f"Recovered: offset={self.offset}, state keys={len(self.state)}")

# Example: process a transaction stream
processor = StreamProcessor(window_size_sec=5, watermark_lag_sec=2)

# Simulate events arriving out of order
events_in = [
    Event("tx1", event_time=10, value=100),
    Event("tx2", event_time=12, value=150),
    Event("tx3", event_time=8, value=200),   # late, out-of-order arrival
    Event("tx4", event_time=15, value=50),
    Event("tx5", event_time=20, value=300),
    Event("tx6", event_time=19, value=100),  # out of order
]

for event in events_in:
    processor.ingest(event)
    processor.process()
    window = processor.window_key(event.event_time)
    results = processor.get_windowed_results()
    print(f"Window {window}: {results.get(window, 0)}")

print(f"\nFinal state: {processor.get_windowed_results()}")
print(f"Checkpoint: offset={processor.checkpoint['offset']}")

# Simulate failure and recovery
print("\n--- Simulating failure ---")
processor.recover_from_checkpoint()
print(f"Recovered state: {dict(processor.state)}")

When to Use / When Not to Use

Use streaming when:

  1. Processing unbounded, continuous data streams
  2. Sub-second latency is required for results
  3. Event order/time matters for correctness
  4. Building real-time fraud detection, recommendations, or monitoring
  5. You need complex windowing or stateful aggregations
  6. You need exactly-once guarantees

Avoid streaming when:

  1. Data is bounded (use batch processing)
  2. Latency of minutes or hours is acceptable
  3. Simple aggregations suffice (standard SQL)
  4. Operational complexity is a concern
  5. You only need one-time historical analysis

Patterns and Pitfalls

Design Review Checklist

  • What delivery guarantee is required (at-most-once, at-least-once, exactly-once)?
  • Is idempotency implemented in sinks (critical for at-least-once)?
  • Are late-arriving events handled (grace period, late pane)?
  • Is watermarking strategy appropriate for your data?
  • Are windowing semantics correct (tumbling, sliding, session)?
  • Is checkpoint/savepoint strategy tested for recovery?
  • Can system handle out-of-order events?
  • Is backpressure configured to prevent memory overflow?
  • Are you monitoring watermark lag and event time drift?
  • Is state backend appropriate (in-memory vs RocksDB vs external)?
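Backpressure from the checklist above can be modeled as a bounded buffer between stages: when the buffer is full, the producer must either block (flow control) or shed load instead of growing memory without limit. A minimal sketch using Python's bounded `queue.Queue` (the capacity and drop policy are illustrative):

```python
import queue

buf = queue.Queue(maxsize=3)   # bounded channel between pipeline stages

def produce(item) -> bool:
    """Try to enqueue; signal backpressure rather than buffering unboundedly."""
    try:
        buf.put_nowait(item)
        return True
    except queue.Full:
        return False           # caller should slow down or shed load

accepted = [produce(i) for i in range(5)]
print(accepted)   # [True, True, True, False, False]
```

Real engines propagate this signal upstream (e.g. pausing source consumption) rather than dropping, but the mechanism is the same: a bounded buffer converts consumer slowness into producer throttling.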

Self-Check

  1. What's the difference between event time and processing time? Event time: when event occurred (transaction time). Processing time: when system processes it (affected by network, queueing).
  2. Why is exactly-once hard? Requires coordinated commit between source offset and sink. Also requires idempotent sinks to handle duplicate processing.
  3. What's a watermark? Marker indicating "all events before this timestamp have (probably) arrived." Signals when to close a window.

One Takeaway: The gap between event time and processing time is where correctness lives. Watermarks bridge this gap, but understanding the tradeoff (latency vs completeness) is critical.

Next Steps

  • Kafka: Topic design, consumer groups, exactly-once semantics (transactions)
  • Flink: DataStream API, windowing, savepoints, distributed snapshots
  • Spark Streaming: Micro-batch architecture, structured streaming
  • State Management: RocksDB backends, incremental checkpoints
  • Monitoring: Watermark lag, backpressure metrics, late event rates

References

  • Kreps, J. (2014). "The Log: What every software engineer should know about real-time data's unifying abstraction."
  • Akidau, T., et al. (2015). "The Dataflow Model" (Google).
  • Apache Flink Documentation: Streaming Architecture.