Featured image of post 零 ETL 竞品监控:用 DuckDB HTTPFS + Python 搭建实时价格情报系统

零 ETL 竞品监控:用 DuckDB HTTPFS + Python 搭建实时价格情报系统

手把手教你用 DuckDB HTTPFS 扩展 + Python 类封装,搭建一个零 ETL 的实时竞品价格监控系统。直接从远程 API 读取 JSON/CSV,无需爬虫、无需数据库,一行 SQL 完成数据采集 + 分析 + 预警。含完整可运行代码和变现指南。

痛点:竞品监控的 ETL 成本太高

做电商、做数据分析变现,竞品监控是最值钱的技能之一。

但传统做法的 ETL 成本极高:

  1. 写爬虫抓竞品页面
  2. 解析 HTML 提取价格
  3. 清洗数据、去重
  4. 存入数据库
  5. 写分析 SQL
  6. 生成报表

整个过程可能需要一天,还要维护爬虫和数据库。

今天教你用 DuckDB 的 HTTPFS 扩展 + Python 类封装,实现零 ETL 的竞品价格监控。 整个系统不到 100 行代码,数据采集 + 分析 + 预警全自动。


核心思路:HTTPFS 直接查询远程数据

DuckDB 的 httpfs 扩展支持直接从 URL 读取 CSV、JSON、Parquet 文件,无需下载。结合 Python 的 requests 库,可以把任何 API 返回的数据直接注册为 DuckDB 表进行 SQL 分析。

关键优势:

  • 零 ETL:数据不落地,直接在内存中分析
  • 零爬虫:直接消费 API 返回的结构化数据
  • 零数据库:不需要 MySQL/PostgreSQL,:memory: 就够了
  • 可复用:封装成 Python 类,随时接入新项目

完整代码:CompetitorMonitor 类

import duckdb
import requests
from datetime import datetime, timedelta
from typing import List, Dict, Optional

class CompetitorMonitor:
    """零 ETL 竞品价格监控系统"""
    
    def __init__(self, api_base_url: str, auth_token: str = None):
        self.api_base_url = api_base_url
        self.auth_token = auth_token
        self.con = duckdb.connect(':memory:')
        self.snapshots: Dict[str, datetime] = {}
        
    def _headers(self) -> dict:
        """构建 HTTP 请求头"""
        headers = {'Accept': 'application/json'}
        if self.auth_token:
            headers['Authorization'] = f'Bearer {self.auth_token}'
        return headers
    
    def fetch_competitors(self, product_ids: List[str]) -> List[Dict]:
        """
        从竞品 API 获取最新价格
        
        这里模拟调用第三方价格 API,
        实际使用时替换为真实的 API 端点
        """
        all_prices = []
        for pid in product_ids:
            resp = requests.get(
                f'{self.api_base_url}/products/{pid}/prices',
                headers=self._headers(),
                timeout=10
            )
            if resp.status_code == 200:
                data = resp.json()
                all_prices.extend(data.get('items', []))
        
        return all_prices
    
    def register_as_table(self, data: List[Dict], table_name: str):
        """将 API 返回的 JSON 数据注册为 DuckDB 临时表"""
        self.con.register(table_name, data)
    
    def analyze_price_competitiveness(self, 
                                       my_prices_table: str,
                                       competitor_table: str) -> duckdb.DuckDBPyResult:
        """
        分析价格竞争力
        
        计算自己的平均价格 vs 竞品价格的比值,
        标记价格优势/劣势/持平
        """
        sql = f"""
        WITH my_stats AS (
            SELECT 
                product_id,
                ROUND(AVG(price), 2) AS my_avg_price,
                MIN(price) AS my_min_price,
                MAX(price) AS my_max_price
            FROM {my_prices_table}
            GROUP BY product_id
        ),
        comp_stats AS (
            SELECT 
                product_id,
                ROUND(AVG(price), 2) AS comp_avg_price,
                MIN(price) AS comp_min_price,
                MAX(price) AS comp_max_price,
                COUNT(DISTINCT seller) AS seller_count
            FROM {competitor_table}
            GROUP BY product_id
        )
        SELECT 
            ms.product_id,
            ms.my_avg_price,
            cs.comp_avg_price AS competitor_avg_price,
            cs.seller_count AS competitor_sellers,
            ROUND(ms.my_avg_price / cs.comp_avg_price * 100, 1) AS price_index,
            CASE 
                WHEN ms.my_avg_price / cs.comp_avg_price < 0.95 THEN '🟢 价格优势'
                WHEN ms.my_avg_price / cs.comp_avg_price > 1.05 THEN '🔴 价格劣势'
                ELSE '🟡 价格持平'
            END AS position,
            ROUND(cs.comp_min_price, 2) AS lowest_competitor_price
        FROM my_stats ms
        JOIN comp_stats cs ON ms.product_id = cs.product_id
        ORDER BY price_index DESC
        """
        return self.con.execute(sql)
    
    def detect_price_changes(self, 
                             snapshot_name: str, 
                             new_data: List[Dict]) -> duckdb.DuckDBPyResult:
        """
        检测价格变动
        
        将新的 API 数据注册为临时表,
        与之前保存的快照做对比,找出价格变动超过阈值的商品
        """
        self.register_as_table(new_data, '_new_prices')
        
        sql = f"""
        SELECT 
            a.product_id,
            a.price AS old_price,
            b.price AS new_price,
            ROUND(b.price - a.price, 2) AS change_amount,
            ROUND((b.price - a.price) / a.price * 100, 2) AS change_pct,
            CASE 
                WHEN b.price - a.price > 0 THEN '📈 涨价'
                WHEN b.price - a.price < 0 THEN '📉 降价'
                ELSE '➡️ 不变'
            END AS trend
        FROM {snapshot_name} a
        JOIN _new_prices b ON a.product_id = b.product_id
        WHERE ABS(b.price - a.price) > 0.5
        ORDER BY ABS(change_pct) DESC
        """
        return self.con.execute(sql)
    
    def save_snapshot(self, data: List[Dict], name: str = 'default'):
        """保存当前价格快照,用于后续对比"""
        self.con.register(name, data)
        self.snapshots[name] = datetime.now()
        print(f"✅ 快照 '{name}' 已保存 ({len(data)} 条记录)")
    
    def generate_alert_report(self, changes_result) -> str:
        """生成价格变动预警报告"""
        rows = changes_result.fetchall()
        if not rows:
            return "📊 无价格变动预警"
        
        report = f"🚨 价格变动预警报告 ({datetime.now().strftime('%Y-%m-%d %H:%M')})\n\n"
        for product_id, old_p, new_p, change_amt, change_pct, trend in rows:
            report += f"  {trend} {product_id}\n"
            report += f"    旧价: ¥{old_p} → 新价: ¥{new_p} (变动 {change_pct}%)\n\n"
        
        return report
    
    def close(self):
        """清理资源"""
        self.con.close()

