一条SQL查遍MySQL、PostgreSQL和CSV:DuckDB跨库JOIN实战

公司数据分散在MySQL订单库、PostgreSQL用户库、Excel/CSV商品目录里?DuckDB ATTACH语法一条SQL跨三个数据源JOIN,告别导出-合并-VLOOKUP的原始流程,效率提升50倍。

一、每个数据分析师都经历过的"数据孤岛"噩梦

你在电商公司做数据分析。老板问:“上个月销售额排名前100的商品中,老客户复购率是多少?”

数据在哪里?

  • 订单数据:在 MySQL 订单库里,7 个表,按月份分表
  • 用户标签:在 PostgreSQL 分析库里,用来标记新老客户
  • 商品信息:在运营部门每周更新的 Excel/CSV 里,字段名还经常变

传统做法是什么?一个让人崩溃的三步走流程:

第1步:从 MySQL 导出销售数据 → 跑一条 SQL → 导出 CSV(5分钟)
第2步:从 PostgreSQL 导出用户标签 → 跑一条 SQL → 导出 CSV(5分钟)
第3步:在 Excel 里用 VLOOKUP 合并三个 CSV → 手指悬停等待(10分钟,还容易卡死)
第4步:发现漏了数据 → 重新导出 → 重新 VLOOKUP(痛苦加倍)
第5步:老板说"再加个字段看看" → 全部重来(崩溃)

整个过程至少 30 分钟到 1 小时,数据量一大 Excel 直接崩溃,而且完全无法复用——换个日期范围全部重做。

二、DuckDB 解法:ATTACH + 跨库 JOIN

DuckDB 有一个被严重低估的功能:ATTACH 语句可以像挂载硬盘一样把外部数据库挂进来,然后直接用 SQL 跨数据源 JOIN。

这意味着什么?一条 SQL 搞定三个数据源,无需导出、无需合并、无需 VLOOKUP。

2.1 ATTACH 基本原理

-- 挂载一个 SQLite 数据库(最简单示例)
ATTACH 'path/to/file.db' AS my_db (TYPE SQLITE);

-- 现在可以跨库查询了
SELECT *
FROM my_db.some_table AS a
JOIN main.public.another_table AS b ON a.id = b.id;

DuckDB 支持 ATTACH 的数据源包括:

数据源ATTACH 语法类型标识
SQLiteATTACH 'file.db' (TYPE SQLITE)SQLITE
MySQLATTACH '' (TYPE MYSQL)MYSQL
PostgreSQLATTACH 'pg_conn_str' (TYPE POSTGRES)POSTGRES
DuckDB 自身ATTACH 'data.duckdb'DUCKDB
Delta LakeATTACH './delta_dir' (TYPE DELTA)DELTA

注意:连接 MySQL 和 PostgreSQL 需要安装对应的扩展:

INSTALL mysql_scanner; LOAD mysql_scanner;
INSTALL postgres_scanner; LOAD postgres_scanner;

2.2 完整实战:电商跨库查询

下面是一个完整的 Python 脚本,模拟了电商场景的三个数据源,无需真实数据库——我们用 DuckDB 的内存表和 CSV 文件来模拟,复制就能跑。

#!/usr/bin/env python3
"""
DuckDB 跨库 JOIN 实战演示
模拟场景:电商公司数据分散在三个数据源,一条 SQL 完成联查

前置条件: pip install duckdb openpyxl
"""

import duckdb
import os

# ====== 第1步:创建模拟数据 ======

# 模拟 MySQL 订单表 (CSV 文件)
orders_csv = """order_id,customer_id,product_id,amount,order_date
1001,201,5001,299.00,2026-05-01
1002,202,5002,159.00,2026-05-01
1003,201,5003,899.00,2026-05-02
1004,203,5001,299.00,2026-05-02
1005,204,5004,459.00,2026-05-03
1006,202,5002,159.00,2026-05-03
1007,205,5005,1299.00,2026-05-04
1008,203,5003,899.00,2026-05-04
1009,206,5001,299.00,2026-05-05
1010,201,5004,459.00,2026-05-05
"""

