用 DuckDB MERGE INTO 干掉 ETL 里的 Upsert 噩梦:一行 SQL 替代 20 行 Python 代码
难度:⭐⭐⭐ | 预计耗时:15 分钟上手,之后告别脏数据

一、你的 ETL 是不是也在写这种代码?
很多数据工程师的日常是这样的:
# 传统做法:检查是否存在,然后决定 INSERT 还是 UPDATE
existing = con.execute("SELECT id FROM daily_sales WHERE date = '2026-06-30'").fetchall()
if existing:
con.execute("UPDATE daily_sales SET ... WHERE date = '2026-06-30'")
else:
con.execute("INSERT INTO daily_sales VALUES (...)")
这段代码有什么问题?
- 并发不安全:两个进程同时检查,发现都不存在,就插了两条重复数据
- 性能极差:两次网络往返(SELECT + INSERT/UPDATE)
- 代码冗长:每张表都要写一遍 check-then-insert 的逻辑
- 事务复杂度:还需要手动管理事务回滚
而 DuckDB 的 MERGE INTO 让你一行 SQL 搞定一切:判断存在则更新,不存在则插入,原子操作,无竞态。
二、幂等写入的核心原理
DuckDB 支持 SQL:2003 标准的 MERGE 语法(也叫 UPSERT),从 v0.9 开始全面支持。它的本质是一个原子化的更新插入操作:
MERGE INTO target_table
USING source_data
ON target_table.key = source_data.key
WHEN MATCHED THEN UPDATE SET ...
WHEN NOT MATCHED THEN INSERT VALUES ...
关键优势:
- 原子性:整个操作在一个事务内完成,不存在竞态条件
- 批量操作:一次性处理成千上万条记录,远快于逐条循环
- 可读性强:意图清晰,匹配则更新,不匹配则插入
三、完整实战:电商销售日报的幂等写入
假设你每天从消息队列消费销售数据,需要写入 DuckDB 的分析表。数据可能包含重复消息,你需要确保同一笔订单不会重复累加。
Step 1:建表
import duckdb
con = duckdb.connect(":memory:")
# 主表:按订单号去重
con.execute("""
CREATE TABLE daily_sales (
order_id VARCHAR PRIMARY KEY,
product_id VARCHAR,
quantity INTEGER,
amount DOUBLE,
sale_date DATE,
region VARCHAR,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
""")
# 临时表:存放当天消费的全部原始数据(可能含重复)
con.execute("""
CREATE TEMP TABLE raw_ingest AS
SELECT * FROM read_csv_auto('/data/kafka/sales_20260630.csv')
""")
Step 2:幂等写入
# 核心:一行 SQL 搞定幂等写入
con.execute("""
MERGE INTO daily_sales ds
USING raw_ingest ri
ON ds.order_id = ri.order_id
WHEN MATCHED THEN
UPDATE SET
quantity = ds.quantity + ri.quantity,
amount = ds.amount + ri.amount,
updated_at = CURRENT_TIMESTAMP
WHEN NOT MATCHED THEN
INSERT (order_id, product_id, quantity, amount, sale_date, region)
VALUES (ri.order_id, ri.product_id, ri.quantity, ri.amount, ri.sale_date, ri.region)
""")
# 验证结果
result = con.execute("""
SELECT
COUNT(*) as total_records,
COUNT(DISTINCT order_id) as unique_orders,
SUM(quantity) as total_qty,
ROUND(SUM(amount), 2) as total_revenue
FROM daily_sales
""").fetchall()
print(f'总记录: {result[0][0]} | 唯一订单: {result[0][1]} | 总收入: {result[0][3]}')
关键点:即使 CSV 中有 100 条重复的 order_id,幂等写入也只会在第一次匹配时 INSERT,后续重复的会通过 UPDATE 累加。这就是幂等性——多次执行结果一致。
四、进阶场景:带条件更新的 MERGE
不是所有匹配都需要更新。有时候你只想在特定条件下才更新已有记录:
-- 只更新金额变化超过 10% 的记录
MERGE INTO daily_sales ds
USING raw_ingest ri
ON ds.order_id = ri.order_id
WHEN MATCHED AND ABS(ds.amount - ri.amount) / NULLIF(ds.amount, 0) > 0.1 THEN
UPDATE SET
amount = ri.amount,
quantity = ri.quantity,
updated_at = CURRENT_TIMESTAMP
WHEN NOT MATCHED THEN
INSERT (order_id, product_id, quantity, amount, sale_date, region)
VALUES (ri.order_id, ri.product_id, ri.quantity, ri.amount, ri.sale_date, ri.region);
这个场景非常实用:比如你的数据管道偶尔会发送"修正消息"(修正之前发错的数据),只有差异超过阈值时才真正更新,避免不必要的写放大。
五、性能优化:大批量 MERGE 的最佳实践
当 MERGE 涉及百万级以上数据时,需要注意以下几点:
1. 先对源数据去重
-- 在 MERGE 之前先去重,减少匹配工作量
CREATE TEMP TABLE deduped_raw AS
SELECT
order_id,
product_id,
SUM(quantity) as quantity,
SUM(amount) as amount,
MIN(sale_date) as sale_date,
FIRST_VALUE(region) OVER (PARTITION BY order_id) as region
FROM raw_ingest
GROUP BY order_id, product_id, sale_date
2. 利用 DuckDB 的向量化执行
DuckDB 的 MERGE 是向量化执行的,比逐行循环快 10-100 倍。确保启用多线程:
SET threads = 0; -- 使用所有 CPU 核心
3. 写入 Parquet 格式做持久化
# MERGE 完成后,导出为 Parquet 供后续分析
con.execute("""
COPY daily_sales TO '/data/daily/daily_sales_20260630.parquet'
(FORMAT PARQUET, COMPRESSION ZSTD, ROW_GROUP_SIZE 100000)
""")
4. 监控 MERGE 统计信息
-- DuckDB 不直接返回 MERGE 影响的行数,但可以事后验证
SELECT
(SELECT COUNT(*) FROM daily_sales) as current_total,
(SELECT COUNT(*) FROM raw_ingest) as source_total,
(SELECT COUNT(DISTINCT order_id) FROM raw_ingest) as source_unique;
六、传统工具 vs DuckDB MERGE INTO 对比
| 特性 | 传统 Python Upsert | DuckDB MERGE INTO | Spark DataFrameWriter | PostgreSQL ON CONFLICT |
|---|---|---|---|---|
| 代码行数 | 15-20 行 | 1 行 SQL | 5-8 行 | 3-5 行 |
| 并发安全 | ❌ 需手动锁 | ✅ 原子操作 | ✅ 分布式事务 | ✅ 行级锁 |
| 批量性能 | ⭐⭐ 逐行循环 | ⭐⭐⭐⭐⭐ 向量化 | ⭐⭐⭐⭐ 分布式 | ⭐⭐⭐⭐ 索引加速 |
| 条件更新 | ✅ 需写 if | ✅ WHEN MATCHED AND | ❌ 不支持 | ❌ 不支持 |
| 学习曲线 | 中等 | 低(标准 SQL) | 高(Spark API) | 低 |
| 适用场景 | 小规模数据 | 中大规模分析 | 超大数据集 | OLTP 生产库 |
七、变现建议
掌握 DuckDB MERGE INTO 幂等写入技巧后,你可以从以下几个方向实现商业变现:
数据管道咨询服务:为企业提供 ETL 优化方案,将传统的 Python Upsert 代码替换为 DuckDB MERGE INTO,通常能将数据处理速度提升 10 倍以上,按项目收费 5,000-50,000 元不等。
SaaS 数据产品:基于 DuckDB 构建轻量级数据管理平台,利用 MERGE INTO 实现数据的实时更新和去重,面向中小企业提供订阅制服务(月费 200-2,000 元/用户)。
技术培训课程:制作 DuckDB 高级实战课程,重点讲解 MERGE INTO 在 ETL 中的应用场景,在知识星球、极客时间等平台付费发售,单课程定价 99-399 元。
开源工具包:将幂等写入的最佳实践封装成 Python 库(如
duckdb-merge-utils),提供一键式 ETL 模板,通过 GitHub Sponsors 和商业许可双重变现。自动化报表产品:结合 DuckDB 的 PARQUET 导出能力和 MERGE INTO 的幂等特性,搭建每日/每周自动数据报表系统,按客户数量收取年费(10,000-100,000 元/年)。
💡 提示:如果你正在构建数据分析后端或需要高性能的 CSV/Parquet 查询能力,可以参考 duckdblab.org 获取更多实战教程和工具推荐。
本文基于 DuckDB 掘金实战内容整理,更多 DuckDB 高级用法请关注我们的频道获取最新教程。