Zero-ETL Competitive Intelligence: Building Real-Time Price Monitoring with DuckDB HTTPFS + Python

Learn how to build a real-time competitor price monitoring system using DuckDB's HTTPFS extension and Python class encapsulation. Directly query remote APIs without crawling, databases, or ETL pipelines. Includes complete runnable code and monetization guide.

The Pain Point: ETL Costs Kill Competitor Monitoring

In e-commerce and data consulting, competitor monitoring is one of the most valuable skills.

But the traditional approach has enormous ETL costs:

  1. Write scrapers to crawl competitor pages
  2. Parse HTML to extract prices
  3. Clean and deduplicate data
  4. Store in a database
  5. Write analysis SQL
  6. Generate reports

This whole process can take a day, plus ongoing maintenance of scrapers and databases.

Today I’ll show you how to build a zero-ETL competitor price monitoring system using DuckDB’s HTTPFS extension + Python class encapsulation. Under 100 lines of code, with automated data collection, analysis, and alerts.


Core Concept: Query Remote Data Directly with HTTPFS

DuckDB’s httpfs extension lets you read CSV, JSON, and Parquet files directly from URLs, with no separate download step. For JSON APIs, Python’s requests library can fetch the response, which you then load into a pandas DataFrame and register as a DuckDB table for SQL analysis.

Key advantages:

  • Zero ETL: Analyze data in memory without writing to disk
  • Zero crawlers: Consume structured API data directly
  • Zero databases: No MySQL/PostgreSQL needed, :memory: is enough
  • Reusable: Wrap in a Python class for quick project reuse

Complete Code: The CompetitorMonitor Class

import duckdb
import pandas as pd
import requests
from datetime import datetime
from typing import Dict, List, Optional

class CompetitorMonitor:
    """Zero-ETL Competitor Price Monitoring System"""
    
    def __init__(self, api_base_url: str, auth_token: Optional[str] = None):
        self.api_base_url = api_base_url
        self.auth_token = auth_token
        self.con = duckdb.connect(':memory:')
        self.snapshots: Dict[str, datetime] = {}
        
    def _headers(self) -> dict:
        """Build HTTP request headers"""
        headers = {'Accept': 'application/json'}
        if self.auth_token:
            headers['Authorization'] = f'Bearer {self.auth_token}'
        return headers
    
    def fetch_competitors(self, product_ids: List[str]) -> List[Dict]:
        """
        Fetch latest prices from competitor API
        
        Replace with real API endpoints in production
        """
        all_prices = []
        for pid in product_ids:
            resp = requests.get(
                f'{self.api_base_url}/products/{pid}/prices',
                headers=self._headers(),
                timeout=10
            )
            if resp.status_code == 200:
                data = resp.json()
                all_prices.extend(data.get('items', []))
            else:
                # Surface failures instead of silently dropping the product
                print(f"Warning: {pid} returned HTTP {resp.status_code}")
        
        return all_prices
    
    def register_as_table(self, data: List[Dict], table_name: str):
        """Register API JSON data as a DuckDB view backed by a pandas DataFrame"""
        # con.register() expects a DataFrame or Arrow table, not a plain list of dicts
        self.con.register(table_name, pd.DataFrame(data))
    
    def analyze_price_competitiveness(self, 
                                       my_prices_table: str,
                                       competitor_table: str) -> duckdb.DuckDBPyConnection:
        """
        Analyze price competitiveness
        
        Calculate the ratio of our average price to the competitor average,
        labeling each product as price advantage, disadvantage, or parity
        """
        sql = f"""
        WITH my_stats AS (
            SELECT 
                product_id,
                ROUND(AVG(price), 2) AS my_avg_price,
                MIN(price) AS my_min_price,
                MAX(price) AS my_max_price
            FROM {my_prices_table}
            GROUP BY product_id
        ),
        comp_stats AS (
            SELECT 
                product_id,
                ROUND(AVG(price), 2) AS comp_avg_price,
                MIN(price) AS comp_min_price,
                MAX(price) AS comp_max_price,
                COUNT(DISTINCT seller) AS seller_count
            FROM {competitor_table}
            GROUP BY product_id
        )
        SELECT 
            ms.product_id,
            ms.my_avg_price,
            cs.comp_avg_price AS competitor_avg_price,
            cs.seller_count AS competitor_sellers,
            ROUND(ms.my_avg_price / cs.comp_avg_price * 100, 1) AS price_index,
            CASE 
                WHEN ms.my_avg_price / cs.comp_avg_price < 0.95 THEN 'Price Advantage'
                WHEN ms.my_avg_price / cs.comp_avg_price > 1.05 THEN 'Price Disadvantage'
                ELSE 'Price Parity'
            END AS position,
            ROUND(cs.comp_min_price, 2) AS lowest_competitor_price
        FROM my_stats ms
        JOIN comp_stats cs ON ms.product_id = cs.product_id
        ORDER BY price_index DESC
        """
        return self.con.execute(sql)
    
    def detect_price_changes(self, 
                             snapshot_name: str, 
                             new_data: List[Dict]) -> duckdb.DuckDBPyConnection:
        """
        Detect price changes
        
        Register new API data as a temp table,
        compare with saved snapshot to find products with price changes
        """
        self.register_as_table(new_data, '_new_prices')
        
        sql = f"""
        SELECT 
            a.product_id,
            a.price AS old_price,
            b.price AS new_price,
            ROUND(b.price - a.price, 2) AS change_amount,
            ROUND((b.price - a.price) / a.price * 100, 2) AS change_pct,
            CASE 
                WHEN b.price - a.price > 0 THEN 'Price Increase'
                WHEN b.price - a.price < 0 THEN 'Price Decrease'
                ELSE 'Unchanged'
            END AS trend
        FROM {snapshot_name} a
        JOIN _new_prices b
          ON a.product_id = b.product_id
         AND a.seller = b.seller  -- match per seller; product_id alone pairs every seller with every other
        WHERE ABS(b.price - a.price) > 0.5
        ORDER BY ABS(change_pct) DESC
        """
        return self.con.execute(sql)
    
    def save_snapshot(self, data: List[Dict], name: str = 'default'):
        """Save current price snapshot for future comparison"""
        # Reuse register_as_table so snapshots and live tables are loaded the same way
        self.register_as_table(data, name)
        self.snapshots[name] = datetime.now()
        print(f"Snapshot '{name}' saved ({len(data)} records)")
    
    def generate_alert_report(self, changes_result) -> str:
        """Generate price change alert report"""
        rows = changes_result.fetchall()
        if not rows:
            return "No price change alerts"
        
        report = f"Price Change Alert Report ({datetime.now().strftime('%Y-%m-%d %H:%M')})\n\n"
        for product_id, old_p, new_p, change_amt, change_pct, trend in rows:
            report += f"  {trend} {product_id}\n"
            report += f"    Old: ${old_p} -> New: ${new_p} (change {change_pct}%)\n\n"
        
        return report
    
    def close(self):
        """Clean up resources"""
        self.con.close()
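
The 5% thresholds in analyze_price_competitiveness are easy to get backwards, so it can help to check them outside SQL. The sketch below mirrors the query's price_index and CASE logic in plain Python. `classify_position` is a hypothetical helper name used only for illustration; it is not part of the class above.

```python
from typing import Tuple

def classify_position(my_avg_price: float, comp_avg_price: float) -> Tuple[float, str]:
    """Mirror the SQL: price_index = ours / competitor * 100, with a 5% parity band."""
    if comp_avg_price <= 0:
        raise ValueError("competitor average price must be positive")
    ratio = my_avg_price / comp_avg_price
    price_index = round(ratio * 100, 1)
    if ratio < 0.95:
        position = 'Price Advantage'      # we are more than 5% cheaper
    elif ratio > 1.05:
        position = 'Price Disadvantage'   # we are more than 5% more expensive
    else:
        position = 'Price Parity'
    return price_index, position

# Illustrative prices only; the competitor averages here are made up
for ours, theirs in [(99.9, 110.0), (149.0, 140.0), (79.5, 80.0)]:
    print(ours, theirs, classify_position(ours, theirs))
```

A price_index below 100 means we are cheaper than the competitor average, which is why the query's ORDER BY price_index DESC puts the worst-positioned products first.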

Usage Example: Fetch Data and Analyze

# Initialize the monitor
monitor = CompetitorMonitor(
    api_base_url='https://api.example.com',
    auth_token='your_api_key_here'
)

try:
    # 1. Fetch competitor price data
    product_ids = ['PROD-001', 'PROD-002', 'PROD-003']
    competitors = monitor.fetch_competitors(product_ids)
    
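    # 2. Save initial snapshot and expose the same data as the 'competitors' table
    monitor.save_snapshot(competitors, 'snap_20260630')
    monitor.register_as_table(competitors, 'competitors')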
    
    # 3. Register our own price data
    my_prices = [
        {'product_id': 'PROD-001', 'price': 99.9, 'seller': 'my_store'},
        {'product_id': 'PROD-002', 'price': 149.0, 'seller': 'my_store'},
        {'product_id': 'PROD-003', 'price': 79.5, 'seller': 'my_store'},
    ]
    monitor.register_as_table(my_prices, 'my_prices')
    
    # 4. Analyze price competitiveness
    results = monitor.analyze_price_competitiveness(
        'my_prices', 'competitors'
    )
    
    # Print results
    for row in results.fetchall():
        print(row)
    
    # 5. Later in the same session: fetch new data and detect changes
    #    (the ':memory:' snapshot is lost when the process exits)
    new_competitors = monitor.fetch_competitors(product_ids)
    changes = monitor.detect_price_changes('snap_20260630', new_competitors)
    
    # 6. Generate alert report
    report = monitor.generate_alert_report(changes)
    print(report)

finally:
    monitor.close()
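
To see what the change-detection step should return before wiring it to a live API, here is a minimal pure-Python sketch of the same comparison on made-up data. It keys rows on both product_id and seller, assuming each seller lists a product at most once per snapshot. `detect_changes` and the sample records are hypothetical and exist only for this illustration.

```python
from typing import Dict, List

def detect_changes(old: List[Dict], new: List[Dict],
                   min_abs_change: float = 0.5) -> List[Dict]:
    """Compare two snapshots and keep rows whose price moved by more than min_abs_change."""
    old_by_key = {(r['product_id'], r['seller']): r['price'] for r in old}
    changes = []
    for r in new:
        key = (r['product_id'], r['seller'])
        if key not in old_by_key:
            continue  # inner-join semantics: listings missing from the old snapshot are skipped
        old_price = old_by_key[key]
        delta = r['price'] - old_price
        if abs(delta) <= min_abs_change:
            continue  # same 0.5 threshold as the SQL WHERE clause
        changes.append({
            'product_id': r['product_id'],
            'seller': r['seller'],
            'old_price': old_price,
            'new_price': r['price'],
            'change_amount': round(delta, 2),
            'change_pct': round(delta / old_price * 100, 2),
            'trend': 'Price Increase' if delta > 0 else 'Price Decrease',
        })
    # Largest relative moves first, like ORDER BY ABS(change_pct) DESC
    changes.sort(key=lambda c: abs(c['change_pct']), reverse=True)
    return changes

old_snapshot = [
    {'product_id': 'PROD-001', 'seller': 'shop_a', 'price': 100.0},
    {'product_id': 'PROD-002', 'seller': 'shop_b', 'price': 50.0},
    {'product_id': 'PROD-003', 'seller': 'shop_a', 'price': 80.0},
]
new_snapshot = [
    {'product_id': 'PROD-001', 'seller': 'shop_a', 'price': 90.0},
    {'product_id': 'PROD-002', 'seller': 'shop_b', 'price': 50.3},
    {'product_id': 'PROD-003', 'seller': 'shop_a', 'price': 84.0},
]

for change in detect_changes(old_snapshot, new_snapshot):
    print(change)
```

In this sample PROD-002 moves by only 0.30, so it falls under the threshold and is left out of the result.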

Advanced: Direct Remote CSV Query with HTTPFS

If your competitor data is stored as files on a remote server (S3 or an HTTP file server), DuckDB’s HTTPFS extension can query it directly in SQL, with no Python fetching code:

-- Install and load HTTPFS extension
INSTALL httpfs;
LOAD httpfs;

-- Read remote CSV directly (no download needed)
SELECT * FROM read_csv_auto(
    'https://storage.example.com/prices/daily_export.csv',
    header=true,
    delim=','
);

-- Read remote JSON (assumes a JSON array of flat objects;
-- read_json_auto infers one column per top-level field,
-- so a nested payload needs UNNEST on the array field first)
SELECT 
    product_id,
    price::DOUBLE AS price,
    seller
FROM read_json_auto('https://api.competitor.com/prices.json');

-- Multi-source fusion: local CSV + remote JSON
WITH local_prices AS (
    SELECT product_id, AVG(price) AS avg_price
    FROM read_csv_auto('local_inventory.csv')
    GROUP BY product_id
),
remote_prices AS (
    SELECT 
        product_id,
        price::DOUBLE AS price
    FROM read_json_auto('https://api.competitor.com/prices.json')
)
SELECT 
    l.product_id,
    l.avg_price AS my_price,
    r.price AS competitor_price,
    ROUND(l.avg_price / r.price * 100, 1) AS price_index
FROM local_prices l
JOIN remote_prices r ON l.product_id = r.product_id
WHERE l.avg_price / r.price > 1.1
ORDER BY price_index DESC;
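
One way to sanity-check the fusion query on a small sample is to reproduce the join with the standard library and compare the rows. The sketch below assumes the CSV has product_id and price columns, the JSON is an array of flat objects with the same fields, and there is one remote row per product. `flag_overpriced` and the sample data are hypothetical.

```python
import csv
import io
import json
from collections import defaultdict
from typing import List, Tuple

def flag_overpriced(local_csv_text: str, remote_json_text: str,
                    threshold: float = 1.1) -> List[Tuple[str, float, float, float]]:
    """Return (product_id, my_price, competitor_price, price_index) where we exceed threshold."""
    # Average our own prices per product, like the local_prices CTE
    local = defaultdict(list)
    for row in csv.DictReader(io.StringIO(local_csv_text)):
        local[row['product_id']].append(float(row['price']))
    # Assumption: one competitor row per product_id
    remote = {r['product_id']: float(r['price']) for r in json.loads(remote_json_text)}

    flagged = []
    for pid, prices in local.items():
        if pid not in remote:
            continue  # inner join: products missing remotely are dropped
        avg_price = sum(prices) / len(prices)
        ratio = avg_price / remote[pid]
        if ratio > threshold:
            flagged.append((pid, round(avg_price, 2), remote[pid], round(ratio * 100, 1)))
    flagged.sort(key=lambda t: t[3], reverse=True)
    return flagged

sample_csv = "product_id,price\nPROD-001,120\nPROD-001,130\nPROD-002,50\n"
sample_json = '[{"product_id": "PROD-001", "price": 100}, {"product_id": "PROD-002", "price": 49}]'
print(flag_overpriced(sample_csv, sample_json))
```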

Automation: Weekly Scheduled Monitoring

With cron or GitHub Actions, you can run the monitor on a schedule:

# crontab: run every Monday at 9 AM
0 9 * * 1 cd /path/to/project && python3 monitor.py --mode weekly

# monitor.py with CLI support
# Note: fetch_and_analyze, check_price_changes, generate_weekly_report,
# send_notification, and save_report are placeholders that the class above
# does not define; implement them for your data source and alert channel.
import argparse

def main():
    parser = argparse.ArgumentParser()
    parser.add_argument('--mode', choices=['analyze', 'alert', 'weekly'], default='analyze')
    args = parser.parse_args()
    
    monitor = CompetitorMonitor('https://api.example.com')
    
    try:
        if args.mode == 'analyze':
            # Daily analysis (fetch_and_analyze is expected to return a DataFrame)
            df = monitor.fetch_and_analyze()
            df.to_csv('reports/daily_analysis.csv')
            
        elif args.mode == 'alert':
            # Price change alerts
            changes = monitor.check_price_changes()
            report = monitor.generate_alert_report(changes)
            # generate_alert_report returns this fixed message when nothing changed
            if report != "No price change alerts":
                send_notification(report)  # Telegram / Email
                
        elif args.mode == 'weekly':
            # Weekly report generation
            weekly_report = monitor.generate_weekly_report()
            save_report(weekly_report)
    finally:
        monitor.close()

if __name__ == '__main__':
    main()
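
The CLI above calls save_report without defining it. A minimal sketch of one possible implementation follows, assuming the report is a plain string and a timestamped text file in a reports directory is enough. The file naming scheme and the out_dir parameter are my own assumptions.

```python
from datetime import datetime
from pathlib import Path

def save_report(report: str, out_dir: str = 'reports') -> Path:
    """Write the report to a timestamped text file and return its path."""
    target_dir = Path(out_dir)
    # Create the directory on first run so cron jobs do not fail on a fresh checkout
    target_dir.mkdir(parents=True, exist_ok=True)
    filename = f"weekly_{datetime.now().strftime('%Y%m%d_%H%M%S')}.txt"
    path = target_dir / filename
    path.write_text(report, encoding='utf-8')
    return path
```

Returning the path makes it easy to log where the file went or attach it to a notification.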

How Much Can You Earn With This?

Competitor monitoring is a high-value consulting service:

Entry Level: Help e-commerce sellers with competitor price monitoring, ¥3,000-8,000 per project

  • Client pain point: Don’t know competitor pricing strategies
  • Deliverable: A price competitiveness analysis report

Mid Level: Build automated monitoring dashboards, ¥2,000-5,000/month retainer

  • Client pain point: Manual monitoring is too time-consuming
  • Deliverable: Automated price change alert system

Advanced Level: Integrate multiple data sources (competitor prices + sales volume + reviews + inventory) for a complete business intelligence product, ¥5,000-15,000/month

  • Client pain point: Lack of systematic competitive intelligence
  • Deliverable: Auto-generated weekly competitive intelligence reports + real-time alerts

Your competitive edge: you don’t need to write crawlers, deploy servers, or maintain databases. DuckDB’s HTTPFS extension plus a small Python class lets you query internet data like local files, with near-zero operational costs.


Summary

With DuckDB’s HTTPFS extension + Python class encapsulation, you can build a complete competitor price monitoring system in under 100 lines of code:

  1. Data Collection: Read remote data directly via API or HTTPFS, zero ETL
  2. Analysis Engine: Use SQL for price competitiveness analysis and snapshot-to-snapshot price change detection
  3. Alert System: Automatically detect price changes and generate alert reports
  4. Scheduled Tasks: Fully automated monitoring with cron

For data analysts and developers, this means you can quickly build various data-driven business intelligence products at near-zero cost.

The complete runnable code and competitor monitoring template are published on duckdblab.org, with more remote data source practical cases. Learn more DuckDB practical experience → duckdblab.org