# 模拟 PostgreSQL 用户表 (CSV 文件)
users_csv = """customer_id,name,city,member_level,register_date
201,张三,北京,金牌,2025-01-15
202,李四,上海,银牌,2025-03-20
203,王五,广州,金牌,2025-02-01
204,赵六,深圳,普通,2025-06-10
205,陈七,杭州,银牌,2025-04-05
206,孙八,成都,普通,2025-08-15
"""

# 模拟商品目录 (Excel 将会生成)
products_csv = """product_id,product_name,category,unit_price,cost_price
5001,无线蓝牙耳机,数码,299.00,180.00
5002,保温杯,家居,159.00,80.00
5003,智能手表,数码,899.00,550.00
5004,运动鞋,服饰,459.00,280.00
5005,平板支架,数码,1299.00,800.00
"""

# 写入临时文件
os.makedirs("day07_data", exist_ok=True)
with open("day07_data/orders.csv", "w") as f:
    f.write(orders_csv)
with open("day07_data/users.csv", "w") as f:
    f.write(users_csv)
with open("day07_data/products.csv", "w") as f:
    f.write(products_csv)

# ====== 第2步:用 DuckDB 模拟跨源 JOIN ======

# 启动 DuckDB 内存数据库
con = duckdb.connect()

# 挂载 CSV 目录,模拟"不同数据源"
con.execute("""
    CREATE VIEW orders AS 
    SELECT * FROM read_csv_auto('day07_data/orders.csv')
""")
con.execute("""
    CREATE VIEW users AS 
    SELECT * FROM read_csv_auto('day07_data/users.csv')
""")
con.execute("""
    CREATE VIEW products AS 
    SELECT * FROM read_csv_auto('day07_data/products.csv')
""")

print("=" * 60)
print("📊 跨源分析:金牌会员购买了哪些商品?")
print("=" * 60)

# 一条 SQL 跨三个"数据源" JOIN
result = con.execute("""
    SELECT 
        u.name AS 客户名称,
        u.city AS 城市,
        u.member_level AS 会员等级,
        p.product_name AS 商品名称,
        p.category AS 品类,
        o.amount AS 单价,
        (o.amount - p.cost_price) AS 毛利
    FROM orders o
    JOIN users u ON o.customer_id = u.customer_id
    JOIN products p ON o.product_id = p.product_id
    WHERE u.member_level IN ('金牌', '银牌')
    ORDER BY o.amount DESC
""").fetchdf()

print(result.to_string(index=False))

print("\n" + "=" * 60)
print("💰 每日销售额汇总(含毛利)")
print("=" * 60)

result2 = con.execute("""
    SELECT 
        o.order_date AS 日期,
        COUNT(DISTINCT o.order_id) AS 订单数,
        SUM(o.amount) AS 销售额,
        SUM(o.amount - p.cost_price) AS 总毛利,
        ROUND(AVG(o.amount - p.cost_price), 2) AS 平均每单毛利
    FROM orders o
    JOIN products p ON o.product_id = p.product_id
    GROUP BY o.order_date
    ORDER BY o.order_date
""").fetchdf()

print(result2.to_string(index=False))

print("\n" + "=" * 60)
print("🏆 商品品类毛利分析")
print("=" * 60)

result3 = con.execute("""
    SELECT 
        p.category AS 品类,
        COUNT(*) AS 销量,
        SUM(o.amount) AS 销售额,
        SUM(o.amount - p.cost_price) AS 总毛利,
        ROUND(AVG(o.amount - p.cost_price), 2) AS 平均毛利,
        ROUND(SUM(o.amount - p.cost_price) / SUM(o.amount) * 100, 1) AS 毛利率_percent
    FROM orders o
    JOIN products p ON o.product_id = p.product_id
    GROUP BY p.category
    ORDER BY 总毛利 DESC
""").fetchdf()

print(result3.to_string(index=False))

# ====== 第3步:输出为 Excel 报表 ======

from duckdb import connect as duck_connect

