
Introduction: The “Unified Query Gateway” for the Data Silo Era
Modern enterprise data is typically scattered across multiple systems:
- Transaction Databases: PostgreSQL / MySQL / SQL Server
- Data Warehouses: Snowflake / BigQuery / ClickHouse
- File Storage: CSV / Parquet / Excel / JSON
- External APIs: REST API / GraphQL / WebSocket
- Object Storage: S3 / OSS / MinIO
The traditional approach is to write an independent connector for each data source (pandas read_sql, requests.get, boto3) and then merge the results in Python memory. This approach has three major pain points:
- Code Bloat: Each data source requires its own connection, authentication, and transformation logic
- Performance Bottleneck: Massive amounts of data pulled into local memory, consuming enormous RAM
- Maintenance Nightmare: When data sources change, all connection code needs updating
The DuckDB ATTACH architecture pattern offers an elegant solution: “Data stays put, queries move.” Databases are registered through ATTACH, and files or HTTP endpoints are exposed through table functions such as read_csv and read_parquet, so a single query engine sees every source and analysts can query across them with one SQL statement.
This article walks you through building a production-grade DuckDB DataHub from scratch, covering architecture design, code implementation, performance tuning, and monetization paths.
1. Core Principles of ATTACH
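ATTACH registers an external database as a catalog inside DuckDB; it does not copy the data. When you execute ATTACH 'conn_string' AS alias (TYPE TYPE_NAME), DuckDB:
- Establishes a connection to the data source (or opens a file handle)
- Registers a namespace internally (alias)
- Pushes down what it can of subsequent queries against that namespace to the source
How much is pushed down depends on the extension. Column projection is generally pushed down, and filter pushdown is available for sources such as PostgreSQL (in the postgres extension it is controlled by the pg_experimental_filter_pushdown setting). Work that is not pushed down, typically GROUP BY, joins, and most LIMIT clauses, runs inside DuckDB on the rows the scan returns, so a selective WHERE clause and a short column list are what keep network transfer small.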
Supported ATTACH Types Overview
| Type | Syntax Identifier | Description | Extension Required |
|---|---|---|---|
| PostgreSQL | TYPE POSTGRES | Remote PostgreSQL database | postgres (alias: postgres_scanner) |
| MySQL | TYPE MYSQL | Remote MySQL/MariaDB | mysql (alias: mysql_scanner) |
| SQLite | TYPE SQLITE | Local SQLite file | sqlite |
| CSV/TSV | via read_csv (wrapped in a view) | Local CSV/TSV files or glob patterns | Built-in |
| Parquet | via read_parquet (wrapped in a view) | Local Parquet files or glob patterns | Built-in |
| Delta Lake | TYPE DELTA | Delta Lake tables (recent extension versions) | delta |
| Iceberg | TYPE ICEBERG | Apache Iceberg catalogs (recent extension versions) | iceberg |
| HTTP/HTTPS | via read_json_auto | Remote JSON/CSV APIs | httpfs |
| S3/OSS | via read_parquet | Object storage files | httpfs (aws for credential loading) |
Note that CSV and Parquet have no ATTACH type of their own: they are queried through table functions, and a view gives them a stable name.
2. Building a DataHub from Scratch
2.1 Architecture Diagram
┌─────────────────────────────────────────────────────┐
│                   DuckDB DataHub                    │
│                                                     │
│   ┌──────────┐    ┌──────────┐    ┌──────────┐      │
│   │  ATTACH  │    │  ATTACH  │    │  ATTACH  │      │
│   │  PG_DB   │    │ CSV_DIR  │    │ HTTP_API │      │
│   └────┬─────┘    └────┬─────┘    └────┬─────┘      │
│        │               │               │            │
│   ┌────▼───────────────▼───────────────▼────┐       │
│   │           DuckDB Query Engine           │       │
│   │     (Pushdown + Projection + Join)      │       │
│   └────────────────────┬────────────────────┘       │
│                        │                            │
│                ┌───────▼───────┐                    │
│                │SQL → DataFrame│                    │
│                └───────────────┘                    │
└─────────────────────────────────────────────────────┘
2.2 Complete Python Implementation
"""
DuckDB DataHub: Unified Data Access Layer
==========================================
Manages unified querying across PostgreSQL, MySQL, CSV, Parquet, and HTTP APIs.

Installation:
    pip install duckdb pandas pyarrow

Usage:
    hub = DataHub()
    hub.attach_pg("orders_db", "host=localhost dbname=orders user=analyst")
    hub.attach_csv_dir("warehouse", "/data/warehouse/")
    df = hub.query("SELECT * FROM orders_db.orders LIMIT 10")
    hub.close()
"""
import duckdb
import pandas as pd
from typing import Optional, Dict, List, Any
from pathlib import Path
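class DataHub:
    """Production-grade DuckDB data access center"""

    def __init__(self, memory_limit: Optional[str] = None, threads: Optional[int] = None):
        """
        Args:
            memory_limit: DuckDB memory limit with a unit, e.g., "8GB" or "512MB";
                None keeps DuckDB's default (a fraction of system RAM)
            threads: Parallel thread count (>= 1); None lets DuckDB use all CPU cores
        """
        config: Dict[str, Any] = {}
        if memory_limit is not None:
            config["memory_limit"] = memory_limit
        if threads is not None:
            config["threads"] = threads
        # default_order is deliberately left alone: setting it to DESC would
        # silently reverse every ORDER BY that omits a direction.
        self.con = duckdb.connect(":memory:", config=config)
        self._initialized = False
        self._attached_sources: Dict[str, dict] = {}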
    def init_extensions(self):
        """Initialize all extensions (idempotent operation)"""
        if self._initialized:
            return
        extensions = [
            "postgres_scanner",
            "mysql_scanner",
            "httpfs",
            "json",
            "sqlite",
            "parquet",
        ]
        for ext in extensions:
            try:
                self.con.execute(f"INSTALL {ext}; LOAD {ext};")
            except Exception as exc:
                # The extension may be unavailable (e.g., no network); report it
                # instead of hiding the failure until the first ATTACH
                print(f"⚠️ Could not load extension {ext}: {exc}")
        self._initialized = True
    # ========== Data Source Attachment ==========
    # Note: names and connection strings are interpolated into SQL below,
    # so pass trusted values only.
    def attach_pg(self, name: str, conn_str: str, schema: str = "public") -> None:
        """Attach a PostgreSQL database"""
        self.init_extensions()
        self.con.execute(f"""
            ATTACH '{conn_str}' AS {name} (TYPE POSTGRES, SCHEMA '{schema}');
        """)
        self._attached_sources[name] = {
            "type": "postgres",
            "conn_str": conn_str,
            "schema": schema,
        }
        print(f"✅ Attached PostgreSQL: {name}")

    def attach_mysql(self, name: str, conn_str: str) -> None:
        """Attach a MySQL database (select the database inside conn_str, e.g. 'database=shop')"""
        self.init_extensions()
        # MySQL has no "public" schema; the database named in conn_str plays that role
        self.con.execute(f"""
            ATTACH '{conn_str}' AS {name} (TYPE MYSQL);
        """)
        self._attached_sources[name] = {
            "type": "mysql",
            "conn_str": conn_str,
        }
        print(f"✅ Attached MySQL: {name}")
    def attach_csv_dir(self, name: str, dir_path: str,
                       pattern: str = "*.csv", header: bool = True) -> None:
        """Expose a CSV directory as a view (scans all files matching pattern)"""
        # DuckDB has no ATTACH type for CSV; a view over read_csv with a glob does the job
        glob_path = f"{dir_path.rstrip('/')}/{pattern}"
        self.con.execute(f"""
            CREATE OR REPLACE VIEW {name} AS
            SELECT * FROM read_csv('{glob_path}', header = {str(header).lower()},
                                   union_by_name = true);
        """)
        self._attached_sources[name] = {
            "type": "csv_dir",
            "path": dir_path,
            "pattern": pattern,
        }
        print(f"✅ Attached CSV directory: {name} ({dir_path})")

    def attach_parquet_dir(self, name: str, dir_path: str) -> None:
        """Expose a Parquet directory as a view"""
        # Same approach as CSV: there is no ATTACH type for Parquet
        glob_path = f"{dir_path.rstrip('/')}/*.parquet"
        self.con.execute(f"""
            CREATE OR REPLACE VIEW {name} AS
            SELECT * FROM read_parquet('{glob_path}');
        """)
        self._attached_sources[name] = {
            "type": "parquet_dir",
            "path": dir_path,
        }
        print(f"✅ Attached Parquet directory: {name} ({dir_path})")
    def attach_http_json(self, name: str, url: str) -> None:
        """Attach an HTTP JSON API"""
        self.init_extensions()
        self.con.execute(f"""
            CREATE OR REPLACE VIEW {name} AS
            SELECT * FROM read_json_auto('{url}');
        """)
        self._attached_sources[name] = {
            "type": "http_json",
            "url": url,
        }
        print(f"✅ Attached HTTP API: {name} ({url})")

    def attach_http_csv(self, name: str, url: str) -> None:
        """Attach an HTTP CSV API"""
        self.init_extensions()
        self.con.execute(f"""
            CREATE OR REPLACE VIEW {name} AS
            SELECT * FROM read_csv_auto('{url}');
        """)
        self._attached_sources[name] = {
            "type": "http_csv",
            "url": url,
        }
        print(f"✅ Attached HTTP CSV: {name} ({url})")

    def attach_local_csv(self, name: str, file_path: str) -> None:
        """Attach a single CSV file"""
        self.con.execute(f"""
            CREATE OR REPLACE VIEW {name} AS
            SELECT * FROM read_csv_auto('{file_path}');
        """)
        self._attached_sources[name] = {
            "type": "local_csv",
            "path": file_path,
        }
        print(f"✅ Attached CSV file: {name} ({file_path})")
    # ========== Query Interface ==========
    def query(self, sql: str, params: tuple = ()) -> pd.DataFrame:
        """Execute SQL query and return DataFrame"""
        result = self.con.execute(sql, params)
        if result.description:
            return result.fetchdf()
        return pd.DataFrame()

    def query_raw(self, sql: str, params: tuple = ()) -> duckdb.DuckDBPyRelation:
        """Execute SQL query and return a lazy Relation (evaluated when fetched)"""
        # con.sql() returns a DuckDBPyRelation; con.execute() returns the connection itself
        return self.con.sql(sql, params=params or None)

    def describe(self, table_ref: str) -> pd.DataFrame:
        """View schema information for a table"""
        return self.con.execute(f"DESCRIBE {table_ref}").fetchdf()

    def table_names(self, schema: str = "main") -> List[str]:
        """List all tables in a given schema"""
        result = self.con.execute("""
            SELECT table_name FROM information_schema.tables
            WHERE table_schema = ?
        """, [schema]).fetchdf()
        return result["table_name"].tolist()
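    # ========== Cache & Optimization ==========
    def create_cached_table(self, name: str, sql: str) -> None:
        """Cache query results as a local table (speeds up repeated queries)"""
        self.con.execute(f"CREATE OR REPLACE TABLE {name} AS {sql}")
        print(f"✅ Created cached table: {name}")

    def drop_source(self, name: str) -> None:
        """Detach a data source (or drop the view that backs it)"""
        source = self._attached_sources.get(name, {})
        if source.get("type") in ("postgres", "mysql"):
            self.con.execute(f"DETACH {name}")
        else:
            # File and HTTP sources are registered as views, not attached databases
            self.con.execute(f"DROP VIEW IF EXISTS {name}")
        self._attached_sources.pop(name, None)
        print(f"🗑️ Detached: {name}")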
    # ========== Diagnostic Tools ==========
    def explain(self, sql: str) -> pd.DataFrame:
        """Get SQL execution plan"""
        return self.con.execute(f"EXPLAIN {sql}").fetchdf()

    def explain_analyze(self, sql: str) -> pd.DataFrame:
        """Get actual execution plan with timing"""
        return self.con.execute(f"EXPLAIN ANALYZE {sql}").fetchdf()

    def show_attached(self) -> pd.DataFrame:
        """Show all attached data sources"""
        return self.con.execute("SHOW DATABASES").fetchdf()

    # ========== Lifecycle ==========
    def close(self) -> None:
        """Close connections and release resources"""
        self.con.close()
        print("🔒 DataHub closed")

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()
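The attach methods above build SQL with f-strings, so an alias or connection string that comes from user input could inject SQL. One possible guard is sketched here with the standard library only; safe_identifier, sql_string, and build_attach_sql are hypothetical helper names, not part of DuckDB or of the class above.

```python
import re

# Plain identifiers only: letters, digits, underscore, not starting with a digit.
_IDENTIFIER = re.compile(r"[A-Za-z_][A-Za-z0-9_]*")


def safe_identifier(name: str) -> str:
    """Return `name` unchanged if it is a plain SQL identifier, else raise ValueError."""
    if not _IDENTIFIER.fullmatch(name):
        raise ValueError(f"Unsafe SQL identifier: {name!r}")
    return name


def sql_string(value: str) -> str:
    """Render a Python string as a single-quoted SQL literal, doubling embedded quotes."""
    return "'" + value.replace("'", "''") + "'"


def build_attach_sql(name: str, conn_str: str, db_type: str = "POSTGRES") -> str:
    """Assemble an ATTACH statement from validated parts."""
    return (
        f"ATTACH {sql_string(conn_str)} "
        f"AS {safe_identifier(name)} (TYPE {safe_identifier(db_type)})"
    )


if __name__ == "__main__":
    print(build_attach_sql("pg_orders", "host=localhost dbname=orders user=analyst"))
```

Rejecting unusual aliases outright is stricter than quoting them, but it keeps the attached namespaces easy to type in later queries.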
2.3 Practical Usage Example
# Use context manager for automatic connection cleanup
with DataHub(memory_limit="4GB") as hub:
    # Attach data sources
    hub.attach_pg("pg_orders", "host=192.168.1.10 dbname=ecommerce user=analyst password=***")
    hub.attach_csv_dir("csv_reports", "/data/reports/")
    hub.attach_http_json("exchange_rates", "https://api.exchangerate.host/latest?base=CNY")
    # Cross-source query (assumes the exchange_rates response exposes a usd_rate column)
    df = hub.query("""
        WITH daily_sales AS (
            SELECT
                o.order_date,
                c.category_name,
                SUM(o.amount) AS total_amount
            FROM pg_orders.orders o
            JOIN pg_orders.categories c ON o.category_id = c.category_id
            WHERE o.order_date >= '2026-01-01'
            GROUP BY o.order_date, c.category_name
        )
        SELECT
            order_date,
            category_name,
            total_amount,
            total_amount * rates.usd_rate AS total_usd
        FROM daily_sales
        CROSS JOIN exchange_rates rates
        ORDER BY total_usd DESC
        LIMIT 20
    """)
    print(df.head())
    # Check execution plan to confirm predicate pushdown
    # (orders carries category_id; category_name lives in the categories table)
    plan = hub.explain_analyze("""
        SELECT category_id, SUM(amount)
        FROM pg_orders.orders
        WHERE order_date = '2026-06-01'
        GROUP BY category_id
    """)
    print(plan)
3. Performance Tuning: Making ATTACH Fly
3.1 Predicate Pushdown
DuckDB pushes column projections and, where the extension supports it, WHERE conditions to the data source, but you need to write queries that give it something to push down:
-- ✅ GOOD: Only two columns and one day of rows leave PostgreSQL; the aggregation then runs in DuckDB
SELECT category, SUM(amount)
FROM pg_db.orders
WHERE order_date = '2026-06-01'
GROUP BY category;
-- ❌ BAD: Pulls every column of the entire table over the network
SELECT * FROM pg_db.orders;
Verify with EXPLAIN ANALYZE:
EXPLAIN ANALYZE
SELECT category, SUM(amount)
FROM pg_db.orders
WHERE order_date = '2026-06-01'
GROUP BY category;
Look for the scan node of the attached source in the output (for PostgreSQL it is typically labeled POSTGRES_SCAN) and confirm that the WHERE condition is listed as a filter on that node rather than in a separate FILTER operator above it.
3.2 Cache Hot Data
For frequently accessed aggregation results, create local cached tables:
-- First time (and on every refresh): build the cache
CREATE OR REPLACE TABLE daily_category_sales AS
SELECT
DATE_TRUNC('day', order_date) AS sale_date,
category,
SUM(amount) AS total_amount,
COUNT(*) AS order_count
FROM pg_db.orders
WHERE order_date >= '2026-01-01'
GROUP BY 1, 2;
-- Later: query the local table (no round trip to PostgreSQL)
SELECT * FROM daily_category_sales
WHERE sale_date = '2026-06-24';
3.3 Connection Pooling
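import threading

class DataHubPool:
    """Data source connection pool (thread-safe)"""
    def __init__(self, max_connections: int = 5):
        self.hubs = []
        self.max = max_connections
        self._lock = threading.Lock()

    def get_hub(self) -> DataHub:
        # Reuse an idle hub if one is available; otherwise create a new one.
        # A new hub is a separate in-memory database, so its sources must be attached again.
        with self._lock:
            if self.hubs:
                return self.hubs.pop()
        return DataHub()

    def release(self, hub: DataHub):
        # Keep at most `max` idle hubs; close any extra
        with self._lock:
            if len(self.hubs) < self.max:
                self.hubs.append(hub)
                return
        hub.close()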
3.4 Memory Management
hub = DataHub(memory_limit="8GB", threads=8)
# Key configuration options (these are the keys accepted by duckdb.connect(config=...))
config = {
    "memory_limit": "8GB",             # Maximum memory (a size with a unit)
    "threads": 8,                      # Parallelism (must be >= 1)
    "temp_directory": "/tmp/duckdb",   # Spill directory for larger-than-memory queries
    "enable_object_cache": True,       # Legacy setting; a no-op in recent DuckDB versions
    # "allow_unsigned_extensions": True,  # Security risk: enable only if you must load unsigned extensions
}
4. Comparison with Traditional Approaches
| Dimension | Python Merge Approach | DuckDB ATTACH Approach |
|---|---|---|
| Code Volume | 50-200 lines (per data source) | 5-10 lines (unified ATTACH + SQL) |
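| Memory Usage | All data loaded to RAM (GB scale) | Streaming scans plus projection/filter pushdown; usage capped by memory_limit |
| Query Speed | Pandas merge/join (slow on large tables) | Columnar, vectorized, multi-threaded execution (often much faster on large joins and aggregations) |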
| Maintainability | Each source needs connection code changes | New source = one ATTACH line |
| Reusability | Must rewrite connection logic each analysis | DataHub instance reusable globally |
| Learning Curve | Need to master multiple library APIs | Standard SQL only |
| Debugging Difficulty | Data in memory, hard to trace source | Each source has clear namespace |
5. Production Scenario: Unified E-Commerce Analytics Platform
Building a self-service analytics platform for business teams:
# 1. Initialize Hub
hub = DataHub(memory_limit="16GB")
# 2. Attach all data sources
hub.attach_pg("orders", "host=db-host dbname=ecommerce user=analyst")
hub.attach_pg("crm", "host=crm-host dbname=customer_db user=analyst")
hub.attach_csv_dir("marketing", "/data/marketing/")
hub.attach_parquet_dir("warehouse", "/data/warehouse/parquet/")
hub.attach_http_json("competitors", "https://api.competitor.com/prices")
# 3. Create caching layer (daily refresh)
hub.create_cached_table("monthly_summary", """
SELECT
DATE_TRUNC('month', o.order_date) AS month,
c.category_name,
COUNT(*) AS orders,
SUM(o.amount) AS revenue,
AVG(o.amount) AS avg_order_value
FROM orders.orders o
JOIN orders.categories c ON o.category_id = c.category_id
GROUP BY 1, 2
""")
# 4. Business users only need to run SQL
df = hub.query("""
SELECT month, category_name, revenue,
revenue / LAG(revenue) OVER (ORDER BY month) - 1 AS mom_growth
FROM monthly_summary
WHERE category_name = 'Electronics'
ORDER BY month DESC
""")
6. Monetization Strategies
This architecture pattern has significant commercial value:
1. Data Consulting Services (One-time $500-$1,500)
Customer pain point: Data scattered across 5-10 systems, unable to do cross-analysis.
Deliverables:
- DuckDB DataHub configuration script
- 3-5 core analysis SQL queries
- Visualization report template
2. Automated Reporting Service (Monthly $150-$700)
Build daily/weekly automated cross-source reports for clients:
- Data source ATTACH configuration
- SQL queries + caching strategy
- Scheduled execution + email/Telegram push
3. Lightweight BI Product (SaaS $15-$40/month)
Build a self-service query platform based on DataHub:
- Frontend: Streamlit / Gradio / Shiny
- Backend: DuckDB DataHub + FastAPI
- Data sources: Client-configured ATTACH connections
4. Enterprise Training (One-time $700-$3,000)
Teach enterprise data teams to build their own DataHub:
- ATTACH principles and best practices
- Performance tuning (pushdown, caching, memory)
- Production deployment (Docker + Cron + monitoring)
Core principle: Enterprise data won’t disappear just because you don’t want to move it — but you can make queries go to the data. Master this architecture pattern, and you hold the “master key” to data integration.
📖 The complete DataHub code template from this article, including a Docker deployment guide and detailed configurations for five data sources (PostgreSQL, MySQL, HTTP, CSV, and Parquet), is published at duckdblab.org, along with production tuning parameters and a guide to common pitfalls.
Olap Studio · Focused on DuckDB Practical Skills · 2026-06-24