Netty 实现 WebSocket 聊天功能

HollySherra 8年前
   <p>现在,我们要坐下修改,加入 WebSocket 的支持,使它可以在浏览器里进行文本聊天。</p>    <h2><strong>准备</strong></h2>    <ul>     <li>JDK 7+</li>     <li>Maven 3.2.x</li>     <li><a href="/misc/goto?guid=4959623105292431062" rel="nofollow,noindex">Netty</a> 4.x</li>     <li><a href="/misc/goto?guid=4958544132183877406" rel="nofollow,noindex">Eclipse</a> 4.x</li>    </ul>    <p><!– more –></p>    <h2><strong>WebSocket</strong></h2>    <p>WebSocket 通过“ <a href="/misc/goto?guid=4959715881766290667" rel="nofollow,noindex">Upgrade handshake</a> (升级握手)”从标准的 HTTP 或HTTPS 协议转为 WebSocket。因此,使用 WebSocket 的应用程序将始终以 HTTP/S 开始,然后进行升级。在什么时候发生这种情况取决于具体的应用;它可以是在启动时,或当一个特定的 URL 被请求时。</p>    <p>在我们的应用中,当 URL 请求以“/ws”结束时,我们才升级协议为WebSocket。否则,服务器将使用基本的 HTTP/S。一旦升级连接将使用的WebSocket 传输所有数据。</p>    <p>整个服务器逻辑如下:</p>    <p style="text-align:center"><img src="https://simg.open-open.com/show/459cae37e588d4f247205a4d68db55b7.jpg"></p>    <p>1.客户端/用户连接到服务器并加入聊天</p>    <p>2.HTTP 请求页面或 WebSocket 升级握手</p>    <p>3.服务器处理所有客户端/用户</p>    <p>4.响应 URI “/”的请求,转到默认 html 页面</p>    <p>5.如果访问的是 URI“/ws” ,处理 WebSocket 升级握手</p>    <p>6.升级握手完成后 ,通过 WebSocket 发送聊天消息</p>    <h2><strong>服务端</strong></h2>    <p>让我们从处理 HTTP 请求的实现开始。</p>    <h3><strong>处理 HTTP 请求</strong></h3>    <h3><strong>HttpRequestHandler.java</strong></h3>    <pre>  <code class="language-java">public class HttpRequestHandler extends SimpleChannelInboundHandler<FullHttpRequest> { //1      private final String wsUri;      private static final File INDEX;        static {          URL location = HttpRequestHandler.class.getProtectionDomain().getCodeSource().getLocation();          try {              String path = location.toURI() + "WebsocketChatClient.html";              path = !path.contains("file:") ? path : path.substring(5);              INDEX = new File(path);          } catch (URISyntaxException e) {              throw new IllegalStateException("Unable to locate WebsocketChatClient.html", e);          }      }        public HttpRequestHandler(String wsUri) {          this.wsUri = wsUri;      }        @Override      public void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) throws Exception {          if (wsUri.equalsIgnoreCase(request.getUri())) {              ctx.fireChannelRead(request.retain());                  //2          } else {              if (HttpHeaders.is100ContinueExpected(request)) {                  send100Continue(ctx);                               //3              }                RandomAccessFile file = new RandomAccessFile(INDEX, "r");//4                HttpResponse response = new DefaultHttpResponse(request.getProtocolVersion(), HttpResponseStatus.OK);              response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "text/html; charset=UTF-8");                boolean keepAlive = HttpHeaders.isKeepAlive(request);                if (keepAlive) {                                        //5                  response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, file.length());                  response.headers().set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.KEEP_ALIVE);              }              ctx.write(response);                    //6                if (ctx.pipeline().get(SslHandler.class) == null) {     //7                  ctx.write(new DefaultFileRegion(file.getChannel(), 0, file.length()));              } else {                  ctx.write(new ChunkedNioFile(file.getChannel()));              }              ChannelFuture future = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);           //8              if (!keepAlive) {                  future.addListener(ChannelFutureListener.CLOSE);        //9              }                file.close();          }      }        private static void send100Continue(ChannelHandlerContext ctx) {          FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.CONTINUE);          ctx.writeAndFlush(response);      }        @Override      public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)              throws Exception {          Channel incoming = ctx.channel();          System.out.println("Client:"+incoming.remoteAddress()+"异常");          // 当出现异常就关闭连接          cause.printStackTrace();          ctx.close();      }  }</code></pre>    <p>1.扩展 SimpleChannelInboundHandler 用于处理 FullHttpRequest信息</p>    <p>2.如果请求是 WebSocket 升级,递增引用计数器(保留)并且将它传递给在 ChannelPipeline 中的下个 ChannelInboundHandler</p>    <p>3.处理符合 HTTP 1.1的 “100 Continue” 请求</p>    <p>4.读取默认的 WebsocketChatClient.html 页面</p>    <p>5.判断 keepalive 是否在请求头里面</p>    <p>6.写 HttpResponse 到客户端</p>    <p>7.写 index.html 到客户端,判断 SslHandler 是否在 ChannelPipeline 来决定是使用 DefaultFileRegion 还是 ChunkedNioFile</p>    <p>8.写并刷新 LastHttpContent 到客户端,标记响应完成</p>    <p>9.如果 keepalive 没有要求,当写完成时,关闭 Channel</p>    <p>HttpRequestHandler 做了下面几件事,</p>    <ul>     <li>如果该 HTTP 请求被发送到URI “/ws”,调用 FullHttpRequest 上的 retain(),并通过调用 fireChannelRead(msg) 转发到下一个 ChannelInboundHandler。retain() 是必要的,因为 channelRead() 完成后,它会调用 FullHttpRequest 上的 release() 来释放其资源。 (请参考我们先前的 SimpleChannelInboundHandler 在第6章中讨论)</li>     <li>如果客户端发送的 HTTP 1.1 头是“Expect: 100-continue” ,将发送“100 Continue”的响应。</li>     <li>在 头被设置后,写一个 HttpResponse 返回给客户端。注意,这是不是 FullHttpResponse,唯一的反应的第一部分。此外,我们不使用 writeAndFlush() 在这里 – 这个是在最后完成。</li>     <li>如果没有加密也不压缩,要达到最大的效率可以是通过存储 index.html 的内容在一个 DefaultFileRegion 实现。这将利用零拷贝来执行传输。出于这个原因,我们检查,看看是否有一个 SslHandler 在 ChannelPipeline 中。另外,我们使用 ChunkedNioFile。</li>     <li>写 LastHttpContent 来标记响应的结束,并终止它</li>     <li>如果不要求 keepalive ,添加 ChannelFutureListener 到 ChannelFuture 对象的最后写入,并关闭连接。注意,这里我们调用 writeAndFlush() 来刷新所有以前写的信息。</li>    </ul>    <h3><strong>处理 WebSocket frame</strong></h3>    <p>WebSockets 在“帧”里面来发送数据,其中每一个都代表了一个消息的一部分。一个完整的消息可以利用了多个帧。 WebSocket “Request for Comments” (RFC) 定义了六中不同的 frame; Netty 给他们每个都提供了一个 POJO 实现 ,而我们的程序只需要使用下面4个帧类型:</p>    <ul>     <li>CloseWebSocketFrame</li>     <li>PingWebSocketFrame</li>     <li>PongWebSocketFrame</li>     <li>TextWebSocketFrame</li>    </ul>    <p>在这里我们只需要显示处理 TextWebSocketFrame,其他的会由 WebSocketServerProtocolHandler 自动处理。</p>    <p>下面代码展示了 ChannelInboundHandler 处理 TextWebSocketFrame,同时也将跟踪在 ChannelGroup 中所有活动的 WebSocket 连接</p>    <p>TextWebSocketFrameHandler.java</p>    <pre>  <code class="language-java">public class TextWebSocketFrameHandler extends          SimpleChannelInboundHandler<TextWebSocketFrame> {        public static ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);        @Override      protected void channelRead0(ChannelHandlerContext ctx,              TextWebSocketFrame msg) throws Exception { // (1)          Channel incoming = ctx.channel();          for (Channel channel : channels) {              if (channel != incoming){                  channel.writeAndFlush(new TextWebSocketFrame("[" + incoming.remoteAddress() + "]" + msg.text()));              } else {                  channel.writeAndFlush(new TextWebSocketFrame("[you]" + msg.text() ));              }          }      }        @Override      public void handlerAdded(ChannelHandlerContext ctx) throws Exception {  // (2)          Channel incoming = ctx.channel();          for (Channel channel : channels) {              channel.writeAndFlush(new TextWebSocketFrame("[SERVER] - " + incoming.remoteAddress() + " 加入"));          }          channels.add(ctx.channel());          System.out.println("Client:"+incoming.remoteAddress() +"加入");      }        @Override      public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {  // (3)          Channel incoming = ctx.channel();          for (Channel channel : channels) {              channel.writeAndFlush(new TextWebSocketFrame("[SERVER] - " + incoming.remoteAddress() + " 离开"));          }          System.out.println("Client:"+incoming.remoteAddress() +"离开");          channels.remove(ctx.channel());      }        @Override      public void channelActive(ChannelHandlerContext ctx) throws Exception { // (5)          Channel incoming = ctx.channel();          System.out.println("Client:"+incoming.remoteAddress()+"在线");      }        @Override      public void channelInactive(ChannelHandlerContext ctx) throws Exception { // (6)          Channel incoming = ctx.channel();          System.out.println("Client:"+incoming.remoteAddress()+"掉线");      }        @Override      public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)              throws Exception {          Channel incoming = ctx.channel();          System.out.println("Client:"+incoming.remoteAddress()+"异常");          // 当出现异常就关闭连接          cause.printStackTrace();          ctx.close();      }    }</code></pre>    <p>1.TextWebSocketFrameHandler 继承自 <a href="/misc/goto?guid=4959715881855036571" rel="nofollow,noindex">SimpleChannelInboundHandler</a> ,这个类实现了  <a href="/misc/goto?guid=4959715881937704676" rel="nofollow,noindex">ChannelInboundHandler</a> 接口,ChannelInboundHandler 提供了许多事件处理的接口方法,然后你可以覆盖这些方法。现在仅仅只需要继承 SimpleChannelInboundHandler 类而不是你自己去实现接口方法。</p>    <p>2.覆盖了 handlerAdded() 事件处理方法。每当从服务端收到新的客户端连接时,客户端的 Channel 存入 <a href="/misc/goto?guid=4959715882023469497" rel="nofollow,noindex">ChannelGroup</a> 列表中,并通知列表中的其他客户端 Channel</p>    <p>3.覆盖了 handlerRemoved() 事件处理方法。每当从服务端收到客户端断开时,客户端的 Channel 移除 ChannelGroup 列表中,并通知列表中的其他客户端 Channel</p>    <p>4.覆盖了 channelRead0() 事件处理方法。每当从服务端读到客户端写入信息时,将信息转发给其他客户端的 Channel。其中如果你使用的是 Netty 5.x 版本时,需要把 channelRead0() 重命名为messageReceived()</p>    <p>5.覆盖了 channelActive() 事件处理方法。服务端监听到客户端活动</p>    <p>6.覆盖了 channelInactive() 事件处理方法。服务端监听到客户端不活动</p>    <p>7.exceptionCaught() 事件处理方法是当出现 Throwable 对象才会被调用,即当 Netty 由于 IO 错误或者处理器在处理事件时抛出的异常时。在大部分情况下,捕获的异常应该被记录下来并且把关联的 channel 给关闭掉。然而这个方法的处理方式会在遇到不同异常的情况下有不同的实现,比如你可能想在关闭连接之前发送一个错误码的响应消息。</p>    <p>上面显示了 TextWebSocketFrameHandler 仅作了几件事:</p>    <ul>     <li>当WebSocket 与新客户端已成功握手完成,通过写入信息到 ChannelGroup 中的 Channel 来通知所有连接的客户端,然后添加新 Channel 到 ChannelGroup</li>     <li>如果接收到 TextWebSocketFrame,调用 retain() ,并将其写、刷新到 ChannelGroup,使所有连接的 WebSocket Channel 都能接收到它。和以前一样,retain() 是必需的,因为当 channelRead0()返回时,TextWebSocketFrame 的引用计数将递减。由于所有操作都是异步的,writeAndFlush() 可能会在以后完成,我们不希望它来访问无效的引用。</li>    </ul>    <p>由于 Netty 处理了其余大部分功能,唯一剩下的我们现在要做的是初始化 ChannelPipeline 给每一个创建的新的 Channel 。做到这一点,我们需要一个ChannelInitializer</p>    <h3><strong>WebsocketChatServerInitializer.java</strong></h3>    <pre>  <code class="language-java">public class WebsocketChatServerInitializer extends          ChannelInitializer<SocketChannel> {    //1        @Override      public void initChannel(SocketChannel ch) throws Exception {//2           ChannelPipeline pipeline = ch.pipeline();            pipeline.addLast(new HttpServerCodec());          pipeline.addLast(new HttpObjectAggregator(64*1024));          pipeline.addLast(new ChunkedWriteHandler());          pipeline.addLast(new HttpRequestHandler("/ws"));          pipeline.addLast(new WebSocketServerProtocolHandler("/ws"));          pipeline.addLast(new TextWebSocketFrameHandler());        }  }</code></pre>    <p>1.扩展 ChannelInitializer</p>    <p>2.添加 ChannelHandler 到 ChannelPipeline</p>    <p>initChannel() 方法设置 ChannelPipeline 中所有新注册的 Channel,安装所有需要的 ChannelHandler。</p>    <p>WebsocketChatServer.java</p>    <p>编写一个 main() 方法来启动服务端。</p>    <pre>  <code class="language-java">public class WebsocketChatServer {        private int port;        public WebsocketChatServer(int port) {          this.port = port;      }        public void run() throws Exception {            EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1)          EventLoopGroup workerGroup = new NioEventLoopGroup();          try {              ServerBootstrap b = new ServerBootstrap(); // (2)              b.group(bossGroup, workerGroup)               .channel(NioServerSocketChannel.class) // (3)               .childHandler(new WebsocketChatServerInitializer())  //(4)               .option(ChannelOption.SO_BACKLOG, 128)          // (5)               .childOption(ChannelOption.SO_KEEPALIVE, true); // (6)                System.out.println("WebsocketChatServer 启动了");                // 绑定端口,开始接收进来的连接              ChannelFuture f = b.bind(port).sync(); // (7)                // 等待服务器  socket 关闭 。              // 在这个例子中,这不会发生,但你可以优雅地关闭你的服务器。              f.channel().closeFuture().sync();            } finally {              workerGroup.shutdownGracefully();              bossGroup.shutdownGracefully();                System.out.println("WebsocketChatServer 关闭了");          }      }        public static void main(String[] args) throws Exception {          int port;          if (args.length > 0) {              port = Integer.parseInt(args[0]);          } else {              port = 8080;          }          new WebsocketChatServer(port).run();        }  }</code></pre>    <p>1. <a href="/misc/goto?guid=4959715882102356811" rel="nofollow,noindex">NioEventLoopGroup</a> 是用来处理I/O操作的多线程事件循环器,Netty 提供了许多不同的  <a href="/misc/goto?guid=4959715882180496339" rel="nofollow,noindex">EventLoopGroup</a> 的实现用来处理不同的传输。在这个例子中我们实现了一个服务端的应用,因此会有2个 NioEventLoopGroup 会被使用。第一个经常被叫做‘boss’,用来接收进来的连接。第二个经常被叫做‘worker’,用来处理已经被接收的连接,一旦‘boss’接收到连接,就会把连接信息注册到‘worker’上。如何知道多少个线程已经被使用,如何映射到已经创建的  <a href="/misc/goto?guid=4959715882267352179" rel="nofollow,noindex">Channel</a> 上都需要依赖于 EventLoopGroup 的实现,并且可以通过构造函数来配置他们的关系。</p>    <p>2. <a href="/misc/goto?guid=4959715882350841623" rel="nofollow,noindex">ServerBootstrap</a> 是一个启动 NIO 服务的辅助启动类。你可以在这个服务中直接使用 Channel,但是这会是一个复杂的处理过程,在很多情况下你并不需要这样做。</p>    <p>3.这里我们指定使用 <a href="/misc/goto?guid=4959715882431891470" rel="nofollow,noindex">NioServerSocketChannel</a> 类来举例说明一个新的 Channel 如何接收进来的连接。</p>    <p>4.这里的事件处理类经常会被用来处理一个最近的已经接收的 Channel。SimpleChatServerInitializer 继承自 <a href="/misc/goto?guid=4959715882509740872" rel="nofollow,noindex">ChannelInitializer</a> 是一个特殊的处理类,他的目的是帮助使用者配置一个新的 Channel。也许你想通过增加一些处理类比如 SimpleChatServerHandler 来配置一个新的 Channel 或者其对应的 <a href="/misc/goto?guid=4959715882590995377" rel="nofollow,noindex">ChannelPipeline</a> 来实现你的网络程序。当你的程序变的复杂时,可能你会增加更多的处理类到 pipline 上,然后提取这些匿名类到最顶层的类上。</p>    <p>5.你可以设置这里指定的 Channel 实现的配置参数。我们正在写一个TCP/IP 的服务端,因此我们被允许设置 socket 的参数选项比如tcpNoDelay 和 keepAlive。请参考 <a href="/misc/goto?guid=4959715882675554635" rel="nofollow,noindex">ChannelOption</a> 和详细的  <a href="/misc/goto?guid=4959715882755036569" rel="nofollow,noindex">ChannelConfig</a> 实现的接口文档以此可以对ChannelOption 的有一个大概的认识。</p>    <p>6.option() 是提供给 <a href="/misc/goto?guid=4959715882431891470" rel="nofollow,noindex">NioServerSocketChannel</a> 用来接收进来的连接。childOption() 是提供给由父管道 <a href="/misc/goto?guid=4959715882845853601" rel="nofollow,noindex">ServerChannel</a> 接收到的连接,在这个例子中也是 NioServerSocketChannel。</p>    <p>7.我们继续,剩下的就是绑定端口然后启动服务。这里我们在机器上绑定了机器所有网卡上的 8080 端口。当然现在你可以多次调用 bind() 方法(基于不同绑定地址)。</p>    <p>恭喜!你已经完成了基于 Netty 聊天服务端程序。</p>    <h2><strong>客户端</strong></h2>    <p>在程序的 resources 目录下,我们创建一个 WebsocketChatClient.html 页面来作为客户端</p>    <h3>WebsocketChatClient.html</h3>    <pre>  <code class="language-java"><!DOCTYPE html>  <html>  <head>  <meta charset="UTF-8">  <title>WebSocket Chat</title>  </head>  <body>      <script type="text/javascript">          var socket;          if (!window.WebSocket) {              window.WebSocket = window.MozWebSocket;          }          if (window.WebSocket) {              socket = new WebSocket("ws://localhost:8080/ws");              socket.onmessage = function(event) {                  var ta = document.getElementById('responseText');                  ta.value = ta.value + '\n' + event.data              };              socket.onopen = function(event) {                  var ta = document.getElementById('responseText');                  ta.value = "连接开启!";              };              socket.onclose = function(event) {                  var ta = document.getElementById('responseText');                  ta.value = ta.value + "连接被关闭";              };          } else {              alert("你的浏览器不支持 WebSocket!");          }            function send(message) {              if (!window.WebSocket) {                  return;              }              if (socket.readyState == WebSocket.OPEN) {                  socket.send(message);              } else {                  alert("连接没有开启.");              }          }      </script>      <form onsubmit="return false;">          <h3>WebSocket 聊天室:</h3>          <textarea id="responseText" style="width: 500px; height: 300px;"></textarea>          <br>           <input type="text" name="message"  style="width: 300px" value="Welcome to www.waylau.com">          <input type="button" value="发送消息" onclick="send(this.form.message.value)">          <input type="button" onclick="javascript:document.getElementById('responseText').value=''" value="清空聊天记录">      </form>      <br>       <br>       <a href="http://www.waylau.com/" >更多例子请访问 www.waylau.com</a>  </body>  </html></code></pre>    <p>逻辑比较简单,不累述。</p>    <h2><strong>运行效果</strong></h2>    <p>先运行 WebsocketChatServer,再打开多个浏览器页面实现多个 客户端访问 <a href="/misc/goto?guid=4958968112813957045" rel="nofollow,noindex">http://localhost:8080</a></p>    <p><img src="https://simg.open-open.com/show/84bafffd898344f8822586d65ba9cad8.jpg"></p>    <p> </p>    <h2>参考</h2>    <ul>     <li> <p>Netty 4.x 用户指南  <a href="/misc/goto?guid=4959653346148130796" rel="nofollow,noindex">https://github.com/waylau/netty-4-user-guide</a></p> </li>     <li> <p>Netty 实战(精髓)  <a href="/misc/goto?guid=4959653346233187453" rel="nofollow,noindex">https://github.com/waylau/essential-netty-in-action</a></p> </li>    </ul>    <p> </p>    <p>来自:http://www.importnew.com/21561.html</p>    <p> </p>