Featured image of post DuckDB 搭建 A 股板块轮动监控系统:量化分析师的数据变现利器

DuckDB 搭建 A 股板块轮动监控系统:量化分析师的数据变现利器

用 DuckDB + akshare + SQL 窗口函数搭建一个完整的 A 股行业板块轮动监控系统,从数据采集、动量计算到自动推送报告。50 行 SQL 实现专业量化策略,可包装为月费 99 元的 SaaS 产品。

为什么板块轮动监控是一个可以赚钱的项目?

A 股市场有一条铁律:没有永远上涨的板块,只有不断轮动的结构行情。2024-2026 年的数据显示,申万一级行业的季度收益率极差高达 40% 以上——选对板块比选对个股更重要。

市面上同类的板块轮动 SaaS 工具月费在 299-999 元不等。而用 DuckDB + 免费数据源(akshare),你可以在一个小时内搭建出功能更强、更定制化的私有版本。不仅能自己搞量化交易参考,还能包装成数据产品卖给客户。

本文将从零开始,带你搭建一个完整的自动化板块轮动监控系统。


系统架构概览

整个系统的核心流程如下:

数据采集(akshare) → DuckDB本地存储 → SQL指标计算 → 信号生成 → 自动推送

所有组件都运行在你的 Linux 服务器上(或你手头的任何一台机器),通过 cron 定时调度,完全自动化。


第一步:环境准备与数据采集

安装依赖

pip install duckdb akshare pandas

akshare 是一个免费开源的 A 股数据接口库,无需 API Key,无需付费。

获取行业板块列表并写入 DuckDB

打开 DuckDB 客户端或者创建 Python 脚本。我们先连接数据库,获取申万一级行业分类的所有板块:

import akshare as ak
import duckdb
import pandas as pd
from datetime import datetime, timedelta

# 连接 DuckDB(自动创建本地文件数据库)
con = duckdb.connect("sector_monitor.db")

# 获取申万一级行业列表
sector_df = ak.stock_board_industry_name_em()
print(f"监测 {len(sector_df)} 个行业板块")

# 取最近 90 个自然日的数据(约 60 个交易日)
end_date = datetime.now().strftime("%Y%m%d")
start_date = (datetime.now() - timedelta(days=90)).strftime("%Y%m%d")

# 创建主表
con.execute("""
    CREATE TABLE IF NOT EXISTS sector_daily (
        日期 DATE,
        sector VARCHAR,
        开盘 DOUBLE,
        收盘 DOUBLE,
        最高 DOUBLE,
        最低 DOUBLE,
        成交量 BIGINT,
        成交额 BIGINT,
        振幅 DOUBLE,
        涨跌幅 DOUBLE,
        涨跌额 DOUBLE,
        换手率 DOUBLE
    )
""")

for _, row in sector_df.head(5).iterrows():  # 先测试前5个板块
    sector_name = row["板块名称"]
    try:
        df = ak.stock_board_industry_hist_em(
            symbol=sector_name,
            start_date=start_date,
            end_date=end_date,
            period="daily",
            adjust="qfq"
        )
        if df.empty:
            continue
        df["sector"] = sector_name
        
        # 将 DataFrame 注册为临时表并写入
        con.register("df_tmp", df)
        con.execute("""
            INSERT INTO sector_daily 
            SELECT 日期, sector, 开盘, 收盘, 最高, 最低, 
                   成交量, 成交额, 振幅, 涨跌幅, 涨跌额, 换手率
            FROM df_tmp
        """)
        print(f"  ✓ {sector_name}: {len(df)} 条记录")
    except Exception as e:
        print(f"  ✗ {sector_name}: {e}")

💡 提示:完整采集所有 31 个申万一级行业约需 2 分钟。生产环境中建议将数据采集和指标计算分离,采集放在盘后定时执行。


第二步:用 SQL 计算核心动量指标

数据入库后,真正的分析才开始。我们用 DuckDB 的窗口函数计算三个核心指标:

