package handlers

import (
	"bufio"
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"io"
	"log"
	"net/http"
	"strings"

	"trade/web/internal/store"
)

// AIConfig holds the settings used to call the LLM.
type AIConfig struct {
	BaseURL string
	APIKey  string
	Model   string
}

// resolveLLMConfig returns the LLM configuration that is actually used.
//
// It starts from built-in defaults for BaseURL and Model. If the store returns
// a config with a non-empty APIKey, that config wins and d.AIConfig is ignored
// entirely; otherwise the non-empty fields of d.AIConfig are applied on top of
// the defaults. A store error is not reported: it simply triggers the fallback.
// The returned config may still have an empty APIKey; callers must check.
func (d *Deps) resolveLLMConfig() *AIConfig {
	cfg := &AIConfig{
		BaseURL: "https://api.deepseek.com/v1",
		Model:   "deepseek-chat",
	}

	// Prefer the configuration stored in the DB.
	if dbCfg, err := d.Futures.GetLLMConfig(); err == nil && dbCfg != nil && dbCfg.APIKey != "" {
		cfg.APIKey = dbCfg.APIKey
		if dbCfg.BaseURL != "" {
			cfg.BaseURL = dbCfg.BaseURL
		}
		if dbCfg.Model != "" {
			cfg.Model = dbCfg.Model
		}
		return cfg
	}

	// Fall back to the statically supplied configuration.
	if d.AIConfig != nil {
		if d.AIConfig.APIKey != "" {
			cfg.APIKey = d.AIConfig.APIKey
		}
		if d.AIConfig.BaseURL != "" {
			cfg.BaseURL = d.AIConfig.BaseURL
		}
		if d.AIConfig.Model != "" {
			cfg.Model = d.AIConfig.Model
		}
	}
	return cfg
}

// Analyze reads ts_code and trade_date from the query string, loads the
// analysis context from the store, builds the prompt, calls the LLM and relays
// the answer to the client as a server-sent event stream.
//
// Events emitted: "token" for each content delta, "error" if streaming fails,
// and a final "done". Validation and lookup failures are reported through
// writeErr before any SSE header is written.
func (d *Deps) Analyze(w http.ResponseWriter, r *http.Request) {
	llmCfg := d.resolveLLMConfig()
	if llmCfg.APIKey == "" {
		writeErr(w, http.StatusServiceUnavailable, "LLM API Key 未配置,请在管理后台设置")
		return
	}

	q := r.URL.Query()
	tsCode := q.Get("ts_code")
	tradeDate := q.Get("trade_date")
	if tsCode == "" || tradeDate == "" {
		writeErr(w, http.StatusBadRequest, "缺少 ts_code 或 trade_date")
		return
	}

	actx, err := d.Futures.GetAnalysisContext(tsCode, tradeDate)
	if err != nil {
		writeErr(w, http.StatusNotFound, err.Error())
		return
	}
	prompt := buildPrompt(actx)

	// SSE needs a writer that can flush after every event.
	flusher, ok := w.(http.Flusher)
	if !ok {
		writeErr(w, http.StatusInternalServerError, "不支持 SSE")
		return
	}
	w.Header().Set("Content-Type", "text/event-stream")
	w.Header().Set("Cache-Control", "no-cache")
	w.Header().Set("Connection", "keep-alive")

	// Tie the upstream call to the request so that a client disconnect cancels
	// the LLM request instead of letting it stream to nobody.
	if err := streamLLMContext(r.Context(), llmCfg, prompt, w, flusher); err != nil {
		if r.Context().Err() != nil {
			// The client went away; there is nobody left to notify.
			return
		}
		log.Printf("[ai] stream error: %v", err)
		sendSSE(w, flusher, "error", err.Error())
	}
	sendSSE(w, flusher, "done", "")
}

