Android Websocket客户端

前言

大多数Android网络请求都是使用的Retrofit/OkHttp,它本身也支持WS的连接。

为了各个页面都能进行WS通讯,这里使用全局 WebSocket 管理器 + 消息分发机制来实现。

客户端

连接工具类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
package com.xhkjedu.zxs_android.ws

import android.util.Log
import kotlinx.coroutines.DelicateCoroutinesApi
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.cancel
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import okhttp3.OkHttpClient
import okhttp3.Request
import okhttp3.Response
import okhttp3.WebSocket
import java.util.concurrent.TimeUnit
import kotlin.math.min

object ZWsManager {
private var wsUrl = ""
private var webSocket: WebSocket? = null
private val client = OkHttpClient.Builder()
.pingInterval(20, TimeUnit.SECONDS)
.build()

//是否正常退出
private var isManualClose = false

//重连延迟
private var currentReconnectDelay = 1_000L

//重连最大延迟
private const val reconnectMaxDelay: Long = 30_000

//重连次数
private var reconnectCount = 0

//最大重连次数
private const val MAX_RECONNECT_COUNT = 10

//重连任务
private var reconnectJob: Job? = null


private val listeners = mutableListOf<ZWsListener>()

//重置参数
fun resetParas() {
reconnectCount = 0
currentReconnectDelay = 1_000L
isManualClose = false
}

/**
* 连接WebSocket服务器
*
* @param url WebSocket服务器地址
* @param isReConnect 是否为重连操作 false 新建连接 true 重连
*/
fun connect(url: String = "", isReConnect: Boolean = false) {
if (url.isNotEmpty()) {
wsUrl = url
}

if (webSocket != null) {
webSocket?.close(1000, "Client closed")
webSocket = null
}
//新建连接
if (!isReConnect) {
resetParas()
}

if (wsUrl.isEmpty()) {
return
}

val request = Request.Builder().url(wsUrl).build()
webSocket = client.newWebSocket(request, object : okhttp3.WebSocketListener() {
override fun onOpen(webSocket: WebSocket, response: Response) {
Log.w("WebSocket", "连接成功")
notifyOnConnected()
}

override fun onMessage(webSocket: WebSocket, text: String) {
notifyOnMessage(text)
}

override fun onFailure(webSocket: WebSocket, t: Throwable, response: Response?) {
Log.e("WebSocket", "连接失败: ${t.message}", t)
this@ZWsManager.webSocket = null

if (reconnectCount >= MAX_RECONNECT_COUNT) {
Log.w("WebSocket", "已达到最大重连次数 ($MAX_RECONNECT_COUNT),停止重连")
notifyOnFailure(t)
return
}

notifyOnFailure(t)

if (!isManualClose) {
scheduleReconnect()
} else {
Log.w("WebSocket", "正常退出不再重连")
}
}

override fun onClosed(webSocket: WebSocket, code: Int, reason: String) {
Log.w("WebSocket", "连接关闭: $code - $reason")
this@ZWsManager.webSocket = null
notifyOnClosed(code, reason)

if (reconnectCount >= MAX_RECONNECT_COUNT) {
Log.w("WebSocket", "已达到最大重连次数,停止重连")
return
}

if (!isManualClose) {
scheduleReconnect()
} else {
Log.w("WebSocket", "正常退出不再重连")
}
}
})
}

fun disconnect() {
isManualClose = true

val oldJob = reconnectJob
reconnectJob = null
oldJob?.cancel()

webSocket?.close(1000, "Client closed")
webSocket = null

Log.w("WebSocket", "连接已断开")
}

fun sendMessage(text: String): Boolean {
return webSocket?.send(text) ?: false
}

fun registerListener(listener: ZWsListener) {
if (!listeners.contains(listener)) {
listeners.add(listener)
}
}

fun unregisterListener(listener: ZWsListener) {
listeners.remove(listener)
}

private fun notifyOnConnected() {
Log.w("WebSocket", "通知监听器:连接成功")
listeners.forEach { it.onConnected() }
}

private fun notifyOnMessage(message: String) {
listeners.forEach { it.onMessageReceived(message) }
}

private fun notifyOnFailure(throwable: Throwable) {
Log.e("WebSocket", "通知监听器:连接失败", throwable)
listeners.forEach { it.onFailure(throwable) }
}

private fun notifyOnClosed(code: Int, reason: String) {
Log.w("WebSocket", "通知监听器:连接关闭 ($code - $reason)")
listeners.forEach { it.onClosed(code, reason) }
}

@OptIn(DelicateCoroutinesApi::class)
private fun scheduleReconnect() {
if (isManualClose) return

reconnectCount++
Log.w("WebSocket", "计划在 ${currentReconnectDelay}ms 后重连 (第 $reconnectCount 次)")

val newJob = GlobalScope.launch(Dispatchers.IO) {
try {
delay(currentReconnectDelay)

if (isManualClose) {
Log.w("WebSocket", "重连被取消")
return@launch
}

if (webSocket != null) {
Log.w("WebSocket", "已有活跃连接,跳过重连")
return@launch
}

if (reconnectCount >= MAX_RECONNECT_COUNT) {
Log.w("WebSocket", "已达到最大重连次数,停止重连")
return@launch
}

currentReconnectDelay = min(currentReconnectDelay * 2, reconnectMaxDelay)
connect(isReConnect = true)

} catch (e: InterruptedException) {
Log.w("WebSocket", "重连被中断")
Thread.currentThread().interrupt()
} catch (e: Exception) {
Log.e("WebSocket", "重连任务取消")
reconnectCount--
currentReconnectDelay = min(currentReconnectDelay * 2, reconnectMaxDelay)
}
}

reconnectJob = newJob
}
}

