打分算法全面改进:修复方向性bug,引入自适应权重与分数动量

This commit is contained in:
fish
2026-05-10 15:52:03 +08:00
parent f2e4bf7041
commit 819b327cdb
6 changed files with 194 additions and 47 deletions

View File

@@ -1,4 +1,5 @@
tushare>=1.4.0
numpy>=2.0.0
pandas>=2.2.0
fastapi>=0.115.0
uvicorn[standard]>=0.34.0

View File

@@ -37,6 +37,9 @@ class RunResponse(BaseModel):
long_term: float
composite: float
signal: str
vol_penalty: float = 1.0
composite_delta: Optional[float] = None
composite_delta_5d: Optional[float] = None
@app.on_event("startup")
@@ -74,6 +77,9 @@ def run_pipeline(req: RunRequest):
long_term=result.long_term,
composite=result.composite,
signal=result.signal,
vol_penalty=result.vol_penalty,
composite_delta=result.composite_delta,
composite_delta_5d=result.composite_delta_5d,
)

View File

@@ -74,8 +74,24 @@ def run(ts_code: str, trade_date: Optional[str] = None) -> int:
print(f"\n[波动率调整]")
print(f" 日波动率(30d std): {vd['daily_vol_pct']*100:.2f}%")
print(f" ATR%: {vd['atr_pct']*100:.2f}%")
vol_risk = vd.get("vol_risk", vd["daily_vol_pct"])
print(f" 综合波动风险: {vol_risk*100:.2f}%")
print(f" 惩罚系数: {vd['vol_penalty']:.3f}")
aw = result.detail.adaptive_weights
if aw:
print(f"\n[自适应权重]")
print(f" 趋势强度: {aw['trend_strength']:.2f}")
print(f" 短期权重: {aw['w_short']:.2%} (基准 40%)")
print(f" 中期权重: {aw['w_medium']:.2%} (基准 35%)")
print(f" 长期权重: {aw['w_long']:.2%} (基准 25%)")
if result.composite_delta is not None:
print(f"\n[分数动量]")
print(f" 日变化 (Δ1d): {result.composite_delta:+.1f}")
if result.composite_delta_5d is not None:
print(f" 周变化 (Δ5d): {result.composite_delta_5d:+.1f}")
print(f"\n[OK] 数据已持久化到 PostgreSQL")
return 0
@@ -118,10 +134,12 @@ def run_range(ts_code: str, start_date: str, end_date: str) -> int:
"accumulation": "增仓上涨", "distribution": "增仓下跌",
"covering": "减仓上涨", "liquidation": "减仓下跌", "flat": "持仓持平",
}
print(f"\n{'日期':<12} {'收盘':>10} {'综合':>8} {'信号':<20}")
print("-" * 55)
print(f"\n{'日期':<12} {'收盘':>10} {'综合':>8} {'Δ1d':>6} {'Δ5d':>6} {'信号':<20}")
print("-" * 70)
for r in results:
print(f"{r.trade_date:<12} {r.close:>10.2f} {r.composite:>8.1f} {r.signal:<20}")
d1 = f"{r.composite_delta:+.1f}" if r.composite_delta is not None else " -"
d5 = f"{r.composite_delta_5d:+.1f}" if r.composite_delta_5d is not None else " -"
print(f"{r.trade_date:<12} {r.close:>10.2f} {r.composite:>8.1f} {d1:>6} {d5:>6} {r.signal:<20}")
print(f"\n[OK] {len(results)} 条打分已持久化到 PostgreSQL")
return 0

View File

@@ -23,6 +23,7 @@ class ScoreDetail:
medium_detail: dict = field(default_factory=dict)
long_detail: dict = field(default_factory=dict)
volatility: dict = field(default_factory=dict)
adaptive_weights: dict = field(default_factory=dict)
@dataclass
@@ -38,3 +39,6 @@ class ScoreResult:
composite: float
signal: str
detail: ScoreDetail
vol_penalty: float = 1.0
composite_delta: Optional[float] = None # 与前一日综合分差值
composite_delta_5d: Optional[float] = None # 与5日前综合分差值

View File

