DuckDB MCP Server 全栈指南:让 AI 智能体直连你的数据

DuckDB MCP Server 全栈指南:让 AI 智能体直连你的数据

TL;DR:Model Context Protocol (MCP) 是 AI 智能体与外部数据源的标准化接口。本文从零构建一个生产级的 DuckDB MCP Server,让 Claude 等 AI 智能体通过标准协议实时查询数据库、探索数据结构和生成分析报告。


什么是 MCP?

Model Context Protocol (MCP) 是 Anthropic 推出的开放标准,让 AI 应用程序能够安全地连接外部数据源。你可以把它理解为 “AI 的 USB-C 接口”——一种标准化协议,让任何 AI 智能体都能与任何数据源通信。

┌─────────────────────────────────────────────────┐
│              MCP 架构                             │
│                                                  │
│  [AI 智能体] ──MCP 协议──> [MCP Server]          │
│  (Claude 等)              (DuckDB + 工具)        │
│                                                  │
│  标准化:                                       │
│  • 工具定义                                      │
│  • 资源访问                                      │
│  • 提示模板                                      │
│  • 安全认证                                      │
└─────────────────────────────────────────────────┘

前置条件

安装依赖

pip install mcp duckdb fastapi uvicorn pydantic

环境配置

export DUCKDB_PATH="./analytics.duckdb"
export MCP_HOST="0.0.0.0"
export MCP_PORT=8080
export API_KEY="your-api-key"

第一步:基础 MCP Server

服务器初始化

from mcp.server.fastmcp import FastMCP
import duckdb
import os

# 初始化 MCP 服务器
mcp = FastMCP(
    name="duckdb-analytics",
    version="1.0.0",
    description="DuckDB 分析服务器,供 AI 智能体使用"
)

# 全局 DuckDB 连接
db_path = os.getenv("DUCKDB_PATH", "./analytics.duckdb")
con = duckdb.connect(db_path)

# 健康检查端点
@mcp.tool()
def health_check() -> dict:
    """检查 DuckDB 服务器的健康状态。"""
    result = con.execute("SELECT 1 as healthy").fetchone()
    return {
        "status": "healthy" if result[0] == 1 else "unhealthy",
        "database": db_path,
        "duckdb_version": con.execute("SELECT version()").fetchone()[0]
    }

if __name__ == "__main__":
    mcp.run()

启动服务器

# 启动 MCP 服务器
python server.py

# 或使用 uvicorn 用于生产环境
uvicorn server:app --host 0.0.0.0 --port 8080

第二步:分析工具

查询执行工具

from pydantic import BaseModel, Field
from typing import Optional

class QueryRequest(BaseModel):
    sql: str = Field(..., description="要执行的 SQL 查询")
    limit: Optional[int] = Field(1000, description="返回的最大行数")
    format: Optional[str] = Field("json", description="输出格式: json, csv, markdown")

@mcp.tool()
def execute_query(request: QueryRequest) -> dict:
    """在 DuckDB 数据库中执行 SQL 查询。"""
    try:
        # 验证查询(防止危险操作)
        if any(keyword in request.sql.upper() for keyword in ['DROP', 'DELETE', 'TRUNCATE']):
            return {"error": "不允许执行危险操作"}
        
        # 执行查询并限制结果
        result = con.execute(f"{request.sql} LIMIT {request.limit}")
        columns = [desc[0] for desc in result.description]
        rows = result.fetchall()
        
        # 转换为请求的格式
        if request.format == "csv":
            import io
            output = io.StringIO()
            writer = csv.writer(output)
            writer.writerow(columns)
            writer.writerows(rows)
            return {"data": output.getvalue()}
        
        elif request.format == "markdown":
            markdown = "| " + " | ".join(columns) + " |\n"
            markdown += "| " + " | ".join(["---"] * len(columns)) + " |\n"
            for row in rows:
                markdown += "| " + " | ".join(str(v) for v in row) + " |\n"
            return {"data": markdown}
        
        else:  # JSON
            return {
                "columns": columns,
                "data": [dict(zip(columns, row)) for row in rows],
                "row_count": len(rows)
            }
    
    except Exception as e:
        return {"error": str(e)}

模式发现工具

class SchemaRequest(BaseModel):
    table_pattern: Optional[str] = Field(None, description="按模式过滤表")