指标含义计算方式
5日动量短期趋势过去5个交易日的累计收益率
20日动量中期趋势过去20个交易日的累计收益率(约1个月)
60日动量长期趋势过去60个交易日的累计收益率(约1个季度)
-- 建立板块动量视图
CREATE OR REPLACE VIEW sector_momentum AS
WITH daily_return AS (
    SELECT
        sector,
        日期,
        (收盘 - LAG(收盘) OVER (PARTITION BY sector ORDER BY 日期)) 
            / NULLIF(LAG(收盘) OVER (PARTITION BY sector ORDER BY 日期), 0) AS daily_ret
    FROM sector_daily
),
momentum AS (
    SELECT
        sector,
        MAX(日期) AS latest_date,
        -- 5日累计收益(使用几何累乘更精确)
        EXP(SUM(LN(1 + COALESCE(daily_ret, 0))) 
            OVER (PARTITION BY sector ORDER BY 日期 
                  ROWS BETWEEN 4 PRECEDING AND CURRENT ROW)) - 1 AS ret_5d,
        -- 20日累计收益
        EXP(SUM(LN(1 + COALESCE(daily_ret, 0))) 
            OVER (PARTITION BY sector ORDER BY 日期 
                  ROWS BETWEEN 19 PRECEDING AND CURRENT ROW)) - 1 AS ret_20d,
        -- 60日累计收益
        EXP(SUM(LN(1 + COALESCE(daily_ret, 0))) 
            OVER (PARTITION BY sector ORDER BY 日期 
                  ROWS BETWEEN 59 PRECEDING AND CURRENT ROW)) - 1 AS ret_60d,
        -- 20日平均成交额(判断资金活跃度)
        AVG(成交额) OVER (PARTITION BY sector ORDER BY 日期 
                          ROWS BETWEEN 19 PRECEDING AND CURRENT ROW) AS avg_volume_20d
    FROM daily_return
)
SELECT DISTINCT
    sector,
    ret_5d,
    ret_20d,
    ret_60d,
    avg_volume_20d,
    -- 动量综合评分:短期趋势权重最高
    ret_5d * 0.5 + ret_20d * 0.3 + ret_60d * 0.2 AS momentum_score
FROM momentum
WHERE 日期 = (SELECT MAX(日期) FROM daily_return)
ORDER BY momentum_score DESC;

为什么用几何累乘而不是简单相加?

假设一个板块昨天涨 10%,今天跌 10%。简单相加的收益率为 0%,但实际收益率为 (1+0.1)×(1-0.1)-1 = -1%。几何累乘(使用 LNEXP)精确计算了复利效应,尤其在高波动行情下差异显著。

DuckDB 的窗口函数性能在这里体现得淋漓尽致——31 个板块 × 60 个交易日 ≈ 1860 行数据,毫秒级完成全部计算。如果是 MySQL 5.7,同样的窗口函数写法(OVER (PARTITION BY ...))根本跑不起来。


第三步:生成交易信号

有了动量评分,接下来就是信号生成。我们将所有板块按动量排名,生成买入/持有/卖出信号:

-- 排名与信号生成
CREATE OR REPLACE VIEW sector_signals AS
WITH ranked AS (
    SELECT *,
        ROW_NUMBER() OVER (ORDER BY momentum_score DESC) AS rank_asc,
        ROW_NUMBER() OVER (ORDER BY momentum_score ASC) AS rank_desc
    FROM sector_momentum
)
SELECT
    sector,
    ROUND(ret_5d * 100, 2) AS ret_5d_pct,
    ROUND(ret_20d * 100, 2) AS ret_20d_pct,
    ROUND(momentum_score * 100, 2) AS score,
    ROUND(avg_volume_20d / 1e8, 1) AS avg_amount_yi,  -- 单位:亿元
    CASE 
        WHEN rank_asc <= 5 THEN '🔥 强势领涨'
        WHEN rank_asc <= 15 THEN '⚡ 动量偏强'
        WHEN rank_desc <= 5 THEN '❄️ 弱势回避'
        ELSE '➡️ 中性'
    END AS signal,
    CASE
        WHEN rank_asc <= 5 AND avg_volume_20d > 1e10 THEN '买入'  -- 强势 + 放量
        WHEN rank_desc <= 5 THEN '卖出/回避'
        ELSE '持有/观望'
    END AS action
FROM ranked
ORDER BY rank_asc;

这里的关键逻辑是:

  • 买入信号:动量排名前 5 + 20 日均成交额大于 100 亿(量价配合)
  • 卖出信号:动量排名倒数前 5
  • 持有信号:中间板块,保持观望

