Featured image of post DuckDB 一键合并多个数据库文件:read_duckdb() 完全指南

DuckDB 一键合并多个数据库文件:read_duckdb() 完全指南

DuckDB 1.5+ 引入的 read_duckdb() 函数让你能在纯 SQL 中直接读取并合并多个 .duckdb 文件,无需编写任何 Python 循环。本文涵盖语法、性能优化、实战场景和变现建议。

DuckDB read_duckdb() 架构图

引言:一个困扰无数数据工作者的痛点

假设你是一名数据工程师或独立开发者,每天需要处理大量按天、按月或按项目分割的 DuckDB 数据库文件。常见的场景包括:

  • 按天分片的业务数据库:每个 daily_20260601.duckdb 文件存储一天的订单数据
  • 多项目汇总分析:三个团队各自维护一份 .duckdb 文件,你需要跨项目做对比
  • 备份文件检索backup/ 目录下有数百个历史备份,需要快速查找某条记录
  • 数据湖归档查询:冷数据存储为独立的 DuckDB 文件,需要临时聚合分析

在没有 read_duckdb() 之前,你的解决方案通常是这样的:

import duckdb

results = []
for f in glob.glob("data_*.duckdb"):
    conn = duckdb.connect(f)
    results.append(conn.execute("SELECT * FROM orders").fetchdf())
    conn.close()

merged = pd.concat(results)

这段代码的问题很明显:

  1. 每次都要建立连接再关闭,开销大且容易忘记关闭导致资源泄漏
  2. 数据全部加载到内存,面对几百 GB 的文件时直接 OOM
  3. 无法下推过滤条件,必须先全量读取再过滤
  4. 代码冗长,简单的合并操作需要十几行 Python

read_duckdb() 的出现,彻底改变了这一切。

read_duckdb() 是什么?

read_duckdb() 是 DuckDB 1.5.0(Variegata 版本)引入的一个内置函数,允许你在 SQL 中直接读取 .duckdb 文件中的表数据,并支持 glob 通配符匹配多个文件。

它的核心语法非常简单:

read_duckdb(file_pattern [, table_name])
  • file_pattern:文件路径模式,支持 *? 通配符
  • table_name(可选):要读取的表名;不指定则返回文件中的所有表

基础用法:从简单到复杂

读取单个文件

-- 读取 backup.db 中默认的第一个表
SELECT * FROM read_duckdb('backup.db');

-- 读取指定表
SELECT * FROM read_duckdb('backup.db', 'orders');

合并多个文件

-- 合并所有 daily_*.duckdb 文件中的 orders 表
SELECT * FROM read_duckdb('daily_*.duckdb', 'orders');

-- 递归读取子目录中的所有 .duckdb 文件
SELECT * FROM read_duckdb('archive/**/*.duckdb', 'metrics');

配合 SQL 操作

-- 带过滤条件的合并查询
SELECT order_id, customer_id, total_amount
FROM read_duckdb('daily_*.duckdb', 'orders')
WHERE order_date >= '2026-06-01'
  AND total_amount > 100;

-- 聚合统计
SELECT 
    DATE_TRUNC('day', order_date) AS day,
    COUNT(*) AS order_count,
    SUM(total_amount) AS revenue,
    AVG(total_amount) AS avg_order_value
FROM read_duckdb('daily_*.duckdb', 'orders')
GROUP BY DATE_TRUNC('day', order_date)
ORDER BY day;

Python 集成:在生产环境中使用

在实际项目中,你通常会通过 DuckDB 的 Python 绑定来使用这个功能:

import duckdb
import glob

# 方法一:直接使用 SQL
conn = duckdb.connect()
result = conn.execute("""
    SELECT 
        _source_file AS file_source,
        COUNT(*) AS order_count,
        SUM(total_amount) AS total_revenue
    FROM read_duckdb('daily_reports/*.duckdb', 'orders')
    GROUP BY _source_file
    ORDER BY order_count DESC
""").fetchdf()

