优化三层打分模型:短期引入幅度因子与量能确认,中期资金信号连续化,长期加入价格维度,新增波动率调整

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
fish
2026-05-04 22:57:45 +08:00
parent 23a1149c5f
commit 76ddc495c1
5 changed files with 247 additions and 61 deletions

View File

@@ -1,5 +1,6 @@
import argparse
import sys
from typing import Optional
from . import contracts, fetcher, scorer, storage
@@ -37,15 +38,19 @@ def run(ts_code: str, trade_date: Optional[str] = None) -> int:
print(f"\n信号: {result.signal}")
print("=" * 65)
quadrant_names = {
"accumulation": "增仓上涨", "distribution": "增仓下跌",
"covering": "减仓上涨", "liquidation": "减仓下跌", "flat": "持仓持平",
}
print("\n[短期动力] 近7日逐日打分:")
print("-" * 65)
print("-" * 80)
for d in result.detail.short_details:
tag = "增仓" if d["oi_chg"] > 0 else "减仓"
if abs(d["oi_chg"] / d["oi"]) < 0.01:
tag = "持平"
price_dir = "" if d["close"] >= d["pre_close"] else ""
print(f" {d['trade_date']} {tag:>4} + {price_dir} "
f"持仓{d['oi_chg']:>+8,.0f} 得分: {d['score']:>3}")
q = quadrant_names.get(d["quadrant"], d["quadrant"])
print(f" {d['trade_date']} {q} "
f"涨跌{d['price_chg_pct']*100:>+.2f}% "
f"OI变化{d['oi_chg_pct']*100:>+.2f}% "
f"量比{d['vol_ratio']:.2f} "
f"得分: {d['score']:>5.1f}")
md = result.detail.medium_detail
print(f"\n[中期趋势] 明细:")
@@ -53,14 +58,24 @@ def run(ts_code: str, trade_date: Optional[str] = None) -> int:
print(f" 价格信号得分: {md['price_signal']:.1f}")
print(f" 增仓上涨天数: {md['long_up_days']}")
print(f" 增仓下跌天数: {md['long_down_days']}")
print(f" 资金意愿得分: {md['fund_signal']}")
print(f" 资金意愿得分: {md['fund_signal']:.1f}")
ld = result.detail.long_detail
print(f"\n[长期结构] 明细:")
print(f" OI趋势得分: {ld['oi_score']:.1f} (权重 60%)")
print(f" 价格趋势得分: {ld['price_score']:.1f} (权重 40%)")
print(f" 30日价格收益: {ld['price_return_30d_pct']:+.2f}%")
print(f" 30日前收盘价: {ld['price_before_30d']:.2f}")
print(f" 近30日日均持仓: {ld['avg_oi']:,.0f}")
print(f" 30日前持仓量: {ld['oi_before']:,.0f}")
print(f" 持仓变化幅度: {ld['change_pct']:+.2f}%")
vd = result.detail.volatility
print(f"\n[波动率调整]")
print(f" 日波动率(30d std): {vd['daily_vol_pct']*100:.2f}%")
print(f" ATR%: {vd['atr_pct']*100:.2f}%")
print(f" 惩罚系数: {vd['vol_penalty']:.3f}")
print(f"\n[OK] 数据已持久化到 PostgreSQL")
return 0

View File

@@ -22,6 +22,7 @@ class ScoreDetail:
short_details: list = field(default_factory=list)
medium_detail: dict = field(default_factory=dict)
long_detail: dict = field(default_factory=dict)
volatility: dict = field(default_factory=dict)
@dataclass

View File

