Featured image of post DuckDB ATTACH Architecture Pattern: Building a Unified Data Access Layer

DuckDB ATTACH Architecture Pattern: Building a Unified Data Access Layer

Build a production-grade DuckDB DataHub using ATTACH to unify PostgreSQL, MySQL, CSV, Parquet, and HTTP APIs. Includes complete Python wrapper class and performance tuning guide.

DuckDB ATTACH Architecture Pattern

Introduction: The “Unified Query Gateway” for the Data Silo Era

Modern enterprise data is typically scattered across multiple systems:

  • Transaction Databases: PostgreSQL / MySQL / SQL Server
  • Data Warehouses: Snowflake / BigQuery / ClickHouse
  • File Storage: CSV / Parquet / Excel / JSON
  • External APIs: REST API / GraphQL / WebSocket
  • Object Storage: S3 / OSS / MinIO

The traditional approach is to write independent connectors for each data source (Pandas read_sql, requests.get, boto3), then merge them in Python memory. This approach has three major pain points:

  1. Code Bloat: Each data source requires its own connection, authentication, and transformation logic
  2. Performance Bottleneck: Massive amounts of data pulled into local memory, consuming enormous RAM
  3. Maintenance Nightmare: When data sources change, all connection code needs updating

The DuckDB ATTACH architecture pattern offers an elegant solution: “Data stays put, queries move.” All data sources are registered through ATTACH into a unified query engine, and analysts can cross-query all data sources with a single SQL statement.

This article walks you through building a production-grade DuckDB DataHub from scratch, covering architecture design, code implementation, performance tuning, and monetization paths.


1. Core Principles of ATTACH

ATTACH creates an external table reference, not a data copy. When you execute ATTACH 'conn_string' AS alias (TYPE TYPE_NAME), DuckDB:

  1. Establishes a connection to the data source (or opens a file handle)
  2. Registers a namespace internally (alias)
  3. Pushes down subsequent queries against that namespace to the source

This means WHERE conditions, GROUP BY, and LIMIT operations execute at the data source, and only the final result set is transmitted over the network to the DuckDB engine.

Supported ATTACH Types Overview

TypeSyntax IdentifierDescriptionExtension Required
PostgreSQLTYPE POSTGRESRemote PostgreSQL databasepostgres_scanner
MySQLTYPE MYSQLRemote MySQL/MariaDBmysql_scanner
SQLiteTYPE SQLITELocal SQLite fileBuilt-in
CSV/TSVTYPE CSVLocal CSV/TSV files or directoriesBuilt-in
ParquetTYPE PARQUETLocal Parquet files or directoriesBuilt-in
Delta LakeTYPE DELTADelta Lake tablesdelta
IcebergTYPE ICEBERGApache Iceberg tablesiceberg
HTTP/HTTPSvia read_json_autoRemote JSON/CSV APIshttpfs
S3/OSSvia read_parquetObject storage filesaws / httpfs

2. Building a DataHub from Scratch

2.1 Architecture Diagram

┌─────────────────────────────────────────────────────┐
│                  DuckDB DataHub                      │
│                                                      │
│  ┌──────────┐  ┌──────────┐  ┌──────────┐           │
│  │ ATTACH   │  │ ATTACH   │  │ ATTACH   │           │
│  │ PG_DB    │  │ CSV_DIR  │  │ HTTP_API │           │
│  └────┬─────┘  └────┬─────┘  └────┬─────┘           │
│       │             │             │                  │
│  ┌────▼─────────────▼─────────────▼────┐            │
│  │       DuckDB Query Engine            │            │
│  │   (Pushdown + Projection + Join)     │            │
│  └──────────────┬──────────────────────┘            │
│                 │                                    │
│         ┌───────▼───────┐                           │
│         │  SQL → DataFrame│                          │
│         └───────────────┘                           │
└─────────────────────────────────────────────────────┘

2.2 Complete Python Implementation

"""
DuckDB DataHub — Unified Data Access Layer
==========================================
Manages unified querying across PostgreSQL, MySQL, CSV, Parquet, and HTTP APIs.

Installation:
    pip install duckdb pandas pyarrow

Usage:
    hub = DataHub()
    hub.attach_pg("orders_db", "host=localhost dbname=orders user=analyst")
    hub.attach_csv("warehouse", "/data/warehouse/")
    df = hub.query("SELECT * FROM orders_db.orders LIMIT 10")
    hub.close()
"""

import duckdb
import pandas as pd
from typing import Optional, Dict, List, Any
from pathlib import Path


