DuckDB + Apache Iceberg:从查询到写入,打造你的数据湖实战指南

DuckDB 在 v1.5 版本中大幅增强了 Apache Iceberg 支持,不仅可查询 Iceberg 表,还支持写入、时间旅行和 Glue/Unity Catalog 集成。本文通过完整代码示例,带你从零搭建基于 DuckDB 的 Iceberg 数据湖。

一、为什么是 DuckDB + Iceberg?

Apache Iceberg 是当下最热门的开放表格式(Open Table Format)之一,提供 ACID 事务、时间旅行、Schema 演化和分区裁剪等企业级特性。但传统上操作 Iceberg 表需要 Spark、Flink 或 Trino 等重型引擎。

DuckDB 改变了这一切。

从 v1.5.0 开始,DuckDB 的 iceberg 扩展不仅支持高性能查询,还引入了完整的写入能力(INSERT/UPDATE/DELETE/MERGE),配合 Unity Catalog 和 Glue Catalog 集成,让一张普通的笔记本电脑就能成为 Iceberg 数据湖的开发环境。

DuckDB vs 传统 Iceberg 引擎

特性DuckDBApache SparkTrinoFlink
部署成本零(嵌入式)集群集群集群
启动时间< 1s3-5 min10-30s1-2 min
Iceberg 写入✅ v1.5+❌ 只读
Unity Catalog
内存需求256MB+8GB+4GB+4GB+
SQL 标准支持完整完整部分部分
Python 集成原生PySpark
单机 TB 级处理❌(需要集群)

数据来源:DuckDB v1.5.2 官网文档及社区基准测试。


二、环境准备

安装 DuckDB 并加载 Iceberg 扩展

# 安装 DuckDB CLI(最新 v1.5.2)
curl https://install.duckdb.org | sh

# 或使用 Python
pip install duckdb

启动 DuckDB 并加载 Iceberg 扩展:

-- 加载 Iceberg 扩展
INSTALL iceberg FROM community;
LOAD iceberg;

-- 验证
SELECT version();

创建示例数据

-- 生成模拟销售数据用于 Iceberg 写入测试
CREATE TABLE raw_sales AS
SELECT 
    range AS order_id,
    '2026-0' || (range % 9 + 1)::VARCHAR AS month,
    (random() * 1000)::INTEGER AS customer_id,
    CASE WHEN random() < 0.3 THEN '电子产品'
         WHEN random() < 0.6 THEN '服装'
         ELSE '日用品'
    END AS category,
    (random() * 5000 + 10)::DECIMAL(10,2) AS amount,
    DATE '2026-01-01' + INTERVAL (range % 365) DAY AS order_date
FROM range(1, 100000);

SELECT count(*) AS total_orders,
       round(sum(amount)) AS total_revenue
FROM raw_sales;

三、创建 Iceberg 表并写入数据

3.1 本地 Iceberg 表

-- 创建本地 Iceberg 表(基于文件系统)
ATTACH 'sales_iceberg' AS sales_db (TYPE iceberg);

USE sales_db;

-- 创建分区表(按月分区)
CREATE TABLE orders (
    order_id INTEGER,
    month VARCHAR,
    customer_id INTEGER,
    category VARCHAR,
    amount DECIMAL(10,2),
    order_date DATE
) PARTITION_BY (month);

-- 写入数据
INSERT INTO orders 
SELECT * FROM raw_sales;

-- 验证
SELECT month, count(*) AS orders, round(sum(amount)) AS revenue
FROM orders
GROUP BY month
ORDER BY month;

3.2 ACID 事务与时间旅行

Iceberg 的核心优势之一是 ACID 事务和快照隔离:

-- 查看 Iceberg 快照历史
SELECT snapshot_id, parent_id, timestamp, manifest_list
FROM iceberg_snapshots('sales_iceberg/orders');
-- 开始一个新事务:更新订单金额
BEGIN TRANSACTION;

UPDATE orders 
SET amount = amount * 1.1
WHERE category = '电子产品';

-- 查看更改
SELECT category, round(sum(amount)) AS revenue
FROM orders
WHERE category = '电子产品'
GROUP BY category;

