构建期货数据采集与三层打分系统

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
fish
2026-05-02 22:56:10 +08:00
parent e55aa8603b
commit c466dbbf3f
12 changed files with 681 additions and 0 deletions

26
tushare/.dockerignore Normal file
View File

@@ -0,0 +1,26 @@
__pycache__
*.py[cod]
*$py.class
.venv
venv
env
.env
.env.*
!.env.example
.pytest_cache
.mypy_cache
.ruff_cache
.coverage
htmlcov
.ipynb_checkpoints
.git
.gitignore
.idea
.vscode
*.log
*.csv
*.parquet
*.db
*.sqlite*
data/
cache/

28
tushare/Dockerfile Normal file
View File

@@ -0,0 +1,28 @@
FROM python:3.13.7-alpine3.22
ENV PYTHONDONTWRITEBYTECODE=1 \
PYTHONUNBUFFERED=1 \
PIP_NO_CACHE_DIR=1 \
PIP_DISABLE_PIP_VERSION_CHECK=1 \
TZ=Asia/Shanghai \
DB_PATH=/app/data/futures.db
WORKDIR /app
# 运行时依赖 + 时区
RUN apk add --no-cache tzdata \
&& cp /usr/share/zoneinfo/Asia/Shanghai /etc/localtime \
&& echo "Asia/Shanghai" > /etc/timezone
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# 非 root 用户
RUN adduser -D -u 1000 app \
&& mkdir -p /app/data \
&& chown app:app /app/data
COPY --chown=app:app src ./src
USER app
CMD ["python", "-m", "src.main", "FG2609.ZCE"]

23
tushare/main.py Normal file
View File

@@ -0,0 +1,23 @@
import os
import sys
import tushare as ts
def main() -> int:
token = os.environ.get("TUSHARE_TOKEN")
if not token:
print("[ERROR] 未设置 TUSHARE_TOKEN 环境变量", file=sys.stderr)
return 1
ts.set_token(token)
pro = ts.pro_api()
df = pro.trade_cal(exchange="SHFE", start_date="20260101", end_date="20260110")
print(f"[OK] tushare 连通,返回 {len(df)} 行交易日历样本")
print(df.head())
return 0
if __name__ == "__main__":
sys.exit(main())

3
tushare/requirements.txt Normal file
View File

@@ -0,0 +1,3 @@
tushare>=1.4.0
pandas>=2.2.0
requests>=2.31.0

0
tushare/src/__init__.py Normal file
View File

36
tushare/src/fetcher.py Normal file
View File

@@ -0,0 +1,36 @@
import os
from typing import Optional
import pandas as pd
import tushare as ts
def _init_api():
token = os.environ.get("TUSHARE_TOKEN")
if not token:
raise RuntimeError("TUSHARE_TOKEN 环境变量未设置")
ts.set_token(token)
return ts.pro_api()
def fetch_contract(ts_code: str, limit: int = 100) -> pd.DataFrame:
"""拉取指定期货合约的日线数据,返回按 trade_date 升序排列的 DataFrame。"""
pro = _init_api()
df = pro.fut_daily(ts_code=ts_code)
if df.empty:
raise RuntimeError(f"未返回 {ts_code} 的任何数据,可能合约代码错误或 token 积分不足")
cols = [
"ts_code", "trade_date", "open", "high", "low",
"close", "vol", "amount", "oi", "oi_chg", "pre_close",
]
df = df[[c for c in cols if c in df.columns]].copy()
numeric = ["open", "high", "low", "close", "vol", "amount", "oi", "oi_chg", "pre_close"]
for c in numeric:
if c in df.columns:
df[c] = pd.to_numeric(df[c], errors="coerce")
df = df.sort_values("trade_date").reset_index(drop=True)
return df

76
tushare/src/main.py Normal file
View File

