Featured image of post DuckDB Python 集成指南:从安装到高级数据分析

DuckDB Python 集成指南:从安装到高级数据分析

完整的 DuckDB Python 集成教程,涵盖安装配置、SQL 查询、Pandas DataFrame 操作、Parquet/CSV/JSON 读写、参数化查询和性能优化,适合数据分析师和 Python 开发者。

引言

作为一名 Python 数据分析师,你大概率遇到过 Pandas 内存不足和 SQL 查询能力受限的双重困境。DuckDB Python 提供了一个完美的解决方案——它是一个嵌入在 Python 进程中的 SQL OLAP 数据库,无需安装任何外部服务,通过向量化执行引擎提供比 Pandas 快 10-50 倍的查询性能。

本指南将从零开始,手把手教你如何将 DuckDB 集成到 Python 工作流中:从一行 pip install 到零拷贝 DataFrame 查询、多文件 Parquet 分析,再到生产环境中的参数化 SQL 管道。

如果你对 DuckDB 还不熟悉,建议先阅读我们的 DuckDB 安装指南DuckDB 入门指南 2026


1. 安装 DuckDB Python 包

在 Python 中使用 DuckDB 只需一条命令:

pip install duckdb

这个命令会安装 duckdb Python 包,它把 DuckDB 整套 C++ 引擎作为原生扩展打包进来。不需要安装单独的服务器、JDBC 驱动或 Docker 容器——只凭一个 import 就能使用。

验证安装

import duckdb
print(duckdb.__version__)
# 输出示例:1.2.0

安装可选扩展

pip install duckdb duckdb-extensions  # 可选:安装扩展管理工具

小贴士: DuckDB Python 支持 Python 3.8 到 3.13 版本,兼容 Linux、macOS 和 Windows。每个平台的安装包只有约 15MB。


2. 基础连接与查询执行

DuckDB 提供两种连接模式,都可以从 Python 中使用。

内存数据库(默认)

大部分数据分析场景下,使用内存数据库就够了:

import duckdb

# 创建内存数据库连接
conn = duckdb.connect()

# 或直接使用默认连接
result = duckdb.sql("SELECT '你好,DuckDB!' AS greeting")
print(result)

输出:

┌─────────────────┐
│    greeting     │
│    varchar      │
├─────────────────┤
│ 你好,DuckDB!  │
└─────────────────┘

持久化数据库

如果你需要数据持久化,可以指定数据库文件路径:

conn = duckdb.connect('my_analysis.db')
conn.sql("CREATE TABLE users (id INTEGER, name VARCHAR, city VARCHAR)")
conn.sql("INSERT INTO users VALUES (1, '张三', '北京'), (2, '李四', '上海')")
conn.sql("SELECT * FROM users").show()

获取查询结果

DuckDB 提供多种方式获取结果:

result = duckdb.sql("SELECT unnest([10, 20, 30]) AS value")

# 作为元组列表
rows = result.fetchall()       # [(10,), (20,), (30,)]

# 作为 Pandas DataFrame
df = result.fetchdf()          # 一列 'value' 的 DataFrame

# 作为字典列表
dicts = result.fetchdf().to_dict('records')  # [{'value': 10}, ...]

# 作为 Arrow 表
import pyarrow as pa
arrow_table = result.fetch_arrow_table()

💡 fetchdf() 是最常用的方法——它无缝连接了 DuckDB 和 Pandas,让 DuckDB 负责重活,Pandas 用于可视化或后续处理。


3. DuckDB + Pandas DataFrame 集成

这是 DuckDB Python 最惊艳的功能之一:你可以直接在 Pandas DataFrame 上运行 SQL 查询,完全不需要复制数据

用 SQL 查询 DataFrame

import pandas as pd
import duckdb

# 创建一个 Pandas DataFrame
df = pd.DataFrame({
    'product': ['Widget A', 'Widget B', 'Widget C', 'Widget A'],
    'category': ['电子产品', '电子产品', '家居', '电子产品'],
    'price': [29.99, 49.99, 15.99, 34.99],
    'quantity': [100, 75, 200, 50]
})

# 直接在 DataFrame 上运行 SQL —— 零拷贝!
result = duckdb.sql("""
    SELECT
        product,
        category,
        SUM(price * quantity) AS total_revenue,
        AVG(price) AS avg_price,
        COUNT(*) AS transaction_count
    FROM df
    WHERE price > 20
    GROUP BY product, category
    ORDER BY total_revenue DESC
""").fetchdf()

