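DuckDB Data Lakes: Building Production-Grade Analytics with Iceberg and Delta Lake
TL;DR: DuckDB's extension ecosystem now supports Apache Iceberg and Delta Lake through its iceberg and delta extensions. This guide shows how to build a production-grade lakehouse architecture that combines DuckDB's speed with open table formats for ACID transactions and time travel.
Why a data lakehouse?
Traditional architectures keep data in two systems, a data lake for raw files and a warehouse for analytics, connected by ETL pipelines: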
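┌─────────────────────────────────────────────────┐
│ Traditional architecture                        │
│                                                 │
│ [Data lake] ──> [ETL pipeline] ──> [Warehouse]  │
│ (S3/GCS)        (Spark/Airflow)    (Redshift)   │
│                                                 │
│ Problems:                                       │
│ • Data duplicated (lake → warehouse)            │
│ • Complex ETL pipelines                         │
│ • Schema enforced at load time                  │
│ • No ACID transactions on the lake              │
└─────────────────────────────────────────────────┘
The lakehouse approach removes this complexity by letting engines such as DuckDB query open table formats directly on object storage:
┌─────────────────────────────────────────────────┐
│ Lakehouse architecture                          │
│                                                 │
│ [DuckDB] ──> [Apache Iceberg / Delta Lake]      │
│ (analytics)  (S3/GCS + metadata)                │
│                                                 │
│ Benefits:                                       │
│ • Single source of truth                        │
│ • ACID transactions                             │
│ • Time travel                                   │
│ • Schema evolution                              │
│ • Open table formats                            │
└─────────────────────────────────────────────────┘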
Apache Iceberg with DuckDB
Installation
# Install once per machine; each session then needs LOAD (DuckDB can also autoload known extensions)
duckdb -c "INSTALL iceberg; INSTALL httpfs;"
# Or in Python
pip install duckdb
python -c "
import duckdb
con = duckdb.connect()
con.execute('INSTALL iceberg')  # core extension, served from the default repository
con.execute('LOAD iceberg')
"
Connecting to S3
-- Configure S3 credentials
CREATE SECRET (
TYPE S3,
KEY_ID 'YOUR_AWS_KEY',
SECRET 'YOUR_AWS_SECRET',
REGION 'us-east-1'
);
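-- Listing tables requires an Iceberg catalog (e.g. a REST catalog); a bare S3 prefix is not a catalog.
-- 'my_warehouse' and the endpoint are placeholders; most catalogs also need an iceberg-type secret.
ATTACH 'my_warehouse' AS lake (TYPE iceberg, ENDPOINT 'https://iceberg-catalog.example.com');
SHOW ALL TABLES;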
Reading Iceberg tables
-- Query an Iceberg table directly from S3: pass the table root (or a metadata .json file), not a Parquet glob
SELECT
customer_id,
COUNT(*) as order_count,
SUM(amount) as total_spent
FROM iceberg_scan('s3://my-bucket/data/orders')
GROUP BY 1
ORDER BY 2 DESC
LIMIT 100;
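Reading a table root starts from the table's metadata JSON, which records the current snapshot and the manifest list describing its data files. The sketch below models that lookup on a plain dict, using field names from the Iceberg table spec (`current-snapshot-id`, `snapshots`, `manifest-list`); the metadata values and the helper name `current_manifest_list` are hypothetical, and a real reader also handles version hints, schemas and the manifests themselves.

```python
from typing import Any, Optional

def current_manifest_list(metadata: dict[str, Any]) -> Optional[str]:
    """Return the manifest-list path of the current snapshot, or None for an empty table."""
    snapshot_id = metadata.get("current-snapshot-id")
    # Some writers store -1 (or omit the field) when the table has no snapshot yet
    if snapshot_id is None or snapshot_id == -1:
        return None
    for snapshot in metadata.get("snapshots", []):
        if snapshot["snapshot-id"] == snapshot_id:
            return snapshot["manifest-list"]
    raise ValueError(f"current snapshot {snapshot_id} not found in metadata")

# Hypothetical, heavily trimmed metadata for s3://my-bucket/data/orders
metadata = {
    "current-snapshot-id": 2,
    "snapshots": [
        {"snapshot-id": 1, "timestamp-ms": 1750000000000,
         "manifest-list": "s3://my-bucket/data/orders/metadata/snap-1.avro"},
        {"snapshot-id": 2, "timestamp-ms": 1750500000000,
         "manifest-list": "s3://my-bucket/data/orders/metadata/snap-2.avro"},
    ],
}
print(current_manifest_list(metadata))
```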
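Time-travel queries
-- Query the table as of a specific timestamp
SELECT * FROM iceberg_scan(
's3://my-bucket/data/orders',
snapshot_from_timestamp = TIMESTAMP '2026-06-20 10:00:00'
);
-- List the table's snapshots (IDs and commit timestamps)
SELECT * FROM iceberg_snapshots('s3://my-bucket/data/orders');
-- Query a specific snapshot by ID. Iceberg identifies versions by numeric snapshot IDs, not commit hashes;
-- branches and tags are named references to snapshots, so resolve one to its snapshot ID first.
SELECT * FROM iceberg_scan(
's3://my-bucket/data/orders',
snapshot_from_id = 1234567890123456789 -- placeholder snapshot ID
);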
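An "as of" query picks the newest snapshot committed at or before the requested time. This is a simplified model of that rule, assuming snapshots carry the spec's `timestamp-ms` field and treating all times as UTC; real readers consult the table's snapshot log, which also reflects rollbacks. The snapshot IDs and the helpers `to_ms` and `snapshot_as_of` are hypothetical.

```python
from datetime import datetime, timezone

def to_ms(dt: datetime) -> int:
    """Convert a timezone-aware datetime to epoch milliseconds, as stored in timestamp-ms."""
    return int(dt.timestamp() * 1000)

def snapshot_as_of(snapshots: list[dict], as_of: datetime) -> int:
    """Return the ID of the newest snapshot committed at or before `as_of`."""
    as_of_ms = to_ms(as_of)
    eligible = [s for s in snapshots if s["timestamp-ms"] <= as_of_ms]
    if not eligible:
        raise ValueError("no snapshot exists at or before the requested time")
    return max(eligible, key=lambda s: s["timestamp-ms"])["snapshot-id"]

# Hypothetical snapshot history (UTC commit times)
snapshots = [
    {"snapshot-id": 101, "timestamp-ms": to_ms(datetime(2026, 6, 19, 9, 0, tzinfo=timezone.utc))},
    {"snapshot-id": 102, "timestamp-ms": to_ms(datetime(2026, 6, 20, 8, 30, tzinfo=timezone.utc))},
    {"snapshot-id": 103, "timestamp-ms": to_ms(datetime(2026, 6, 21, 12, 0, tzinfo=timezone.utc))},
]
print(snapshot_as_of(snapshots, datetime(2026, 6, 20, 10, 0, tzinfo=timezone.utc)))  # → 102
```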
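Schema evolution
-- Schema changes are committed to table metadata through the catalog. Run them in an engine with
-- Iceberg DDL support (for example Spark SQL); lake.sales.orders is a placeholder table name.
-- Readers such as iceberg_scan then see the evolved schema without rewriting data files.
-- Add new column to existing table
ALTER TABLE lake.sales.orders ADD COLUMN loyalty_tier STRING;
-- Widen a column type: Iceberg only allows safe promotions, so this is valid only if
-- amount is currently a decimal with scale 4 and precision of at most 18
ALTER TABLE lake.sales.orders ALTER COLUMN amount TYPE DECIMAL(18,4);
-- Rename column
ALTER TABLE lake.sales.orders RENAME COLUMN customer_id TO user_id;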
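Type changes are the part of schema evolution most likely to be rejected. The check below encodes the promotions allowed by the Iceberg v2 spec (int to long, float to double, and decimal precision increases at a fixed scale); it is a sketch that works on Iceberg type-name strings, ignores the additional promotions introduced in format v3, and the function name `is_allowed_promotion` is hypothetical.

```python
import re

# Matches Iceberg decimal type names such as "decimal(18,4)" or "decimal(18, 4)"
_DECIMAL = re.compile(r"decimal\((\d+),\s*(\d+)\)")

def is_allowed_promotion(old: str, new: str) -> bool:
    """Return True if changing a column from `old` to `new` is a safe Iceberg v2 promotion."""
    old, new = old.strip().lower(), new.strip().lower()
    if old == new:
        return True
    # Integer and floating-point widening
    if (old, new) in {("int", "long"), ("float", "double")}:
        return True
    m_old, m_new = _DECIMAL.fullmatch(old), _DECIMAL.fullmatch(new)
    if m_old and m_new:
        p_old, s_old = map(int, m_old.groups())
        p_new, s_new = map(int, m_new.groups())
        # Scale must stay fixed; precision may only stay equal or grow
        return s_new == s_old and p_new >= p_old
    return False

for change in [("int", "long"), ("decimal(10,4)", "decimal(18,4)"),
               ("decimal(10,2)", "decimal(18,4)"), ("long", "int")]:
    print(change, is_allowed_promotion(*change))
```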
Writing to Iceberg
-- iceberg_scan is read-only. Writes go through an Iceberg catalog attached to DuckDB (recent
-- DuckDB versions support CREATE TABLE and INSERT on REST catalogs).
-- lake.sales.* are placeholder names for tables in a catalog attached as "lake".
-- Create new Iceberg table
CREATE TABLE lake.sales.new_orders AS
SELECT
order_id,
customer_id,
amount,
order_date,
status
FROM read_parquet('s3://my-bucket/raw/orders/*.parquet')
WHERE order_date >= '2026-01-01';
-- Append data
INSERT INTO lake.sales.orders
SELECT * FROM read_parquet('s3://my-bucket/raw/new_orders/*.parquet');
-- Merge (upsert); MERGE support on Iceberg tables depends on the DuckDB and extension version
MERGE INTO lake.sales.orders AS target
USING (SELECT * FROM read_parquet('s3://my-bucket/raw/updates/*.parquet')) AS source
ON target.order_id = source.order_id
WHEN MATCHED THEN UPDATE SET
amount = source.amount,
status = source.status
WHEN NOT MATCHED THEN INSERT VALUES
(source.order_id, source.customer_id, source.amount, source.order_date, source.status);
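The upsert combines an update path (rows whose `order_id` already exists) and an insert path (new `order_id` values). A plain-Python model of those semantics on in-memory rows may make the two branches concrete; it is only a sketch of the logic, not how DuckDB or Iceberg execute a MERGE, and the function `merge_upsert` and the sample rows are hypothetical.

```python
def merge_upsert(target, source, key="order_id", update_cols=("amount", "status")):
    """Model MERGE ... WHEN MATCHED THEN UPDATE / WHEN NOT MATCHED THEN INSERT on lists of dicts."""
    merged = {row[key]: dict(row) for row in target}  # copies, so `target` is not modified
    seen = set()
    for row in source:
        k = row[key]
        # This sketch rejects duplicate source keys outright; SQL MERGE errors
        # when several source rows match the same target row
        if k in seen:
            raise ValueError(f"duplicate source rows for {key}={k!r}")
        seen.add(k)
        if k in merged:
            for col in update_cols:  # WHEN MATCHED: only the listed columns change
                merged[k][col] = row[col]
        else:
            merged[k] = dict(row)  # WHEN NOT MATCHED: insert the whole source row
    return list(merged.values())

target = [{"order_id": 1, "customer_id": 10, "amount": 5.0,
           "order_date": "2026-06-01", "status": "new"}]
source = [{"order_id": 1, "customer_id": 10, "amount": 7.5,
           "order_date": "2026-06-01", "status": "shipped"},
          {"order_id": 2, "customer_id": 11, "amount": 3.0,
           "order_date": "2026-06-02", "status": "new"}]
for row in merge_upsert(target, source):
    print(row)
```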
Delta Lake with DuckDB
Installation
# Install once per machine; load the extension in each session that queries Delta tables
duckdb -c "INSTALL delta; INSTALL httpfs;"
Reading Delta tables
-- Read Delta table from S3
SELECT * FROM delta_scan('s3://my-bucket/data/delta-orders/');
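# Time travel: read an older version with delta-rs (pip install deltalake) and query it from DuckDB.
# Assumes AWS credentials are available in the standard environment variables.
import duckdb
from deltalake import DeltaTable
# Query by version
orders_v42 = DeltaTable('s3://my-bucket/data/delta-orders/', version=42).to_pyarrow_dataset()
duckdb.sql("SELECT * FROM orders_v42").show()
# Query by timestamp: loads the latest version committed at or before this time
dt = DeltaTable('s3://my-bucket/data/delta-orders/')
dt.load_as_version('2026-06-20T00:00:00Z')
orders_asof = dt.to_pyarrow_dataset()
duckdb.sql("SELECT * FROM orders_asof").show()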
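Writing Delta tables
# DuckDB's delta extension focuses on reading (delta_scan), so writes here go through delta-rs
# (pip install deltalake duckdb) while DuckDB prepares the data.
# Assumes AWS credentials in the environment; concurrent writers on S3 also need a locking
# or conditional-put setup (see the delta-rs documentation).
import duckdb
from deltalake import DeltaTable, write_deltalake
con = duckdb.connect()
con.execute("CREATE SECRET (TYPE S3, PROVIDER credential_chain)")
# Create Delta table
products = con.sql("""
    SELECT product_id, name, category, price, stock_quantity
    FROM read_parquet('s3://my-bucket/raw/products/*.parquet')
""").fetch_arrow_table()
write_deltalake('s3://my-bucket/data/delta-products/', products, mode='overwrite')
# Append to Delta table
new_orders = con.sql(
    "SELECT * FROM read_parquet('s3://my-bucket/raw/new_orders/*.parquet')"
).fetch_arrow_table()
write_deltalake('s3://my-bucket/data/delta-orders/', new_orders, mode='append')
# Merge in Delta (upsert)
updates = con.sql(
    "SELECT * FROM read_parquet('s3://my-bucket/raw/updates/*.parquet')"
).fetch_arrow_table()
(
    DeltaTable('s3://my-bucket/data/delta-orders/')
    .merge(source=updates, predicate='target.order_id = source.order_id',
           source_alias='source', target_alias='target')
    .when_matched_update(updates={'amount': 'source.amount', 'status': 'source.status'})
    .when_not_matched_insert(updates={
        'order_id': 'source.order_id', 'customer_id': 'source.customer_id',
        'amount': 'source.amount', 'order_date': 'source.order_date', 'status': 'source.status',
    })
    .execute()
)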
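Compaction and optimization
# Table maintenance via delta-rs; the DuckDB delta extension does not provide optimize or vacuum functions
from deltalake import DeltaTable
dt = DeltaTable('s3://my-bucket/data/delta-orders/')
# Compact small files
dt.optimize.compact()
# Z-order optimization
dt.optimize.z_order(['customer_id', 'order_date'])
# Vacuum files no longer referenced by versions inside the retention window.
# Keep the default 7 days (168 h): a retention of 0 breaks time travel and in-flight readers.
dt.vacuum(retention_hours=168, dry_run=False)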
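Z-ordering clusters rows so that values close together in several columns (here `customer_id` and `order_date`) end up in the same files, which lets min/max statistics skip more files. The core idea is a Morton code that interleaves the bits of each column value. This sketch assumes both columns have already been mapped to small non-negative integers (for example a customer rank and a day number); the function `interleave_bits` is hypothetical and is not how delta-rs normalizes values internally.

```python
def interleave_bits(x: int, y: int, bits: int = 16) -> int:
    """Interleave the low `bits` bits of x and y into a Morton (Z-order) code.

    Bit i of x goes to position 2*i and bit i of y goes to position 2*i + 1.
    """
    code = 0
    for i in range(bits):
        code |= ((x >> i) & 1) << (2 * i)
        code |= ((y >> i) & 1) << (2 * i + 1)
    return code

# Sorting (customer, day) pairs by Z-value walks the grid in a "Z" pattern
grid = [(x, y) for x in range(2) for y in range(2)]
print(sorted(grid, key=lambda p: interleave_bits(*p)))  # → [(0, 0), (1, 0), (0, 1), (1, 1)]
```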
Production architecture
Data lake layout
s3://my-company-data-lake/
├── raw/ # Raw data (immutable)
│ ├── orders/
│ ├── products/
│ ├── events/
│ └── inventory/
├── curated/ # Curated tables (Iceberg/Delta)
│ ├── customers/
│ ├── sales/
│ └── analytics/
├── staging/ # Temporary processing
│ ├── etl-temp/
│ └── ml-features/
└── archives/ # Archived data
└── historical/
ETL pipeline with DuckDB
import duckdb

class DataLakePipeline:
    def __init__(self, bucket='my-company-data-lake'):
        self.bucket = bucket
        self.con = duckdb.connect(':memory:')
        # iceberg is a core extension; httpfs provides s3:// reads and writes
        for ext in ('httpfs', 'iceberg'):
            self.con.install_extension(ext)
            self.con.load_extension(ext)
        # Use AWS credentials from the environment or instance profile
        self.con.execute("CREATE SECRET (TYPE S3, PROVIDER credential_chain)")

    def ingest_raw_data(self, source_path, target_path):
        """Convert raw CSV files into one Parquet file (plain Parquet, not an Iceberg table)."""
        # Paths are interpolated into SQL, so only pass trusted values
        self.con.execute(f"""
            COPY (
                SELECT * FROM read_csv_auto('{source_path}')
            ) TO '{target_path}' (FORMAT PARQUET, COMPRESSION ZSTD)
        """)

    def transform_curated(self, raw_table, curated_table, transformations):
        """Apply transformations into a table in this DuckDB session."""
        self.con.execute(f"""
            CREATE OR REPLACE TABLE {curated_table} AS
            SELECT {transformations}
            FROM {raw_table}
        """)

    def run_analytics(self, query):
        """Run an analytical query and return a pandas DataFrame (requires pandas)."""
        return self.con.execute(query).fetchdf()

# Usage
pipeline = DataLakePipeline()
pipeline.ingest_raw_data(
    's3://my-company-data-lake/raw/orders/*.csv',
    's3://my-company-data-lake/staging/etl-temp/orders.parquet'
)
results = pipeline.run_analytics("""
    SELECT
        DATE_TRUNC('month', order_date) AS month,
        SUM(amount) AS revenue,
        COUNT(*) AS orders
    FROM read_parquet('s3://my-company-data-lake/staging/etl-temp/*.parquet')
    GROUP BY 1
    ORDER BY 1 DESC
""")
Incremental updates
-- Upsert one day's new records
-- (lake.sales.orders is a placeholder for a table in an Iceberg catalog attached as "lake")
MERGE INTO lake.sales.orders AS target
USING (
SELECT * FROM read_parquet('s3://my-bucket/raw/new_orders/2026-06-27/*.parquet')
) AS source
ON target.order_id = source.order_id
WHEN MATCHED THEN UPDATE SET
amount = source.amount,
status = source.status
WHEN NOT MATCHED THEN INSERT VALUES
(source.order_id, source.customer_id, source.amount, source.order_date, source.status);
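A daily batch can contain several versions of the same order, and standard SQL MERGE treats multiple source rows matching one target row as an error. Deduplicating the batch first avoids that; in DuckDB this is typically a `QUALIFY row_number() OVER (PARTITION BY order_id ORDER BY ...) = 1` filter on some ordering column. The Python sketch below shows the same "keep the latest row per key" step, assuming rows arrive in ingestion order (the batch here has no timestamp column); the function `latest_per_key` is hypothetical.

```python
def latest_per_key(rows, key="order_id"):
    """Keep only the last row seen for each key, assuming rows arrive in ingestion order."""
    latest = {}
    for row in rows:
        latest[row[key]] = row  # later rows overwrite earlier ones; key keeps its first position
    return list(latest.values())

batch = [
    {"order_id": 1, "status": "new"},
    {"order_id": 2, "status": "new"},
    {"order_id": 1, "status": "shipped"},
]
print(latest_per_key(batch))
```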
Performance optimization
File size tuning
-- DuckDB picks default Parquet row-group sizes on write,
-- but you can control file layout explicitly:
COPY (
SELECT * FROM read_parquet('s3://my-bucket/raw/orders/*.parquet')
) TO 's3://my-bucket/curated/orders_parquet' (
FORMAT PARQUET,
COMPRESSION ZSTD,
FILE_SIZE_BYTES '256MB', -- start a new file in the target directory once a file reaches ~256 MB
ROW_GROUP_SIZE 1000000
);
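Write options only shape new files; tables that receive many small appends still accumulate small files, which is what compaction (for example delta-rs `optimize.compact()`) rewrites. The planner below is a first-fit-decreasing sketch of that grouping step, assuming a 256 MB target and leaving files at or above the target untouched; the function `plan_compaction` is hypothetical and is not the algorithm any particular library uses.

```python
def plan_compaction(file_sizes, target_bytes):
    """Group small files (sizes in bytes) into batches of at most target_bytes each.

    Uses first-fit decreasing. Each returned batch would be rewritten as one file;
    files already >= target_bytes, and batches holding a single file, are skipped.
    """
    small = sorted((s for s in file_sizes if s < target_bytes), reverse=True)
    batches = []  # each entry: [total_bytes, [sizes...]]
    for size in small:
        for batch in batches:
            if batch[0] + size <= target_bytes:
                batch[0] += size
                batch[1].append(size)
                break
        else:
            batches.append([size, [size]])
    return [files for _total, files in batches if len(files) > 1]

MB = 1 << 20
sizes = [300 * MB, 100 * MB, 120 * MB, 90 * MB, 60 * MB, 10 * MB]
for group in plan_compaction(sizes, 256 * MB):
    print([s // MB for s in group], "MB")
```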
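Partitioning strategy
-- Write Hive-style partitioned Parquet: one directory per order_date value
-- (daily partitions on small tables produce many tiny files; a coarser key such as month may fit better)
COPY (
SELECT * FROM read_parquet('s3://my-bucket/raw/orders/*.parquet')
) TO 's3://my-bucket/curated/orders_by_date' (FORMAT PARQUET, PARTITION_BY (order_date));
-- Query with partition pruning: the filter on the partition column skips non-matching directories
SELECT * FROM read_parquet('s3://my-bucket/curated/orders_by_date/*/*.parquet', hive_partitioning = true)
WHERE order_date >= '2026-06-01' AND order_date < '2026-07-01';
-- Only June partitions are scanned
-- (Iceberg tables define partitioning in their metadata as a partition spec, not in directory names)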
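Hive-style layouts encode the partition value in the directory name (for example `order_date=2026-06-01/`), so a reader can discard whole directories from the path alone, before opening any file. The sketch below models that check for a half-open date range; the file name `data_0.parquet` and the helper `prune_partitions` are assumptions for illustration, and paths without the partition segment are simply skipped here.

```python
from datetime import date

def prune_partitions(paths, start, end, column="order_date"):
    """Keep Hive-style paths whose `column=YYYY-MM-DD` segment lies in [start, end)."""
    kept = []
    for path in paths:
        for segment in path.split("/"):
            name, sep, value = segment.partition("=")
            if sep and name == column:
                if start <= date.fromisoformat(value) < end:
                    kept.append(path)
                break  # only the first matching segment is considered
    return kept

prefix = "s3://my-bucket/curated/orders_by_date"
paths = [f"{prefix}/order_date={d}/data_0.parquet"
         for d in ("2026-05-31", "2026-06-01", "2026-06-30", "2026-07-01")]
for p in prune_partitions(paths, date(2026, 6, 1), date(2026, 7, 1)):
    print(p)
```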
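Caching strategy
-- DuckDB has no query-result cache; caching HTTP metadata cuts repeated requests for remote files
SET enable_http_metadata_cache = true;
-- Materialize frequently accessed aggregates into a DuckDB table
-- (persisted when DuckDB runs on a database file, e.g. duckdb analytics.duckdb; re-run to refresh)
CREATE OR REPLACE TABLE cached_monthly_sales AS
SELECT
DATE_TRUNC('month', order_date) as month,
status,
SUM(amount) as revenue
FROM iceberg_scan('s3://my-bucket/curated/orders')
GROUP BY 1, 2;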
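Cost comparison
| Metric | DuckDB + Iceberg | Traditional warehouse (Snowflake/BigQuery) |
|---|---|---|
| Compute cost | Your own compute (VM or container time); no per-query fee | Snowflake: credits per second of warehouse runtime; BigQuery on-demand: billed per TiB scanned |
| Storage cost | ~$0.023/GB/month (S3 Standard, us-east-1) | Vendor-specific; broadly similar to object storage |
| ETL complexity | Low (SQL only) | High (Spark/Airflow) |
| Time travel | Built in (retained snapshots add storage cost) | Built in for a limited retention window; longer retention costs extra |
| Schema evolution | Built in | Supported (ALTER TABLE), with vendor-specific limits |
| Concurrency | Limited (single process) | High (distributed) |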
Best practices
1. Use Parquet as the storage format
-- Always use Parquet for optimal compression and query performance (`data` is the table to export)
COPY data TO 's3://bucket/data/data.parquet' (FORMAT PARQUET, COMPRESSION ZSTD);
2. Implement data quality checks
-- Validate data before ingesting
SELECT
COUNT(*) as total_rows,
COUNT(DISTINCT order_id) as unique_orders,
COUNT(CASE WHEN amount <= 0 THEN 1 END) as invalid_amounts,
COUNT(CASE WHEN order_date > CURRENT_DATE THEN 1 END) as future_dates
FROM read_parquet('s3://bucket/raw/orders/*.parquet');
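The validation query only produces counts; a pipeline still has to decide whether a batch passes. A minimal sketch of that decision, assuming the result row has been turned into a dict keyed by the query's column aliases and that any duplicate, non-positive amount or future date should block ingestion; the function `quality_failures` and these thresholds are hypothetical.

```python
def quality_failures(stats):
    """Turn the counts from the validation query into a list of human-readable failures."""
    failures = []
    if stats["total_rows"] == 0:
        failures.append("no rows in batch")
    duplicates = stats["total_rows"] - stats["unique_orders"]
    if duplicates:
        failures.append(f"{duplicates} duplicate order_id rows")
    if stats["invalid_amounts"]:
        failures.append(f"{stats['invalid_amounts']} rows with amount <= 0")
    if stats["future_dates"]:
        failures.append(f"{stats['future_dates']} rows with order_date in the future")
    return failures

stats = {"total_rows": 100, "unique_orders": 98, "invalid_amounts": 3, "future_dates": 0}
print(quality_failures(stats))
```

With DuckDB's Python API, such a dict can be built from the cursor, for example `dict(zip([d[0] for d in con.description], con.fetchone()))` right after executing the query.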
3. Set lifecycle policies
# S3 lifecycle policy for raw data
# Note: this call replaces the bucket's entire lifecycle configuration, and objects in
# GLACIER / DEEP_ARCHIVE must be restored before DuckDB can read them again.
import boto3
s3 = boto3.client('s3')
s3.put_bucket_lifecycle_configuration(
    Bucket='my-company-data-lake',
    LifecycleConfiguration={
        'Rules': [
            {
                'ID': 'archive-raw-data',
                'Filter': {'Prefix': 'raw/'},
                'Status': 'Enabled',
                'Transitions': [
                    {'Days': 90, 'StorageClass': 'GLACIER'},
                    {'Days': 365, 'StorageClass': 'DEEP_ARCHIVE'},
                ],
            }
        ]
    },
)
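Because archived objects are no longer directly queryable, it helps to know which raw files a rule like this has already moved. The sketch below applies the same 90/365-day transitions to an object's age, assuming objects start in STANDARD and ignoring that S3 applies transitions asynchronously after the threshold; `storage_class_for_age` and `is_directly_readable` are hypothetical helpers.

```python
def storage_class_for_age(age_days, transitions=((90, "GLACIER"), (365, "DEEP_ARCHIVE"))):
    """Return the storage class the lifecycle rule implies for an object of the given age."""
    current = "STANDARD"
    for days, storage_class in sorted(transitions):
        if age_days >= days:
            current = storage_class
    return current

def is_directly_readable(storage_class):
    """GLACIER and DEEP_ARCHIVE objects need a restore before a GET (and thus DuckDB) can read them."""
    return storage_class not in {"GLACIER", "DEEP_ARCHIVE"}

for age in (30, 120, 400):
    cls = storage_class_for_age(age)
    print(age, cls, is_directly_readable(cls))
```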
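4. Monitor query performance
-- DuckDB has no built-in query-history table; profile individual queries instead.
-- Per-operator timings for one query:
EXPLAIN ANALYZE
SELECT DATE_TRUNC('month', order_date) AS month, SUM(amount) AS revenue
FROM iceberg_scan('s3://my-bucket/curated/orders')
GROUP BY 1;
-- Or write JSON profiles of subsequent queries to a file
SET enable_profiling = 'json';
SET profiling_output = '/tmp/duckdb_profile.json';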
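Tracking average and p99 latency per query over time has to happen in the application. A minimal sketch of such a recorder, assuming wall-clock timing around each call is good enough and using the nearest-rank definition of p99; the class `QueryTimer` and its report fields are hypothetical.

```python
import math
import time
from collections import defaultdict

class QueryTimer:
    """Record wall-clock times per query label and report avg / p99 in milliseconds."""

    def __init__(self):
        self.samples = defaultdict(list)

    def run(self, label, fn, *args, **kwargs):
        """Call fn(*args, **kwargs), recording its duration under `label` even if it raises."""
        start = time.perf_counter()
        try:
            return fn(*args, **kwargs)
        finally:
            self.samples[label].append((time.perf_counter() - start) * 1000)

    def report(self):
        """Return one dict per label, slowest average first."""
        rows = []
        for label, times in self.samples.items():
            ordered = sorted(times)
            # Nearest-rank p99: smallest sample with at least 99% of samples at or below it
            p99 = ordered[math.ceil(0.99 * len(ordered)) - 1]
            rows.append({"query": label, "avg_time_ms": sum(times) / len(times),
                         "total_runs": len(times), "p99_time_ms": p99})
        return sorted(rows, key=lambda r: r["avg_time_ms"], reverse=True)

timer = QueryTimer()
timer.run("noop", lambda: None)
print(timer.report())
```

With a DuckDB connection, a call might look like `timer.run("monthly_revenue", lambda: con.execute(sql).fetchall())`, so that fetching the result is included in the measured time.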
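Conclusion
DuckDB's support for Apache Iceberg and Delta Lake makes it a strong choice for building a production data lakehouse. The combination of:
- DuckDB's speed: fast vectorized, columnar analytical execution
- ACID guarantees from Iceberg/Delta: reliable transactions and time travel
- S3/GCS scalability: low-cost, petabyte-scale storage
- Open table formats: vendor-neutral, portable data
creates a powerful, cost-effective analytics platform that can replace an expensive cloud data warehouse for many workloads that fit on a single node.
For many organizations, starting with DuckDB + Iceberg on S3 offers the best balance of performance, cost and flexibility.
This guide assumes basic familiarity with SQL and cloud storage. For production deployments, see the official Apache Iceberg and Delta Lake documentation.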