@@ -1,12 +1,18 @@
import math
from typing import Optional
import numpy as np
import pandas as pd
from .models import ScoreDetail, ScoreResult
# ---------------------------------------------------------------------------
# 短期动力 — 单日打分
# ---------------------------------------------------------------------------
def _daily_short_score(row: pd.Series, avg_vol_7d: float) -> dict:
"""单日短期动力打分(连续值 + 幅度因子 + 量能确认)。"""
"""单日短期动力打分(连续值 + 方向性幅度加成 + 量能确认)。"""
oi = float(row["oi"])
oi_chg = float(row["oi_chg"])
close = float(row["close"])
@@ -18,37 +24,42 @@ def _daily_short_score(row: pd.Series, avg_vol_7d: float) -> dict:
price_up = close >= pre_close
oi_increasing = oi_chg > 0
# 象限基础分
# ── 象限基础分 ──
if abs(oi_chg_pct) < 0.01:
base = 60.0 if price_up else 40.0
quadrant = "flat"
elif oi_increasing and price_up:
base = 75.0
quadrant = "accumulation"
quadrant = "accumulation" # 增仓上涨
elif oi_increasing and not price_up:
base = 25.0
quadrant = "distribution"
quadrant = "distribution" # 增仓下跌
elif not oi_increasing and price_up:
base = 65.0
quadrant = "covering"
quadrant = "covering" # 减仓上涨
else:
base = 20.0
quadrant = "liquidation"
quadrant = "liquidation" # 减仓下跌
# 幅度加成OI 变化率封顶 5%,价格涨跌幅封顶 3%
# ── 幅度加成(方向性)──
# OI 变化率封顶 5%,价格涨跌幅封顶 3%
oi_mag = min(1.0, abs(oi_chg_pct) / 0.05)
price_mag = min(1.0, abs(price_chg_pct) / 0.03)
if quadrant in ("accumulation", "liquidation"):
boost = (oi_mag + price_mag) / 2.0 * 20.0
if quadrant == "accumulation":
boost = (oi_mag + price_mag) / 2.0 * 20.0 # 看多,加成推高分数
elif quadrant == "liquidation":
boost = -(oi_mag + price_mag) / 2.0 * 20.0 # 看空,扣分强化信号
elif quadrant == "flat":
boost = price_mag * 10.0
else:
boost = 0.0
boost = price_mag * 10.0 if price_up else -(price_mag * 10.0)
elif quadrant == "distribution":
boost = -price_mag * 10.0 # 增仓下跌:价格幅度扣分
else: # covering
boost = price_mag * 10.0 # 减仓上涨:价格幅度加分
# 量能确认
# ── 量能确认ratio=1.0 时为中性因子 1.0)──
vol_ratio = vol / avg_vol_7d if avg_vol_7d > 0 else 1.0
vol_factor = 0.9 + 0.2 * min(vol_ratio, 1.5)
vol_factor = 0.8 + 0.2 * min(vol_ratio, 2.0) # 范围 [0.8, 1.2],中性 = 1.0
score = max(0.0, min(100.0, (base + boost) * vol_factor))
@@ -67,7 +78,12 @@ def _daily_short_score(row: pd.Series, avg_vol_7d: float) -> dict:
}
# ---------------------------------------------------------------------------
# 短期动力 — 7 日窗口,指数加权平均
# ---------------------------------------------------------------------------
def calc_short_term(df: pd.DataFrame, window: int = 7) -> tuple[float, list]:
"""近 window 日逐日打分,指数加权平均(越近权重越高)。"""
recent = df.iloc[-window:].copy()
avg_vol_7d = float(recent["vol"].mean()) if "vol" in recent.columns else 0.0
@@ -78,8 +94,17 @@ def calc_short_term(df: pd.DataFrame, window: int = 7) -> tuple[float, list]:
scores.append(detail["score"])
details.append(detail)
return sum(scores) / len(scores), details
# 指数加权:权重从 exp(0)=1 递增到 exp(1)≈2.718
weights = np.exp(np.linspace(0, 1, window))
weights = weights / weights.sum()
weighted_avg = float(np.average(scores, weights=weights))
return weighted_avg, details
# ---------------------------------------------------------------------------
# 中期趋势 — 15 日窗口,四象限资金意愿
# ---------------------------------------------------------------------------
def calc_medium_term(df: pd.DataFrame, window: int = 15) -> tuple[float, dict]:
if len(df) < window + 1:
@@ -89,46 +114,65 @@ def calc_medium_term(df: pd.DataFrame, window: int = 15) -> tuple[float, dict]:
close_now = float(df.iloc[-1]["close"])
close_before = float(df.iloc[-window - 1]["close"])
# ── 价格信号 ──
price_return = (close_now - close_before) / close_before if close_before != 0 else 0
price_score = max(0.0, min(100.0, 50.0 + price_return * 500))
long_up = 0
long_down = 0
for _, row in recent.iterrows():
if row["oi_chg"] > 0:
if row["close"] >= row["pre_close"]:
long_up += 1
else:
long_down += 1
# ── 资金意愿(四象限全计入)──
accumulation = 0 # 增仓上涨
distribution = 0 # 增仓下跌
covering = 0 # 减仓上涨
liquidation = 0 # 减仓下跌
for _, row in recent.iterrows():
oi_inc = row["oi_chg"] > 0
price_up = row["close"] >= row["pre_close"]
if oi_inc and price_up:
accumulation += 1
elif oi_inc and not price_up:
distribution += 1
elif not oi_inc and price_up:
covering += 1
else:
liquidation += 1
# 增仓驱动权重 1.0,减仓驱动权重 0.5(持续性较差)
fund_score = 50.0 + (
accumulation * 1.0 - distribution * 1.0
+ covering * 0.5 - liquidation * 0.5
) / window * 50.0
fund_score = max(0.0, min(100.0, fund_score))
fund_score = 50.0 + (long_up - long_down) / window * 50.0
score = price_score * 0.6 + fund_score * 0.4
detail = {
"price_return_pct": round(price_return * 100, 2),
"price_signal": round(price_score, 1),
"long_up_days": long_up,
"long_down_days": long_down,
"accumulation_days": accumulation,
"distribution_days": distribution,
"covering_days": covering,
"liquidation_days": liquidation,
"fund_signal": round(fund_score, 1),
"window": window,
}
return score, detail
# ---------------------------------------------------------------------------
# 长期结构 — 30 日窗口,端点 OI 比较
# ---------------------------------------------------------------------------
def calc_long_term(df: pd.DataFrame, window: int = 30) -> tuple[float, dict]:
if len(df) < window + 1:
raise ValueError(f"数据不足,需要至少 {window + 1}")
recent_oi = df.iloc[-window:]["oi"]
avg_oi = float(recent_oi.mean())
# ── OI 趋势分(端点比较,消除均值滞后)──
oi_now = float(df.iloc[-1]["oi"])
oi_before = float(df.iloc[-window - 1]["oi"])
oi_change_pct = (avg_oi - oi_before) / oi_before if oi_before != 0 else 0.0
# OI 趋势分 (60%)
oi_change_pct = (oi_now - oi_before) / oi_before if oi_before != 0 else 0.0
oi_score = max(0.0, min(100.0, 50.0 + oi_change_pct * 250))
# 价格趋势分 (40%)
# ── 价格趋势分 ──
close_now = float(df.iloc[-1]["close"])
price_before = float(df.iloc[-window - 1]["close"])
price_return_30d = (close_now - price_before) / price_before if price_before != 0 else 0.0
@@ -136,19 +180,27 @@ def calc_long_term(df: pd.DataFrame, window: int = 30) -> tuple[float, dict]:
score = oi_score * 0.6 + price_score * 0.4
# 额外统计近 30 日 OI 均值供参考
recent_oi = df.iloc[-window:]["oi"]
detail = {
"avg_oi": round(avg_oi, 0),
"oi_now": round(oi_now, 0),
"oi_before": round(oi_before, 0),
"change_pct": round(oi_change_pct * 100, 2),
"oi_change_pct": round(oi_change_pct * 100, 2),
"oi_score": round(oi_score, 1),
"price_score": round(price_score, 1),
"price_return_30d_pct": round(price_return_30d * 100, 2),
"price_before_30d": round(price_before, 2),
"avg_oi_30d": round(float(recent_oi.mean()), 0),
"window": window,
}
return score, detail
# ---------------------------------------------------------------------------
# 信号解读
# ---------------------------------------------------------------------------
def _interpret(composite: float) -> str:
if composite >= 80:
return "强烈看多区域 — 价格与资金共振,趋势多头的温床"
@@ -159,6 +211,10 @@ def _interpret(composite: float) -> str:
return "强烈看空区域 — 资金主动且持续地打压价格"
# ---------------------------------------------------------------------------
# 主打分函数
# ---------------------------------------------------------------------------
def score_daily(df: pd.DataFrame, trade_date: Optional[str] = None) -> ScoreResult:
"""对 DataFrame 中指定日期或最新一条记录打分。"""
if len(df) < 31:
@@ -176,11 +232,12 @@ def score_daily(df: pd.DataFrame, trade_date: Optional[str] = None) -> ScoreResu
latest = df.iloc[-1]
# ── 三层原始分 ──
short, short_details = calc_short_term(df, 7)
medium, medium_detail = calc_medium_term(df, 15)
long_, long_detail = calc_long_term(df, 30)
# 波动率调整
# ── 波动率(日收益率标准差 + ATR%)──
recent_30 = df.iloc[-30:].copy()
recent_30["ret"] = recent_30["close"].pct_change()
daily_vol = float(recent_30["ret"].std())
@@ -197,13 +254,36 @@ def score_daily(df: pd.DataFrame, trade_date: Optional[str] = None) -> ScoreResu
avg_close_30 = float(recent_30["close"].mean())
atr_pct = (atr / avg_close_30) if avg_close_30 else 0.0
# 综合波动率风险度量:日收益标准差 70% + ATR% 30%
vol_risk = daily_vol * 0.7 + atr_pct * 0.3
vol_ref = 0.015
if daily_vol <= vol_ref:
if vol_risk <= vol_ref:
vol_penalty = 1.0
else:
vol_penalty = max(0.85, 1.0 - (daily_vol - vol_ref) * 10)
vol_penalty = max(0.85, 1.0 - (vol_risk - vol_ref) * 10)
composite_raw = short * 0.4 + medium * 0.35 + long_ * 0.25
# ── 自适应权重(趋势越强,长期权重越高)──
price_return_30d = (
(float(df.iloc[-1]["close"]) - float(df.iloc[-31]["close"]))
/ float(df.iloc[-31]["close"])
if df.iloc[-31]["close"] != 0
else 0.0
)
# 趋势效率 = |30日收益率| / 日波动率,比率高 = 方向明确
trend_strength = abs(price_return_30d) / max(daily_vol, 0.005)
trend_factor = min(trend_strength / 3.0, 1.0) # 归一化到 [0, 1]
w_short_base = 0.40
w_medium_base = 0.35
w_long_base = 0.25
shift = trend_factor * 0.10 # 最多转移 10% 权重
w_short = w_short_base - shift
w_medium = w_medium_base
w_long = w_long_base + shift
# ── 综合分数 ──
composite_raw = short * w_short + medium * w_medium + long_ * w_long
composite = round(composite_raw * vol_penalty, 1)
signal = _interpret(composite)
@@ -218,6 +298,7 @@ def score_daily(df: pd.DataFrame, trade_date: Optional[str] = None) -> ScoreResu
long_term=round(long_, 1),
composite=round(composite, 1),
signal=signal,
vol_penalty=round(float(vol_penalty), 4),
detail=ScoreDetail(
short_details=short_details,
medium_detail=medium_detail,
@@ -225,12 +306,24 @@ def score_daily(df: pd.DataFrame, trade_date: Optional[str] = None) -> ScoreResu
volatility={
"daily_vol_pct": round(float(daily_vol), 4),
"atr_pct": round(float(atr_pct), 4),
"vol_risk": round(float(vol_risk), 4),
"vol_penalty": round(float(vol_penalty), 4),
},
adaptive_weights={
"trend_strength": round(float(trend_strength), 2),
"trend_factor": round(float(trend_factor), 2),
"w_short": round(float(w_short), 4),
"w_medium": round(float(w_medium), 4),
"w_long": round(float(w_long), 4),
},
),
)
# ---------------------------------------------------------------------------
# 区间 / 全量打分
# ---------------------------------------------------------------------------
def score_range(
df: pd.DataFrame, start_date: str, end_date: str
) -> tuple[list[ScoreResult], list[str]]:
@@ -253,6 +346,7 @@ def score_range(
except ValueError as e:
warnings.append(str(e))
_fill_deltas(results)
return results, warnings
@@ -277,6 +371,21 @@ def score_all(df: pd.DataFrame) -> tuple[list[ScoreResult], list[str], int, int]
except ValueError as e:
warnings.append(f"{trade_date}: {e}")
_fill_deltas(results)
total_days = len(df) - 30
scored_count = len(results)
return results, warnings, total_days, scored_count
# ---------------------------------------------------------------------------
# 辅助:填充分数动量
# ---------------------------------------------------------------------------
def _fill_deltas(results: list[ScoreResult]):
"""为结果列表中的每个 ScoreResult 填充 composite_delta 和 composite_delta_5d。"""
for i, r in enumerate(results):
if i >= 1 and results[i - 1].composite is not None:
r.composite_delta = round(r.composite - results[i - 1].composite, 1)
if i >= 5 and results[i - 5].composite is not None:
r.composite_delta_5d = round(r.composite - results[i - 5].composite, 1)

View File

@@ -103,6 +103,19 @@ def save_score(score: ScoreResult, db_url: str = DEFAULT_DB_URL):
conn = _get_conn(db_url)
try:
with conn.cursor() as cur:
detail_payload = {
"short_details": score.detail.short_details,
"medium_detail": score.detail.medium_detail,
"long_detail": score.detail.long_detail,
"volatility": score.detail.volatility,
"adaptive_weights": score.detail.adaptive_weights,
"vol_penalty": score.vol_penalty,
}
if score.composite_delta is not None:
detail_payload["composite_delta"] = score.composite_delta
if score.composite_delta_5d is not None:
detail_payload["composite_delta_5d"] = score.composite_delta_5d
cur.execute(
"""
INSERT INTO scores
@@ -132,11 +145,7 @@ def save_score(score: ScoreResult, db_url: str = DEFAULT_DB_URL):
score.long_term,
score.composite,
score.signal,
json.dumps({
"short_details": score.detail.short_details,
"medium_detail": score.detail.medium_detail,
"long_detail": score.detail.long_detail,
}, ensure_ascii=False, default=str),
json.dumps(detail_payload, ensure_ascii=False, default=str),
),
)
conn.commit()