print(result)
    product  category  total_revenue  avg_price  transaction_count
0  Widget B  电子产品        3749.25      49.99                  1
1  Widget A  电子产品        3498.50      32.49                  2

一个查询 JOIN 多个 DataFrame

你可以同时 JOIN 多个 DataFrame,也可以混用 DataFrame、CSV 文件和数据库表:

orders = pd.DataFrame({
    'order_id': [1, 2, 3],
    'customer_id': [101, 102, 101],
    'amount': [250.0, 180.0, 320.0]
})

customers = pd.DataFrame({
    'customer_id': [101, 102, 103],
    'name': ['张三', '李四', '王五'],
    'city': ['北京', '上海', '广州']
})

result = duckdb.sql("""
    SELECT
        c.name,
        c.city,
        COUNT(o.order_id) AS order_count,
        SUM(o.amount) AS total_spent
    FROM customers AS c
    LEFT JOIN orders AS o ON c.customer_id = o.customer_id
    GROUP BY c.name, c.city
    ORDER BY total_spent DESC NULLS LAST
""").fetchdf()

print(result)
  name  city  order_count  total_spent
0  张三  北京            2        570.0
1  李四  上海            1        180.0
2  王五  广州            0          NaN

零拷贝的工作原理

DuckDB 的 Python 客户端不会序列化你的 DataFrame——它通过 Apache Arrow 直接读取底层的 NumPy/Pandas 列式数据。这意味着:

  • 无需内存复制——Pandas 看到的数据就是 DuckDB 查询的数据
  • 零配置——不需要 CREATE TABLE 或数据加载
  • 无缝来回转换——查询 DataFrame → 得到 DataFrame

4. 参数化查询

在构建生产管道或交互式应用时,你需要参数化查询来防止 SQL 注入并安全处理动态值。

使用 ? 占位符

duckdb.sql("SELECT * FROM df WHERE price > ? AND category = ?", [30.0, '电子产品']).show()

命名参数

min_price = 25.0
target_category = '家居'

duckdb.sql("""
    SELECT * FROM df
    WHERE price >= $min_price AND category = $target_category
""", params={'min_price': min_price, 'target_category': target_category}).show()

参数化 INSERT

conn = duckdb.connect()
conn.execute("CREATE TABLE IF NOT EXISTS sales (product VARCHAR, amount DECIMAL(10,2), sale_date DATE)")

products = ['Widget A', 'Widget B', 'Widget C']
amounts = [99.99, 149.99, 79.99]
dates = ['2026-01-15', '2026-01-16', '2026-01-17']

for p, a, d in zip(products, amounts, dates):
    conn.execute("INSERT INTO sales VALUES (?, ?, ?)", [p, a, d])

conn.sql("SELECT * FROM sales").show()

批量插入

executemany 提升大批量数据插入性能:

data = [
    ('Widget D', 199.99, '2026-02-01'),
    ('Widget E', 249.99, '2026-02-02'),
    ('Widget F', 129.99, '2026-02-03'),
]
conn.executemany("INSERT INTO sales VALUES (?, ?, ?)", data)

5. 读写 CSV、Parquet 和 JSON

DuckDB 的 read_csv_autoread_parquetread_json_auto 函数让文件读写变得极其简单。

CSV 文件

# 直接读取 CSV 文件到 DuckDB 关系
rel = duckdb.sql("SELECT * FROM read_csv_auto('data/sales_2026.csv')")
print(rel.fetchdf().head())

# 带选项的读取
rel = duckdb.sql("""
    SELECT * FROM read_csv_auto(
        'data/sales_2026.csv',
        header=true,
        delim=',',
        dateformat='%Y-%m-%d',
        all_varchar=true
    )
""")

# 将查询结果写入 CSV
duckdb.sql("COPY (SELECT * FROM read_csv_auto('input.csv') WHERE amount > 100) TO 'filtered_output.csv' (HEADER, DELIMITER ',')")

Parquet 文件

Parquet 是 DuckDB 表现最出色的格式——列式存储与 DuckDB 的向量化引擎完美匹配。

# 读取 Parquet 文件
df = duckdb.sql("SELECT * FROM read_parquet('data/analytics.parquet')").fetchdf()

# 使用通配符读取多个 Parquet 文件
df = duckdb.sql("SELECT * FROM read_parquet('data/monthly/*.parquet')").fetchdf()