print(result)

关键特性:_source_file 伪列

当你使用 read_duckdb() 读取多个文件时,DuckDB 会自动为结果添加一个名为 _source_file 的列,记录每一行数据来源于哪个文件。这个特性在调试和数据溯源时非常有用:

SELECT 
    _source_file,
    order_id,
    customer_id,
    total_amount,
    order_date
FROM read_duckdb('daily_*.duckdb', 'orders')
LIMIT 10;

输出示例:

┌───────────────────────────┬──────────┬─────────────┬──────────────┬────────────┐
│       _source_file        │ order_id │ customer_id │ total_amount │ order_date │
├───────────────────────────┼──────────┼─────────────┼──────────────┼────────────┤
│ daily_20260601.duckdb     │ 10001    │ C-5023      │ 259.99       │ 2026-06-01 │
│ daily_20260601.duckdb     │ 10002    │ C-7891      │ 89.50        │ 2026-06-01 │
│ daily_20260602.duckdb     │ 10003    │ C-3344      │ 432.00       │ 2026-06-02 │
│ daily_20260602.duckdb     │ 10004    │ C-1122      │ 67.80        │ 2026-06-02 │
└───────────────────────────┴──────────┴─────────────┴──────────────┴────────────┘

性能优化:让合并查询飞起来

1. 谓词下推(Predicate Pushdown)

read_duckdb() 支持谓词下推,这意味着 WHERE 条件会被传递到每个文件的查询中,只读取满足条件的数据:

-- 只有 2026-06-15 之后有数据的文件才会被扫描
SELECT * FROM read_duckdb('daily_*.duckdb', 'events')
WHERE event_time >= '2026-06-15';

2. 使用 EXPLAIN ANALYZE 验证优化

EXPLAIN ANALYZE
SELECT 
    DATE_TRUNC('month', order_date) AS month,
    SUM(total_amount) AS monthly_revenue
FROM read_duckdb('monthly_*.duckdb', 'orders')
WHERE order_date BETWEEN '2026-01-01' AND '2026-06-30'
GROUP BY DATE_TRUNC('month', order_date);

你会看到执行计划中每个文件的读取都被优化为只扫描相关分区。

3. 控制并发读取

import duckdb

# 设置最大并行读取的文件数
conn = duckdb.connect(config={
    'max_threads': 8,
    'temp_directory': '/tmp/duckdb_temp'
})

result = conn.execute("""
    SELECT * FROM read_duckdb('large_set/*.duckdb', 'data')
""").fetchdf()

4. 与外部排序结合处理大数据集

当合并的数据量超过内存时,可以借助 DuckDB 的外部排序能力:

SET GLOBAL memory_limit = '4GB';
SET GLOBAL temp_directory = '/tmp/duckdb_temp';

SELECT * FROM read_duckdb('archive/*.duckdb', 'transactions')
ORDER BY transaction_date DESC
LIMIT 1000;

与传统方案的对比

维度Python 循环 + pandasread_duckdb()
代码行数10-20 行1-3 行 SQL
内存效率全量加载到内存惰性读取 + 下推
过滤能力读取后过滤谓词下推到文件级
错误恢复需手动处理异常SQL 层面统一处理
学习成本需要 Python 编程只需 SQL 知识
并行处理需手动多线程自动并行扫描
调试追踪需额外逻辑自动提供 _source_file

实战场景:构建数据产品

场景一:竞品价格监控 SaaS

你可以用 read_duckdb() 构建一个自动化的竞品价格监控系统:

import duckdb
from datetime import datetime, timedelta

