前言 生产里的 WebSocket 服务往往要在「多协程读、写」和「连接生命周期」之间做折中。 下面这套写法把「读」留在 HTTP 处理协程里循环 ReadMessage,把「写」集中到单独协程里串行执行,避免多协程同时 WriteMessage 带来的竞态。 本文在参考常见业务服务结构的前提下,只保留升级、登记连接、收文本、经通道发送、断开清理这几步,便于你对照扩展自己的消息协议。
依赖 在项目目录执行下面命令拉取 gorilla/websocket,版本以你本地 go.mod 解析结果为准。
1 go get github.com/gorilla/websocket
实现 升级器与路由 Upgrader 负责把 HTTP 连接升级为 WebSocket。CheckOrigin 在示例里返回 true 仅方便本地与内网调试,上线后应改为按来源白名单校验。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 package mainimport ( "fmt" "net/http" "sync" "time" "github.com/gorilla/websocket" ) var upgrader = websocket.Upgrader{ ReadBufferSize: 1024 , WriteBufferSize: 1024 , CheckOrigin: func (r *http.Request) bool { return true }, }
连接模型与全局表 每个连接挂一个带缓冲的 Send 通道,写协程只从该通道取字符串再 WriteMessage。 用 sync.RWMutex 保护 connByAddr,键使用 fmt.Sprintf("%p", conn) 仅为演示;若你需要稳定、可跨重启关联的 ID,应换成连接建立时生成的 UUID 等业务键。
1 2 3 4 5 6 7 8 9 10 11 type ConnModel struct { Conn *websocket.Conn Send chan string ConnAddr string closeOnce sync.Once } var ( stateMu sync.RWMutex connByAddr = map [string ]*ConnModel{} )
写协程与发送入口 startWriter 为每个连接启动一个协程,循环读 Send。 发送前设置写超时,写失败时触发清理并关闭连接,避免僵尸写协程一直阻塞。
下面展示写协程与对外的 sendMsg。sendMsg 里用读锁查表,再 select 投递。default 分支表示队列已满,可按需打日志或断开连接。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 func startWriter (m *ConnModel) { go func () { for msg := range m.Send { _ = m.Conn.SetWriteDeadline(time.Now().Add(10 * time.Second)) if err := m.Conn.WriteMessage(websocket.TextMessage, []byte (msg)); err != nil { closeEvent(m.Conn) _ = m.Conn.Close() return } } }() } func sendMsg (conn *websocket.Conn, text string ) { if conn == nil { return } key := fmt.Sprintf("%p" , conn) stateMu.RLock() m := connByAddr[key] stateMu.RUnlock() if m == nil || m.Send == nil { return } select { case m.Send <- text: default : } }
处理函数与读循环 echoHandler 在升级成功后注册连接、启动写协程,然后在当前协程里阻塞读文本。 读出错或客户端关闭时跳出循环,defer 里统一做关闭与清理。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 func echoHandler (w http.ResponseWriter, r *http.Request) { conn, err := upgrader.Upgrade(w, r, nil ) if err != nil { return } defer func () { closeEvent(conn) _ = conn.Close() }() conn.SetCloseHandler(func (code int , text string ) error { closeEvent(conn) return nil }) key := fmt.Sprintf("%p" , conn) m := &ConnModel{ Conn: conn, Send: make (chan string , 256 ), ConnAddr: key, } startWriter(m) stateMu.Lock() connByAddr[key] = m stateMu.Unlock() for { mt, data, err := conn.ReadMessage() if err != nil { break } if mt == websocket.TextMessage { sendMsg(conn, string (data)) } } }
断开时清理 从全局表删除条目并关闭 Send,让写协程退出。defer 与 SetCloseHandler 都可能触发清理,因此用 sync.Once 包住 close(Send),避免重复关闭引发 panic。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 func closeEvent (conn *websocket.Conn) { key := fmt.Sprintf("%p" , conn) stateMu.Lock() defer stateMu.Unlock() if m, ok := connByAddr[key]; ok { delete (connByAddr, key) m.closeOnce.Do(func () { if m.Send != nil { close (m.Send) m.Send = nil } }) } }
启动 HTTP 监听端口与注册路由与普通 HTTP 服务相同。
1 2 3 4 func main () { http.HandleFunc("/ws" , echoHandler) _ = http.ListenAndServe(":8080" , nil ) }
验证
启动程序后,用浏览器控制台或任意 WebSocket 客户端连接 ws://127.0.0.1:8080/ws。
发送一段文本,应收到相同内容的回显。
关闭客户端标签或断开网络,服务端应不再持有该键对应条目(可在 closeEvent 内临时打日志确认)。
扩展 在保持「单写协程」的前提下,你可以把收包逻辑拆成解析 JSON、校验字段、再按业务码分发。 广播时对 connByAddr 中每个连接调用 sendMsg 即可。 若需要优雅退出,可对 http.Server 使用 Shutdown,并向所有连接发送关闭帧后再清理。
总结 步骤简述如下。
用 Upgrader 处理 GET 升级请求并得到 *websocket.Conn。
为每个连接创建带缓冲的 Send 通道,单独协程消费通道并执行 WriteMessage。
读循环留在处理协程中,业务上只把「待发内容」丢进 Send。
断开时在互斥保护下删表、close(Send),并关闭底层连接。
注意:CheckOrigin 不可长期全开。%p 仅作演示键。 写失败须与读退出路径一样走清理逻辑,避免泄漏与重复写。
前端测试 本地打开 HTML,向本机服务发一条文本并打印回包。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 <!DOCTYPE html > <html lang ="en" > <head > <meta charset ="UTF-8" > <title > WS 测试</title > </head > <body > <script > let ws = new WebSocket ("ws://localhost:10088/ws" ); ws.onopen = function ( ) { console .log ("Connected" ); ws.send ("Hello, World!" ); }; ws.onmessage = function (event ) { console .log ("Received message: " + event.data ); }; ws.onclose = function ( ) { console .log ("Disconnected" ); }; </script > </body > </html >
构建 build_linux64.bat
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 @echo off set TARGET_FILE=xh-control-wsset TARGET_DIR=_distset TARGET_DIR_PATH=%cd %\%TARGET_DIR%\echo ---------------------------------echo Delete Origin DirIF EXIST %TARGET_DIR_PATH% rd /S /Q %TARGET_DIR_PATH% IF NOT EXIST %TARGET_DIR% mkdir %TARGET_DIR% echo ---------------------------------echo Build APIset GOOS=linuxset GOARCH=amd64set CGO_ENABLED=0 go build echo Copy APImove %TARGET_FILE% %TARGET_DIR_PATH% > NUL xcopy /e /i "config" %TARGET_DIR_PATH%config > NUL echo ---------------------------------echo Successpause
Nginx配置 要在应用里识别真实客户端,应读取 X-Forwarded-For(或你约定的 X-Real-IP 等),并注意链式代理时 X-Forwarded-For 可能是逗号分隔的多段地址,通常取第一个非信任段需结合部署规范。
Nginx 侧示例(把客户端地址塞进转发头,供上游 Go 读取)。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 upstream mcwstest { server 127.0.0.1:8017 ; } map $http_upgrade $connection_upgrade { default upgrade; '' close; } server { location / { proxy_pass http://mcwstest; proxy_http_version 1 .1 ; proxy_set_header Upgrade $http_upgrade ; proxy_set_header Connection $connection_upgrade ; proxy_set_header Host $host ; proxy_set_header X-Real-IP $remote_addr ; proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for ; } }
服务器操作 添加服务 准备一个最小单元文件。
假设你的可执行文件放在 /data/wwwjarapi/8931mcws/xh-control-ws,示例中用 root 用户(生产环境建议创建专用系统用户运行)。
创建文件
1 vi /etc/systemd/system/xh-control-ws.service
内容
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 [Unit] Description =xh-control-wsAfter =network.target[Service] Type =simpleUser =rootGroup =rootWorkingDirectory =/data/wwwjarapi/8931 mcwsExecStart =/data/wwwjarapi/8931 mcws/xh-control-wsRestart =alwaysRestartSec =5 sStartLimitInterval =60 sStartLimitBurst =10 StandardOutput =journalStandardError =journal[Install] WantedBy =multi-user.target
[Unit]:Description 为服务描述,After=network.target 表示等网络就绪后再启动。
[Service] 配置说明:
Type=simple → 拉起进程就算启动完成User=root → 用 root 身份跑(示例用,生产建议专用用户)Group=root → 用户组 rootWorkingDirectory=… → 先 cd 到该目录再启动ExecStart=… → 要执行的命令(必须绝对路径)Restart=always → 一挂就重启RestartSec=5s → 等待5秒再重启StartLimitInterval=60s + StartLimitBurst=10 → 60 秒内超 10 次重启就放弃StandardOutput=journal → 把 stdout 打进 systemd 日志StandardError=journal → 把 stderr 也打进 systemd 日志
启用并立即启 1 2 sudo systemctl daemon-reloadsudo systemctl enable --now xh-control-ws
之后若修改过 .service 文件,需要先执行 daemon-reload 再 start 或 restart。
验证 手动 kill 进程:
1 2 ps -ef | grep xh-control-ws sudo kill -9 <pid>
几秒后执行 systemctl status xh-control-ws,应看到 Active: active (running),说明已被自动拉起。
查看状态 1 systemctl status xh-control-ws
查看实时日志:
1 journalctl -u xh-control-ws -f
服务启动/停止 正常停止/启动 1 2 3 4 5 6 7 8 sudo systemctl stop xh-control-wssudo systemctl start xh-control-wssudo systemctl restart xh-control-ws
彻底禁用自启 1 2 sudo systemctl disable --now xh-control-ws