Apache Mina server简单编解码实现

openkk 10年前

协议编解码器是在使用Mina 的时候你最需要关注的对象,因为在网络中传输的数据都是二进制数据(byte),而你在程序中面向的是JAVA 对象,这就需要你实现在发送数据
时将JAVA 对象编码为二进制数据,而接收数据时将二进制数据解码为JAVA 对象(这个可不是JAVA 对象的序列化、反序列化那么简单的事情)。废话少说直接上代码:

 

Server:

package com.cemso.mina.server;    import java.io.IOException;  import java.net.InetSocketAddress;  import java.nio.charset.Charset;    import org.apache.mina.core.service.IoAcceptor;  import org.apache.mina.core.session.IdleStatus;  import org.apache.mina.filter.codec.ProtocolCodecFilter;  import org.apache.mina.filter.codec.textline.TextLineCodecFactory;  import org.apache.mina.filter.logging.LoggingFilter;  import org.apache.mina.transport.socket.nio.NioSocketAcceptor;    import com.cemso.mina.coder.CmccSipcCodecFactory;    /**   * @author gl65293   */  public class MinaTimeServer {        private static final int PORT = 9123;        public static void main(String[] args) throws IOException{            IoAcceptor acceptor = new NioSocketAcceptor();            acceptor.getFilterChain().addLast("logger", new LoggingFilter());  //        acceptor.getFilterChain().addLast("codec", new ProtocolCodecFilter(new TextLineCodecFactory(Charset.forName("UTF-8"))));                    acceptor.getFilterChain().addLast("codec", new ProtocolCodecFilter(new CmccSipcCodecFactory(Charset.forName("UTF-8"))));                    acceptor.setHandler(new TimeServerHandler());          acceptor.getSessionConfig().setReadBufferSize(2048);          acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 10);          acceptor.bind(new InetSocketAddress(PORT));        }  }
package com.cemso.mina.server;    /**   * @author gl65293   *   */  import org.apache.mina.core.service.IoHandlerAdapter;  import org.apache.mina.core.session.IdleStatus;  import org.apache.mina.core.session.IoSession;    import com.cemso.mina.dto.SmsObject;    public class TimeServerHandler extends IoHandlerAdapter {        public TimeServerHandler() {      }        public void exceptionCaught(IoSession session, Throwable cause) throws Exception {          cause.printStackTrace();      }        public void messageReceived(IoSession session, Object message) throws Exception {                    SmsObject sms = (SmsObject)message;          System.out.println(sms.getMessage());          System.out.println(sms.getSender());          System.out.println(sms.getReceiver());                }        public void sessionIdle(IoSession session, IdleStatus status) throws Exception {          System.out.println((new StringBuilder()).append("IDLE ").append(session.getIdleCount(status)).toString());      }  }
dto:
package com.cemso.mina.dto;    /**   * @author gl65293   *   */  public class SmsObject {        private String sender;      private String receiver;      private String message;      /**       * @return the sender       */      public String getSender() {          return sender;      }      /**       * @param sender the sender to set       */      public void setSender(String sender) {          this.sender = sender;      }      /**       * @return the receiver       */      public String getReceiver() {          return receiver;      }      /**       * @param receiver the receiver to set       */      public void setReceiver(String receiver) {          this.receiver = receiver;      }      /**       * @return the message       */      public String getMessage() {          return message;      }      /**       * @param message the message to set       */      public void setMessage(String message) {          this.message = message;      }  }
encoder and decoder:
package com.cemso.mina.coder;    import java.nio.charset.Charset;    import org.apache.mina.core.session.IoSession;  import org.apache.mina.filter.codec.ProtocolCodecFactory;  import org.apache.mina.filter.codec.ProtocolDecoder;  import org.apache.mina.filter.codec.ProtocolEncoder;    /**   * @author gl65293   *   */  public class CmccSipcCodecFactory implements ProtocolCodecFactory {        private final CmccSipcEncoder encoder;      private final CmccSipcDecoder decoder;            public CmccSipcCodecFactory(){          this(Charset.defaultCharset());      }      /**       * @param defaultCharset       */      public CmccSipcCodecFactory(Charset charSet) {          this.encoder = new CmccSipcEncoder(charSet);          this.decoder = new CmccSipcDecoder(charSet);      }      /* (non-Javadoc)       * @see org.apache.mina.filter.codec.ProtocolCodecFactory#getDecoder(org.apache.mina.core.session.IoSession)       */      @Override      public ProtocolDecoder getDecoder(IoSession iosession) throws Exception {          // TODO Auto-generated method stub          return decoder;      }        /* (non-Javadoc)       * @see org.apache.mina.filter.codec.ProtocolCodecFactory#getEncoder(org.apache.mina.core.session.IoSession)       */      @Override      public ProtocolEncoder getEncoder(IoSession iosession) throws Exception {          // TODO Auto-generated method stub          return encoder;      }    }
package com.cemso.mina.coder;    import java.nio.charset.Charset;  import java.nio.charset.CharsetDecoder;    import org.apache.mina.core.buffer.IoBuffer;  import org.apache.mina.core.session.IoSession;  import org.apache.mina.filter.codec.CumulativeProtocolDecoder;  import org.apache.mina.filter.codec.ProtocolDecoderOutput;    import com.cemso.mina.dto.SmsObject;    /**   * @author gl65293   *   */  public class CmccSipcDecoder extends CumulativeProtocolDecoder {        private final Charset charset;                  /**       * @param charset       */      public CmccSipcDecoder(Charset charset) {          this.charset = charset;      }        /* (non-Javadoc)       * @see org.apache.mina.filter.codec.CumulativeProtocolDecoder#doDecode(org.apache.mina.core.session.IoSession, org.apache.mina.core.buffer.IoBuffer, org.apache.mina.filter.codec.ProtocolDecoderOutput)       */      @Override      protected boolean doDecode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception {          // TODO Auto-generated method stub                    IoBuffer buffer = IoBuffer.allocate(100).setAutoExpand(true);          CharsetDecoder decoder = charset.newDecoder();                    int matchCount = 0;            String statusLine = "";          String sender = "";          String receiver = "";          String length = "";          String sms="";          int i = 1;                    while(in.hasRemaining()){              byte b = in.get();              buffer.put(b);                            if(b == 10 && i < 5){                  matchCount ++;                  if(i == 1){                      buffer.flip();                      statusLine = buffer.getString(matchCount,decoder);                      statusLine = statusLine.substring(0, statusLine.length()-1);                      matchCount = 0;                      buffer.clear();                  }                  if(i == 2){                      buffer.flip();                      sender = 
buffer.getString(matchCount,decoder);                      sender = sender.substring(0, sender.length()-1);                      matchCount = 0;                      buffer.clear();                  }                  if(i == 3){                      buffer.flip();                      receiver = buffer.getString(matchCount,decoder);                      receiver = receiver.substring(0, receiver.length()-1);                      matchCount = 0;                      buffer.clear();                  }                  if(i == 4){                      buffer.flip();                      length = buffer.getString(matchCount,decoder);                      length = length.substring(0, length.length()-1);                      matchCount = 0;                      buffer.clear();                  }                                    i++;              }else if(i == 5){                  matchCount ++;                  if(matchCount == Long.parseLong(length.split(": ")[1])){                      buffer.flip();                      sms = buffer.getString(matchCount,decoder);                      i++;                      break;                  }              }else{                  matchCount ++;              }          }                    SmsObject smsObject = new SmsObject();          smsObject.setSender(sender.split(": ")[1]);          smsObject.setReceiver(receiver.split(": ")[1]);          smsObject.setMessage(sms);                    out.write(smsObject);                    return false;      }    }
package com.cemso.mina.coder;      import java.nio.charset.Charset;  import java.nio.charset.CharsetEncoder;    import org.apache.mina.core.buffer.IoBuffer;  import org.apache.mina.core.session.IoSession;  import org.apache.mina.filter.codec.ProtocolEncoderAdapter;  import org.apache.mina.filter.codec.ProtocolEncoderOutput;    import com.cemso.mina.dto.SmsObject;    /**   * @author gl65293   *   */  public class CmccSipcEncoder extends ProtocolEncoderAdapter {        private final Charset charset;            public CmccSipcEncoder(Charset charset){          this.charset = charset;      }            /* (non-Javadoc)       * @see org.apache.mina.filter.codec.ProtocolEncoder#encode(org.apache.mina.core.session.IoSession, java.lang.Object, org.apache.mina.filter.codec.ProtocolEncoderOutput)       */      @Override      public void encode(IoSession iosession, Object obj, ProtocolEncoderOutput out) throws Exception {          // TODO Auto-generated method stub          SmsObject sms = (SmsObject)obj;          CharsetEncoder charst = charset.newEncoder();          IoBuffer buffer = IoBuffer.allocate(100).setAutoExpand(true);          String statusLine = "M sip:wap.fetion.com.cn SIP-C/2.0";          String sender = sms.getSender();          String recevier = sms.getReceiver();          String smsContent = sms.getMessage();                    buffer.putString(statusLine+'\n', charst);          buffer.putString("S: "+sender+'\n', charst);          buffer.putString("R: "+recevier+'\n', charst);          buffer.putString("L: "+smsContent.getBytes(charset).length+"\n", charst);          buffer.putString(smsContent, charst);                    buffer.flip();          out.write(buffer);        }    }
client:
package com.cemso.mina.client;    /**   * @author gl65293   *   */  import java.net.InetSocketAddress;  import java.nio.charset.Charset;    import org.apache.mina.core.service.IoConnector;  import org.apache.mina.filter.codec.ProtocolCodecFilter;  import org.apache.mina.filter.codec.textline.TextLineCodecFactory;  import org.apache.mina.transport.socket.nio.NioSocketConnector;    import com.cemso.mina.coder.CmccSipcCodecFactory;    // Referenced classes of package org.apache.mina.example.sumup:  //            ClientSessionHandler    public class Client {        public Client() {      }        public static void main(String args[]) throws Throwable {        /*    if (args.length == 0) {              System.out.println("Please specify the list of any integers");              return;          }          // prepare values to sum up          int[] values = new int[args.length];          for (int i = 0; i < args.length; i++) {              values[i] = Integer.parseInt(args[i]);          }            NioSocketConnector connector = new NioSocketConnector();            // Configure the service.          
connector.setConnectTimeoutMillis(CONNECT_TIMEOUT);          if (USE_CUSTOM_CODEC) {              connector.getFilterChain().addLast("codec", new ProtocolCodecFilter(new SumUpProtocolCodecFactory(false)));          } else {              connector.getFilterChain().addLast("codec", new ProtocolCodecFilter(new ObjectSerializationCodecFactory()));          }          connector.getFilterChain().addLast("logger", new LoggingFilter());          connector.setHandler(new ClientSessionHandler(values));            IoSession session;          for (;;) {              try {                  ConnectFuture future = connector.connect(new InetSocketAddress(HOSTNAME, PORT));                  future.awaitUninterruptibly();                  session = future.getSession();                  break;              } catch (RuntimeIoException e) {                  System.err.println("Failed to connect.");                  e.printStackTrace();                  Thread.sleep(5000);              }          }            // wait until the summation is done          session.getCloseFuture().awaitUninterruptibly();          connector.dispose();*/                    IoConnector connector = new NioSocketConnector();          connector.setHandler(new ClientSessionHandler("你好!\r\n哈哈!"));          connector.setConnectTimeoutMillis(30000);  //        connector.getFilterChain().addLast("codec", new ProtocolCodecFilter(new TextLineCodecFactory(Charset.forName("UTF-8"))));          connector.getFilterChain().addLast("codec", new ProtocolCodecFilter(new CmccSipcCodecFactory(Charset.forName("UTF-8"))));                    connector.connect(new InetSocketAddress("127.0.0.1",9123));                  }    }
package com.cemso.mina.client;    import org.apache.mina.core.service.IoHandlerAdapter;  import org.apache.mina.core.session.IoSession;    import com.cemso.mina.dto.SmsObject;    /**   * @author gl65293   *   */  public class ClientSessionHandler extends IoHandlerAdapter{      private final String values;            public ClientSessionHandler(String values){          this.values = values;      }            public void sessionOpened(IoSession session){          //session.write(values);                    SmsObject sms = new SmsObject();          sms.setSender("18817261072");          sms.setReceiver("15951892458");          sms.setMessage("你好! Hello world!");          session.write(sms);      }  }