@@ -0,0 +1,76 @@
import argparse
import sys
from . import fetcher, scorer, storage
def run(ts_code: str) -> int:
storage.init_db()
print(f"[1/4] 拉取 {ts_code} 数据...")
df = fetcher.fetch_contract(ts_code)
print(f" 返回 {len(df)}")
print(f"[2/4] 写入/更新 SQLite...")
storage.save_candles(df)
print(f"[3/4] 计算打分...")
result = scorer.score_daily(df)
print(f"[4/4] 保存打分结果...")
storage.save_score(result)
# 输出
print("\n" + "=" * 65)
print(f"合约: {result.ts_code:<20} 日期: {result.trade_date}")
print(f"收盘: {result.close:>10.2f} 持仓: {result.oi:>12,.0f}")
print(f"持仓变动: {result.oi_chg:>+8,.0f}")
print("=" * 65)
print(f"\n{'模块':<12} {'分数':>8} {'权重':>6} {'加权':>8}")
print("-" * 40)
print(f"{'短期动力':<12} {result.short_term:>8.1f} {0.4:>6.2f} {result.short_term * 0.4:>8.2f}")
print(f"{'中期趋势':<12} {result.medium_term:>8.1f} {0.35:>6.2f} {result.medium_term * 0.35:>8.2f}")
print(f"{'长期结构':<12} {result.long_term:>8.1f} {0.25:>6.2f} {result.long_term * 0.25:>8.2f}")
print("-" * 40)
print(f"{'综合分数':<12} {result.composite:>8.1f}")
print(f"\n信号: {result.signal}")
print("=" * 65)
print("\n[短期动力] 近7日逐日打分:")
print("-" * 65)
for d in result.detail.short_details:
tag = "增仓" if d["oi_chg"] > 0 else "减仓"
if abs(d["oi_chg"] / d["oi"]) < 0.01:
tag = "持平"
price_dir = "" if d["close"] >= d["pre_close"] else ""
print(f" {d['trade_date']} {tag:>4} + {price_dir} "
f"持仓{d['oi_chg']:>+8,.0f} 得分: {d['score']:>3}")
md = result.detail.medium_detail
print(f"\n[中期趋势] 明细:")
print(f" 15日价格收益率: {md['price_return_pct']:+.2f}%")
print(f" 价格信号得分: {md['price_signal']:.1f}")
print(f" 增仓上涨天数: {md['long_up_days']}")
print(f" 增仓下跌天数: {md['long_down_days']}")
print(f" 资金意愿得分: {md['fund_signal']}")
ld = result.detail.long_detail
print(f"\n[长期结构] 明细:")
print(f" 近30日日均持仓: {ld['avg_oi']:,.0f}")
print(f" 30日前持仓量: {ld['oi_before']:,.0f}")
print(f" 持仓变化幅度: {ld['change_pct']:+.2f}%")
print(f"\n[OK] 数据已持久化到 SQLite")
return 0
def main() -> int:
parser = argparse.ArgumentParser(description="期货合约三层打分模型")
parser.add_argument("ts_code", help="合约代码,如 FG2609.ZCE")
args = parser.parse_args()
return run(args.ts_code)
if __name__ == "__main__":
sys.exit(main())

39
tushare/src/models.py Normal file
View File

@@ -0,0 +1,39 @@
from dataclasses import dataclass, field
from typing import Optional
@dataclass
class Candle:
ts_code: str
trade_date: str
open: float
high: float
low: float
close: float
vol: float
amount: float
oi: float
oi_chg: float
pre_close: Optional[float] = None
@dataclass
class ScoreDetail:
short_details: list = field(default_factory=list)
medium_detail: dict = field(default_factory=dict)
long_detail: dict = field(default_factory=dict)
@dataclass
class ScoreResult:
ts_code: str
trade_date: str
close: float
oi: float
oi_chg: float
short_term: float
medium_term: float
long_term: float
composite: float
signal: str
detail: ScoreDetail

149
tushare/src/scorer.py Normal file
View File