class PriceMonitor:
    def __init__(self, db_dir="price_data"):
        self.conn = duckdb.connect(
            config={'temp_directory': '/tmp/duckdb_monitor'}
        )
        self.db_dir = db_dir
    
    def get_price_trend(self, product_id, days=30):
        """获取指定商品过去 N 天的价格趋势"""
        cutoff = (datetime.now() - timedelta(days=days)).strftime('%Y-%m-%d')
        
        query = f"""
        SELECT 
            _source_file,
            price,
            competitor,
            timestamp
        FROM read_duckdb('{self.db_dir}/{{*.duckdb}}', 'prices')
        WHERE product_id = '{product_id}'
          AND timestamp >= '{cutoff}'
        ORDER BY timestamp
        """
        
        return self.conn.execute(query).fetchdf()
    
    def get_best_deal(self, category, days=7):
        """找出过去 N 天内某类别的最优价格"""
        query = f"""
        SELECT 
            competitor,
            AVG(price) AS avg_price,
            MIN(price) AS min_price,
            MAX(price) AS max_price,
            COUNT(*) AS sample_count
        FROM read_duckdb('{self.db_dir}/{{*.duckdb}}', 'prices')
        WHERE category = '{category}'
        GROUP BY competitor
        ORDER BY avg_price ASC
        LIMIT 5
        """
        
        return self.conn.execute(query).fetchdf()

# 使用示例
monitor = PriceMonitor()
trend = monitor.get_price_trend("SKU-12345")
best_deals = monitor.get_best_deal("electronics")

这个系统的商业模式很简单:你收集不同电商平台的商品价格数据,按天存入独立的 DuckDB 文件,然后通过 API 提供给客户查询价格趋势和最优购买渠道。每个客户每月收取 50-200 元的订阅费。

场景二:自动化周报生成器

import duckdb

def generate_weekly_report():
    """自动生成每周销售报告"""
    conn = duckdb.connect()
    
    report = conn.execute("""
        WITH daily_stats AS (
            SELECT 
                _source_file AS report_date,
                COUNT(DISTINCT customer_id) AS unique_customers,
                COUNT(*) AS total_orders,
                SUM(total_amount) AS total_revenue,
                AVG(total_amount) AS avg_order_value
            FROM read_duckdb('weekly_sales/{{*.duckdb}}', 'orders')
            WHERE order_date >= CURRENT_DATE - INTERVAL 7 DAY
            GROUP BY _source_file
        )
        SELECT 
            report_date,
            unique_customers,
            total_orders,
            total_revenue,
            avg_order_value,
            LAG(total_revenue) OVER (ORDER BY report_date) AS prev_day_revenue,
            ROUND(
                (total_revenue - LAG(total_revenue) OVER (ORDER BY report_date)) 
                / NULLIF(LAG(total_revenue) OVER (ORDER BY report_date), 0) * 100, 2
            ) AS revenue_change_pct
        FROM daily_stats
        ORDER BY report_date
    """).fetchdf()
    
    return report

report = generate_weekly_report()
print(report.to_markdown(index=False))

场景三:数据质量审计工具

-- 检查多个文件中的数据一致性
SELECT 
    _source_file,
    COUNT(*) AS row_count,
    COUNT(DISTINCT id) AS unique_ids,
    COUNT(*) - COUNT(DISTINCT id) AS duplicates,
    COUNT(CASE WHEN email IS NULL THEN 1 END) AS missing_emails,
    COUNT(CASE WHEN phone IS NULL THEN 1 END) AS missing_phones
FROM read_duckdb('customer_db/{{*.duckdb}}', 'customers')
GROUP BY _source_file
HAVING duplicates > 0 OR missing_emails > 100
ORDER BY row_count DESC;

与类似功能的对比

除了 read_duckdb(),DuckDB 还提供了其他文件读取功能:

函数用途适用场景
read_duckdb()读取 .duckdb 文件合并同构数据库文件
read_csv_auto()读取 CSV 文件文本数据源
read_parquet()读取 Parquet 文件列式存储的大数据集
read_json_auto()读取 JSON 文件半结构化数据
read_ndjson()读取 NDJSON 文件流式 JSON 数据

