DuckDB for Real-Time Analytics: Building Streaming Pipelines at Scale

TL;DR: DuckDB is an embedded analytical database rather than a stream processor, but it can ingest events in small batches and answer SQL queries over them with low latency. This guide shows how to build production-grade streaming pipelines using DuckDB, Kafka, and modern data engineering tools.


Why DuckDB for Streaming?

Traditional streaming analytics requires complex infrastructure:

┌─────────────────────────────────────────────────┐
│              Traditional Streaming                │
│                                                  │
│  [Kafka] ──> [Flink/Spark] ──> [Warehouse]      │
│                                                  │
│  Problems:                                      │
│  • Complex infrastructure                        │
│  • High operational cost                         │
│  • Long learning curve                           │
│  • Separate systems for batch and stream         │
└─────────────────────────────────────────────────┘

DuckDB simplifies this:

┌─────────────────────────────────────────────────┐
│              DuckDB Streaming                     │
│                                                  │
│  [Kafka] ──> [DuckDB] ──> [Dashboard]           │
│                                                  │
│  Benefits:                                      │
│  • Single engine for batch + stream              │
│  • Minimal infrastructure                        │
│  • SQL-based (familiar)                          │
│  • Real-time + historical in one place           │
└─────────────────────────────────────────────────┘

Real-Time Data Ingestion

Kafka Integration

from duckdb import connect
from confluent_kafka import Consumer, KafkaException
import json

class KafkaDuckDBStreamer:
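    def __init__(self, bootstrap_servers, topic, db_path):
        self.consumer = Consumer({
            'bootstrap.servers': bootstrap_servers,
            'group.id': 'duckdb-streaming',
            'auto.offset.reset': 'earliest',
            # Offsets are committed manually, after the row is stored
            'enable.auto.commit': False
        })
        self.consumer.subscribe([topic])
        self.con = connect(db_path)
        # Create the target table if it is missing (column types are assumed)
        self.con.execute("""
            CREATE TABLE IF NOT EXISTS events (
                event_id VARCHAR, user_id VARCHAR, event_type VARCHAR,
                timestamp TIMESTAMP, data JSON
            )
        """)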
    
    def process_message(self, message):
        """Process a single Kafka message."""
        data = json.loads(message.value())
        
        # Insert into DuckDB
        self.con.execute("""
            INSERT INTO events (event_id, user_id, event_type, timestamp, data)
            VALUES (?, ?, ?, ?, ?)
        """, [
            data['event_id'],
            data['user_id'],
            data['event_type'],
            data['timestamp'],
            json.dumps(data['data'])
        ])
    
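    def stream(self):
        """Start streaming from Kafka."""
        try:
            while True:
                message = self.consumer.poll(1.0)
                if message is None:
                    continue
                if message.error():
                    raise KafkaException(message.error())
                
                self.process_message(message)
                
                # Commit only after the insert succeeded (at-least-once delivery)
                self.consumer.commit(message=message, asynchronous=False)
        finally:
            # Leave the consumer group cleanly and release the database file
            self.consumer.close()
            self.con.close()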

# Usage
streamer = KafkaDuckDBStreamer(
    bootstrap_servers='kafka:9092',
    topic='user-events',
    db_path='analytics.duckdb'
)
streamer.stream()
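
Inserting and committing one message at a time, as the class above does, tends to limit throughput because DuckDB is optimized for bulk writes. A common mitigation is to buffer rows and write them in small batches. The following is a minimal sketch of the buffering logic only; `MicroBatcher`, `max_rows`, `max_age_seconds`, and the `flush_fn` callback are illustrative names, not part of DuckDB or confluent-kafka.

```python
import time
from typing import Any, Callable, List


class MicroBatcher:
    """Buffer rows and flush them when the batch is full or too old."""

    def __init__(
        self,
        flush_fn: Callable[[List[Any]], None],
        max_rows: int = 1000,
        max_age_seconds: float = 1.0,
        clock: Callable[[], float] = time.monotonic,
    ):
        self._flush_fn = flush_fn      # hypothetical sink, e.g. a bulk INSERT
        self._max_rows = max_rows
        self._max_age = max_age_seconds
        self._clock = clock            # injectable so tests can fake time
        self._rows: List[Any] = []
        self._first_row_at = 0.0

    def add(self, row: Any) -> bool:
        """Add one row; return True if this call triggered a flush."""
        if not self._rows:
            self._first_row_at = self._clock()
        self._rows.append(row)
        too_big = len(self._rows) >= self._max_rows
        too_old = self._clock() - self._first_row_at >= self._max_age
        if too_big or too_old:
            self.flush()
            return True
        return False

    def flush(self) -> int:
        """Write all buffered rows; return how many were written."""
        if not self._rows:
            return 0
        batch = list(self._rows)
        self._flush_fn(batch)          # if this raises, rows stay buffered
        self._rows.clear()
        return len(batch)
```

In the Kafka loop, `flush_fn` could be a function that passes the batch to `con.executemany(...)` and then commits the consumer offsets, with an extra `flush()` whenever `poll()` returns `None` so that a quiet topic does not leave rows unwritten. The right batch size depends on the workload and should be measured.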

WebSocket Integration

from duckdb import connect
import asyncio
import websockets
import json

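async def websocket_streamer(db_path):
    """Stream data from WebSocket to DuckDB."""
    con = connect(db_path)
    
    # Upserts need a key to detect conflicts (this schema is assumed)
    con.execute("""
        CREATE TABLE IF NOT EXISTS real_time_metrics (
            metric_name VARCHAR PRIMARY KEY, value DOUBLE, timestamp TIMESTAMP
        )
    """)
    
    async with websockets.connect('wss://stream.example.com/events') as ws:
        async for message in ws:
            data = json.loads(message)
            
            # Upsert into DuckDB: there is no UPSERT statement, so use
            # INSERT OR REPLACE, which overwrites the row with the same key
            con.execute("""
                INSERT OR REPLACE INTO real_time_metrics (
                    metric_name, value, timestamp
                ) VALUES (?, ?, ?)
            """, [
                data['metric'],
                data['value'],
                data['timestamp']
            ])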

# Run the streamer
asyncio.run(websocket_streamer('analytics.duckdb'))

HTTP API Integration

from duckdb import connect
import requests
import time

class APIDataStreamer:
    def __init__(self, api_url, db_path, poll_interval=5):
        self.api_url = api_url
        self.con = connect(db_path)
        self.poll_interval = poll_interval
    
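    def fetch_and_store(self):
        """Fetch data from API and store in DuckDB."""
        # A timeout keeps a stalled server from blocking the loop forever
        response = requests.get(self.api_url, timeout=10)
        response.raise_for_status()  # Fail loudly on HTTP errors
        data = response.json()
        
        # Insert the whole response in one call instead of row by row
        rows = [
            (item['id'], item['name'], item['value'])
            for item in data['results']
        ]
        if rows:
            self.con.executemany("""
                INSERT INTO api_data (id, name, value, fetched_at)
                VALUES (?, ?, ?, CURRENT_TIMESTAMP)
            """, rows)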
    
    def stream(self):
        """Continuous polling loop."""
        while True:
            self.fetch_and_store()
            time.sleep(self.poll_interval)

# Usage
streamer = APIDataStreamer(
    api_url='https://api.example.com/data',
    db_path='analytics.duckdb'
)
streamer.stream()
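
A polling loop like the one above stops at the first failed request. One way to make it more tolerant is to retry with exponential backoff. The sketch below is an assumption about how that could look, not part of the original pipeline; `backoff_delays` and `call_with_retries` are hypothetical helper names.

```python
import time
from typing import Callable, Iterable, Iterator, TypeVar

T = TypeVar("T")


def backoff_delays(
    base: float = 1.0, factor: float = 2.0, cap: float = 60.0, attempts: int = 5
) -> Iterator[float]:
    """Yield `attempts` delays that grow by `factor` and never exceed `cap`."""
    delay = base
    for _ in range(attempts):
        yield min(delay, cap)
        delay *= factor


def call_with_retries(
    fn: Callable[[], T],
    delays: Iterable[float],
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call fn(); on failure wait for the next delay and try again.

    Once the delays are used up, one final attempt is made and any
    exception from it propagates to the caller.
    """
    for delay in delays:
        try:
            return fn()
        except Exception:  # in practice, catch only transient errors
            sleep(delay)
    return fn()
```

With the class above, a single polling step could then be written as `call_with_retries(streamer.fetch_and_store, backoff_delays())`. Catching a narrower exception type such as `requests.RequestException` is usually preferable to the broad `Exception` used here for brevity.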

Real-Time Analytics

Streaming Aggregations

-- Real-time dashboard query
SELECT 
    DATE_TRUNC('second', timestamp) as second,
    COUNT(*) as events_per_second,
    AVG(value) as avg_metric,
    PERCENTILE_CONT(0.95) WITHIN GROUP (ORDER BY value) as p95
FROM streaming_data
WHERE timestamp >= NOW() - INTERVAL '5 minutes'
GROUP BY 1
ORDER BY 1 DESC;

Anomaly Detection

-- Detect anomalies in real-time
WITH rolling_stats AS (
    SELECT 
        timestamp,
        value,
        AVG(value) OVER (
            ORDER BY timestamp 
            ROWS BETWEEN 60 PRECEDING AND CURRENT ROW
        ) as rolling_avg,
        STDDEV(value) OVER (
            ORDER BY timestamp 
            ROWS BETWEEN 60 PRECEDING AND CURRENT ROW
        ) as rolling_stddev
    FROM streaming_data
)
SELECT *
FROM rolling_stats
WHERE ABS(value - rolling_avg) > 3 * rolling_stddev
AND timestamp >= NOW() - INTERVAL '1 hour';
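
To try out the window size and the three-sigma threshold on sample data before running the SQL, the same rule can be sketched in plain Python. This is an illustrative re-implementation under stated assumptions, not the query's execution plan: `rolling_anomalies` is a hypothetical name, the window holds the current value plus the 60 preceding ones, and the sample standard deviation is used.

```python
from collections import deque
from statistics import mean, stdev
from typing import Iterable, List


def rolling_anomalies(
    values: Iterable[float], preceding: int = 60, threshold: float = 3.0
) -> List[int]:
    """Return the indexes of values more than `threshold` standard
    deviations away from the rolling mean of their window."""
    window = deque(maxlen=preceding + 1)  # preceding rows + current row
    flagged = []
    for index, value in enumerate(values):
        window.append(value)
        if len(window) < 2:
            continue  # a standard deviation needs at least two values
        avg = mean(window)
        sd = stdev(window)
        if abs(value - avg) > threshold * sd:
            flagged.append(index)
    return flagged
```

Because the current value is part of its own window, a spike also inflates the mean and standard deviation it is compared against. With very short windows a single outlier can therefore never exceed three standard deviations, which is worth keeping in mind when shrinking the window.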

Real-Time Alerts

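-- Create or refresh the alert table. CREATE TABLE AS stores a snapshot,
-- so re-run this statement on a schedule to keep the alerts current
CREATE OR REPLACE TABLE alerts AS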
SELECT 
    event_type,
    COUNT(*) as event_count,
    MAX(timestamp) as last_event
FROM streaming_data
WHERE timestamp >= NOW() - INTERVAL '1 minute'
GROUP BY event_type
HAVING COUNT(*) > 100;

-- Query alerts in real-time
SELECT * FROM alerts
ORDER BY event_count DESC;

Performance Optimization

Streaming Configuration

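-- Tune resource limits for the ingest process
-- (DuckDB has no streaming-specific settings; batching happens in the ingest code)
SET memory_limit = '4GB';
SET threads = 4;

-- Checkpoint the write-ahead log automatically once it reaches this size
SET checkpoint_threshold = '16MB';

-- Or force a checkpoint manually, for example after a large batch
CHECKPOINT;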

Indexing for Real-Time

-- Create indexes for streaming data
CREATE INDEX idx_streaming_timestamp ON streaming_data(timestamp);
CREATE INDEX idx_streaming_event_type ON streaming_data(event_type);
CREATE INDEX idx_streaming_user ON streaming_data(user_id);

-- DuckDB does not support partial indexes (CREATE INDEX ... WHERE ...),
-- so index the whole column and filter in the query instead
CREATE INDEX idx_alerts_event_type ON alerts(event_type);

Partitioning Strategy

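-- DuckDB tables have no PARTITION BY clause, so store events in a regular table
CREATE TABLE streaming_partitioned (
    id VARCHAR,
    user_id VARCHAR,
    event_type VARCHAR,
    timestamp TIMESTAMP,
    data JSON
);

-- Partition on export instead: write Hive-partitioned Parquet files by day
-- (the 'events' directory and the event_date column are illustrative)
COPY (
    SELECT *, CAST(timestamp AS DATE) AS event_date
    FROM streaming_partitioned
) TO 'events' (FORMAT PARQUET, PARTITION_BY (event_date));

-- Query recent data only
SELECT * FROM streaming_partitioned
WHERE timestamp >= NOW() - INTERVAL '1 hour'
AND timestamp < NOW();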
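
Time-based partitioning comes down to mapping each event's timestamp to a bucket. The sketch below shows that mapping in Python, assuming daily buckets and a Hive-style `event_date=YYYY-MM-DD` naming scheme. Both are illustrative choices, and `partition_key` and `group_by_partition` are hypothetical helpers.

```python
from collections import defaultdict
from datetime import datetime
from typing import Dict, Iterable, List


def partition_key(ts: datetime) -> str:
    """Map a timestamp to its daily partition name."""
    return f"event_date={ts:%Y-%m-%d}"


def group_by_partition(events: Iterable[dict]) -> Dict[str, List[dict]]:
    """Group event dicts by the partition their 'timestamp' falls into."""
    groups: Dict[str, List[dict]] = defaultdict(list)
    for event in events:
        groups[partition_key(event["timestamp"])].append(event)
    return dict(groups)
```

A query that filters on the last hour then only needs to touch the one or two newest buckets, which is the point of partitioning by time.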

Monitoring Streaming Pipelines

Pipeline Health Check

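from datetime import datetime
import time
from duckdb import connect

def check_streaming_health(db_path):
    """Monitor streaming pipeline health."""
    # DuckDB allows one read-write process per database file, so run this
    # check inside the ingest process or against a copy of the file
    con = connect(db_path)
    
    # Check data freshness
    try:
        latest_event = con.execute("""
            SELECT MAX(timestamp) as latest FROM streaming_data
        """).fetchone()[0]
    finally:
        con.close()
    
    # An empty table returns NULL, which would break the subtraction below
    if latest_event is None:
        return {"status": "no_data", "lag_seconds": None}
    
    # Assumes stored timestamps use the same clock and time zone as this host
    time_diff = (datetime.now() - latest_event).total_seconds()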
    
    if time_diff > 60:
        return {"status": "degraded", "lag_seconds": time_diff}
    else:
        return {"status": "healthy", "lag_seconds": time_diff}

# Monitor every 30 seconds
while True:
    health = check_streaming_health('analytics.duckdb')
    print(f"Streaming Health: {health}")
    time.sleep(30)
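
The freshness rule itself is easier to unit-test when it is separated from the database query and the system clock. A minimal sketch, assuming the same 60-second threshold as above; `classify_lag` and the `no_data` status for an empty table are illustrative additions.

```python
from datetime import datetime, timedelta
from typing import Optional


def classify_lag(
    latest_event: Optional[datetime],
    now: datetime,
    max_lag_seconds: float = 60.0,
) -> dict:
    """Classify pipeline health from the newest event timestamp."""
    if latest_event is None:
        # MAX(timestamp) over an empty table yields NULL / None
        return {"status": "no_data", "lag_seconds": None}
    lag = (now - latest_event).total_seconds()
    status = "degraded" if lag > max_lag_seconds else "healthy"
    return {"status": status, "lag_seconds": lag}


# Example: an event that arrived 90 seconds ago counts as degraded
now = datetime(2024, 1, 1, 12, 0, 0)
print(classify_lag(now - timedelta(seconds=90), now))
```

Passing `now` in explicitly keeps the function deterministic, so the threshold can be tested without waiting for real time to pass.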

Performance Metrics

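-- Track ingest rate and the average gap between consecutive events.
-- A window function cannot be nested inside an aggregate, so compute
-- the gaps in a CTE first and aggregate them afterwards
WITH gaps AS (
    SELECT 
        timestamp,
        EXTRACT(EPOCH FROM (timestamp - LAG(timestamp) OVER (ORDER BY timestamp))) as gap_seconds
    FROM streaming_data
    WHERE timestamp >= NOW() - INTERVAL '1 hour'
)
SELECT 
    DATE_TRUNC('minute', timestamp) as minute,
    COUNT(*) as events_processed,
    AVG(gap_seconds) as avg_gap_seconds
FROM gaps
GROUP BY 1
ORDER BY 1 DESC;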

Production Deployment

Docker Setup

FROM python:3.11-slim

RUN pip install --no-cache-dir duckdb confluent-kafka websockets requests

WORKDIR /app
COPY streamer.py .

CMD ["python", "streamer.py"]

Kubernetes Deployment

apiVersion: apps/v1
kind: Deployment
metadata:
  name: duckdb-streamer
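spec:
  replicas: 1  # DuckDB allows one writer process per database file
  strategy:
    type: Recreate  # Stop the old pod before starting its replacement
  selector:
    matchLabels:
      app: duckdb-streamer
  template:
    metadata:
      labels:
        app: duckdb-streamer
    spec: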
      containers:
      - name: streamer
        image: duckdb-streamer:latest
        env:
        - name: KAFKA_BOOTSTRAP
          value: "kafka:9092"
        - name: DB_PATH
          value: "/data/analytics.duckdb"
        volumeMounts:
        - name: data
          mountPath: /data
      volumes:
      - name: data
        persistentVolumeClaim:
          claimName: duckdb-pvc

Use Cases

1. Real-Time Dashboard

-- Dashboard query for live metrics
SELECT 
    DATE_TRUNC('second', timestamp) as second,
    COUNT(*) as requests,
    AVG(response_time) as avg_response_time,
    PERCENTILE_CONT(0.99) WITHIN GROUP (ORDER BY response_time) as p99
FROM api_logs
WHERE timestamp >= NOW() - INTERVAL '5 minutes'
GROUP BY 1
ORDER BY 1 DESC;

2. Fraud Detection

-- Real-time fraud detection
WITH user_activity AS (
    SELECT 
        user_id,
        COUNT(*) as transaction_count,
        SUM(amount) as total_amount,
        COUNT(DISTINCT merchant_id) as unique_merchants
    FROM transactions
    WHERE timestamp >= NOW() - INTERVAL '1 hour'
    GROUP BY user_id
)
SELECT *
FROM user_activity
WHERE transaction_count > 10 
   OR total_amount > 10000
   OR unique_merchants > 5;

3. IoT Monitoring

-- IoT sensor monitoring
SELECT 
    device_id,
    AVG(temperature) as avg_temp,
    MAX(temperature) as max_temp,
    MIN(temperature) as min_temp,
    COUNT(*) as readings
FROM iot_sensors
WHERE timestamp >= NOW() - INTERVAL '10 minutes'
GROUP BY device_id
HAVING MAX(temperature) > 80;  -- Alert on high temp

Conclusion

DuckDB enables real-time streaming analytics with:

  1. Minimal infrastructure: a single engine for batch and stream
  2. Low latency: fast analytical queries over freshly ingested data
  3. SQL-based: a familiar interface for analysts
  4. Cost-effective: no separate streaming platform to operate

For workloads that fit on a single node, DuckDB offers a simple path to near-real-time analytics without the operational complexity of traditional streaming platforms.


Streaming performance depends on hardware and data volume. Always benchmark with your specific workload.

⚠️ This site is an independent community project, not affiliated with, endorsed by, or sponsored by the DuckDB Foundation or official DuckDB project.

"DuckDB" is a registered trademark of the DuckDB Foundation. This site uses the name solely for factual description purposes.

All content is for educational and community promotion purposes only and does not constitute any commercial service.