Search Offloading & Aggregations
Use specialized systems for search and analytics
TL;DR
Databases excel at transactional operations but struggle with complex searches (full-text, facets) and analytics (massive aggregations). Offload to specialized systems: Elasticsearch/Solr for search, BigQuery/Snowflake for analytics. Primary database (PostgreSQL, MySQL) handles transactional operations; specialized systems handle search/analytics via denormalized copies. Synchronize via ETL pipelines or CDC (Change Data Capture). This separation provides optimal performance and scaling for each workload.
Learning Objectives
By the end of this article, you will understand:
- Why specialized systems outperform general-purpose databases for specific tasks
- Search engine architecture and use cases
- Data warehouse design for analytics
- Denormalization strategies for different workloads
- Synchronization patterns (ETL, CDC, streaming)
- Trade-offs between consistency and freshness in offloading
- Operational considerations and monitoring
Motivating Scenario
Your e-commerce platform has 10 million products in PostgreSQL. Users expect to search by keywords, filter by price/category (facets), and see results in < 500ms. A full-text search query on PostgreSQL takes 3+ seconds—too slow. Separately, the finance team needs daily analytics: revenue by region, top products, customer segments. These aggregations scan billions of rows—prohibitively expensive to run on the transactional database. Solution: PostgreSQL for CRUD operations; Elasticsearch for search; BigQuery for analytics.
Core Concepts
Three Database Types
OLTP (Online Transactional Processing):
- Primary database (PostgreSQL, MySQL)
- Optimized for: single row lookups, fast inserts/updates
- Normalized schema (reduce redundancy)
- Strong consistency
- Example: Order creation, user profile updates
Search:
- Elasticsearch, Solr, Meilisearch
- Optimized for: full-text search, facets, relevance ranking
- Inverted indexes (fast keyword search)
- Denormalized data (single document per searchable entity)
- Example: "blue shoes under $100" → milliseconds
OLAP (Online Analytical Processing):
- Data warehouse (BigQuery, Snowflake, Redshift)
- Optimized for: complex aggregations across billions of rows
- Star schema (fact tables + dimension tables)
- Columnar storage with compression (values from the same column are stored together, compress well, and scans read only the columns they need)
- Eventually consistent (freshness minutes to hours)
- Example: Revenue by region/product/month
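To make the contrast concrete, here is a minimal sketch of how the same product catalog might be queried in each system. The table, index, and dataset names are illustrative assumptions (only `analytics.daily_sales` reappears later in this article), and the clients are assumed to be configured elsewhere; the point is the shape of each query, not a runnable pipeline.

```python
# Illustrative only: hypothetical schema/index names; clients configured elsewhere.

# 1. OLTP (PostgreSQL): point lookup by primary key -- milliseconds, strongly consistent.
OLTP_LOOKUP = "SELECT id, name, price FROM products WHERE id = %s"

# 2. Search (Elasticsearch): keyword query ranked by relevance over an inverted index.
SEARCH_QUERY = {
    "query": {"match": {"name": "blue running shoes"}},
    "size": 10,
}

# 3. OLAP (BigQuery-style SQL): scan-heavy aggregation over a wide fact table.
OLAP_AGGREGATION = """
    SELECT country, SUM(amount) AS revenue
    FROM analytics.daily_sales
    WHERE order_date >= DATE_SUB(CURRENT_DATE(), INTERVAL 1 MONTH)
    GROUP BY country
    ORDER BY revenue DESC
"""
```

Each of these could technically run against the OLTP database, but the search and aggregation queries are exactly the ones that degrade from milliseconds to seconds (or minutes) as the data grows.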
Denormalization Strategies
Denormalization duplicates data to speed up reads. Each downstream system gets its own denormalized copy, with different fields shaped for its workload:
OLTP (Normalized):
users (id, name, email, country)
products (id, name, description, price, category_id, brand_id)
categories (id, name)
brands (id, name)
orders (id, user_id, product_id, quantity, amount, created_at)
Search (Denormalized):
{
  "product_id": 123,
  "name": "Blue Running Shoes",
  "description": "...",
  "price": 89.99,
  "category": "shoes",
  "category_name": "Shoes",  // Denormalized from categories table
  "brand": "Nike",           // Denormalized from brands table
  "tags": ["running", "nike", "blue"]
}
Analytics (Star Schema):
Fact: orders (order_id, user_id, product_id, date, amount, quantity)
Dim: users (user_id, country, signup_date)
Dim: products (product_id, category, brand)
Dim: dates (date, month, quarter, year)
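To see how one record fans out, here is a minimal sketch using plain Python dicts (field names are illustrative) that flattens a normalized order and its related rows into the two denormalized shapes: a search document keyed by product, and a wide, pre-joined fact row like the `daily_sales` table used in the warehouse example below.

```python
# Normalized OLTP rows, represented as dicts for illustration (hypothetical values).
order = {"id": 9001, "user_id": 42, "product_id": 123, "amount": 89.99, "quantity": 1, "date": "2024-05-01"}
user = {"id": 42, "country": "DE", "signup_date": "2022-11-03"}
product = {"id": 123, "name": "Blue Running Shoes", "category": "shoes", "brand": "Nike", "price": 89.99}

def to_search_document(product, tags):
    """Denormalize a product and its related tables into one searchable document."""
    return {
        "product_id": product["id"],
        "name": product["name"],
        "price": product["price"],
        "category": product["category"],
        "brand": product["brand"],
        "tags": tags,
    }

def to_fact_row(order, user, product):
    """Pre-join order, user, and product into one wide fact-table row."""
    return {
        "order_id": order["id"],
        "order_date": order["date"],
        "user_id": user["id"],
        "country": user["country"],        # denormalized from users
        "product_id": product["id"],
        "category": product["category"],   # denormalized from products
        "amount": order["amount"],
        "quantity": order["quantity"],
    }

print(to_search_document(product, ["running", "nike", "blue"]))
print(to_fact_row(order, user, product))
```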
Practical Example
Building Synchronized OLTP + Search + Analytics
- Python (Sync Pipeline)
- Go (OLTP + BigQuery)
- Node.js (CDC Pipeline)
import psycopg2
from elasticsearch import Elasticsearch
import json
from datetime import datetime

class DataSyncPipeline:
    def __init__(self, pg_conn_str, es_hosts):
        self.pg_conn = psycopg2.connect(pg_conn_str)
        self.es = Elasticsearch(es_hosts)

    def sync_products_to_search(self):
        """ETL: PostgreSQL → Elasticsearch"""
        cursor = self.pg_conn.cursor()
        cursor.execute("""
            SELECT p.id, p.name, p.description, p.price,
                   c.name as category_name, b.name as brand_name,
                   ARRAY_AGG(t.tag) as tags
            FROM products p
            LEFT JOIN categories c ON p.category_id = c.id
            LEFT JOIN brands b ON p.brand_id = b.id
            LEFT JOIN product_tags pt ON p.id = pt.product_id
            LEFT JOIN tags t ON pt.tag_id = t.id
            GROUP BY p.id, p.name, c.name, b.name
        """)

        bulk_actions = []
        for product in cursor.fetchall():
            product_id, name, description, price, category, brand, tags = product
            doc = {
                "product_id": product_id,
                "name": name,
                "description": description,
                "price": price,
                "category": category,
                "brand": brand,
                "tags": tags or [],
                "indexed_at": datetime.now().isoformat()
            }
            bulk_actions.append({
                "index": {"_index": "products", "_id": product_id}
            })
            bulk_actions.append(doc)

        # Bulk index in Elasticsearch
        if bulk_actions:
            self.es.bulk(body=bulk_actions)
            print(f"Indexed {len(bulk_actions)//2} products to Elasticsearch")

    def search_products(self, query_text, filters=None):
        """Full-text search with facets"""
        query_body = {
            "query": {
                "bool": {
                    "must": [
                        {
                            "multi_match": {
                                "query": query_text,
                                "fields": ["name^2", "description", "brand", "tags"]
                            }
                        }
                    ],
                    "filter": []
                }
            },
            "aggs": {
                "categories": {"terms": {"field": "category.keyword"}},
                "brands": {"terms": {"field": "brand.keyword"}},
                "price_ranges": {
                    "range": {
                        "field": "price",
                        "ranges": [
                            {"to": 50},
                            {"from": 50, "to": 100},
                            {"from": 100, "to": 200},
                            {"from": 200}
                        ]
                    }
                }
            }
        }

        # Apply filters
        if filters:
            if "category" in filters:
                query_body["query"]["bool"]["filter"].append({
                    "term": {"category.keyword": filters["category"]}
                })
            if "brand" in filters:
                query_body["query"]["bool"]["filter"].append({
                    "term": {"brand.keyword": filters["brand"]}
                })
            if "max_price" in filters:
                query_body["query"]["bool"]["filter"].append({
                    "range": {"price": {"lte": filters["max_price"]}}
                })

        results = self.es.search(index="products", body=query_body)
        return {
            "hits": [
                {
                    "product_id": hit["_source"]["product_id"],
                    "name": hit["_source"]["name"],
                    "price": hit["_source"]["price"],
                    "score": hit["_score"]
                }
                for hit in results["hits"]["hits"]
            ],
            "facets": {
                "categories": [
                    {"name": b["key"], "count": b["doc_count"]}
                    for b in results["aggregations"]["categories"]["buckets"]
                ],
                "brands": [
                    {"name": b["key"], "count": b["doc_count"]}
                    for b in results["aggregations"]["brands"]["buckets"]
                ]
            },
            "total": results["hits"]["total"]["value"]
        }

# Usage
pipeline = DataSyncPipeline(
    pg_conn_str="postgresql://user:pwd@localhost/ecommerce",
    es_hosts=["http://localhost:9200"]
)

# Sync to Elasticsearch (run hourly)
pipeline.sync_products_to_search()

# Search API
results = pipeline.search_products(
    query_text="blue running shoes",
    filters={"max_price": 150, "brand": "Nike"}
)

print(f"Found {results['total']} products")
for hit in results['hits'][:5]:
    print(f" - {hit['name']} (${hit['price']})")
package main
import (
    "context"
    "database/sql"
    "fmt"

    "cloud.google.com/go/bigquery"
    "google.golang.org/api/iterator"
    _ "github.com/lib/pq"
)
type AnalyticsSyncService struct {
pgDB *sql.DB
bqClient *bigquery.Client
}
func NewAnalyticsSyncService(pgConn string, bqProjectID string) (*AnalyticsSyncService, error) {
pgDB, err := sql.Open("postgres", pgConn)
if err != nil {
return nil, err
}
ctx := context.Background()
bqClient, err := bigquery.NewClient(ctx, bqProjectID)
if err != nil {
return nil, err
}
return &AnalyticsSyncService{pgDB, bqClient}, nil
}
// Sync daily sales to BigQuery (as fact table)
func (s *AnalyticsSyncService) SyncDailySales(ctx context.Context) error {
// Extract from PostgreSQL
rows, err := s.pgDB.QueryContext(ctx, `
SELECT
DATE(o.created_at)::text as order_date, -- cast so the driver returns 'YYYY-MM-DD' for BigQuery's DATE column
p.product_id,
u.user_id,
u.country,
o.amount,
o.quantity,
p.category
FROM orders o
JOIN products p ON o.product_id = p.id
JOIN users u ON o.user_id = u.id
WHERE o.created_at >= NOW() - INTERVAL '1 day'
`)
if err != nil {
return err
}
defer rows.Close()
// Load into BigQuery
inserter := s.bqClient.Dataset("analytics").Table("daily_sales").Inserter()
type DailySale struct {
OrderDate string `bigquery:"order_date"`
ProductID int64 `bigquery:"product_id"`
UserID int64 `bigquery:"user_id"`
Country string `bigquery:"country"`
Amount float64 `bigquery:"amount"`
Quantity int `bigquery:"quantity"`
Category string `bigquery:"category"`
}
var sales []*DailySale
for rows.Next() {
var orderDate, country, category string
var productID, userID int64
var amount float64
var quantity int
if err := rows.Scan(&orderDate, &productID, &userID, &country, &amount, &quantity, &category); err != nil {
return err
}
sales = append(sales, &DailySale{
OrderDate: orderDate,
ProductID: productID,
UserID: userID,
Country: country,
Amount: amount,
Quantity: quantity,
Category: category,
})
}
if len(sales) > 0 {
if err := inserter.Put(ctx, sales); err != nil {
return err
}
}
return nil
}
// Complex analytics query in BigQuery
func (s *AnalyticsSyncService) QueryRevenueByRegion(ctx context.Context) ([]map[string]interface{}, error) {
query := s.bqClient.Query(`
SELECT
country,
DATE_TRUNC(order_date, MONTH) as month,
SUM(amount) as revenue,
COUNT(DISTINCT user_id) as unique_customers,
SUM(quantity) as units_sold
FROM analytics.daily_sales
WHERE order_date >= DATE_SUB(CURRENT_DATE(), INTERVAL 12 MONTH)
GROUP BY country, month
ORDER BY month DESC, revenue DESC
`)
job, err := query.Read(ctx)
if err != nil {
return nil, err
}
var results []map[string]interface{}
for {
var row []bigquery.Value
err := job.Next(&row)
if err == iterator.Done {
break
}
if err != nil {
return nil, err
}
results = append(results, map[string]interface{}{
"country": row[0],
"month": row[1],
"revenue": row[2],
"customers": row[3],
"units": row[4],
})
}
return results, nil
}
func main() {
    sync, err := NewAnalyticsSyncService(
        "postgresql://user:pwd@localhost/prod",
        "my-project",
    )
    if err != nil {
        panic(err)
    }
    ctx := context.Background()
    if err := sync.SyncDailySales(ctx); err != nil {
        panic(err)
    }
    results, err := sync.QueryRevenueByRegion(ctx)
    if err != nil {
        panic(err)
    }
    for _, row := range results {
        fmt.Printf("Country: %v Revenue: %v\n", row["country"], row["revenue"])
    }
}
const { Client } = require('pg');
const { Kafka } = require('kafkajs');
class CDCPipeline {
constructor(pgConfig, kafkaConfig) {
this.pgClient = new Client(pgConfig);
this.kafka = new Kafka(kafkaConfig);
}
async startCDC() {
  await this.pgClient.connect();
  // Placeholder for real CDC: in production, stream row-level changes via PostgreSQL
  // logical replication (e.g. with Debezium) instead of polling.
  // CREATE PUBLICATION has no IF NOT EXISTS clause, so ignore "already exists" errors.
  try {
    await this.pgClient.query(`CREATE PUBLICATION product_changes FOR TABLE products;`);
  } catch (err) {
    if (err.code !== '42710') throw err; // 42710 = duplicate_object
  }

  // Simplified change detection: poll recently updated rows every minute
  setInterval(async () => {
    const changes = await this.pgClient.query(`
      SELECT * FROM products
      WHERE updated_at > NOW() - INTERVAL '1 minute'
    `);
    await this.publishChanges(changes.rows);
  }, 60000);
}
async publishChanges(changes) {
const producer = this.kafka.producer();
await producer.connect();
const messages = changes.map(row => ({
key: String(row.id),
value: JSON.stringify({
event: 'product.updated',
product_id: row.id,
name: row.name,
price: row.price,
timestamp: new Date().toISOString()
})
}));
await producer.send({
topic: 'products',
messages
});
await producer.disconnect();
}
async consumeChangesForSearch() {
const consumer = this.kafka.consumer({ groupId: 'search-sync' });
await consumer.connect();
await consumer.subscribe({ topic: 'products' });
await consumer.run({
eachMessage: async ({ topic, partition, message }) => {
const event = JSON.parse(message.value.toString());
// Update search index
console.log(`Updating search for product ${event.product_id}`);
// Call Elasticsearch update here
}
});
}
async consumeChangesForAnalytics() {
const consumer = this.kafka.consumer({ groupId: 'analytics-sync' });
await consumer.connect();
await consumer.subscribe({ topic: 'products' });
await consumer.run({
eachMessage: async ({ topic, partition, message }) => {
const event = JSON.parse(message.value.toString());
// Append to data warehouse
console.log(`Recording product change in warehouse for ${event.product_id}`);
// Insert into BigQuery/Snowflake here
}
});
}
}
// Usage
const pipeline = new CDCPipeline(
{ host: 'localhost', database: 'ecommerce', user: 'app', password: 'pwd' },
{ brokers: ['localhost:9092'] }
);
pipeline.startCDC();
pipeline.consumeChangesForSearch();
pipeline.consumeChangesForAnalytics();
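The Node.js consumers above leave the actual index and warehouse writes as stubs. As one possible sketch of the search-side step in Python (assuming the kafka-python and elasticsearch client libraries, a local broker and cluster, and the same products topic and event shape produced above), each change event is upserted into the index using the product id as the document id, which keeps replays idempotent:

```python
import json
from kafka import KafkaConsumer          # kafka-python (assumed)
from elasticsearch import Elasticsearch

es = Elasticsearch(["http://localhost:9200"])
consumer = KafkaConsumer(
    "products",
    bootstrap_servers=["localhost:9092"],
    group_id="search-sync",
    value_deserializer=lambda m: json.loads(m.decode("utf-8")),
)

for message in consumer:
    event = message.value
    # Upsert the changed product; the document id mirrors the primary key,
    # so reprocessing the same event simply overwrites the same document.
    es.index(
        index="products",
        id=event["product_id"],
        body={
            "product_id": event["product_id"],
            "name": event["name"],
            "price": event["price"],
        },
    )
```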
When to Use / When Not to Use
Offload to a search engine when you:
- Need full-text search (keyword matching, fuzzy matching)
- Require faceted navigation (filter by category, price, etc.)
- Need relevance ranking (best matches first)
- Serve a high volume of search queries
- Expose a user-facing search feature (sub-second latency expected)
Offload to a data warehouse when you:
- Run complex aggregations across billions of rows
- Need historical analytics (trend analysis, cohorts)
- Produce cross-organizational reporting
- Can accept data freshness of hours or days
- Join across many dimension tables
Keep the query on the primary database when the dataset is small enough that ordinary indexes meet your latency targets, or when the result must reflect the latest committed write; a denormalized copy is always slightly stale.
Operational Considerations
1. Monitoring Sync Lag
# Track lag between source (OLTP) and destination (search/warehouse)
import logging
from datetime import datetime, timezone

logger = logging.getLogger(__name__)

def monitor_sync_lag(pg_conn, es_client):
    cursor = pg_conn.cursor()
    cursor.execute("SELECT MAX(updated_at) FROM products")
    latest_oltp = cursor.fetchone()[0]

    latest_search = es_client.search(
        index="products",
        body={"size": 0, "aggs": {"max_timestamp": {"max": {"field": "indexed_at"}}}}
    )
    # The max aggregation returns epoch milliseconds; convert to a datetime before comparing
    # (assumes updated_at and indexed_at are both stored in UTC)
    latest_search_ts = datetime.fromtimestamp(
        latest_search["aggregations"]["max_timestamp"]["value"] / 1000, tz=timezone.utc
    ).replace(tzinfo=None)

    lag_seconds = (latest_oltp - latest_search_ts).total_seconds()
    if lag_seconds > 3600:  # Alert if > 1 hour
        logger.warning(f"Search sync lag: {lag_seconds}s")
2. Handling Failed Syncs
# Retry with exponential backoff
import logging
import time
from functools import wraps

logger = logging.getLogger(__name__)

def retry_sync(max_attempts=3, backoff=2):
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(max_attempts):
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    if attempt == max_attempts - 1:
                        logger.error(f"Sync failed after {max_attempts} attempts: {e}")
                        raise
                    wait_time = backoff ** attempt
                    logger.warning(f"Sync failed, retrying in {wait_time}s: {e}")
                    time.sleep(wait_time)
        return wrapper
    return decorator

@retry_sync(max_attempts=3)
def sync_to_elasticsearch():
    # ETL logic here
    pass
Design Review Checklist
- Identified queries better served by specialized systems (search, analytics)
- Measured: current latency, acceptable latency, volume
- Chose appropriate systems (Elasticsearch for search, BigQuery for analytics)
- Designed denormalization strategy (different for each system)
- Selected synchronization method (batch ETL, CDC, streaming)
- Defined acceptable freshness lag for each system
- Implemented monitoring for sync lag and failures
- Set up retry logic with exponential backoff
- Documented data model in each system (search document, warehouse schema)
- Planned capacity: storage, compute, network bandwidth
- Tested failover: if Elasticsearch is down, does the search feature degrade gracefully?
- Verified cost-benefit: operational overhead vs performance gains
Self-Check Questions
- When would you use a search engine instead of the database?
  Search engine: full-text search, facets, relevance ranking. Database: single-record lookups, transactional consistency.
- What is a star schema, and why use it for analytics?
  A central fact table (transactions) surrounded by dimension tables (attributes). It enables fast aggregations and is the standard model for data warehouses.
- How do you keep the search index synchronized with the database?
  Batch ETL pipelines, CDC (continuous), or streaming. Choose based on the lag you can accept.
- Why denormalize for analytics but normalize for OLTP?
  OLTP: normalized data is written once and stays consistent. Analytics: denormalized data is pre-joined, so aggregations are fast.
Next Steps
- Audit queries: Identify slow searches and analytics
- Estimate cost-benefit: Do the performance gains justify the operational overhead of running another system?
- Pilot with one system: Start with Elasticsearch for search (most common)
- Build sync pipeline: Batch ETL initially (simpler than CDC)
- Monitor sync lag: Set up alerts, establish SLA
- Extend to analytics: Add data warehouse for complex reporting
- Optimize denormalization: Balance consistency and performance