class DataHub:
    """Production-grade DuckDB data access center"""
    
    def __init__(self, memory_limit: str = "50%", threads: int = 0):
        """
        Args:
            memory_limit: DuckDB memory limit, e.g., "50%", "8GB", "4g"
            threads: Parallel thread count, 0 = auto-detect CPU cores
        """
        self.con = duckdb.connect(
            ":memory:",
            config={
                "memory_limit": memory_limit,
                "threads": threads,
                "enable_object_cache": True,
                "default_order": "DESC",
            }
        )
        self._initialized = False
        self._attached_sources: Dict[str, dict] = {}
    
    def init_extensions(self):
        """Initialize all extensions (idempotent operation)"""
        if self._initialized:
            return
        
        extensions = [
            "postgres_scanner",
            "mysql_scanner",
            "httpfs",
            "json",
            "sqlite",
            "parquet",
        ]
        
        for ext in extensions:
            try:
                self.con.execute(f"INSTALL {ext}; LOAD {ext};")
            except Exception:
                pass  # Extension may already be installed or unavailable
        
        self._initialized = True
    
    # ========== Data Source Attachment ==========
    
    def attach_pg(self, name: str, conn_str: str, schema: str = "public") -> None:
        """Attach a PostgreSQL database"""
        self.init_extensions()
        self.con.execute(f"""
            ATTACH '{conn_str}' AS {name} (TYPE POSTGRES, SCHEMA '{schema}');
        """)
        self._attached_sources[name] = {
            "type": "postgres",
            "conn_str": conn_str,
            "schema": schema,
        }
        print(f"✅ Attached PostgreSQL: {name}")
    
    def attach_mysql(self, name: str, conn_str: str, schema: str = "public") -> None:
        """Attach a MySQL database"""
        self.init_extensions()
        self.con.execute(f"""
            ATTACH '{conn_str}' AS {name} (TYPE MYSQL, SCHEMA '{schema}');
        """)
        self._attached_sources[name] = {
            "type": "mysql",
            "conn_str": conn_str,
            "schema": schema,
        }
        print(f"✅ Attached MySQL: {name}")
    
    def attach_csv_dir(self, name: str, dir_path: str, 
                       pattern: str = "*.csv", header: bool = True) -> None:
        """Attach a CSV directory (auto-scans all matching files)"""
        self.con.execute(f"""
            ATTACH '{dir_path}' AS {name} (TYPE CSV, HEADER {header}, 
                                            READ_ALL_COLUMNS TRUE, 
                                            FILE_PATTERN '{pattern}');
        """)
        self._attached_sources[name] = {
            "type": "csv_dir",
            "path": dir_path,
            "pattern": pattern,
        }
        print(f"✅ Attached CSV directory: {name} ({dir_path})")
    
    def attach_parquet_dir(self, name: str, dir_path: str) -> None:
        """Attach a Parquet directory"""
        self.con.execute(f"""
            ATTACH '{dir_path}' AS {name} (TYPE PARQUET);
        """)
        self._attached_sources[name] = {
            "type": "parquet_dir",
            "path": dir_path,
        }
        print(f"✅ Attached Parquet directory: {name} ({dir_path})")
    
    def attach_http_json(self, name: str, url: str) -> None:
        """Attach an HTTP JSON API"""
        self.init_extensions()
        self.con.execute(f"""
            CREATE OR REPLACE VIEW {name} AS 
            SELECT * FROM read_json_auto('{url}');
        """)
        self._attached_sources[name] = {
            "type": "http_json",
            "url": url,
        }
        print(f"✅ Attached HTTP API: {name} ({url})")
    
    def attach_http_csv(self, name: str, url: str) -> None:
        """Attach an HTTP CSV API"""
        self.init_extensions()
        self.con.execute(f"""
            CREATE OR REPLACE VIEW {name} AS 
            SELECT * FROM read_csv_auto('{url}');
        """)
        self._attached_sources[name] = {
            "type": "http_csv",
            "url": url,
        }
        print(f"✅ Attached HTTP CSV: {name} ({url})")
    
    def attach_local_csv(self, name: str, file_path: str) -> None:
        """Attach a single CSV file"""
        self.con.execute(f"""
            CREATE OR REPLACE VIEW {name} AS 
            SELECT * FROM read_csv_auto('{file_path}');
        """)
        self._attached_sources[name] = {
            "type": "local_csv",
            "path": file_path,
        }
        print(f"✅ Attached CSV file: {name} ({file_path})")
    
    # ========== Query Interface ==========
    
    def query(self, sql: str, params: tuple = ()) -> pd.DataFrame:
        """Execute SQL query and return DataFrame"""
        result = self.con.execute(sql, params)
        if result.description:
            df = result.fetchdf()
            return df
        return pd.DataFrame()
    
    def query_raw(self, sql: str, params: tuple = ()) -> duckdb.DuckDBPyRelation:
        """Execute SQL query and return raw Relation (supports lazy reading)"""
        return self.con.execute(sql, params)
    
    def describe(self, table_ref: str) -> pd.DataFrame:
        """View schema information for a table"""
        return self.con.execute(f"DESCRIBE {table_ref}").fetchdf()
    
    def table_names(self, schema: str = "main") -> List[str]:
        """List all tables in a given schema"""
        result = self.con.execute(f"""
            SELECT table_name FROM information_schema.tables 
            WHERE table_schema = '{schema}'
        """).fetchdf()
        return result["table_name"].tolist()
    
    # ========== Cache & Optimization ==========
    
    def create_cached_table(self, name: str, sql: str) -> None:
        """Cache query results as a local table (speeds up repeated queries)"""
        self.con.execute(f"CREATE OR REPLACE TABLE {name} AS {sql}")
        print(f"✅ Created cached table: {name}")
    
    def drop_source(self, name: str) -> None:
        """Detach a data source"""
        self.con.execute(f"DETACH {name}")
        self._attached_sources.pop(name, None)
        print(f"🗑️ Detached: {name}")
    
    # ========== Diagnostic Tools ==========
    
    def explain(self, sql: str) -> pd.DataFrame:
        """Get SQL execution plan"""
        return self.con.execute(f"EXPLAIN {sql}").fetchdf()
    
    def explain_analyze(self, sql: str) -> pd.DataFrame:
        """Get actual execution plan with timing"""
        return self.con.execute(f"EXPLAIN ANALYZE {sql}").fetchdf()
    
    def show_attached(self) -> pd.DataFrame:
        """Show all attached data sources"""
        return self.con.execute("SHOW DATABASES").fetchdf()
    
    # ========== Lifecycle ==========
    
    def close(self) -> None:
        """Close connections and release resources"""
        self.con.close()
        print("🔒 DataHub closed")
    
    def __enter__(self):
        return self
    
    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()

