Java NIO 实现进程通讯,解决用户自定义数据的组包和拆分粘包的问题

jopen 11年前

TCP通讯过程中,由于网络原因或者其他原因,经常出现粘包和半包现象。所以在具体编程中需要考虑。

下边的 java 代码是用 NIO 实现的一个Server端,消息的通讯格式为:


4字节int类型 [包头] + 包体.

包头描述出包体的长度。


package com.sof.nio;    import java.io.IOException;  import java.net.InetSocketAddress;  import java.nio.channels.SelectionKey;  import java.nio.channels.Selector;  import java.nio.channels.ServerSocketChannel;  import java.nio.channels.SocketChannel;    import java.util.Set;    import org.slf4j.Logger;  import org.slf4j.LoggerFactory;    public class Reactor implements Runnable  {     private static Logger logger = LoggerFactory.getLogger(Reactor.class);     final Selector selector;   final ServerSocketChannel serverSocket;     public Reactor(String ip, int port) throws IOException   {    selector = Selector.open();    serverSocket = ServerSocketChannel.open();    serverSocket.socket().bind(new InetSocketAddress(port));    serverSocket.configureBlocking(false);    SelectionKey sk = serverSocket.register(selector, SelectionKey.OP_ACCEPT);    sk.attach(new Acceptor());   }     public void run()   {    try    {     while (!Thread.interrupted())     {      logger.debug("selector is waitting  event....");      selector.select();      Set<SelectionKey> keys = selector.selectedKeys();      if (keys.size() == 0)      {       logger.debug("nothing happened");       continue;      }        for (SelectionKey key : keys)      {       if (key.isAcceptable())       {        logger.debug("Acceptable event happened");       }       else if (key.isReadable())       {        logger.debug("Readable event happened");       }       else if (key.isWritable())       {        logger.debug("Writeable event happened");       }       else       {        logger.debug("others event happened");       }       dispatch((SelectionKey) key);      }      keys.clear();     }    }    catch (IOException ex)    {     logger.error(ex.getMessage());     ex.printStackTrace();    }   }     void dispatch(SelectionKey k)   {    Runnable r = (Runnable) (k.attachment());    if (r != null)    {     r.run();    }   }     public class Acceptor implements Runnable   {    public synchronized void run()    {     try     {      SocketChannel c = serverSocket.accept();      logger.info("got a new connection from:  " + c.socket().toString());      if (c != null)      {       new Handler(selector, c);      }     }     catch (IOException ex)     {      logger.error(ex.getMessage());      ex.printStackTrace();     }    }   }  }    package com.sof.nio;    import java.io.IOException;  import java.nio.ByteBuffer;  import java.nio.channels.SelectionKey;  import java.nio.channels.Selector;  import java.nio.channels.SocketChannel;  import org.slf4j.Logger;  import org.slf4j.LoggerFactory;    import com.sof.bas.Bytes2util;  import com.sof.bas.Util2Bytes;    final public class Handler implements Runnable  {   private static Logger logger = LoggerFactory.getLogger(Handler.class);   final SocketChannel socket;   final SelectionKey sk;     static final int MESSAGE_LENGTH_HEAD = 4;   byte[] head = new byte[4];   int bodylen = -1;     Handler(Selector selector, SocketChannel socket) throws IOException   {    this.socket = socket;    socket.configureBlocking(false);    sk = socket.register(selector, 0);    sk.attach(this);    sk.interestOps(SelectionKey.OP_READ);    selector.wakeup();   }     public void run()   {    try    {     read();    }    catch (IOException ex)    {     try     {      socket.close();     }     catch (IOException e)     {      e.printStackTrace();     }     logger.info("got a disconnect from " + socket.socket().toString());     sk.cancel();    }   }     public synchronized void read() throws IOException   {    ByteBuffer input = ByteBuffer.allocate(1024);    socket.read(input);    input.flip();        //读取数据的原则: 要么读取一个完整的包头,要么读取一个完整包体。不满足这两种情况,不对ByteBuffer进行任何的get操作    //但是要注意可能发生上次读取了一个完整的包头,下次读才读取一个完整包体情况。    //所以包头部分必须用类的成员变量进行暂时的存储,当完整读取包头和包体后,在给业务处理部分。    logger.debug("1: remain=" + input.remaining() + " bodylen=" + bodylen);    while(input.remaining() > 0)    {     if (bodylen < 0) //还没有生成完整的包头部分, 该变量初始值为-1,并且在拼凑一个完整的消息包以后,再将该值设置为-1     {      if ( input.remaining() >= MESSAGE_LENGTH_HEAD) //ByteBuffer缓冲区的字节数够拼凑一个包头      {       input.get(head, 0, 4);       bodylen = Util2Bytes.bytes2bigint(head);       logger.debug("2: remain=" + input.remaining() + " bodylen=" + bodylen);      }      else//ByteBuffer缓冲区的字节数不够拼凑一个包头,什么操作都不做,退出这次处理,继续等待      {       logger.debug("3: remain=" + input.remaining() + " bodylen=" + bodylen);       break;      }     }     else if(bodylen > 0) //包头部分已经完整生成.      {      if (input.remaining() >= bodylen) //缓冲区的内容够一个包体部分      {       byte[] body = new byte[bodylen];       input.get(body, 0, bodylen);       byte[] headandbody = new byte[MESSAGE_LENGTH_HEAD + bodylen];       System.arraycopy(head, 0, headandbody, 0, head.length);       System.arraycopy(body,0, headandbody, head.length, body.length);          bodylen = -1;       logger.debug("4: remain=" + input.remaining() + " bodylen=" + bodylen);       Bytes2util.outputHex(headandbody, 16);      }      else  ///缓冲区的内容不够一个包体部分,继续等待,跳出循环等待下次再出发该函数      {       System.out.println("5: remain=" + input.remaining() + " bodylen=" + bodylen);       break;      }     }     else if(bodylen == 0) //没有包体部分,仅仅有包头的情况     {      byte[] headandbody = new byte[MESSAGE_LENGTH_HEAD + bodylen];      System.arraycopy(head, 0, headandbody, 0, head.length);      Bytes2util.outputHex(headandbody, 16);      bodylen = -1;     }    }        sk.interestOps(SelectionKey.OP_READ);   }  }