@@ -0,0 +1,149 @@
import pandas as pd
from .models import ScoreDetail, ScoreResult
def _daily_short_score(row: pd.Series) -> int:
"""单日短期动力打分。"""
oi = float(row["oi"])
oi_chg = float(row["oi_chg"])
close = float(row["close"])
pre_close = float(row["pre_close"])
oi_change_pct = abs(oi_chg / oi) if oi != 0 else 0
price_up = close >= pre_close
if oi_change_pct < 0.01:
return 60 if price_up else 40
oi_increasing = oi_chg > 0
if oi_increasing and price_up:
return 100
if oi_increasing and not price_up:
return 0
if not oi_increasing and price_up:
return 70
return 30
def calc_short_term(df: pd.DataFrame, window: int = 7) -> tuple[float, list]:
recent = df.iloc[-window:].copy()
scores = []
details = []
for _, row in recent.iterrows():
score = _daily_short_score(row)
scores.append(score)
details.append({
"trade_date": str(row["trade_date"]),
"close": float(row["close"]),
"pre_close": float(row["pre_close"]),
"oi": float(row["oi"]),
"oi_chg": float(row["oi_chg"]),
"score": score,
})
return sum(scores) / len(scores), details
def calc_medium_term(df: pd.DataFrame, window: int = 15) -> tuple[float, dict]:
if len(df) < window + 1:
raise ValueError(f"数据不足,需要至少 {window + 1}")
recent = df.iloc[-window:].copy()
close_now = float(df.iloc[-1]["close"])
close_before = float(df.iloc[-window - 1]["close"])
price_return = (close_now - close_before) / close_before if close_before != 0 else 0
price_score = max(0.0, min(100.0, 50.0 + price_return * 500))
long_up = 0
long_down = 0
for _, row in recent.iterrows():
if row["oi_chg"] > 0:
if row["close"] >= row["pre_close"]:
long_up += 1
else:
long_down += 1
fund_score = 80 if long_up > long_down else (20 if long_up < long_down else 50)
score = price_score * 0.6 + fund_score * 0.4
detail = {
"price_return_pct": round(price_return * 100, 2),
"price_signal": round(price_score, 1),
"long_up_days": long_up,
"long_down_days": long_down,
"fund_signal": fund_score,
}
return score, detail
def calc_long_term(df: pd.DataFrame, window: int = 30) -> tuple[float, dict]:
if len(df) < window + 1:
raise ValueError(f"数据不足,需要至少 {window + 1}")
recent_oi = df.iloc[-window:]["oi"]
avg_oi = recent_oi.mean()
oi_before = float(df.iloc[-window - 1]["oi"])
change_pct = (avg_oi - oi_before) / oi_before if oi_before != 0 else 0
if change_pct > 0.10:
score = 90
elif change_pct > 0.05:
score = 70
elif change_pct > -0.05:
score = 50
elif change_pct > -0.10:
score = 30
else:
score = 10
detail = {
"avg_oi": round(float(avg_oi), 0),
"oi_before": round(oi_before, 0),
"change_pct": round(change_pct * 100, 2),
}
return score, detail
def _interpret(composite: float) -> str:
if composite >= 80:
return "强烈看多区域 — 价格与资金共振,趋势多头的温床"
if composite >= 50:
return "偏多/震荡偏强 — 上涨但资金犹豫,或空头离场反弹"
if composite >= 40:
return "偏空/震荡偏弱 — 多头止损,或缺乏资金的阴跌"
return "强烈看空区域 — 资金主动且持续地打压价格"
def score_daily(df: pd.DataFrame) -> ScoreResult:
"""对 DataFrame 中最新一条记录打分。"""
if len(df) < 31:
raise ValueError(f"数据量不足(仅 {len(df)} 行),需要至少 31 行")
latest = df.iloc[-1]
short, short_details = calc_short_term(df, 7)
medium, medium_detail = calc_medium_term(df, 15)
long_, long_detail = calc_long_term(df, 30)
composite = short * 0.4 + medium * 0.35 + long_ * 0.25
signal = _interpret(composite)
return ScoreResult(
ts_code=str(latest["ts_code"]),
trade_date=str(latest["trade_date"]),
close=float(latest["close"]),
oi=float(latest["oi"]),
oi_chg=float(latest["oi_chg"]),
short_term=round(short, 1),
medium_term=round(medium, 1),
long_term=round(long_, 1),
composite=round(composite, 1),
signal=signal,
detail=ScoreDetail(
short_details=short_details,
medium_detail=medium_detail,
long_detail=long_detail,
),
)

