Go WebSocket 服务端基本交互模式

前言

生产里的 WebSocket 服务往往要在「多协程读、写」和「连接生命周期」之间做折中。
下面这套写法把「读」留在 HTTP 处理协程里循环 ReadMessage,把「写」集中到单独协程里串行执行,避免多协程同时 WriteMessage 带来的竞态。
本文在参考常见业务服务结构的前提下,只保留升级、登记连接、收文本、经通道发送、断开清理这几步,便于你对照扩展自己的消息协议。

依赖

在项目目录执行下面命令拉取 gorilla/websocket,版本以你本地 go.mod 解析结果为准。

1
go get github.com/gorilla/websocket

实现

升级器与路由

Upgrader 负责把 HTTP 连接升级为 WebSocket
CheckOrigin 在示例里返回 true 仅方便本地与内网调试,上线后应改为按来源白名单校验。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
package main

import (
"fmt"
"net/http"
"sync"
"time"

"github.com/gorilla/websocket"
)

var upgrader = websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
CheckOrigin: func(r *http.Request) bool {
return true
},
}

连接模型与全局表

每个连接挂一个带缓冲的 Send 通道,写协程只从该通道取字符串再 WriteMessage
sync.RWMutex 保护 connByAddr,键使用 fmt.Sprintf("%p", conn) 仅为演示;若你需要稳定、可跨重启关联的 ID,应换成连接建立时生成的 UUID 等业务键。

1
2
3
4
5
6
7
8
9
10
11
type ConnModel struct {
Conn *websocket.Conn
Send chan string
ConnAddr string
closeOnce sync.Once
}

var (
stateMu sync.RWMutex
connByAddr = map[string]*ConnModel{}
)

写协程与发送入口

startWriter 为每个连接启动一个协程,循环读 Send
发送前设置写超时,写失败时触发清理并关闭连接,避免僵尸写协程一直阻塞。

下面展示写协程与对外的 sendMsg
sendMsg 里用读锁查表,再 select 投递。
default 分支表示队列已满,可按需打日志或断开连接。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
func startWriter(m *ConnModel) {
go func() {
for msg := range m.Send {
_ = m.Conn.SetWriteDeadline(time.Now().Add(10 * time.Second))
if err := m.Conn.WriteMessage(websocket.TextMessage, []byte(msg)); err != nil {
closeEvent(m.Conn)
_ = m.Conn.Close()
return
}
}
}()
}

func sendMsg(conn *websocket.Conn, text string) {
if conn == nil {
return
}
key := fmt.Sprintf("%p", conn)
stateMu.RLock()
m := connByAddr[key]
stateMu.RUnlock()
if m == nil || m.Send == nil {
return
}
select {
case m.Send <- text:
default:
// 队列满:可记录日志或关闭连接
}
}

处理函数与读循环

echoHandler 在升级成功后注册连接、启动写协程,然后在当前协程里阻塞读文本。
读出错或客户端关闭时跳出循环,defer 里统一做关闭与清理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
func echoHandler(w http.ResponseWriter, r *http.Request) {
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
return
}
defer func() {
closeEvent(conn)
_ = conn.Close()
}()

conn.SetCloseHandler(func(code int, text string) error {
closeEvent(conn)
return nil
})

key := fmt.Sprintf("%p", conn)
m := &ConnModel{
Conn: conn,
Send: make(chan string, 256),
ConnAddr: key,
}
startWriter(m)

stateMu.Lock()
connByAddr[key] = m
stateMu.Unlock()

for {
mt, data, err := conn.ReadMessage()
if err != nil {
break
}
if mt == websocket.TextMessage {
// 基本交互:原样回显;此处可换成 JSON 解析与路由
sendMsg(conn, string(data))
}
}
}

断开时清理

从全局表删除条目并关闭 Send,让写协程退出。
deferSetCloseHandler 都可能触发清理,因此用 sync.Once 包住 close(Send),避免重复关闭引发 panic。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
func closeEvent(conn *websocket.Conn) {
key := fmt.Sprintf("%p", conn)
stateMu.Lock()
defer stateMu.Unlock()
if m, ok := connByAddr[key]; ok {
delete(connByAddr, key)
m.closeOnce.Do(func() {
if m.Send != nil {
close(m.Send)
m.Send = nil
}
})
}
}

启动 HTTP

监听端口与注册路由与普通 HTTP 服务相同。

1
2
3
4
func main() {
http.HandleFunc("/ws", echoHandler)
_ = http.ListenAndServe(":8080", nil)
}

验证

  1. 启动程序后,用浏览器控制台或任意 WebSocket 客户端连接 ws://127.0.0.1:8080/ws
  2. 发送一段文本,应收到相同内容的回显。
  3. 关闭客户端标签或断开网络,服务端应不再持有该键对应条目(可在 closeEvent 内临时打日志确认)。

扩展

