痛点:竞品监控的 ETL 成本太高
做电商、做数据分析变现,竞品监控是最值钱的技能之一。
但传统做法的 ETL 成本极高:
- 写爬虫抓竞品页面
- 解析 HTML 提取价格
- 清洗数据、去重
- 存入数据库
- 写分析 SQL
- 生成报表
整个过程可能需要一天,还要维护爬虫和数据库。
今天教你用 DuckDB 的 HTTPFS 扩展 + Python 类封装,实现零 ETL 的竞品价格监控。 整个系统不到 100 行代码,数据采集 + 分析 + 预警全自动。
核心思路:HTTPFS 直接查询远程数据
DuckDB 的 httpfs 扩展支持直接从 URL 读取 CSV、JSON、Parquet 文件,无需下载。结合 Python 的 requests 库,可以把任何 API 返回的数据直接注册为 DuckDB 表进行 SQL 分析。
关键优势:
- 零 ETL:数据不落地,直接在内存中分析
- 零爬虫:直接消费 API 返回的结构化数据
- 零数据库:不需要 MySQL/PostgreSQL,
:memory:就够了 - 可复用:封装成 Python 类,随时接入新项目
完整代码:CompetitorMonitor 类
import duckdb
import requests
from datetime import datetime, timedelta
from typing import List, Dict, Optional
class CompetitorMonitor:
"""零 ETL 竞品价格监控系统"""
def __init__(self, api_base_url: str, auth_token: str = None):
self.api_base_url = api_base_url
self.auth_token = auth_token
self.con = duckdb.connect(':memory:')
self.snapshots: Dict[str, datetime] = {}
def _headers(self) -> dict:
"""构建 HTTP 请求头"""
headers = {'Accept': 'application/json'}
if self.auth_token:
headers['Authorization'] = f'Bearer {self.auth_token}'
return headers
def fetch_competitors(self, product_ids: List[str]) -> List[Dict]:
"""
从竞品 API 获取最新价格
这里模拟调用第三方价格 API,
实际使用时替换为真实的 API 端点
"""
all_prices = []
for pid in product_ids:
resp = requests.get(
f'{self.api_base_url}/products/{pid}/prices',
headers=self._headers(),
timeout=10
)
if resp.status_code == 200:
data = resp.json()
all_prices.extend(data.get('items', []))
return all_prices
def register_as_table(self, data: List[Dict], table_name: str):
"""将 API 返回的 JSON 数据注册为 DuckDB 临时表"""
self.con.register(table_name, data)
def analyze_price_competitiveness(self,
my_prices_table: str,
competitor_table: str) -> duckdb.DuckDBPyResult:
"""
分析价格竞争力
计算自己的平均价格 vs 竞品价格的比值,
标记价格优势/劣势/持平
"""
sql = f"""
WITH my_stats AS (
SELECT
product_id,
ROUND(AVG(price), 2) AS my_avg_price,
MIN(price) AS my_min_price,
MAX(price) AS my_max_price
FROM {my_prices_table}
GROUP BY product_id
),
comp_stats AS (
SELECT
product_id,
ROUND(AVG(price), 2) AS comp_avg_price,
MIN(price) AS comp_min_price,
MAX(price) AS comp_max_price,
COUNT(DISTINCT seller) AS seller_count
FROM {competitor_table}
GROUP BY product_id
)
SELECT
ms.product_id,
ms.my_avg_price,
cs.comp_avg_price AS competitor_avg_price,
cs.seller_count AS competitor_sellers,
ROUND(ms.my_avg_price / cs.comp_avg_price * 100, 1) AS price_index,
CASE
WHEN ms.my_avg_price / cs.comp_avg_price < 0.95 THEN '🟢 价格优势'
WHEN ms.my_avg_price / cs.comp_avg_price > 1.05 THEN '🔴 价格劣势'
ELSE '🟡 价格持平'
END AS position,
ROUND(cs.comp_min_price, 2) AS lowest_competitor_price
FROM my_stats ms
JOIN comp_stats cs ON ms.product_id = cs.product_id
ORDER BY price_index DESC
"""
return self.con.execute(sql)
def detect_price_changes(self,
snapshot_name: str,
new_data: List[Dict]) -> duckdb.DuckDBPyResult:
"""
检测价格变动
将新的 API 数据注册为临时表,
与之前保存的快照做对比,找出价格变动超过阈值的商品
"""
self.register_as_table(new_data, '_new_prices')
sql = f"""
SELECT
a.product_id,
a.price AS old_price,
b.price AS new_price,
ROUND(b.price - a.price, 2) AS change_amount,
ROUND((b.price - a.price) / a.price * 100, 2) AS change_pct,
CASE
WHEN b.price - a.price > 0 THEN '📈 涨价'
WHEN b.price - a.price < 0 THEN '📉 降价'
ELSE '➡️ 不变'
END AS trend
FROM {snapshot_name} a
JOIN _new_prices b ON a.product_id = b.product_id
WHERE ABS(b.price - a.price) > 0.5
ORDER BY ABS(change_pct) DESC
"""
return self.con.execute(sql)
def save_snapshot(self, data: List[Dict], name: str = 'default'):
"""保存当前价格快照,用于后续对比"""
self.con.register(name, data)
self.snapshots[name] = datetime.now()
print(f"✅ 快照 '{name}' 已保存 ({len(data)} 条记录)")
def generate_alert_report(self, changes_result) -> str:
"""生成价格变动预警报告"""
rows = changes_result.fetchall()
if not rows:
return "📊 无价格变动预警"
report = f"🚨 价格变动预警报告 ({datetime.now().strftime('%Y-%m-%d %H:%M')})\n\n"
for product_id, old_p, new_p, change_amt, change_pct, trend in rows:
report += f" {trend} {product_id}\n"
report += f" 旧价: ¥{old_p} → 新价: ¥{new_p} (变动 {change_pct}%)\n\n"
return report
def close(self):
"""清理资源"""
self.con.close()
使用示例:从 API 获取数据并完成分析
# 初始化监控器
monitor = CompetitorMonitor(
api_base_url='https://api.example.com',
auth_token='your_api_key_here'
)
try:
# 1. 获取竞品价格数据
product_ids = ['PROD-001', 'PROD-002', 'PROD-003']
competitors = monitor.fetch_competitors(product_ids)
# 2. 保存初始快照
monitor.save_snapshot(competitors, 'snap_20260630')
# 3. 注册自己的价格数据
my_prices = [
{'product_id': 'PROD-001', 'price': 99.9, 'seller': 'my_store'},
{'product_id': 'PROD-002', 'price': 149.0, 'seller': 'my_store'},
{'product_id': 'PROD-003', 'price': 79.5, 'seller': 'my_store'},
]
monitor.register_as_table(my_prices, 'my_prices')
# 4. 分析价格竞争力
results = monitor.analyze_price_competitiveness(
'my_prices', 'competitors'
)
# 打印结果
for row in results.fetchall():
print(row)
# 5. 一周后再次获取数据,检测变动
new_competitors = monitor.fetch_competitors(product_ids)
changes = monitor.detect_price_changes('snap_20260630', new_competitors)
# 6. 生成预警报告
report = monitor.generate_alert_report(changes)
print(report)
finally:
monitor.close()
进阶:配合 HTTPFS 直接查询远程 CSV
如果你的竞品数据存储在远程服务器上(比如 S3、HTTP 文件服务器),DuckDB 的 HTTPFS 扩展可以直接查询,无需通过 Python API:
-- 安装并加载 HTTPFS 扩展
INSTALL httpfs;
LOAD httpfs;
-- 直接读取远程 CSV(无需下载)
SELECT * FROM read_csv_auto(
'https://storage.example.com/prices/daily_export.csv',
header=true,
delim=','
);
-- 读取远程 JSON(API 返回的嵌套结构)
SELECT
json_extract_scalar(value, '$.product_id') AS product_id,
(json_extract_scalar(value, '$.price'))::DOUBLE AS price,
json_extract_scalar(value, '$.seller') AS seller
FROM read_json_auto('https://api.example.com/prices.json');
-- 多源数据融合:本地 CSV + 远程 JSON
WITH local_prices AS (
SELECT product_id, AVG(price) AS avg_price
FROM read_csv_auto('local_inventory.csv')
GROUP BY product_id
),
remote_prices AS (
SELECT
json_extract_scalar(value, '$.id') AS product_id,
(json_extract_scalar(value, '$.price'))::DOUBLE AS price
FROM read_json_auto('https://api.competitor.com/prices.json')
)
SELECT
l.product_id,
l.avg_price AS my_price,
r.price AS competitor_price,
ROUND(l.avg_price / r.price * 100, 1) AS price_index
FROM local_prices l
JOIN remote_prices r ON l.product_id = r.product_id
WHERE l.avg_price / r.price > 1.1
ORDER BY price_index DESC;
定时自动化:每周自动监控
配合 cron 或 GitHub Actions,可以实现全自动的竞品监控:
# crontab 配置:每周一早上 9 点运行
0 9 * * 1 cd /path/to/project && python3 monitor.py --weekly-report
# monitor.py 支持命令行参数
import argparse
def main():
parser = argparse.ArgumentParser()
parser.add_argument('--mode', choices=['analyze', 'alert', 'weekly'], default='analyze')
args = parser.parse_args()
monitor = CompetitorMonitor('https://api.example.com')
if args.mode == 'analyze':
# 日常分析
df = monitor.fetch_and_analyze()
df.to_csv('reports/daily_analysis.csv')
elif args.mode == 'alert':
# 价格变动预警
changes = monitor.check_price_changes()
if changes.rowcount > 0:
report = monitor.generate_alert_report(changes)
# 发送 Telegram/邮件通知
send_notification(report)
elif args.mode == 'weekly':
# 周报生成
weekly_report = monitor.generate_weekly_report()
save_report(weekly_report)
if __name__ == '__main__':
main()
这个技能能赚多少钱?
竞品监控是一个高价值的咨询服务方向:
初级:帮电商卖家做竞品价格监控,单次项目 ¥3,000-8,000
- 客户痛点:不知道竞品定价策略
- 交付物:一份价格竞争力分析报告
中级:搭建自动化监控仪表盘,按月收取 ¥2,000-5,000 服务费
- 客户痛点:手动监控太耗时
- 交付物:自动化的价格变动预警系统
高级:整合多数据源(竞品价格 + 销量 + 评论 + 库存),做完整的商业情报产品,月费 ¥5,000-15,000
- 客户痛点:缺乏系统性竞争情报
- 交付物:每周自动生成的竞争情报报告 + 实时预警
核心竞争力:你不需要写爬虫、不需要部署服务器、不需要维护数据库。DuckDB 的 HTTPFS + Python 类封装,让你像查询本地文件一样查询互联网数据,整个系统的运维成本几乎为零。
总结
用 DuckDB 的 HTTPFS 扩展 + Python 类封装,你可以在 100 行代码内搭建一个完整的竞品价格监控系统:
- 数据采集:直接通过 API 或 HTTPFS 读取远程数据,零 ETL
- 分析引擎:用 SQL 做价格竞争力分析、环比对比、异常检测
- 预警系统:自动检测价格变动,生成预警报告
- 定时任务:配合 cron 实现全自动监控
对于数据分析师和开发者来说,这意味着你可以快速搭建各种数据驱动的商业情报产品,而且成本几乎为零。
本文的完整可运行代码和竞品监控模板已发布在 duckdblab.org,包含更多远程数据源的实战案例。深入学习 DuckDB 数据获取与分析 → duckdblab.org