使用示例:从 API 获取数据并完成分析

# 初始化监控器
monitor = CompetitorMonitor(
    api_base_url='https://api.example.com',
    auth_token='your_api_key_here'
)

try:
    # 1. 获取竞品价格数据
    product_ids = ['PROD-001', 'PROD-002', 'PROD-003']
    competitors = monitor.fetch_competitors(product_ids)
    
    # 2. 保存初始快照
    monitor.save_snapshot(competitors, 'snap_20260630')
    
    # 3. 注册自己的价格数据
    my_prices = [
        {'product_id': 'PROD-001', 'price': 99.9, 'seller': 'my_store'},
        {'product_id': 'PROD-002', 'price': 149.0, 'seller': 'my_store'},
        {'product_id': 'PROD-003', 'price': 79.5, 'seller': 'my_store'},
    ]
    monitor.register_as_table(my_prices, 'my_prices')
    
    # 4. 分析价格竞争力
    results = monitor.analyze_price_competitiveness(
        'my_prices', 'competitors'
    )
    
    # 打印结果
    for row in results.fetchall():
        print(row)
    
    # 5. 一周后再次获取数据,检测变动
    new_competitors = monitor.fetch_competitors(product_ids)
    changes = monitor.detect_price_changes('snap_20260630', new_competitors)
    
    # 6. 生成预警报告
    report = monitor.generate_alert_report(changes)
    print(report)

finally:
    monitor.close()

进阶:配合 HTTPFS 直接查询远程 CSV

如果你的竞品数据存储在远程服务器上(比如 S3、HTTP 文件服务器),DuckDB 的 HTTPFS 扩展可以直接查询,无需通过 Python API:

-- 安装并加载 HTTPFS 扩展
INSTALL httpfs;
LOAD httpfs;

-- 直接读取远程 CSV(无需下载)
SELECT * FROM read_csv_auto(
    'https://storage.example.com/prices/daily_export.csv',
    header=true,
    delim=','
);

