DuckDB for Real-Time Analytics: Building Streaming Pipelines at Scale
TL;DR: DuckDB can process streaming data in real-time with millisecond latency. This guide shows you how to build production-grade streaming pipelines using DuckDB, Kafka, and modern data engineering tools.
Why DuckDB for Streaming?
Traditional streaming analytics requires complex infrastructure:
┌─────────────────────────────────────────────────┐
│ Traditional Streaming │
│ │
│ [Kafka] ──> [Flink/Spark] ──> [Warehouse] │
│ │
│ Problems: │
│ • Complex infrastructure │
│ • High operational cost │
│ • Long learning curve │
│ • Separate systems for batch and stream │
└─────────────────────────────────────────────────┘
DuckDB simplifies this:
┌─────────────────────────────────────────────────┐
│ DuckDB Streaming │
│ │
│ [Kafka] ──> [DuckDB] ──> [Dashboard] │
│ │
│ Benefits: │
│ • Single engine for batch + stream │
│ • Minimal infrastructure │
│ • SQL-based (familiar) │
│ • Real-time + historical in one place │
└─────────────────────────────────────────────────┘
Real-Time Data Ingestion
Kafka Integration
from duckdb import connect
from confluent_kafka import Consumer, KafkaException
import json
class KafkaDuckDBStreamer:
    """Consume JSON events from a Kafka topic and append them to DuckDB.

    Each message value is parsed as JSON and written as one row of the
    ``events`` table. The table itself is not created here and must
    already exist in the database at ``db_path``.
    """

    def __init__(self, bootstrap_servers, topic, db_path):
        """Create the Kafka consumer, subscribe to ``topic`` and open DuckDB.

        Args:
            bootstrap_servers: Kafka ``bootstrap.servers`` string.
            topic: Name of the topic to subscribe to.
            db_path: Path of the DuckDB database file.
        """
        self.consumer = Consumer({
            'bootstrap.servers': bootstrap_servers,
            'group.id': 'duckdb-streaming',
            'auto.offset.reset': 'earliest',
            # stream() commits explicitly after each successful insert.
            # Leaving the client's background auto-commit on could commit
            # an offset before its row is stored and lose the event on
            # restart.
            'enable.auto.commit': False,
        })
        self.consumer.subscribe([topic])
        self.con = connect(db_path)

    def process_message(self, message):
        """Parse one Kafka message and insert it into ``events``.

        Args:
            message: Object whose ``value()`` returns a JSON document with
                the keys ``event_id``, ``user_id``, ``event_type``,
                ``timestamp`` and ``data``.

        Raises:
            KeyError: If one of the expected keys is missing.
            ValueError: If the payload is not valid JSON.
        """
        data = json.loads(message.value())
        # The nested payload is re-serialised so it is stored as JSON text.
        self.con.execute("""
            INSERT INTO events (event_id, user_id, event_type, timestamp, data)
            VALUES (?, ?, ?, ?, ?)
        """, [
            data['event_id'],
            data['user_id'],
            data['event_type'],
            data['timestamp'],
            json.dumps(data['data'])
        ])

    def stream(self):
        """Poll Kafka forever, storing each message and then committing it.

        The consumer and the DuckDB connection are closed when the loop
        exits for any reason (error or interrupt), so the instance cannot
        be reused afterwards.

        Raises:
            KafkaException: If a polled message carries an error.
        """
        try:
            while True:
                message = self.consumer.poll(1.0)
                if message is None:
                    # Poll timed out with nothing to read.
                    continue
                if message.error():
                    raise KafkaException(message.error())
                self.process_message(message)
                # Commit only after the row is stored (at-least-once).
                self.consumer.commit(asynchronous=False)
        finally:
            # Leave the consumer group cleanly and release the database file.
            self.consumer.close()
            self.con.close()
