用 DuckDB 搭建 AI 数据问答机器人——让非技术人员也能"对话查数据"

难度:⭐⭐⭐ | 预计耗时:1-2 小时搭建,之后可无限扩展
变现路径:为企业搭建内部数据问答系统,单次交付 5000-20000 元


一、为什么"对话查数据"是一门好生意?

想象这个场景:你去找一家电商公司的运营总监聊合作,他说:“我们每天要看 5 个维度的销售数据,但每次都要找数据分析师排期。”

你回答:“我帮你搭一个系统,以后你们直接在聊天框里问’上周华东区哪三款商品退货率最高’,系统自动给你答案。”

这就是 AI 数据问答机器人的核心价值——让不会 SQL 的业务人员,能用自然语言获取数据洞察。

传统方案怎么做?前端写 React → 后端写 FastAPI → 对接 LLM API → 把自然语言转 SQL → 执行查询 → 返回结果。这套流程至少需要 2000 行代码。

用 DuckDB,整个核心逻辑不到 150 行。原因很简单:DuckDB 是进程内数据库,不需要单独部署,Python 里 import duckdb 就能用,天然适合嵌入 AI Agent 的推理链路中。


二、系统架构:四步完成自然语言到数据结果

用户提问(自然语言)→ LLM 转 SQL → DuckDB 执行 → 格式化返回
       ↑                                      ↓
       └─────────── 结果反馈给 LLM 生成回答 ←┘

整个流程分为四个核心步骤:

  1. 语义理解:LLM 将自然语言转化为 SQL 查询
  2. 安全沙箱:限制 DuckDB 只能读取指定表,防止恶意查询
  3. 结果执行:DuckDB 高效执行查询,返回结构化数据
  4. 自然语言回复:LLM 将数据结果转化为易懂的业务语言

三、完整可运行代码

第一步:准备数据和 DuckDB 表

import duckdb
import json

# 创建 DuckDB 内存数据库
con = duckdb.connect(':memory:')

# 模拟电商订单数据
con.execute("""
CREATE TABLE orders AS
SELECT * FROM read_csv_auto('https://github.com/duckdb/duckdb-data/raw/main/data/orders.csv');
""")

# 查看表结构
print(con.execute("DESCRIBE orders").fetchdf())

如果你的数据在本地 CSV/Parquet 文件中,直接读取即可:

# 读取本地 CSV(自动推断 schema)
con.read_csv_auto('/path/to/sales_data.csv')

# 读取 Parquet 文件(列式存储,速度更快)
con.read_parquet('/path/to/sales_data.parquet')

# 批量读取目录下所有 Parquet 文件
con.read_parquet('/path/to/data/*.parquet')

第二步:构建 SQL 生成器

import os
from openai import OpenAI

client = OpenAI(api_key=os.environ.get('OPENAI_API_KEY'))

def get_table_schema(con):
    """获取 DuckDB 中所有表的结构信息"""
    tables = con.execute("SHOW TABLES").fetchall()
    schema_info = {}
    for table_name, in tables:
        columns = con.execute(f"DESCRIBE {table_name}").fetchall()
        schema_info[table_name] = [
            {'column': col[0], 'type': col[1]} for col in columns
        ]
    return schema_info

def generate_sql(user_query, schema_info):
    """将自然语言转换为 SQL"""
    schema_str = json.dumps(schema_info, indent=2, ensure_ascii=False)
    
    messages = [
        {
            "role": "system",
            "content": """你是一个 SQL 专家。用户会用自然语言提问,你需要将其转换为 DuckDB SQL 查询。
            
重要规则:
1. 只允许使用以下表,不要创建新表
2. 查询结果控制在 100 行以内
3. 如果涉及数值计算,使用 ROUND 保留 2 位小数
4. 如果用户的问题无法用现有数据回答,返回空字符串而不是猜测
5. 只输出 SQL,不要输出解释文字"""
        },
        {
            "role": "user",
            "content": f"""数据库表结构:
{schema_str}

用户问题:{user_query}

请生成 SQL 查询(只输出 SQL 语句本身):"""
        }
    ]
    
    response = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=messages,
        temperature=0.1,
        max_tokens=500
    )
    
    sql = response.choices[0].message.content.strip()
    # 清理可能的 markdown 代码块标记
    if sql.startswith("```"):
        sql = sql.split("\n", 1)[1].rsplit("```", 1)[0].strip()
    return sql

第三步:安全执行引擎

class SafeQueryEngine:
    """安全的 DuckDB 查询执行器"""
    
    def __init__(self, connection):
        self.con = connection
    
    def execute_query(self, sql, max_rows=100):
        """执行 SQL 并返回结果"""
        # 安全检查:只允许 SELECT
        sql_upper = sql.strip().upper()
        if not sql_upper.startswith("SELECT"):
            raise PermissionError("只允许执行 SELECT 查询")
        
        # 检查是否包含危险的函数
        dangerous_keywords = ['DROP', 'DELETE', 'INSERT', 'UPDATE', 'ALTER', 'CREATE', 'EXEC']
        for keyword in dangerous_keywords:
            if keyword in sql_upper:
                raise PermissionError(f"不允许执行包含 {keyword} 的查询")
        
        try:
            result = self.con.execute(sql)
            df = result.fetchdf()
            
            # 限制行数
            if len(df) > max_rows:
                df = df.head(max_rows)
            
            return {
                'success': True,
                'data': df.to_dict(orient='records'),
                'columns': list(df.columns),
                'row_count': len(df)
            }
        except Exception as e:
            return {
                'success': False,
                'error': str(e)
            }

第四步:生成自然语言回答