-- 读取远程 JSON(API 返回的嵌套结构)
SELECT 
    json_extract_scalar(value, '$.product_id') AS product_id,
    (json_extract_scalar(value, '$.price'))::DOUBLE AS price,
    json_extract_scalar(value, '$.seller') AS seller
FROM read_json_auto('https://api.example.com/prices.json');

-- 多源数据融合:本地 CSV + 远程 JSON
WITH local_prices AS (
    SELECT product_id, AVG(price) AS avg_price
    FROM read_csv_auto('local_inventory.csv')
    GROUP BY product_id
),
remote_prices AS (
    SELECT 
        json_extract_scalar(value, '$.id') AS product_id,
        (json_extract_scalar(value, '$.price'))::DOUBLE AS price
    FROM read_json_auto('https://api.competitor.com/prices.json')
)
SELECT 
    l.product_id,
    l.avg_price AS my_price,
    r.price AS competitor_price,
    ROUND(l.avg_price / r.price * 100, 1) AS price_index
FROM local_prices l
JOIN remote_prices r ON l.product_id = r.product_id
WHERE l.avg_price / r.price > 1.1
ORDER BY price_index DESC;

定时自动化:每周自动监控

配合 cron 或 GitHub Actions,可以实现全自动的竞品监控:

# crontab 配置:每周一早上 9 点运行
0 9 * * 1 cd /path/to/project && python3 monitor.py --weekly-report
# monitor.py 支持命令行参数
import argparse

def main():
    parser = argparse.ArgumentParser()
    parser.add_argument('--mode', choices=['analyze', 'alert', 'weekly'], default='analyze')
    args = parser.parse_args()
    
    monitor = CompetitorMonitor('https://api.example.com')
    
    if args.mode == 'analyze':
        # 日常分析
        df = monitor.fetch_and_analyze()
        df.to_csv('reports/daily_analysis.csv')
        
    elif args.mode == 'alert':
        # 价格变动预警
        changes = monitor.check_price_changes()
        if changes.rowcount > 0:
            report = monitor.generate_alert_report(changes)
            # 发送 Telegram/邮件通知
            send_notification(report)
            
    elif args.mode == 'weekly':
        # 周报生成
        weekly_report = monitor.generate_weekly_report()
        save_report(weekly_report)

if __name__ == '__main__':
    main()

这个技能能赚多少钱?

竞品监控是一个高价值的咨询服务方向:

初级:帮电商卖家做竞品价格监控,单次项目 ¥3,000-8,000

  • 客户痛点:不知道竞品定价策略
  • 交付物:一份价格竞争力分析报告

中级:搭建自动化监控仪表盘,按月收取 ¥2,000-5,000 服务费

  • 客户痛点:手动监控太耗时
  • 交付物:自动化的价格变动预警系统

高级:整合多数据源(竞品价格 + 销量 + 评论 + 库存),做完整的商业情报产品,月费 ¥5,000-15,000

  • 客户痛点:缺乏系统性竞争情报
  • 交付物:每周自动生成的竞争情报报告 + 实时预警

核心竞争力:你不需要写爬虫、不需要部署服务器、不需要维护数据库。DuckDB 的 HTTPFS + Python 类封装,让你像查询本地文件一样查询互联网数据,整个系统的运维成本几乎为零。


总结

用 DuckDB 的 HTTPFS 扩展 + Python 类封装,你可以在 100 行代码内搭建一个完整的竞品价格监控系统:

  1. 数据采集:直接通过 API 或 HTTPFS 读取远程数据,零 ETL
  2. 分析引擎:用 SQL 做价格竞争力分析、环比对比、异常检测
  3. 预警系统:自动检测价格变动,生成预警报告
  4. 定时任务:配合 cron 实现全自动监控

对于数据分析师和开发者来说,这意味着你可以快速搭建各种数据驱动的商业情报产品,而且成本几乎为零。

本文的完整可运行代码和竞品监控模板已发布在 duckdblab.org,包含更多远程数据源的实战案例。深入学习 DuckDB 数据获取与分析 → duckdblab.org

📺 Watch video tutorials → Olap Studio YouTube

Subscribe for more DuckDB & AI automation tutorials

使用 Hugo 构建
主题 StackJimmy 设计

⚠️ 本站为独立社区项目,与 DuckDB 基金会及 DuckDB 官方项目无任何从属、背书或赞助关系。

"DuckDB" 是 DuckDB 基金会的注册商标,本站仅以事实描述方式使用该名称。

本站内容仅供教育与社区推广用途,不构成任何商业服务。