第四步:自动生成推送报告

这是让系统真正产生价值的一步。我们用 DuckDB 直接拼接出推送文本,然后通过 Telegram Bot / 飞书 Webhook / 邮件发送给用户:

# 用 DuckDB 直接生成报告文本
report = con.execute("""
SELECT 
    strftime(current_date, '%Y-%m-%d') || ' A股板块轮动日报' AS title,
    '---' AS sep1,
    '🔥 强势板块 Top 5:' AS section1,
    string_agg(
        '  ' || sector || ' | 5日: ' || ret_5d_pct || '% | 动量分: ' || score || ' | 信号: ' || action,
        chr(10)
    ) FILTER (WHERE rank_asc <= 5) AS top_sectors,
    '---' AS sep2,
    '❄️ 弱势板块 Top 5:' AS section2,
    string_agg(
        '  ' || sector || ' | 20日: ' || ret_20d_pct || '% | 信号: ' || action,
        chr(10)
    ) FILTER (WHERE rank_desc <= 5) AS bottom_sectors,
    '---' AS sep3,
    '💡 操作建议:' AS section3,
    CASE
        WHEN count(*) FILTER (WHERE action = '买入') >= 3 
        THEN '市场情绪偏乐观,关注强势板块回调后的二次入场机会'
        WHEN count(*) FILTER (WHERE action = '买入') = 0 
        THEN '无明确买入信号,建议观望或关注逆势抗跌板块'
        ELSE '结构性行情,关注动量持续居前的板块'
    END AS advice
FROM sector_signals
""").fetchone()

report_text = '\n'.join([str(r) for r in report if r])
print(report_text)

推送示例输出

2026-05-31 A股板块轮动日报
---
🔥 强势板块 Top 5:
  计算机 | 5日: 3.25% | 动量分: 2.18 | 信号: 买入
  电子 | 5日: 2.87% | 动量分: 1.95 | 信号: 买入
  通信 | 5日: 2.12% | 动量分: 1.56 | 信号: 买入
  传媒 | 5日: 1.89% | 动量分: 1.32 | 信号: 持有/观望
  国防军工 | 5日: 1.65% | 动量分: 1.08 | 信号: 持有/观望
---
❄️ 弱势板块 Top 5:
  房地产 | 20日: -4.23% | 信号: 卖出/回避
  建筑材料 | 20日: -3.87% | 信号: 卖出/回避
  美容护理 | 20日: -3.12% | 信号: 卖出/回避
  食品饮料 | 20日: -2.56% | 信号: 卖出/回避
  农林牧渔 | 20日: -2.01% | 信号: 卖出/回避
---
💡 操作建议:
市场情绪偏乐观,关注强势板块回调后的二次入场机会

推送代码示例(Telegram)

def send_telegram(bot_token, chat_id, text):
    import requests
    url = f"https://api.telegram.org/bot{bot_token}/sendMessage"
    requests.post(url, json={"chat_id": chat_id, "text": text})

将上面的报告文本塞进去,配合 cron 定时执行,每天开盘前自动推送到你的付费群,就是一个完整的订阅产品。


第五步:部署与运维

创建调度脚本 run_sector_monitor.sh

#!/bin/bash
cd /path/to/project
python3 collect_data.py     # 数据采集
python3 compute_signals.py  # 指标计算
python3 send_report.py      # 推送报告

在 crontab 中配置定时执行:

# 每天下午 15:30(收盘后)采集数据
30 15 * * 1-5 /path/to/run_sector_monitor.sh >> /var/log/sector_monitor.log 2>&1
# 每天早上 08:30(开盘前)推送报告
30 8 * * 1-5 /path/to/send_report.py >> /var/log/sector_push.log 2>&1

DuckDB 为什么是板块轮动分析的最佳选择?

在整个项目中,我们深刻体会到 DuckDB 的几个核心优势:

1. 零配置即用pip install duckdb 到跑出第一条 SQL 结果,不到 30 秒。不需要搭 MySQL/PostgreSQL 服务器,不需要配置连接串,数据库就是一个文件。

2. 窗口函数全支持 板块轮动的核心就是窗口函数计算——LAG 算日收益率,SUM OVER ROWS BETWEEN 算累计收益,ROW_NUMBER 做排名。DuckDB 对标 PostgreSQL 的 SQL 语法支持度,比 MySQL 5.7 和 SQLite 强大得多。

