Featured image of post 用 DuckDB MERGE INTO 干掉 ETL 里的 Upsert 噩梦

用 DuckDB MERGE INTO 干掉 ETL 里的 Upsert 噩梦

一行 SQL 替代 20 行 Python 代码,掌握 DuckDB MERGE INTO 幂等写入技巧,彻底告别 ETL 中的 Upsert 竞态条件和脏数据问题。

用 DuckDB MERGE INTO 干掉 ETL 里的 Upsert 噩梦:一行 SQL 替代 20 行 Python 代码

难度:⭐⭐⭐ | 预计耗时:15 分钟上手,之后告别脏数据

DuckDB MERGE INTO 架构图

一、你的 ETL 是不是也在写这种代码?

很多数据工程师的日常是这样的:

# 传统做法:检查是否存在,然后决定 INSERT 还是 UPDATE
existing = con.execute("SELECT id FROM daily_sales WHERE date = '2026-06-30'").fetchall()
if existing:
    con.execute("UPDATE daily_sales SET ... WHERE date = '2026-06-30'")
else:
    con.execute("INSERT INTO daily_sales VALUES (...)")

这段代码有什么问题?

  1. 并发不安全:两个进程同时检查,发现都不存在,就插了两条重复数据
  2. 性能极差:两次网络往返(SELECT + INSERT/UPDATE)
  3. 代码冗长:每张表都要写一遍 check-then-insert 的逻辑
  4. 事务复杂度:还需要手动管理事务回滚

而 DuckDB 的 MERGE INTO 让你一行 SQL 搞定一切:判断存在则更新,不存在则插入,原子操作,无竞态。

二、幂等写入的核心原理

DuckDB 支持 SQL:2003 标准的 MERGE 语法(也叫 UPSERT),从 v0.9 开始全面支持。它的本质是一个原子化的更新插入操作:

MERGE INTO target_table
USING source_data
ON target_table.key = source_data.key
WHEN MATCHED THEN UPDATE SET ...
WHEN NOT MATCHED THEN INSERT VALUES ...

关键优势:

  • 原子性:整个操作在一个事务内完成,不存在竞态条件
  • 批量操作:一次性处理成千上万条记录,远快于逐条循环
  • 可读性强:意图清晰,匹配则更新,不匹配则插入

三、完整实战:电商销售日报的幂等写入

假设你每天从消息队列消费销售数据,需要写入 DuckDB 的分析表。数据可能包含重复消息,你需要确保同一笔订单不会重复累加

Step 1:建表

import duckdb

con = duckdb.connect(":memory:")

# 主表:按订单号去重
con.execute("""
    CREATE TABLE daily_sales (
        order_id VARCHAR PRIMARY KEY,
        product_id VARCHAR,
        quantity INTEGER,
        amount DOUBLE,
        sale_date DATE,
        region VARCHAR,
        updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
    )
""")

# 临时表:存放当天消费的全部原始数据(可能含重复)
con.execute("""
    CREATE TEMP TABLE raw_ingest AS
    SELECT * FROM read_csv_auto('/data/kafka/sales_20260630.csv')
""")

Step 2:幂等写入

# 核心:一行 SQL 搞定幂等写入
con.execute("""
    MERGE INTO daily_sales ds
    USING raw_ingest ri
    ON ds.order_id = ri.order_id
    WHEN MATCHED THEN
        UPDATE SET
            quantity = ds.quantity + ri.quantity,
            amount = ds.amount + ri.amount,
            updated_at = CURRENT_TIMESTAMP
    WHEN NOT MATCHED THEN
        INSERT (order_id, product_id, quantity, amount, sale_date, region)
        VALUES (ri.order_id, ri.product_id, ri.quantity, ri.amount, ri.sale_date, ri.region)
""")

# 验证结果
result = con.execute("""
    SELECT
        COUNT(*) as total_records,
        COUNT(DISTINCT order_id) as unique_orders,
        SUM(quantity) as total_qty,
        ROUND(SUM(amount), 2) as total_revenue
    FROM daily_sales
""").fetchall()

print(f'总记录: {result[0][0]} | 唯一订单: {result[0][1]} | 总收入: {result[0][3]}')

关键点:即使 CSV 中有 100 条重复的 order_id,幂等写入也只会在第一次匹配时 INSERT,后续重复的会通过 UPDATE 累加。这就是幂等性——多次执行结果一致。

四、进阶场景:带条件更新的 MERGE

不是所有匹配都需要更新。有时候你只想在特定条件下才更新已有记录:

-- 只更新金额变化超过 10% 的记录
MERGE INTO daily_sales ds
USING raw_ingest ri
ON ds.order_id = ri.order_id
WHEN MATCHED AND ABS(ds.amount - ri.amount) / NULLIF(ds.amount, 0) > 0.1 THEN
    UPDATE SET
        amount = ri.amount,
        quantity = ri.quantity,
        updated_at = CURRENT_TIMESTAMP