// sendSSE writes one server-sent event to w and flushes it.
//
// The "event:" field is omitted when event is empty. data is emitted as one
// "data:" field per line. CRLF and lone CR are normalised to LF first, because
// SSE parsers treat a bare CR as a line terminator and would otherwise drop
// whatever follows it.
func sendSSE(w io.Writer, flusher http.Flusher, event, data string) {
	if event != "" {
		fmt.Fprintf(w, "event: %s\n", event)
	}
	data = strings.ReplaceAll(data, "\r\n", "\n")
	data = strings.ReplaceAll(data, "\r", "\n")
	for _, line := range strings.Split(data, "\n") {
		fmt.Fprintf(w, "data: %s\n", line)
	}
	fmt.Fprint(w, "\n")
	flusher.Flush()
}

// streamLLM calls the LLM chat/completions endpoint and pushes every content
// delta to w as a "token" SSE event. It is streamLLMContext without
// cancellation.
func streamLLM(cfg *AIConfig, prompt []map[string]string, w io.Writer, flusher http.Flusher) error {
	return streamLLMContext(context.Background(), cfg, prompt, w, flusher)
}

// streamLLMContext posts prompt to cfg.BaseURL + "/chat/completions" with
// streaming enabled and forwards each non-empty delta content as a "token"
// event on w.
//
// It returns nil once the upstream sends "[DONE]" or the body ends cleanly,
// and an error if the request cannot be built or sent, the upstream answers
// with a non-200 status (the first 1024 bytes of the body are included), or
// reading the stream fails. Cancelling ctx aborts the upstream request.
func streamLLMContext(ctx context.Context, cfg *AIConfig, prompt []map[string]string, w io.Writer, flusher http.Flusher) error {
	// Upper bound for a single upstream line; the bufio.Scanner default of
	// 64 KiB would abort the stream with ErrTooLong on a long line.
	const maxLineBytes = 1 << 20

	payload, err := json.Marshal(map[string]any{
		"model":    cfg.Model,
		"messages": prompt,
		"stream":   true,
	})
	if err != nil {
		return fmt.Errorf("llm encode request: %w", err)
	}

	endpoint := strings.TrimRight(cfg.BaseURL, "/") + "/chat/completions"
	req, err := http.NewRequestWithContext(ctx, http.MethodPost, endpoint, bytes.NewReader(payload))
	if err != nil {
		return err
	}
	req.Header.Set("Content-Type", "application/json")
	req.Header.Set("Authorization", "Bearer "+cfg.APIKey)

	// No client-wide timeout: the response is a long-lived stream and is
	// bounded by ctx instead.
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return fmt.Errorf("llm request: %w", err)
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		// Best effort: a short excerpt of the body is enough for diagnostics.
		b, _ := io.ReadAll(io.LimitReader(resp.Body, 1024))
		return fmt.Errorf("llm status %d: %s", resp.StatusCode, string(b))
	}

	scanner := bufio.NewScanner(resp.Body)
	scanner.Buffer(make([]byte, 0, 64*1024), maxLineBytes)
	for scanner.Scan() {
		line := strings.TrimSpace(scanner.Text())
		// Only "data" fields carry payload; the space after the colon is
		// optional in the SSE format.
		if !strings.HasPrefix(line, "data:") {
			continue
		}
		data := strings.TrimSpace(strings.TrimPrefix(line, "data:"))
		if data == "" {
			continue
		}
		if data == "[DONE]" {
			return nil
		}

		var chunk struct {
			Choices []struct {
				Delta struct {
					Content string `json:"content"`
				} `json:"delta"`
			} `json:"choices"`
		}
		if err := json.Unmarshal([]byte(data), &chunk); err != nil {
			continue // skip lines that are not valid JSON chunks
		}
		for _, c := range chunk.Choices {
			if c.Delta.Content != "" {
				sendSSE(w, flusher, "token", c.Delta.Content)
			}
		}
	}
	return scanner.Err()
}