# 读取分区 Parquet 数据集
df = duckdb.sql("""
    SELECT * FROM read_parquet('data/year=2026/month=*/*.parquet')
    WHERE region = '亚太'
""").fetchdf()

# 将查询结果写入 Parquet
duckdb.sql("""
    COPY (
        SELECT region, SUM(revenue) AS total
        FROM read_parquet('data/*.parquet')
        GROUP BY region
    ) TO 'region_totals.parquet' (FORMAT PARQUET)
""")

JSON 文件

DuckDB 同时支持换行符分隔 JSON 和标准 JSON 数组:

# NDJSON(每行一个 JSON 对象)
df = duckdb.sql("SELECT * FROM read_json_auto('data/events.ndjson')").fetchdf()

# JSON 数组
df = duckdb.sql("SELECT * FROM read_json_auto('data/array.json')").fetchdf()

# 嵌套 JSON 自动展平
df = duckdb.sql("""
    SELECT
        id,
        user.name AS user_name,
        user.address.city AS city,
        metadata.timestamp::TIMESTAMP AS event_time
    FROM read_json_auto('data/complex.json')
""").fetchdf()

# 写入 JSON
duckdb.sql("""
    COPY (SELECT * FROM read_parquet('data.parquet') LIMIT 1000)
    TO 'sample.json' (FORMAT JSON)
""")

6. 在 Python 中使用 DuckDB 做数据分析

除了基本的查询功能,DuckDB Python 还支持强大的分析工作流。

链式调用

DuckDB 的关系 API 支持方法链:

rel = duckdb.sql("SELECT * FROM read_csv_auto('transactions.csv')")

# 链式过滤和聚合
result = (
    rel.filter("amount > 50")
       .aggregate("customer_id, SUM(amount) AS total, COUNT(*) AS txns")
       .order("total DESC")
       .limit(10)
       .fetchdf()
)

窗口函数

result = duckdb.sql("""
    SELECT
        product,
        sale_date,
        amount,
        SUM(amount) OVER (PARTITION BY product ORDER BY sale_date) AS running_total,
        AVG(amount) OVER (PARTITION BY product ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) AS moving_avg_3
    FROM read_csv_auto('daily_sales.csv')
    ORDER BY product, sale_date
""").fetchdf()

统计分析

result = duckdb.sql("""
    SELECT
        category,
        COUNT(*) AS n,
        AVG(price) AS mean_price,
        STDDEV(price) AS std_price,
        MIN(price) AS min_price,
        PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY price) AS median_price,
        MAX(price) AS max_price,
        CORR(price, quantity) AS price_qty_correlation
    FROM read_parquet('products/*.parquet')
    GROUP BY category
""").fetchdf()

创建视图实现可复用的分析逻辑

conn = duckdb.connect()

# 将 Pandas DataFrame 注册为视图
conn.register('orders_view', orders_df)

# 创建 SQL 视图封装可复用逻辑
conn.sql("""
    CREATE VIEW high_value_orders AS
    SELECT * FROM orders_view
    WHERE amount > 500 AND status = 'completed'
""")

# 在后续查询中使用视图
hourly_stats = conn.sql("""
    SELECT
        DATE_TRUNC('hour', order_time) AS hour,
        COUNT(*) AS orders,
        SUM(amount) AS revenue
    FROM high_value_orders
    GROUP BY hour
    ORDER BY hour
""").fetchdf()

7. Python 用户性能优化技巧

1. 下推过滤和投影

DuckDB 可以将过滤条件直接推入 Parquet/CSV 读取阶段,尽可能先过滤再 JOIN:

# ❌ 慢:先读全部数据再过滤
df = duckdb.sql("""
    SELECT * FROM (
        SELECT * FROM read_parquet('huge_dataset/*.parquet')
    ) WHERE region = '中国'
""").fetchdf()

# ✅ 快:过滤条件被推入读取器
df = duckdb.sql("""
    SELECT * FROM read_parquet('huge_dataset/*.parquet')
    WHERE region = '中国'
""").fetchdf()

2. 优先使用 Parquet 而非 CSV

Parquet 在分析查询中比 CSV 快 10-100 倍:

# 慢
duckdb.sql("SELECT * FROM read_csv_auto('data.csv') WHERE date > '2026-01-01'")

# 快
duckdb.sql("SELECT * FROM read_parquet('data.parquet') WHERE date > '2026-01-01'")