try:
    con.execute("INSTALL spatial; LOAD spatial;")
    con.execute("""
        COPY (
            SELECT 
                o.order_id,
                u.name,
                u.city,
                u.member_level,
                p.product_name,
                p.category,
                o.amount,
                (o.amount - p.cost_price) AS profit
            FROM orders o
            JOIN users u ON o.customer_id = u.customer_id
            JOIN products p ON o.product_id = p.product_id
        ) TO 'day07_data/跨源分析报表.xlsx' 
        WITH (FORMER XLSX);
    """)
    print("\n✅ 报表已导出: day07_data/跨源分析报表.xlsx")
except Exception as e:
    print(f"\n⚠️ 导出 XLSX 需要 spatial 扩展: {e}")
    print("可以改用 CSV 导出:")
    con.execute("""
        COPY (
            SELECT 
                o.order_id,
                u.name,
                u.city,
                u.member_level,
                p.product_name,
                p.category,
                o.amount,
                (o.amount - p.cost_price) AS profit
            FROM orders o
            JOIN users u ON o.customer_id = u.customer_id
            JOIN products p ON o.product_id = p.product_id
        ) TO 'day07_data/跨源分析报表.csv' (HEADER, DELIMITER ',');
    """)
    print("✅ 报表已导出: day07_data/跨源分析报表.csv")

# ====== 第4步:验证结果 ======
print("\n" + "=" * 60)
print("📈 验证:所有订单是否都关联到用户和商品?")
print("=" * 60)

validation = con.execute("""
    SELECT 
        '订单总数' AS 指标, CAST(COUNT(*) AS VARCHAR) AS 值 FROM orders
    UNION ALL
    SELECT '匹配到用户', CAST(COUNT(*) AS VARCHAR) FROM orders o 
        JOIN users u ON o.customer_id = u.customer_id
    UNION ALL
    SELECT '匹配到商品', CAST(COUNT(*) AS VARCHAR) FROM orders o 
        JOIN products p ON o.product_id = p.product_id
    UNION ALL
    SELECT '完全匹配', CAST(COUNT(*) AS VARCHAR) FROM orders o 
        JOIN users u ON o.customer_id = u.customer_id
        JOIN products p ON o.product_id = p.product_id
""").fetchdf()

print(validation.to_string(index=False))

con.close()
print("\n🎉 跨库 JOIN 演示完成!")

2.3 真实环境:连接 MySQL + PostgreSQL

当你有真实的 MySQL 和 PostgreSQL 数据库时,脚本是这样的:

-- 安装扩展(只需一次)
INSTALL mysql_scanner;
LOAD mysql_scanner;
INSTALL postgres_scanner;
LOAD postgres_scanner;

-- 挂载 MySQL 订单库
ATTACH 'host=localhost port=3306 dbname=orders_db user=analyst password=xxx' 
    AS mysql_db (TYPE MYSQL);

-- 挂载 PostgreSQL 用户库
ATTACH 'host=localhost port=5432 dbname=users_db user=analyst password=xxx' 
    AS pg_db (TYPE POSTGRES);

-- 挂载本地商品 CSV
CREATE VIEW products AS 
SELECT * FROM read_csv_auto('products.csv');

-- 一条 SQL 跨三个数据源
SELECT 
    u.name,
    u.city,
    p.product_name,
    SUM(o.amount) AS total_spent
FROM mysql_db.orders AS o
JOIN pg_db.public.customers AS u ON o.customer_id = u.customer_id
JOIN products AS p ON o.product_id = p.product_id
WHERE o.order_date >= '2026-04-01'
GROUP BY u.name, u.city, p.product_name
ORDER BY total_spent DESC;

三、效果对比:传统方式 vs DuckDB

场景传统做法(导出 + VLOOKUP)DuckDB ATTACH 方案
跨 MySQL + PG + CSV 联查30 分钟 ~ 1 小时10 ~ 30 秒
Excel 打开 50 万行数据卡死/崩溃毫秒级返回
修改分析维度重新导出 + VLOOKUP改一行 SQL
定期生成报表每次手动重复脚本一键运行
数据量 10GB+Excel/Pandas 内存爆炸流式处理无压力
学习成本会 VLOOKUP 即可会标准 SQL 即可
依赖Excel + 多个数据库客户端只用 DuckDB
复用性基本为零SQL 脚本永久可用