131
tushare/src/storage.py Normal file
View File

@@ -0,0 +1,131 @@
import json
import os
import sqlite3
from typing import Optional
import pandas as pd
from .models import ScoreResult
DEFAULT_DB_PATH = os.environ.get("DB_PATH", "/app/data/futures.db")
def _get_conn(db_path: str = DEFAULT_DB_PATH) -> sqlite3.Connection:
conn = sqlite3.connect(db_path)
conn.row_factory = sqlite3.Row
return conn
def init_db(db_path: str = DEFAULT_DB_PATH):
"""初始化数据库,创建 candles 和 scores 表。"""
os.makedirs(os.path.dirname(db_path), exist_ok=True)
conn = _get_conn(db_path)
try:
conn.execute("PRAGMA journal_mode=WAL")
conn.execute("""
CREATE TABLE IF NOT EXISTS candles (
ts_code TEXT NOT NULL,
trade_date TEXT NOT NULL,
open REAL,
high REAL,
low REAL,
close REAL,
vol REAL,
amount REAL,
oi REAL,
oi_chg REAL,
pre_close REAL,
PRIMARY KEY (ts_code, trade_date)
)
""")
conn.execute("""
CREATE TABLE IF NOT EXISTS scores (
id INTEGER PRIMARY KEY AUTOINCREMENT,
ts_code TEXT NOT NULL,
trade_date TEXT NOT NULL,
close REAL,
oi REAL,
oi_chg REAL,
short_term REAL,
medium_term REAL,
long_term REAL,
composite REAL,
signal TEXT,
detail_json TEXT,
created_at TEXT DEFAULT (datetime('now', 'localtime')),
UNIQUE (ts_code, trade_date)
)
""")
conn.commit()
finally:
conn.close()
def save_candles(df: pd.DataFrame, db_path: str = DEFAULT_DB_PATH):
"""批量写入/更新日线数据。"""
if df.empty:
return
conn = _get_conn(db_path)
try:
df = df.copy()
df = df.where(pd.notna(df), None)
records = df.to_dict(orient="records")
conn.executemany(
"""
INSERT OR REPLACE INTO candles
(ts_code, trade_date, open, high, low, close, vol, amount, oi, oi_chg, pre_close)
VALUES (:ts_code, :trade_date, :open, :high, :low, :close,
:vol, :amount, :oi, :oi_chg, :pre_close)
""",
records,
)
conn.commit()
finally:
conn.close()
def save_score(score: ScoreResult, db_path: str = DEFAULT_DB_PATH):
"""写入打分结果。"""
conn = _get_conn(db_path)
try:
conn.execute(
"""
INSERT OR REPLACE INTO scores
(ts_code, trade_date, close, oi, oi_chg,
short_term, medium_term, long_term, composite, signal, detail_json)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
score.ts_code,
score.trade_date,
score.close,
score.oi,
score.oi_chg,
score.short_term,
score.medium_term,
score.long_term,
score.composite,
score.signal,
json.dumps({
"short_details": score.detail.short_details,
"medium_detail": score.detail.medium_detail,
"long_detail": score.detail.long_detail,
}, ensure_ascii=False, default=str),
),
)
conn.commit()
finally:
conn.close()
def get_latest_score(ts_code: str, db_path: str = DEFAULT_DB_PATH) -> Optional[dict]:
"""查询最新打分记录。"""
conn = _get_conn(db_path)
try:
row = conn.execute(
"SELECT * FROM scores WHERE ts_code = ? ORDER BY trade_date DESC LIMIT 1",
(ts_code,),
).fetchone()
return dict(row) if row else None
finally:
conn.close()