3. 显式设置内存限制

# 限制 DuckDB 内存使用
duckdb.sql("SET memory_limit = '4GB'")

# 设置线程数
duckdb.sql("SET threads = 4")

4. 合理使用 fetchdf()

只在需要 Pandas 特有功能时才转换结果:

# ❌ 不必要的转换
df = duckdb.sql("SELECT * FROM large_table").fetchdf()
# 然后再做 DuckDB 操作
df2 = duckdb.sql("SELECT COUNT(*) FROM df").fetchdf()

# ✅ 尽可能留在 DuckDB 中
count = duckdb.sql("SELECT COUNT(*) FROM large_table").fetchone()[0]

5. 注册大 DataFrame 而非重复传递

对于需要反复查询的 DataFrame,一次注册即可:

# ❌ 每次重新解析
for i in range(100):
    duckdb.sql("SELECT COUNT(*) FROM my_df WHERE amount > ?", [i])

# ✅ 注册一次,高效复用
conn = duckdb.connect()
conn.register('my_df', my_df)
for i in range(100):
    conn.sql("SELECT COUNT(*) FROM my_df WHERE amount > ?", [i])

实战示例:完整分析管道

下面是一个综合了上述所有技巧的真实 Python 分析示例:

import duckdb
import pandas as pd
from datetime import datetime

# 连接到持久化数据库
conn = duckdb.connect('retail_analysis.db')

# 1. 从多个来源加载原始数据
conn.sql("""
    CREATE TABLE daily_sales AS
    SELECT * FROM read_csv_auto('data/sales_2026.csv')
""")

# 2. 导入 BI 导出的 Parquet 数据
conn.sql("""
    INSERT INTO daily_sales
    SELECT * FROM read_parquet('data/bi_export/*.parquet')
""")

# 3. 执行分析查询
monthly_performance = conn.sql("""
    SELECT
        DATE_TRUNC('month', sale_date) AS month,
        product_category,
        COUNT(DISTINCT customer_id) AS unique_customers,
        SUM(quantity * unit_price) AS revenue,
        SUM(quantity * unit_price) / NULLIF(COUNT(DISTINCT customer_id), 0) AS revenue_per_customer
    FROM daily_sales
    WHERE sale_date >= '2026-01-01'
    GROUP BY ALL
    HAVING revenue > 10000
    ORDER BY month, revenue DESC
""").fetchdf()

# 4. 与 Pandas DataFrame 混合分析(例如从 CRM 导出的客户分层数据)
segments = pd.read_csv('data/customer_segments.csv')
blended = conn.sql("""
    SELECT
        s.month,
        s.product_category,
        s.revenue,
        cs.segment,
        cs.region
    FROM monthly_performance AS s
    JOIN segments AS cs ON s.product_category = cs.category
    WHERE cs.segment IN ('VIP', '企业客户')
""").fetchdf()

# 5. 导出最终结果
conn.sql("""
    COPY (
        SELECT * FROM blended
    ) TO 'analysis_output.parquet' (FORMAT PARQUET)
""")

print("分析完成!结果已保存到 analysis_output.parquet")
print(blended.head())

总结

DuckDB Python 集成提供了一个独特而强大的组合:SQL 完整的分析能力和 Python 的灵活性,全部集成在单个进程中,无需任何外部依赖。无论你是要替换缓慢的 Pandas groupby 操作、构建跨 CSV/Parquet/JSON 的 ETL 管道,还是创建交互式数据分析应用,DuckDB 都能提供所需的性能和简洁性。

核心要点回顾:

  • 安装极其简单——pip install duckdb,一行命令搞定
  • Pandas 集成无缝——直接在 DataFrame 上运行 SQL,零拷贝
  • 多格式原生支持——CSV、Parquet、JSON 全部开箱即用
  • 生产环境就绪——参数化查询、内存限制、线程控制一应俱全
  • 性能优先——向量化执行、过滤器下推、列式存储加速

现在就尝试将 DuckDB 集成到你的 Python 数据分析工作中吧!更多 DuckDB 教程,请参考我们的 安装指南入门指南 2026。记住:最快的 Python 数据处理代码,就是把重活交给 DuckDB 的代码。

📺 Watch video tutorials → DuckDB Lab YouTube

Subscribe for more DuckDB & AI automation tutorials

使用 Hugo 构建
主题 StackJimmy 设计