@mcp.tool()
def get_schema(request: SchemaRequest) -> dict:
    """发现数据库模式和表结构。"""
    try:
        # 获取所有表
        tables_query = """
            SELECT table_name, table_schema, table_type
            FROM information_schema.tables
            WHERE table_schema NOT IN ('pg_catalog', 'information_schema')
            ORDER BY table_name
        """
        
        if request.table_pattern:
            tables = con.execute(
                tables_query.replace("ORDER BY table_name", 
                                   f"AND table_name LIKE '%{request.table_pattern}%' ORDER BY table_name")
            ).fetchall()
        else:
            tables = con.execute(tables_query).fetchall()
        
        # 获取每个表的列详细信息
        schema = {}
        for table in tables:
            table_name = table[0]
            columns = con.execute(f"""
                SELECT column_name, data_type, is_nullable
                FROM information_schema.columns
                WHERE table_name = '{table_name}'
                ORDER BY ordinal_position
            """).fetchall()
            
            schema[table_name] = {
                "columns": [
                    {"name": col[0], "type": col[1], "nullable": col[2] == 'YES'}
                    for col in columns
                ],
                "row_count": con.execute(f"SELECT COUNT(*) FROM {table_name}").fetchone()[0]
            }
        
        return {"tables": len(schema), "schema": schema}
    
    except Exception as e:
        return {"error": str(e)}

数据画像工具

class ProfileRequest(BaseModel):
    table_name: str = Field(..., description="要画像的表")
    columns: Optional[list] = Field(None, description="特定要画像的列")

@mcp.tool()
def profile_table(request: ProfileRequest) -> dict:
    """生成表的统计画像。"""
    try:
        # 获取列统计信息
        columns = request.columns or con.execute(f"""
            SELECT column_name FROM information_schema.columns
            WHERE table_name = '{request.table_name}'
        """).fetchall()
        
        profile = {}
        for col_tuple in columns:
            col_name = col_tuple[0] if isinstance(col_tuple, tuple) else col_tuple
            
            stats = con.execute(f"""
                SELECT 
                    COUNT(*) as total_rows,
                    COUNT(DISTINCT "{col_name}") as distinct_values,
                    SUM(CASE WHEN "{col_name}" IS NULL THEN 1 ELSE 0 END) as null_count,
                    MIN("{col_name}") as min_value,
                    MAX("{col_name}") as max_value,
                    AVG("{col_name}") as avg_value,
                    PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY "{col_name}") as median,
                    PERCENTILE_CONT(0.95) WITHIN GROUP (ORDER BY "{col_name}") as p95
                FROM {request.table_name}
            """).fetchone()
            
            profile[col_name] = {
                "total_rows": stats[0],
                "distinct_values": stats[1],
                "null_count": stats[2],
                "null_percentage": round(stats[2] / stats[0] * 100, 2) if stats[0] > 0 else 0,
                "min": stats[3],
                "max": stats[4],
                "avg": stats[5],
                "median": stats[6],
                "p95": stats[7]
            }
        
        return {"table": request.table_name, "profile": profile}
    
    except Exception as e:
        return {"error": str(e)}

第三步:资源访问

定义数据资源

from mcp.types import Resource, TextResourceContents

@mcp.resource("duckdb://orders/schema")
def get_orders_schema() -> str:
    """订单表的模式定义。"""
    return """
    表: orders
    列:
    - order_id: VARCHAR (主键)
    - customer_id: VARCHAR
    - order_date: TIMESTAMP
    - amount: DECIMAL(10,2)
    - status: VARCHAR
    - payment_method: VARCHAR
    """

@mcp.resource("duckdb://products/schema")
def get_products_schema() -> str:
    """产品表的模式定义。"""
    return """
    表: products
    列:
    - product_id: VARCHAR (主键)
    - name: VARCHAR
    - category: VARCHAR
    - price: DECIMAL(10,2)
    - stock_quantity: INTEGER
    """

@mcp.resource("duckdb://analytics/dashboard")
def get_dashboard_data() -> str:
    """仪表板的聚合分析数据。"""
    result = con.execute("""
        SELECT 
            DATE_TRUNC('month', order_date) as month,
            COUNT(*) as total_orders,
            SUM(amount) as total_revenue,
            AVG(amount) as avg_order_value
        FROM orders
        GROUP BY 1
        ORDER BY 1 DESC
        LIMIT 12
    """).fetchall()
    
    return str(result)

第四步:提示模板

创建可复用的提示

@mcp.prompt()
def analyze_sales_trends() -> str:
    """分析销售趋势并识别模式。"""
    return """
    分析销售数据并提供以下洞察:
    1. 月度收入趋势
    2. 表现最好的产品类别
    3. 按购买频率的客户细分
    4. 销售模式中的异常
    
    使用以下 SQL 查询收集数据:
    - SELECT DATE_TRUNC('month', order_date), SUM(amount) FROM orders GROUP BY 1
    - SELECT category, SUM(amount) FROM orders JOIN products ON ... GROUP BY 1
    - SELECT customer_id, COUNT(*) as order_count FROM orders GROUP BY 1
    """

