DuckDB Data Lake: Building Production-Grade Analytics with Iceberg and Delta Lake

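TL;DR: DuckDB's extension ecosystem now supports Apache Iceberg and Delta Lake. This guide shows how to build a production-grade lakehouse architecture that combines DuckDB's speed with open table formats to get ACID transactions and time travel.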


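Why a Data Lakehouse?

Traditional data architectures split data between a data lake and a separate data warehouse, connected by ETL pipelines:

┌─────────────────────────────────────────────────┐
│            Traditional architecture             │
│                                                 │
│  [Data lake] ──> [ETL pipeline] ──> [Warehouse] │
│   (S3/GCS)      (Spark/Airflow)     (Redshift)  │
│                                                 │
│  Problems:                                      │
│  • Data duplication (lake → warehouse)          │
│  • Complex ETL pipelines                        │
│  • Schema enforced at load time                 │
│  • No ACID transactions on the data lake        │
└─────────────────────────────────────────────────┘

The lakehouse approach removes this complexity: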

┌─────────────────────────────────────────────────┐
│             Lakehouse architecture              │
│                                                 │
│  [DuckDB] ──> [Apache Iceberg / Delta Lake]     │
│  (analytics)  (S3/GCS + metadata)               │
│                                                 │
│  Benefits:                                      │
│  • Single source of truth                       │
│  • ACID transactions                            │
│  • Time travel                                  │
│  • Schema evolution                             │
│  • Open table formats                           │
└─────────────────────────────────────────────────┘

Apache Iceberg with DuckDB

Installation

# Install and load the DuckDB Iceberg extension
# (LOAD only applies to the current session, so run both in one invocation)
duckdb -c "INSTALL iceberg; LOAD iceberg;"

# Or in Python
pip install duckdb
python -c "
import duckdb
con = duckdb.connect()
con.execute('INSTALL iceberg')  # core extension, no community repository needed
con.execute('LOAD iceberg')
"

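Connecting to S3

-- Configure S3 credentials
CREATE SECRET (
    TYPE S3,
    KEY_ID 'YOUR_AWS_KEY',
    SECRET 'YOUR_AWS_SECRET',
    REGION 'us-east-1'
);

-- List tables by attaching an Iceberg REST catalog
-- (warehouse name and endpoint are placeholders; most catalogs also need credentials)
ATTACH 'my_warehouse' AS iceberg_catalog (
    TYPE iceberg,
    ENDPOINT 'https://catalog.example.com'
);
SHOW ALL TABLES;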

Reading Iceberg Tables

-- Query an Iceberg table directly from S3
-- (iceberg_scan takes the table root, not a glob over its Parquet files)
SELECT 
    customer_id,
    COUNT(*) as order_count,
    SUM(amount) as total_spent
FROM iceberg_scan('s3://my-bucket/data/orders')
GROUP BY 1
ORDER BY 2 DESC
LIMIT 100;

Time Travel Queries

-- Query the table as of a specific timestamp
SELECT * FROM iceberg_scan(
    's3://my-bucket/data/orders',
    snapshot_from_timestamp = TIMESTAMP '2026-06-20 10:00:00'
);

-- List the table's snapshots to find a snapshot ID
SELECT * FROM iceberg_snapshots('s3://my-bucket/data/orders');

-- Query by snapshot ID (placeholder value; use an ID from the listing above)
SELECT * FROM iceberg_scan(
    's3://my-bucket/data/orders',
    snapshot_from_id = 1234567890123456789
);
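
When time-travel queries are generated from application code, a small builder keeps the two pinning modes mutually exclusive. The following is a minimal sketch, assuming the `snapshot_from_timestamp` and `snapshot_from_id` named parameters of the iceberg extension's `iceberg_scan`; the helper name `time_travel_sql` is hypothetical and not part of DuckDB.

```python
from datetime import datetime
from typing import Optional


def time_travel_sql(table_path: str,
                    as_of: Optional[datetime] = None,
                    snapshot_id: Optional[int] = None) -> str:
    """Build an iceberg_scan query pinned to a timestamp or a snapshot ID.

    Hypothetical helper: exactly one of `as_of` or `snapshot_id` must be given.
    """
    if (as_of is None) == (snapshot_id is None):
        raise ValueError("pass exactly one of as_of or snapshot_id")
    # Quote the path as a SQL string literal (single quotes are doubled).
    path = "'" + table_path.replace("'", "''") + "'"
    if as_of is not None:
        stamp = as_of.strftime("%Y-%m-%d %H:%M:%S")
        option = f"snapshot_from_timestamp = TIMESTAMP '{stamp}'"
    else:
        option = f"snapshot_from_id = {int(snapshot_id)}"
    return f"SELECT * FROM iceberg_scan({path}, {option})"
```

The returned string can be passed to `con.execute(...)` on a DuckDB connection that has the iceberg extension loaded.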

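Schema Evolution

-- iceberg_scan() is a read-only table function, so schema changes must target
-- a table in an attached Iceberg catalog (iceberg_catalog.default.orders is a
-- placeholder). Support for ALTER TABLE depends on the iceberg extension
-- version and the catalog.

-- Add new column to existing table
ALTER TABLE iceberg_catalog.default.orders
ADD COLUMN loyalty_tier VARCHAR;

-- Modify column type
ALTER TABLE iceberg_catalog.default.orders
ALTER COLUMN amount TYPE DECIMAL(18,4);

-- Rename column
ALTER TABLE iceberg_catalog.default.orders
RENAME COLUMN customer_id TO user_id;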

Writing to Iceberg

-- Writes go through a table in an attached Iceberg catalog, not iceberg_scan()
-- (iceberg_catalog.default.* names are placeholders; which statements are
--  supported depends on the iceberg extension version)

-- Create new Iceberg table
CREATE TABLE iceberg_catalog.default.new_orders AS
SELECT 
    order_id,
    customer_id,
    amount,
    order_date,
    status
FROM read_parquet('s3://my-bucket/raw/orders/*.parquet')
WHERE order_date >= '2026-01-01';

-- Append data
INSERT INTO iceberg_catalog.default.orders
SELECT * FROM read_parquet('s3://my-bucket/raw/new_orders/*.parquet');

-- Merge (upsert)
MERGE INTO iceberg_catalog.default.orders AS target
USING read_parquet('s3://my-bucket/raw/updates/*.parquet') AS source
ON target.order_id = source.order_id
WHEN MATCHED THEN UPDATE SET 
    amount = source.amount,
    status = source.status
WHEN NOT MATCHED THEN INSERT VALUES 
    (source.order_id, source.customer_id, source.amount, source.order_date, source.status);

Delta Lake with DuckDB

Installation

# Install and load the Delta Lake extension
# (LOAD only applies to the current session, so run both in one invocation)
duckdb -c "INSTALL delta; LOAD delta;"

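Connecting to Delta Tables

-- Read Delta table from S3
SELECT * FROM delta_scan('s3://my-bucket/data/delta-orders/');

-- Query with time travel: delta_scan takes named parameters, not an options struct.
-- The version parameter below is an assumption; check which time-travel
-- options your delta extension release supports.
SELECT * FROM delta_scan('s3://my-bucket/data/delta-orders/', version = 42);

-- To query by timestamp, resolve the timestamp to a version from the
-- table history first, then query by that version.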

Writing Delta Tables

-- delta_scan() is a read-only table function and cannot be a write target.
-- The statements below assume a delta extension release with write support
-- for attached tables; otherwise use a Delta writer such as Spark or delta-rs.
ATTACH 's3://my-bucket/data/delta-orders/' AS delta_orders (TYPE delta);

-- Append to Delta table
INSERT INTO delta_orders
SELECT * FROM read_parquet('s3://my-bucket/raw/new_orders/*.parquet');

-- Creating new Delta tables and merging have no DuckDB function equivalents
-- such as delta_merge; run them in a Delta writer. In Spark SQL, with
-- `updates` as a placeholder view over the new rows:
MERGE INTO delta.`s3://my-bucket/data/delta-orders/` AS target
USING updates AS source
ON source.order_id = target.order_id
WHEN MATCHED THEN UPDATE SET
    amount = source.amount,
    status = source.status
WHEN NOT MATCHED THEN INSERT *;

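Compaction and Optimization

-- DuckDB's delta extension does not ship maintenance procedures; run these
-- with a Delta Lake engine such as Spark SQL.

-- Compact small files
OPTIMIZE delta.`s3://my-bucket/data/delta-orders/`;

-- Vacuum files no longer referenced by recent versions (default retention: 7 days)
VACUUM delta.`s3://my-bucket/data/delta-orders/`;

-- Z-order optimization
OPTIMIZE delta.`s3://my-bucket/data/delta-orders/` ZORDER BY (customer_id, order_date);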

Production Architecture

Data Lake Layout

s3://my-company-data-lake/
├── raw/                    # Raw data (immutable)
│   ├── orders/
│   ├── products/
│   ├── events/
│   └── inventory/
├── curated/                # Curated tables (Iceberg/Delta)
│   ├── customers/
│   ├── sales/
│   └── analytics/
├── staging/                # Temporary processing
│   ├── etl-temp/
│   └── ml-features/
└── archives/               # Archived data
    └── historical/
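
Pipelines that write into this layout tend to build paths in many places, so it can help to centralize the layer names. The sketch below is one way to do that; `LAYERS` and `layer_path` are hypothetical names introduced for illustration, and the default bucket simply mirrors the layout above.

```python
# The four top-level layers from the layout above.
LAYERS = ("raw", "curated", "staging", "archives")


def layer_path(layer: str, *parts: str, bucket: str = "my-company-data-lake") -> str:
    """Return an s3:// prefix inside one of the known layers.

    Rejects unknown layers so a typo cannot create a stray top-level prefix.
    """
    if layer not in LAYERS:
        raise ValueError(f"unknown layer {layer!r}; expected one of {LAYERS}")
    # Drop empty segments and surrounding slashes so joins stay clean.
    cleaned = [p.strip("/") for p in parts if p.strip("/")]
    return "/".join([f"s3://{bucket}", layer, *cleaned]) + "/"
```

For example, `layer_path("raw", "orders")` yields the prefix for the raw orders directory shown in the tree.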

ETL Pipeline with DuckDB

import duckdb

class DataLakePipeline:
    def __init__(self, bucket='my-company-data-lake'):
        self.bucket = bucket
        self.con = duckdb.connect(':memory:')
        # httpfs provides s3:// access, aws provides the credential chain,
        # and iceberg is a core extension (no community repository needed)
        for ext in ('httpfs', 'aws', 'iceberg'):
            self.con.execute(f'INSTALL {ext}')
            self.con.execute(f'LOAD {ext}')
        # Pick up AWS credentials from the environment or instance profile
        self.con.execute('CREATE SECRET (TYPE S3, PROVIDER credential_chain)')
    
    def ingest_raw_data(self, source_path, target_path):
        """Convert CSV files at source_path into a Parquet file at target_path."""
        # Paths are interpolated directly, so pass trusted values only
        self.con.execute(f"""
            COPY (
                SELECT * FROM read_csv_auto('{source_path}')
            ) TO '{target_path}' (FORMAT PARQUET)
        """)
    
    def transform_curated(self, raw_table, curated_table, transformations):
        """Apply transformations and store the result as an in-memory table."""
        self.con.execute(f"""
            CREATE OR REPLACE TABLE {curated_table} AS
            SELECT {transformations}
            FROM {raw_table}
        """)
    
    def run_analytics(self, query):
        """Run an analytical query and return a pandas DataFrame."""
        return self.con.execute(query).fetchdf()

# Usage
pipeline = DataLakePipeline()
pipeline.ingest_raw_data(
    's3://my-company-data-lake/raw/orders/*.csv',
    's3://my-company-data-lake/curated/orders/orders.parquet'
)
# COPY wrote plain Parquet, not an Iceberg table, so read it with read_parquet
results = pipeline.run_analytics("""
    SELECT 
        DATE_TRUNC('month', order_date) as month,
        SUM(amount) as revenue,
        COUNT(*) as orders
    FROM read_parquet('s3://my-company-data-lake/curated/orders/orders.parquet')
    GROUP BY 1
    ORDER BY 1 DESC
""")

Incremental Updates

-- Upsert new records into a table in an attached Iceberg catalog
-- (iceberg_catalog.default.orders is a placeholder; MERGE support depends on
--  the iceberg extension version)
MERGE INTO iceberg_catalog.default.orders AS target
USING (
    SELECT * FROM read_parquet('s3://my-bucket/raw/new_orders/2026-06-27/*.parquet')
) AS source
ON target.order_id = source.order_id
WHEN MATCHED THEN UPDATE SET
    amount = source.amount,
    status = source.status,
    updated_at = CURRENT_TIMESTAMP
-- Assumes updated_at is the table's last column
WHEN NOT MATCHED THEN INSERT VALUES
    (source.order_id, source.customer_id, source.amount, source.order_date, source.status, CURRENT_TIMESTAMP);
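
To make the upsert semantics concrete, the same matched/not-matched logic can be modeled on plain Python rows. This is only an illustration of what the MERGE statement does, not how any engine implements it; `merge_upsert` is a hypothetical name, and the `updated_at` column is left out for simplicity.

```python
def merge_upsert(target, source, key="order_id", update_cols=("amount", "status")):
    """Apply MERGE-style upsert semantics to lists of dict rows.

    Rows whose key already exists get `update_cols` overwritten from the
    source row; source rows with a new key are appended. Returns a new list
    and leaves both inputs untouched.
    """
    merged = {row[key]: dict(row) for row in target}  # copy rows, index by key
    for row in source:
        if row[key] in merged:
            # WHEN MATCHED THEN UPDATE SET ...
            for col in update_cols:
                merged[row[key]][col] = row[col]
        else:
            # WHEN NOT MATCHED THEN INSERT ...
            merged[row[key]] = dict(row)
    return list(merged.values())
```

Applying the same batch twice gives the same result as applying it once, which is why a key-based upsert is a common choice for incremental loads that may be retried.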

Performance Optimization

File Size Optimization

-- DuckDB chooses Parquet row-group sizes by default,
-- but you can control the output explicitly:
COPY (
    SELECT * FROM read_parquet('s3://my-bucket/raw/orders/*.parquet')
) TO 's3://my-bucket/curated/orders' (
    FORMAT PARQUET,
    COMPRESSION ZSTD,
    PER_THREAD_OUTPUT TRUE,
    ROW_GROUP_SIZE 1000000
);

Partitioning Strategy

-- Partition by date for efficient filtering
-- (written here as Hive-partitioned Parquet directories)
COPY (
    SELECT * FROM read_parquet('s3://my-bucket/raw/orders/*.parquet')
) TO 's3://my-bucket/curated/orders' (FORMAT PARQUET, PARTITION_BY (order_date));

-- Query with partition pruning
SELECT * FROM read_parquet('s3://my-bucket/curated/orders/*/*.parquet', hive_partitioning = true)
WHERE order_date >= '2026-06-01' AND order_date < '2026-07-01';
-- Only scans June partitions
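
The June filter uses a half-open range (inclusive start, exclusive end), which avoids off-by-one errors at month boundaries. A small sketch of generating such a predicate for any month follows; `month_bounds` and `month_filter_sql` are hypothetical helpers, and the column name is assumed to be a trusted identifier.

```python
from datetime import date


def month_bounds(year: int, month: int):
    """Return (first day of the month, first day of the next month)."""
    start = date(year, month, 1)
    # December rolls over into January of the following year.
    end = date(year + 1, 1, 1) if month == 12 else date(year, month + 1, 1)
    return start, end


def month_filter_sql(column: str, year: int, month: int) -> str:
    """Build a half-open date predicate covering exactly one calendar month."""
    start, end = month_bounds(year, month)
    return (
        f"{column} >= DATE '{start.isoformat()}' "
        f"AND {column} < DATE '{end.isoformat()}'"
    )
```

The generated predicate can be appended to a `WHERE` clause so that only the partitions for that month need to be read.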

Caching Strategy

-- DuckDB has no query result cache setting; cache remote file metadata
-- between queries instead (httpfs setting)
SET enable_http_metadata_cache = true;

-- Materialize frequently accessed aggregates as a local DuckDB table
CREATE OR REPLACE TABLE cached_monthly_sales AS
SELECT 
    DATE_TRUNC('month', order_date) as month,
    category,
    SUM(amount) as revenue
FROM iceberg_scan('s3://my-bucket/curated/orders')
GROUP BY 1, 2;

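Cost Comparison

| Metric | DuckDB + Iceberg | Traditional data warehouse (Snowflake/BigQuery) |
|---|---|---|
| Compute cost | Cost of the machine running DuckDB; no per-scan fee | Roughly $5-10/TB scanned on on-demand pricing, or per-credit compute |
| Storage cost | $0.023/GB/month (S3) | $0.023/GB/month |
| ETL complexity | Low (SQL only) | High (Spark/Airflow) |
| Time travel | Built in | Extra cost |
| Schema evolution | Built in | Limited |
| Concurrency | Limited (single process) | High (distributed) |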

Best Practices

1. Use Parquet as the storage format

-- Prefer Parquet for good compression and query performance
COPY data TO 's3://bucket/data/data.parquet' (FORMAT PARQUET, COMPRESSION ZSTD);

2. Implement data quality checks

-- Validate data before ingesting
SELECT 
    COUNT(*) as total_rows,
    COUNT(DISTINCT order_id) as unique_orders,
    COUNT(CASE WHEN amount <= 0 THEN 1 END) as invalid_amounts,
    COUNT(CASE WHEN order_date > CURRENT_DATE THEN 1 END) as future_dates
FROM read_parquet('s3://bucket/raw/orders/*.parquet');
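
The query above only reports counts; a pipeline still has to decide whether to proceed. One way to turn the metrics row into a pass/fail gate is sketched below. `check_quality` and its `max_invalid_ratio` threshold are hypothetical, and the input is assumed to be a dict keyed by the column aliases from the query.

```python
def check_quality(metrics: dict, max_invalid_ratio: float = 0.0) -> list:
    """Return a list of human-readable problems; an empty list means pass."""
    total = metrics["total_rows"]
    if total == 0:
        return ["no rows to ingest"]
    problems = []
    # COUNT(DISTINCT ...) ignores NULLs, so a gap means duplicates or NULL keys.
    if metrics["unique_orders"] != total:
        gap = total - metrics["unique_orders"]
        problems.append(f"{gap} duplicate or NULL order_id values")
    for name in ("invalid_amounts", "future_dates"):
        if metrics[name] / total > max_invalid_ratio:
            problems.append(f"{metrics[name]} rows flagged by {name}")
    return problems
```

An ingestion step could abort when the returned list is non-empty and log the messages for follow-up.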

3. Set lifecycle policies

# S3 lifecycle policy for raw data
import boto3

s3 = boto3.client('s3')
s3.put_bucket_lifecycle_configuration(
    Bucket='my-company-data-lake',
    LifecycleConfiguration={
        'Rules': [
            {
                'ID': 'archive-raw-data',
                'Filter': {'Prefix': 'raw/'},
                'Status': 'Enabled',
                'Transitions': [
                    {'Days': 90, 'StorageClass': 'GLACIER'},
                    {'Days': 365, 'StorageClass': 'DEEP_ARCHIVE'}
                ]
            }
        ]
    }
)

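4. Monitor query performance

-- Track query execution times
-- (query_log is assumed to be a table your application maintains, with one
--  pre-aggregated row per query; DuckDB does not create it)
SELECT 
    query,
    avg_time_ms,
    total_runs,
    p99_time_ms
FROM query_log
WHERE database = 'analytics'
ORDER BY avg_time_ms DESC;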
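
The statistics in that query have to come from somewhere. As a rough sketch, an application could time each query itself and derive the same columns in memory; `QueryTimer` is a hypothetical class, and the p99 here uses the simple nearest-rank definition.

```python
import time
from collections import defaultdict


class QueryTimer:
    """Collect per-query timings and summarize them (avg, runs, p99)."""

    def __init__(self):
        self._samples = defaultdict(list)

    def record(self, query: str, elapsed_ms: float) -> None:
        self._samples[query].append(float(elapsed_ms))

    def timed(self, query: str, run):
        """Call run(query), record how long it took, and return its result."""
        start = time.perf_counter()
        result = run(query)
        self.record(query, (time.perf_counter() - start) * 1000.0)
        return result

    def summary(self) -> list:
        rows = []
        for query, samples in self._samples.items():
            ordered = sorted(samples)
            # Nearest-rank p99: ceil(0.99 * n), computed with integers.
            rank = max(1, -(-99 * len(ordered) // 100))
            rows.append({
                "query": query,
                "avg_time_ms": sum(ordered) / len(ordered),
                "total_runs": len(ordered),
                "p99_time_ms": ordered[rank - 1],
            })
        # Slowest queries first, matching ORDER BY avg_time_ms DESC.
        return sorted(rows, key=lambda r: r["avg_time_ms"], reverse=True)
```

With a DuckDB connection `con`, a call such as `timer.timed(sql, lambda q: con.execute(q).fetchall())` would record one sample per execution.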

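Conclusion

DuckDB's support for Apache Iceberg and Delta Lake makes it a strong choice for building a production-grade lakehouse. The combination of:

  1. DuckDB's speed: fast columnar analytics
  2. Iceberg/Delta ACID guarantees: reliable transactions and time travel
  3. S3/GCS scalability: low-cost, petabyte-scale storage
  4. Open table formats: vendor-neutral, portable data

creates a powerful, cost-effective analytics platform that can remove the need for an expensive cloud data warehouse.

For many organizations, starting with DuckDB + Iceberg on S3 offers a good balance of performance, cost, and flexibility.


This guide assumes basic familiarity with SQL and cloud storage. For production deployments, see the official Apache Iceberg and Delta Lake documentation.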
