Java NIO框架 Netty教程


Java NIO 框架 Netty 教程(一) – Hello Netty 先啰嗦两句,如果你还不知道 Netty 是做什么的能做什么。那可以先简单的搜索了解一下。我只能说 Netty 是一个 NIO 的框架,可以用 于开发分布式的 Java 程序。具体能做什么,各位可以尽量发挥想象。技术,是服务于人而不是局限住人的。 Netty 的简介和下载可参考:《开源 Java 高性能 NIO 框架推荐 – Netty》。注意,此时的最新版已经为 3.5.2.Final。 如果你已经万事具备,那么我们先从一段代码开始。程序员们习惯的上手第一步,自然是"Hello world",不过 Netty 官网的例子却偏偏抛 弃了"Hello world"。那我们就自己写一个最简单的"Hello world"的例子,作为上手。 ? HelloServer.java 01 02 03 04 05 06 07 08 09 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 /** * Netty 服务端代码 * * @author lihzh * @alia OneCoder * @blog http://www.coderli.com */ public class HelloServer { public static void main(String args[]) { // Server 服务启动器 ServerBootstrap bootstrap = new ServerBootstrap( new NioServerSocketChannelFactory( Executors.newCachedThreadPool(), Executors.newCachedThreadPool())); // 设置一个处理客户端消息和各种消息事件的类(Handler) bootstrap .setPipelineFactory(new ChannelPipelineFactory() { @Override public ChannelPipeline getPipeline() throws Exception { return Channels .pipeline(new HelloServerHandler()); } }); // 开放 8000 端口供客户端访问。 bootstrap.bind(new InetSocketAddress(8000)); } private static class HelloServerHandler extends SimpleChannelHandler { /** 35 36 37 38 39 40 41 42 43 44 45 46 * 当有客户端绑定到服务端的时候触发,打印"Hello world, I'm server." * * @alia OneCoder * @author lihzh */ @Override public void channelConnected( ChannelHandlerContext ctx, ChannelStateEvent e) { System.out.println("Hello world, I'm server."); } } } ? HelloClient.java 01 02 03 04 05 06 07 08 09 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 /** * Netty 客户端代码 * * @author lihzh * @alia OneCoder * @blog http://www.coderli.com */ public class HelloClient { public static void main(String args[]) { // Client 服务启动器 ClientBootstrap bootstrap = new ClientBootstrap( new NioClientSocketChannelFactory( Executors.newCachedThreadPool(), Executors.newCachedThreadPool())); // 设置一个处理服务端消息和各种消息事件的类(Handler) bootstrap.setPipelineFactory(new ChannelPipelineFactory() { @Override public ChannelPipeline getPipeline() throws Exception { return Channels.pipeline(new HelloClientHandler()); } }); // 连接到本地的 8000 端口的服务端 bootstrap.connect(new InetSocketAddress( "127.0.0.1", 8000)); } private static class HelloClientHandler extends SimpleChannelHandler { 30 31 32 33 34 35 36 37 38 39 40 41 42 43 /** * 当绑定到服务端的时候触发,打印"Hello world, I'm client." * * @alia OneCoder * @author lihzh */ @Override public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) { System.out.println("Hello world, I'm client."); } } } 既然是分布式的,自然要分多个服务。Netty 中,需要区分 Server 和 Client 服务。所有的 Client 都是绑定在 Server 上的,他们之间是不 能通过 Netty 直接通信的。(自己采用的其他手段,不包括在内。)。白话一下这个通信过程,Server 端开放端口,供 Client 连接,Client 发起请求,连接到 Server 指定的端口,完成绑定。随后便可自由通信。其实就是普通 Socket 连接通信的过程。 Netty 框架是基于事件机制的,简单说,就是发生什么事,就找相关处理方法。就跟着火了找 119,抢劫了找 110 一个道理。所以,这里, 我们处理的是当客户端和服务端完成连接以后的这个事件。什么时候完成的连接,Netty 知道,他告诉我了,我就负责处理。这就是框架 的作用。Netty,提供的事件还有很多,以后会慢慢的接触和介绍。 你应该已经可以上手了:) Java NIO 框架 Netty 教程(二) – 白话概念 "Hello World"的代码固然简单,不过其中的几个重要概念(类)和 Netty 的工作原理还是需要简单明确一下,至少知道其是负责什。方 便自己以后更灵活的使用和扩展。 声明,笔者一介码农,不会那么多专业的词汇和缩写,只能以最简单苍白的话来形容个人的感受和体会。如果您觉得这太不专业,笔者 首先只能抱歉。然后,笔者曾转过《Netty 代码分析》,您可参考。  ChannelEvent 先说这个 ChannelEvent,因为 Netty 是基于事件驱动的,就是我们上文提到的,发生什么事,就通知"有关部门"。所以,不难理解,我 们自己的业务代码中,一定有跟这些事件相关的处理。在样例代码,我们处理的事件,就是 channelConnected。以后,我们还会处理更 多的事件。  ChannelPipeline Pipeline,翻译成中文的意思是:管道,传输途径。也就是说,在这里他是控制 ChannelEvent 事件分发和传递的。事件在管道中流转, 第一站到哪,第二站到哪,到哪是终点,就是用这个 ChannelPipeline 处理的。比如:开发事件。先给 A 设计,然后给 B 开发。一个流转 图,希望能给你更直观的感觉。  ChannelHandler 刚说 Pipeline 负责把事件分发到相应的站点,那个这个站点在 Netty 里,就是指 ChannelHandler。事件到了 ChannelHandler 这里,就要 被具体的进行处理了,我们的样例代码里,实现的就是这样一个处理事件的―站点‖,也就是说,你自己的业务逻辑一般都是从这里开始的。  Channel 有了个部门的协调处理,我们还需要一个从整体把握形势的,所谓―大局观‖的部门,channel。 channel,能够告诉你当前通道的状态,是连同还是关闭。获取通道相关的配置信息。得到 Pipeline 等。是一些全局的信息。Channel 自 然是由 ChannelFactory 产生的。Channel 的实现类型,决定了你这个通道是同步的还是异步的(nio)。例如,我们样例里用的是 NioServerSocketChannel。 这些基本的概念,你懂了吧。 Java NIO 框架 Netty 教程(三)- 字符串消息收发 了解了 Netty 的基本概念,开发起来应该会顺手很多。 在―Hello World‖代码中,我们只是在完成绑定的时候,在各自的本地打印了简单 的信息,并没有客户端和服务端的消息传递。这个肯定是最基本的功能。在上代码之前,先补充一个 Netty 中重要的概念,ChannelBuffer。  ChannelBuffer Netty 中的消息传递,都必须以字节的形式,以 ChannelBuffer 为载体传递。简单的说,就是你想直接写个字符串过去,对不起,抛异常。 虽然,Netty 定义的 writer 的接口参数是 Object 的,这可能也是会给新上手的朋友容易造成误会的地方。Netty 源码中,是这样判断的: ? 01 02 03 04 05 06 07 08 09 10 SendBuffer acquire(Object message) { if (message instanceof ChannelBuffer) { return acquire((ChannelBuffer) message); } else if (message instanceof FileRegion) { return acquire((FileRegion) message); } throw new IllegalArgumentException( "unsupported message type: " + message.getClass()); } 所以,我们要想传递字符串,那么就必须转换成 ChannelBuffer。明确了这一点,接下来我们上代码: ? 01 02 03 04 05 06 07 08 09 10 11 12 13 14 15 16 17 /** * @author lihzh * @alia OneCoder * @blog http://www.coderli.com */ public class MessageServer { public static void main(String args[]) { // Server 服务启动器 ServerBootstrap bootstrap = new ServerBootstrap( new NioServerSocketChannelFactory( Executors.newCachedThreadPool(), Executors.newCachedThreadPool())); // 设置一个处理客户端消息和各种消息事件的类(Handler) bootstrap.setPipelineFactory(new ChannelPipelineFactory() { @Override public ChannelPipeline getPipeline() throws Exception { 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 return Channels.pipeline(new MessageServerHandler()); } }); // 开放 8000 端口供客户端访问。 bootstrap.bind(new InetSocketAddress(8000)); } private static class MessageServerHandler extends SimpleChannelHandler { /** * 用户接受客户端发来的消息,在有客户端消息到达时触发 * * @author lihzh * @alia OneCoder */ @Override public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) { ChannelBuffer buffer = (ChannelBuffer) e.getMessage(); System.out.println(buffer.toString(Charset.defaultCharset())); } } } ? 01 02 03 04 05 06 07 08 09 10 11 12 13 14 15 16 17 18 19 20 /** * @author lihzh * @alia OneCoder * @blog http://www.coderli.com */ public class MessageClient { public static void main(String args[]) { // Client 服务启动器 ClientBootstrap bootstrap = new ClientBootstrap( new NioClientSocketChannelFactory( Executors.newCachedThreadPool(), Executors.newCachedThreadPool())); // 设置一个处理服务端消息和各种消息事件的类(Handler) bootstrap.setPipelineFactory(new ChannelPipelineFactory() { @Override public ChannelPipeline getPipeline() throws Exception { return Channels.pipeline(new MessageClientHandler()); } 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 }); // 连接到本地的 8000 端口的服务端 bootstrap.connect(new InetSocketAddress("127.0.0.1", 8000)); } private static class MessageClientHandler extends SimpleChannelHandler { /** * 当绑定到服务端的时候触发,给服务端发消息。 * * @alia OneCoder * @author lihzh */ @Override public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) { // 将字符串,构造成 ChannelBuffer,传递给服务端 String msg = "Hello, I'm client."; ChannelBuffer buffer = ChannelBuffers.buffer(msg.length()); buffer.writeBytes(msg.getBytes()); e.getChannel().write(buffer); } } } 与 ―Hello World‖ 样例代码不同的是,客户端在 channel 连通后,不是在本地打印,而是将消息转换成 ChannelBuffer 传递给服务端,服 务端接受到 ChannelBuffer 后,解码成字符串打印出来。 同时,通过对比可以发现,变动的只是 Handler 里的代码,启动服务和绑定服务的代码没有变化,也就是我们在概念介绍里提到了,关 注 Handler,在 Handler 里处理我们自己的业务。所以,以后我们会只给出业务中关键代码,不会在上重复的代码:) 由于在 Netty 中消息的收发全依赖于 ChannelBuffer,所以,下一章我们将会详细的介绍 ChannelBuffer 的使用。我们一起学习。 Java NIO 框架 Netty 教程(四)- ChannelBuffer 在学字符串消息收发的时候,已经提到过。ChannelBuffer 是 Netty 中非常重要的概念。所有消息的收发都依赖于这个 Buffer。我们通过 Netty 的官方的文档来了解一下,基于流的消息传递机制。 In a stream-based transport such as TCP/IP, received data is stored into a socket receive buffer. Unfortunately, the buffer of a stream-based transport is not a queue of packets but a queue of bytes. It means, even if you sent two messages as two independent packets, an operating system will not treat them as two messages but as just a bunch of bytes. Therefore, there is no guarantee that what you read is exactly what your remote peer wrote. For example, let us assume that the TCP/IP stack of an operating system has received three packets: +—–+—–+—–+ | ABC | DEF | GHI | +—–+—–+—–+ Because of this general property of a stream-based protocol, there's high chance of reading them in the following fragmented form in your application: +—-+——-+—+—+ | AB | CDEFG | H | I | +—-+——-+—+—+ Therefore, a receiving part, regardless it is server-side or client-side, should defrag the received data into one or more meaningful frames that could be easily understood by the application logic. In case of the example above, the received data should be framed like the following: +—–+—–+—–+ | ABC | DEF | GHI | +—–+—–+—–+ 不知道您理解了没,简单翻译一下就是说。在 TCP/IP 这种基于流传递的协议中。他识别的不是你每一次发送来的消息,不是分包的。而 是,只认识一个整体的流,即使分三次分别发送三段话:ABC、DEF、GHI。在传递的过程中,他就是一个具有整体长度的流。在读流的 过程中,如果我一次读取的长度选择的不是三个,我可以收到类似 AB、CDEFG、H、I 这样的信息。这显然是我们不想看到的。所以说, 在你写的消息收发的系统里,需要预先定义好这种解析机制,规定每帧(次)读取的长度。通过代码来理解一下: ? 01 02 03 04 05 06 07 08 09 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 /** * @author lihzh * @alia OneCoder * @blog http://www.coderli.com */ public class ServerBufferHandler extends SimpleChannelHandler { /** * 用户接受客户端发来的消息,在有客户端消息到达时触发 * * @author lihzh * @alia OneCoder */ @Override public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) { ChannelBuffer buffer = (ChannelBuffer) e.getMessage(); // 五位读取 while (buffer.readableBytes() >= 5) { ChannelBuffer tempBuffer = buffer.readBytes(5); System.out.println(tempBuffer.toString(Charset.defaultCharset())); } // 读取剩下的信息 System.out.println(buffer.toString(Charset.defaultCharset())); } } ? 01 02 03 04 /** * @author lihzh * @alia OneCoder * @blog http://www.coderli.com 05 06 07 08 09 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 */ public class ClientBufferHandler extends SimpleChannelHandler { /** * 当绑定到服务端的时候触发,给服务端发消息。 * * @alia OneCoder * @author lihzh */ @Override public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) { // 分段发送信息 sendMessageByFrame(e); } /** * 将"Hello, I'm client."分成三份发送。
* Hello,
* I'm
* client.
* * @param e * Netty 事件 * @author lihzh */ private void sendMessageByFrame(ChannelStateEvent e) { String msgOne = "Hello, "; String msgTwo = "I'm "; String msgThree = "client."; e.getChannel().write(tranStr2Buffer(msgOne)); e.getChannel().write(tranStr2Buffer(msgTwo)); e.getChannel().write(tranStr2Buffer(msgThree)); } /** * 将字符串转换成{@link ChannelBuffer},私有方法不进行字符串的非空判 断。 * * @param str * 待转换字符串,要求非 null * @return 转换后的 ChannelBuffer * @author lihzh */ 49 50 51 52 53 private ChannelBuffer tranStr2Buffer(String str) { ChannelBuffer buffer = ChannelBuffers.buffer(str.length()); buffer.writeBytes(str.getBytes()); return buffer; } } 服务端输出结果: ? 1 2 3 4 Hello , I'm clie nt. 这里其实,服务端是否分段发送并不会影响输出结果,也就是说,你一次性的把"Hi, I'm client."这段信息发送过来,输出的结果也是一样 的。这就是开头说的,传输的是流,不分包。而只在于你如何分段读写。 Java NIO 框架 Netty 教程(五)- 消息收发次数不匹配的问题 本来周末是最好的学习时间,不过这周末收房子,可想而知事情自然也不会少。这段时间的周末,可能很少有时间学习了。见缝插针吧。 不说废话了,好好学习。上回通过代码理解了 Netty 底层信息的流的传递机制,不过只是一个感性上的认识。教会你应该如何使用和使 用的时候应该注意的方面。但是有一些细节的问题,并没有提及。比如在上回(《Java NIO 框架 Netty 教程(四)- ChannelBuffer》) 的代码里,我们通过: ? 1 2 3 4 5 6 7 8 private void sendMessageByFrame(ChannelStateEvent e) { String msgOne = "Hello, "; String msgTwo = "I'm "; String msgThree = "client."; e.getChannel().write(tranStr2Buffer(msgOne)); e.getChannel().write(tranStr2Buffer(msgTwo)); e.getChannel().write(tranStr2Buffer(msgThree)); } 这样的方式,连续返送三次消息。但是如果你在服务端进行接收计数的话,你会发现,大部分时候都是接收到两次的事件请求。不过消 息都是完整的。网上也有人提到过,进行 10000 次的连续放松,往往接受到的消息个数是 999X 的,总是就是消息数目上不匹配,这又 是为何呢?笔者也只能通过阅读 Netty 的源码来找原因,我们一起来慢慢分析吧。 起点自然是选择在 e.getChannel().writer()方法上。一路跟踪首先来到了:AbstractNioWorker.java 类 ? 001 002 003 004 005 006 007 008 009 010 011 012 013 014 015 016 017 018 019 020 021 022 023 024 protected void write0(AbstractNioChannel channel) { boolean open = true; boolean addOpWrite = false; boolean removeOpWrite = false; boolean iothread = isIoThread(channel); long writtenBytes = 0; final SocketSendBufferPool sendBufferPool = this.sendBufferPool; final WritableByteChannel ch = channel.channel; final Queue writeBuffer = channel.writeBufferQueue; final int writeSpinCount = channel.getConfig().getWriteSpinCount(); synchronized (channel.writeLock) { channel.inWriteNowLoop = true; for (;;) { MessageEvent evt = channel.currentWriteEvent; SendBuffer buf; if (evt == null) { if ((channel.currentWriteEvent = evt = writeBuffer.poll()) == null) { removeOpWrite = true; channel.writeSuspended = false; break; } 025 026 027 028 029 030 031 032 033 034 035 036 037 038 039 040 041 042 043 044 045 046 047 048 049 050 051 052 053 054 055 056 057 058 059 060 061 062 063 064 065 066 067 068 channel.currentWriteBuffer = buf = sendBufferPool.acquire(evt.getMessage()); } else { buf = channel.currentWriteBuffer; } ChannelFuture future = evt.getFuture(); try { long localWrittenBytes = 0; for (int i = writeSpinCount; i > 0; i --) { localWrittenBytes = buf.transferTo(ch); if (localWrittenBytes != 0) { writtenBytes += localWrittenBytes; break; } if (buf.finished()) { break; } } if (buf.finished()) { // Successful write - proceed to the next message. buf.release(); channel.currentWriteEvent = null; channel.currentWriteBuffer = null; evt = null; buf = null; future.setSuccess(); } else { // Not written fully - perhaps the kernel buffer is full. addOpWrite = true; channel.writeSuspended = true; if (localWrittenBytes > 0) { // Notify progress listeners if necessary. future.setProgress( localWrittenBytes, buf.writtenBytes(), buf.totalBytes()); } 069 070 071 072 073 074 075 076 077 078 079 080 081 082 083 084 085 086 087 088 089 090 091 092 093 094 095 096 097 098 099 100 101 102 103 104 105 106 107 108 break; } } catch (AsynchronousCloseException e) { // Doesn't need a user attention - ignore. } catch (Throwable t) { if (buf != null) { buf.release(); } channel.currentWriteEvent = null; channel.currentWriteBuffer = null; buf = null; evt = null; future.setFailure(t); if (iothread) { fireExceptionCaught(channel, t); } else { fireExceptionCaughtLater(channel, t); } if (t instanceof IOException) { open = false; close(channel, succeededFuture(channel)); } } } channel.inWriteNowLoop = false; // Initially, the following block was executed after releasing // the writeLock, but there was a race condition, and it has to be // executed before releasing the writeLock: // // https://issues.jboss.org/browse/NETTY-410 // if (open) { if (addOpWrite) { setOpWrite(channel); } else if (removeOpWrite) { clearOpWrite(channel); } } } if (iothread) { fireWriteComplete(channel, writtenBytes); } else { fireWriteCompleteLater(channel, writtenBytes); } } 这里, buf.transferTo(ch);的就是调用底层 WritableByteChannel 的 write 方法,把 buffer 写到管道里,传递过去。通过 Debug 可以看到, 没调用一次这个方法,服务端的 messageReceived 方法就会进入断点一次。当然这个也只是表相,或者说也是在预料之内的。因为笔者 从开始就怀疑是连续写入过快导致的问题,所以测试过每次 write 后停顿 1 秒。再 write 下一次。结果一切正常。 那么我们跟到这里的意义何在呢?笔者的思路是先证明不是在 write 端出现的写覆盖的问题,这样就可以从 read 端寻找问题。这里笔者 也在这里加入了一个计数,测试究竟 transferTo 了几次。结果确实是 3 次。 ? 1 2 3 for (int i = writeSpinCount; i > 0; i --) { localWrittenBytes = buf.transferTo(ch); System.out.println(++count); 接下来就从接收端找找原因,在 NioWorker 的 read 方法,实现如下: ? 01 02 03 04 05 06 07 08 09 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 @Override protected boolean read(SelectionKey k) { final SocketChannel ch = (SocketChannel) k.channel(); final NioSocketChannel channel = (NioSocketChannel) k.attachment(); final ReceiveBufferSizePredictor predictor = channel.getConfig().getReceiveBufferSizePredictor(); final int predictedRecvBufSize = predictor.nextReceiveBufferSize(); int ret = 0; int readBytes = 0; boolean failure = true; ByteBuffer bb = recvBufferPool.acquire(predictedRecvBufSize); try { while ((ret = ch.read(bb)) > 0) { readBytes += ret; if (!bb.hasRemaining()) { break; } } failure = false; } catch (ClosedChannelException e) { // Can happen, and does not need a user attention. } catch (Throwable t) { fireExceptionCaught(channel, t); } if (readBytes > 0) { bb.flip(); 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 final ChannelBufferFactory bufferFactory = channel.getConfig().getBufferFactory(); final ChannelBuffer buffer = bufferFactory.getBuffer(readBytes); buffer.setBytes(0, bb); buffer.writerIndex(readBytes); recvBufferPool.release(bb); // Update the predictor. predictor.previousReceiveBufferSize(readBytes); // Fire the event. fireMessageReceived(channel, buffer); } else { recvBufferPool.release(bb); } if (ret < 0 || failure) { k.cancel(); // Some JDK implementations run into an infinite loop without this. close(channel, succeededFuture(channel)); return false; } return true; } 在这个方法的外层是一个循环,不停的遍历,如果有 SelectionKey k 存在,则进入此方法读取 buffer 中的数据。这个 SelectionKey 区分 只是一种类型,这个设计到 Java NIO 中的 Seletor 机制,这个笔者准备下讲穿插一下。属于 Netty 底层的一个重要的机制。 messageReceived 事件的触发,是在读取完当前缓冲池中所有的信息之后在触发的。这倒是可以解释,为什么即使我们收到事件的次数 少,但是消息是完整的。 从目前来看,Netty 通过 Java 的 NIO 机制传递数据,数据读写跟事件没有严格的绑定机制。数据是以流的形式独立存在,读写都有一个 缓冲池。 不过,这些还远未解决笔者的疑惑。笔者决定先了解一下 Seletor 机制,再回头来探索这个问题。 待解决……,如果您知道,热切期待您的指导。 Java NIO 框架 Netty 教程(六)-Java NIO Selector 模式 看到标题,您可能觉得,这跟 Netty 有什么关系呢?确实,如果你完全是使用 Netty 的,那么可能你可以完全不需要了解 Selector。但是, 不得不提的是,Netty 底层关于 NIO 的实现也是基于 Java 的 Selector 的,是对 Selector 的封装。所以,我个人认为理解好 Selector 对于 使用和理解 Netty 都是很多有帮助的。当然,如果您确实不关心这些,只想会用 Netty 就可以了。那么下文,您可以略过:) 笔者对于 Selector 也是新上手学习的。之前很多新人跟我交流,都会提到一个新框架或者一个新开源工具的使用和上手的问题。他们会 觉得上手困难,耗费事件。不过笔者,从来没有此种感觉。这里正好,借用 Selector 的学习过程,跟大家交流一下,我上手的过程。 想要使用一个工具,自然是先了解其定位,解决问题的原理或者工作流程。所以,笔者先从网上了解了一下 Selector 大概的工作流程。 NIO 有一个主要的类 Selector,这个类似一个观察者,只要我们把需要探知的 socketchannel 告诉 Selector,我们接着做别的事情,当有事 件发生时,他会通知我们,传回一组 SelectionKey,我们读取这些 Key,就会获得我们刚刚注册过的 socketchannel,然后,我们从这个 Channel 中读取数据,放心,包准能够读到,接着我们可以处理这些数据。 这是笔者摘录的一小段总结,就这一小段基本已经可以说明问题了。接下来,我们要考虑的就是,要实现这个过程,我们需要做什么? 顺着描述,我们可以想象,需要选择器,需要消息传送的通道,需要注册一个事件,用于识别。通道自然需要绑定到一个地址。有了这 样大概的想法,我们就可以去 API 里找相关的接口。 Selector 服务端样例代码: ? 01 02 03 04 05 06 07 08 09 10 11 12 13 14 15 16 17 18 19 /** * Java NIO Select 模式服务端样例代码 * * @author lihzh * @alia OneCoder * @Blog http://www.coderli.com * @date 2012-7-16 下午 9:22:53 */ public class NioSelectorServer { /** * @author lihzh * @throws IOException * @alia OneCoder * @date 2012-7-16 下午 9:22:53 */ public static void main(String[] args) throws IOException { // 创建一个 selector 选择器 Selector selector = Selector.open(); 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 // 打开一个通道 ServerSocketChannel socketChannel = ServerSocketChannel.open(); // 绑定到 9000 端口 socketChannel.socket().bind(new InetSocketAddress(8000)); // 使设定 non-blocking 的方式。 socketChannel.configureBlocking(false); // 向 Selector 注册 Channel 及我们有兴趣的事件 socketChannel.register(selector, SelectionKey.OP_ACCEPT); for (;;) { // 选择事件 selector.select(); // 当有客户端准备连接到服务端时,便会出发请求 for (Iterator keyIter = selector.selectedKeys() .iterator(); keyIter.hasNext();) { SelectionKey key = keyIter.next(); keyIter.remove(); System.out.println(key.readyOps()); if (key.isAcceptable()) { System.out.println("Accept"); // 接受连接到此 Channel 的连接 socketChannel.accept(); } } } } } Selector 客户端样例代码: ? 01 02 03 04 05 06 07 08 09 10 11 12 13 14 15 /** * Java NIO Selector 模式,客户端代码 * * @author lihzh * @alia OneCoder * @blog http://www.coderli.com */ public class NioSelectorClient { /** * @author lihzh * @throws IOException * @alia OneCoder */ public static void main(String[] args) throws IOException { 16 17 18 19 20 SocketChannel channel = SocketChannel.open(); channel.configureBlocking(false); channel.connect(new InetSocketAddress("127.0.0.1", 8000)); } } 代码很简单,服务端接受到客户端的连接请求后,会打印出"Accept"信息。 简单概括就是,整一个通道,通道加个选择过滤器,看来的事件是不是我想要的,不想要的干脆不管,想要的,我就存起来,留着慢慢 处理。 现在感觉是不是 Netty 确实跟这个机制比较想,如果让你去实现 Netty 先有的功能,也有思路可想了吧。 Java NIO 框架 Netty 教程(七)-再谈收发信息次数问题 在《Java NIO 框架 Netty 教程(五)- 消息收发次数不匹配的问题》里我们试图分析一个消息收发次数不匹配的问题。当时笔者还是 心存疑惑的。所以决定先学习一下 Java NIO 的 Selector 机制。 经过简单的了解,笔者大胆的猜测和―武断‖一下该问题的原因。 首先,Selector 机制让我们注册一个感兴趣的时间,然后只要有该时间发生,就会传递给接收端。我们写了三次,接收端一定会出发三 次的。 然后,Netty 实现机制里,有个 Buffer 缓冲池,把收到的信息都缓存在里面,通过一个线程统一处理。也就是我们看到的那个 buffer 的处理过程。 Netty 的设置中,有一个一次性最多读取字节大小的设定。并且,事件的触发是在处理过缓冲池中的消息之后。我们再来回顾一下 Netty 中读取信息的那段代码: ? 01 02 03 04 05 06 07 08 09 10 11 12 13 14 15 16 17 18 19 20 ByteBuffer bb = recvBufferPool.acquire(predictedRecvBufSize); try { while ((ret = ch.read(bb)) > 0) { readBytes += ret; if (!bb.hasRemaining()) { break; } } failure = false; } catch (ClosedChannelException e) { // Can happen, and does not need a user attention. } catch (Throwable t) { fireExceptionCaught(channel, t); } if (readBytes > 0) { bb.flip(); final ChannelBufferFactory bufferFactory = channel.getConfig().getBufferFactory(); 21 22 23 24 25 26 27 28 29 30 31 32 33 34 final ChannelBuffer buffer = bufferFactory.getBuffer(readBytes); buffer.setBytes(0, bb); buffer.writerIndex(readBytes); recvBufferPool.release(bb); // Update the predictor. predictor.previousReceiveBufferSize(readBytes); // Fire the event. fireMessageReceived(channel, buffer); } else { recvBufferPool.release(bb); } 可以看到,如果没有读取到字节是不会触发事件的,所以我们可能会收到 2 次或者 3 次信息。(如果发的快,解析的慢,后两次信息, 一次性读取了,就 2 次,如果发送间隔长,分次解析,就收到 3 次。)原因应该就是如此。跟我们开始猜的差不多,只是不敢确认。 Java NIO 框架 Netty 教程(八)-Object 对象传递 说了这么多废话,才提到对象的传输,不知道您是不是已经不耐烦了。一个系统内部的消息传递,没有对象传递是不太现实的。下面就 来说说,怎么传递对象。 如果,您看过前面的介绍,如果您善于专注本质,勤于思考。您应该也会想到,我们说过,Netty 的消息传递都是基于流,通过 ChannelBuffer 传递的,那么自然,Object 也需要转换成 ChannelBuffer 来传递。好在 Netty 本身已经给我们写好了这样的转换工具。 ObjectEncoder 和 ObjectDecoder。 工具怎么用?再一次说说所谓的本质,我们之前也说过,Netty 给我们处理自己业务的空间是在灵活的可子定义的 Handler 上的,也就 是说,如果我们自己去做这个转换工作,那么也应该在 Handler 里去做。而 Netty,提供给我们的 ObjectEncoder 和 Decoder 也恰恰 是一组 Handler。于是,修改 Server 和 Client 的启动代码: ? 服务端 01 02 03 04 05 06 07 08 09 10 // 设置一个处理客户端消息和各种消息事件的类(Handler) bootstrap.setPipelineFactory(new ChannelPipelineFactory() { @Override public ChannelPipeline getPipeline() throws Exception { return Channels.pipeline( new ObjectDecoder(ClassResolvers.cacheDisabled(this .getClass().getClassLoader())), new ObjectServerHandler()); } }); ? 客户端 1 2 3 // 设置一个处理服务端消息和各种消息事件的类(Handler) bootstrap.setPipelineFactory(new ChannelPipelineFactory() { @Override 4 5 6 7 8 public ChannelPipeline getPipeline() throws Exception { return Channels.pipeline(new ObjectEncoder(), new ObjectClientHandler()); } }); 要传递对象,自然要有一个被传递模型,一个简单的 Pojo,当然,实现序列化接口是必须的。 ? 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 /** * @author lihzh * @alia OneCoder * @blog http://www.coderli.com */ public class Command implements Serializable { private static final long serialVersionUID = 7590999461767050471L; private String actionName; public String getActionName() { return actionName; } public void setActionName(String actionName) { this.actionName = actionName; } } 服务端和客户端里,我们自定义的 Handler 实现如下: ? ObjectServerHandler .java 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 /** * 对象传递服务端代码 * * @author lihzh * @alia OneCoder * @blog http://www.coderli.com */ public class ObjectServerHandler extends SimpleChannelHandler { /** * 当接受到消息的时候触发 */ @Override public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception { 16 17 18 19 20 Command command = (Command) e.getMessage(); // 打印看看是不是我们刚才传过来的那个 System.out.println(command.getActionName()); } } ? ObjectClientHandler .java 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 /** * 对象传递,客户端代码 * * @author lihzh * @alia OneCoder * @blog http://www.coderli.com */ public class ObjectClientHandler extends SimpleChannelHandler { /** * 当绑定到服务端的时候触发,给服务端发消息。 * * @author lihzh * @alia OneCoder */ @Override public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) { // 向服务端发送 Object 信息 sendObject(e.getChannel()); } /** * 发送 Object * * @param channel * @author lihzh * @alia OneCoder */ private void sendObject(Channel channel) { Command command = new Command(); command.setActionName("Hello action."); channel.write(command); } } 启动后,服务端正常打印结果:Hello action. 简单梳理一下思路: 1. 通过 Netty 传递,都需要基于流,以 ChannelBuffer 的形式传递。所以,Object -> ChannelBuffer. 2. Netty 提供了转换工具,需要我们配置到 Handler。 3. 样例从客户端 -> 服务端,单向发消息,所以在客户端配置了编码,服务端解码。如果双向收发,则需要全部配置 Encoder 和 Decoder。 这里需要注意,注册到 Server 的 Handler 是有顺序的,如果你颠倒一下注册顺序: ? 1 2 3 4 5 6 7 8 9 bootstrap.setPipelineFactory(new ChannelPipelineFactory() { @Override public ChannelPipeline getPipeline() throws Exception { return Channels.pipeline(new ObjectServerHandler(), new ObjectDecoder(ClassResolvers.cacheDisabled(this .getClass().getClassLoader())) ); } }); 结果就是,会先进入我们自己的业务,再进行解码。这自然是不行的,会强转失败。至此,你应该会用 Netty 传递对象了吧。 Java NIO 框架 Netty 教程(九)-Object 对象编/解码 看到题目,有的同学可能会想,上回不是说过对象传递了吗?是的,只是在《Java NIO 框架 Netty 教程(八)-Object 对象传递》中, 我们只是介绍如何使用 Netty 提供的编/解码工具,完成对象的序列化。这节是想告诉你 Netty 具体是怎么做的,也许有的同学想自己 完成序列化呢?况且,对象的序列化,随处可用:) 先看怎么编码。 ? ObjectEncoder.java 01 02 03 04 05 06 07 08 09 10 11 12 13 @Override protected Object encode(ChannelHandlerContext ctx, Channel channel, Object msg) throws Exception { ChannelBufferOutputStream bout = new ChannelBufferOutputStream(dynamicBuffer( estimatedLength, ctx.getChannel().getConfig().getBufferFactory())); bout.write(LENGTH_PLACEHOLDER); ObjectOutputStream oout = new CompactObjectOutputStream(bout); oout.writeObject(msg); oout.flush(); oout.close(); ChannelBuffer encoded = bout.buffer(); encoded.setInt(0, encoded.writerIndex() - 4); 14 15 return encoded; } 其实你早已经应该想到了,在 Java 中对对象的序列化自然跑不出 ObjectOutputStream 了。Netty 这里只是又做了一层包装,在流的 开头增加了一个 4 字节的标志位。所以,Netty 声明,该编码和解码的类必须配套使用,与单纯的 ObjectIntputStream 不兼容。 * An encoder which serializes a Java object into a {@link ChannelBuffer}. *

