From 9b2380fefb6503176bc79399ce93488ac521bcd3 Mon Sep 17 00:00:00 2001 From: fish Date: Sat, 28 Mar 2026 20:32:03 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E4=BF=AE=E6=94=B9=20WebSocket=20?= =?UTF-8?q?=E6=B6=88=E6=81=AF=E7=BB=93=E6=9E=84=EF=BC=8C=E6=94=AF=E6=8C=81?= =?UTF-8?q?=E6=96=B0=E7=9A=84=E5=91=BD=E4=BB=A4=E6=A0=BC=E5=BC=8F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- backend/gateway/internal/ws/client.go | 143 ++++++++++++++++++++++++-- 1 file changed, 137 insertions(+), 6 deletions(-) diff --git a/backend/gateway/internal/ws/client.go b/backend/gateway/internal/ws/client.go index c7a621c..503214c 100644 --- a/backend/gateway/internal/ws/client.go +++ b/backend/gateway/internal/ws/client.go @@ -58,26 +58,56 @@ func (c *Client) readPump() { break } - var msg Message - if err := json.Unmarshal(message, &msg); err != nil { + // 解析为通用的消息结构 + var msgMap map[string]interface{} + if err := json.Unmarshal(message, &msgMap); err != nil { log.Printf("error unmarshaling message: %v", err) continue } // 处理消息 - switch msg.Type { + c.handleMessage(msgMap) + } +} + +func (c *Client) handleMessage(msgMap map[string]interface{}) { + // 检查是否是 ping 消息(保持向后兼容) + if msgType, ok := msgMap["type"].(string); ok { + switch msgType { case MessageTypePing: // 回复 pong pongMsg := &Message{Type: MessageTypePong} c.send <- pongMsg + return case MessageTypeText: // 广播文本消息 - c.hub.broadcast <- &msg + msg := &Message{ + Type: MessageTypeText, + Content: msgMap["content"].(string), + Data: msgMap["data"], + } + c.hub.broadcast <- msg + return case MessageTypeCommand: - // 处理命令 - c.handleCommand(&msg) + // 处理命令(保持向后兼容) + msg := &Message{ + Type: MessageTypeCommand, + Data: msgMap["data"], + } + c.handleCommand(msg) + return } } + + // 处理新的消息结构 + if cmd, ok := msgMap["cmd"].(string); ok { + // 提取数据 + data := msgMap["data"] + seq := msgMap["seq"].(string) + + // 处理命令 + c.handleNewCommand(seq, cmd, data) + } } func (c *Client) writePump() { @@ -199,3 +229,104 @@ func (c *Client) handleCommand(msg *Message) { c.send <- response } + +func (c *Client) handleNewCommand(seq string, cmd string, data interface{}) { + // 处理新的命令结构 + log.Printf("Received new command: %s, seq: %s, data: %v", cmd, seq, data) + + // 根据 cmd 字段处理不同的命令 + switch cmd { + case "user.register": + // 提取注册信息 + if registerData, ok := data.(map[string]interface{}); ok { + account, accountOk := registerData["account"].(string) + password, passwordOk := registerData["password"].(string) + + if !accountOk || !passwordOk { + // 回复错误信息 + errorResponse := map[string]interface{}{ + "seq": seq, + "cmd": cmd, + "type": "error", + "content": "Invalid register command: missing account or password", + "timestamp": time.Now().UnixMilli(), + } + c.sendJSON(errorResponse) + return + } + + // 调用用户服务注册 + if c.hub.userService != nil { + // 异步调用用户服务注册 + go func() { + resp, err := c.hub.userService.Register(nil, account, password) + if err != nil { + // 回复错误信息 + errorResponse := map[string]interface{}{ + "seq": seq, + "cmd": cmd, + "type": "error", + "content": "Register failed: " + err.Error(), + "timestamp": time.Now().UnixMilli(), + } + c.sendJSON(errorResponse) + return + } + + // 回复成功信息 + successResponse := map[string]interface{}{ + "seq": seq, + "cmd": cmd, + "type": "text", + "content": "Register successful", + "data": map[string]interface{}{ + "user_id": resp.UserId, + "account": resp.Account, + "message": resp.Response.Message, + "code": resp.Response.Code, + }, + "timestamp": time.Now().UnixMilli(), + } + c.sendJSON(successResponse) + }() + } else { + // 回复错误信息 + errorResponse := map[string]interface{}{ + "seq": seq, + "cmd": cmd, + "type": "error", + "content": "User service not available", + "timestamp": time.Now().UnixMilli(), + } + c.sendJSON(errorResponse) + } + } + return + default: + // 其他命令处理 + response := map[string]interface{}{ + "seq": seq, + "cmd": cmd, + "type": "text", + "content": "Command executed successfully", + "data": data, + "timestamp": time.Now().UnixMilli(), + } + c.sendJSON(response) + } +} + +func (c *Client) sendJSON(data interface{}) { + // 将数据转换为 JSON 并发送 + msgBytes, err := json.Marshal(data) + if err != nil { + log.Printf("error marshaling message: %v", err) + return + } + + // 写入 WebSocket 连接 + c.conn.SetWriteDeadline(time.Now().Add(writeWait)) + if err := c.conn.WriteMessage(websocket.TextMessage, msgBytes); err != nil { + log.Printf("error writing message: %v", err) + } +}