interface ZWsListener {
fun onConnected() {}
fun onMessageReceived(message: String) {}
fun onFailure(error: Throwable) {}
fun onClosed(code: Int, reason: String) {}
}

连接

实现ZWsListener接口

1
2
3
4
fun actionInit() {
ZWsManager.connect(ConfigData.wsUrl)
ZWsManager.registerListener(this)
}

服务端

为了方便本地测试,这里使用NodeJS运行一个WS服务端。

以下是一个 本地最简单的 WebSocket 服务端,使用 Node.js + ws 实现,适合开发调试。

初始化项目

1
2
3
mkdir z-websocket-server
cd z-websocket-server
npm init -y

安装依赖

1
npm install ws

ws 是 Node.js 社区最流行、轻量且符合 RFC6455 标准的 WebSocket 库。

创建服务端代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
// server.js
const WebSocket = require('ws');

// 创建 WebSocket 服务器,监听 8080 端口
const wss = new WebSocket.Server({ port: 8080 });

console.log('WebSocket 服务器启动,监听 ws://localhost:8080');

wss.on('connection', (ws, req) => {
const clientIP = req.socket.remoteAddress;
console.log(`新客户端连接: ${clientIP}`);

// 向客户端发送欢迎消息
ws.send(JSON.stringify({ type: 'welcome', message: '已连接到服务器!' }));

// 监听客户端发来的消息
ws.on('message', (data) => {
console.log('收到消息:', data.toString());

try {
const msg = JSON.parse(data);
// 回显消息(可自定义逻辑)
ws.send(JSON.stringify({
type: 'echo',
original: msg,
timestamp: Date.now()
}));
} catch (e) {
// 非 JSON 消息,直接回显
ws.send(`你发送了: ${data}`);
}
});

// 客户端断开连接
ws.on('close', () => {
console.log('客户端断开连接');
});

// 发生错误
ws.on('error', (err) => {
console.error('WebSocket 错误:', err);
});
});

// 可选:处理进程退出,优雅关闭
process.on('SIGINT', () => {
console.log('\n正在关闭 WebSocket 服务器...');
wss.close(() => {
console.log('服务器已关闭');
process.exit(0);
});
});

启动服务

1
node server.js

输出:

1
WebSocket 服务器启动,监听 ws://localhost:8080

测试连接

使用浏览器控制台(仅限简单文本)

打开浏览器开发者工具 → Console,输入:

1
2
3
4
5
6
7
8
9
10
const ws = new WebSocket('ws://localhost:8080');

ws.onopen = () => {
console.log('连接成功');
ws.send('Hello Server!');
};

ws.onmessage = (event) => {
console.log('收到:', event.data);
};

扩展建议(按需添加)

支持跨域(CORS)

ws 默认不限制 Origin,但若需显式允许:

1
2
3
4
5
6
7
const wss = new WebSocket.Server({
port: 8080,
verifyClient: (info, done) => {
// 允许所有来源(开发环境)
done(true);
}
});

广播消息给所有客户端

1
2
3
4
5
6
// 在 message 处理中广播
wss.clients.forEach(client => {
if (client.readyState === WebSocket.OPEN) {
client.send('广播消息');
}
});