Java NIO 使用实例

jopen 10年前

     在JDK1.4之前,Java OutputStream的write方法、InputStream的Read方法和ServerSocket的accept()方法都是阻塞方法,JDK1.4之前Java引入了新的输入输出系统(New Input/Out,NIO),非阻塞是Java NIO实现的重要功能之一 。

 1、Buffer

缓冲区,传输数据使用,本质是一个数组,Channel中读数据和写数据都只能通过Buffer传输。

2、Channel

通道,所有的IO流在NIO中都是从Channel开始的,数据可以从Channel读到Buffer中,也可以从Buffer写到Channel中,是Buffer对象的唯一接口。

3、Selector

选择器,它能检测一个或多个通道 (channel) 上的事件,并将事件分发出去。

使用一个 select 线程就能监听多个通道上的事件,并基于事件驱动触发相应的响应。而不需要为每个 channel 去分配一个线程。

4、SelectionKey

包含了事件的状态信息和时间对应的通道的绑定。

使用NIO的步骤:

1\创建一个Selector实例

2\将该实例注册到各种通道,指定每个通道上感兴趣的IO操作

3\重复执行,选择器循环:

    31\调用一种Select()方法

    32\获取已选键值

    33\对于已选中键集中的每一个键:

        331\将已选键从键集中移除

        332\获取信道,并从键中获取附件

        333\确定准备就绪的操作并执行;对于accept操作获得的SocketChannel对象,需将信道设为非阻塞模式,并将其注册到选择器中

        334\根据需要,修改键的兴趣操作集

如下代码:

<span style="font-size: small;"><span>NIOServer.java</span></span>    package org.hadoopinternal.nio;    import java.net.InetSocketAddress;  import java.nio.channels.SelectionKey;  import java.nio.channels.Selector;  import java.nio.channels.ServerSocketChannel;  import java.nio.channels.SocketChannel;  import java.util.Iterator;    public class NIOServer {   private static final int TIMEOUT  = 300;   private static final int PORT     = 12112;     public static void main(String[] args) {      try {           Selector selector = Selector.open();          ServerSocketChannel listenChannel = ServerSocketChannel.open();     listenChannel.configureBlocking(false);     listenChannel.socket().bind(new InetSocketAddress(PORT));     listenChannel.register(selector, SelectionKey.OP_ACCEPT);          while(true) {      if(selector.select(TIMEOUT)==0) {       System.out.print(".");       continue;      }            Iterator<SelectionKey> iter = selector.selectedKeys().iterator();      while( iter.hasNext() ) {       SelectionKey key = iter.next();       iter.remove();              //Server socket channel has pending connection request?       if( key.isAcceptable() ) {        SocketChannel channel=listenChannel.accept();        channel.configureBlocking(false);        SelectionKey connkey=channel.register(selector, SelectionKey.OP_READ );        NIOServerConnection conn=new NIOServerConnection(connkey);        connkey.attach(conn);       }              if( key.isReadable() ) {        NIOServerConnection conn=(NIOServerConnection) key.attachment();        conn.handleRead();       }              if( key.isValid() && key.isWritable() ) {        NIOServerConnection conn=(NIOServerConnection) key.attachment();        conn.handleWrite();       }      }         }    } catch (Exception e) {     e.printStackTrace();    }   }  }      <span style="font-size: small;"><span>NIOServerConnection.java:  </span></span>  package org.hadoopinternal.nio;    import java.io.IOException;  import java.nio.ByteBuffer;  import java.nio.channels.SelectionKey;  import java.nio.channels.SocketChannel;    public class NIOServerConnection {   private static final int BUFFSIZE = 1024;      SelectionKey key;   SocketChannel channel;   ByteBuffer buffer;      public NIOServerConnection(SelectionKey key) {    this.key=key;    this.channel=(SocketChannel) key.channel();    buffer=ByteBuffer.allocate(BUFFSIZE);   }      public void handleRead() throws IOException {    long bytesRead=channel.read(buffer);        if(bytesRead==-1) {     channel.close();    } else {     key.interestOps( SelectionKey.OP_READ | SelectionKey.OP_WRITE );    }   }        public void handleWrite() throws IOException {       buffer.flip();       channel.write(buffer);              if(!buffer.hasRemaining()) {        key.interestOps( SelectionKey.OP_READ );       }               buffer.compact();   }  }

如下代码为hadoop IPC Server 中的Listener是一个标准的NIO应用:

/** Listens on the socket. Creates jobs for the handler threads*/    private class Listener extends Thread {            private ServerSocketChannel acceptChannel = null; //the accept channel      private Selector selector = null; //the selector that we use for the server      private Reader[] readers = null;      private int currentReader = 0;      private InetSocketAddress address; //the address we bind at      private Random rand = new Random();      private long lastCleanupRunTime = 0; //the last time when a cleanup connec-                                           //-tion (for idle connections) ran      private long cleanupInterval = 10000; //the minimum interval between                                             //two cleanup runs      private int backlogLength = conf.getInt("ipc.server.listen.queue.size", 128);      private ExecutorService readPool;            public Listener() throws IOException {        address = new InetSocketAddress(bindAddress, port);        // Create a new server socket and set to non blocking mode        acceptChannel = ServerSocketChannel.open();        acceptChannel.configureBlocking(false);          // Bind the server socket to the local host and port        bind(acceptChannel.socket(), address, backlogLength);        port = acceptChannel.socket().getLocalPort(); //Could be an ephemeral port        // create a selector;        selector= Selector.open();        readers = new Reader[readThreads];        readPool = Executors.newFixedThreadPool(readThreads);        for (int i = 0; i < readThreads; i++) {          Selector readSelector = Selector.open();          Reader reader = new Reader(readSelector);          readers[i] = reader;          readPool.execute(reader);        }          // Register accepts on the server socket with the selector.        acceptChannel.register(selector, SelectionKey.OP_ACCEPT);        this.setName("IPC Server listener on " + port);        this.setDaemon(true);      }            private class Reader implements Runnable {        private volatile boolean adding = false;        private Selector readSelector = null;          Reader(Selector readSelector) {          this.readSelector = readSelector;        }        public void run() {          LOG.info("Starting SocketReader");          synchronized (this) {            while (running) {              SelectionKey key = null;              try {                readSelector.select();                while (adding) {                  this.wait(1000);                }                                Iterator<SelectionKey> iter = readSelector.selectedKeys().iterator();                while (iter.hasNext()) {                  key = iter.next();                  iter.remove();                  if (key.isValid()) {                    if (key.isReadable()) {                      doRead(key);                    }                  }                  key = null;                }              } catch (InterruptedException e) {                if (running) {                      // unexpected -- log it                  LOG.info(getName() + " caught: " +                           StringUtils.stringifyException(e));                }              } catch (IOException ex) {                LOG.error("Error in Reader", ex);              }            }          }        }          /**         * This gets reader into the state that waits for the new channel         * to be registered with readSelector. If it was waiting in select()         * the thread will be woken up, otherwise whenever select() is called         * it will return even if there is nothing to read and wait         * in while(adding) for finishAdd call         */        public void startAdd() {          adding = true;          readSelector.wakeup();        }                public synchronized SelectionKey registerChannel(SocketChannel channel)                                                            throws IOException {            return channel.register(readSelector, SelectionKey.OP_READ);        }          public synchronized void finishAdd() {          adding = false;          this.notify();                }      }        /** cleanup connections from connectionList. Choose a random range       * to scan and also have a limit on the number of the connections       * that will be cleanedup per run. The criteria for cleanup is the time       * for which the connection was idle. If 'force' is true then all        * connections will be looked at for the cleanup.       */      private void cleanupConnections(boolean force) {        if (force || numConnections > thresholdIdleConnections) {          long currentTime = System.currentTimeMillis();          if (!force && (currentTime - lastCleanupRunTime) < cleanupInterval) {            return;          }          int start = 0;          int end = numConnections - 1;          if (!force) {            start = rand.nextInt() % numConnections;            end = rand.nextInt() % numConnections;            int temp;            if (end < start) {              temp = start;              start = end;              end = temp;            }          }          int i = start;          int numNuked = 0;          while (i <= end) {            Connection c;            synchronized (connectionList) {              try {                c = connectionList.get(i);              } catch (Exception e) {return;}            }            if (c.timedOut(currentTime)) {              if (LOG.isDebugEnabled())                LOG.debug(getName() + ": disconnecting client " + c.getHostAddress());              closeConnection(c);              numNuked++;              end--;              c = null;              if (!force && numNuked == maxConnectionsToNuke) break;            }            else i++;          }          lastCleanupRunTime = System.currentTimeMillis();        }      }        @Override      public void run() {        LOG.info(getName() + ": starting");        SERVER.set(Server.this);        while (running) {          SelectionKey key = null;          try {            selector.select();            Iterator<SelectionKey> iter = selector.selectedKeys().iterator();            while (iter.hasNext()) {              key = iter.next();              iter.remove();              try {                if (key.isValid()) {                  if (key.isAcceptable())                    doAccept(key);                }              } catch (IOException e) {              }              key = null;            }          } catch (OutOfMemoryError e) {            // we can run out of memory if we have too many threads            // log the event and sleep for a minute and give             // some thread(s) a chance to finish            LOG.warn("Out of Memory in server select", e);            closeCurrentConnection(key, e);            cleanupConnections(true);            try { Thread.sleep(60000); } catch (Exception ie) {}          } catch (Exception e) {            closeCurrentConnection(key, e);          }          cleanupConnections(false);        }        LOG.info("Stopping " + this.getName());          synchronized (this) {          try {            acceptChannel.close();            selector.close();          } catch (IOException e) { }            selector= null;          acceptChannel= null;                    // clean up all connections          while (!connectionList.isEmpty()) {            closeConnection(connectionList.remove(0));          }        }      }        private void closeCurrentConnection(SelectionKey key, Throwable e) {        if (key != null) {          Connection c = (Connection)key.attachment();          if (c != null) {            if (LOG.isDebugEnabled())              LOG.debug(getName() + ": disconnecting client " + c.getHostAddress());            closeConnection(c);            c = null;          }        }      }        InetSocketAddress getAddress() {        return (InetSocketAddress)acceptChannel.socket().getLocalSocketAddress();      }            void doAccept(SelectionKey key) throws IOException,  OutOfMemoryError {        Connection c = null;        ServerSocketChannel server = (ServerSocketChannel) key.channel();        SocketChannel channel;        while ((channel = server.accept()) != null) {          channel.configureBlocking(false);          channel.socket().setTcpNoDelay(tcpNoDelay);          Reader reader = getReader();          try {            reader.startAdd();            SelectionKey readKey = reader.registerChannel(channel);            c = new Connection(readKey, channel, System.currentTimeMillis());            readKey.attach(c);            synchronized (connectionList) {              connectionList.add(numConnections, c);              numConnections++;            }            if (LOG.isDebugEnabled())              LOG.debug("Server connection from " + c.toString() +                  "; # active connections: " + numConnections +                  "; # queued calls: " + callQueue.size());                    } finally {            reader.finishAdd();           }          }      }        void doRead(SelectionKey key) throws InterruptedException {        int count = 0;        Connection c = (Connection)key.attachment();        if (c == null) {          return;          }        c.setLastContact(System.currentTimeMillis());                try {          count = c.readAndProcess();        } catch (InterruptedException ieo) {          LOG.info(getName() + ": readAndProcess caught InterruptedException", ieo);          throw ieo;        } catch (Exception e) {          LOG.info(getName() + ": readAndProcess threw exception " + e + ". Count of bytes read: " + count, e);          count = -1; //so that the (count < 0) block is executed        }        if (count < 0) {          if (LOG.isDebugEnabled())            LOG.debug(getName() + ": disconnecting client " +                       c + ". Number of active connections: "+                      numConnections);          closeConnection(c);          c = null;        }        else {          c.setLastContact(System.currentTimeMillis());        }      }           synchronized void doStop() {        if (selector != null) {          selector.wakeup();          Thread.yield();        }        if (acceptChannel != null) {          try {            acceptChannel.socket().close();          } catch (IOException e) {            LOG.info(getName() + ":Exception in closing listener socket. " + e);          }        }        readPool.shutdown();      }        // The method that will return the next reader to work with      // Simplistic implementation of round robin for now      Reader getReader() {        currentReader = (currentReader + 1) % readers.length;        return readers[currentReader];      }      }