Producer-Consumer Pattern

Decouple work production from consumption using shared buffers for flexible, scalable systems

TL;DR

Producer-Consumer decouples work generation from processing through a shared buffer (queue). Producers add work items while consumers retrieve and process them independently. This pattern scales with buffer size, handles variable production/consumption rates, and enables natural load leveling. Use bounded queues with blocking semantics to prevent memory exhaustion.

Learning Objectives

You will be able to:

  • Understand how shared queues enable producer-consumer decoupling
  • Implement bounded queues with blocking and timeout semantics
  • Coordinate multiple producers and consumers safely
  • Apply backpressure to prevent system overload
  • Recognize blocking vs. non-blocking queue variants and their tradeoffs
  • Debug producer-consumer deadlocks and starvation issues

Motivating Scenario

Your payment processing system accepts requests from users at highly variable rates. Sometimes 10 requests per second, sometimes 1,000. Your backend needs to:

  1. Receive requests immediately without keeping users waiting
  2. Process payments safely with transactional consistency
  3. Handle bursts gracefully without dropping requests
  4. Scale workers independently based on current load

A naive approach (synchronous handler) blocks each request until payment completes, creating bottlenecks. Producer-Consumer solves this: requests (producers) add items to a bounded queue; payment processors (consumers) pull and handle items at their own pace. When the queue fills, producers block, providing natural backpressure.

Core Concepts

The Pattern Structure

(Diagram: Producer-Consumer architecture with a shared buffer.)

Blocking vs. Non-Blocking Queues

Blocking Queue: When full, producers block until space becomes available. When empty, consumers block until items arrive. This prevents memory overload and provides automatic backpressure.

Non-Blocking Queue: Operations return immediately. Producers must check for success and handle rejections. Useful for real-time systems where blocking is unacceptable, but shifts responsibility to callers.
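The difference is easiest to see with Python's standard `queue` module, which exposes both styles on the same object: `put()` blocks (optionally with a timeout), while `put_nowait()` returns immediately and raises `queue.Full` on a full queue. A minimal sketch:

```python
import queue

q = queue.Queue(maxsize=1)
q.put("a")              # blocking put; succeeds immediately, queue is now full

# Non-blocking variant: fails immediately instead of waiting for space.
rejected = False
try:
    q.put_nowait("b")
except queue.Full:
    rejected = True     # caller must decide: drop, retry, or apply backpressure

print(rejected)  # True
```

With the non-blocking form, the overflow decision moves out of the queue and into the calling code, which is exactly the responsibility shift described above.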

Bounded vs. Unbounded Queues

Bounded: Fixed maximum size. Prevents unbounded memory growth. Producers block when full—natural load regulation.

Unbounded: Grows with demand. Simplifies implementation but risks memory exhaustion under sustained high production rates.
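In Python's `queue` module the distinction is just the `maxsize` argument: a positive value bounds the queue, while the default `maxsize=0` means unbounded. A small demonstration:

```python
import queue

bounded = queue.Queue(maxsize=3)   # put() blocks once 3 items are queued
unbounded = queue.Queue()          # maxsize=0: no limit, grows with demand

for i in range(3):
    bounded.put(i)
print(bounded.full())              # True: a fourth put() would block

for i in range(10_000):
    unbounded.put(i)               # never blocks; memory is the only limit
print(unbounded.qsize())           # 10000
```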

Practical Example

import threading
import queue
import time

class PaymentProcessor:
    def __init__(self, num_workers: int = 3, queue_size: int = 100):
        self.work_queue = queue.Queue(maxsize=queue_size)
        self.workers = []
        self.shutdown = False

        # Start worker threads
        for _ in range(num_workers):
            worker = threading.Thread(target=self._worker_loop, daemon=False)
            worker.start()
            self.workers.append(worker)

    def submit_payment(self, payment_id: str, amount: float, timeout: float = 5.0) -> bool:
        """Producers call this to submit work.

        Blocks up to `timeout` seconds if the queue is full, then rejects.
        """
        try:
            self.work_queue.put(
                {"id": payment_id, "amount": amount},
                timeout=timeout,
            )
            return True
        except queue.Full:
            print(f"Queue full, rejected payment {payment_id}")
            return False

    def _worker_loop(self):
        """Consumer threads run this loop continuously."""
        while not self.shutdown:
            try:
                # Get item with timeout to allow periodic shutdown checks
                payment = self.work_queue.get(timeout=1.0)
            except queue.Empty:
                continue
            try:
                self._process_payment(payment)
            except Exception as e:
                print(f"Error processing payment: {e}")
            finally:
                # Always signal completion, even on failure, so join() can't hang
                self.work_queue.task_done()

    def _process_payment(self, payment: dict):
        """Simulate payment processing."""
        print(f"Processing payment {payment['id']}: ${payment['amount']}")
        time.sleep(0.5)  # Simulate work
        print(f"Completed payment {payment['id']}")

    def shutdown_gracefully(self, timeout: float = 10.0):
        """Wait for queued items to complete, then shut down."""
        print("Waiting for queue to drain...")
        self.work_queue.join()  # Blocks until every submitted task is done
        self.shutdown = True
        for worker in self.workers:
            worker.join(timeout=timeout)
        print("Shutdown complete")

# Usage
processor = PaymentProcessor(num_workers=3, queue_size=10)
try:
    for i in range(25):
        processor.submit_payment(f"PAY-{i:04d}", 99.99)
        print(f"Submitted payment {i}")
        time.sleep(0.1)
finally:
    processor.shutdown_gracefully()

When to Use / When Not to Use

Use Producer-Consumer when:

  • Production and consumption rates are different and variable
  • You need backpressure to prevent memory exhaustion
  • Multiple independent producers or consumers exist
  • You want natural load leveling and elasticity
  • Decoupling producers from consumers reduces system complexity

Avoid when:

  • Latency-critical systems cannot tolerate queueing delays
  • The buffer size is unpredictable and potentially unbounded
  • Synchronous request-response is required (use async patterns instead)
  • A single, tightly synchronized producer-consumer pair is better served by simpler approaches (e.g., a direct hand-off)

Patterns and Pitfalls

Pitfall: Unbounded Queue Growth

Using an unbounded queue "temporarily" often leads to eventual memory exhaustion. Always use bounded queues with explicit overflow handling.
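One explicit overflow policy (beyond blocking or rejecting) is drop-oldest: when the bounded queue is full, discard the stalest queued item to make room for the new one. A minimal sketch of that policy, using the standard `queue` module:

```python
import queue

def put_drop_oldest(q: queue.Queue, item) -> None:
    """Enqueue `item`; if the queue is full, discard the oldest item to make room."""
    while True:
        try:
            q.put_nowait(item)
            return
        except queue.Full:
            try:
                q.get_nowait()   # drop the oldest queued item
                q.task_done()    # keep join() accounting consistent
            except queue.Empty:
                pass             # a consumer drained it concurrently; retry the put

q = queue.Queue(maxsize=2)
for i in range(5):
    put_drop_oldest(q, i)
print(list(q.queue))  # [3, 4] -- only the two most recent items remain
```

Drop-oldest suits telemetry-style workloads where fresh data matters more than complete data; for payments you would reject instead, as the main example does.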

Pattern: Graceful Shutdown

Implement drain-and-shutdown semantics:

  1. Stop accepting new work
  2. Wait for queued items to complete
  3. Shut down consumer threads cleanly
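One common way to implement this drain-and-shutdown sequence (an alternative to the flag-plus-timeout approach in the main example) is to enqueue one sentinel object per worker behind the real work:

```python
import queue
import threading

SENTINEL = object()  # unique marker telling a worker to exit

def worker(q: queue.Queue, results: list):
    while True:
        item = q.get()
        if item is SENTINEL:
            q.task_done()
            break
        results.append(item * 2)  # stand-in for real processing
        q.task_done()

q = queue.Queue(maxsize=10)
results = []
threads = [threading.Thread(target=worker, args=(q, results)) for _ in range(2)]
for t in threads:
    t.start()

for i in range(5):         # 1. submit work, then stop accepting new items
    q.put(i)
for _ in threads:          # 2. one sentinel per worker, queued behind the real work
    q.put(SENTINEL)
q.join()                   # 3. block until every item (and sentinel) is processed
for t in threads:
    t.join()

print(sorted(results))  # [0, 2, 4, 6, 8]
```

Because sentinels sit behind all real work in FIFO order, every queued item is processed before any worker exits, with no timeouts or polling.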

Pitfall: Consumer Exceptions Stopping Processing

If a consumer raises an exception without catching it and continuing, the worker thread dies. Other workers keep processing, but you've silently lost capacity. Wrap consumer logic in try/except.
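A resilient worker loop catches per-item failures, records them (e.g., to a dead-letter list), and keeps consuming. A minimal single-threaded sketch, where a negative item stands in for a poison message:

```python
import queue

def resilient_worker(q: queue.Queue, dead_letters: list):
    """Process items until the queue is drained; survive bad items."""
    while True:
        try:
            item = q.get_nowait()
        except queue.Empty:
            return
        try:
            if item < 0:
                raise ValueError(f"bad item: {item}")
            print(f"processed {item}")
        except Exception as exc:               # record the failure, keep the loop alive
            dead_letters.append((item, str(exc)))
        finally:
            q.task_done()

q = queue.Queue()
for item in [1, -2, 3]:
    q.put(item)
dead = []
resilient_worker(q, dead)
print(dead)  # [(-2, 'bad item: -2')]
```

The key detail is the inner try/except around processing only: a bad item becomes a dead letter, while queue errors still control the loop.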

Pattern: Multiple Queue Depths

Use separate queues for different work priorities. High-priority work gets a dedicated consumer with lower latency.
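Within a single process, Python's `queue.PriorityQueue` offers a one-queue approximation of this idea; the dedicated-consumer variant described above would use separate `Queue` objects per priority instead. A minimal sketch (the payment-themed labels are illustrative):

```python
import queue

pq = queue.PriorityQueue()
# Items are (priority, payload) tuples; lower number = higher priority.
pq.put((2, "refund"))
pq.put((0, "fraud-alert"))
pq.put((1, "payment"))

order = [pq.get()[1] for _ in range(3)]
print(order)  # ['fraud-alert', 'payment', 'refund']
```

Note that a single `PriorityQueue` can still starve low-priority work under sustained high-priority load, which is why latency-sensitive systems often prefer the separate-queue, dedicated-consumer arrangement.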

Design Review Checklist

  • Queue is bounded to prevent memory exhaustion
  • Overflow behavior is explicitly defined (blocking, rejecting, or priority)
  • Shutdown is graceful (drains queue before terminating)
  • Consumer exceptions don't kill worker threads
  • Dead letters or error handling exists for failed items
  • Queue size is monitored and tuned based on production rates
  • Multiple producers/consumers coordinate safely via the queue
  • Timeout values prevent indefinite blocking
  • Queue metrics (size, throughput) are observable

Self-Check

  1. What happens if producers are 10x faster than consumers? How does your queue handle this?
  2. Can your consumer threads shut down gracefully without losing in-flight work?
  3. If a consumer crashes while processing an item, is it recovered or lost?

One Takeaway

A bounded shared queue decouples producers from consumers, enabling flexible scaling and natural backpressure. Always bound your queues, handle overflows explicitly, and ensure graceful shutdown.

References

  1. "Concurrent Programming in Java: Design Principles and Patterns" by Doug Lea
  2. "Pattern-Oriented Software Architecture, Volume 2: Patterns for Concurrent and Networked Objects" by Schmidt, Stal, Rohnert & Buschmann