Netty Starter Netty:https://gitee.com/Yeauty/netty-websocket-spring-boot-starter
注意
服务端主动关闭连接,服务端也会接收到连接关闭的回调。
依赖 基于Netty的Websocket的框架
1 2 3 4 5 6 7 8 9 <dependency > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-autoconfigure</artifactId > </dependency > <dependency > <groupId > org.yeauty</groupId > <artifactId > netty-websocket-spring-boot-starter</artifactId > <version > 0.9.5</version > </dependency >
代码 添加新类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 import io.netty.handler.codec.http.HttpHeaders;import io.netty.handler.timeout.IdleStateEvent;import org.springframework.util.MultiValueMap;import org.yeauty.annotation.*;import org.yeauty.pojo.Session;import java.io.IOException;import java.util.Map;@ServerEndpoint(path = "${ws.path}", host = "${ws.host}", port = "${ws.port}", optionConnectTimeoutMillis = "${ws.timeout}") public class MyWebSocket { @BeforeHandshake public void handshake ( Session session, HttpHeaders headers, @RequestParam String req, @RequestParam MultiValueMap reqMap, @PathVariable String arg, @PathVariable Map pathMap ) { session.setSubprotocols("stomp" ); if (req != null ) { if (!req.equals("ok" )) { System.out.println("Authentication failed!" ); session.close(); } } } @OnOpen public void onOpen ( Session session, HttpHeaders headers, @RequestParam String req, @RequestParam MultiValueMap reqMap, @PathVariable String arg, @PathVariable Map pathMap ) { System.out.println("连接打开 客户端ID:" + session.hashCode()); } @OnClose public void onClose (Session session) throws IOException { System.out.println("连接关闭 客户端ID:" + session.hashCode()); } @OnError public void onError (Session session, Throwable throwable) { throwable.printStackTrace(); } @OnMessage public void onMessage (Session session, String message) { if (message.equals("close" )) { session.close(); } else { System.out.println("收到消息:" + message + " 客户端ID:" + session.hashCode()); session.sendText("回发:" + message); } } @OnBinary public void onBinary (Session session, byte [] bytes) { System.out.println("接收Binary" ); } @OnEvent public void onEvent (Session session, Object evt) { if (evt instanceof IdleStateEvent) { IdleStateEvent idleStateEvent = (IdleStateEvent) evt; switch (idleStateEvent.state()) { case READER_IDLE: System.out.println("read idle" ); break ; case WRITER_IDLE: System.out.println("write idle" ); break ; case ALL_IDLE: System.out.println("all idle" ); break ; default : break ; } } } }
接下来即可在application.properties中配置
1 2 3 4 ws.path=/ ws.host=0.0.0.0 ws.port=8888 ws.timeout=20000
用户标记 我们可以在onMessage中绑定业务逻辑中的ID
1 session.setAttribute("userId" , userId);
在onMessage、onError、onClose时我们可用重新取到之前绑定的值,进行业务逻辑处理。
1 session.getAttribute("userId" )
onClose移除时
1 2 3 4 5 6 Session msession = xxx;String device = session.getAttribute("device" );if (msession!=null && !msession.isActive() && msession.getAttribute("device" ).equals(device)){ }
之所以要判断msession.isActive()是防止用户短时间重连、断开重连、其它设备登录时msession已经被更新,msession.getAttribute("device").equals(device)是为了防止其它设备登录时msession已经被更新。
发送消息时
1 2 3 4 Session msession = xxx;if (msession!=null && msession.isActive() ){ }
t-io Starter t-io官方示例:https://gitee.com/psvmc/tio-websocket-showcase
官方文档:https://www.tiocloud.com/doc/tio/318?pageNumber=1
注意
不建议使用该库了,因为客户端长时间断网,服务端也收不到连接关闭的回调,这样有些业务逻辑就没法实现。
使用t-io的时候一定要注意
客户端一定不要发送空的消息,只要发送空消息,客户端会直接断开连接,并且服务端收不到断开的事件。
不支持ws地址后缀path的设置。
服务端主动关闭连接,服务端不会接收到连接关闭的回调。
客户端长时间断网,服务端也收不到连接关闭的回调。
依赖 删除原有的starter添加t-io的starter
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 <properties > <java.version > 1.8</java.version > <project.build.sourceEncoding > UTF-8</project.build.sourceEncoding > <project.reporting.outputEncoding > UTF-8</project.reporting.outputEncoding > </properties > <dependencies > <dependency > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-autoconfigure</artifactId > </dependency > <dependency > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-starter-logging</artifactId > </dependency > <dependency > <groupId > org.projectlombok</groupId > <artifactId > lombok</artifactId > <version > 1.18.20</version > </dependency > <dependency > <groupId > org.t-io</groupId > <artifactId > tio-websocket-spring-boot-starter</artifactId > <version > 3.6.0.v20200315-RELEASE</version > </dependency > </dependencies >
代码示例 主类
1 2 3 4 5 6 7 8 9 10 11 import org.springframework.boot.SpringApplication;import org.springframework.boot.autoconfigure.SpringBootApplication;import org.tio.websocket.starter.EnableTioWebSocketServer;@SpringBootApplication @EnableTioWebSocketServer public class WsServerZApplication { public static void main (String[] args) { SpringApplication.run(WsServerZApplication.class, args); } }
主要就是添加了
@EnableTioWebSocketServer
消息监听
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 import org.springframework.stereotype.Component;import org.tio.core.ChannelContext;import org.tio.core.Tio;import org.tio.http.common.HttpRequest;import org.tio.http.common.HttpResponse;import org.tio.websocket.common.WsRequest;import org.tio.websocket.common.WsResponse;import org.tio.websocket.server.handler.IWsMsgHandler;@Component public class MyWebSocketMsgHandler implements IWsMsgHandler { @Override public HttpResponse handshake (HttpRequest httpRequest, HttpResponse httpResponse, ChannelContext channelContext) throws Exception { return httpResponse; } @Override public void onAfterHandshaked ( HttpRequest httpRequest, HttpResponse httpResponse, ChannelContext channelContext ) throws Exception { System.out.println("连接成功" + " 客户端ID:" + channelContext.getId()); } @Override public Object onBytes (WsRequest wsRequest, byte [] bytes, ChannelContext channelContext) throws Exception { System.out.println("接收到bytes消息" ); return null ; } @Override public Object onText (WsRequest wsRequest, String s, ChannelContext channelContext) throws Exception { String msg = "服务器返回:" + s + " 客户端ID:" + channelContext.getId(); System.out.println(msg); Tio.send(channelContext, WsResponse.fromText(msg, "utf-8" )); return null ; } @Override public Object onClose (WsRequest wsRequest, byte [] bytes, ChannelContext channelContext) throws Exception { System.out.println("连接关闭" + " 客户端ID:" + channelContext.getId()); return null ; } }
给对应的客户端发送消息
1 Tio.send(channelContext, WsResponse.fromText(msg, "utf-8" ));
接下来即可在application.properties中配置
1 2 tio.websocket.server.port=8888 tio.websocket.server.heartbeat-timeout=30000
发送消息 1 2 3 4 5 6 7 8 9 Tio.send(channelContext, WsResponse.fromText(msg, "utf-8" )); Tio.sendToGroup( channelContext.getTioConfig(), "group-001" , WsResponse.fromText(msg, "utf-8" ) );
业务绑定 方式1 静态方法绑定
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 Tio.bindUser(channelContext,"userid-001" ); Tio.unbindUser(channelContext); Tio.unbindUser(channelContext.tioConfig,"userid-001" ); Tio.bindToken(channelContext,"token-001" ); Tio.unbindToken(channelContext); Tio.bindBsId(channelContext,"bsid-001" ); Tio.unbindBsId(channelContext); Tio.bindGroup(channelContext,"group-001" ); Tio.bindGroup(channelContext,"group-002" ); Tio.unbindGroup("group-001" ,channelContext); Tio.unbindGroup(channelContext.tioConfig,"userid-001" ,"group-001" ); Tio.unbindGroup(channelContext);
方式2 对象属性绑定
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 channelContext.tioConfig.users.bind("userid-001" , channelContext); channelContext.setUserid("userid-001" ); String userid = channelContext.userid;System.out.println("userid:" + userid); channelContext.tioConfig.users.unbind(channelContext); TioConfig tioConfig = channelContext.tioConfig;tioConfig.users.unbind(tioConfig,userid); channelContext.tioConfig.tokens.bind("token-002" , channelContext); channelContext.setToken("token-002" ); String token = channelContext.getToken();System.out.println("token:" + token); channelContext.tioConfig.tokens.unbind(channelContext); channelContext.tioConfig.bsIds.bind(channelContext, "bsid-003" ); channelContext.setBsId("bsid-003" ); String bsid = channelContext.getBsId();System.out.println("bsid:" + bsid); channelContext.tioConfig.bsIds.unbind(channelContext); channelContext.tioConfig.groups.bind("group-001" , channelContext); channelContext.tioConfig.groups.bind("group-002" , channelContext); SetWithLock<String> groups = channelContext.getGroups(); System.out.println("groups.size:" + groups.size()); channelContext.tioConfig.groups.unbind(group, channelContext); channelContext.tioConfig.groups.unbind(channelContext);
方式3 对象属性绑定
1 2 3 channelContext.set("userid" , 123 ); channelContext.get("userid" ); channelContext.remove("userid" );
SSL配置 配置
1 2 3 4 tio.websocket.ssl.enabled=true tio.websocket.ssl.key-store=classpath:config/ssl/cert-psvmc.jks tio.websocket.ssl.trust-store=classpath:config/ssl/cert-psvmc.jks tio.websocket.ssl.password=123456
证书生成 下载SSL证书
如果是阿里云申请的SSL证书,下载tomcat版本的,里面会有cert.pfx 和password.txt
生成jks证书
添加Java到环境变量的Path中
D:\Program Files\Java\jdk1.8.0_102\bin
执行
1 keytool -importkeystore -srckeystore .\cert.pfx -destkeystore .\cert-psvmc.jks -srcstoretype PKCS12 -deststoretype JKS
输入两次新口令,再输入一次下载时带的密码文件中的密码就会生成文件cert-psvmc.jks,我们在配置时用我们设置的新口令。
集群 集群添加如下配置
集群是通过redis的Pub/Sub实现,所以需要配置Redis
1 2 3 4 5 6 7 tio.websocket.cluster.enabled=false tio.websocket.cluster.redis.ip=127.0.0.1 tio.websocket.cluster.redis.port=6379 tio.websocket.cluster.all=true tio.websocket.cluster.group=true tio.websocket.cluster.ip=true tio.websocket.cluster.user=true
SpringBoot Starter 这是SpringBoot官方提供的Webscoket框架
1 2 3 4 5 6 7 8 <dependency > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-autoconfigure</artifactId > </dependency > <dependency > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-starter-websocket</artifactId > </dependency >
具体代码
1 2 3 4 5 6 7 8 9 10 11 12 13 import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.web.socket.config.annotation.EnableWebSocket;import org.springframework.web.socket.server.standard.ServerEndpointExporter;@Configuration @EnableWebSocket public class WebSocketConfig { @Bean public ServerEndpointExporter serverEndpoint () { return new ServerEndpointExporter (); } }
WsServerEndpoint
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 import java.io.IOException;import org.springframework.stereotype.Component;import javax.websocket.*;import javax.websocket.server.ServerEndpoint;@ServerEndpoint("/ws") @Component public class MyWebSocket { @OnOpen public void onOpen (Session session) { System.out.println("连接成功" ); } @OnClose public void onClose (Session session) { System.out.println("连接关闭" ); } @OnError public void onError (Session session, Throwable error) { System.out.println("发生错误" ); error.printStackTrace(); } @OnMessage public String onMessage (String text, Session session) throws IOException { return "servet 发送:" + text; } @OnMessage public void onMessage (byte [] messages, Session session) throws IOException { } }
说明:
这里有几个注解需要注意一下,首先是他们的包都在 **javax.websocket **下。并不是 spring 提供的,而 jdk 自带的,下面是他们的具体作用。
@ServerEndpoint
通过这个 spring boot 就可以知道你暴露出去的 ws 应用的路径,有点类似我们经常用的@RequestMapping。比如你的启动端口是8080,而这个注解的值是ws,那我们就可以通过 ws://127.0.0.1:8888/ws 来连接你的应用
@OnOpen
当 websocket 建立连接成功后会触发这个注解修饰的方法,注意它有一个 Session 参数
@OnClose
当 websocket 建立的连接断开后会触发这个注解修饰的方法,注意它有一个 Session 参数
@OnMessage
当客户端发送消息到服务端时,会触发这个注解修改的方法,它有一个 String 入参表明客户端传入的值
@OnError
当 websocket 建立连接时出现异常会触发这个注解修饰的方法,注意它有一个 Session 参数
另外一点就是服务端如何发送消息给客户端,服务端发送消息必须通过上面说的 Session 类,通常是在@OnOpen 方法中,当连接成功后把 session 存入 Map 的 value,key 是与 session 对应的用户标识,当要发送的时候通过 key 获得 session 再发送,这里可以通过 session.getBasicRemote().sendText() 来对客户端发送消息。
接下来即可在application.properties中配置
Netty原生 Netty简介 那么Netty到底是何方神圣?
用一句简单的话来说就是:Netty封装了JDK的NIO,让你用得更爽,你不用再写一大堆复杂的代码了。
用官方正式的话来说就是:Netty是一个异步事件驱动的网络应用框架,用于快速开发可维护的高性能服务器和客户端。
下面是我总结的使用Netty不使用JDK原生NIO的原因
使用JDK自带的NIO需要了解太多的概念,编程复杂
Netty底层IO模型随意切换,而这一切只需要做微小的改动,改改参数,Netty可以直接从NIO模型变身为IO模型
Netty自带的拆包解包,异常检测等机制让你从NIO的繁重细节中脱离出来,让你只需要关心业务逻辑
Netty解决了JDK的很多包括空轮询在内的bug
Netty底层对线程,selector做了很多细小的优化,精心设计的reactor线程模型做到非常高效的并发处理
自带各种协议栈让你处理任何一种通用协议都几乎不用亲自动手
Netty已经历各大rpc框架,消息中间件,分布式通信中间件线上的广泛验证,健壮性无比强大
Websocket服务端 引入Netty
1 2 3 4 5 <dependency > <groupId > io.netty</groupId > <artifactId > netty-all</artifactId > <version > 5.0.0.Alpha1</version > </dependency >
初始化代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 private static void initWebScoket () { EventLoopGroup bossGroup = new NioEventLoopGroup (); EventLoopGroup wokerGroup = new NioEventLoopGroup (); try { ServerBootstrap serverBootstrap = new ServerBootstrap (); serverBootstrap .group(bossGroup, wokerGroup) .channel(NioServerSocketChannel.class) .handler(new LoggingHandler (LogLevel.INFO)) .option(ChannelOption.SO_KEEPALIVE, true ) .childHandler(new ChannelInitializer <SocketChannel>(){ @Override protected void initChannel (SocketChannel socketChannel) throws Exception { ChannelPipeline pipeline = socketChannel.pipeline(); pipeline.addLast(new HttpServerCodec ()); pipeline.addLast(new ChunkedWriteHandler ()); pipeline.addLast(new HttpObjectAggregator (1024 * 1024 * 1024 )); pipeline.addLast(new WebSocketServerProtocolHandler ("/ws" )); pipeline.addLast(new WebSocketHandle ()); } }); ChannelFuture channelFuture = serverBootstrap .bind(new InetSocketAddress (8899 )) .sync(); channelFuture .channel() .closeFuture() .sync(); } catch (InterruptedException e) { e.printStackTrace(); } finally { bossGroup.shutdownGracefully(); wokerGroup.shutdownGracefully(); } }
bossGroup对应接受新连接线程,主要负责创建新连接
wokerGroup对应负责读取数据的线程,主要用于读取数据以及业务逻辑处理
WebSocketHandle
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 import io.netty.buffer.Unpooled;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.SimpleChannelInboundHandler;import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame;import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;public class WebSocketHandle extends SimpleChannelInboundHandler <Object> { @Override public void channelActive (ChannelHandlerContext ctx) throws Exception { System.out.println("用户连接" ); } @Override public void channelInactive (ChannelHandlerContext ctx) throws Exception { System.out.println("用户断开" ); } @Override protected void messageReceived (ChannelHandlerContext ctx, Object msg) throws Exception { if (msg instanceof TextWebSocketFrame) { String receive_msg = ((TextWebSocketFrame) msg).text(); System.out.println("收到消息:" + receive_msg); String send_msg = "服务器返回:" + receive_msg; System.out.println("ctx.channel.id:" +ctx.channel().id()); ctx.channel().writeAndFlush(new TextWebSocketFrame (send_msg)); } else if (msg instanceof BinaryWebSocketFrame) { System.out.println("收到二进制消息:" + ((BinaryWebSocketFrame) msg).content().readableBytes()); BinaryWebSocketFrame binaryWebSocketFrame = new BinaryWebSocketFrame (Unpooled.buffer().writeBytes("xxx" .getBytes())); ctx.channel().writeAndFlush(binaryWebSocketFrame); } } @Override public void exceptionCaught ( ChannelHandlerContext ctx, Throwable cause ) throws Exception { super .exceptionCaught(ctx, cause); } }
Websocket客户端 这样在JS中既可以这样连接
1 2 3 4 5 6 7 8 9 10 11 12 13 var socket = new WebSocket ("ws://127.0.0.1:8899/ws" );socket.onopen = function ( ) { }; socket.onmessage = function (evt ) { }; socket.onclose = function (evt ) { } socket.onerror = function (evt ) { }
客户端测试 HTML 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 <!DOCTYPE html > <html > <head > <meta name ="viewport" content ="width=device-width, initial-scale=1" /> <title > 聊天客户端</title > <style type ="text/css" > h1 , h2 , h3 { margin-top : 6px ; margin-bottom : 6px ; } body { margin : 0 ; padding : 0 ; overflow : hidden; } .outer { width : 100vw ; height : 100vh ; display : flex; flex-direction : column; } #content { height : 0 ; flex : auto; width : 100% ; padding : 10px ; overflow-y : auto; } #msg { width : 100% ; height : 40px ; line-height : 40px ; flex : none; } </style > </head > <body > <div class ="outer" > <div id ="content" > </div > <input type ="text" style ="width: 100%" id ="msg" /> </div > <script type ="text/javascript" src ="http://apps.bdimg.com/libs/jquery/2.1.4/jquery.min.js" > </script > <script type ="text/javascript" src ="js/index.js" > </script > </body > </html >
JS 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 let timeunix = 0 ;var socket = null ;var isopen = false ;function connect_ws ( ) { socket = new WebSocket ("ws://127.0.0.1:8888" ); timeunix = parseInt (new Date ().getTime () / 1000 ); socket.onopen = function ( ) { $("#content" ).append ("<h3>连接成功!</h3>" ); isopen = true ; scrollBottom (); timeunix = parseInt (new Date ().getTime () / 1000 ); heartCheck (); }; socket.onmessage = function (evt ) { $("#content" ).append ("<div>" + evt.data + "</div>" ); scrollBottom (); }; socket.onclose = function (evt ) { if (isopen) { $("#content" ).append ("<h3>连接关闭!</h3>" ); $("#content" ).append ("<div>连接时长" + (parseInt (new Date ().getTime () / 1000 ) - timeunix) + "秒</div>" ); } scrollBottom (); isopen = false ; } socket.onerror = function (evt ) { $("#content" ).append ("<h3>" + "连接出错!" + "</h3>" ); scrollBottom (); } } $(function ( ) { connect_ws (); }) function scrollBottom ( ) { var content_div = $("#content" )[0 ]; content_div.scrollTop = content_div.scrollHeight ; } function encodeScript (data ) { if (null == data || "" == data) { return "" ; } return data.replace ("<" , "<" ).replace (">" , ">" ); } function send_msg_click ( ) { if (socket && socket.readyState == socket.OPEN ) { var text = encodeScript ($("#msg" ).val ()); if (text) { socket.send (text); $("#content" ).append ("<div style='color: #" + "CECECE" + "; font-size: " + 12 + ";'>" + text + "</div>" ); $("#msg" ).val ("" ); } } else { connect_ws (); } } function heartCheck ( ) { if (window .ws_inter ) { clearInterval (window .ws_inter ); } window .ws_inter = setInterval (function ( ) { if (socket && socket.readyState == socket.OPEN ) { let buffer = new ArrayBuffer (2 ); let dataView = new DataView (buffer); dataView.setInt16 (0 , 1 ); socket.send (dataView); console .info ("发送心跳" , " 时间(s):" + parseInt (new Date ().getTime () / 1000 )); } else { connect_ws (); } }, 10000 ); } document .onkeydown = function (event ) { var e = event || window .event || arguments .callee .caller .arguments [0 ]; if (e && e.keyCode == 13 ) { send_msg_click (); } };
注意
客户端一定不要发送空的消息,只要发送空消息,如果使用t-io,客户端会直接断开连接,并且服务端收不到断开的事件。
Maven依赖无法下载 比如我要下载的包为tio-websocket-server
到中央仓库地址:http://mvnrepository.com/
搜索到的依赖为
1 2 3 4 5 <dependency > <groupId > org.t-io</groupId > <artifactId > tio-websocket-server</artifactId > <version > 3.7.1.v20210106-RELEASE</version > </dependency >
对应的地址为
https://mvnrepository.com/artifact/org.t-io/tio-websocket-server/3.7.1.v20210106-RELEASE
输入命令 需要修改的 url、groupId、artifactId、version
mvn dependency:get -DremoteRepositories=url -DgroupId=groupId -DartifactId=artifactId -Dversion=version
即
mvn dependency:get -DremoteRepositories=https://mvnrepository.com/artifact/org.t-io/tio-websocket-server -DgroupId=org.t-io -DartifactId=tio-websocket-server -Dversion=3.7.1.v20210106-RELEASE