3. 向量化执行引擎 1860 行数据 + 窗口函数,计算时间 < 0.1 秒。换成 Pandas 做同样的操作,数据量大 10 倍时内存就开始报警了。DuckDB 的向量化执行按列处理,内存效率更高。

4. 与 Python 生态无缝集成 con.register("df_tmp", df) 这一行代码,让 Pandas DataFrame 和 DuckDB 表之间零拷贝互通。你用 akshare 抓到的数据直接注册为临时表,一条 SQL 插进去就行。


进阶变现思路

基础版的板块轮动监控已经是完整的产品。但要卖高价,你可以在此基础上叠加以下功能:

1. 多因子评分系统

在动量模型之上,加入更多因子:

CREATE OR REPLACE VIEW multi_factor_score AS
SELECT 
    m.sector,
    m.momentum_score * 0.3 +      -- 动量因子
    v.volume_change * 0.2 +       -- 成交量变化因子
    p.price_stability * 0.2 +     -- 价格稳定性因子
    r.relative_strength * 0.3     -- 相对强度因子
    AS composite_score
FROM sector_momentum m
JOIN sector_volume v USING (sector)
JOIN sector_stability p USING (sector)
JOIN sector_rel_strength r USING (sector);

2. 历史回测引擎

用 DuckDB 快速验证策略有效性:

-- 回测:每周调仓,买入动量前 3 的板块
WITH weekly_rank AS (
    SELECT 
        日期,
        sector,
        momentum_score,
        ROW_NUMBER() OVER (PARTITION BY 日期 ORDER BY momentum_score DESC) AS rnk
    FROM sector_daily_momentum
    WHERE dayofweek(日期) = 5  -- 每周五调仓
)
SELECT 
    sector,
    COUNT(*) AS hold_weeks,
    AVG(ret_20d) AS avg_return,
    STDDEV(ret_20d) AS volatility,
    AVG(ret_20d) / NULLIF(STDDEV(ret_20d), 0) AS sharpe_ratio
FROM weekly_rank
WHERE rnk <= 3
GROUP BY sector
ORDER BY sharpe_ratio DESC;

30 秒就能跑完 3 年的历史回测数据——这在传统数据库中可能要数分钟。

3. SaaS 化部署方案

价格方案:
- 基础版(99 元/月):每日板块轮动推送 + Top/Bottom 5
- 专业版(299 元/月):基础版 + 多因子评分 + 个股筛选
- 企业版(999 元/月):专业版 + 历史回测报告 + 定制因子

一台最低配的云服务器(2 核 4G,月费约 50 元),可以轻松支撑 100 个用户的每日推送。纯利润空间巨大。

4. 更多数据源扩展

数据源用途接入方式
北向资金外资流向分析akshare.stock_hsgt_north_net_flow_in_em
龙虎榜游资动向akshare.stock_lhb_yy_em
融资融券杠杆情绪akshare.stock_margin_detail_szse
股指期货基差市场情绪akshare.futures_main_sina

用 DuckDB 的跨表 JOIN,所有这些数据源的信号可以一键融合成一个综合评分。


总结

本文从零到一搭建了一个完整的 A 股板块轮动监控系统。核心代码不到 200 行,其中 SQL 只占 50 行,却完成了整个量化策略的计算引擎。

这个项目非常适合作为 DuckDB 实战的练手项目——技能点覆盖了:数据采集(Python + akshare)、数据存储(DuckDB本地文件)、分析计算(SQL窗口函数)、自动化调度(cron)、信息推送(Telegram/飞书API)。

更关键的是,它天然具备变现属性。市面上价值 299 元/月的 SaaS 工具,你用 DuckDB 一个下午就能搭建出竞品级别的功能。这就是数据分析师用 DuckDB 变现的最小可行产品。


📖 本文的完整工程代码(含推送模块、多因子扩展、历史回测脚本和 Docker 部署方案)已发布在 duckdblab.org,包含更详细的部署步骤,可直接上生产使用。

📺 Watch video tutorials → DuckDB Lab YouTube

Subscribe for more DuckDB & AI automation tutorials

使用 Hugo 构建
主题 StackJimmy 设计