使用Java NIO编写高性能的服务器

jopen 11年前

从 JDK 1.4 开始,Java 的标准库中就包含了 NIO,即所谓的“New IO”。其中最重要的功能就是提供了“非阻塞”的 IO,当然也包括 Socket。非阻塞 IO 就是对 select(Unix 平台下)以及 WaitForMultipleObjects(Windows 平台下)的封装,提供了高性能、易伸缩的服务架构。

Server:

/**
 * Selector-based (non-blocking) file server example.
 */
package nio.file;

import java.io.FileInputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.util.Iterator;

/**
 * Single-threaded NIO server: accepts connections, reads one greeting from
 * each client, prints it, and then streams the file named by
 * {@link #filename} back to that client in {@link #BLOCK}-sized chunks.
 *
 * <p>All channel handling happens on the thread that calls {@link #listen()};
 * the class is not designed to be used from several threads at once.
 *
 * @author ztz
 */
public class NIOServer {

    /** Size in bytes of the per-client file buffer and of the request buffer. */
    static int BLOCK = 4096;

    /**
     * Per-client download state: an open channel on the served file plus the
     * buffer holding the chunk that is currently being sent.
     */
    public class HandleClient {
        protected FileChannel channel;
        protected ByteBuffer buffer;

        /**
         * Opens the served file for reading.
         *
         * @throws IOException if the file cannot be opened
         */
        public HandleClient() throws IOException {
            this.channel = new FileInputStream(filename).getChannel();
            this.buffer = ByteBuffer.allocate(BLOCK);
            // Start "drained" so the first readBlock() loads data from the file
            // instead of handing out the zero-filled initial buffer.
            this.buffer.limit(0);
        }

        /**
         * Returns the next chunk to send.
         *
         * <p>If the previously returned chunk was only partially written to the
         * socket, the same buffer (positioned at the first unsent byte) is
         * returned again; otherwise the buffer is refilled from the file.
         *
         * @return a buffer ready for writing, or {@code null} at end of file or
         *         when reading the file failed
         */
        public ByteBuffer readBlock() {
            if (buffer.hasRemaining()) {
                // A non-blocking write did not take everything last time.
                return buffer;
            }
            try {
                buffer.clear();
                int count = channel.read(buffer);
                buffer.flip();
                if (count <= 0) {
                    return null;
                }
                return buffer;
            } catch (IOException e) {
                e.printStackTrace();
                // Never hand out a half-initialised buffer; treat as end of data.
                buffer.limit(0);
                return null;
            }
        }