# Usage: collect the connection settings first, then start the blocking loop
kafka_streamer_settings = {
    'bootstrap_servers': 'kafka:9092',
    'topic': 'user-events',
    'db_path': 'analytics.duckdb',
}
streamer = KafkaDuckDBStreamer(**kafka_streamer_settings)
streamer.stream()
WebSocket Integration
from duckdb import connect
import asyncio
import websockets
import json
async def websocket_streamer(db_path):
    """Stream JSON metrics from a WebSocket into DuckDB.

    Each incoming message is parsed as JSON and upserted into
    ``real_time_metrics``. The upsert uses ``INSERT OR REPLACE``, DuckDB's
    spelling of "insert, or overwrite the row with the same key".
    NOTE(review): this requires ``real_time_metrics`` to declare a
    PRIMARY KEY or UNIQUE constraint (presumably on ``metric_name``);
    confirm against the table definition.

    Args:
        db_path: Path of the DuckDB database file.
    """
    con = connect(db_path)
    try:
        async with websockets.connect('wss://stream.example.com/events') as ws:
            async for message in ws:
                data = json.loads(message)
                # DuckDB has no UPSERT keyword; INSERT OR REPLACE is the
                # supported equivalent.
                con.execute("""
                    INSERT OR REPLACE INTO real_time_metrics (
                        metric_name, value, timestamp
                    ) VALUES (?, ?, ?)
                """, [
                    data['metric'],
                    data['value'],
                    data['timestamp']
                ])
    finally:
        # Release the database file even if the socket drops or a message
        # is malformed.
        con.close()
# Entry point: drive the coroutine to completion on a fresh event loop
asyncio.run(websocket_streamer(db_path='analytics.duckdb'))
HTTP API Integration
from duckdb import connect
import requests
import time
class APIDataStreamer:
    """Poll an HTTP JSON API on a fixed interval and append rows to DuckDB.

    Rows go into the ``api_data`` table, which must already exist.
    """

    def __init__(self, api_url, db_path, poll_interval=5, request_timeout=10):
        """Open the DuckDB connection and remember the polling settings.

        Args:
            api_url: URL returning a JSON object with a ``results`` list.
            db_path: Path of the DuckDB database file.
            poll_interval: Seconds to sleep between polls.
            request_timeout: Seconds to wait for the API before giving up.
                Without a timeout a stalled server would block the loop
                forever.
        """
        self.api_url = api_url
        self.con = connect(db_path)
        self.poll_interval = poll_interval
        self.request_timeout = request_timeout

    def fetch_and_store(self):
        """Fetch one batch from the API and insert it into ``api_data``.

        Raises:
            requests.RequestException: On timeout, connection failure or a
                non-2xx HTTP status.
            KeyError: If the response lacks ``results`` or an item lacks
                ``id``, ``name`` or ``value``.
        """
        response = requests.get(self.api_url, timeout=self.request_timeout)
        # Fail loudly on 4xx/5xx instead of trying to parse an error body.
        response.raise_for_status()
        data = response.json()

        rows = [
            [item['id'], item['name'], item['value']]
            for item in data['results']
        ]
        if not rows:
            # executemany needs at least one parameter set.
            return

        # One batched statement per poll instead of one INSERT per item.
        self.con.executemany("""
            INSERT INTO api_data (id, name, value, fetched_at)
            VALUES (?, ?, ?, CURRENT_TIMESTAMP)
        """, rows)

    def stream(self):
        """Poll forever, sleeping ``poll_interval`` seconds between fetches.

        Any exception raised by ``fetch_and_store`` propagates and ends
        the loop.
        """
        while True:
            self.fetch_and_store()
            time.sleep(self.poll_interval)
# Usage: poll the example endpoint with the default interval
api_streamer_settings = {
    'api_url': 'https://api.example.com/data',
    'db_path': 'analytics.duckdb',
}
streamer = APIDataStreamer(**api_streamer_settings)
streamer.stream()
Real-Time Analytics
Streaming Aggregations
-- Per-second rollup of the last five minutes, newest second first
WITH recent AS (
    SELECT timestamp, value
    FROM streaming_data
    WHERE timestamp >= NOW() - INTERVAL '5 minutes'
)
SELECT
    DATE_TRUNC('second', timestamp) AS second,
    COUNT(*) AS events_per_second,
    AVG(value) AS avg_metric,
    PERCENTILE_CONT(0.95) WITHIN GROUP (ORDER BY value) AS p95
FROM recent
GROUP BY 1
ORDER BY 1 DESC;
Anomaly Detection
-- Flag points from the last hour that sit more than three rolling standard
-- deviations away from the rolling mean (window = current row + 60 before it)
WITH rolling_stats AS (
    SELECT
        timestamp,
        value,
        AVG(value) OVER last_61_rows AS rolling_avg,
        STDDEV(value) OVER last_61_rows AS rolling_stddev
    FROM streaming_data
    WINDOW last_61_rows AS (
        ORDER BY timestamp
        ROWS BETWEEN 60 PRECEDING AND CURRENT ROW
    )
)
SELECT *
FROM rolling_stats
WHERE timestamp >= NOW() - INTERVAL '1 hour'
  AND ABS(value - rolling_avg) > 3 * rolling_stddev;