2.3 Practical Usage Example

# Use context manager for automatic connection cleanup
with DataHub(memory_limit="4GB") as hub:
    # Attach data sources
    hub.attach_pg("pg_orders", "host=192.168.1.10 dbname=ecommerce user=analyst password=***")
    hub.attach_csv_dir("csv_reports", "/data/reports/")
    hub.attach_http_json("exchange_rates", "https://api.exchangerate.host/latest?base=CNY")
    
    # Cross-source query
    df = hub.query("""
        WITH daily_sales AS (
            SELECT 
                o.order_date,
                c.category_name,
                SUM(o.amount) AS total_amount
            FROM pg_orders.orders o
            JOIN pg_orders.categories c ON o.category_id = c.category_id
            WHERE o.order_date >= '2026-01-01'
            GROUP BY o.order_date, c.category_name
        )
        SELECT 
            order_date,
            category_name,
            total_amount,
            total_amount * rates.usd_rate AS total_usd
        FROM daily_sales
        CROSS JOIN exchange_rates rates
        ORDER BY total_usd DESC
        LIMIT 20
    """)
    
    print(df.head())
    
    # Check execution plan to confirm predicate pushdown
    plan = hub.explain_analyze("""
        SELECT category_name, SUM(amount) 
        FROM pg_orders.orders 
        WHERE order_date = '2026-06-01' 
        GROUP BY category_name
    """)
    print(plan)

3. Performance Tuning: Making ATTACH Fly

3.1 Predicate Pushdown

DuckDB automatically pushes WHERE conditions and SELECT columns to the data source, but you need to actively write queries that trigger pushdown:

-- ✅ GOOD: Pushes down to PostgreSQL
SELECT category, SUM(amount) 
FROM pg_db.orders 
WHERE order_date = '2026-06-01' 
GROUP BY category;

-- ❌ BAD: Pulls entire table then filters
SELECT * FROM pg_db.orders;

Verify with EXPLAIN ANALYZE:

EXPLAIN ANALYZE 
SELECT category, SUM(amount) 
FROM pg_db.orders 
WHERE order_date = '2026-06-01';

Look for the Remote Scan node in the output and confirm the WHERE condition appears on the remote side.

3.2 Cache Hot Data

For frequently accessed aggregation results, create local cached tables:

-- First time: create cache
CREATE TABLE daily_category_sales AS
SELECT 
    DATE_TRUNC('day', order_date) AS sale_date,
    category_name,
    SUM(amount) AS total_amount,
    COUNT(*) AS order_count
FROM pg_db.orders
WHERE order_date >= '2026-01-01'
GROUP BY 1, 2;

-- Later: query local table (millisecond response)
SELECT * FROM daily_category_sales 
WHERE sale_date = '2026-06-24';

3.3 Connection Pooling

class DataHubPool:
    """Data source connection pool"""
    
    def __init__(self, max_connections: int = 5):
        self.hubs = []
        self.max = max_connections
    
    def get_hub(self) -> DataHub:
        if self.hubs:
            return self.hubs.pop()
        return DataHub()
    
    def release(self, hub: DataHub):
        if len(self.hubs) < self.max:
            self.hubs.append(hub)
        else:
            hub.close()

