diff --git a/services/cn_futures_trading_records/src/crud/create.go b/services/cn_futures_trading_records/src/crud/create.go index 6d868d6..f528078 100644 --- a/services/cn_futures_trading_records/src/crud/create.go +++ b/services/cn_futures_trading_records/src/crud/create.go @@ -2,8 +2,9 @@ package crud import ( "cn_futures_trading_records/db" - "encoding/json" // 新增 + "encoding/json" "net/http" + "strconv" "time" "github.com/gin-gonic/gin" @@ -11,45 +12,45 @@ import ( "go.uber.org/zap" ) -// TradingRecordsCreateRequest 交易记录创建请求参数结构 -type TradingRecordsCreateRequest struct { - EventType string `json:"event_type"` - Payload struct { - OpenYear int `json:"open_year" binding:"required,min=1900,max=2200"` // 开仓时间:年(1900-2200) - OpenMonth int `json:"open_month" binding:"required,min=1,max=12"` // 开仓时间:月(1-12) - OpenDay int `json:"open_day" binding:"required,min=1,max=31"` // 开仓时间:日(1-31) - Symbol string `json:"symbol" binding:"required"` // 品种代码(如 RB、CU) - Contract string `json:"contract" binding:"required"` // 合约代码(如 2505) - Direction string `json:"direction" binding:"required,oneof=long short"` // 交易方向:long 多头 / short 空头 - OpenPrice float64 `json:"open_price" binding:"required"` // 开仓价格(单位:元) - OpenFee float64 `json:"open_fee" binding:"required,min=0"` // 开仓手续费(≥0) - CloseYear int `json:"close_year" binding:"required,min=1900,max=2200"` // 平仓时间:年(1900-2200) - CloseMonth int `json:"close_month" binding:"required,min=1,max=12"` // 平仓时间:月(1-12) - CloseDay int `json:"close_day" binding:"required,min=1,max=31"` // 平仓时间:日(1-31) - ClosePrice float64 `json:"close_price" binding:"required"` // 平仓价格(单位:元) - CloseFee float64 `json:"close_fee" binding:"required,min=0"` // 平仓手续费(≥0) - PriceDiff float64 `json:"price_diff" binding:"required"` // 平仓差价 = ClosePrice - OpenPrice - MinTick float64 `json:"min_tick" binding:"required"` // 品种最小跳点(如 1) - TickPrice float64 `json:"tick_price" binding:"required"` // 每跳价格(如 10 元) - DiffPnL float64 `json:"diff_pnl" binding:"required"` // 差价盈亏(已考虑方向与跳点) - TotalFee float64 `json:"total_fee" binding:"required,min=0"` // 手续费合计 = OpenFee + CloseFee(≥0) - ClosePnL float64 `json:"close_pnl" binding:"required"` // 平仓净盈亏 = DiffPnL - TotalFee - } `json:"payload" binding:"required"` +/* ---------- 公共 Payload 结构体 ---------- */ +type Payload struct { + OpenYear int `json:"open_year" binding:"required,min=1900,max=2200"` + OpenMonth int `json:"open_month" binding:"required,min=1,max=12"` + OpenDay int `json:"open_day" binding:"required,min=1,max=31"` + Symbol string `json:"symbol" binding:"required"` + Contract string `json:"contract" binding:"required"` + Direction string `json:"direction" binding:"required,oneof=long short"` + OpenPrice float64 `json:"open_price" binding:"required"` + OpenFee float64 `json:"open_fee" binding:"required,min=0"` + CloseYear int `json:"close_year" binding:"required,min=1900,max=2200"` + CloseMonth int `json:"close_month" binding:"required,min=1,max=12"` + CloseDay int `json:"close_day" binding:"required,min=1,max=31"` + ClosePrice float64 `json:"close_price" binding:"required"` + CloseFee float64 `json:"close_fee" binding:"required,min=0"` + PriceDiff float64 `json:"price_diff" binding:"required"` + MinTick float64 `json:"min_tick" binding:"required"` + TickPrice float64 `json:"tick_price" binding:"required"` + DiffPnL float64 `json:"diff_pnl" binding:"required"` + TotalFee float64 `json:"total_fee" binding:"required,min=0"` + ClosePnL float64 `json:"close_pnl" binding:"required"` +} + +/* ---------- 单条创建 ---------- */ +type TradingRecordsCreateRequest struct { + EventType string `json:"event_type"` + Payload Payload `json:"payload" binding:"required"` } -// TradingRecordsCreateResponse 创建响应结构 type TradingRecordsCreateResponse struct { Success bool `json:"success"` Message string `json:"message"` Data TradingRecordsCreateData `json:"data"` } -// TradingRecordsCreateData 响应数据结构 type TradingRecordsCreateData struct { RecordID string `json:"record_id"` } -// CreateHandler 处理交易记录创建逻辑 func CreateHandler(c *gin.Context) { startTime := time.Now() reqID := c.Request.Header.Get("X-TradingRecordsRequest-ID") @@ -66,124 +67,162 @@ func CreateHandler(c *gin.Context) { var req TradingRecordsCreateRequest if err := c.ShouldBindJSON(&req); err != nil { - zap.L().Warn("⚠️ 请求参数验证失败", - zap.String("req_id", reqID), - zap.Error(err), - ) - c.JSON(http.StatusOK, TradingRecordsCreateResponse{ - Success: false, - Message: "请求参数错误,请检查字段格式与必填项", - }) + zap.L().Warn("⚠️ 请求参数验证失败", zap.String("req_id", reqID), zap.Error(err)) + c.JSON(http.StatusOK, TradingRecordsCreateResponse{Success: false, Message: "请求参数错误,请检查字段格式与必填项"}) return } - zap.L().Debug("✅ 请求参数验证通过", - zap.String("req_id", reqID), - zap.String("event_type", req.EventType), - zap.Any("payload", req.Payload), - ) + zap.L().Debug("✅ 请求参数验证通过", zap.String("req_id", reqID), zap.String("event_type", req.EventType), zap.Any("payload", req.Payload)) - // 开启数据库事务 tx, err := db.DB.Begin() if err != nil { - zap.L().Error("❌ 事务开启失败", - zap.String("req_id", reqID), - zap.Error(err), - ) - c.JSON(http.StatusOK, TradingRecordsCreateResponse{ - Success: false, - Message: "系统错误,请稍后重试", - }) + zap.L().Error("❌ 事务开启失败", zap.String("req_id", reqID), zap.Error(err)) + c.JSON(http.StatusOK, TradingRecordsCreateResponse{Success: false, Message: "系统错误,请稍后重试"}) return } - defer func() { if r := recover(); r != nil { - if err := tx.Rollback(); err != nil { - zap.L().Error("💥 panic后事务回滚失败", - zap.String("req_id", reqID), - zap.Error(err), - ) - } - zap.L().Error("💥 事务处理发生panic", - zap.String("req_id", reqID), - zap.Any("recover", r), - ) - c.JSON(http.StatusOK, TradingRecordsCreateResponse{ - Success: false, - Message: "系统错误,请稍后重试", - }) + _ = tx.Rollback() + zap.L().Error("💥 事务处理发生panic", zap.String("req_id", reqID), zap.Any("recover", r)) + c.JSON(http.StatusOK, TradingRecordsCreateResponse{Success: false, Message: "系统错误,请稍后重试"}) } }() - // 将 payload 结构体转为 JSONB 可用的 []byte payloadBytes, err := json.Marshal(req.Payload) if err != nil { tx.Rollback() - zap.L().Error("❌ payload 序列化失败", - zap.String("req_id", reqID), - zap.Error(err), - ) - c.JSON(http.StatusOK, TradingRecordsCreateResponse{ - Success: false, - Message: "内部数据转换错误", - }) + zap.L().Error("❌ payload 序列化失败", zap.String("req_id", reqID), zap.Error(err)) + c.JSON(http.StatusOK, TradingRecordsCreateResponse{Success: false, Message: "内部数据转换错误"}) return } - // 插入交易记录 var recordID string - err = tx.QueryRow( - `INSERT INTO cn_futures_trading_records (event_type, payload) - VALUES ('交易中', $1) - RETURNING id`, - payloadBytes, // 只剩一个占位符 - ).Scan(&recordID) + err = tx.QueryRow(`INSERT INTO cn_futures_trading_records (event_type, payload) VALUES ('交易中', $1) RETURNING id`, payloadBytes).Scan(&recordID) if err != nil { tx.Rollback() - zap.L().Error("❌ cn_futures_trading_records 插入失败", - zap.String("req_id", reqID), - zap.Error(err), - ) - c.JSON(http.StatusOK, TradingRecordsCreateResponse{ - Success: false, - Message: "保存交易记录失败", - }) + zap.L().Error("❌ 插入失败", zap.String("req_id", reqID), zap.Error(err)) + c.JSON(http.StatusOK, TradingRecordsCreateResponse{Success: false, Message: "保存交易记录失败"}) return } - // 提交事务 if err := tx.Commit(); err != nil { tx.Rollback() - zap.L().Error("❌ 事务提交失败", - zap.String("req_id", reqID), - zap.String("record_id", recordID), - zap.Error(err), - ) - c.JSON(http.StatusOK, TradingRecordsCreateResponse{ - Success: false, - Message: "数据提交失败,请稍后重试", - }) + zap.L().Error("❌ 事务提交失败", zap.String("req_id", reqID), zap.String("record_id", recordID), zap.Error(err)) + c.JSON(http.StatusOK, TradingRecordsCreateResponse{Success: false, Message: "数据提交失败,请稍后重试"}) return } duration := time.Since(startTime) - zap.L().Info("✅ 交易记录创建请求处理完成", - zap.String("req_id", reqID), - zap.String("record_id", recordID), - zap.Duration("duration", duration), - ) - - c.JSON(http.StatusOK, TradingRecordsCreateResponse{ - Success: true, - Message: "创建成功", - Data: TradingRecordsCreateData{ - RecordID: recordID, - }, - }) + zap.L().Info("✅ 交易记录创建完成", zap.String("req_id", reqID), zap.String("record_id", recordID), zap.Duration("duration", duration)) + c.JSON(http.StatusOK, TradingRecordsCreateResponse{Success: true, Message: "创建成功", Data: TradingRecordsCreateData{RecordID: recordID}}) } -// CreateBatchHandler 批量处理交易记录创建逻辑 -func CreateBatchHandler(c *gin.Context) { +/* ---------- 批量创建 ---------- */ +type TradingRecordsBatchCreateRequest struct { + Rows [][]string `json:"rows" binding:"required,min=1,max=5000"` +} -} \ No newline at end of file +type TradingRecordsBatchCreateResponse struct { + Success bool `json:"success"` + Message string `json:"message"` + RecordIDs []string `json:"record_ids"` +} + +func CreateBatchHandler(c *gin.Context) { + startTime := time.Now() + reqID := c.Request.Header.Get("X-TradingRecordsRequest-ID") + if reqID == "" { + reqID = uuid.New().String() + zap.L().Debug("✨ 生成新的请求ID", zap.String("req_id", reqID)) + } + + zap.L().Info("📥 收到交易记录批量创建请求", + zap.String("req_id", reqID), + zap.String("path", c.Request.URL.Path), + zap.String("method", c.Request.Method), + ) + + var req TradingRecordsBatchCreateRequest + if err := c.ShouldBindJSON(&req); err != nil { + zap.L().Warn("⚠️ 请求参数验证失败", zap.String("req_id", reqID), zap.Error(err)) + c.JSON(http.StatusOK, TradingRecordsBatchCreateResponse{Success: false, Message: "请求参数错误,请检查字段格式与数量"}) + return + } + + zap.L().Debug("✅ 请求参数验证通过", zap.String("req_id", reqID), zap.Int("row_count", len(req.Rows))) + + tx, err := db.DB.Begin() + if err != nil { + zap.L().Error("❌ 事务开启失败", zap.String("req_id", reqID), zap.Error(err)) + c.JSON(http.StatusOK, TradingRecordsBatchCreateResponse{Success: false, Message: "系统错误,请稍后重试"}) + return + } + defer func() { + if r := recover(); r != nil { + _ = tx.Rollback() + zap.L().Error("💥 事务处理发生panic", zap.String("req_id", reqID), zap.Any("recover", r)) + c.JSON(http.StatusOK, TradingRecordsBatchCreateResponse{Success: false, Message: "系统错误,请稍后重试"}) + } + }() + + stmt, err := tx.Prepare(`INSERT INTO cn_futures_trading_records (event_type, payload) VALUES ('交易中', $1) RETURNING id`) + if err != nil { + tx.Rollback() + zap.L().Error("❌ 预编译语句失败", zap.String("req_id", reqID), zap.Error(err)) + c.JSON(http.StatusOK, TradingRecordsBatchCreateResponse{Success: false, Message: "系统错误,请稍后重试"}) + return + } + defer stmt.Close() + + var recordIDs []string + for idx, row := range req.Rows { + if len(row) != 18 { + tx.Rollback() + zap.L().Warn("⚠️ 字段数量不匹配", zap.String("req_id", reqID), zap.Int("row_index", idx), zap.Int("field_count", len(row))) + c.JSON(http.StatusOK, TradingRecordsBatchCreateResponse{Success: false, Message: "第 " + strconv.Itoa(idx+1) + " 行字段数量不足 18 个"}) + return + } + + var p Payload + p.OpenYear, _ = strconv.Atoi(row[0]) + p.OpenMonth, _ = strconv.Atoi(row[1]) + p.OpenDay, _ = strconv.Atoi(row[2]) + p.Symbol = row[3] + p.Contract = row[4] + p.Direction = row[5] + p.OpenPrice, _ = strconv.ParseFloat(row[6], 64) + p.OpenFee, _ = strconv.ParseFloat(row[7], 64) + p.CloseYear, _ = strconv.Atoi(row[8]) + p.CloseMonth, _ = strconv.Atoi(row[9]) + p.CloseDay, _ = strconv.Atoi(row[10]) + p.ClosePrice, _ = strconv.ParseFloat(row[11], 64) + p.CloseFee, _ = strconv.ParseFloat(row[12], 64) + p.PriceDiff, _ = strconv.ParseFloat(row[13], 64) + p.MinTick, _ = strconv.ParseFloat(row[14], 64) + p.TickPrice, _ = strconv.ParseFloat(row[15], 64) + p.DiffPnL, _ = strconv.ParseFloat(row[16], 64) + p.TotalFee, _ = strconv.ParseFloat(row[17], 64) + p.ClosePnL = p.DiffPnL - p.TotalFee + + payloadBytes, _ := json.Marshal(p) + var id string + if err := stmt.QueryRow(payloadBytes).Scan(&id); err != nil { + tx.Rollback() + zap.L().Error("❌ 单行插入失败", zap.String("req_id", reqID), zap.Int("row_index", idx), zap.Error(err)) + c.JSON(http.StatusOK, TradingRecordsBatchCreateResponse{Success: false, Message: "第 " + strconv.Itoa(idx+1) + " 行插入失败"}) + return + } + recordIDs = append(recordIDs, id) + } + + if err := tx.Commit(); err != nil { + tx.Rollback() + zap.L().Error("❌ 事务提交失败", zap.String("req_id", reqID), zap.Error(err)) + c.JSON(http.StatusOK, TradingRecordsBatchCreateResponse{Success: false, Message: "数据提交失败,请稍后重试"}) + return + } + + duration := time.Since(startTime) + zap.L().Info("✅ 交易记录批量创建完成", zap.String("req_id", reqID), zap.Int("inserted_count", len(recordIDs)), zap.Duration("duration", duration)) + c.JSON(http.StatusOK, TradingRecordsBatchCreateResponse{Success: true, Message: "批量创建成功", RecordIDs: recordIDs}) +}