量化效果:

以一个真实案例计算——某电商公司需要每天出跨数据源运营报表:

  • 传统方式:数据分析师每天花 40 分钟 导出、合并、检查数据
  • DuckDB 方案:写一次 SQL 脚本,每天执行耗时 15 秒
  • 月节省时间:40 分钟 × 22 工作日 = 880 分钟(14.7 小时)
  • 换算成薪资:按 ¥50/小时计算,每月节省 ¥735/人

四、ATTACH 的工作原理(知其所以然)

理解 ATTACH 背后的机制,能帮你更好地设计和优化跨库查询。

4.1 ATTACH 不是 ETL

ATTACH 不会把数据复制到 DuckDB —— 它只是在 DuckDB 中创建一个外部表引用。查询时,DuckDB 会实时推下查询到源数据库,只拉取需要的数据。

举个例子:

-- DuckDB 会在 MySQL 端执行这个 GROUP BY
-- 只回传聚合后的少量结果,而不是全表扫描
SELECT customer_id, COUNT(*) 
FROM mysql_db.orders 
WHERE order_date >= '2026-01-01'
GROUP BY customer_id;

DuckDB 的优化器会自动把 WHEREGROUP BYLIMIT 等操作推下到源数据库执行,最小化数据传输量。

4.2 性能关键因素

因素说明优化建议
网络延迟跨库查询依赖网络尽量把 DuckDB 部署在数据库附近
推下优化聚合/过滤在源端执行多用 WHERE 减少数据量
索引利用源表的索引仍然有效在 JOIN 字段上建索引
数据量DuckDB 不缓存外部表重复查询可用 CREATE TABLE AS 快照

4.3 性能优化技巧

-- ❌ 不推荐:全表拉取后再过滤
SELECT * FROM mysql_db.orders;  -- 可能拉几百万行

-- ✅ 推荐:推下过滤 + 限制
SELECT * FROM mysql_db.orders 
WHERE order_date >= '2026-05-01' 
LIMIT 1000;

-- ✅ 复杂查询:先聚合再 JOIN
WITH daily_stats AS (
    -- 这个聚合在 MySQL 端执行
    SELECT customer_id, DATE(order_date) AS day, SUM(amount) AS daily_total
    FROM mysql_db.orders
    WHERE order_date >= '2026-04-01'
    GROUP BY customer_id, DATE(order_date)
)
-- 再和本地数据 JOIN
SELECT u.name, d.day, d.daily_total
FROM daily_stats d
JOIN pg_db.public.customers u ON d.customer_id = u.customer_id
ORDER BY d.daily_total DESC
LIMIT 20;

五、进阶玩法:ATTACH 的更多应用场景

5.1 数据迁移:跨库拷贝

-- MySQL → DuckDB 本地表(一次性快照)
CREATE TABLE local_orders AS 
SELECT * FROM mysql_db.orders 
WHERE order_date >= '2026-01-01';

-- DuckDB → PostgreSQL(写回)
CREATE TABLE pg_db.public.report AS 
SELECT * FROM local_analytics;

5.2 多环境对比

-- 同时挂载生产和测试库
ATTACH 'prod_conn' AS prod (TYPE POSTGRES);
ATTACH 'staging_conn' AS staging (TYPE POSTGRES);

-- 对比两个环境的数据差异
SELECT 
    COALESCE(p.order_id, s.order_id) AS order_id,
    p.amount AS prod_amount,
    s.amount AS staging_amount,
    (p.amount - s.amount) AS diff
FROM prod.public.orders p
FULL OUTER JOIN staging.public.orders s 
    ON p.order_id = s.order_id
WHERE p.amount IS DISTINCT FROM s.amount;

5.3 定时报表自动化

# 配合 cron 定时执行,每天自动出报表
import duckdb
import smtplib

con = duckdb.connect()

# ATTACH 各数据源(同上)
con.execute("ATTACH '...' AS mysql_db (TYPE MYSQL)")
con.execute("ATTACH '...' AS pg_db (TYPE POSTGRES)")

