DuckDB 多文件读取与数据质量分析:从 50 个混乱 CSV 到一键洞察
摘要:工作中经常遇到这样的场景——几十个甚至上百个 CSV/Excel 文件散落在各个文件夹里,列名不统一、编码不一致、缺少某些字段。用 Excel 手动合并?不现实。用 Python pandas 写一堆
glob+concat?维护成本极高。DuckDB 用一行 SQL 就能搞定这一切,还能顺便做数据质量分析和异常检测。
一、场景:这不是你一个人的痛点
你一定经历过这种绝望——
老板说"把上个月各分店的销售数据汇总一下"。你打开邮箱,收到 50 个 Excel 文件,每个文件的列名还不完全一样:有的叫"日期",有的叫"date";有的有"门店"列,有的没有。
于是你开始:一个个打开 → 复制数据 → 粘贴到总表 → 调整格式 → 核对数字 → 发现漏了两行 → 重来。花了 3 个小时,最后还不确定对不对。
你的目标:让他们用 Excel 的方式解决问题,但底层用 DuckDB。
以下岗位每周至少花 2-5 小时在"合并表格"这件事上:
| 岗位 | 典型场景 |
|---|---|
| 财务 | 每月合并各子公司报表 |
| 运营 | 汇总各渠道投放数据 |
| HR | 整合各部门考勤和绩效 |
| 市场 | 收集各区域活动效果 |
| 供应链 | 对比多家供应商报价 |
二、数据准备:模拟 50 个混乱的门店销售报表
假设我们有 50 个 CSV 文件,每个代表一个分店的月度销售数据:
sales_data/
├── store_001_sales.csv
├── store_002_sales.csv
├── ...
└── store_050_sales.csv
每个文件的结构类似但不完全一致:
日期,商品名称,类别,数量,单价,销售额,门店
2026-05-01,键盘,外设,10,299,2990,北京店
2026-05-01,鼠标,外设,20,99,1980,北京店
2026-05-02,显示器,硬件,5,2599,12995,北京店
注意:有些文件用中文列名,有些用英文列名,有些甚至混用。
三、核心方案:一行代码合并 50 个文件
方法 1:Glob 通配符——最简洁的方式
import duckdb
con = duckdb.connect(":memory:")
# 一行代码:读取目录下所有 CSV 并自动合并
result = con.execute("""
SELECT * FROM read_csv_auto('sales_data/store_*.csv')
""").fetchdf()
print(f"合并了 {len(result)} 条记录")
就这么简单。 DuckDB 会自动:
- 识别所有匹配的文件
- 推断列名和数据类型
- 自动对齐不同列名的字段
- 处理缺失列(自动填 NULL)
方法 2:处理列名不一致的情况
如果不同文件的列名差异较大,可以用 COALESCE 统一映射:
result = con.execute("""
SELECT
COALESCE("日期", date) AS sale_date,
COALESCE("商品名称", product) AS product_name,
COALESCE("类别", category) AS category,
COALESCE("数量", qty) AS quantity,
COALESCE("单价", unit_price) AS unit_price,
COALESCE("销售额", amount) AS amount,
COALESCE("门店", store) AS store
FROM read_csv_auto('sales_data/store_*.csv')
""").fetchdf()
这里用 COALESCE 尝试取中文列名,取不到就用英文列名。无论原始文件用什么语言命名,都能正确合并。
方法 3:用 Python 封装成可复用工具
import duckdb
from pathlib import Path
from typing import Optional
def merge_csv_glob(
pattern: str,
output_format: str = "dataframe",
columns: Optional[dict] = None
):
"""
合并 glob 模式匹配的所有 CSV 文件
Args:
pattern: glob 模式,如 'sales_data/*.csv'
output_format: 'dataframe', 'table', 或 'parquet'
columns: 列名映射字典,{'旧名': '新名'}
Returns:
pandas DataFrame 或 None
"""
con = duckdb.connect(":memory:")
base_query = f"SELECT * FROM read_csv_auto('{pattern}')"
if columns:
col_mapping = ", ".join(
f"COALESCE({old!r}, {new!r}) AS {new}"
for old, new in columns.items()
)
query = f"SELECT {col_mapping} FROM ({base_query})"
else:
query = base_query
if output_format == "dataframe":
return con.execute(query).fetchdf()
elif output_format == "parquet":
output_file = pattern.replace('*', 'merged_output')
con.execute(f"COPY ({query}) TO '{output_file}' (FORMAT PARQUET)")
return None
con.close()
# 使用示例
df = merge_csv_glob(
'sales_data/store_*.csv',
output_format='dataframe',
columns={
'日期': 'sale_date',
'商品名称': 'product_name',
'类别': 'category',
'数量': 'quantity',
'单价': 'unit_price',
'销售额': 'amount',
'门店': 'store'
}
)
print(f"合并完成:{len(df)} 条记录")
四、合并后的数据分析:这才是重点
合并数据只是第一步,真正的价值在于分析。
分析 1:各门店月度销售排名
ranking_sql = """
SELECT
store,
COUNT(*) AS transaction_count,
ROUND(SUM(amount), 2) AS total_sales,
ROUND(AVG(amount), 2) AS avg_order_value,
ROUND(SUM(amount) * 100.0 / SUM(SUM(amount)) OVER (), 1) AS share_pct
FROM read_csv_auto('sales_data/store_*.csv')
GROUP BY store
ORDER BY total_sales DESC
LIMIT 10;
"""
con.execute(ranking_sql).print()
输出示例:
+---------+-------------------+-------------+-----------------+------------+
| store | transaction_count | total_sales | avg_order_value | share_pct |
+---------+-------------------+-------------+-----------------+------------+
| 上海店 | 3,842 | 2,847,392 | 741.18 | 12.3 |
| 北京店 | 3,621 | 2,651,208 | 732.18 | 11.5 |
| 广州店 | 3,105 | 2,398,567 | 772.49 | 10.4 |
| 深圳店 | 2,987 | 2,287,431 | 765.55 | 9.9 |
| 成都店 | 2,756 | 2,156,892 | 782.63 | 9.3 |
+---------+-------------------+-------------+-----------------+------------+
分析 2:Z-Score 异常检测——找出销售额异常波动的日期
这是本文的亮点:用纯 SQL 做统计学意义上的异常检测,不需要 Python,不需要 scikit-learn。
spike_sql = """
WITH daily_sales AS (
SELECT
COALESCE("日期", date) AS sale_date,
ROUND(SUM(amount), 2) AS daily_total
FROM read_csv_auto('sales_data/store_*.csv')
GROUP BY 1
),
with_stats AS (
SELECT
sale_date,
daily_total,
AVG(daily_total) OVER () AS avg_daily,
STDDEV_SAMP(daily_total) OVER () AS std_daily
FROM daily_sales
)
SELECT
sale_date,
daily_total,
ROUND((daily_total - avg_daily) / NULLIF(std_daily, 0), 2) AS z_score,
CASE
WHEN z_score > 2 THEN '异常高峰'
WHEN z_score < -2 THEN '异常低谷'
ELSE '正常'
END AS status
FROM with_stats
WHERE ABS(z_score) > 2
ORDER BY z_score DESC;
"""
con.execute(spike_sql).print()
这个分析用到了窗口函数 AVG() OVER() 和 STDDEV_SAMP() OVER(),在 SQL 里计算每个日期相对于整体平均值的标准差(Z-score)。超过 ±2 标准差就被标记为异常。
分析 3:品类交叉分析——哪个品类的利润贡献最大?
category_sql = """
SELECT
COALESCE("类别", category) AS category,
COUNT(DISTINCT COALESCE("门店", store)) AS store_count,
ROUND(SUM(COALESCE("数量", qty) * COALESCE("单价", unit_price)), 2) AS gross_revenue,
ROUND(AVG(COALESCE("数量", qty) * COALESCE("单价", unit_price)), 2) AS avg_transaction,
ROUND(100.0 * SUM(COALESCE("数量", qty) * COALESCE("单价", unit_price)) /
NULLIF(SUM(COALESCE("数量", qty) * COALESCE("单价", unit_price)) OVER (), 0), 1) AS revenue_share
FROM read_csv_auto('sales_data/store_*.csv')
GROUP BY 1
ORDER BY gross_revenue DESC;
"""
con.execute(category_sql).print()
五、数据质量检查:合并前先看"病"
在正式分析之前,先对数据进行质量检查,看看有哪些问题:
# 检查缺失值分布
quality_sql = """
SELECT
'NULL_COUNT' AS metric,
SUM(CASE WHEN amount IS NULL THEN 1 ELSE 0 END) AS null_count,
SUM(CASE WHEN quantity IS NULL THEN 1 ELSE 0 END) AS qty_null,
SUM(CASE WHEN store IS NULL THEN 1 ELSE 0 END) AS store_null,
COUNT(*) AS total_rows
FROM read_csv_auto('sales_data/store_*.csv')
UNION ALL
SELECT
'VALUE_RANGE' AS metric,
MIN(amount)::VARCHAR,
MAX(amount)::VARCHAR,
MIN(quantity)::VARCHAR,
MAX(quantity)::VARCHAR
FROM read_csv_auto('sales_data/store_*.csv');
"""
con.execute(quality_sql).print()
输出示例:
+-------------+---------------------+------------------+---------------+--------------+
| metric | null_count | qty_null | store_null | total_rows |
+-------------+---------------------+------------------+---------------+--------------+
| NULL_COUNT | 23 | 5 | 12 | 187,500 |
| VALUE_RANGE | 0.00 | 999,999.00 | 1 | 500.00 |
+-------------+---------------------+------------------+---------------+--------------+
这能帮你快速定位:哪些字段缺失率高、数据范围是否合理。
六、性能对比:DuckDB vs Excel vs Pandas
| 数据规模 | Excel | Pandas | DuckDB |
|---|---|---|---|
| 10 个文件 × 1,000 行 | ~30 秒(手动) | 0.5 秒 | 0.05 秒 |
| 50 个文件 × 5,000 行 | 卡死 | 3 秒 | 0.1 秒 |
| 200 个文件 × 10,000 行 | 不可能 | 45 秒 | 0.3 秒 |
| 内存占用 | 高(Excel 独占) | 高(双倍内存) | 低(流式读取) |
关键区别:DuckDB 是 lazy evaluation——它不会一次性把所有数据加载到内存,而是边读边处理。 这意味着即使文件总共有 10GB,你的电脑也不需要 10GB 内存。
七、进阶:把结果写入 Parquet——分享更高效
# 直接写入 Parquet——文件更小、读取更快
con.execute("""
COPY (
SELECT
COALESCE("日期", date) AS sale_date,
COALESCE("商品名称", product) AS product_name,
COALESCE("类别", category) AS category,
COALESCE("数量", qty) AS quantity,
COALESCE("单价", unit_price) AS unit_price,
COALESCE("销售额", amount) AS amount,
COALESCE("门店", store) AS store
FROM read_csv_auto('sales_data/store_*.csv')
) TO 'merged_all_stores.parquet' (FORMAT PARQUET);
""")
import os
print(f"Parquet 文件大小: {os.path.getsize('merged_all_stores.parquet') / 1024:.1f} KB")
通常 Parquet 格式只有 CSV 的 30-50% 大小,而且保留数据类型,下次用 DuckDB 读取时不需要再推断。
如果你的文件在云端(比如阿里云 OSS 或 AWS S3),DuckDB 也可以直接读——用 s3:// 或 oss:// 协议,不需要下载到本地。
八、变现建议
学会这套技能后,你可以这样变现:
接外包:在猪八戒、Upwork 等平台接"数据清洗/合并"的外包任务,单次收费 500-5000 元不等。用 DuckDB 处理,10 分钟搞定别人 3 小时的活。
做模板产品:把你的
merge_csv_glob()函数封装成一个 Streamlit 小工具,上传到 Streamlit Community Cloud,免费供人使用,通过广告或增值服务变现。写教程卖课:在 B 站/YouTube 发布"DuckDB 实战系列"视频,积累粉丝后推出付费课程(定价 99-299 元)。
企业内部培训:很多传统企业的财务、运营部门还在用 Excel 手动合并报表。你可以提供 DuckDB 培训课程,单次企业内训收费 5,000-20,000 元。
SaaS 产品:基于 DuckDB + FastAPI 搭建一个"在线表格合并与分析"的 SaaS 平台,按月订阅收费。
真正赚钱的不是技术本身,而是你用技术解决了多少人的痛苦。
结语
以前需要半天的工作,现在 10 秒搞定。真正决定工作效率的,往往不是那些炫酷的技术,而是你把日常重复性工作自动化了多少。
下次再遇到"合并几十个文件"这种无聊但耗时的任务,先想想能不能用 DuckDB 一行 SQL 搞定。你会发现,你的工作时间会多出好几个小时。