* Please note that the serialized form this encoder produces is not * compatible with the standard {@link ObjectInputStream}. Please use * {@link ObjectDecoder} or {@link ObjectDecoderInputStream} to ensure the * interoperability with this encoder. 解码自然是先解析出多余的 4 位,然后再通过 ObjectInputStream 解析。 关于 Java 对象序列化的细节问题,不在文本讨论的范围内,不过不知您是否感兴趣试试自己写一个呢?所谓,多动手嘛。 ? 01 02 03 04 05 06 07 08 09 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 /** * Object 编码类 * * @author lihzh * @alia OneCoder * @blog http://www.coderli.com */ public class MyObjEncoder implements ChannelDownstreamHandler { @Override public void handleDownstream(ChannelHandlerContext ctx, ChannelEvent e) throws Exception { // 处理收发信息的情形 if (e instanceof MessageEvent) { MessageEvent mEvent = (MessageEvent) e; Object obj = mEvent.getMessage(); if (!(obj instanceof Command)) { ctx.sendDownstream(e); return; } ByteArrayOutputStream out = new ByteArrayOutputStream(); ObjectOutputStream oos = new ObjectOutputStream(out); oos.writeObject(obj); oos.flush(); oos.close(); ChannelBuffer buffer = ChannelBuffers.dynamicBuffer(); buffer.writeBytes(out.toByteArray()); e.getChannel().write(buffer); } else { // 其他事件,自动流转。比如,bind,connected ctx.sendDownstream(e); 32 33 34 } } } ? 01 02 03 04 05 06 07 08 09 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 /** * Object 解码类 * * @author lihzh * @alia OneCoder * @blog http://www.coderli.com */ public class MyObjDecoder implements ChannelUpstreamHandler { @Override public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e) throws Exception { if (e instanceof MessageEvent) { MessageEvent mEvent = (MessageEvent) e; if (!(mEvent.getMessage() instanceof ChannelBuffer)) { ctx.sendUpstream(mEvent); return; } ChannelBuffer buffer = (ChannelBuffer) mEvent.getMessage(); ByteArrayInputStream input = new ByteArrayInputStream(buffer.array()); ObjectInputStream ois = new ObjectInputStream(input); Object obj = ois.readObject(); Channels.fireMessageReceived(e.getChannel(), obj); } } } 怎么样,是不是也好用?所谓,模仿,学以致用。 不过,提醒一下大家,这个实现里有很多硬编码的东西,切勿模仿,只是为了展示 Object,编解码的处理方式和在 Netty 中的应用而已。 Java NIO 框架 Netty 教程(十)-Object 对象的连续收发解析分析 如果您一直关注 OneCoder,我们之前有两篇文章介绍关于 Netty 消息连续收发的问题。( 《Java NIO 框架 Netty 教程(五)- 消息 收发次数不匹配的问题 》、《 Java NIO 框架 Netty 教程(七)-再谈收发信息次数问题 》)。如果您经常的―怀疑‖和思考,我们刚介绍 过了 Object 的传递,您是否好奇,在 Object 传递中是否会有这样的问题?如果 Object 流的字节截断错乱,那肯定是会出错的。Netty 一定不会这么傻的,那么 Netty 是怎么做的呢? 我们先通过代码验证一下是否有这样的问题。(有问题的可能性几乎没有。) ? 01 02 /** * 当绑定到服务端的时候触发,给服务端发消息。 03 04 05 06 07 08 09 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 * * @author lihzh * @alia OneCoder */ @Override public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) { // 向服务端发送 Object 信息 sendObject(e.getChannel()); } /** * 发送 Object * * @param channel * @author lihzh * @alia OneCoder */ private void sendObject(Channel channel) { Command command = new Command(); command.setActionName("Hello action."); Command commandOne = new Command(); commandOne.setActionName("Hello action. One"); Command command2 = new Command(); command2.setActionName("Hello action. Two"); channel.write(command2); channel.write(command); channel.write(commandOne); } 打印结果: Hello action. Two Hello action. Hello action. One 一切正常。那么 Netty 是怎么分割对象流的呢?看看 ObjectDecoder 怎么做的。 在 ObjectDecoder 的基类 LengthFieldBasedFrameDecoder 中注释中有详细的说明。我们这里主要介绍一下关键的代码逻辑: ? 01 02 03 04 05 06 07 08 09 @Override protected Object decode( ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) throws Exception { if (discardingTooLongFrame) { long bytesToDiscard = this.bytesToDiscard; int localBytesToDiscard = (int) Math.min(bytesToDiscard, buffer.readableBytes()); buffer.skipBytes(localBytesToDiscard); bytesToDiscard -= localBytesToDiscard; 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 this.bytesToDiscard = bytesToDiscard; failIfNecessary(ctx, false); return null; } if (buffer.readableBytes() < lengthFieldEndOffset) { return null; } int actualLengthFieldOffset = buffer.readerIndex() + lengthFieldOffset; long frameLength; switch (lengthFieldLength) { case 1: frameLength = buffer.getUnsignedByte(actualLengthFieldOffset); break; case 2: frameLength = buffer.getUnsignedShort(actualLengthFieldOffset); break; case 3: frameLength = buffer.getUnsignedMedium(actualLengthFieldOffset); break; case 4: frameLength = buffer.getUnsignedInt(actualLengthFieldOffset); break; …… 我们这里进入的是 4,还记得在编码时候的开头的 4 位占位字节吗?跟踪进去发现。 ? 1 2 3 4 5 6 public int getInt(int index) { return (array[index] & 0xff) << 24 | (array[index + 1] & 0xff) << 16 | (array[index + 2] & 0xff) << 8 | (array[index + 3] & 0xff) << 0; } 原来,当初在编码时,在流开头增加的 4 字节的字符是做这个的。他记录了当前了这个对象流的长度,便于在解码时候准确的计算出该 对象流的长度,正确解码。看来,我们如果我们自己写的对象编码解码的工具,要考虑的还有很多啊。 附:LengthFieldBasedFrameDecoder 的 JavaDoc /** * A decoder that splits the received {@link ChannelBuffer}s dynamically by the * value of the length field in the message. It is particularly useful when you * decode a binary message which has an integer header field that represents the * length of the message body or the whole message. *

* {@link LengthFieldBasedFrameDecoder} has many configuration parameters so * that it can decode any message with a length field, which is often seen in * proprietary client-server protocols. Here are some example that will give * you the basic idea on which option does what. * *

2 bytes length field at offset 0, do not strip header

* * The value of the length field in this example is 12 (0x0C) which * represents the length of "HELLO, WORLD". By default, the decoder assumes * that the length field represents the number of the bytes that follows the * length field. Therefore, it can be decoded with the simplistic parameter * combination. *
   * lengthFieldOffset   = 0   * lengthFieldLength   = 2   * lengthAdjustment    = 0   * initialBytesToStrip = 0 (= do not strip header)   *   * BEFORE DECODE (14 bytes)         AFTER DECODE (14 bytes)   * +——–+—————-+      +——–+—————-+   * | Length | Actual Content |—–>| Length | Actual Content |   * | 0x000C | "HELLO, WORLD" |      | 0x000C | "HELLO, WORLD" |   * +——–+—————-+      +——–+—————-+   * 
* *

2 bytes length field at offset 0, strip header

* * Because we can get the length of the content by calling * {@link ChannelBuffer#readableBytes()}, you might want to strip the length * field by specifying initialBytesToStrip. In this example, we * specified 2, that is same with the length of the length field, to * strip the first two bytes. *
   * lengthFieldOffset   = 0   * lengthFieldLength   = 2   * lengthAdjustment    = 0   * initialBytesToStrip = 2 (= the length of the Length field)   *   * BEFORE DECODE (14 bytes)         AFTER DECODE (12 bytes)   * +——–+—————-+      +—————-+   * | Length | Actual Content |—–>| Actual Content |   * | 0x000C | "HELLO, WORLD" |      | "HELLO, WORLD" |   * +——–+—————-+      +—————-+   * 
* *

2 bytes length field at offset 0, do not strip header, the length field * represents the length of the whole message

* * In most cases, the length field represents the length of the message body * only, as shown in the previous examples. However, in some protocols, the * length field represents the length of the whole message, including the * message header. In such a case, we specify a non-zero * lengthAdjustment. Because the length value in this example message * is always greater than the body length by 2, we specify -2 * as lengthAdjustment for compensation. *
   * lengthFieldOffset   =  0   * lengthFieldLength   =  2   * lengthAdjustment    = -2 (= the length of the Length field)   * initialBytesToStrip =  0   *   * BEFORE DECODE (14 bytes)         AFTER DECODE (14 bytes)   * +——–+—————-+      +——–+—————-+   * | Length | Actual Content |—–>| Length | Actual Content |   * | 0x000E | "HELLO, WORLD" |      | 0x000E | "HELLO, WORLD" |   * +——–+—————-+      +——–+—————-+   * 
* *

3 bytes length field at the end of 5 bytes header, do not strip header

* * The following message is a simple variation of the first example. An extra * header value is prepended to the message. lengthAdjustment is zero * again because the decoder always takes the length of the prepended data into * account during frame length calculation. *
   * lengthFieldOffset   = 2 (= the length of Header 1)   * lengthFieldLength   = 3   * lengthAdjustment    = 0   * initialBytesToStrip = 0   *   * BEFORE DECODE (17 bytes)                      AFTER DECODE (17 bytes)   * +———-+———-+—————-+      +———-+———-+—————-+   * | Header 1 |  Length  | Actual Content |—–>| Header 1 |  Length  | Actual Content |   * |  0xCAFE  | 0x00000C | "HELLO, WORLD" |      |  0xCAFE  | 0x00000C | "HELLO, WORLD" |   * +———-+———-+—————-+      +———-+———-+—————-+   * 
* *

3 bytes length field at the beginning of 5 bytes header, do not strip header

* * This is an advanced example that shows the case where there is an extra * header between the length field and the message body. You have to specify a * positive lengthAdjustment so that the decoder counts the extra * header into the frame length calculation. *
   * lengthFieldOffset   = 0   * lengthFieldLength   = 3   * lengthAdjustment    = 2 (= the length of Header 1)   * initialBytesToStrip = 0   *   * BEFORE DECODE (17 bytes)                      AFTER DECODE (17 bytes)   * +———-+———-+—————-+      +———-+———-+—————-+   * |  Length  | Header 1 | Actual Content |—–>|  Length  | Header 1 | Actual Content |   * | 0x00000C |  0xCAFE  | "HELLO, WORLD" |      | 0x00000C |  0xCAFE  | "HELLO, WORLD" |   * +———-+———-+—————-+      +———-+———-+—————-+   * 
* *

2 bytes length field at offset 1 in the middle of 4 bytes header, * strip the first header field and the length field

* * This is a combination of all the examples above. There are the prepended * header before the length field and the extra header after the length field. * The prepended header affects the lengthFieldOffset and the extra * header affects the lengthAdjustment. We also specified a non-zero * initialBytesToStrip to strip the length field and the prepended * header from the frame. If you don't want to strip the prepended header, you * could specify 0 for initialBytesToSkip. *
   * lengthFieldOffset   = 1 (= the length of HDR1)   * lengthFieldLength   = 2   * lengthAdjustment    = 1 (= the length of HDR2)   * initialBytesToStrip = 3 (= the length of HDR1 + LEN)   *   * BEFORE DECODE (16 bytes)                       AFTER DECODE (13 bytes)   * +——+——–+——+—————-+      +——+—————-+   * | HDR1 | Length | HDR2 | Actual Content |—–>| HDR2 | Actual Content |   * | 0xCA | 0x000C | 0xFE | "HELLO, WORLD" |      | 0xFE | "HELLO, WORLD" |   * +——+——–+——+—————-+      +——+—————-+   * 
* *

2 bytes length field at offset 1 in the middle of 4 bytes header, * strip the first header field and the length field, the length field * represents the length of the whole message

* * Let's give another twist to the previous example. The only difference from * the previous example is that the length field represents the length of the * whole message instead of the message body, just like the third example. * We have to count the length of HDR1 and Length into lengthAdjustment. * Please note that we don't need to take the length of HDR2 into account * because the length field already includes the whole header length. *
   * lengthFieldOffset   =  1   * lengthFieldLength   =  2   * lengthAdjustment    = -3 (= the length of HDR1 + LEN, negative)   * initialBytesToStrip =  3   *   * BEFORE DECODE (16 bytes)                       AFTER DECODE (13 bytes)   * +——+——–+——+—————-+      +——+—————-+   * | HDR1 | Length | HDR2 | Actual Content |—–>| HDR2 | Actual Content |   * | 0xCA | 0×0010 | 0xFE | "HELLO, WORLD" |      | 0xFE | "HELLO, WORLD" |   * +——+——–+——+—————-+      +——+—————-+   * 
* * @see LengthFieldPrepender */ Java NIO 框架 Netty 教程(十一)-并发访问测试(上) 之前更新了几篇关于 JVM 研究的文章,其实也是在做本篇文章验证的时候,跑的有点远,呵呵。回归 Netty 教程,这次要讲的其实是针 对一个问题的研究和验证结论。另外,最近工作比较忙,所以可能会分文章更新一些阶段性的成果,而不是全部汇总更新,以免间隔过 久。 起因是一个朋友,通过微博(OneCoder 腾讯微博、OneCoder 新浪微博、OneCoder 网易微博、OneCoder 搜狐微博)私信给我一个问 题,大意是说他在用 Netty 做并发测试的时候,会出现大量的 connection refuse 信息,问我如何解决。 没动手就没有发言权,所以 OneCoder 决定测试一下: ? 01 02 03 04 05 06 07 08 09 10 11 12 /** * @author lihzh * @alia OneCoder * @blog http://www.coderli.com */ public class ConcurrencyNettyTestHandler extends SimpleChannelHandler { private static int count = 0; /** * 当接受到消息的时候触发 */ 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 @Override public void channelConnected(ChannelHandlerContext ctx, final ChannelStateEvent e) throws Exception { for (int i = 0; i < 100000; i++) { Thread t = new Thread(new Runnable() { @Override public void run() { sendObject(e.getChannel()); } }); System.out.println("Thread count: " + i); t.start(); } } /** * 发送 Object * * @author lihzh * @alia OneCoder */ private void sendObject(Channel channel) { count++; Command command = new Command(); command.setActionName("Hello action."); System.out.println("Write: " + count); channel.write(command); } } 运行结果: Hello action.: 99996 Hello action.: 99997 Hello action.: 99998 Hello action.: 99999 Hello action.: 100000 你可能会惊讶,10w 个请求都能通过?呵呵,细心的同学,可能会发现,这其实并不是并发,而只是所谓 10w 个线程的,单 channel 的伪并发,或者说是一种持续的连续访问。并且,如果你跑一下测试用例,会发现,Server 端开始接受处理消息,是在 Client 端 10w 个线程请求都结束之后再开始的。这是为什么? 其实,如果您看过 OneCoder 的《 Java NIO 框架 Netty 教程(七)-再谈收发信息次数问题》,应该会有所联想。不过坦白的说,OneCoder 也是在经过一番周折,一番 Debug 以后,才发现了这个问题。当 OneCoder 在线程内断点以后,放过一个线程,接收端就会有一条信 息出现,这其实是和之前文章里介绍的场景是一样的。所以,呵呵,可能对您来说,看了这篇文章,并没有更多的收获,但是对 OneCoder 来说,确实是经历了不小的周折,绕了挺大的弯子,也算是对代码的再熟悉过程吧。 下篇我们会面对真正并发的问题:) Java NIO 框架 Netty 教程(十二)-并发访问测试(中) 写在前面:对 Netty 并发问题的测试和解决完全超出了我的预期,想说的东西越来越多。所以才出现这个中篇,也就是说,一定会有下 篇。至于问题点的发现,OneCoder 也在努力验证中。 继续并发的问题。在《Java NIO 框架 Netty 教程(十一)-并发访问测试(上)》中,我们测试的其实是一种伪并发的情景。底层实际只有 一个 Channel 在运作,不会出现什么无响应的问题。而实际的并发不是这个意思的,独立的客户端,自然就会有独立的 channel。也就 是一个服务端很可能和很多的客户端建立关系,就有很多的 Channel,进行了多次的绑定。下面我们来模拟一下这种场景。 ? 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 /** * Netty 并发,多 connection,socket 并发测试 * * @author lihzh(OneCoder) * @alia OneCoder * @Blog http://www.coderli.com * @date 2012-8-13 下午 10:47:28 */ public class ConcurrencyNettyMultiConnection { public static void main(String args[]) { final ClientBootstrap bootstrap = new ClientBootstrap( new NioClientSocketChannelFactory( Executors.newCachedThreadPool(), Executors.newCachedThreadPool())); // 设置一个处理服务端消息和各种消息事件的类(Handler) bootstrap.setPipelineFactory(new ChannelPipelineFactory() { @Override public ChannelPipeline getPipeline() throws Exception { return Channels.pipeline(new ObjectEncoder(), new ObjectClientHandler() ); } }); for (int i = 0; i < 3; i++) { bootstrap.connect(new InetSocketAddress("127.0.0.1", 8000)); } } } 看到这段代码,你可能会疑问,这是在做什么。这个验证是基于酷壳的文章《Java NIO 类库 Selector 机制解析(上)》。他是基于 Java Selector 直接进行的验证,Netty 是在此之上封装了一层。其实 OneCoder 也做了 Selector 层的验证。 ? 01 02 /** * Selector 打开端口数量测试 03 04 05 06 07 08 09 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 * * @author lihzh * @alia OneCoder * @blog http://www.coderli.com */ public class ConcurrencySelectorOpen { /** * @author lihzh * @alia OneCoder * @throws IOException * @throws InterruptedException * @date 2012-8-15 下午 1:57:56 */ public static void main(String[] args) throws IOException, InterruptedException { for (int i = 0; i < 3; i++) { Selector selector = Selector.open(); SocketChannel channel = SocketChannel.open(); channel.configureBlocking(false); channel.connect(new InetSocketAddress("127.0.0.1", 8000)); channel.register(selector, SelectionKey.OP_READ); } Thread.sleep(300000); } } 同样,通过 Process Explorer 去观察端口占用情况,开始跟酷壳大哥的观察到的效果一样。当不启动 Server 端,也就是不实际跟 Server 端建立链接的时候,3 次 selector open,占用了 6 个端口。 当启动 Server 端,进行绑定的时候,占用了 9 个端口 其实,多的三个,就是实际绑定的 8000 端口。其余就是酷壳大哥说的内部 Loop 链接。也就是说,对于我们实际场景,一次链接需要 的端口数是 3 个。操作系统的端口数和资源当然有有限的,也就是说当我们增大这个链接的时候,错误必然会发生了。OneCoder 尝试 一下 3000 次的场景,并没有出现错误,不过这么下去出错肯定是必然的。 再看看服务端的情况, 这个还是直接通过 Selector 连接的时候的服务端的情况,除了两个内部回环端口以外,都是通过监听的 8000 的端口与外界通信,所以, 服务端不会有端口限制的问题。不过,也可以看到,如果对链接不控制,服务端也维持大量的连接耗费系统资源!所以,良好的编码是 多么的重要。 回到我们的主题,Netty 的场景。先使用跟 Selector 测试一样的场景,单线程,一个 bootstrap,连续多次 connect 看看错误和端口 占用的情况。代码也就是开头提供的那段代码。 看看测试结果, 同样连接后还是占用 9 个端口。如果手动调用了 channle.close()方法,则会释放与 8000 链接的端口,也就是变成 6 个端口占用。 增大连续连接数到 10000。 首先没有报错,在每次 close channel 情况下,客户端端口占用情况如图(服务端情况类似)。 可见,并没有像 selector 那样无限的增加下去。这正是 Netty 中 worker 的巨大作用,帮我们管理这些琐碎的链接。具体分析,我们留 待下章,您也可以自己研究一下。(OneCoder 注:如果没关闭 channel,会发现对 8000 端口的占用,会迅速增加。系统会很卡。) 调整测试代码,用一个线程来模拟一个客户端的请求。 ? 01 02 03 04 05 06 07 08 09 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 /** * Netty 并发,多 connection,socket 并发测试 * * @author lihzh(OneCoder) * @alia OneCoder * @Blog http://www.coderli.com * @date 2012-8-13 下午 10:47:28 */ public class ConcurrencyNettyMultiConnection { public static void main(String args[]) { final ClientBootstrap bootstrap = new ClientBootstrap( new NioClientSocketChannelFactory( Executors.newCachedThreadPool(), Executors.newCachedThreadPool())); // 设置一个处理服务端消息和各种消息事件的类(Handler) bootstrap.setPipelineFactory(new ChannelPipelineFactory() { @Override public ChannelPipeline getPipeline() throws Exception { return Channels.pipeline(new ObjectEncoder(), new ObjectClientHandler() ); } }); for (int i = 0; i < 1000; i++) { // 连接到本地的 8000 端口的服务端 Thread t = new Thread(new Runnable() { @Override public void run() { bootstrap.connect(new InetSocketAddress("127.0.0.1", 8000)); try { Thread.sleep(300000); } catch (InterruptedException e) { e.printStackTrace(); } } }); t.start(); } 41 } } 不出所料,跟微博上问我问题的朋友出现的问题应该是一样的: Write: 973 Write: 974 八月 17, 2012 9:57:28 下午 org.jboss.netty.channel.SimpleChannelHandler 警告: EXCEPTION, please implement one.coder.netty.chapter.eight.ObjectClientHandler.exceptionCaught() for proper handling. java.net.ConnectException: Connection refused: no further information 这个问题,笔者确实尚未分析出原因,仅从端口占用情况来看,跟前面的测试用例跑出的效果基本类似。应该说明不是端口不足导致的, 不过 OneCoder 尚不敢妄下结论。待研究后,我们下回分解吧:)您有任何想法,也可以提供给我,我来进行验证。 Java NIO 框架 Netty 教程(十三)-并发访问测试(下) 在上节(《Java NIO 框架 Netty 教程(十二)-并发访问测试(中)》),我们从各个角度对 Netty 并发的场景进行了测试。这节,我们将重 点关注上节最后提到的问题。在多线程并发访问的情况下,会出现 警告: EXCEPTION, please implement one.coder.netty.chapter.eight.ObjectClientHandler.exceptionCaught() for proper handling. java.net.ConnectException: Connection refused: no further information 的错误警告。 之前 OneCoder 层怀疑过是端口数不够的问题,所以还准备了一套修改操作系统端口数限制的配置。 a) windows – 1、打开注册表:regedit 2、HKEY_LOCAL_MACHINE\SYSTEM\CurrentControlSet\ Services\TCPIP\Parameters 3、新建 DWORD 值,name:TcpTimedWaitDelay,value:0(十进制) –> 设置为 0 4、新建 DWORD 值,name:MaxUserPort,value:65534(十进制) –> 设置最大连接数 65534 5、重启系统 b) linux – 1、查看有关的选项 /sbin/sysctl -a|grep net.ipv4.tcp_tw net.ipv4.tcp_tw_reuse = 0 #表示开启重用。允许将 TIME-WAIT sockets 重新用于新的 TCP 连接,默认为 0,表示关闭; net.ipv4.tcp_tw_recycle = 0 #表示开启 TCP 连接中 TIME-WAIT sockets 的快速回收,默认为 0,表示关闭 2、修改 vi /etc/sysctl.conf net.ipv4.tcp_tw_reuse = 1 net.ipv4.tcp_tw_recycle = 1 3、使内核参数生效 sysctl -p 这套配置在测试 Restlet 框架并发的时候,起到了明显的效果。 然后,这次即使 OneCoder 修改配置,并发连接也没有明显的上升。 OneCoder 决定换个思路,启动多个进程对同一个服务进行持续 访问,以证明之前的连接拒绝就是因为客户端多线程并发自身的问题(其实 OneCoder 一直非常怀疑是这个问题)还是服务端连接处理的 问题。 修改了一下客户端发动消息的代码,使其在其线程内部,不停的给服务端发送信息。 ? 01 02 03 04 05 06 07 08 09 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 /** * 发送 Object * * @param channel * @author lihzh * @alia OneCoder * @blog http://www.coderli.com */ private void sendObject(final Channel channel) { new Thread(new Runnable() { @Override public void run() { // TODO Auto-generated method stub for (;;) { Command command = new Command(); command.setActionName("Hello action."); channel.write(command); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } } }).start(); } 启动多个客户端,效果如图: 果然,在单个进程数量控制合理的情况下,服务端可以处理所有请求,不会出现链接拒绝的情况。总连接数轻松达到 4,5k。( OneCoder 注: 以前超过 1000 都容易出错。这里只是测试到以前完全没有办法支持的情形,并没有测试最大压力值。) 对于测试 Netty 服务端压力来说,这样的测试, OneCoder 认为完全可以起到效果,有参考价值。因为即使单进程网络连接方面无上 限,单进程能启动的线程数也是有限制的,效率也一定会受到影响。所以,对于并发测试来说, OneCoder 认为可以采用上面的方式。 对于,单进程多线程的时,拒绝连接的问题。是在 sun.nio.ch.SocketChannelImple 中的 native 方法 checkConnect 中抛出的。这 应该是跟操作系统密切相关的。 OneCoder 没有在 linux 下进行测试,但是猜测在 linux 下,上面的设置参数是可以起到作用的,也就 是会比 windows 下可以开启的并发线程数多。当然这只是猜测。 对于 windows 来说,揪净能开启多个线程的并发,这个数据在 OneCoder 的环境下也是非常不稳定的。最开始的时候是,起 1000 个 线程,成功的 350 个线程左右。后来, OneCoder 怀疑启动的程序过多,尤其是跟网络相关的程序会影响测试结果,关闭了很多程序。 结果,多次 1000 线程都成功连接。 OneCoder 仔细排查了一下,猛然发现 OneCoder 使用了 Proxifier 这个代理。在代理打开的情况下,一般只能跑到 300 左右。关闭 有 1000 个线程基本稳定通过。最多可以跑到 1500 左右。目前被列为最大―嫌疑人‖ 以 OneCoder 目前的知识构成来说,Netty 并发的测试基本可以告一段落了。再简单的总结唠到几句: 1. 如果需要测试并发,可以考虑多进程,进程内多线程的方式测试服务端压力。 2. OneCoder 没有测试 Netty 最大可以支持多少并发,因为从目前测试的效果来看。(5 个进程,每个进程 1000 线程,持续访问同一个 服务),已经完全可以满足 OneCoder 的要求了。您也可以继续测试下去。 3. OneCoder 使用的是 windows7 32 位操作系统,在测试过程其实也修改了注册表中的若干参数,包括上面提到的两个。不知道是否起 到了一定的作用,也就是是否使单进程可以支持的多线程数增加,或者服务端可以支持的连接数增加,您在测试的过程中,可以配合考 虑这些参数。 4. 对于,connection refuse 的具体原因, OneCoder 希望能随着自己知识的慢慢积累,找到其真正的答案。 Java NIO 框架 Netty 教程(十四)-Netty 中 OIO 模型(对比 NIO) OneCoder 这个周末搬家,并且新家目前还没有网络,本周的翻译的任务尚未完 成,下周一起补上,先上一篇 OIO 和 NIO 对比的小研究。 Netty 中不光支持了 Java 中 NIO 模型,同时也提供了对 OIO 模型的支持。(New IO vs Old IO)。 首先,在 Netty 中,切换 OIO 和 NIO 两种模式是非常方便的,只需要初始化不同 的 Channel 工程即可。 ? 1 2 3 4 ServerBootstrap bootstrap = new ServerBootstrap( new OioServerSocketChannelFactory( Executors.newCachedThreadPool(), Executors.newFixedThreadPool(4))); ? 1 2 3 4 ServerBootstrap bootstrap = new ServerBootstrap( new NioServerSocketChannelFactory( Executors.newCachedThreadPool(), Executors.newFixedThreadPool(4))); 这就是 Netty 框架为我们做的贡献。 再说说,这两种情况的区别。OneCoder 根据网上的资料和自己的理解,总结了一下:在 Netty 中,是通过 worker 执行任务的。 也就是我们在构造 bootstrap 时传入的 worker 线程池。对于传统 OIO 来说,一个 worker 对应的 channel,从读到操作到再到回 写,只要是这个 channel 的操作都通过这个 worker 来完成,对于 NIO 来说,到 MessageRecieved 之后,该 worker 的任务就完 成了。所以,从这个角度来说,非常建议你在 Recieve 之后,立即启动线程去执行耗时逻辑,以释放 worker。 基于这个分析,你可能也发现了,上面的代码中我们用的是 FiexedThreadPool。固定大小为 4,从理论上来说,OIO 支持的 client 数应该是 4。而 NIO 应该不受此影响。测试效果如下图: 8 个 Client 连接: NIOServer 的线程情况: 并且 8 个 Client 的请求都正常处理了。 对于 OIO 来说,如果你对 worker 池没有控制,那么支持 8 个 client 需要 8 个 worker,8 个线程,这也就是传统 OIO 并发数受限的原因,如图: 当 OIO 使用 FixedThreadPool 的时候: 只能处理头四个 client 的请求,他的被堵塞了。 Hello action.: 32 Hello action.: 33 Hello action.: 34 Hello action.: 35 Hello action.: 36 Hello action.: 37 Hello action.: 38 Hello action.: 39 Hello action.: 40 但是,在 Netty 框架中,不论是 OIO 和 NIO 模型,读写端都不会堵塞。客户端写后立即返回,不管服务端是否接收到,接收后 是否处理完成。下一章,我们将会从代码的角度来研究一下 Netty 中对 OIO 和 NIO 这两种模式下 worker 的处理方式。 Java NIO 框架 Netty 教程(十五)-利用 Netty 进行文件传输 如果您持续关注 OneCoder,您可能会问,在《Java NIO 框架 Netty 教程(十四)- Netty 中 OIO 模型(对比 NIO)》中不是说下节介绍 的是,NIO 和 OIO 中的 worker 处理方式吗。这个一定会有的,只是在研究的过程中,OneCoder 发现了之前遗留的文件传输的代码, 所以决定先完成它。 其实,Netty 的样例代码中也提供了文件上传下载的代码样例,不过太过复杂,还包括了 Http 请求的解析等,对 OneCoder 来说,容 易迷惑那些才是文件传输的关键部分。所以 OneCoder 决定根据自己去写一个样例,这个理解就是在最开始提到的,Netty 的传输是基 于流的,我们把文件流化应该就可以传递了。于是有了以下的代码: ? 01 02 03 04 05 06 07 08 09 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 /** * 文件传输接收端,没有处理文件发送结束关闭流的情景 * * @author lihzh * @alia OneCoder * @blog http://www.coderli.com */ public class FileServerHandler extends SimpleChannelHandler { private File file = new File("F:/2.txt"); private FileOutputStream fos; public FileServerHandler() { try { if (!file.exists()) { file.createNewFile(); } else { file.delete(); file.createNewFile(); } fos = new FileOutputStream(file); } catch (IOException e) { e.printStackTrace(); } } @Override public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception { ChannelBuffer buffer = (ChannelBuffer) e.getMessage(); int length = buffer.readableBytes(); buffer.readBytes(fos, length); fos.flush(); buffer.clear(); } 36 37 } ? 01 02 03 04 05 06 07 08 09 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 /** * 文件发送客户端,通过字节流来发送文件,仅实现文件传输部分,
* 没有对文件传输结束进行处理
* 应该发送文件发送结束标识,供接受端关闭流。 * * @author lihzh * @alia OneCoder * @blog http://www.coderli.com */ public class FileClientHandler extends SimpleChannelHandler { // 每次处理的字节数 private int readLength = 8; @Override public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { // 发送文件 sendFile(e.getChannel()); } private void sendFile(Channel channel) throws IOException { File file = new File("E:/1.txt"); FileInputStream fis = new FileInputStream(file); int count = 0; for (;;) { BufferedInputStream bis = new BufferedInputStream(fis); byte[] bytes = new byte[readLength]; int readNum = bis.read(bytes, 0, readLength); if (readNum == -1) { return; } sendToServer(bytes, channel, readNum); System.out.println("Send count: " + ++count); } } private void sendToServer(byte[] bytes, Channel channel, int length) throws IOException { 41 42 43 44 45 ChannelBuffer buffer = ChannelBuffers.copiedBuffer(bytes, 0, length); channel.write(buffer); } } 待发送的文件 1.txt 内容如下: 运行上述代码,接受到的文件 2.txt 结果: 完全一模一样。成功! 这只是一个简单的文件传输的例子,可以做为样例借鉴。对于大文件传输的情景,本样例并不支持,会出现内存溢出的情景,OneCoder 准备另外单独介绍一下。 Java NIO 框架 Netty 教程(十六)-ServerBootStrap 启动流程源码分析 有一段事件没有更新文章了,各种原因都有吧。搬家的琐事,搬家后的安逸呵呵。不过,OneCoder 明白,绝不能放松。对于 Netty 的 学习,也该稍微深入一点了。 所以,这次 OneCoder 花了几天时间,仔细梳理了一下 Netty 的源码,总结了一下 ServerBootStrap 的启动和任务处理流程,基本涵 盖了 Netty 的关键架构。 OneCoder 总结了一张流程图: 该图是 OneCoder 通过阅读 Netty 源码,逐渐记录下来的。基本可以说明 Netty 服务的启动流程。这里在具体讲解一下。 首先说明,我们这次顺利的流程是基于 NioSocketServer 的。也就是基于 Java NIO Selector 的实现方式。在第六讲《Java NIO 框 架 Netty 教程(六)-Java NIO Selector 模式》中,我们已经知道使用 Selector 的几个关键点,所以这里我们也重点关注一下,这些点 在 Netty 中是如何使用的。 很多看过 Netty 源码的人都说 Netty 源码写的很漂亮。可漂亮在哪呢?Netty 通过一个 ChannelFactory 决定了你当前使用的协议类型 (Nio/oio 等),比如,OneCoder 这里使用的是 NioServerSocket,那么需要声明的 Factory 即为 NioServerSocketChannelFactory, 声明了这个 Factory,就决定了你使用的 Channel,pipeline 以及 pipeline 中,具体处理业务的 sink 的类型。这种使用方式十分简洁 的,学习曲线很低,切换实现十分容易。 Netty 采用的是基于事件的管道式架构设计,事件(Event)在管道(Pipeline)中流转,交由(通过 pipelinesink)相应的处理器(Handler)。 这些关键元素类型的匹配都是由开始声明的 ChannelFactory 决定的。 Netty 框架内部的业务也遵循这个流程,Server 端绑定端口的 binder 其实也是一个 Handler,在构造完 Binder 后,便要声明一个 Pipeline 管道,并赋给新建一个 Channel。Netty 在 newChannel 的过程中,相应调用了 Java 中的 ServerSocketChannel.open 方法,打开一个 channel。然后触发 fireChannelOpen 事件。这个事件的接受是可以复写的。Binder 自身接收了这个事件。在事件的 处理中,继续向下完成具体的端口的绑定。对应 Selector 中的 socketChannel.socket().bind()。然后触发 fireChannelBound 事件。 默认情况下,该事件无人接受,继续向下开始构造 Boss 线程池。我们知道在 Netty 中 Boss 线程池是用来接受和分发请求的核心线程 池。所以在 channel 绑定后,必然要启动 Boss 线城池,随时准备接受 client 发来的请求。在 Boss 构造函数中,第一次注册了 selector 感兴趣的事件类型,SelectionKey.OP_ACCEPT。至此,在第六讲中介绍的使用 Selector 的几个关键步骤都体现在 Netty 中了。在 Boss 中回启动一个死循环来查询是否有感兴趣的事件发生,对于第一次的客户端的注册事件,Boss 会将 Channel 注册给 worker 保存。 这里补充一下,也是图中忽略的部分,就是关于 worker 线程池的初始化时机问题。worker 池的构造,在最开始构造 ChannelFactory 的时候就已经准备好了。在 NioServerSocketChannelFactory 的构造函数里,会 new 一个 NioWorkerPool。在 NioWorkerPool 的基类 AbstractNioWorkerPool 的构造函数中,会调用 OpenSelector 方法,其中也打开了一个 selector,并且启动了 worker 线程 池。 ? 01 02 03 04 05 06 07 08 09 10 11 12 13 14 15 16 private void openSelector() { try { selector = Selector.open(); } catch (Throwable t) { throw new ChannelException("Failed to create a selector.", t); } // Start the worker thread with the new Selector. boolean success = false; try { DeadLockProofWorker.start(executor, new ThreadRenamingRunnable(this, "New I/O worker #" + id)); success = true; } finally { if (!success) { // Release the Selector if the execution fails. try { 17 18 19 20 21 22 23 24 25 26 selector.close(); } catch (Throwable t) { logger.warn("Failed to close a selector.", t); } selector = null; // The method will return to the caller at this point. } } assert selector != null && selector.isOpen(); } 至此,会分线程启动 AbstractNioWorker 中 run 逻辑。同样是循环处理任务队列。 ? 1 2 3 4 processRegisterTaskQueue(); processEventQueue(); processWriteTaskQueue(); processSelectedKeys(selector.selectedKeys()); 这样,设计使事件的接收和处理模块完全解耦。 由此可见,如果你想从 nio 切换到 oio,只需要构造不同的 ChannelFacotry 即可。果然简洁优雅 Netty 学习笔记 (实验篇)二 7 人收藏此文章, 我要收藏发表于 5 个月前(2013-06-16 17:50) , 已有 346 次阅读 ,共 4 个评论 这 次 实 验 要 用 ne tty 实 现 一 个 E choSer ver, Echo Protocol 的 定 义 在 这 里 http://tools.ietf.org/html/rfc862 1.服 务 器 端 监 听 端 口 7 ( 由 于 Linux 下 普 通 用 户 无 法 使 用 1024 以 下 的 端 口 , 因 此 绑 定 7777) 2.客 户 端 连 接 服 务 器 后 发 送 一 条 数 据 3.服务器把接收到的数据直接返回给客户端 代 码 如 下 : 01 package netty.learn.echo; 02 03 import io.netty.bootstrap.ServerBootstrap; 04 import io.netty.channel.*; 05 import io.netty.channel.nio.NioEventLoopGroup; 06 import io.netty.channel.socket.SocketChannel; 07 import io.netty.channel.socket.nio.NioServerSocketChannel; 08 import io.netty.handler.logging.LogLevel; 09 import io.netty.handler.logging.LoggingHandler; 10 11 import java.util.logging.Level; 12 import java.util.logging.Logger; 13 14 /** 15 * This is a EchoServer implements the ECHO protocol 16 * User: mzy 17 * Date: 13-6-16 18 * Time: 下午 3:13 19 * Version:1.0.0 20 */ 21 class EchoHandler extends ChannelInboundHandlerAdapter{ 22 Logger log = Logger.getLogger(EchoHandler.class.getName()); 23 24 @Override 25 public void messageReceived(ChannelHandlerContext ctx, MessageList msgs) throws Exception { 26 ctx.write(msgs); 27 } 28 29 @Override 30 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { 31 log.log(Level.WARNING,cause.getMessage()); 32 ctx.close(); 33 } 34 } 35 public class EchoServer { 36 private int port; 37 38 public EchoServer(int port) { 39 this.port = port; 40 } 41 public void run(){ 42 NioEventLoopGroup boss = new NioEventLoopGroup(); 43 NioEventLoopGroup worker = new NioEventLoopGroup(); 44 ServerBootstrap b = new ServerBootstrap(); 45 b.group(boss,worker).channel(NioServerSocketChannel.class) 46 .option(ChannelOption.SO_BACKLOG,100) 47 .childHandler(new ChannelInitializer() { 48 @Override 49 protected void initChannel(SocketChannel socketChannel) throws Exception { 5 0 socketChannel.pipeline().addLast(new LoggingHandler(LogLevel.INFO),new EchoHan dler()); 51 } 52 }); 53 try { 54 b.bind(port).sync().channel().closeFuture().sync(); 55 } catch (InterruptedException e) { 56 e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. 57 } finally { 58 boss.shutdownGracefully(); 59 worker.shutdownGracefully(); 60 } 61 62 } 63 64 public static void main(String[] args) { 65 new EchoServer(7777).run(); 66 } 67 } 这样服务器端就写好了,对比实验一中的 DiscardServer 丌难发现基本上除了 Handler 乊外没有什么变化,这样 TimeServer 中的代码其实可以 做为一个模板来使用。 下 面 我 们 用 nc 来 测 试 一 下 EchoServer 我们通过打印出的日志可以看到,服务器端先接收到了 Hello Kugou 然后又直接写回给了客户端。
还剩53页未读

继续阅读

下载pdf到电脑,查找使用更方便

pdf的实际排版效果,会与网站的显示效果略有不同!!

需要 8 金币 [ 分享pdf获得金币 ] 0 人已下载

下载pdf

pdf贡献者

songningbo

贡献于2015-08-05

下载需要 8 金币 [金币充值 ]
亲,您也可以通过 分享原创pdf 来获得金币奖励!