@@ -5,44 +5,79 @@ import pandas as pd
from .models import ScoreDetail, ScoreResult
def _daily_short_score(row: pd.Series) -> int:
"""单日短期动力打分。"""
def _daily_short_score(row: pd.Series, avg_vol_7d: float) -> dict:
"""单日短期动力打分(连续值 + 幅度因子 + 量能确认)"""
oi = float(row["oi"])
oi_chg = float(row["oi_chg"])
close = float(row["close"])
pre_close = float(row["pre_close"])
vol = float(row.get("vol", 0))
oi_change_pct = abs(oi_chg / oi) if oi != 0 else 0
oi_chg_pct = oi_chg / oi if oi != 0 else 0.0
price_chg_pct = (close - pre_close) / pre_close if pre_close != 0 else 0.0
price_up = close >= pre_close
if oi_change_pct < 0.01:
return 60 if price_up else 40
oi_increasing = oi_chg > 0
if oi_increasing and price_up:
return 100
if oi_increasing and not price_up:
return 0
if not oi_increasing and price_up:
return 70
return 30
# 象限基础分
if abs(oi_chg_pct) < 0.01:
base = 60.0 if price_up else 40.0
quadrant = "flat"
elif oi_increasing and price_up:
base = 75.0
quadrant = "accumulation"
elif oi_increasing and not price_up:
base = 25.0
quadrant = "distribution"
elif not oi_increasing and price_up:
base = 65.0
quadrant = "covering"
else:
base = 20.0
quadrant = "liquidation"
# 幅度加成OI 变化率封顶 5%,价格涨跌幅封顶 3%
oi_mag = min(1.0, abs(oi_chg_pct) / 0.05)
price_mag = min(1.0, abs(price_chg_pct) / 0.03)
if quadrant in ("accumulation", "liquidation"):
boost = (oi_mag + price_mag) / 2.0 * 20.0
elif quadrant == "flat":
boost = price_mag * 10.0
else:
boost = 0.0
# 量能确认
vol_ratio = vol / avg_vol_7d if avg_vol_7d > 0 else 1.0
vol_factor = 0.9 + 0.2 * min(vol_ratio, 1.5)
score = max(0.0, min(100.0, (base + boost) * vol_factor))
return {
"trade_date": str(row["trade_date"]),
"close": close,
"pre_close": pre_close,
"oi": oi,
"oi_chg": oi_chg,
"oi_chg_pct": round(float(oi_chg_pct), 4),
"price_chg_pct": round(float(price_chg_pct), 4),
"vol": float(vol),
"vol_ratio": round(float(vol_ratio), 2),
"quadrant": quadrant,
"score": round(float(score), 1),
}
def calc_short_term(df: pd.DataFrame, window: int = 7) -> tuple[float, list]:
recent = df.iloc[-window:].copy()
avg_vol_7d = float(recent["vol"].mean()) if "vol" in recent.columns else 0.0
scores = []
details = []
for _, row in recent.iterrows():
score = _daily_short_score(row)
scores.append(score)
details.append({
"trade_date": str(row["trade_date"]),
"close": float(row["close"]),
"pre_close": float(row["pre_close"]),
"oi": float(row["oi"]),
"oi_chg": float(row["oi_chg"]),
"score": score,
})
detail = _daily_short_score(row, avg_vol_7d)
scores.append(detail["score"])
details.append(detail)
return sum(scores) / len(scores), details
@@ -66,7 +101,7 @@ def calc_medium_term(df: pd.DataFrame, window: int = 15) -> tuple[float, dict]:
else:
long_down += 1
fund_score = 80 if long_up > long_down else (20 if long_up < long_down else 50)
fund_score = 50.0 + (long_up - long_down) / window * 50.0
score = price_score * 0.6 + fund_score * 0.4
detail = {
@@ -74,7 +109,8 @@ def calc_medium_term(df: pd.DataFrame, window: int = 15) -> tuple[float, dict]:
"price_signal": round(price_score, 1),
"long_up_days": long_up,
"long_down_days": long_down,
"fund_signal": fund_score,
"fund_signal": round(fund_score, 1),
"window": window,
}
return score, detail
@@ -84,26 +120,31 @@ def calc_long_term(df: pd.DataFrame, window: int = 30) -> tuple[float, dict]:
raise ValueError(f"数据不足,需要至少 {window + 1}")
recent_oi = df.iloc[-window:]["oi"]
avg_oi = recent_oi.mean()
avg_oi = float(recent_oi.mean())
oi_before = float(df.iloc[-window - 1]["oi"])
change_pct = (avg_oi - oi_before) / oi_before if oi_before != 0 else 0
oi_change_pct = (avg_oi - oi_before) / oi_before if oi_before != 0 else 0.0
if change_pct > 0.10:
score = 90
elif change_pct > 0.05:
score = 70
elif change_pct > -0.05:
score = 50
elif change_pct > -0.10:
score = 30
else:
score = 10
# OI 趋势分 (60%)
oi_score = max(0.0, min(100.0, 50.0 + oi_change_pct * 250))
# 价格趋势分 (40%)
close_now = float(df.iloc[-1]["close"])
price_before = float(df.iloc[-window - 1]["close"])
price_return_30d = (close_now - price_before) / price_before if price_before != 0 else 0.0
price_score = max(0.0, min(100.0, 50.0 + price_return_30d * 200))
score = oi_score * 0.6 + price_score * 0.4
detail = {
"avg_oi": round(float(avg_oi), 0),
"avg_oi": round(avg_oi, 0),
"oi_before": round(oi_before, 0),
"change_pct": round(change_pct * 100, 2),
"change_pct": round(oi_change_pct * 100, 2),
"oi_score": round(oi_score, 1),
"price_score": round(price_score, 1),
"price_return_30d_pct": round(price_return_30d * 100, 2),
"price_before_30d": round(price_before, 2),
"window": window,
}
return score, detail
@@ -139,7 +180,31 @@ def score_daily(df: pd.DataFrame, trade_date: Optional[str] = None) -> ScoreResu
medium, medium_detail = calc_medium_term(df, 15)
long_, long_detail = calc_long_term(df, 30)
composite = short * 0.4 + medium * 0.35 + long_ * 0.25
# 波动率调整
recent_30 = df.iloc[-30:].copy()
recent_30["ret"] = recent_30["close"].pct_change()
daily_vol = float(recent_30["ret"].std())
recent_30["tr"] = recent_30.apply(
lambda r: max(
r["high"] - r["low"],
abs(r["high"] - r["pre_close"]),
abs(r["low"] - r["pre_close"]),
),
axis=1,
)
atr = float(recent_30["tr"].mean())
avg_close_30 = float(recent_30["close"].mean())
atr_pct = (atr / avg_close_30) if avg_close_30 else 0.0
vol_ref = 0.015
if daily_vol <= vol_ref:
vol_penalty = 1.0
else:
vol_penalty = max(0.85, 1.0 - (daily_vol - vol_ref) * 10)
composite_raw = short * 0.4 + medium * 0.35 + long_ * 0.25
composite = round(composite_raw * vol_penalty, 1)
signal = _interpret(composite)
return ScoreResult(
@@ -157,5 +222,10 @@ def score_daily(df: pd.DataFrame, trade_date: Optional[str] = None) -> ScoreResu
short_details=short_details,
medium_detail=medium_detail,
long_detail=long_detail,
volatility={
"daily_vol_pct": round(float(daily_vol), 4),
"atr_pct": round(float(atr_pct), 4),
"vol_penalty": round(float(vol_penalty), 4),
},
),
)