引言
作为一名 Python 数据分析师,你大概率遇到过 Pandas 内存不足和 SQL 查询能力受限的双重困境。DuckDB Python 提供了一个完美的解决方案——它是一个嵌入在 Python 进程中的 SQL OLAP 数据库,无需安装任何外部服务,通过向量化执行引擎提供比 Pandas 快 10-50 倍的查询性能。
本指南将从零开始,手把手教你如何将 DuckDB 集成到 Python 工作流中:从一行 pip install 到零拷贝 DataFrame 查询、多文件 Parquet 分析,再到生产环境中的参数化 SQL 管道。
如果你对 DuckDB 还不熟悉,建议先阅读我们的 DuckDB 安装指南 和 DuckDB 入门指南 2026。
1. 安装 DuckDB Python 包
在 Python 中使用 DuckDB 只需一条命令:
pip install duckdb
这个命令会安装 duckdb Python 包,它把 DuckDB 整套 C++ 引擎作为原生扩展打包进来。不需要安装单独的服务器、JDBC 驱动或 Docker 容器——只凭一个 import 就能使用。
验证安装
import duckdb
print(duckdb.__version__)
# 输出示例:1.2.0
安装可选扩展
pip install duckdb duckdb-extensions # 可选:安装扩展管理工具
小贴士: DuckDB Python 支持 Python 3.8 到 3.13 版本,兼容 Linux、macOS 和 Windows。每个平台的安装包只有约 15MB。
2. 基础连接与查询执行
DuckDB 提供两种连接模式,都可以从 Python 中使用。
内存数据库(默认)
大部分数据分析场景下,使用内存数据库就够了:
import duckdb
# 创建内存数据库连接
conn = duckdb.connect()
# 或直接使用默认连接
result = duckdb.sql("SELECT '你好,DuckDB!' AS greeting")
print(result)
输出:
┌─────────────────┐
│ greeting │
│ varchar │
├─────────────────┤
│ 你好,DuckDB! │
└─────────────────┘
持久化数据库
如果你需要数据持久化,可以指定数据库文件路径:
conn = duckdb.connect('my_analysis.db')
conn.sql("CREATE TABLE users (id INTEGER, name VARCHAR, city VARCHAR)")
conn.sql("INSERT INTO users VALUES (1, '张三', '北京'), (2, '李四', '上海')")
conn.sql("SELECT * FROM users").show()
获取查询结果
DuckDB 提供多种方式获取结果:
result = duckdb.sql("SELECT unnest([10, 20, 30]) AS value")
# 作为元组列表
rows = result.fetchall() # [(10,), (20,), (30,)]
# 作为 Pandas DataFrame
df = result.fetchdf() # 一列 'value' 的 DataFrame
# 作为字典列表
dicts = result.fetchdf().to_dict('records') # [{'value': 10}, ...]
# 作为 Arrow 表
import pyarrow as pa
arrow_table = result.fetch_arrow_table()
💡
fetchdf()是最常用的方法——它无缝连接了 DuckDB 和 Pandas,让 DuckDB 负责重活,Pandas 用于可视化或后续处理。
3. DuckDB + Pandas DataFrame 集成
这是 DuckDB Python 最惊艳的功能之一:你可以直接在 Pandas DataFrame 上运行 SQL 查询,完全不需要复制数据。
用 SQL 查询 DataFrame
import pandas as pd
import duckdb
# 创建一个 Pandas DataFrame
df = pd.DataFrame({
'product': ['Widget A', 'Widget B', 'Widget C', 'Widget A'],
'category': ['电子产品', '电子产品', '家居', '电子产品'],
'price': [29.99, 49.99, 15.99, 34.99],
'quantity': [100, 75, 200, 50]
})
# 直接在 DataFrame 上运行 SQL —— 零拷贝!
result = duckdb.sql("""
SELECT
product,
category,
SUM(price * quantity) AS total_revenue,
AVG(price) AS avg_price,
COUNT(*) AS transaction_count
FROM df
WHERE price > 20
GROUP BY product, category
ORDER BY total_revenue DESC
""").fetchdf()
print(result)
product category total_revenue avg_price transaction_count
0 Widget B 电子产品 3749.25 49.99 1
1 Widget A 电子产品 3498.50 32.49 2
一个查询 JOIN 多个 DataFrame
你可以同时 JOIN 多个 DataFrame,也可以混用 DataFrame、CSV 文件和数据库表:
orders = pd.DataFrame({
'order_id': [1, 2, 3],
'customer_id': [101, 102, 101],
'amount': [250.0, 180.0, 320.0]
})
customers = pd.DataFrame({
'customer_id': [101, 102, 103],
'name': ['张三', '李四', '王五'],
'city': ['北京', '上海', '广州']
})
result = duckdb.sql("""
SELECT
c.name,
c.city,
COUNT(o.order_id) AS order_count,
SUM(o.amount) AS total_spent
FROM customers AS c
LEFT JOIN orders AS o ON c.customer_id = o.customer_id
GROUP BY c.name, c.city
ORDER BY total_spent DESC NULLS LAST
""").fetchdf()
print(result)
name city order_count total_spent
0 张三 北京 2 570.0
1 李四 上海 1 180.0
2 王五 广州 0 NaN
零拷贝的工作原理
DuckDB 的 Python 客户端不会序列化你的 DataFrame——它通过 Apache Arrow 直接读取底层的 NumPy/Pandas 列式数据。这意味着:
- 无需内存复制——Pandas 看到的数据就是 DuckDB 查询的数据
- 零配置——不需要
CREATE TABLE或数据加载 - 无缝来回转换——查询 DataFrame → 得到 DataFrame
4. 参数化查询
在构建生产管道或交互式应用时,你需要参数化查询来防止 SQL 注入并安全处理动态值。
使用 ? 占位符
duckdb.sql("SELECT * FROM df WHERE price > ? AND category = ?", [30.0, '电子产品']).show()
命名参数
min_price = 25.0
target_category = '家居'
duckdb.sql("""
SELECT * FROM df
WHERE price >= $min_price AND category = $target_category
""", params={'min_price': min_price, 'target_category': target_category}).show()
参数化 INSERT
conn = duckdb.connect()
conn.execute("CREATE TABLE IF NOT EXISTS sales (product VARCHAR, amount DECIMAL(10,2), sale_date DATE)")
products = ['Widget A', 'Widget B', 'Widget C']
amounts = [99.99, 149.99, 79.99]
dates = ['2026-01-15', '2026-01-16', '2026-01-17']
for p, a, d in zip(products, amounts, dates):
conn.execute("INSERT INTO sales VALUES (?, ?, ?)", [p, a, d])
conn.sql("SELECT * FROM sales").show()
批量插入
用 executemany 提升大批量数据插入性能:
data = [
('Widget D', 199.99, '2026-02-01'),
('Widget E', 249.99, '2026-02-02'),
('Widget F', 129.99, '2026-02-03'),
]
conn.executemany("INSERT INTO sales VALUES (?, ?, ?)", data)
5. 读写 CSV、Parquet 和 JSON
DuckDB 的 read_csv_auto、read_parquet 和 read_json_auto 函数让文件读写变得极其简单。
CSV 文件
# 直接读取 CSV 文件到 DuckDB 关系
rel = duckdb.sql("SELECT * FROM read_csv_auto('data/sales_2026.csv')")
print(rel.fetchdf().head())
# 带选项的读取
rel = duckdb.sql("""
SELECT * FROM read_csv_auto(
'data/sales_2026.csv',
header=true,
delim=',',
dateformat='%Y-%m-%d',
all_varchar=true
)
""")
# 将查询结果写入 CSV
duckdb.sql("COPY (SELECT * FROM read_csv_auto('input.csv') WHERE amount > 100) TO 'filtered_output.csv' (HEADER, DELIMITER ',')")
Parquet 文件
Parquet 是 DuckDB 表现最出色的格式——列式存储与 DuckDB 的向量化引擎完美匹配。
# 读取 Parquet 文件
df = duckdb.sql("SELECT * FROM read_parquet('data/analytics.parquet')").fetchdf()
# 使用通配符读取多个 Parquet 文件
df = duckdb.sql("SELECT * FROM read_parquet('data/monthly/*.parquet')").fetchdf()
# 读取分区 Parquet 数据集
df = duckdb.sql("""
SELECT * FROM read_parquet('data/year=2026/month=*/*.parquet')
WHERE region = '亚太'
""").fetchdf()
# 将查询结果写入 Parquet
duckdb.sql("""
COPY (
SELECT region, SUM(revenue) AS total
FROM read_parquet('data/*.parquet')
GROUP BY region
) TO 'region_totals.parquet' (FORMAT PARQUET)
""")
JSON 文件
DuckDB 同时支持换行符分隔 JSON 和标准 JSON 数组:
# NDJSON(每行一个 JSON 对象)
df = duckdb.sql("SELECT * FROM read_json_auto('data/events.ndjson')").fetchdf()
# JSON 数组
df = duckdb.sql("SELECT * FROM read_json_auto('data/array.json')").fetchdf()
# 嵌套 JSON 自动展平
df = duckdb.sql("""
SELECT
id,
user.name AS user_name,
user.address.city AS city,
metadata.timestamp::TIMESTAMP AS event_time
FROM read_json_auto('data/complex.json')
""").fetchdf()
# 写入 JSON
duckdb.sql("""
COPY (SELECT * FROM read_parquet('data.parquet') LIMIT 1000)
TO 'sample.json' (FORMAT JSON)
""")
6. 在 Python 中使用 DuckDB 做数据分析
除了基本的查询功能,DuckDB Python 还支持强大的分析工作流。
链式调用
DuckDB 的关系 API 支持方法链:
rel = duckdb.sql("SELECT * FROM read_csv_auto('transactions.csv')")
# 链式过滤和聚合
result = (
rel.filter("amount > 50")
.aggregate("customer_id, SUM(amount) AS total, COUNT(*) AS txns")
.order("total DESC")
.limit(10)
.fetchdf()
)
窗口函数
result = duckdb.sql("""
SELECT
product,
sale_date,
amount,
SUM(amount) OVER (PARTITION BY product ORDER BY sale_date) AS running_total,
AVG(amount) OVER (PARTITION BY product ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) AS moving_avg_3
FROM read_csv_auto('daily_sales.csv')
ORDER BY product, sale_date
""").fetchdf()
统计分析
result = duckdb.sql("""
SELECT
category,
COUNT(*) AS n,
AVG(price) AS mean_price,
STDDEV(price) AS std_price,
MIN(price) AS min_price,
PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY price) AS median_price,
MAX(price) AS max_price,
CORR(price, quantity) AS price_qty_correlation
FROM read_parquet('products/*.parquet')
GROUP BY category
""").fetchdf()
创建视图实现可复用的分析逻辑
conn = duckdb.connect()
# 将 Pandas DataFrame 注册为视图
conn.register('orders_view', orders_df)
# 创建 SQL 视图封装可复用逻辑
conn.sql("""
CREATE VIEW high_value_orders AS
SELECT * FROM orders_view
WHERE amount > 500 AND status = 'completed'
""")
# 在后续查询中使用视图
hourly_stats = conn.sql("""
SELECT
DATE_TRUNC('hour', order_time) AS hour,
COUNT(*) AS orders,
SUM(amount) AS revenue
FROM high_value_orders
GROUP BY hour
ORDER BY hour
""").fetchdf()
7. Python 用户性能优化技巧
1. 下推过滤和投影
DuckDB 可以将过滤条件直接推入 Parquet/CSV 读取阶段,尽可能先过滤再 JOIN:
# ❌ 慢:先读全部数据再过滤
df = duckdb.sql("""
SELECT * FROM (
SELECT * FROM read_parquet('huge_dataset/*.parquet')
) WHERE region = '中国'
""").fetchdf()
# ✅ 快:过滤条件被推入读取器
df = duckdb.sql("""
SELECT * FROM read_parquet('huge_dataset/*.parquet')
WHERE region = '中国'
""").fetchdf()
2. 优先使用 Parquet 而非 CSV
Parquet 在分析查询中比 CSV 快 10-100 倍:
# 慢
duckdb.sql("SELECT * FROM read_csv_auto('data.csv') WHERE date > '2026-01-01'")
# 快
duckdb.sql("SELECT * FROM read_parquet('data.parquet') WHERE date > '2026-01-01'")
3. 显式设置内存限制
# 限制 DuckDB 内存使用
duckdb.sql("SET memory_limit = '4GB'")
# 设置线程数
duckdb.sql("SET threads = 4")
4. 合理使用 fetchdf()
只在需要 Pandas 特有功能时才转换结果:
# ❌ 不必要的转换
df = duckdb.sql("SELECT * FROM large_table").fetchdf()
# 然后再做 DuckDB 操作
df2 = duckdb.sql("SELECT COUNT(*) FROM df").fetchdf()
# ✅ 尽可能留在 DuckDB 中
count = duckdb.sql("SELECT COUNT(*) FROM large_table").fetchone()[0]
5. 注册大 DataFrame 而非重复传递
对于需要反复查询的 DataFrame,一次注册即可:
# ❌ 每次重新解析
for i in range(100):
duckdb.sql("SELECT COUNT(*) FROM my_df WHERE amount > ?", [i])
# ✅ 注册一次,高效复用
conn = duckdb.connect()
conn.register('my_df', my_df)
for i in range(100):
conn.sql("SELECT COUNT(*) FROM my_df WHERE amount > ?", [i])
实战示例:完整分析管道
下面是一个综合了上述所有技巧的真实 Python 分析示例:
import duckdb
import pandas as pd
from datetime import datetime
# 连接到持久化数据库
conn = duckdb.connect('retail_analysis.db')
# 1. 从多个来源加载原始数据
conn.sql("""
CREATE TABLE daily_sales AS
SELECT * FROM read_csv_auto('data/sales_2026.csv')
""")
# 2. 导入 BI 导出的 Parquet 数据
conn.sql("""
INSERT INTO daily_sales
SELECT * FROM read_parquet('data/bi_export/*.parquet')
""")
# 3. 执行分析查询
monthly_performance = conn.sql("""
SELECT
DATE_TRUNC('month', sale_date) AS month,
product_category,
COUNT(DISTINCT customer_id) AS unique_customers,
SUM(quantity * unit_price) AS revenue,
SUM(quantity * unit_price) / NULLIF(COUNT(DISTINCT customer_id), 0) AS revenue_per_customer
FROM daily_sales
WHERE sale_date >= '2026-01-01'
GROUP BY ALL
HAVING revenue > 10000
ORDER BY month, revenue DESC
""").fetchdf()
# 4. 与 Pandas DataFrame 混合分析(例如从 CRM 导出的客户分层数据)
segments = pd.read_csv('data/customer_segments.csv')
blended = conn.sql("""
SELECT
s.month,
s.product_category,
s.revenue,
cs.segment,
cs.region
FROM monthly_performance AS s
JOIN segments AS cs ON s.product_category = cs.category
WHERE cs.segment IN ('VIP', '企业客户')
""").fetchdf()
# 5. 导出最终结果
conn.sql("""
COPY (
SELECT * FROM blended
) TO 'analysis_output.parquet' (FORMAT PARQUET)
""")
print("分析完成!结果已保存到 analysis_output.parquet")
print(blended.head())
总结
DuckDB Python 集成提供了一个独特而强大的组合:SQL 完整的分析能力和 Python 的灵活性,全部集成在单个进程中,无需任何外部依赖。无论你是要替换缓慢的 Pandas groupby 操作、构建跨 CSV/Parquet/JSON 的 ETL 管道,还是创建交互式数据分析应用,DuckDB 都能提供所需的性能和简洁性。
核心要点回顾:
- 安装极其简单——
pip install duckdb,一行命令搞定 - Pandas 集成无缝——直接在 DataFrame 上运行 SQL,零拷贝
- 多格式原生支持——CSV、Parquet、JSON 全部开箱即用
- 生产环境就绪——参数化查询、内存限制、线程控制一应俱全
- 性能优先——向量化执行、过滤器下推、列式存储加速
现在就尝试将 DuckDB 集成到你的 Python 数据分析工作中吧!更多 DuckDB 教程,请参考我们的 安装指南 和 入门指南 2026。记住:最快的 Python 数据处理代码,就是把重活交给 DuckDB 的代码。