read_duckdb() 的独特之处在于它能直接读取 DuckDB 的内部格式,保留完整的类型信息和索引结构,这是其他格式无法比拟的。

注意事项与最佳实践

1. 版本兼容性

read_duckdb() 需要 DuckDB ≥ 1.5.0。如果你的环境版本较低,可以通过以下方式升级:

pip install --upgrade duckdb

2. 文件路径规范

  • 使用相对路径时,确保相对于 DuckDB 的工作目录
  • 在 Windows 上使用反斜杠时需要转义或使用原始字符串
  • glob 模式中的 * 匹配当前目录下的文件,** 递归匹配子目录

3. 安全性考虑

# ⚠️ 不要直接拼接用户输入到文件路径中
# 不安全
pattern = f"{user_input}/*.duckdb"

# ✅ 使用白名单验证
import pathlib
allowed_dir = pathlib.Path("/data/reports")
user_path = pathlib.Path(user_input)
if user_path.resolve().is_relative_to(allowed_dir.resolve()):
    pattern = str(user_path) + "/*.duckdb"

4. 性能调优清单

  • 对于超过 100 个文件的合并,设置 max_threads 控制并发度
  • 使用 temp_directory 配置外部排序的临时空间
  • 在文件上建立适当的索引,虽然 read_duckdb() 本身会利用文件内部索引
  • 避免在同一查询中对同一个文件多次调用 read_duckdb()

变现建议

read_duckdb() 这个功能虽然看起来只是一个小特性,但它解决的是一个真实且高频的需求——多文件数据合并。以下是几个可以直接变现的方向:

方向一:数据整合即服务(DIaaS)

为中小企业提供跨系统数据整合服务。很多公司使用不同的工具收集数据(CRM、ERP、电商平台),每个工具导出独立的 .duckdb 文件。你可以用 read_duckdb() 在几分钟内完成合并,然后交付可视化的分析报告。

定价策略:单次整合 500-2000 元,月度数据监控服务 1000-5000 元/月。

方向二:自动化报表 SaaS

基于 read_duckdb() 构建自动化的周报/月报系统。客户每天生成独立的数据库文件,你通过定时任务合并分析,自动生成图表和洞察。

技术栈:DuckDB + Streamlit/Evidence + 定时任务 = 一套完整的报表产品。

定价策略:基础版 99 元/月,专业版 299 元/月。

方向三:数据审计工具

面向数据团队,提供跨文件的数据质量审计服务。检查数据一致性、重复记录、缺失值等问题,自动生成审计报告。

定价策略:按文件数量收费,100-500 元/次。

方向四:教育课程

read_duckdb() 的使用技巧整理成课程,教数据分析师如何高效处理多文件场景。这类实战教程在知识付费平台很受欢迎。

定价策略:单课 199-499 元,系列课程打包 999 元。

总结

read_duckdb() 是 DuckDB 1.5+ 中最实用的功能之一。它将原本需要多行 Python 代码才能完成的多文件合并操作,简化为一行 SQL:

SELECT * FROM read_duckdb('data_*.duckdb', 'orders');

更重要的是,它带来了真正的性能优势——谓词下推、惰性读取、自动并行,这些都是传统 Python 循环方案难以企及的。

如果你正在处理多文件数据合并的场景,不要再写冗长的 Python 循环了。试试 read_duckdb(),你会发现数据处理的速度和质量都有了质的提升。

📖 更多 DuckDB 实用函数和实战案例 → duckdblab.org

📺 Watch video tutorials → DuckDB Lab YouTube

Subscribe for more DuckDB & AI automation tutorials

使用 Hugo 构建
主题 StackJimmy 设计

⚠️ 本站为独立社区项目,与 DuckDB 基金会及 DuckDB 官方项目无任何从属、背书或赞助关系。

"DuckDB" 是 DuckDB 基金会的注册商标,本站仅以事实描述方式使用该名称。

本站内容仅供教育与社区推广用途,不构成任何商业服务。