在保持「单写协程」的前提下,你可以把收包逻辑拆成解析 JSON、校验字段、再按业务码分发。
广播时对 connByAddr 中每个连接调用 sendMsg 即可。
若需要优雅退出,可对 http.Server 使用 Shutdown,并向所有连接发送关闭帧后再清理。

总结

步骤简述如下。

  1. Upgrader 处理 GET 升级请求并得到 *websocket.Conn
  2. 为每个连接创建带缓冲的 Send 通道,单独协程消费通道并执行 WriteMessage
  3. 读循环留在处理协程中,业务上只把「待发内容」丢进 Send
  4. 断开时在互斥保护下删表、close(Send),并关闭底层连接。

注意:CheckOrigin 不可长期全开。
%p 仅作演示键。
写失败须与读退出路径一样走清理逻辑,避免泄漏与重复写。

前端测试

本地打开 HTML,向本机服务发一条文本并打印回包。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>WS 测试</title>
</head>
<body>
<script>
let ws = new WebSocket("ws://localhost:10088/ws");
ws.onopen = function () {
console.log("Connected");
ws.send("Hello, World!");
};

ws.onmessage = function (event) {
console.log("Received message: " + event.data);
};

ws.onclose = function () {
console.log("Disconnected");
};
</script>
</body>
</html>

构建

build_linux64.bat

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
@echo off  
set TARGET_FILE=xh-control-ws
set TARGET_DIR=_dist
set TARGET_DIR_PATH=%cd%\%TARGET_DIR%\

echo ---------------------------------
echo Delete Origin Dir
IF EXIST %TARGET_DIR_PATH% rd /S /Q %TARGET_DIR_PATH%
IF NOT EXIST %TARGET_DIR% mkdir %TARGET_DIR%

echo ---------------------------------
echo Build API
set GOOS=linux
set GOARCH=amd64
set CGO_ENABLED=0
go build

echo Copy API
move %TARGET_FILE% %TARGET_DIR_PATH% > NUL
xcopy /e /i "config" %TARGET_DIR_PATH%config > NUL

echo ---------------------------------
echo Success

pause

Nginx配置

要在应用里识别真实客户端,应读取 X-Forwarded-For(或你约定的 X-Real-IP 等),并注意链式代理时 X-Forwarded-For 可能是逗号分隔的多段地址,通常取第一个非信任段需结合部署规范。

Nginx 侧示例(把客户端地址塞进转发头,供上游 Go 读取)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
upstream mcwstest {
server 127.0.0.1:8017;
}

map $http_upgrade $connection_upgrade {
default upgrade;
'' close;
}

server {
location / {
proxy_pass http://mcwstest;
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection $connection_upgrade;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
}
}

服务器操作

添加服务

准备一个最小单元文件。

假设你的可执行文件放在 /data/wwwjarapi/8931mcws/xh-control-ws,示例中用 root 用户(生产环境建议创建专用系统用户运行)。

创建文件

1
vi /etc/systemd/system/xh-control-ws.service

内容

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
[Unit]
Description=xh-control-ws
After=network.target

[Service]
Type=simple
User=root
Group=root
WorkingDirectory=/data/wwwjarapi/8931mcws
ExecStart=/data/wwwjarapi/8931mcws/xh-control-ws
Restart=always
RestartSec=5s
StartLimitInterval=60s
StartLimitBurst=10
StandardOutput=journal
StandardError=journal

[Install]
WantedBy=multi-user.target

[Unit]Description 为服务描述,After=network.target 表示等网络就绪后再启动。

[Service] 配置说明:

Type=simple → 拉起进程就算启动完成
User=root → 用 root 身份跑(示例用,生产建议专用用户)
Group=root → 用户组 root
WorkingDirectory=… → 先 cd 到该目录再启动
ExecStart=… → 要执行的命令(必须绝对路径)
Restart=always → 一挂就重启
RestartSec=5s → 等待5秒再重启
StartLimitInterval=60s + StartLimitBurst=10 → 60 秒内超 10 次重启就放弃
StandardOutput=journal → 把 stdout 打进 systemd 日志
StandardError=journal → 把 stderr 也打进 systemd 日志

启用并立即启

1
2
sudo systemctl daemon-reload
sudo systemctl enable --now xh-control-ws

之后若修改过 .service 文件,需要先执行 daemon-reloadstartrestart

验证

手动 kill 进程:

1
2
ps -ef | grep xh-control-ws
sudo kill -9 <pid>

几秒后执行 systemctl status xh-control-ws,应看到 Active: active (running),说明已被自动拉起。

查看状态

1
systemctl status xh-control-ws

查看实时日志:

1
journalctl -u xh-control-ws -f

服务启动/停止

正常停止/启动

1
2
3
4
5
6
7
8
# 只停掉当前运行实例(下次开机若 enable 了仍会自启)
sudo systemctl stop xh-control-ws

# 需要时再启动
sudo systemctl start xh-control-ws

# 重启(先 stop 再 start)
sudo systemctl restart xh-control-ws

彻底禁用自启

1
2
# --now 顺便帮你 stop
sudo systemctl disable --now xh-control-ws