// buildPrompt renders ctx as a Markdown report followed by the analysis
// instructions and returns it as a single chat message with role "user".
//
// The report contains the composite score summary, optional volatility,
// adaptive-weight and score-momentum lines (taken from the score's detail
// JSON), the short/medium/long-term sections, up to five recent scores and up
// to the last 30 candles. Detail fields that are missing or not numeric are
// rendered as zero.
func buildPrompt(ctx *store.AnalysisContext) []map[string]string {
	s := ctx.Score

	// Key fields extracted from the score's detail JSON.
	var detail struct {
		ShortDetails []map[string]any `json:"short_details"`
		MediumDetail map[string]any   `json:"medium_detail"`
		LongDetail   map[string]any   `json:"long_detail"`
		Volatility   map[string]any   `json:"volatility"`
		AdaptiveW    map[string]any   `json:"adaptive_weights"`
		Delta1D      *float64         `json:"composite_delta"`
		Delta5D      *float64         `json:"composite_delta_5d"`
	}
	if s.Detail != nil {
		// Best effort: with malformed detail JSON the prompt is still built
		// from whatever was decoded, the rest stays at zero values.
		_ = json.Unmarshal(s.Detail, &detail)
	}

	var sb strings.Builder
	sb.WriteString("你是一位资深期货技术分析师,擅长从量化打分系统中解读市场信号。\n")

	// Summary.
	fmt.Fprintf(&sb, "\n## 合约 %s %s\n", s.TsCode, s.TradeDate)
	fmt.Fprintf(&sb, "- 收盘 %.2f 持仓 %.0f (日变动 %+.0f)\n", s.Close, s.OI, s.OIChg)
	fmt.Fprintf(&sb, "- 综合分 **%.1f** / 100\n", s.Composite)
	fmt.Fprintf(&sb, "- 分层: 短期 %.1f 中期 %.1f 长期 %.1f\n", s.ShortTerm, s.MediumTerm, s.LongTerm)
	fmt.Fprintf(&sb, "- 信号: %s\n", s.Signal)

	// Volatility: only shown when the penalty key is present.
	if _, ok := detail.Volatility["vol_penalty"]; ok {
		fmt.Fprintf(&sb, "- 波动率惩罚系数: %.3f (综合风险 %.2f%%)\n",
			floatVal(detail.Volatility, "vol_penalty"),
			floatVal(detail.Volatility, "vol_risk")*100)
	}

	// Adaptive weights: only shown when the trend strength key is present.
	if _, ok := detail.AdaptiveW["trend_strength"]; ok {
		fmt.Fprintf(&sb, "- 趋势强度: %.2f → 权重 短期%.0f%%/中期%.0f%%/长期%.0f%%\n",
			floatVal(detail.AdaptiveW, "trend_strength"),
			valPct(detail.AdaptiveW, "w_short"),
			valPct(detail.AdaptiveW, "w_medium"),
			valPct(detail.AdaptiveW, "w_long"))
	}

	// Score momentum: one bullet carrying whichever deltas are available.
	if detail.Delta1D != nil || detail.Delta5D != nil {
		sb.WriteString("- 分数动量:")
		if detail.Delta1D != nil {
			fmt.Fprintf(&sb, " Δ1d %+.1f", *detail.Delta1D)
		}
		if detail.Delta5D != nil {
			fmt.Fprintf(&sb, " Δ5d %+.1f", *detail.Delta5D)
		}
		sb.WriteString("\n")
	}

	// Short-term details, one table row per entry.
	quadrantLabels := map[string]string{
		"accumulation": "增仓涨",
		"distribution": "增仓跌",
		"covering":     "减仓涨",
		"liquidation":  "减仓跌",
		"flat":         "持平",
	}
	sb.WriteString("\n## 短期动力 (近7日逐日)\n")
	sb.WriteString("| 日期 | 象限 | 涨跌% | OI变化% | 得分 |\n")
	sb.WriteString("|------|------|-------|---------|------|\n")
	for _, day := range detail.ShortDetails {
		quadrant := fmt.Sprint(day["quadrant"])
		label := quadrantLabels[quadrant]
		if label == "" {
			// Unknown quadrant: show the raw value.
			label = quadrant
		}
		fmt.Fprintf(&sb, "| %s | %s | %+.2f%% | %+.2f%% | %.1f |\n",
			day["trade_date"], label,
			floatVal(day, "price_chg_pct")*100,
			floatVal(day, "oi_chg_pct")*100,
			floatVal(day, "score"))
	}

	// Medium-term details.
	md := detail.MediumDetail
	sb.WriteString("\n## 中期趋势 (15日)\n")
	fmt.Fprintf(&sb, "- 价格信号: %.1f (收益率 %+.2f%%)\n", floatVal(md, "price_signal"), floatVal(md, "price_return_pct"))
	fmt.Fprintf(&sb, "- 资金意愿: %.1f\n", floatVal(md, "fund_signal"))
	fmt.Fprintf(&sb, "- 象限分布: 增仓涨 %d天 / 增仓跌 %d天 / 减仓涨 %d天 / 减仓跌 %d天\n",
		intVal(md, "accumulation_days"),
		intVal(md, "distribution_days"),
		intVal(md, "covering_days"),
		intVal(md, "liquidation_days"))

	// Long-term details.
	ld := detail.LongDetail
	sb.WriteString("\n## 长期结构 (30日)\n")
	fmt.Fprintf(&sb, "- OI 趋势分: %.1f (端点变化 %+.2f%%)\n", floatVal(ld, "oi_score"), floatVal(ld, "oi_change_pct"))
	fmt.Fprintf(&sb, "- 价格趋势分: %.1f (30日收益 %+.2f%%)\n", floatVal(ld, "price_score"), floatVal(ld, "price_return_30d_pct"))

	// Recent score trend: at most the first five entries.
	if len(ctx.RecentScores) > 0 {
		sb.WriteString("\n## 近5日分数趋势\n")
		for i, rs := range ctx.RecentScores {
			if i >= 5 {
				break
			}
			fmt.Fprintf(&sb, "- %s 综合 %.1f\n", rs.TradeDate, rs.Composite)
		}
	}

	// Candles (for support/resistance analysis): at most the last 30 entries.
	if len(ctx.Candles) > 0 {
		start := 0
		if len(ctx.Candles) > 30 {
			start = len(ctx.Candles) - 30
		}
		sb.WriteString("\n## 近30日K线(开/高/低/收)\n")
		sb.WriteString("| 日期 | 开盘 | 最高 | 最低 | 收盘 |\n")
		sb.WriteString("|------|------|------|------|------|\n")
		for _, c := range ctx.Candles[start:] {
			fmt.Fprintf(&sb, "| %s | %.1f | %.1f | %.1f | %.1f |\n", c.TradeDate, c.Open, c.High, c.Low, c.Close)
		}
	}

	sb.WriteString("\n请从以下4个角度简要分析(使用中文):\n")
	sb.WriteString("1. 当前多空格局(2-3句话)\n")
	sb.WriteString("2. 资金行为特征(2-3句话)\n")
	sb.WriteString("3. 关键风险点(2-3句话)\n")
	sb.WriteString("4. 支撑与阻力(明确指出最近的关键支撑位和阻力位,基于近30日高低点和均线位置,给出具体价位和依据)\n")

	return []map[string]string{
		{"role": "user", "content": sb.String()},
	}
}

// floatVal returns m[key] as a float64, or 0 when the key is missing or the
// value is not a float64. A nil map is allowed.
func floatVal(m map[string]any, key string) float64 {
	v, _ := m[key].(float64)
	return v
}

// intVal returns m[key] truncated to an int, or 0 when the key is missing or
// the value is not a float64. A nil map is allowed.
func intVal(m map[string]any, key string) int {
	v, _ := m[key].(float64)
	return int(v)
}

// valPct returns m[key] multiplied by 100, or 0 when the key is missing or the
// value is not a float64. A nil map is allowed.
func valPct(m map[string]any, key string) float64 {
	v, _ := m[key].(float64)
	return v * 100
}