java aio 编程

jopen 9年前

Java NIO (JSR 51)定义了Java new I/O API,提案2000年提出,2002年正式发布。 JDK 1.4起包含了相应的API实现。
JAVA NIO2 (JSR 203)定义了更多的 New I/O APIs, 提案2003提出,直到2011年才发布, 最终在JDK 7中才实现。
JSR 203除了提供更多的文件系统操作API(包括可插拔的自定义的文件系统), 还提供了对socket和文件的异步 I/O操作。 同时实现了JSR-51提案中的socket channel全部功能,包括对绑定, option配置的支持以及多播multicast的实现。

当前很多的项目还停留在JAVA NIO的实现上, 对JAVA AIO(asynchronous I/O)着墨不多。 本文整理了一些关于JAVA AIO的介绍,以及netty对AIO的支持。
以下内容只针对socket的I/O操作, 不涉及对文件的处理。

JDK AIO API

首先介绍以下I/O模型。
Unix定义了五种I/O模型, 下图是五种I/O模型的比较。

  • 阻塞I/O
  • 非阻塞I/O
  • I/O复用(select、poll、linux 2.6种改进的epoll)
  • 信号驱动IO(SIGIO)
  • 异步I/O(POSIX的aio_系列函数)
    unix_io_model.jpg

POSIX把I/O操作划分成两类:

  • 同步I/O: 同步I/O操作导致请求进程阻塞,直至操作完成
  • 异步I/O: 异步I/O操作不导致请求阻塞
    所以Unix的前四种I/O模型都是同步I/O, 只有最后一种才是异步I/O。

传统的Java BIO (blocking I/O)是Unix I/O模型中的第一种。
Java NIO中如果不使用select模式,而只把channel配置成nonblocking则是第二种模型。
Java NIO select实现的是一种多路复用I/O。 底层使用epoll或者相应的poll系统调用, 参看我以前整理的一篇文章: java 和netty epoll实现
第四种模型JDK应该是没有实现。
Java NIO2增加了对第五种模型的支持,也就是AIO。

OpenJDK在不同平台上的AIO实现

在不同的操作系统上,AIO由不同的技术实现。
通用实现可以查看这里
Windows上是使用完成接口(IOCP)实现,可以参看WindowsAsynchronousServerSocketChannelImpl,
其它平台上使用aio调用UnixAsynchronousServerSocketChannelImpl, UnixAsynchronousSocketChannelImpl, SolarisAsynchronousChannelProvider

常用类

  • AsynchronousSocketChannel
    • Asynchronous connect
    • Asynchronous read/write
    • Asynchronous scatter/gather (multiple buffers)
    • Read/write operations support timeout
    • failed method invoked with timeout exception
    • Implements NetworkChannel for binding, setting socket options, etc

AsynchronousServerSocketChannel
还实现了Asynchronous accept

AsynchronousDatagramChannel

  • Asynchronous read/write (connected)
  • Asynchronous receive/send (unconnected)
  • Implements NetworkChannel for binding, setting socket options, etc.
  • Implements MulticastChannel
  • CompletionHandler

Java AIO 例子

异步channel API提供了两种方式监控/控制异步操作(connect,accept, read,write等)。第一种方式是返回java.util.concurrent.Future对象, 检查Future的状态可以得到操作是否完成还是失败,还是进行中, future.get阻塞当前进程。
第二种方式为操作提供一个回调参数java.nio.channels.CompletionHandler,这个回调类包含completed,failed两个方法。
channel的每个I/O操作都为这两种方式提供了相应的方法, 你可以根据自己的需要选择合适的方式编程。

下面以一个最简单的Time服务的例子演示如何使用异步I/O。 客户端连接到服务器后服务器就发送一个当前的时间字符串给客户端。 客户端毋须发送请求。 逻辑很简单。

Server实现

import java.io.IOException;  import java.net.InetSocketAddress;  import java.nio.ByteBuffer;  import java.nio.CharBuffer;  import java.nio.channels.AsynchronousChannelGroup;  import java.nio.channels.AsynchronousServerSocketChannel;  import java.nio.channels.AsynchronousSocketChannel;  import java.nio.channels.CompletionHandler;  import java.nio.charset.Charset;  import java.nio.charset.CharsetEncoder;  import java.util.Date;  import java.util.concurrent.ExecutionException;  import java.util.concurrent.Executors;  import java.util.concurrent.Future;  import java.util.concurrent.TimeUnit;  public class Server {  private static Charset charset = Charset.forName("US-ASCII");  private static CharsetEncoder encoder = charset.newEncoder();  public static void main(String[] args) throws Exception {  AsynchronousChannelGroup group = AsynchronousChannelGroup.withThreadPool(Executors.newFixedThreadPool(4));  AsynchronousServerSocketChannel server = AsynchronousServerSocketChannel.open(group).bind(new InetSocketAddress("0.0.0.0", 8013));  server.accept(null, new CompletionHandler<AsynchronousSocketChannel, Void>() {  @Override  public void completed(AsynchronousSocketChannel result, Void attachment) {  server.accept(null, this); // 接受下一个连接  try {  String now = new Date().toString();  ByteBuffer buffer = encoder.encode(CharBuffer.wrap(now + "\r\n"));  //result.write(buffer, null, new CompletionHandler<Integer,Void>(){...}); //callback or  Future<Integer> f = result.write(buffer);  f.get();  System.out.println("sent to client: " + now);  result.close();  } catch (IOException | InterruptedException | ExecutionException e) {  e.printStackTrace();  }  }  @Override  public void failed(Throwable exc, Void attachment) {  exc.printStackTrace();  }  });  group.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);  }  }

这个例子使用了两种方式。 accept使用了回调的方式, 而发送数据使用了future的方式。

Client实现

public class Client {  public static void main(String[] args) throws Exception {  AsynchronousSocketChannel client = AsynchronousSocketChannel.open();  Future<Void> future = client.connect(new InetSocketAddress("127.0.0.1", 8013));  future.get();  ByteBuffer buffer = ByteBuffer.allocate(100);  client.read(buffer, null, new CompletionHandler<Integer, Void>() {  @Override  public void completed(Integer result, Void attachment) {  System.out.println("client received: " + new String(buffer.array()));  }  @Override  public void failed(Throwable exc, Void attachment) {  exc.printStackTrace();  try {  client.close();  } catch (IOException e) {  e.printStackTrace();  }  }  });  Thread.sleep(10000);  }  }

客户端也使用了两种方式, connect使用了future方式,而接收数据使用了回调的方式。

Netty AIO

Netty也支持AIO并提供了相应的类: AioEventLoopGroup,AioCompletionHandler, AioServerSocketChannel, AioSocketChannelAioSocketChannelConfig
其它使用方法和NIO类似。

参考

  1. [Asynchronous I/O Tricks and Tips](http://openjdk.java.net/projects/nio/presentations/TS-4222.pdf
  2. http://openjdk.java.net/projects/nio/resources/AsynchronousIo.html
来自:http://colobu.com/2014/11/13/java-aio-introduction/