def generate_answer(user_query, query_result, schema_info):
    """将查询结果转化为自然语言回答"""
    if not query_result['success']:
        return f"抱歉,查询执行失败:{query_result['error']}。请换一种方式提问。"
    
    if query_result['row_count'] == 0:
        return "根据当前数据,没有找到匹配的结果。"
    
    # 构建简要的数据摘要
    data_summary = json.dumps(query_result['data'][:5], indent=2, ensure_ascii=False)
    
    messages = [
        {
            "role": "system",
            "content": """你是一个数据分析师。用户用自然语言提问,你已经执行了 SQL 查询并得到了结果。
请将结果转化为用户容易理解的中文回答。要求:
1. 先总结核心发现(1-2句话)
2. 列出关键数据点
3. 给出简短的业务建议
4. 如果数据量较大,只展示前几条样例"""
        },
        {
            "role": "user",
            "content": f"""用户问题:{user_query}

查询结果(前5条):
{data_summary}

{query_result['row_count']} 条记录

请用自然语言回答用户的问题:"""
        }
    ]
    
    response = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=messages,
        temperature=0.3,
        max_tokens=1000
    )
    
    return response.choices[0].message.content

第五步:完整的对话循环

def chat_loop():
    """交互式对话循环"""
    print("=" * 60)
    print("🤖 DuckDB 智能数据问答助手")
    print("=" * 60)
    print("输入你的问题(输入 'quit' 退出)")
    print()
    
    schema_info = get_table_schema(con)
    engine = SafeQueryEngine(con)
    
    while True:
        user_query = input("💬 你: ").strip()
        if user_query.lower() in ('quit', 'exit', 'q'):
            print("再见!👋")
            break
        
        # Step 1: 生成 SQL
        print("\n⏳ 正在生成查询...")
        sql = generate_sql(user_query, schema_info)
        print(f"   SQL: {sql}")
        
        if not sql:
            print("❌ 无法理解您的问题,请换一种方式描述。")
            continue
        
        # Step 2: 执行查询
        print("⏳ 正在执行查询...")
        query_result = engine.execute_query(sql)
        
        if not query_result['success']:
            print(f"❌ {query_result['error']}")
            continue
        
        # Step 3: 生成自然语言回答
        print("⏳ 正在生成回答...")
        answer = generate_answer(user_query, query_result, schema_info)
        print(f"\n🤖 助手: {answer}")
        print()
        print("-" * 60)

# 启动对话
if __name__ == "__main__":
    chat_loop()

四、进阶:封装成 Web 服务

如果你想把这个能力提供给多人使用,只需加一层 FastAPI 封装:

from fastapi import FastAPI
from pydantic import BaseModel

app = FastAPI(title="DuckDB 数据问答 API")

class QueryRequest(BaseModel):
    question: str

@app.post("/ask")
def ask_question(req: QueryRequest):
    sql = generate_sql(req.question, schema_info)
    result = engine.execute_query(sql)
    answer = generate_answer(req.question, result, schema_info)
    return {"answer": answer}

启动后,任何前端(网页、微信小程序、飞书机器人)都能调用这个 API。


五、变现指南:如何把这套系统卖出去?

场景 1:中小企业内部数据看板

目标客户:日订单量 100-1000 单的电商卖家、连锁餐饮品牌。

卖点:“让店长不用等分析师,自己就能查数据。”

定价:一次性交付 8000-15000 元 + 每月 500-1000 元维护费。

交付周期:1-2 天(数据准备好后)。

场景 2:金融研报自动化

目标客户:券商研究所、基金公司的初级分析师。

卖点:“把重复性的数据整理工作交给 AI,分析师专注于逻辑判断。”

定价:企业级授权 30000-80000 元/年。

场景 3:SaaS 数据问答产品

卖点:面向没有数据团队的中小企业,按席位收费。

定价:99-299 元/月/用户。


六、关键优化技巧

1. 缓存高频查询

from functools import lru_cache

@lru_cache(maxsize=1000)
def cached_query(sql_hash, sql_result_json):
    pass

2. 使用 DuckDB 的并行执行

# 开启多线程加速
con.execute("SET threads TO 4")

3. 复杂查询的分步执行

对于多表 JOIN 的复杂查询,可以拆分成多个简单查询,降低 LLM 出错概率:

def complex_query_pipeline(user_query, schema_info):
    """多步查询分解"""
    # 第一步:让 LLM 决定需要哪些表和字段
    plan = generate_query_plan(user_query, schema_info)
    # 第二步:逐步执行每个子查询
    # 第三步:组合结果生成最终回答
    ...

七、总结

用 DuckDB 搭建 AI 数据问答系统的核心优势:

  1. 零运维:进程内数据库,不需要部署任何服务
  2. 高性能:向量化执行,百万级数据毫秒级响应
  3. 易嵌入:Python/R/Node.js 都有原生绑定
  4. 格式兼容:直接读取 CSV/Parquet/JSON/Excel,无需数据清洗

这套系统的 MVP 你可以在一个下午搭出来,然后拿去见客户谈单。数据产品化,从来不是技术问题,而是"你能不能把数据变成别人看得懂的语言"。


📖 本文的完整版已发布在 duckdblab.org,包含更详细的步骤、错误排查指南和部署模板。想系统学习 DuckDB 实战?duckdblab.org 上有完整的教程系列,从入门到企业级部署全覆盖。

📺 Watch video tutorials → Olap Studio YouTube

Subscribe for more DuckDB & AI automation tutorials

使用 Hugo 构建
主题 StackJimmy 设计

⚠️ 本站为独立社区项目,与 DuckDB 基金会及 DuckDB 官方项目无任何从属、背书或赞助关系。

"DuckDB" 是 DuckDB 基金会的注册商标,本站仅以事实描述方式使用该名称。

本站内容仅供教育与社区推广用途,不构成任何商业服务。