
引言:一个困扰无数数据工作者的痛点
假设你是一名数据工程师或独立开发者,每天需要处理大量按天、按月或按项目分割的 DuckDB 数据库文件。常见的场景包括:
- 按天分片的业务数据库:每个
daily_20260601.duckdb文件存储一天的订单数据 - 多项目汇总分析:三个团队各自维护一份
.duckdb文件,你需要跨项目做对比 - 备份文件检索:
backup/目录下有数百个历史备份,需要快速查找某条记录 - 数据湖归档查询:冷数据存储为独立的 DuckDB 文件,需要临时聚合分析
在没有 read_duckdb() 之前,你的解决方案通常是这样的:
import duckdb
results = []
for f in glob.glob("data_*.duckdb"):
conn = duckdb.connect(f)
results.append(conn.execute("SELECT * FROM orders").fetchdf())
conn.close()
merged = pd.concat(results)
这段代码的问题很明显:
- 每次都要建立连接再关闭,开销大且容易忘记关闭导致资源泄漏
- 数据全部加载到内存,面对几百 GB 的文件时直接 OOM
- 无法下推过滤条件,必须先全量读取再过滤
- 代码冗长,简单的合并操作需要十几行 Python
而 read_duckdb() 的出现,彻底改变了这一切。
read_duckdb() 是什么?
read_duckdb() 是 DuckDB 1.5.0(Variegata 版本)引入的一个内置函数,允许你在 SQL 中直接读取 .duckdb 文件中的表数据,并支持 glob 通配符匹配多个文件。
它的核心语法非常简单:
read_duckdb(file_pattern [, table_name])
file_pattern:文件路径模式,支持*和?通配符table_name(可选):要读取的表名;不指定则返回文件中的所有表
基础用法:从简单到复杂
读取单个文件
-- 读取 backup.db 中默认的第一个表
SELECT * FROM read_duckdb('backup.db');
-- 读取指定表
SELECT * FROM read_duckdb('backup.db', 'orders');
合并多个文件
-- 合并所有 daily_*.duckdb 文件中的 orders 表
SELECT * FROM read_duckdb('daily_*.duckdb', 'orders');
-- 递归读取子目录中的所有 .duckdb 文件
SELECT * FROM read_duckdb('archive/**/*.duckdb', 'metrics');
配合 SQL 操作
-- 带过滤条件的合并查询
SELECT order_id, customer_id, total_amount
FROM read_duckdb('daily_*.duckdb', 'orders')
WHERE order_date >= '2026-06-01'
AND total_amount > 100;
-- 聚合统计
SELECT
DATE_TRUNC('day', order_date) AS day,
COUNT(*) AS order_count,
SUM(total_amount) AS revenue,
AVG(total_amount) AS avg_order_value
FROM read_duckdb('daily_*.duckdb', 'orders')
GROUP BY DATE_TRUNC('day', order_date)
ORDER BY day;
Python 集成:在生产环境中使用
在实际项目中,你通常会通过 DuckDB 的 Python 绑定来使用这个功能:
import duckdb
import glob
# 方法一:直接使用 SQL
conn = duckdb.connect()
result = conn.execute("""
SELECT
_source_file AS file_source,
COUNT(*) AS order_count,
SUM(total_amount) AS total_revenue
FROM read_duckdb('daily_reports/*.duckdb', 'orders')
GROUP BY _source_file
ORDER BY order_count DESC
""").fetchdf()
print(result)
关键特性:_source_file 伪列
当你使用 read_duckdb() 读取多个文件时,DuckDB 会自动为结果添加一个名为 _source_file 的列,记录每一行数据来源于哪个文件。这个特性在调试和数据溯源时非常有用:
SELECT
_source_file,
order_id,
customer_id,
total_amount,
order_date
FROM read_duckdb('daily_*.duckdb', 'orders')
LIMIT 10;
输出示例:
┌───────────────────────────┬──────────┬─────────────┬──────────────┬────────────┐
│ _source_file │ order_id │ customer_id │ total_amount │ order_date │
├───────────────────────────┼──────────┼─────────────┼──────────────┼────────────┤
│ daily_20260601.duckdb │ 10001 │ C-5023 │ 259.99 │ 2026-06-01 │
│ daily_20260601.duckdb │ 10002 │ C-7891 │ 89.50 │ 2026-06-01 │
│ daily_20260602.duckdb │ 10003 │ C-3344 │ 432.00 │ 2026-06-02 │
│ daily_20260602.duckdb │ 10004 │ C-1122 │ 67.80 │ 2026-06-02 │
└───────────────────────────┴──────────┴─────────────┴──────────────┴────────────┘
性能优化:让合并查询飞起来
1. 谓词下推(Predicate Pushdown)
read_duckdb() 支持谓词下推,这意味着 WHERE 条件会被传递到每个文件的查询中,只读取满足条件的数据:
-- 只有 2026-06-15 之后有数据的文件才会被扫描
SELECT * FROM read_duckdb('daily_*.duckdb', 'events')
WHERE event_time >= '2026-06-15';
2. 使用 EXPLAIN ANALYZE 验证优化
EXPLAIN ANALYZE
SELECT
DATE_TRUNC('month', order_date) AS month,
SUM(total_amount) AS monthly_revenue
FROM read_duckdb('monthly_*.duckdb', 'orders')
WHERE order_date BETWEEN '2026-01-01' AND '2026-06-30'
GROUP BY DATE_TRUNC('month', order_date);
你会看到执行计划中每个文件的读取都被优化为只扫描相关分区。
3. 控制并发读取
import duckdb
# 设置最大并行读取的文件数
conn = duckdb.connect(config={
'max_threads': 8,
'temp_directory': '/tmp/duckdb_temp'
})
result = conn.execute("""
SELECT * FROM read_duckdb('large_set/*.duckdb', 'data')
""").fetchdf()
4. 与外部排序结合处理大数据集
当合并的数据量超过内存时,可以借助 DuckDB 的外部排序能力:
SET GLOBAL memory_limit = '4GB';
SET GLOBAL temp_directory = '/tmp/duckdb_temp';
SELECT * FROM read_duckdb('archive/*.duckdb', 'transactions')
ORDER BY transaction_date DESC
LIMIT 1000;
与传统方案的对比
| 维度 | Python 循环 + pandas | read_duckdb() |
|---|---|---|
| 代码行数 | 10-20 行 | 1-3 行 SQL |
| 内存效率 | 全量加载到内存 | 惰性读取 + 下推 |
| 过滤能力 | 读取后过滤 | 谓词下推到文件级 |
| 错误恢复 | 需手动处理异常 | SQL 层面统一处理 |
| 学习成本 | 需要 Python 编程 | 只需 SQL 知识 |
| 并行处理 | 需手动多线程 | 自动并行扫描 |
| 调试追踪 | 需额外逻辑 | 自动提供 _source_file |
实战场景:构建数据产品
场景一:竞品价格监控 SaaS
你可以用 read_duckdb() 构建一个自动化的竞品价格监控系统:
import duckdb
from datetime import datetime, timedelta
class PriceMonitor:
def __init__(self, db_dir="price_data"):
self.conn = duckdb.connect(
config={'temp_directory': '/tmp/duckdb_monitor'}
)
self.db_dir = db_dir
def get_price_trend(self, product_id, days=30):
"""获取指定商品过去 N 天的价格趋势"""
cutoff = (datetime.now() - timedelta(days=days)).strftime('%Y-%m-%d')
query = f"""
SELECT
_source_file,
price,
competitor,
timestamp
FROM read_duckdb('{self.db_dir}/{{*.duckdb}}', 'prices')
WHERE product_id = '{product_id}'
AND timestamp >= '{cutoff}'
ORDER BY timestamp
"""
return self.conn.execute(query).fetchdf()
def get_best_deal(self, category, days=7):
"""找出过去 N 天内某类别的最优价格"""
query = f"""
SELECT
competitor,
AVG(price) AS avg_price,
MIN(price) AS min_price,
MAX(price) AS max_price,
COUNT(*) AS sample_count
FROM read_duckdb('{self.db_dir}/{{*.duckdb}}', 'prices')
WHERE category = '{category}'
GROUP BY competitor
ORDER BY avg_price ASC
LIMIT 5
"""
return self.conn.execute(query).fetchdf()
# 使用示例
monitor = PriceMonitor()
trend = monitor.get_price_trend("SKU-12345")
best_deals = monitor.get_best_deal("electronics")
这个系统的商业模式很简单:你收集不同电商平台的商品价格数据,按天存入独立的 DuckDB 文件,然后通过 API 提供给客户查询价格趋势和最优购买渠道。每个客户每月收取 50-200 元的订阅费。
场景二:自动化周报生成器
import duckdb
def generate_weekly_report():
"""自动生成每周销售报告"""
conn = duckdb.connect()
report = conn.execute("""
WITH daily_stats AS (
SELECT
_source_file AS report_date,
COUNT(DISTINCT customer_id) AS unique_customers,
COUNT(*) AS total_orders,
SUM(total_amount) AS total_revenue,
AVG(total_amount) AS avg_order_value
FROM read_duckdb('weekly_sales/{{*.duckdb}}', 'orders')
WHERE order_date >= CURRENT_DATE - INTERVAL 7 DAY
GROUP BY _source_file
)
SELECT
report_date,
unique_customers,
total_orders,
total_revenue,
avg_order_value,
LAG(total_revenue) OVER (ORDER BY report_date) AS prev_day_revenue,
ROUND(
(total_revenue - LAG(total_revenue) OVER (ORDER BY report_date))
/ NULLIF(LAG(total_revenue) OVER (ORDER BY report_date), 0) * 100, 2
) AS revenue_change_pct
FROM daily_stats
ORDER BY report_date
""").fetchdf()
return report
report = generate_weekly_report()
print(report.to_markdown(index=False))
场景三:数据质量审计工具
-- 检查多个文件中的数据一致性
SELECT
_source_file,
COUNT(*) AS row_count,
COUNT(DISTINCT id) AS unique_ids,
COUNT(*) - COUNT(DISTINCT id) AS duplicates,
COUNT(CASE WHEN email IS NULL THEN 1 END) AS missing_emails,
COUNT(CASE WHEN phone IS NULL THEN 1 END) AS missing_phones
FROM read_duckdb('customer_db/{{*.duckdb}}', 'customers')
GROUP BY _source_file
HAVING duplicates > 0 OR missing_emails > 100
ORDER BY row_count DESC;
与类似功能的对比
除了 read_duckdb(),DuckDB 还提供了其他文件读取功能:
| 函数 | 用途 | 适用场景 |
|---|---|---|
read_duckdb() | 读取 .duckdb 文件 | 合并同构数据库文件 |
read_csv_auto() | 读取 CSV 文件 | 文本数据源 |
read_parquet() | 读取 Parquet 文件 | 列式存储的大数据集 |
read_json_auto() | 读取 JSON 文件 | 半结构化数据 |
read_ndjson() | 读取 NDJSON 文件 | 流式 JSON 数据 |
read_duckdb() 的独特之处在于它能直接读取 DuckDB 的内部格式,保留完整的类型信息和索引结构,这是其他格式无法比拟的。
注意事项与最佳实践
1. 版本兼容性
read_duckdb() 需要 DuckDB ≥ 1.5.0。如果你的环境版本较低,可以通过以下方式升级:
pip install --upgrade duckdb
2. 文件路径规范
- 使用相对路径时,确保相对于 DuckDB 的工作目录
- 在 Windows 上使用反斜杠时需要转义或使用原始字符串
- glob 模式中的
*匹配当前目录下的文件,**递归匹配子目录
3. 安全性考虑
# ⚠️ 不要直接拼接用户输入到文件路径中
# 不安全
pattern = f"{user_input}/*.duckdb"
# ✅ 使用白名单验证
import pathlib
allowed_dir = pathlib.Path("/data/reports")
user_path = pathlib.Path(user_input)
if user_path.resolve().is_relative_to(allowed_dir.resolve()):
pattern = str(user_path) + "/*.duckdb"
4. 性能调优清单
- 对于超过 100 个文件的合并,设置
max_threads控制并发度 - 使用
temp_directory配置外部排序的临时空间 - 在文件上建立适当的索引,虽然
read_duckdb()本身会利用文件内部索引 - 避免在同一查询中对同一个文件多次调用
read_duckdb()
变现建议
read_duckdb() 这个功能虽然看起来只是一个小特性,但它解决的是一个真实且高频的需求——多文件数据合并。以下是几个可以直接变现的方向:
方向一:数据整合即服务(DIaaS)
为中小企业提供跨系统数据整合服务。很多公司使用不同的工具收集数据(CRM、ERP、电商平台),每个工具导出独立的 .duckdb 文件。你可以用 read_duckdb() 在几分钟内完成合并,然后交付可视化的分析报告。
定价策略:单次整合 500-2000 元,月度数据监控服务 1000-5000 元/月。
方向二:自动化报表 SaaS
基于 read_duckdb() 构建自动化的周报/月报系统。客户每天生成独立的数据库文件,你通过定时任务合并分析,自动生成图表和洞察。
技术栈:DuckDB + Streamlit/Evidence + 定时任务 = 一套完整的报表产品。
定价策略:基础版 99 元/月,专业版 299 元/月。
方向三:数据审计工具
面向数据团队,提供跨文件的数据质量审计服务。检查数据一致性、重复记录、缺失值等问题,自动生成审计报告。
定价策略:按文件数量收费,100-500 元/次。
方向四:教育课程
将 read_duckdb() 的使用技巧整理成课程,教数据分析师如何高效处理多文件场景。这类实战教程在知识付费平台很受欢迎。
定价策略:单课 199-499 元,系列课程打包 999 元。
总结
read_duckdb() 是 DuckDB 1.5+ 中最实用的功能之一。它将原本需要多行 Python 代码才能完成的多文件合并操作,简化为一行 SQL:
SELECT * FROM read_duckdb('data_*.duckdb', 'orders');
更重要的是,它带来了真正的性能优势——谓词下推、惰性读取、自动并行,这些都是传统 Python 循环方案难以企及的。
如果你正在处理多文件数据合并的场景,不要再写冗长的 Python 循环了。试试 read_duckdb(),你会发现数据处理的速度和质量都有了质的提升。
📖 更多 DuckDB 实用函数和实战案例 → duckdblab.org