WHEN NOT MATCHED THEN
    INSERT (order_id, product_id, quantity, amount, sale_date, region)
    VALUES (ri.order_id, ri.product_id, ri.quantity, ri.amount, ri.sale_date, ri.region);

这个场景非常实用:比如你的数据管道偶尔会发送"修正消息"(修正之前发错的数据),只有差异超过阈值时才真正更新,避免不必要的写放大。

五、性能优化:大批量 MERGE 的最佳实践

当 MERGE 涉及百万级以上数据时,需要注意以下几点:

1. 先对源数据去重

-- 在 MERGE 之前先去重,减少匹配工作量
CREATE TEMP TABLE deduped_raw AS
SELECT 
    order_id,
    product_id,
    SUM(quantity) as quantity,
    SUM(amount) as amount,
    MIN(sale_date) as sale_date,
    FIRST_VALUE(region) OVER (PARTITION BY order_id) as region
FROM raw_ingest
GROUP BY order_id, product_id, sale_date

2. 利用 DuckDB 的向量化执行

DuckDB 的 MERGE 是向量化执行的,比逐行循环快 10-100 倍。确保启用多线程:

SET threads = 0;  -- 使用所有 CPU 核心

3. 写入 Parquet 格式做持久化

# MERGE 完成后,导出为 Parquet 供后续分析
con.execute("""
    COPY daily_sales TO '/data/daily/daily_sales_20260630.parquet'
    (FORMAT PARQUET, COMPRESSION ZSTD, ROW_GROUP_SIZE 100000)
""")

4. 监控 MERGE 统计信息

-- DuckDB 不直接返回 MERGE 影响的行数,但可以事后验证
SELECT 
    (SELECT COUNT(*) FROM daily_sales) as current_total,
    (SELECT COUNT(*) FROM raw_ingest) as source_total,
    (SELECT COUNT(DISTINCT order_id) FROM raw_ingest) as source_unique;

六、传统工具 vs DuckDB MERGE INTO 对比

特性传统 Python UpsertDuckDB MERGE INTOSpark DataFrameWriterPostgreSQL ON CONFLICT
代码行数15-20 行1 行 SQL5-8 行3-5 行
并发安全❌ 需手动锁✅ 原子操作✅ 分布式事务✅ 行级锁
批量性能⭐⭐ 逐行循环⭐⭐⭐⭐⭐ 向量化⭐⭐⭐⭐ 分布式⭐⭐⭐⭐ 索引加速
条件更新✅ 需写 if✅ WHEN MATCHED AND❌ 不支持❌ 不支持
学习曲线中等低(标准 SQL)高(Spark API)
适用场景小规模数据中大规模分析超大数据集OLTP 生产库

七、变现建议

掌握 DuckDB MERGE INTO 幂等写入技巧后,你可以从以下几个方向实现商业变现:

  1. 数据管道咨询服务:为企业提供 ETL 优化方案,将传统的 Python Upsert 代码替换为 DuckDB MERGE INTO,通常能将数据处理速度提升 10 倍以上,按项目收费 5,000-50,000 元不等。

  2. SaaS 数据产品:基于 DuckDB 构建轻量级数据管理平台,利用 MERGE INTO 实现数据的实时更新和去重,面向中小企业提供订阅制服务(月费 200-2,000 元/用户)。

  3. 技术培训课程:制作 DuckDB 高级实战课程,重点讲解 MERGE INTO 在 ETL 中的应用场景,在知识星球、极客时间等平台付费发售,单课程定价 99-399 元。

  4. 开源工具包:将幂等写入的最佳实践封装成 Python 库(如 duckdb-merge-utils),提供一键式 ETL 模板,通过 GitHub Sponsors 和商业许可双重变现。

  5. 自动化报表产品:结合 DuckDB 的 PARQUET 导出能力和 MERGE INTO 的幂等特性,搭建每日/每周自动数据报表系统,按客户数量收取年费(10,000-100,000 元/年)。

💡 提示:如果你正在构建数据分析后端或需要高性能的 CSV/Parquet 查询能力,可以参考 duckdblab.org 获取更多实战教程和工具推荐。


本文基于 DuckDB 掘金实战内容整理,更多 DuckDB 高级用法请关注我们的频道获取最新教程。

📺 Watch video tutorials → Olap Studio YouTube

Subscribe for more DuckDB & AI automation tutorials

使用 Hugo 构建
主题 StackJimmy 设计

⚠️ 本站为独立社区项目,与 DuckDB 基金会及 DuckDB 官方项目无任何从属、背书或赞助关系。

"DuckDB" 是 DuckDB 基金会的注册商标,本站仅以事实描述方式使用该名称。

本站内容仅供教育与社区推广用途,不构成任何商业服务。