COMMIT;
-- ⏱ 时间旅行:查询更新前的快照
-- 获取历史快照 ID
SELECT snapshot_id, timestamp 
FROM iceberg_snapshots('sales_iceberg/orders')
ORDER BY timestamp DESC;

-- 使用快照 ID 查询历史版本
SELECT category, round(sum(amount)) AS revenue
FROM orders
FOR SYSTEM_VERSION AS OF 1234567890
WHERE category = '电子产品'
GROUP BY category;
-- 回滚到指定快照
ALTER TABLE orders ROLLBACK TO 1234567890;

3.3 MERGE INTO(更新插入)

Iceberg 支持完整的 MERGE 操作:

-- 创建增量更新表
CREATE TABLE daily_updates AS
SELECT order_id, '2026-05' AS month, 
       customer_id, '电子产品' AS category,
       amount * 1.2 AS amount, order_date
FROM raw_sales
WHERE order_id <= 100;

-- MERGE 操作
MERGE INTO orders t
USING daily_updates s
ON t.order_id = s.order_id
WHEN MATCHED THEN UPDATE SET 
    amount = s.amount,
    category = s.category
WHEN NOT MATCHED THEN INSERT 
    (order_id, month, customer_id, category, amount, order_date)
VALUES 
    (s.order_id, s.month, s.customer_id, s.category, s.amount, s.order_date);

四、与 Unity Catalog 集成

DuckDB 支持通过 REST Catalog 接口连接 Unity Catalog 和 Glue Catalog:

-- 连接 Unity Catalog(需要 UC 端点)
ATTACH 'uc:my_catalog.my_schema' AS uc_db 
    (TYPE uc, 
     endpoint 'https://your-uc-instance/api/2.1/unity-catalog',
     token 'your_token_here');

-- 查询 UC 中的 Iceberg 表
SELECT * FROM uc_db.sales_region_iceberg
WHERE region = 'APAC'
LIMIT 100;
-- 连接 AWS Glue Catalog
ATTACH 'glue:my_database' AS glue_db 
    (TYPE glue, 
     region 'us-east-1');

-- 查询 Glue 中的 Iceberg 表
SELECT year, count(*) AS flights
FROM glue_db.flights_iceberg
WHERE year >= 2024
GROUP BY year
ORDER BY year DESC;

企业级集成对比

特性本地文件AWS GlueUnity Catalog
部署复杂度
成本免费按表付费按 CU 付费
多引擎共享有限
权限管理IAMRBAC
血缘追踪
跨区域复制手动

五、性能优化与最佳实践

5.1 分区策略

-- 好的分区:基数字段(月份、地区)
CREATE TABLE orders_partitioned (
    order_id INTEGER,
    month VARCHAR,
    region VARCHAR,
    amount DECIMAL(10,2)
) PARTITION_BY (month, region);

-- 避免:高基数分区(order_id、customer_id)
-- 这样会产生大量小文件,降低查询性能

5.2 文件压缩(Compaction)

频繁的写入会产生大量小文件,需要定期压缩:

-- 查看当前文件数
SELECT count(*) AS file_count,
       round(sum(file_size_in_bytes) / 1024 / 1024) AS total_mb
FROM iceberg_files('sales_iceberg/orders');

-- 执行压缩(重写小文件为大文件)
CALL iceberg_rewrite_data_files(
    'sales_iceberg/orders',
    strategy => 'binpack',
    target_bytes_per_file => '134217728'  -- 128MB
);

5.3 Iceberg 优化 SQL 示例

-- 利用 Iceberg 的分区裁剪
EXPLAIN ANALYZE
SELECT month, round(sum(amount)) AS revenue
FROM orders
WHERE month IN ('2026-01', '2026-02', '2026-03')
GROUP BY month;
-- DuckDB 会自动跳过无关分区

六、与传统数据仓库对比

维度DuckDB + IcebergSnowflakeAmazon RedshiftClickHouse
成本按存储付费(S3 ~$23/TB/月)$2-4/credit$0.25/小时起步自托管免费
查询速度SQLite 级别启动秒级秒级毫秒级
开放格式✅ Iceberg/Parquet❌ 专有❌ 专有❌ 专有
本地开发✅ 零依赖部分
ACID 事务✅ Iceberg 保证
数据湖兼容✅ 原生有限有限
CI/CD 集成✅ 嵌入式

