修复:为 user-svc 添加健康检查和启动顺序控制

This commit is contained in:
fish
2026-03-28 21:54:09 +08:00
parent 5ac0a52bb1
commit c5260bcae8
31 changed files with 1995 additions and 167 deletions

View File

@@ -23,21 +23,21 @@ const (
)
type Client struct {
hub *Hub
conn *websocket.Conn
send chan *Message
Hub *Hub
Conn *websocket.Conn
Send chan *Message
}
func NewClient(hub *Hub, conn *websocket.Conn) *Client {
return &Client{
hub: hub,
conn: conn,
send: make(chan *Message, 256),
Hub: hub,
Conn: conn,
Send: make(chan *Message, 256),
}
}
// sendWs 发送 WebSocket 消息,自动生成 seq、cmd、timestamp
func (c *Client) sendWs(cmd string, data interface{}) error {
// SendWs 发送 WebSocket 消息,自动生成 seq、cmd、timestamp
func (c *Client) SendWs(cmd string, data interface{}) error {
// 生成唯一请求ID
seq := "req_" + time.Now().Format("20060102150405") + "_" + generateRandomString(8)
@@ -57,8 +57,8 @@ func (c *Client) sendWs(cmd string, data interface{}) error {
}
// 写入 WebSocket 连接
c.conn.SetWriteDeadline(time.Now().Add(writeWait))
if err := c.conn.WriteMessage(websocket.TextMessage, msgBytes); err != nil {
c.Conn.SetWriteDeadline(time.Now().Add(writeWait))
if err := c.Conn.WriteMessage(websocket.TextMessage, msgBytes); err != nil {
log.Printf("error writing message: %v", err)
return err
}
@@ -77,21 +77,21 @@ func generateRandomString(length int) string {
return string(result)
}
func (c *Client) readPump() {
func (c *Client) ReadPump() {
defer func() {
c.hub.unregister <- c
c.conn.Close()
c.Hub.Unregister <- c
c.Conn.Close()
}()
c.conn.SetReadLimit(maxMessageSize)
c.conn.SetReadDeadline(time.Now().Add(pongWait))
c.conn.SetPongHandler(func(string) error {
c.conn.SetReadDeadline(time.Now().Add(pongWait))
c.Conn.SetReadLimit(maxMessageSize)
c.Conn.SetReadDeadline(time.Now().Add(pongWait))
c.Conn.SetPongHandler(func(string) error {
c.Conn.SetReadDeadline(time.Now().Add(pongWait))
return nil
})
for {
_, message, err := c.conn.ReadMessage()
_, message, err := c.Conn.ReadMessage()
if err != nil {
if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
log.Printf("error: %v", err)
@@ -107,18 +107,19 @@ func (c *Client) readPump() {
}
// 处理消息
c.handleMessage(msgMap)
c.HandleMessage(msgMap)
}
}
func (c *Client) handleMessage(msgMap map[string]interface{}) {
func (c *Client) HandleMessage(msgMap map[string]interface{}) {
// 检查是否是 ping 消息(保持向后兼容)
if msgType, ok := msgMap["type"].(string); ok {
if msgTypeStr, ok := msgMap["type"].(string); ok {
msgType := MessageType(msgTypeStr)
switch msgType {
case MessageTypePing:
// 回复 pong
pongMsg := &Message{Type: MessageTypePong}
c.send <- pongMsg
c.Send <- pongMsg
return
case MessageTypeText:
// 广播文本消息
@@ -127,7 +128,7 @@ func (c *Client) handleMessage(msgMap map[string]interface{}) {
Content: msgMap["content"].(string),
Data: msgMap["data"],
}
c.hub.broadcast <- msg
c.Hub.Broadcast <- msg
return
case MessageTypeCommand:
// 处理命令(保持向后兼容)
@@ -135,7 +136,7 @@ func (c *Client) handleMessage(msgMap map[string]interface{}) {
Type: MessageTypeCommand,
Data: msgMap["data"],
}
c.handleCommand(msg)
c.HandleCommand(msg)
return
}
}
@@ -147,28 +148,28 @@ func (c *Client) handleMessage(msgMap map[string]interface{}) {
seq := msgMap["seq"].(string)
// 处理命令
c.handleNewCommand(seq, cmd, data)
c.HandleNewCommand(seq, cmd, data)
}
}
func (c *Client) writePump() {
func (c *Client) WritePump() {
ticker := time.NewTicker(pingPeriod)
defer func() {
ticker.Stop()
c.conn.Close()
c.Conn.Close()
}()
for {
select {
case message, ok := <-c.send:
c.conn.SetWriteDeadline(time.Now().Add(writeWait))
case message, ok := <-c.Send:
c.Conn.SetWriteDeadline(time.Now().Add(writeWait))
if !ok {
// Hub 关闭了通道
c.conn.WriteMessage(websocket.CloseMessage, []byte{})
c.Conn.WriteMessage(websocket.CloseMessage, []byte{})
return
}
w, err := c.conn.NextWriter(websocket.TextMessage)
w, err := c.Conn.NextWriter(websocket.TextMessage)
if err != nil {
return
}
@@ -186,20 +187,20 @@ func (c *Client) writePump() {
}
case <-ticker.C:
c.conn.SetWriteDeadline(time.Now().Add(writeWait))
c.Conn.SetWriteDeadline(time.Now().Add(writeWait))
pingMsg := &Message{Type: MessageTypePing}
pingBytes, err := json.Marshal(pingMsg)
if err != nil {
return
}
if err := c.conn.WriteMessage(websocket.TextMessage, pingBytes); err != nil {
if err := c.Conn.WriteMessage(websocket.TextMessage, pingBytes); err != nil {
return
}
}
}
}
func (c *Client) handleCommand(msg *Message) {
func (c *Client) HandleCommand(msg *Message) {
// 处理命令逻辑
// 这里可以根据命令类型执行不同的操作
log.Printf("Received command: %v", msg.Data)
@@ -217,22 +218,22 @@ func (c *Client) handleCommand(msg *Message) {
Type: MessageTypeError,
Content: "Invalid register command: missing account or password",
}
c.send <- errorResponse
c.Send <- errorResponse
return
}
// 调用用户服务注册
if c.hub.userService != nil {
if c.Hub.UserService != nil {
// 异步调用用户服务注册
go func() {
resp, err := c.hub.userService.Register(nil, account, password)
resp, err := c.Hub.UserService.Register(nil, account, password)
if err != nil {
// 回复错误信息
errorResponse := &Message{
Type: MessageTypeError,
Content: "Register failed: " + err.Error(),
}
c.send <- errorResponse
c.Send <- errorResponse
return
}
@@ -247,7 +248,7 @@ func (c *Client) handleCommand(msg *Message) {
"code": resp.Response.Code,
},
}
c.send <- successResponse
c.Send <- successResponse
}()
} else {
// 回复错误信息
@@ -255,7 +256,7 @@ func (c *Client) handleCommand(msg *Message) {
Type: MessageTypeError,
Content: "User service not available",
}
c.send <- errorResponse
c.Send <- errorResponse
}
return
}
@@ -268,10 +269,10 @@ func (c *Client) handleCommand(msg *Message) {
Data: msg.Data,
}
c.send <- response
c.Send <- response
}
func (c *Client) handleNewCommand(seq string, cmd string, data interface{}) {
func (c *Client) HandleNewCommand(seq string, cmd string, data interface{}) {
// 处理新的命令结构
log.Printf("Received new command: %s, seq: %s, data: %v", cmd, seq, data)
@@ -285,7 +286,7 @@ func (c *Client) handleNewCommand(seq string, cmd string, data interface{}) {
if !accountOk || !passwordOk {
// 回复错误信息
c.sendWs(cmd, map[string]interface{}{
c.SendWs(cmd, map[string]interface{}{
"type": "error",
"content": "Invalid register command: missing account or password",
})
@@ -293,13 +294,13 @@ func (c *Client) handleNewCommand(seq string, cmd string, data interface{}) {
}
// 调用用户服务注册
if c.hub.userService != nil {
if c.Hub.UserService != nil {
// 异步调用用户服务注册
go func() {
resp, err := c.hub.userService.Register(nil, account, password)
resp, err := c.Hub.UserService.Register(nil, account, password)
if err != nil {
// 回复错误信息
c.sendWs(cmd, map[string]interface{}{
c.SendWs(cmd, map[string]interface{}{
"type": "error",
"content": "Register failed: " + err.Error(),
})
@@ -307,7 +308,7 @@ func (c *Client) handleNewCommand(seq string, cmd string, data interface{}) {
}
// 回复成功信息
c.sendWs(cmd, map[string]interface{}{
c.SendWs(cmd, map[string]interface{}{
"type": "text",
"content": "Register successful",
"data": map[string]interface{}{
@@ -320,7 +321,7 @@ func (c *Client) handleNewCommand(seq string, cmd string, data interface{}) {
}()
} else {
// 回复错误信息
c.sendWs(cmd, map[string]interface{}{
c.SendWs(cmd, map[string]interface{}{
"type": "error",
"content": "User service not available",
})
@@ -329,7 +330,7 @@ func (c *Client) handleNewCommand(seq string, cmd string, data interface{}) {
return
default:
// 其他命令处理
c.sendWs(cmd, map[string]interface{}{
c.SendWs(cmd, map[string]interface{}{
"type": "text",
"content": "Command executed successfully",
"data": data,

View File

@@ -3,57 +3,57 @@ package ws
import (
"log"
"backend/gateway/internal/service"
"gateway/internal/service"
)
type Hub struct {
// 注册的客户端
clients map[*Client]bool
Clients map[*Client]bool
// 从客户端接收的消息
broadcast chan *Message
Broadcast chan *Message
// 注册请求
register chan *Client
Register chan *Client
// 注销请求
unregister chan *Client
Unregister chan *Client
// 用户服务
userService *service.UserService
UserService *service.UserService
}
func NewHub(userService *service.UserService) *Hub {
return &Hub{
broadcast: make(chan *Message),
register: make(chan *Client),
unregister: make(chan *Client),
clients: make(map[*Client]bool),
userService: userService,
Broadcast: make(chan *Message),
Register: make(chan *Client),
Unregister: make(chan *Client),
Clients: make(map[*Client]bool),
UserService: userService,
}
}
func (h *Hub) Run() {
for {
select {
case client := <-h.register:
h.clients[client] = true
log.Printf("Client connected. Total clients: %d", len(h.clients))
case client := <-h.Register:
h.Clients[client] = true
log.Printf("Client connected. Total clients: %d", len(h.Clients))
case client := <-h.unregister:
if _, ok := h.clients[client]; ok {
delete(h.clients, client)
close(client.send)
log.Printf("Client disconnected. Total clients: %d", len(h.clients))
case client := <-h.Unregister:
if _, ok := h.Clients[client]; ok {
delete(h.Clients, client)
close(client.Send)
log.Printf("Client disconnected. Total clients: %d", len(h.Clients))
}
case message := <-h.broadcast:
for client := range h.clients {
case message := <-h.Broadcast:
for client := range h.Clients {
select {
case client.send <- message:
case client.Send <- message:
default:
close(client.send)
delete(h.clients, client)
close(client.Send)
delete(h.Clients, client)
}
}
}