3.4 Memory Management

hub = DataHub(memory_limit="8GB", threads=8)

# Key configuration options
config = {
    "memory_limit": "8GB",          # Maximum memory
    "threads": 8,                    # Parallelism
    "max_threads": 16,              # Maximum thread count
    "temp_directory": "/tmp/duckdb", # Temp directory (for large queries)
    "enable_object_cache": True,    # Enable object cache
    "allow_unsigned_extensions": True,
}

4. Comparison with Traditional Approaches

DimensionPython Merge ApproachDuckDB ATTACH Approach
Code Volume50-200 lines (per data source)5-10 lines (unified ATTACH + SQL)
Memory UsageAll data loaded to RAM (GB scale)Streaming + pushdown (MB scale)
Query SpeedPandas merge/join (slow)Columnar vectorized execution (10-100x faster)
MaintainabilityEach source needs connection code changesNew source = one ATTACH line
ReusabilityMust rewrite connection logic each analysisDataHub instance reusable globally
Learning CurveNeed to master multiple library APIsStandard SQL only
Debugging DifficultyData in memory, hard to trace sourceEach source has clear namespace

5. Production Scenario: Unified E-Commerce Analytics Platform

Building a self-service analytics platform for business teams:

# 1. Initialize Hub
hub = DataHub(memory_limit="16GB")

# 2. Attach all data sources
hub.attach_pg("orders", "host=db-host dbname=ecommerce user=analyst")
hub.attach_pg("crm", "host=crm-host dbname=customer_db user=analyst")
hub.attach_csv_dir("marketing", "/data/marketing/")
hub.attach_parquet_dir("warehouse", "/data/warehouse/parquet/")
hub.attach_http_json("competitors", "https://api.competitor.com/prices")

# 3. Create caching layer (daily refresh)
hub.create_cached_table("monthly_summary", """
    SELECT 
        DATE_TRUNC('month', o.order_date) AS month,
        c.category_name,
        COUNT(*) AS orders,
        SUM(o.amount) AS revenue,
        AVG(o.amount) AS avg_order_value
    FROM orders.orders o
    JOIN orders.categories c ON o.category_id = c.category_id
    GROUP BY 1, 2
""")

# 4. Business users only need to run SQL
df = hub.query("""
    SELECT month, category_name, revenue, 
           revenue / LAG(revenue) OVER (ORDER BY month) - 1 AS mom_growth
    FROM monthly_summary
    WHERE category_name = 'Electronics'
    ORDER BY month DESC
""")

6. Monetization Strategies

This architecture pattern has significant commercial value:

1. Data Consulting Services (One-time $500-$1,500)

Customer pain point: Data scattered across 5-10 systems, unable to do cross-analysis.

Deliverables:

  • DuckDB DataHub configuration script
  • 3-5 core analysis SQL queries
  • Visualization report template

2. Automated Reporting Service (Monthly $150-$700)

Build daily/weekly automated cross-source reports for clients:

  • Data source ATTACH configuration
  • SQL queries + caching strategy
  • Scheduled execution + email/Telegram push

3. Lightweight BI Product (SaaS $15-$40/month)

Build a self-service query platform based on DataHub:

  • Frontend: Streamlit / Gradio / Shiny
  • Backend: DuckDB DataHub + FastAPI
  • Data sources: Client-configured ATTACH connections

4. Enterprise Training (One-time $700-$3,000)

Teach enterprise data teams to build their own DataHub:

  • ATTACH principles and best practices
  • Performance tuning (pushdown, caching, memory)
  • Production deployment (Docker + Cron + monitoring)

Core principle: Enterprise data won’t disappear just because you don’t want to move it — but you can make queries go to the data. Master this architecture pattern, and you hold the “master key” to data integration.


📖 The complete DataHub code template from this article (including Docker deployment guide, detailed configurations for PostgreSQL/MySQL/HTTP/CSV/Parquet five data sources) is published at duckdblab.org, with production tuning parameters and common pitfalls guide.

💡 More DuckDB advanced techniques → duckdblab.org

Olap Studio · Focused on DuckDB Practical Skills · 2026-06-24

📺 Watch video tutorials → Olap Studio YouTube

Subscribe for more DuckDB & AI automation tutorials

Built with Hugo
Theme Stack designed by Jimmy

⚠️ This site is an independent community project, not affiliated with, endorsed by, or sponsored by the DuckDB Foundation or official DuckDB project.

"DuckDB" is a registered trademark of the DuckDB Foundation. This site uses the name solely for factual description purposes.

All content is for educational and community promotion purposes only and does not constitute any commercial service.