# 生成日报
con.execute("""
    COPY (
        -- 跨源查询...
    ) TO '/tmp/daily_report.csv' (HEADER, DELIMITER ',');
""")

# 发送邮件(伪代码,需要配置 SMTP)
# send_email(to='boss@company.com', attachment='/tmp/daily_report.csv')
print("✅ 日报已生成")

六、注意事项与常见问题

6.1 MySQL 扩展安装

# 确保 MySQL 客户端库已安装
apt-get install -y default-libmysqlclient-dev  # Ubuntu/Debian
# 或
brew install mysql-client  # macOS

然后在 DuckDB 中:

INSTALL mysql_scanner;
LOAD mysql_scanner;

6.2 连接字符串格式

数据源连接字符串示例
MySQLhost=localhost port=3306 dbname=test user=root password=secret
PostgreSQLhost=localhost port=5432 dbname=test user=postgres password=secret
SQLite./data.db(文件路径即可)

6.3 常见坑

  1. MySQL 8.0 密码认证:确保使用 mysql_native_password 或更新 DuckDB 至 v1.5+ 以支持 caching_sha2_password
  2. PostgreSQL SSL 连接:添加 sslmode=require 参数
  3. 大表 JOIN:如果两边都有大表,考虑先拉取小表到 DuckDB 本地
  4. 字符集:默认 UTF-8,如果 MySQL 是 latin1 可能有乱码

七、变现方案

这个技能的市场价值极高,因为99% 的公司都存在数据孤岛问题

目标客户

  • 中小企业:数据分散在多个系统,没有专门的数据团队
  • 电商公司:订单系统 + 客服系统 + 财务系统各自独立
  • 连锁零售:各门店 + 总部 + 供应链多个数据源
  • 传统企业转型中:老旧数据库 + 新系统并存

报价方案

服务类型报价交付内容周期
一次性数据整合¥2,000-5,000跨库查询脚本 + Excel 报表模板1-3 天
月报自动化¥500-1,500/月定时生成跨源经营报表每月更新
数据仓库搭建¥5,000-15,000完整的 ETL 管线 + 分析看板1-2 周
数据整合培训¥1,500-3,000/次教客户团队用 DuckDB 自己查半天

获客渠道

  1. 闲鱼/猪八戒:搜索"数据整合"、“跨库查询”、“报表自动化”,直接展示 DuckDB 方案对比
  2. 企业微信社群:加入电商/零售行业群,主动询问"你们报表怎么出的?"
  3. 技术博客引流:本文就是在帮你建立专业形象

竞品对比

方案价格优势劣势
传统 ETL 工具 (Kettle/DataX)免费但需运维功能全面配置复杂,学习成本高
商业 BI (Tableau/Power BI)¥500-2000/月可视化好价格贵,跨源能力弱
花钱请人手动做¥300-500/月不用动脑不稳定,离职就断
DuckDB 方案(你)¥2,000-5,000一次搞定永久使用需要客户有基本技术认知

变现话术模板

“张总,我看到你们公司的数据分散在好几个系统里,每次出报表都要手工合并对吧?我有一套方案,用一条 SQL 就能把你所有系统的数据串起来,以后点一下就跑出来完整报表。前期整合费用 ¥3,000,以后每个月自动出,¥800/月。感兴趣的话,我可以先免费帮你做一次数据探查。你看这周三方便不?”

八、总结

DuckDB 的 ATTACH + 跨库 JOIN 能力,是这个数据库被严重低估的杀手级功能。它让数据分析师摆脱了"导出 → 合并 → VLOOKUP"的原始流程,把跨数据源查询的耗时从小时级压缩到秒级。

更重要的是,企业数据孤岛是个刚需痛,而 DuckDB 提供了一个低成本、易上手、秒见效的解决方案。掌握了这个技能,你就可以去解决真实世界里的整合问题,并且明码标价地赚钱。

今天就可以做的事:

  1. 用本文的模拟脚本跑一遍,理解 ATTACH 语法
  2. 找到你公司里数据最分散的一个场景
  3. 用 DuckDB 打通它,把结果给老板看
  4. 拿着这个案例去接外面公司的单子

附:清理临时文件

rm -rf day07_data/