
DuckDB Metadata-Driven Data Quality Monitoring: Detect Schema Drift and Anomalies in One Line of SQL

Use DuckDB's parquet_metadata() and parquet_schema() functions to detect schema drift, null-rate anomalies, and compression issues from Parquet footers alone, without scanning any row data. A solid foundation for a Data Quality as a Service product.

Introduction

In the data industry, there’s an undervalued monetization opportunity: Data Quality as a Service.

Imagine this: you’ve built a data warehouse for an e-commerce company that produces weekly sales reports. One day the report numbers look wrong, and the client’s first thought is: “Did your data pipeline break again?” If you could detect these issues and alert on them before the client notices, you’ve just created a recurring-revenue product.

Traditional approach: Spark + Great Expectations + Airflow — costing tens of thousands of dollars.

DuckDB approach: A few lines of SQL that read only Parquet file metadata, so checks across an entire dataset finish in seconds, with no tooling beyond DuckDB itself.

Today, I’ll walk you through building a metadata-driven data quality monitoring system from scratch using DuckDB, and show you how to turn it into a sellable product.

[Figure: DuckDB metadata-driven data quality monitoring architecture]


Part 1: Core Principle — Metadata Is Data

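DuckDB provides three Parquet metadata table functions:

  • parquet_metadata(file): Returns one row per column chunk (file × row group × column) with value counts, null counts, min/max statistics, encodings, codec, and compressed/uncompressed sizes
  • parquet_file_metadata(file): Returns one row per file with file-level metadata (total row count, row group count, the writer’s created_by string, format version)
  • parquet_schema(file): Returns each file’s schema elements: column name, physical type, converted type, and repetition type (REQUIRED/OPTIONAL/REPEATED)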

The key insight: these functions don’t read the actual data. They only read the Parquet footer, the metadata block stored at the end of every file, so even a 10GB Parquet file returns its metadata in milliseconds on local disk.

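-- Column-chunk statistics for every Parquet file matching the glob.
-- parquet_metadata() returns one row per file, row group, and column.
SELECT 
    file_name,
    row_group_id,
    path_in_schema AS column_name,
    stats_null_count AS num_nulls,
    num_values,
    stats_min_value AS min_value,
    stats_max_value AS max_value,
    ROUND(stats_null_count * 100.0 / NULLIF(num_values, 0), 2) AS null_percentage
FROM parquet_metadata('data/sales/2026-06-*.parquet')
ORDER BY num_nulls DESC NULLS LAST;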
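To make the “footer only” point concrete, here is a minimal stdlib sketch (not DuckDB code) of the first thing any Parquet reader does: read the last 8 bytes of the file, which hold the footer length as a 4-byte little-endian integer followed by the magic bytes PAR1. The function name footer_length is our own, and decoding the Thrift-encoded footer itself is deliberately left out.

```python
import os
import struct

PARQUET_MAGIC = b"PAR1"


def footer_length(path: str) -> int:
    """Return the size in bytes of a Parquet file's footer (FileMetaData).

    File layout: PAR1 <data pages...> <footer> <4-byte LE footer length> PAR1.
    Only the final 8 bytes are read; a real reader would next read `length`
    bytes just before them and decode the Thrift-encoded metadata.
    """
    with open(path, "rb") as fh:
        fh.seek(-8, os.SEEK_END)  # jump straight to the tail, skipping all data
        tail = fh.read(8)
    length, magic = struct.unpack("<I4s", tail)
    if magic != PARQUET_MAGIC:
        raise ValueError(f"{path} is not a Parquet file (bad trailing magic)")
    return length
```

The point is the access pattern: the work depends on the footer size, not on how many rows the file holds.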

Part 2: Schema Drift Detection

Schema drift happens when the structure of your data source changes unexpectedly: columns are added, removed, or their types change. It’s one of the most common causes of data pipeline failures.
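The three kinds of drift listed above can be expressed as a small comparison between two {column: type} maps. This is a minimal pure-Python sketch; the function name diff_schemas and the sample columns are hypothetical. In practice each map could be built from the name and type columns that parquet_schema() returns for one file.

```python
from typing import Dict, List, Tuple


def diff_schemas(base: Dict[str, str], current: Dict[str, str]) -> Dict[str, list]:
    """Classify drift between two {column_name: type} maps (hypothetical helper)."""
    added = sorted(set(current) - set(base))
    missing = sorted(set(base) - set(current))
    # Columns present in both files whose declared type differs
    type_changed: List[Tuple[str, str, str]] = [
        (col, base[col], current[col])
        for col in sorted(set(base) & set(current))
        if base[col] != current[col]
    ]
    return {"added": added, "missing": missing, "type_changed": type_changed}


base = {"order_id": "INT64", "amount": "DOUBLE", "region": "BYTE_ARRAY"}
current = {"order_id": "INT64", "amount": "BYTE_ARRAY", "channel": "BYTE_ARRAY"}
print(diff_schemas(base, current))
```

Here "channel" is reported as added, "region" as missing, and "amount" as a type change from DOUBLE to BYTE_ARRAY.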

Method 1: Compare structures with parquet_schema()

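-- Leaf columns of every Parquet file in a directory
SELECT 
    file_name,
    name AS column_name,
    type AS physical_type,
    converted_type,
    repetition_type  -- OPTIONAL = nullable, REQUIRED = not null
FROM parquet_schema('data/trades/*.parquet')
WHERE type IS NOT NULL  -- drop the root schema element and other group nodes
ORDER BY file_name;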

Method 2: Automated drift detection in pure SQL

Here’s a complete schema drift detection query that compares each file’s column set against a baseline (the first file in path order):

WITH file_schemas AS (
    SELECT 
        file_name,
        ARRAY_SORT(ARRAY_AGG(name)) AS cols
    FROM parquet_schema('data/trades/*.parquet')
    WHERE type IS NOT NULL  -- skip the root schema element
    GROUP BY file_name
),
base_schema AS (
    -- The first file in path order is the baseline
    SELECT file_name AS base_file, cols AS base_cols
    FROM file_schemas
    ORDER BY file_name
    LIMIT 1
)
SELECT 
    fs.file_name,
    fs.cols,
    bs.base_cols,
    -- Columns in this file but not in the baseline
    list_filter(fs.cols, c -> NOT list_contains(bs.base_cols, c)) AS added_columns,
    -- Columns in the baseline but not in this file
    list_filter(bs.base_cols, c -> NOT list_contains(fs.cols, c)) AS missing_columns
FROM file_schemas fs, base_schema bs
WHERE fs.file_name <> bs.base_file;

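If added_columns or missing_columns is non-empty, schema drift has occurred. This query compares column names only, so a type change would slip through; the type column of parquet_schema() can be compared the same way. Wrap the query in a scheduled task that runs daily and sends alerts via email or Slack when anomalies are detected.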
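For the alerting step, one option is a Slack incoming webhook, which accepts a JSON body with a text field. The sketch below assumes the drift query’s results have been fetched as (file_name, added_columns, missing_columns) tuples, for example with conn.execute(...).fetchall() in Python. build_drift_message and post_to_slack are hypothetical helpers, and the webhook URL is yours to supply.

```python
import json
import urllib.request


def build_drift_message(drift_rows: list) -> dict:
    """Build a Slack webhook body from (file_name, added, missing) rows."""
    # The SQL returns every non-baseline file; keep only files that actually drifted
    drifted = [row for row in drift_rows if row[1] or row[2]]
    if not drifted:
        return {"text": "Schema check passed: no drift detected."}
    lines = [f"Schema drift detected in {len(drifted)} file(s):"]
    for file_name, added, missing in drifted:
        lines.append(f"- {file_name}: added={list(added or [])}, missing={list(missing or [])}")
    return {"text": "\n".join(lines)}


def post_to_slack(webhook_url: str, payload: dict) -> int:
    """POST the payload as JSON to a Slack incoming-webhook URL; return the HTTP status."""
    request = urllib.request.Request(
        webhook_url,
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(request, timeout=10) as response:
        return response.status
```

Calling post_to_slack only when the message reports drift keeps the channel quiet on healthy days.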

Python automation script

import duckdb
import glob
import json
import os

def detect_schema_drift(data_dir: str, max_files: int = 20) -> dict:
    """Detect column additions/removals across Parquet files in a directory"""
    conn = duckdb.connect(":memory:")  # Parquet support is built into DuckDB
    
    files = sorted(glob.glob(os.path.join(data_dir, "*.parquet")))
    if len(files) < 2:
        return {"status": "ok", "message": "Fewer than two files; nothing to compare"}
    
    # Collect the leaf column names of each file (footer metadata only)
    schemas = {}
    errors = {}
    for f in files[:max_files]:  # Sample the first N files
        try:
            path = f.replace("'", "''")  # escape quotes for the SQL string literal
            result = conn.execute(f"""
                SELECT name
                FROM parquet_schema('{path}')
                WHERE type IS NOT NULL
            """).fetchall()
            schemas[f] = [row[0] for row in result]
        except Exception as e:
            errors[f] = str(e)
    
    if not schemas:
        return {"status": "error", "errors": errors}
    
    # Use the first readable file as the baseline (dicts keep insertion order)
    base_file = next(iter(schemas))
    base_cols = schemas[base_file]
    drifts = []
    
    for fname, cols in schemas.items():
        added = set(cols) - set(base_cols)
        missing = set(base_cols) - set(cols)
        if added or missing:
            drifts.append({
                "file": fname,
                "added_columns": sorted(added),
                "missing_columns": sorted(missing),
                "total_columns": len(cols),
                "base_columns": len(base_cols)
            })
    
    return {
        "status": "drift_detected" if drifts else "ok",
        "total_files_scanned": len(schemas) + len(errors),
        "base_file": base_file,
        "base_schema_columns": base_cols,
        "drifts": drifts,
        "errors": errors
    }

# Usage example
report = detect_schema_drift("/data/sales/")
print(json.dumps(report, indent=2, ensure_ascii=False))

Part 3: Null Rate Monitoring

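A sudden spike in null rates is often the first signal that a data source has problems. parquet_metadata() exposes per-column null counts from the footer statistics, so you can scan an entire dataset for null anomalies in seconds without reading any row data. (These counts exist only if the writer recorded column statistics; otherwise stats_null_count is NULL.)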

import duckdb
import glob
import os

def monitor_null_rates(data_dir: str, threshold: float = 5.0, max_files: int = 50) -> dict:
    """Flag columns whose null percentage exceeds `threshold` in any file"""
    conn = duckdb.connect(":memory:")
    
    files = sorted(glob.glob(os.path.join(data_dir, "*.parquet")))
    alerts = []
    errors = []
    
    for f in files[:max_files]:  # Sample the first N files
        try:
            path = f.replace("'", "''")
            # parquet_metadata() has one row per row group and column,
            # so aggregate to one row per column for the whole file
            result = conn.execute(f"""
                SELECT 
                    path_in_schema AS column_name,
                    SUM(stats_null_count) AS num_nulls,
                    SUM(num_values) AS num_values,
                    ROUND(SUM(stats_null_count) * 100.0 / NULLIF(SUM(num_values), 0), 2) AS null_pct
                FROM parquet_metadata('{path}')
                GROUP BY path_in_schema
                HAVING SUM(stats_null_count) > 0
                ORDER BY num_nulls DESC
            """).fetchall()
            
            for col_name, num_nulls, num_values, null_pct in result:
                if null_pct is not None and float(null_pct) > threshold:
                    alerts.append({
                        "file": f,
                        "column": col_name,
                        "null_count": int(num_nulls),
                        "null_percentage": float(null_pct)
                    })
        except Exception as e:
            errors.append({"file": f, "error": str(e)})
    
    return {
        "total_files": len(files),
        "files_scanned": min(len(files), max_files),
        "alert_count": len(alerts),
        "threshold": threshold,
        "alerts": alerts[:20],  # cap the payload; alert_count holds the full total
        "errors": errors
    }

# Usage example
null_report = monitor_null_rates("/data/sales/", threshold=5.0)
print(f"Found {null_report['alert_count']} null rate anomaly alerts")
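A fixed 5% threshold has two blind spots: a column that is always about 30% null alerts every day, while a jump from 0.1% to 4% passes silently. A minimal alternative, sketched here as an assumption rather than part of the original monitor, compares each column’s current null percentage against its own recent history. Where that history lives (for example, earlier daily JSON reports) is left open; is_null_spike and its parameters are hypothetical.

```python
from statistics import mean, pstdev


def is_null_spike(history: list, current: float, k: float = 3.0, min_jump: float = 2.0) -> bool:
    """Flag `current` null % as a spike relative to the same column's history.

    history  -- null percentages for this column from earlier files or days
    k        -- how many standard deviations above the mean counts as a spike
    min_jump -- minimum increase in percentage points, so tiny wobbles on a
                very stable column are not flagged
    """
    if not history:
        return False  # nothing to compare against yet
    baseline = mean(history)
    spread = pstdev(history)
    return current - baseline > max(k * spread, min_jump)
```

With this rule, a column hovering around 30% nulls stays quiet at 32%, while a column that jumps from about 0.5% to 12% is flagged.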

Monetization idea: Package this as an API service and charge $0.001 per call. Usage fees alone add up slowly: 100 quality checks a day is $0.10/day, about $3/month, and reaching $30/day (about $900/month) takes around 30,000 calls a day. The base service fee is what gets you to $2,000–$5,000 per client per month.
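Per-call pricing is easy to miscalculate, so a few lines of arithmetic help when sizing a plan. These hypothetical helpers assume a 30-day month and a flat per-call price.

```python
def monthly_usage_revenue(calls_per_day: int, price_per_call: float, days: int = 30) -> float:
    """Usage-based revenue for one client over `days` days, rounded to cents."""
    return round(calls_per_day * price_per_call * days, 2)


def calls_needed(target_per_month: float, price_per_call: float, days: int = 30) -> int:
    """Daily call volume required to reach a monthly usage-revenue target."""
    return round(target_per_month / (price_per_call * days))
```

For example, calls_needed(900, 0.001) gives the daily volume needed for $900/month in usage fees alone.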


Part 4: Compression Ratio and Size Anomaly Detection

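Abnormal compression in Parquet files is another early warning signal. The queries below define compression_ratio as the fraction of space saved (1 − compressed/uncompressed), so higher means better compression. An unusually low value can mean a codec or dictionary encoding was switched off, or that a column suddenly holds high-entropy values; an unusually high value often means columns are suddenly filled with nulls or a constant default, which is itself a data quality red flag.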
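The link between compression and data health can be demonstrated without Parquet at all. This stdlib sketch uses zlib as a stand-in for Parquet codecs such as Snappy or ZSTD, and computes space savings the same way as the compression_ratio expression in this section’s queries. Constant bytes model a column that has collapsed to a single default value; seeded random bytes model high-entropy values.

```python
import random
import zlib


def space_savings(raw: bytes) -> float:
    """1 - compressed/uncompressed, using zlib as a stand-in codec."""
    return 1.0 - len(zlib.compress(raw)) / len(raw)


rng = random.Random(42)  # seeded so the demo is reproducible
high_entropy = bytes(rng.randrange(256) for _ in range(100_000))
constant = b"\x00" * 100_000  # e.g. a column suddenly filled with one default value

print(f"high-entropy savings: {space_savings(high_entropy):.4f}")
print(f"constant savings:     {space_savings(constant):.4f}")
```

Random bytes barely compress (savings near or slightly below zero), while the constant block compresses almost entirely, which is why a sudden jump in a file’s ratio deserves a look.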

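-- Batch-check compression status of all Parquet files (footer metadata only)
WITH row_groups AS (
    -- parquet_metadata() returns one row per column chunk; collapse to row groups
    SELECT 
        file_name,
        row_group_id,
        ANY_VALUE(row_group_num_rows) AS num_rows,
        SUM(total_compressed_size) AS compressed_size,
        SUM(total_uncompressed_size) AS uncompressed_size
    FROM parquet_metadata('data/sales/*.parquet')
    GROUP BY file_name, row_group_id
)
SELECT 
    file_name,
    SUM(num_rows) AS num_rows,
    SUM(compressed_size) AS compressed_size,
    SUM(uncompressed_size) AS uncompressed_size,
    ROUND(1.0 - SUM(compressed_size)::DOUBLE / NULLIF(SUM(uncompressed_size), 0), 4) AS compression_ratio,
    ROUND(SUM(compressed_size)::DOUBLE / NULLIF(SUM(num_rows), 0), 2) AS bytes_per_row
FROM row_groups
GROUP BY file_name
ORDER BY compression_ratio ASC;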

Combine with Python for statistical anomaly detection:

import os

import duckdb

def detect_size_anomalies(data_dir: str, sigma: float = 2.0) -> dict:
    """Flag files whose compression ratio is more than `sigma` std devs from the mean"""
    conn = duckdb.connect(":memory:")
    pattern = os.path.join(data_dir, "*.parquet").replace("'", "''")
    
    # Collapse column chunks to row groups, then row groups to files
    result = conn.execute(f"""
        WITH row_groups AS (
            SELECT file_name, row_group_id,
                   ANY_VALUE(row_group_num_rows) AS num_rows,
                   SUM(total_compressed_size) AS comp,
                   SUM(total_uncompressed_size) AS uncomp
            FROM parquet_metadata('{pattern}')
            GROUP BY file_name, row_group_id
        )
        SELECT 
            file_name,
            SUM(num_rows) AS num_rows,
            SUM(comp) AS compressed_size,
            SUM(uncomp) AS uncompressed_size,
            ROUND(1.0 - SUM(comp)::DOUBLE / NULLIF(SUM(uncomp), 0), 4) AS ratio,
            ROUND(SUM(comp)::DOUBLE / NULLIF(SUM(num_rows), 0), 2) AS bpr
        FROM row_groups
        GROUP BY file_name
    """).fetchall()
    
    ratios = [r[4] for r in result if r[4] is not None]
    bprs = [r[5] for r in result if r[5] is not None]
    
    if not ratios:
        return {"status": "error", "message": "No valid data"}
    
    avg_ratio = sum(ratios) / len(ratios)
    avg_bpr = sum(bprs) / len(bprs) if bprs else None
    
    # Population standard deviation of the compression ratios
    std_ratio = (sum((r - avg_ratio) ** 2 for r in ratios) / len(ratios)) ** 0.5
    
    anomalies = []
    if std_ratio > 0:  # all ratios identical: nothing can be an outlier
        for file_name, num_rows, comp_size, uncomp_size, ratio, bpr in result:
            if ratio is not None and abs(ratio - avg_ratio) > sigma * std_ratio:
                anomalies.append({
                    "file": file_name,
                    "compression_ratio": ratio,
                    "avg_ratio": round(avg_ratio, 4),
                    "deviation_sigma": round(abs(ratio - avg_ratio) / std_ratio, 2)
                })
    
    return {
        "total_files": len(result),
        "avg_compression_ratio": round(avg_ratio, 4),
        "avg_bytes_per_row": round(avg_bpr, 2) if avg_bpr is not None else None,
        "anomaly_count": len(anomalies),
        "anomalies": anomalies
    }
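One caveat with the 2σ rule on a population standard deviation: in a sample of n values, no z-score can exceed (n − 1)/√n, which is below 2 for n ≤ 5. With five or fewer files the detector above can therefore never fire, and a single extreme file also inflates the standard deviation it is measured against. A common robust alternative, swapped in here and not part of the original code, is the modified z-score of Iglewicz and Hoaglin, which uses the median and the median absolute deviation (MAD). The function name mad_outliers is hypothetical.

```python
from statistics import median


def mad_outliers(values: list, threshold: float = 3.5) -> list:
    """Return indices of values whose modified z-score exceeds `threshold`.

    Modified z-score: 0.6745 * (x - median) / MAD, where MAD is the median
    absolute deviation from the median.
    """
    if not values:
        return []
    med = median(values)
    mad = median([abs(v - med) for v in values])
    if mad == 0:
        # More than half the values are identical: treat any deviation as an outlier
        return [i for i, v in enumerate(values) if v != med]
    return [i for i, v in enumerate(values) if 0.6745 * abs(v - med) / mad > threshold]
```

Applied to the ratios list inside detect_size_anomalies, mad_outliers(ratios) returns the positions of suspicious files, and it works even with a handful of files.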

Part 5: Complete Data Quality Dashboard

Integrate everything above into a comprehensive monitoring class that outputs a daily JSON report:

import duckdb
import glob
import json
import os
from datetime import datetime

class DuckDBDataQualityMonitor:
    """Metadata-driven data quality monitor using DuckDB"""
    
    def __init__(self, data_dir: str):
        self.conn = duckdb.connect(":memory:")  # Parquet support is built into DuckDB
        self.data_dir = data_dir
    
    @staticmethod
    def _sql_literal(path: str) -> str:
        """Quote a path as a SQL string literal, doubling any single quotes"""
        return "'" + path.replace("'", "''") + "'"
    
    def scan_all(self) -> dict:
        """Run complete data quality scan"""
        files = sorted(glob.glob(os.path.join(self.data_dir, "*.parquet")))
        
        report = {
            "scan_time": datetime.now().isoformat(),
            "data_directory": self.data_dir,
            "total_files": len(files),
            "schema_drift": self._check_schema_drift(files),
            "null_analysis": self._check_null_rates(files),
            "size_analysis": self._check_size_anomalies(files),
            "summary": {}
        }
        
        total_issues = (
            len(report["schema_drift"].get("drifts", [])) +
            report["null_analysis"].get("alert_count", 0) +
            report["size_analysis"].get("anomaly_count", 0)
        )
        
        if total_issues == 0:
            report["summary"]["status"] = "healthy"
            report["summary"]["message"] = "All data files are healthy"
        elif total_issues <= 3:
            report["summary"]["status"] = "warning"
            report["summary"]["message"] = f"{total_issues} issue(s) found, review recommended"
        else:
            report["summary"]["status"] = "critical"
            report["summary"]["message"] = f"{total_issues} issue(s) found, immediate action required"
        
        return report
    
    def _check_schema_drift(self, files: list, max_files: int = 20) -> dict:
        if len(files) < 2:
            return {"status": "ok", "drifts": []}
        
        schemas = {}
        for f in files[:max_files]:
            try:
                result = self.conn.execute(f"""
                    SELECT name FROM parquet_schema({self._sql_literal(f)})
                    WHERE type IS NOT NULL
                """).fetchall()
                schemas[f] = [row[0] for row in result]
            except Exception:
                continue  # unreadable file: skip it rather than abort the scan
        
        if not schemas:
            return {"status": "error", "drifts": []}
        
        # The first readable file is the baseline
        base_cols = set(next(iter(schemas.values())))
        drifts = []
        for fname, cols in schemas.items():
            added = set(cols) - base_cols
            missing = base_cols - set(cols)
            if added or missing:
                drifts.append({
                    "file": fname,
                    "added": sorted(added),
                    "missing": sorted(missing)
                })
        
        return {"status": "drift_detected" if drifts else "ok", "drifts": drifts}
    
    def _check_null_rates(self, files: list, threshold: float = 5.0, max_files: int = 50) -> dict:
        alerts = []
        for f in files[:max_files]:
            try:
                # Aggregate row groups so each column gets one null % per file
                result = self.conn.execute(f"""
                    SELECT path_in_schema,
                           ROUND(SUM(stats_null_count) * 100.0 / NULLIF(SUM(num_values), 0), 2) AS pct
                    FROM parquet_metadata({self._sql_literal(f)})
                    GROUP BY path_in_schema
                    HAVING SUM(stats_null_count) > 0
                """).fetchall()
                for col, pct in result:
                    if pct is not None and float(pct) > threshold:
                        alerts.append({"file": f, "column": col, "null_pct": float(pct)})
            except Exception:
                continue  # unreadable file: skip it
        
        return {"alert_count": len(alerts), "alerts": alerts[:20]}
    
    def _check_size_anomalies(self, files: list, sigma: float = 2.0) -> dict:
        if not files:
            return {"anomaly_count": 0, "anomalies": []}
        
        pattern = self._sql_literal(os.path.join(self.data_dir, "*.parquet"))
        result = self.conn.execute(f"""
            SELECT file_name,
                   ROUND(1.0 - SUM(total_compressed_size)::DOUBLE
                         / NULLIF(SUM(total_uncompressed_size), 0), 4) AS ratio
            FROM parquet_metadata({pattern})
            GROUP BY file_name
        """).fetchall()
        
        ratios = [r for _, r in result if r is not None]
        if not ratios:
            return {"anomaly_count": 0, "anomalies": []}
        
        avg = sum(ratios) / len(ratios)
        std = (sum((r - avg) ** 2 for r in ratios) / len(ratios)) ** 0.5
        
        anomalies = []
        for fp, ratio in result:
            # `is not None` so a legitimate ratio of 0.0 is still checked
            if ratio is not None and std > 0 and abs(ratio - avg) > sigma * std:
                anomalies.append({"file": fp, "ratio": ratio, "avg_ratio": round(avg, 4)})
        
        return {"anomaly_count": len(anomalies), "anomalies": anomalies}


# Usage example
monitor = DuckDBDataQualityMonitor("/data/sales/")
report = monitor.scan_all()
print(json.dumps(report, indent=2, ensure_ascii=False))

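# Send alerts based on severity. send_alert, send_notification and log_status
# are placeholders for your own alerting hooks; they are not defined above.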
if report["summary"]["status"] == "critical":
    send_alert("Critical data quality issues detected!")
elif report["summary"]["status"] == "warning":
    send_notification("Minor data quality issues found.")
else:
    log_status("Data quality scan complete — all healthy.")
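The usage example calls send_alert, send_notification, and log_status, which the article does not define. One minimal way to stand them in, assuming you are happy to route everything through Python’s standard logging module, is a single dispatcher; real delivery (email via logging.handlers.SMTPHandler, or a custom Slack handler) can then be attached as handlers without touching the monitor. dispatch_report and the status-to-level mapping are hypothetical.

```python
import logging

logger = logging.getLogger("data_quality")

# Hypothetical mapping from the monitor's summary status to a log level
STATUS_LEVELS = {
    "critical": logging.CRITICAL,
    "warning": logging.WARNING,
    "healthy": logging.INFO,
}


def dispatch_report(report: dict) -> str:
    """Log the report summary at a level matching its status; return the level name."""
    summary = report.get("summary", {})
    status = summary.get("status", "unknown")
    level = STATUS_LEVELS.get(status, logging.ERROR)  # unknown status: treat as an error
    logger.log(level, "data quality %s: %s", status, summary.get("message", ""))
    return logging.getLevelName(level)
```

With this in place, the if/elif block reduces to dispatch_report(report).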

Part 6: Monetization Strategy

The beauty of this approach is near-zero marginal cost. Cost still grows with the number of files, but only slightly: parquet_metadata() reads each file’s footer rather than its data, so monitoring 10,000 files costs a small fraction of scanning them.
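The scaling argument is simple arithmetic. This idealized sketch counts the bytes touched by a footer-only scan versus a full scan. It ignores per-request latency on object storage, and real footer sizes vary with the number of row groups and columns, so the inputs are assumptions you would measure on your own data; bytes_read is a hypothetical helper.

```python
def bytes_read(n_files: int, footer_bytes: int, file_bytes: int) -> dict:
    """Compare bytes touched by a footer-only scan vs. a full scan (idealized)."""
    metadata_scan = n_files * (footer_bytes + 8)  # footer plus 8-byte length/magic trailer
    full_scan = n_files * file_bytes
    return {
        "metadata_scan": metadata_scan,
        "full_scan": full_scan,
        "fraction": metadata_scan / full_scan,
    }


# Assumed inputs: 10,000 files of 1 GiB each, each with a 64 KiB footer+trailer
print(bytes_read(10_000, 65_536 - 8, 1_073_741_824))
```

Both scans grow linearly with the file count; the footer-only scan just grows from a much smaller base.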

Productization roadmap:

  1. MVP: Wrap the Python script as an API, deploy on a cloud server. Charge $500–$2,000/month per client for basic monitoring.

  2. SaaS: Add a web dashboard where clients can view real-time quality reports, set custom thresholds, and configure alert rules. Price at $2,000–$5,000/month.

  3. Enterprise: Integrate with existing data pipelines (Airflow, dbt, Fivetran), triggering automatic quality checks after every data write and generating compliance reports. Price at $10,000+/month.
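For the Enterprise tier, the simplest integration point is a quality gate that runs right after each write and fails the task when the report is bad. Orchestrators such as Airflow mark a task as failed when its Python callable raises, and downstream tasks with the default trigger rule then do not run. This is a minimal sketch: quality_gate and DataQualityGateError are hypothetical names, and the report is assumed to carry the summary.status field produced by DuckDBDataQualityMonitor.scan_all().

```python
class DataQualityGateError(RuntimeError):
    """Raised to fail the surrounding pipeline task when quality checks fail."""


def quality_gate(report: dict, fail_on: tuple = ("critical",)) -> dict:
    """Raise if the report's summary status is in `fail_on`; otherwise pass it through."""
    status = report.get("summary", {}).get("status", "unknown")
    if status in fail_on:
        raise DataQualityGateError(f"data quality gate failed: status={status}")
    return report
```

Stricter clients can pass fail_on=("critical", "warning") so that even minor issues block downstream loads.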

Why will clients pay? Because the business cost of undetected data quality issues far exceeds your monitoring fee. An e-commerce client losing hundreds of thousands in sales due to a schema-drift-induced report error would happily pay a few thousand per month for prevention.

Competitive moat: Traditional solutions typically combine Spark clusters, Great Expectations, and an ops team, which can cost tens of thousands of dollars. The DuckDB approach needs a single dependency, answers from metadata in milliseconds, and fits in fewer than 200 lines of code. That’s your moat.


Part 7: Comparison with Traditional Solutions

| Approach | DuckDB Metadata Query | Spark + Great Expectations | Custom Python Script |
|---|---|---|---|
| Deployment | Single dependency (pip install duckdb) | Requires Spark cluster | Heavy custom code |
| Detection Speed | Milliseconds (metadata only) | Minutes to hours (full scan) | Minutes |
| Code Volume | 50–100 lines | 500+ lines | 200–500 lines |
| Monthly Cost | Server hosting only | Cluster: $5,000+ | Development labor |
| Scalability | S3/GCS/local supported | Strong | Weak |

Conclusion

Building a data quality monitoring system with DuckDB’s metadata functions is a classic case of solving 90% of the problem with 10% of the effort. The key insight: metadata itself is one of the most valuable pieces of information — it contains critical indicators about file structure, data distribution, and compression efficiency, all accessible without reading the actual data.

Master this mindset, and you’ll stand out among data service providers — delivering maximum value with minimum cost.

💡 The complete runnable code and deployment templates are available at duckdblab.org, including Docker setup and alert configuration examples.

📺 Watch video tutorials → Olap Studio YouTube


⚠️ This site is an independent community project, not affiliated with, endorsed by, or sponsored by the DuckDB Foundation or official DuckDB project.

"DuckDB" is a registered trademark of the DuckDB Foundation. This site uses the name solely for factual description purposes.

All content is for educational and community promotion purposes only and does not constitute any commercial service.