Real-Time Alerts
-- Build (or rebuild) the alert table: event types with more than 100 events
-- in the last minute.
-- This is a snapshot taken when the statement runs, not a live view. Re-run
-- it on a schedule to keep alerts current; OR REPLACE makes the re-run
-- succeed instead of failing with "table already exists".
CREATE OR REPLACE TABLE alerts AS
SELECT
    event_type,
    COUNT(*) AS event_count,
    MAX(timestamp) AS last_event
FROM streaming_data
WHERE timestamp >= NOW() - INTERVAL '1 minute'
GROUP BY event_type
HAVING COUNT(*) > 100;

-- Read the most recent alert snapshot, busiest event type first
SELECT * FROM alerts
ORDER BY event_count DESC;
Performance Optimization
Streaming Configuration
-- Resource limits for the ingest process
SET memory_limit = '4GB';
SET threads = 4;

-- DuckDB has no "streaming mode" switch or streaming batch-size setting.
-- Batch on the client instead (e.g. collect rows and insert them with one
-- executemany / multi-row INSERT per batch).

-- Persistent databases always write through a write-ahead log; there is
-- nothing to enable. checkpoint_threshold is the WAL size at which DuckDB
-- automatically checkpoints into the main file ('16MB' is the default;
-- raise it to checkpoint less often under many small inserts).
SET checkpoint_threshold = '16MB';
Indexing for Real-Time
-- Indexes for selective lookups on streaming data.
-- Every index adds work to each insert, so create only the ones your
-- queries actually need.
CREATE INDEX idx_streaming_timestamp ON streaming_data(timestamp);
CREATE INDEX idx_streaming_event_type ON streaming_data(event_type);
CREATE INDEX idx_streaming_user ON streaming_data(user_id);

-- DuckDB does not support partial indexes (CREATE INDEX ... WHERE ...), and
-- the alerts table has no alert_level column, so index the whole column.
CREATE INDEX idx_recent_alerts ON alerts(event_type);
Partitioning Strategy
-- DuckDB tables cannot be declared with PARTITION BY. Native tables keep
-- per-row-group min/max statistics instead, so when rows arrive in roughly
-- timestamp order a time-range filter skips old row groups automatically.
-- (For on-disk partitions, export to Parquet with
--  COPY ... TO ... (FORMAT parquet, PARTITION_BY (...)).)
CREATE TABLE streaming_partitioned (
    id VARCHAR,
    user_id VARCHAR,
    event_type VARCHAR,
    timestamp TIMESTAMP,
    data JSON
);

-- Query only the most recent hour
SELECT * FROM streaming_partitioned
WHERE timestamp >= NOW() - INTERVAL '1 hour'
  AND timestamp < NOW();
Monitoring Streaming Pipelines
Pipeline Health Check
def check_streaming_health(db_path):
    """Report how stale the newest row of ``streaming_data`` is.

    Args:
        db_path: Path of the DuckDB database file.

    Returns:
        A dict with ``status`` (``"healthy"`` when the newest event is at
        most 60 seconds old, otherwise ``"degraded"``) and ``lag_seconds``
        (age of the newest event in seconds, or ``None`` when the table
        has no rows).
    """
    # Local import: this snippet's file-level imports do not include datetime.
    from datetime import datetime

    con = connect(db_path)
    try:
        # Check data freshness
        latest_event = con.execute("""
            SELECT MAX(timestamp) as latest FROM streaming_data
        """).fetchone()[0]
    finally:
        # This function is called in a loop; close the connection every
        # time so connections do not pile up.
        con.close()

    if latest_event is None:
        # MAX() over an empty table is NULL: nothing has been ingested yet.
        return {"status": "degraded", "lag_seconds": None}

    # NOTE(review): assumes stored timestamps are naive local time, matching
    # datetime.now() -- confirm against the producer.
    time_diff = (datetime.now() - latest_event).total_seconds()
    status = "degraded" if time_diff > 60 else "healthy"
    return {"status": status, "lag_seconds": time_diff}
