from typing import Optional import pandas as pd from .models import ScoreDetail, ScoreResult def _daily_short_score(row: pd.Series) -> int: """单日短期动力打分。""" oi = float(row["oi"]) oi_chg = float(row["oi_chg"]) close = float(row["close"]) pre_close = float(row["pre_close"]) oi_change_pct = abs(oi_chg / oi) if oi != 0 else 0 price_up = close >= pre_close if oi_change_pct < 0.01: return 60 if price_up else 40 oi_increasing = oi_chg > 0 if oi_increasing and price_up: return 100 if oi_increasing and not price_up: return 0 if not oi_increasing and price_up: return 70 return 30 def calc_short_term(df: pd.DataFrame, window: int = 7) -> tuple[float, list]: recent = df.iloc[-window:].copy() scores = [] details = [] for _, row in recent.iterrows(): score = _daily_short_score(row) scores.append(score) details.append({ "trade_date": str(row["trade_date"]), "close": float(row["close"]), "pre_close": float(row["pre_close"]), "oi": float(row["oi"]), "oi_chg": float(row["oi_chg"]), "score": score, }) return sum(scores) / len(scores), details def calc_medium_term(df: pd.DataFrame, window: int = 15) -> tuple[float, dict]: if len(df) < window + 1: raise ValueError(f"数据不足,需要至少 {window + 1} 行") recent = df.iloc[-window:].copy() close_now = float(df.iloc[-1]["close"]) close_before = float(df.iloc[-window - 1]["close"]) price_return = (close_now - close_before) / close_before if close_before != 0 else 0 price_score = max(0.0, min(100.0, 50.0 + price_return * 500)) long_up = 0 long_down = 0 for _, row in recent.iterrows(): if row["oi_chg"] > 0: if row["close"] >= row["pre_close"]: long_up += 1 else: long_down += 1 fund_score = 80 if long_up > long_down else (20 if long_up < long_down else 50) score = price_score * 0.6 + fund_score * 0.4 detail = { "price_return_pct": round(price_return * 100, 2), "price_signal": round(price_score, 1), "long_up_days": long_up, "long_down_days": long_down, "fund_signal": fund_score, } return score, detail def calc_long_term(df: pd.DataFrame, window: int = 30) -> tuple[float, dict]: if len(df) < window + 1: raise ValueError(f"数据不足,需要至少 {window + 1} 行") recent_oi = df.iloc[-window:]["oi"] avg_oi = recent_oi.mean() oi_before = float(df.iloc[-window - 1]["oi"]) change_pct = (avg_oi - oi_before) / oi_before if oi_before != 0 else 0 if change_pct > 0.10: score = 90 elif change_pct > 0.05: score = 70 elif change_pct > -0.05: score = 50 elif change_pct > -0.10: score = 30 else: score = 10 detail = { "avg_oi": round(float(avg_oi), 0), "oi_before": round(oi_before, 0), "change_pct": round(change_pct * 100, 2), } return score, detail def _interpret(composite: float) -> str: if composite >= 80: return "强烈看多区域 — 价格与资金共振,趋势多头的温床" if composite >= 50: return "偏多/震荡偏强 — 上涨但资金犹豫,或空头离场反弹" if composite >= 40: return "偏空/震荡偏弱 — 多头止损,或缺乏资金的阴跌" return "强烈看空区域 — 资金主动且持续地打压价格" def score_daily(df: pd.DataFrame, trade_date: Optional[str] = None) -> ScoreResult: """对 DataFrame 中指定日期或最新一条记录打分。""" if len(df) < 31: raise ValueError(f"数据量不足(仅 {len(df)} 行),需要至少 31 行") if trade_date: trade_date_str = str(trade_date) mask = df["trade_date"].astype(str) == trade_date_str if not mask.any(): raise ValueError(f"指定日期 {trade_date_str} 不在数据中") pos = mask.idxmax() df = df.iloc[:pos + 1].copy() if len(df) < 31: raise ValueError(f"指定日期 {trade_date_str} 之前数据不足(仅 {len(df)} 行),需要至少 31 行") latest = df.iloc[-1] short, short_details = calc_short_term(df, 7) medium, medium_detail = calc_medium_term(df, 15) long_, long_detail = calc_long_term(df, 30) composite = short * 0.4 + medium * 0.35 + long_ * 0.25 signal = _interpret(composite) return ScoreResult( ts_code=str(latest["ts_code"]), trade_date=str(latest["trade_date"]), close=float(latest["close"]), oi=float(latest["oi"]), oi_chg=float(latest["oi_chg"]), short_term=round(short, 1), medium_term=round(medium, 1), long_term=round(long_, 1), composite=round(composite, 1), signal=signal, detail=ScoreDetail( short_details=short_details, medium_detail=medium_detail, long_detail=long_detail, ), )