DuckDB MCP Server 全栈指南:让 AI 智能体直连你的数据
TL;DR:Model Context Protocol (MCP) 是 AI 智能体与外部数据源的标准化接口。本文从零构建一个生产级的 DuckDB MCP Server,让 Claude 等 AI 智能体通过标准协议实时查询数据库、探索数据结构和生成分析报告。
什么是 MCP?
Model Context Protocol (MCP) 是 Anthropic 推出的开放标准,让 AI 应用程序能够安全地连接外部数据源。你可以把它理解为 “AI 的 USB-C 接口”——一种标准化协议,让任何 AI 智能体都能与任何数据源通信。
┌─────────────────────────────────────────────────┐
│ MCP 架构 │
│ │
│ [AI 智能体] ──MCP 协议──> [MCP Server] │
│ (Claude 等) (DuckDB + 工具) │
│ │
│ 标准化: │
│ • 工具定义 │
│ • 资源访问 │
│ • 提示模板 │
│ • 安全认证 │
└─────────────────────────────────────────────────┘
前置条件
安装依赖
pip install mcp duckdb fastapi uvicorn pydantic
环境配置
export DUCKDB_PATH="./analytics.duckdb"
export MCP_HOST="0.0.0.0"
export MCP_PORT=8080
export API_KEY="your-api-key"
第一步:基础 MCP Server
服务器初始化
from mcp.server.fastmcp import FastMCP
import duckdb
import os
# 初始化 MCP 服务器
mcp = FastMCP(
name="duckdb-analytics",
version="1.0.0",
description="DuckDB 分析服务器,供 AI 智能体使用"
)
# 全局 DuckDB 连接
db_path = os.getenv("DUCKDB_PATH", "./analytics.duckdb")
con = duckdb.connect(db_path)
# 健康检查端点
@mcp.tool()
def health_check() -> dict:
"""检查 DuckDB 服务器的健康状态。"""
result = con.execute("SELECT 1 as healthy").fetchone()
return {
"status": "healthy" if result[0] == 1 else "unhealthy",
"database": db_path,
"duckdb_version": con.execute("SELECT version()").fetchone()[0]
}
if __name__ == "__main__":
mcp.run()
启动服务器
# 启动 MCP 服务器
python server.py
# 或使用 uvicorn 用于生产环境
uvicorn server:app --host 0.0.0.0 --port 8080
第二步:分析工具
查询执行工具
from pydantic import BaseModel, Field
from typing import Optional
class QueryRequest(BaseModel):
sql: str = Field(..., description="要执行的 SQL 查询")
limit: Optional[int] = Field(1000, description="返回的最大行数")
format: Optional[str] = Field("json", description="输出格式: json, csv, markdown")
@mcp.tool()
def execute_query(request: QueryRequest) -> dict:
"""在 DuckDB 数据库中执行 SQL 查询。"""
try:
# 验证查询(防止危险操作)
if any(keyword in request.sql.upper() for keyword in ['DROP', 'DELETE', 'TRUNCATE']):
return {"error": "不允许执行危险操作"}
# 执行查询并限制结果
result = con.execute(f"{request.sql} LIMIT {request.limit}")
columns = [desc[0] for desc in result.description]
rows = result.fetchall()
# 转换为请求的格式
if request.format == "csv":
import io
output = io.StringIO()
writer = csv.writer(output)
writer.writerow(columns)
writer.writerows(rows)
return {"data": output.getvalue()}
elif request.format == "markdown":
markdown = "| " + " | ".join(columns) + " |\n"
markdown += "| " + " | ".join(["---"] * len(columns)) + " |\n"
for row in rows:
markdown += "| " + " | ".join(str(v) for v in row) + " |\n"
return {"data": markdown}
else: # JSON
return {
"columns": columns,
"data": [dict(zip(columns, row)) for row in rows],
"row_count": len(rows)
}
except Exception as e:
return {"error": str(e)}
模式发现工具
class SchemaRequest(BaseModel):
table_pattern: Optional[str] = Field(None, description="按模式过滤表")
@mcp.tool()
def get_schema(request: SchemaRequest) -> dict:
"""发现数据库模式和表结构。"""
try:
# 获取所有表
tables_query = """
SELECT table_name, table_schema, table_type
FROM information_schema.tables
WHERE table_schema NOT IN ('pg_catalog', 'information_schema')
ORDER BY table_name
"""
if request.table_pattern:
tables = con.execute(
tables_query.replace("ORDER BY table_name",
f"AND table_name LIKE '%{request.table_pattern}%' ORDER BY table_name")
).fetchall()
else:
tables = con.execute(tables_query).fetchall()
# 获取每个表的列详细信息
schema = {}
for table in tables:
table_name = table[0]
columns = con.execute(f"""
SELECT column_name, data_type, is_nullable
FROM information_schema.columns
WHERE table_name = '{table_name}'
ORDER BY ordinal_position
""").fetchall()
schema[table_name] = {
"columns": [
{"name": col[0], "type": col[1], "nullable": col[2] == 'YES'}
for col in columns
],
"row_count": con.execute(f"SELECT COUNT(*) FROM {table_name}").fetchone()[0]
}
return {"tables": len(schema), "schema": schema}
except Exception as e:
return {"error": str(e)}
数据画像工具
class ProfileRequest(BaseModel):
table_name: str = Field(..., description="要画像的表")
columns: Optional[list] = Field(None, description="特定要画像的列")
@mcp.tool()
def profile_table(request: ProfileRequest) -> dict:
"""生成表的统计画像。"""
try:
# 获取列统计信息
columns = request.columns or con.execute(f"""
SELECT column_name FROM information_schema.columns
WHERE table_name = '{request.table_name}'
""").fetchall()
profile = {}
for col_tuple in columns:
col_name = col_tuple[0] if isinstance(col_tuple, tuple) else col_tuple
stats = con.execute(f"""
SELECT
COUNT(*) as total_rows,
COUNT(DISTINCT "{col_name}") as distinct_values,
SUM(CASE WHEN "{col_name}" IS NULL THEN 1 ELSE 0 END) as null_count,
MIN("{col_name}") as min_value,
MAX("{col_name}") as max_value,
AVG("{col_name}") as avg_value,
PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY "{col_name}") as median,
PERCENTILE_CONT(0.95) WITHIN GROUP (ORDER BY "{col_name}") as p95
FROM {request.table_name}
""").fetchone()
profile[col_name] = {
"total_rows": stats[0],
"distinct_values": stats[1],
"null_count": stats[2],
"null_percentage": round(stats[2] / stats[0] * 100, 2) if stats[0] > 0 else 0,
"min": stats[3],
"max": stats[4],
"avg": stats[5],
"median": stats[6],
"p95": stats[7]
}
return {"table": request.table_name, "profile": profile}
except Exception as e:
return {"error": str(e)}
第三步:资源访问
定义数据资源
from mcp.types import Resource, TextResourceContents
@mcp.resource("duckdb://orders/schema")
def get_orders_schema() -> str:
"""订单表的模式定义。"""
return """
表: orders
列:
- order_id: VARCHAR (主键)
- customer_id: VARCHAR
- order_date: TIMESTAMP
- amount: DECIMAL(10,2)
- status: VARCHAR
- payment_method: VARCHAR
"""
@mcp.resource("duckdb://products/schema")
def get_products_schema() -> str:
"""产品表的模式定义。"""
return """
表: products
列:
- product_id: VARCHAR (主键)
- name: VARCHAR
- category: VARCHAR
- price: DECIMAL(10,2)
- stock_quantity: INTEGER
"""
@mcp.resource("duckdb://analytics/dashboard")
def get_dashboard_data() -> str:
"""仪表板的聚合分析数据。"""
result = con.execute("""
SELECT
DATE_TRUNC('month', order_date) as month,
COUNT(*) as total_orders,
SUM(amount) as total_revenue,
AVG(amount) as avg_order_value
FROM orders
GROUP BY 1
ORDER BY 1 DESC
LIMIT 12
""").fetchall()
return str(result)
第四步:提示模板
创建可复用的提示
@mcp.prompt()
def analyze_sales_trends() -> str:
"""分析销售趋势并识别模式。"""
return """
分析销售数据并提供以下洞察:
1. 月度收入趋势
2. 表现最好的产品类别
3. 按购买频率的客户细分
4. 销售模式中的异常
使用以下 SQL 查询收集数据:
- SELECT DATE_TRUNC('month', order_date), SUM(amount) FROM orders GROUP BY 1
- SELECT category, SUM(amount) FROM orders JOIN products ON ... GROUP BY 1
- SELECT customer_id, COUNT(*) as order_count FROM orders GROUP BY 1
"""
@mcp.prompt()
def generate_executive_report() -> str:
"""生成高管摘要报告。"""
return """
生成一份简洁的高管报告,涵盖:
- 关键绩效指标 (KPI)
- 环比增长率
- 按收入排名的前五名产品
- 客户留存指标
- 改进建议
报告保持在 500 字以内,使用要点提高可读性。
"""
第五步:认证与安全
API 密钥认证
from functools import wraps
import hmac
def require_api_key(f):
"""装饰器,强制执行 API 密钥认证。"""
@wraps(f)
async def wrapper(*args, **kwargs):
api_key = os.getenv("API_KEY")
if not api_key:
return {"error": "API 密钥未配置"}
# 在生产环境中,使用适当的 JWT 或 OAuth2
# 这是一个简化的示例
return f(*args, **kwargs)
return wrapper
@mcp.tool()
@require_api_key
def secure_query(request: QueryRequest) -> dict:
"""执行安全的 SQL 查询。"""
# 这里进行额外的安全检查
return execute_query(request)
查询验证
def validate_sql(sql: str) -> bool:
"""验证 SQL 查询的安全性。"""
dangerous_keywords = ['DROP', 'DELETE', 'TRUNCATE', 'ALTER', 'CREATE', 'GRANT']
sql_upper = sql.upper()
for keyword in dangerous_keywords:
if keyword in sql_upper:
return False
# 检查 SQL 注入尝试
if any(char in sql for char in [';', '--', '/*', '*/']):
return False
return True
第六步:测试服务器
测试查询
# 测试健康检查
curl http://localhost:8080/tools/health_check
# 测试模式发现
curl http://localhost:8080/tools/get_schema \
-d '{"table_pattern": "orders"}'
# 测试查询执行
curl http://localhost:8080/tools/execute_query \
-d '{
"sql": "SELECT category, SUM(amount) as revenue FROM orders JOIN products ON orders.product_id = products.id GROUP BY 1 ORDER BY 2 DESC LIMIT 10",
"limit": 100,
"format": "json"
}'
# 测试数据画像
curl http://localhost:8080/tools/profile_table \
-d '{"table_name": "orders"}'
与 Claude 集成
# 在 Claude Code 或 Claude Desktop 中
# 添加 MCP 服务器配置:
{
"mcpServers": {
"duckdb-analytics": {
"command": "python",
"args": ["/path/to/server.py"],
"env": {
"DUCKDB_PATH": "/path/to/analytics.duckdb"
}
}
}
}
第七步:生产部署
Docker 设置
FROM python:3.11-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install -r requirements.txt
COPY . .
EXPOSE 8080
CMD ["uvicorn", "server:app", "--host", "0.0.0.0", "--port", "8080"]
Kubernetes 部署
apiVersion: apps/v1
kind: Deployment
metadata:
name: duckdb-mcp-server
spec:
replicas: 1
selector:
matchLabels:
app: duckdb-mcp
template:
metadata:
labels:
app: duckdb-mcp
spec:
containers:
- name: mcp-server
image: duckdb-mcp:latest
ports:
- containerPort: 8080
env:
- name: DUCKDB_PATH
value: "/data/analytics.duckdb"
- name: API_KEY
valueFrom:
secretKeyRef:
name: duckdb-secret
key: api-key
volumeMounts:
- name: data-volume
mountPath: /data
volumes:
- name: data-volume
persistentVolumeClaim:
claimName: duckdb-pvc
结论
使用 DuckDB 构建 MCP 服务器使 AI 智能体能够:
- 自然查询数据 — 使用 SQL 或自然语言
- 发现模式 — 自动理解数据结构
- 分析数据 — 按需获取统计洞察
- 生成报告 — 自动化分析工作流
这为 AI 驱动的分析创建了一个强大的基础,可以从原型设计扩展到生产环境。
对于生产部署,请务必实施适当的认证、输入验证和监控。