# Poll the health check on a fixed cadence and print each result
HEALTH_CHECK_INTERVAL_SECONDS = 30
HEALTH_CHECK_DB_PATH = 'analytics.duckdb'
while True:
    health = check_streaming_health(HEALTH_CHECK_DB_PATH)
    print(f"Streaming Health: {health}")
    time.sleep(HEALTH_CHECK_INTERVAL_SECONDS)
Performance Metrics
-- Track streaming performance
SELECT
DATE_TRUNC('minute', timestamp) as minute,
COUNT(*) as events_processed,
AVG(EXTRACT(EPOCH FROM (timestamp - lag(timestamp) OVER (ORDER BY timestamp)))) as avg_latency
FROM streaming_data
WHERE timestamp >= NOW() - INTERVAL '1 hour'
GROUP BY 1
ORDER BY 1 DESC;
Production Deployment
Docker Setup
FROM python:3.11-slim
# Flush print()/log output immediately so container logs show it live
ENV PYTHONUNBUFFERED=1
# --no-cache-dir keeps pip's download cache out of the image layer;
# requests is needed by the HTTP polling streamer
RUN pip install --no-cache-dir duckdb confluent-kafka websockets requests
WORKDIR /app
COPY streamer.py .
CMD ["python", "streamer.py"]
Kubernetes Deployment
apiVersion: apps/v1
kind: Deployment
metadata:
  name: duckdb-streamer
spec:
  # A DuckDB file accepts a single writing process: keep exactly one replica
  replicas: 1
  # Stop the old pod before starting the new one, so a rollout never has
  # two pods opening the same database file
  strategy:
    type: Recreate
  # apps/v1 requires a selector that matches the pod template labels
  selector:
    matchLabels:
      app: duckdb-streamer
  template:
    metadata:
      labels:
        app: duckdb-streamer
    spec:
      containers:
        - name: streamer
          image: duckdb-streamer:latest
          env:
            - name: KAFKA_BOOTSTRAP
              value: "kafka:9092"
            - name: DB_PATH
              value: "/data/analytics.duckdb"
          volumeMounts:
            - name: data
              mountPath: /data
      volumes:
        - name: data
          persistentVolumeClaim:
            claimName: duckdb-pvc
Use Cases
1. Real-Time Dashboard
-- Live API metrics: per-second request count, mean and p99 response time
-- over the last five minutes, newest second first
WITH recent_requests AS (
    SELECT timestamp, response_time
    FROM api_logs
    WHERE timestamp >= NOW() - INTERVAL '5 minutes'
)
SELECT
    DATE_TRUNC('second', timestamp) AS second,
    COUNT(*) AS requests,
    AVG(response_time) AS avg_response_time,
    PERCENTILE_CONT(0.99) WITHIN GROUP (ORDER BY response_time) AS p99
FROM recent_requests
GROUP BY 1
ORDER BY 1 DESC;
2. Fraud Detection
-- Users whose last hour of activity trips any fraud threshold:
-- more than 10 transactions, more than 10000 spent, or more than 5 merchants
SELECT
    user_id,
    COUNT(*) AS transaction_count,
    SUM(amount) AS total_amount,
    COUNT(DISTINCT merchant_id) AS unique_merchants
FROM transactions
WHERE timestamp >= NOW() - INTERVAL '1 hour'
GROUP BY user_id
HAVING COUNT(*) > 10
    OR SUM(amount) > 10000
    OR COUNT(DISTINCT merchant_id) > 5;
3. IoT Monitoring
-- Ten-minute temperature summary per device, keeping only devices that ran hot
WITH device_summary AS (
    SELECT
        device_id,
        AVG(temperature) AS avg_temp,
        MAX(temperature) AS max_temp,
        MIN(temperature) AS min_temp,
        COUNT(*) AS readings
    FROM iot_sensors
    WHERE timestamp >= NOW() - INTERVAL '10 minutes'
    GROUP BY device_id
)
SELECT *
FROM device_summary
WHERE max_temp > 80; -- Alert on high temp
Conclusion
DuckDB enables real-time streaming analytics with:
- Minimal infrastructure — single engine for batch and stream
- Low latency — millisecond-level processing
- SQL-based — familiar interface for analysts
- Cost-effective — no expensive streaming platforms needed
For most organizations, DuckDB provides the simplest path to real-time analytics without the complexity of traditional streaming platforms.
Streaming performance depends on hardware and data volume. Always benchmark with your specific workload.