关键洞察:DuckDB + Iceberg 的组合特别适合以下场景:

  • 数据湖的开发和测试环境(替代昂贵的 Spark 集群)
  • 中小规模的数据分析管道(< 100GB)
  • 需要开放格式锁定规避的团队
  • 云成本敏感的组织

七、进阶:构建自动化 Iceberg 管道

以下是一个完整的 Python 自动化脚本示例:

import duckdb
import pandas as pd
from datetime import datetime

def ingest_to_iceberg(csv_path: str, table_path: str, partition_col: str):
    """
    将 CSV 数据自动写入 Iceberg 表并触发压缩
    """
    con = duckdb.connect()
    
    # 加载扩展
    con.execute("INSTALL iceberg FROM community; LOAD iceberg;")
    
    # 创建或附加 Iceberg 数据库
    con.execute(f"ATTACH '{table_path}' AS db (TYPE iceberg)")
    
    # 读取 CSV 并写入 Iceberg
    con.execute(f"""
        CREATE OR REPLACE TABLE db.raw_data AS 
        SELECT *, '{datetime.now().strftime('%Y-%m-%d')}' AS ingest_date
        FROM read_csv_auto('{csv_path}')
    """)
    
    # 创建分区表并插入
    con.execute(f"""
        CREATE OR REPLACE TABLE db.partitioned_data 
        PARTITION_BY ({partition_col}) AS
        SELECT * FROM db.raw_data
    """)
    
    # 压缩小文件
    con.execute(f"CALL iceberg_rewrite_data_files('{table_path}/partitioned_data', 'binpack', 134217728)")
    
    # 统计
    rows = con.execute(f"SELECT count(*) FROM db.partitioned_data").fetchone()[0]
    print(f"✅ 成功导入 {rows} 行到 {table_path}")
    
    con.close()

# 使用示例
ingest_to_iceberg(
    csv_path='/data/sales_2026.csv',
    table_path='s3://my-bucket/iceberg/sales',
    partition_col='month'
)

八、变现建议 💰

掌握 DuckDB + Iceberg 的组合技能可以在以下领域创造价值:

1. 数据湖咨询与迁移服务

  • 目标客户:正在从 Spark 迁移到轻量化方案的中小企业
  • 服务内容:帮助客户将现有数据管道从 Spark/Flink 迁移到 DuckDB + Iceberg
  • 定价参考:每次迁移服务 $500-$3,000,取决于数据规模
  • 交付物:迁移方案文档 + 自动化迁移脚本 + 性能对比报告

2. Iceberg 数据管道模板产品化

  • 产品形式:将上面的 Python 自动化脚本包装为 CLI 工具或 SaaS 服务
  • 定价模式:$49/月的订阅制,包含自动压缩、监控和告警
  • 目标用户:中小团队的数据工程师

3. 培训与教育工作坊

  • 线上课程:《DuckDB + Iceberg 实战——从零搭建企业数据湖》
  • 定价:$199/人(录播)或 $499/人(直播+答疑)
  • 内容大纲:Iceberg 原理 → DuckDB 操作 → Catalog 集成 → 管道自动化 → 生产部署

4. 开源周边工具

  • 开发 DuckDB Iceberg 管理工具(类似 pgAdmin 但针对 Iceberg)
  • 通过 GitHub Sponsors 获取资助,或提供企业版功能

5. 性能优化顾问

  • 针对已使用 Iceberg 但性能不佳的团队提供诊断
  • 服务内容:文件布局分析 → 分区策略优化 → 查询重写
  • 定价:$200/小时的按需咨询

总结

DuckDB + Apache Iceberg 的组合让数据湖技术从集群专属走向了人人可用。无论是本地开发测试、CI/CD 流水线,还是生产环境的数据管道,DuckDB 都能以极低的成本和部署复杂度完成 Iceberg 表的读写操作。

记住三个核心要点:

  1. 分区策略决定性能:选择低基数字段分区
  2. 定期压缩是必须的:频繁写入后执行 iceberg_rewrite_data_files
  3. Schema 演化是 Iceberg 杀招:利用 Iceberg 的 Schema 演化能力,无需停机即可修改表结构

开始上手吧——你的笔记本电脑或许是世界上最便宜的数据湖。