
Data Quality, Lineage & Governance

Maintain data integrity and track data flow

TL;DR

Data quality: validate schema, null constraints, detect anomalies at pipeline ingestion. Lineage: track table A → B → C dependencies for debugging. Data catalog: metadata repository (tables, columns, owners, tags). Governance: enforce PII tracking, retention policies, access control, data contracts. Modern tools: dbt (lineage), Great Expectations (quality), Apache Atlas (catalog), Collibra (governance). Invest in observability: monitor freshness, completeness, accuracy of critical datasets.

Learning Objectives

By the end of this article, you will understand:

  • Data quality checks and where to implement them
  • Data lineage concepts and tools
  • Data catalog structure and benefits
  • Governance policies: PII, retention, access
  • Data contracts and schema validation
  • Observability metrics for data pipelines
  • Operational best practices

Motivating Scenario

Your finance team runs daily revenue reports. One week, reported revenue drops 50% unexpectedly. Investigation reveals that a pipeline misconfigured its nullable fields and silently dropped transactions with a NULL product_id (which represent gift cards). The root cause took three days to track down. With proper governance in place: data quality checks would have caught the missing product IDs immediately; lineage would have shown where the rows were dropped; the catalog would have documented that product_id is required; and data contracts would have enforced the schema upstream.

Core Concepts

Data Governance Layers

1. Data Quality

Schema validation, null/uniqueness constraints, anomaly detection:

# Great Expectations: quality checks (1.x-style API)
import great_expectations.expectations as gxe
from great_expectations.core.expectation_suite import ExpectationSuite

expectation_suite = ExpectationSuite(name="transactions")

# Every transaction must have an ID
expectation_suite.add_expectation(
    gxe.ExpectColumnValuesToNotBeNull(column="transaction_id")
)
# Amounts must fall within a sane range
expectation_suite.add_expectation(
    gxe.ExpectColumnValuesToBeBetween(column="amount", min_value=0, max_value=1000000)
)
# Row counts guard against partial or duplicated loads
expectation_suite.add_expectation(
    gxe.ExpectTableRowCountToBeBetween(min_value=1000, max_value=10000000)
)
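
If Great Expectations is not available at the point of ingestion, the same three checks are easy to express directly in pandas. A minimal, dependency-free sketch using the column names and thresholds from the suite above:

# Equivalent checks in plain pandas (minimal sketch, no framework required)
import pandas as pd

def check_transactions(df: pd.DataFrame) -> None:
    assert df["transaction_id"].notna().all(), "transaction_id contains NULLs"
    assert df["amount"].between(0, 1000000).all(), "amount out of range"
    assert 1000 <= len(df) <= 10000000, "row count outside expected bounds"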

2. Lineage

Tracks data dependencies: source → transform → target

raw_events (source)
  → events_parsed (cleaned)
    → user_events_daily (aggregated)
      → user_dashboards (reporting)

Tools: dbt (SQL transformations), Apache Atlas, OpenMetadata
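
Lineage is just a dependency graph, so impact analysis ("if I change X, what breaks downstream?") reduces to a graph traversal. A minimal sketch in Python, using a hypothetical dict of upstream edges for the tables above rather than any particular tool:

# Minimal lineage graph: table -> upstream sources (hypothetical edges)
lineage = {
    "events_parsed": ["raw_events"],
    "user_events_daily": ["events_parsed"],
    "user_dashboards": ["user_events_daily"],
}

def downstream_of(table):
    """Impact analysis: everything that transitively reads from `table`."""
    impacted = set()
    frontier = [table]
    while frontier:
        current = frontier.pop()
        for target, sources in lineage.items():
            if current in sources and target not in impacted:
                impacted.add(target)
                frontier.append(target)
    return impacted

print(downstream_of("raw_events"))
# {'events_parsed', 'user_events_daily', 'user_dashboards'} (order may vary)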

3. Catalog

Metadata repository: what data exists, owner, freshness, SLA

{
  "table": "user_events_daily",
  "owner": "analytics@company.com",
  "description": "Daily user event aggregations",
  "tags": ["pii:email", "pii:user_id", "critical"],
  "schema": {...},
  "retention": "90 days",
  "refresh_sla": "daily at 3 AM UTC",
  "last_updated": "2025-02-14T03:05:00Z"
}
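
Once entries share a shape like this, common governance questions become simple queries over the metadata. A small sketch, assuming the catalog is held as a list of dicts shaped like the entry above:

# Governance queries over catalog entries (sketch; `catalog` is a list of dicts
# shaped like the JSON entry above)
def tables_with_pii(catalog):
    """Tables whose tags mark any field as PII."""
    return [e["table"] for e in catalog
            if any(tag.startswith("pii:") for tag in e.get("tags", []))]

def tables_without_owner(catalog):
    """Tables that nobody has claimed ownership of."""
    return [e["table"] for e in catalog if not e.get("owner")]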

Practical Example

Data Quality & Governance Implementation

import logging
from datetime import datetime

import pandas as pd

import great_expectations.expectations as gxe
from great_expectations.core.expectation_suite import ExpectationSuite


class DataGovernance:
    def __init__(self):
        self.quality_suite = ExpectationSuite(name="production")
        self.catalog = {}
        self.lineage = {}

    def define_quality_contract(self, table_name, schema):
        """Define the expected data contract as an expectation suite."""
        expectations = ExpectationSuite(name=table_name)

        for column, dtype in schema.items():
            # Not-null check for every contracted column
            expectations.add_expectation(
                gxe.ExpectColumnValuesToNotBeNull(column=column)
            )

            # Type / format checks
            if dtype == "integer":
                expectations.add_expectation(
                    gxe.ExpectColumnValuesToBeOfType(column=column, type_="int64")
                )
            elif dtype == "email":
                expectations.add_expectation(
                    gxe.ExpectColumnValuesToMatchRegex(
                        column=column,
                        regex=r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$"
                    )
                )

        self.quality_suite = expectations
        return expectations

    def validate_data(self, df, expectations):
        """Run quality checks against a DataFrame.

        NOTE: simplified for illustration; in Great Expectations you would
        normally run a suite through a Validator or Checkpoint, and the exact
        API depends on your GE version.
        """
        validation_result = expectations.validate(df)

        if not validation_result.success:
            failed = [r for r in validation_result.results if not r.success]
            logging.error(f"Data quality check failed: {failed}")
            raise ValueError(f"Data validation failed for {len(failed)} expectation(s)")

        return True

    def register_catalog_entry(self, table_name, metadata):
        """Register a table in the catalog."""
        self.catalog[table_name] = {
            "name": table_name,
            "owner": metadata.get("owner"),
            "description": metadata.get("description"),
            "tags": metadata.get("tags", []),
            "freshness_sla_hours": metadata.get("freshness_hours", 24),
            "pii_fields": metadata.get("pii_fields", []),
            "retention_days": metadata.get("retention_days", 90),
            "created_at": datetime.now().isoformat()
        }

    def track_lineage(self, source_tables, target_table, transformation):
        """Track data flow: sources -> transformation -> target."""
        self.lineage[target_table] = {
            "sources": source_tables,
            "transformation": transformation,
            "created_at": datetime.now().isoformat()
        }

    def user_has_pii_access(self, user_email):
        """Placeholder permission check; wire this up to your IAM/RBAC system."""
        return user_email.endswith("@company.com")

    def validate_pii_access(self, user_email, table_name):
        """Check whether a user may access a table that contains PII fields."""
        table_meta = self.catalog.get(table_name)
        if not table_meta:
            raise ValueError(f"Table {table_name} not in catalog")

        pii_fields = table_meta.get("pii_fields", [])
        if pii_fields and not self.user_has_pii_access(user_email):
            logging.warning(f"User {user_email} denied access to PII in {table_name}")
            raise PermissionError(f"No PII access for {user_email}")

        return True

    def check_freshness(self, table_name, last_update_time):
        """Monitor data freshness against the catalog's SLA."""
        table_meta = self.catalog.get(table_name, {})
        sla_hours = table_meta.get("freshness_sla_hours", 24)

        age_hours = (datetime.now() - last_update_time).total_seconds() / 3600
        if age_hours > sla_hours:
            logging.error(f"{table_name} is stale: {age_hours:.1f}h > SLA {sla_hours}h")
            raise ValueError(f"Data freshness SLA violated for {table_name}")

        return True


# Usage
gov = DataGovernance()

# Define contract
schema = {
    "user_id": "integer",
    "email": "email",
    "created_at": "timestamp",
    "purchase_amount": "float"
}
gov.define_quality_contract("transactions", schema)

# Register in catalog
gov.register_catalog_entry("transactions", {
    "owner": "finance@company.com",
    "description": "Daily transaction log",
    "tags": ["pii:email", "financial", "critical"],
    "pii_fields": ["user_id", "email"],
    "freshness_hours": 4,
    "retention_days": 365
})

# Track lineage
gov.track_lineage(
    source_tables=["raw_events"],
    target_table="transactions",
    transformation="Clean, deduplicate, enrich with user data"
)

# Validate incoming data
df = pd.read_csv("transactions.csv")
gov.validate_data(df, gov.quality_suite)
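
The usage above exercises the contract, catalog, and lineage paths. A short continuation, with hypothetical inputs, that also exercises the PII-access and freshness checks:

from datetime import datetime, timedelta

# PII access: "transactions" has pii_fields, so the permission check applies
gov.validate_pii_access("analyst@company.com", "transactions")

# Freshness: a 2-hour-old table is within the 4-hour SLA registered above
gov.check_freshness("transactions", last_update_time=datetime.now() - timedelta(hours=2))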

Patterns and Pitfalls

  • Data quality: implement schema validation, null/uniqueness constraints, and anomaly detection (statistical outliers) at pipeline ingestion. Fail fast on violations and quarantine bad data (see the sketch after this list).
  • Lineage: track source → transform → target dependencies. Tools: dbt (SQL), Apache Atlas (general). Use lineage for debugging ("where did this data come from?") and impact analysis ("if I change X, what breaks?").
  • Catalog: maintain a central metadata repository with table descriptions, owners, tags, freshness SLAs, and retention policies. It enables discovery and governance. Tools: Collibra, Apache Atlas, or custom.
  • Data contracts: agree on an explicit schema + SLA. The producer (data pipeline) guarantees schema and freshness; the consumer (analytics) validates the contract on ingestion. Fail fast on contract violations.
  • PII governance: tag all PII fields in the catalog. Enforce access control, encryption, and retention, and regularly audit who accesses PII. Required for GDPR/CCPA compliance.
  • Observability: monitor freshness (time since last update), completeness (% non-null), and accuracy (anomalies). Set SLA alerts and keep a dashboard showing data health across all pipelines.
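
A quarantine step can be as simple as splitting rows on a boolean validity mask and writing the failures somewhere inspectable before halting the load. A minimal pandas sketch (the output path and check columns are placeholders):

# Fail fast + quarantine (minimal sketch; path and columns are placeholders)
import pandas as pd

def ingest(df: pd.DataFrame) -> pd.DataFrame:
    valid_mask = df["transaction_id"].notna() & df["amount"].between(0, 1000000)
    quarantined = df[~valid_mask]
    if not quarantined.empty:
        quarantined.to_csv("quarantine/transactions_rejected.csv", index=False)
        raise ValueError(f"{len(quarantined)} rows failed checks; load halted, rows quarantined")
    return df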

Design Review Checklist

  • Identified all critical datasets requiring governance
  • Defined quality checks: schema, nulls, anomalies
  • Implemented data quality framework (Great Expectations or custom)
  • Mapped data lineage: sources → transforms → targets
  • Built or adopted data catalog (tools: Collibra, Atlas, custom)
  • Classified all PII fields; enforced access controls
  • Defined data contracts (schema + SLA) with upstream producers
  • Set up freshness, completeness, accuracy monitoring
  • Established retention policies; automated deletion
  • Created runbook for handling data quality failures
  • Documented catalog: how to register, tag, search tables
  • Regular audits: who accesses PII? Are contracts being met?

Next Steps

  1. Audit current data: Identify critical datasets
  2. Implement quality checks: Start with nulls, uniqueness
  3. Build simple catalog: Spreadsheet or tool (Collibra/Atlas)
  4. Track lineage: Manually initially, automate later (dbt)
  5. Classify PII: Tag sensitive fields
  6. Monitor freshness: Set SLA alerts
  7. Enforce contracts: Validate schema on ingestion (see the sketch below)
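
A consumer-side contract check on ingestion can start as a simple dtype comparison. A sketch, where the expected column types are illustrative assumptions rather than a published contract:

# Consumer-side contract check (sketch; expected dtypes are assumptions)
import pandas as pd

CONTRACT = {"user_id": "int64", "email": "object", "purchase_amount": "float64"}

def enforce_contract(df: pd.DataFrame) -> None:
    missing = set(CONTRACT) - set(df.columns)
    if missing:
        raise ValueError(f"Contract violation: missing columns {missing}")
    for col, expected in CONTRACT.items():
        actual = str(df[col].dtype)
        if actual != expected:
            raise TypeError(f"Contract violation: {col} is {actual}, expected {expected}")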