@mcp.prompt()
def generate_executive_report() -> str:
    """生成高管摘要报告。"""
    return """
    生成一份简洁的高管报告,涵盖:
    - 关键绩效指标 (KPI)
    - 环比增长率
    - 按收入排名的前五名产品
    - 客户留存指标
    - 改进建议
    
    报告保持在 500 字以内,使用要点提高可读性。
    """

第五步:认证与安全

API 密钥认证

from functools import wraps
import hmac

def require_api_key(f):
    """装饰器,强制执行 API 密钥认证。"""
    @wraps(f)
    async def wrapper(*args, **kwargs):
        api_key = os.getenv("API_KEY")
        if not api_key:
            return {"error": "API 密钥未配置"}
        
        # 在生产环境中,使用适当的 JWT 或 OAuth2
        # 这是一个简化的示例
        return f(*args, **kwargs)
    return wrapper

@mcp.tool()
@require_api_key
def secure_query(request: QueryRequest) -> dict:
    """执行安全的 SQL 查询。"""
    # 这里进行额外的安全检查
    return execute_query(request)

查询验证

def validate_sql(sql: str) -> bool:
    """验证 SQL 查询的安全性。"""
    dangerous_keywords = ['DROP', 'DELETE', 'TRUNCATE', 'ALTER', 'CREATE', 'GRANT']
    sql_upper = sql.upper()
    
    for keyword in dangerous_keywords:
        if keyword in sql_upper:
            return False
    
    # 检查 SQL 注入尝试
    if any(char in sql for char in [';', '--', '/*', '*/']):
        return False
    
    return True

第六步:测试服务器

测试查询

# 测试健康检查
curl http://localhost:8080/tools/health_check

# 测试模式发现
curl http://localhost:8080/tools/get_schema \
  -d '{"table_pattern": "orders"}'

# 测试查询执行
curl http://localhost:8080/tools/execute_query \
  -d '{
    "sql": "SELECT category, SUM(amount) as revenue FROM orders JOIN products ON orders.product_id = products.id GROUP BY 1 ORDER BY 2 DESC LIMIT 10",
    "limit": 100,
    "format": "json"
  }'

# 测试数据画像
curl http://localhost:8080/tools/profile_table \
  -d '{"table_name": "orders"}'

与 Claude 集成

# 在 Claude Code 或 Claude Desktop 中
# 添加 MCP 服务器配置:
{
  "mcpServers": {
    "duckdb-analytics": {
      "command": "python",
      "args": ["/path/to/server.py"],
      "env": {
        "DUCKDB_PATH": "/path/to/analytics.duckdb"
      }
    }
  }
}

第七步:生产部署

Docker 设置

FROM python:3.11-slim

WORKDIR /app
COPY requirements.txt .
RUN pip install -r requirements.txt

COPY . .

EXPOSE 8080
CMD ["uvicorn", "server:app", "--host", "0.0.0.0", "--port", "8080"]

Kubernetes 部署

apiVersion: apps/v1
kind: Deployment
metadata:
  name: duckdb-mcp-server
spec:
  replicas: 1
  selector:
    matchLabels:
      app: duckdb-mcp
  template:
    metadata:
      labels:
        app: duckdb-mcp
    spec:
      containers:
      - name: mcp-server
        image: duckdb-mcp:latest
        ports:
        - containerPort: 8080
        env:
        - name: DUCKDB_PATH
          value: "/data/analytics.duckdb"
        - name: API_KEY
          valueFrom:
            secretKeyRef:
              name: duckdb-secret
              key: api-key
        volumeMounts:
        - name: data-volume
          mountPath: /data
      volumes:
      - name: data-volume
        persistentVolumeClaim:
          claimName: duckdb-pvc

结论

使用 DuckDB 构建 MCP 服务器使 AI 智能体能够:

  1. 自然查询数据 — 使用 SQL 或自然语言
  2. 发现模式 — 自动理解数据结构
  3. 分析数据 — 按需获取统计洞察
  4. 生成报告 — 自动化分析工作流

这为 AI 驱动的分析创建了一个强大的基础,可以从原型设计扩展到生产环境。


对于生产部署,请务必实施适当的认证、输入验证和监控。

📺 Watch video tutorials → Olap Studio YouTube

Subscribe for more DuckDB & AI automation tutorials

使用 Hugo 构建
主题 StackJimmy 设计

⚠️ 本站为独立社区项目,与 DuckDB 基金会及 DuckDB 官方项目无任何从属、背书或赞助关系。

"DuckDB" 是 DuckDB 基金会的注册商标,本站仅以事实描述方式使用该名称。

本站内容仅供教育与社区推广用途,不构成任何商业服务。