        /** Closes the file channel, logging (not propagating) any failure. */
        public void close() {
            try {
                channel.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    protected Selector selector;
    protected String filename = "XXX";
    protected ByteBuffer clientBuffer = ByteBuffer.allocate(BLOCK);
    protected CharsetDecoder decoder;

    /**
     * Creates a server bound to the given port.
     *
     * @param port TCP port to listen on
     * @throws IOException if the listening socket or selector cannot be set up
     */
    public NIOServer(int port) throws IOException {
        selector = this.getSelector(port);
        Charset charset = Charset.forName("GB2312");
        decoder = charset.newDecoder();
    }

    /**
     * Opens a non-blocking server socket on {@code port} and registers it with
     * a fresh selector for accept events.
     *
     * @param port TCP port to bind
     * @return the selector the server socket is registered with
     * @throws IOException if opening, binding or registering fails
     */
    protected Selector getSelector(int port) throws IOException {
        ServerSocketChannel server = ServerSocketChannel.open();
        Selector sel = null;
        try {
            sel = Selector.open();
            server.socket().bind(new InetSocketAddress(port));
            server.configureBlocking(false);
            server.register(sel, SelectionKey.OP_ACCEPT);
            return sel;
        } catch (IOException e) {
            // Do not leak the half-configured channel/selector (e.g. port in use).
            if (sel != null) {
                try {
                    sel.close();
                } catch (IOException ignored) {
                    // best-effort cleanup; the original failure is rethrown below
                }
            }
            try {
                server.close();
            } catch (IOException ignored) {
                // best-effort cleanup; the original failure is rethrown below
            }
            throw e;
        }
    }

    /**
     * Runs the select loop. Returns only if the selector itself fails; a
     * failure while serving one client closes just that client.
     */
    public void listen() {
        try {
            for (;;) {
                selector.select();
                Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
                while (iter.hasNext()) {
                    SelectionKey key = iter.next();
                    iter.remove();
                    try {
                        handleKey(key);
                    } catch (IOException e) {
                        e.printStackTrace();
                        closeClient(key);
                    }
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Releases everything tied to a client key: the attached file handle, the
     * selector registration and the socket. The listening socket's key is left
     * untouched so a failed accept does not shut the server down.
     */
    private void closeClient(SelectionKey key) {
        if (key.channel() instanceof ServerSocketChannel) {
            return;
        }
        Object attachment = key.attachment();
        if (attachment instanceof HandleClient) {
            ((HandleClient) attachment).close();
        }
        key.cancel();
        try {
            key.channel().close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Dispatches one ready key: accept a connection, read the client's
     * greeting, or send the next file chunk.
     *
     * @param key the selected key
     * @throws IOException if the underlying channel operation fails
     */
    protected void handleKey(SelectionKey key) throws IOException {
        if (!key.isValid()) {
            return;
        }
        if (key.isAcceptable()) { // incoming connection
            ServerSocketChannel server = (ServerSocketChannel) key.channel();
            SocketChannel channel = server.accept();
            if (channel == null) {
                // Non-blocking accept may find nothing pending.
                return;
            }
            channel.configureBlocking(false);
            channel.register(selector, SelectionKey.OP_READ);
        } else if (key.isReadable()) { // client request
            SocketChannel channel = (SocketChannel) key.channel();
            // The buffer is shared by all clients; reset it before every read,
            // otherwise it stays full after the first request and later reads
            // return 0.
            clientBuffer.clear();
            int count = channel.read(clientBuffer);
            if (count > 0) {
                clientBuffer.flip();
                CharBuffer charBuffer = decoder.decode(clientBuffer);
                System.out.println("Client>>" + charBuffer.toString());
                // Create the file handle first so a missing file cannot leave a
                // write-interested key without an attachment.
                HandleClient handle = new HandleClient();
                SelectionKey wkey = channel.register(selector, SelectionKey.OP_WRITE);
                wkey.attach(handle);
            } else if (count < 0) {
                // End of stream: the client closed the connection.
                channel.close();
            }
        } else if (key.isWritable()) { // send next chunk
            SocketChannel channel = (SocketChannel) key.channel();
            HandleClient handle = (HandleClient) key.attachment();
            if (handle == null) {
                channel.close();
                return;
            }
            ByteBuffer block = handle.readBlock();
            if (block != null) {
                // May write fewer bytes than remain; readBlock() hands the rest
                // back on the next write event.
                channel.write(block);
            } else {
                handle.close();
                channel.close();
            }
        }
    }

    /**
     * Starts the server on port 12345.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        try {
            int port = 12345;
            NIOServer server = new NIOServer(port);
            System.out.println("Listening on " + port);
            while (true) {
                server.listen();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

Client:

/**
 * Selector-based (non-blocking) file download client example.
 */
package nio.file;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.nio.charset.CharsetEncoder;
import java.util.Iterator;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * File download client: starts {@link #SIZE} concurrent downloads against the
 * server at {@link #ip} and reports how many bytes each one received.
 *
 * @author ztz
 */
public class NIOClient {

    /** Number of concurrent download tasks (and pool threads). */
    static int SIZE = 100;
    static InetSocketAddress ip = new InetSocketAddress("localhost", 12345);
    /**
     * Source of the request charset. The encoder itself is stateful and not
     * safe for concurrent use, so tasks derive their own encoder from it.
     */
    static CharsetEncoder encoder = Charset.forName("GB2312").newEncoder();

    /** One download: connect, send a greeting, then read until the server closes. */
    static class DownLoad implements Runnable {

        int index;

        /**
         * @param index identifier of this download, used in the greeting and the report
         */
        public DownLoad(int index) {
            this.index = index;
        }

        /**
         * Performs the download and prints the byte count and elapsed time once
         * the server closes the connection. I/O failures are logged and end
         * the task without a report.
         */
        @Override
        public void run() {
            long start = System.currentTimeMillis();
            long total = 0; // long: a download may exceed Integer.MAX_VALUE bytes
            SocketChannel client = null;
            Selector selector = null;
            try {
                client = SocketChannel.open();
                client.configureBlocking(false);
                selector = Selector.open();
                client.register(selector, SelectionKey.OP_CONNECT);
                client.connect(ip);
                ByteBuffer buffer = ByteBuffer.allocate(8 * 1024);
                // Per-task encoder: the shared static one must not be used by
                // many threads at the same time.
                CharsetEncoder localEncoder = encoder.charset().newEncoder();
                FOR: for (;;) {
                    selector.select();
                    Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
                    while (iter.hasNext()) {
                        SelectionKey key = iter.next();
                        iter.remove();
                        if (key.isConnectable()) {
                            SocketChannel channel = (SocketChannel) key.channel();
                            if (channel.isConnectionPending()) {
                                channel.finishConnect();
                                ByteBuffer greeting =
                                        localEncoder.encode(CharBuffer.wrap("Hello from " + index));
                                // A non-blocking write may be partial; the greeting is
                                // tiny, so simply retry until it is fully sent.
                                while (greeting.hasRemaining()) {
                                    channel.write(greeting);
                                }
                                channel.register(selector, SelectionKey.OP_READ);
                            }
                        } else if (key.isReadable()) {
                            SocketChannel channel = (SocketChannel) key.channel();
                            // Data is only counted, not kept, so reuse the buffer.
                            buffer.clear();
                            int count = channel.read(buffer);
                            if (count > 0) {
                                total += count;
                            } else if (count < 0) {
                                // End of stream: the server finished sending.
                                break FOR;
                            }
                        }
                    }
                }

                // Report once, after the download has completed.
                double last = (System.currentTimeMillis() - start) * 1.0 / 1000;
                System.out.println("Thread" + index + " download " + total + " bytes in " + last + "s.");
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                // Always release the selector and socket, also on failure.
                if (selector != null) {
                    try {
                        selector.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
                if (client != null) {
                    try {
                        client.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }

    /**
     * Launches {@link #SIZE} downloads on a fixed thread pool.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        ExecutorService exec = Executors.newFixedThreadPool(SIZE);
        for (int index = 0; index < SIZE; index++) {
            exec.execute(new DownLoad(index));
        }
        exec.shutdown();
    }
}