Jva NIO框架 Apache Mina 2.x 简易入门解析

openkk 12年前

        最近使用Mina开发一个Java的NIO服务端程序,因此也特意学习了Apache的这个Mina框架。

        首先,Mina是个什么东西?看下项目主页(http://mina.apache.org/)对它的解释:

        Apache的Mina(Multipurpose Infrastructure Networked Applications)是一个网络应用框架,可以帮助用户开发高性能和高扩展性的网络应用程序;它提供了一个抽象的、事件驱动的异步API,使 Java NIO在各种传输协议(如TCP/IP,UDP/IP协议等)下快速高效开发。

        Apache Mina也称为:

  • NIO框架
  • 客户端/服务端框架(典型的C/S架构)
  • 网络套接字(networking socket)类库
  • 事件驱动的异步API(注意:在JDK7中也新增了异步API)

总之:我们简单理解它是一个封装底层IO操作,提供高级操作API的通讯框架!

 

在Mina的官网、以及网上都有比较丰富的文档了,这里我就稍微简单说一下Mina的结构和示例代码。

因为Mina2.X改进了Mina的代码结构和包结构,降低了使用的复杂性和增强了健壮性,所以使得API发生了比较大的改变,有许多地方已经和Mina1.x不兼容了。

这里使用的是Mina2.0.4

1.Mina的结构

Jva NIO框架 Apache Mina 2.x 简易入门解析

 

Mina的通信流程大致如上图所示,各个组件功能有:
(1.)  IoService:这个接口在一个线程上负责套接字的建立,拥有自己的Selector,监
听是否有连接被建立。

(Mina底层使用JAVA NIO, 因此它是典型的使用Reactor模式架构的,采用事件驱动编程 , Mina运行用户自定义线程模型,可以是单线程、多线程、线程池等 ,

   跟JAVA Socket不一样, Mina是非阻塞的Socket,它内部已经保证了对各个连接(session)的业务和数据的隔离,采用轮询机制为各个session分配CPU资源,

        所以,你就不需要再去考虑不同Socket连接需要用不同的线程去操纵的问题了。)

(2.)  IoProcessor:这个接口在另一个线程上负责检查是否有数据在通道上读写,也就是
说它也拥有自己的Selector,这是与我们使用JAVA NIO 编码时的一个不同之处,
通常在JAVA NIO 编码中,我们都是使用一个Selector,也就是不区分IoService
与 IoProcessor 两个功能接口。另外,IoProcessor 负责调用注册在IoService 上
的过滤器,并在过滤器链之后调用IoHandler。


(3.)  IoFilter:这个接口定义一组拦截器,这些拦截器可以包括日志输出、黑名单过滤、
数据的编码(write 方向)与解码(read 方向)等功能,其中数据的encode 与 decode
是最为重要的、也是你在使用Mina 时最主要关注的地方。


(4.)  IoHandler:这个接口负责编写业务逻辑,也就是接收、发送数据的地方。
 

2. Mina编程的大致过程.

    2.1 总体流程

        建立服务器端的资源: 包括 Acceptor的建立,之后为Acceptor配置相应的Filter(可以是Mina自带的Filter或者自定义的Filter),

之后再配置相应基于事件驱动的处理业务逻辑的IoHandler.

       建立客户端的资源: Mina采用了统一的编程模型,所以建立客户端的过程和建立服务器端的过程大致上是相似的,不过这里建立的是Connector.

   2.2 示例程序。(使用jar包为 mina-core-2.0.4.jar)

            下面通过一个简单的示例程序来进一步理解Mina的运行机制。

                 该程序实现简单的即时通讯功能。 即,多个客户端可以同时脸上服务器,并进行类似于聊天室一样的通信。

         

       2.2.1 建立自定义的TextLineCodecFacotry

             为了了解Mina的代码功能以及运行机制,我们模拟实现了类似Mina自带TextLineCodecFactory。

                该CodecFactory功能是: 配合ProtocolCodecFilter 进行对底层数据(binary二进制数据流)和高层数据(特定类型的数据对象信息,例如String)之间的转换。

                                                                这里实现了一个断行读取功能,即遇到'\n'的时候,就认为是一个String Line , 将这段数据流封装成String,之后再交给下一个Filter或者Handler处理。

                                                                CodecFactory是一个工厂方法,底层通过一个Decoder和Encoder来提供对数据进行解、编码的操作,载体是IoBuffer

                                                                (解码:将二进制数据转换成高层数据(对象)                编码:将高层数据(对象)转换成二进制数据流)  )

                 1)建立Decoder (MyTextLineDecoder)

                        实现了ProtocolDecoder接口

package com.mai.mina.diyCodecFilter;    import java.nio.charset.Charset;    import org.apache.mina.core.buffer.IoBuffer;  import org.apache.mina.core.session.IoSession;  import org.apache.mina.filter.codec.ProtocolDecoder;  import org.apache.mina.filter.codec.ProtocolDecoderOutput;    public class MyTextLineDecoder implements ProtocolDecoder{        Charset charset = Charset.forName("UTF-8");      IoBuffer buf = IoBuffer.allocate(100).setAutoExpand(true);            @Override      public void decode(IoSession session, IoBuffer in, ProtocolDecoderOutput output)              throws Exception {          // TODO Auto-generated method stub          while(in.hasRemaining()){              byte b = in.get();              if(b == '\n'){                  buf.flip();                  byte[] bytes = new byte[buf.limit()];                  buf.get(bytes);                  String message = new String(bytes,charset);                  buf = IoBuffer.allocate(100).setAutoExpand(true);                                    output.write(message);              }else{                  buf.put(b);              }          }      }        @Override      public void dispose(IoSession arg0) throws Exception {          // TODO Auto-generated method stub                }        @Override      public void finishDecode(IoSession arg0, ProtocolDecoderOutput arg1)              throws Exception {          // TODO Auto-generated method stub                }    }

                2)建立Encoder (MyTextLineEncoder)

                        实现了ProtocolEncoder接口

package com.mai.mina.diyCodecFilter;    import java.nio.charset.Charset;    import org.apache.mina.core.buffer.IoBuffer;  import org.apache.mina.core.session.IoSession;  import org.apache.mina.filter.codec.ProtocolEncoder;  import org.apache.mina.filter.codec.ProtocolEncoderOutput;  import org.apache.mina.filter.codec.textline.LineDelimiter;    public class MyTextLineEncoder implements ProtocolEncoder{        Charset charset = Charset.forName("UTF-8");            @Override      public void dispose(IoSession session) throws Exception {          // TODO Auto-generated method stub                }        @Override      public void encode(IoSession session, Object message, ProtocolEncoderOutput output)              throws Exception {          // TODO Auto-generated method stub          IoBuffer buf = IoBuffer.allocate(100).setAutoExpand(true);                    buf.putString(message.toString(), charset.newEncoder());                    buf.putString(LineDelimiter.DEFAULT.getValue(), charset.newEncoder());          buf.flip();                    output.write(buf);                }    }

                3)建立MyTextLineCodecFactory

                        实现了ProtocolCodecFactory接口

package com.mai.mina.diyCodecFilter;    import org.apache.mina.core.session.IoSession;  import org.apache.mina.filter.codec.ProtocolCodecFactory;  import org.apache.mina.filter.codec.ProtocolDecoder;  import org.apache.mina.filter.codec.ProtocolEncoder;    public class MyTextLineCodecFactory implements ProtocolCodecFactory{        @Override      public ProtocolDecoder getDecoder(IoSession arg0) throws Exception {          // TODO Auto-generated method stub          return new MyTextLineDecoder();      }        @Override      public ProtocolEncoder getEncoder(IoSession arg0) throws Exception {          // TODO Auto-generated method stub          return new MyTextLineEncoder();      }    }
         2.2.2 建立服务器端资源(包括Acceptor的配置、Handler建立)

                1). 建立自定义IoHandler(MyServerHandleDemo1)

                        实现了IoHandler接口。

package com.mai.mina.diyChat;    import java.text.DateFormat;  import java.text.SimpleDateFormat;  import java.util.Collection;  import java.util.Date;  import java.util.logging.Logger;    import org.apache.mina.core.future.CloseFuture;  import org.apache.mina.core.future.IoFuture;  import org.apache.mina.core.future.IoFutureListener;  import org.apache.mina.core.service.IoHandler;  import org.apache.mina.core.session.IdleStatus;  import org.apache.mina.core.session.IoSession;    public class MyServerHandleDemo1 implements IoHandler{        private Logger logger = Logger.getLogger(this.getClass().getName());            @Override      public void exceptionCaught(IoSession session, Throwable arg1)              throws Exception {          // TODO Auto-generated method stub          logger.warning("服务器启动发生异常,have a exception : " + arg1.getMessage());      }        @Override      public void messageReceived(IoSession session, Object message) throws Exception {          // TODO Auto-generated method stub          String messageStr = message.toString();          DateFormat format = new SimpleDateFormat("yyyy-MM-dd H:m:s");          String dateStr = format.format(new Date());                    logger.info(messageStr + "\t" + dateStr);                    Collection sessions = session.getService().getManagedSessions().values();          for(IoSession tempSession : sessions){              tempSession.write(messageStr + "\t" + dateStr);          }      }        @Override      public void messageSent(IoSession session, Object message) throws Exception {          // TODO Auto-generated method stub          logger.info("服务器成功发送信息: " + message.toString());      }        @Override      public void sessionClosed(IoSession session) throws Exception {          // TODO Auto-generated method stub          logger.info("there is a session closed");          CloseFuture future = session.close(true);          future.addListener(new IoFutureListener(){              public void operationComplete(IoFuture future){                  if(future instanceof CloseFuture){                      ((CloseFuture)future).setClosed();                      logger.info("have do the future set to closed");                  }              }          });      }        @Override      public void sessionCreated(IoSession session) throws Exception {          // TODO Auto-generated method stub          logger.info("there is a session created");          session.write("welcome to the chat room");      }        @Override      public void sessionIdle(IoSession session, IdleStatus arg1) throws Exception {          // TODO Auto-generated method stub          logger.info(session.getId() + "(SesssionID) is idle in the satate-->" + arg1);      }        @Override      public void sessionOpened(IoSession arg0) throws Exception {          // TODO Auto-generated method stub                }          }
            2).建立Acceptor ,同时也充当Server端的启动类 (SimpleMinaServer)
package com.mai.mina.diyChat;    import java.io.IOException;  import java.net.InetSocketAddress;  import java.nio.charset.Charset;    import org.apache.mina.core.session.IdleStatus;  import org.apache.mina.filter.codec.ProtocolCodecFilter;  import org.apache.mina.filter.codec.textline.LineDelimiter;  import org.apache.mina.filter.codec.textline.TextLineCodecFactory;  import org.apache.mina.filter.logging.LogLevel;  import org.apache.mina.filter.logging.LoggingFilter;  import org.apache.mina.transport.socket.SocketAcceptor;  import org.apache.mina.transport.socket.nio.NioSocketAcceptor;    public class SimpleMinaServer {            SocketAcceptor acceptor = null;            SimpleMinaServer(){          acceptor = new NioSocketAcceptor();      }            public boolean bind(){          acceptor.getFilterChain().addLast("codec", new ProtocolCodecFilter(                  new MyTextLineCodecFactory()); //配置CodecFactory          LoggingFilter log = new LoggingFilter();          log.setMessageReceivedLogLevel(LogLevel.INFO);          acceptor.getFilterChain().addLast("logger", log);          acceptor.setHandler(new MyServerHandleDemo1());        //配置handler          acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 30);                    try {              acceptor.bind(new InetSocketAddress(8888));              return true;          } catch (IOException e) {              // TODO Auto-generated catch block              e.printStackTrace();              return false;          }                }            public static void main(String args[]){          SimpleMinaServer server = new SimpleMinaServer();          if(!server.bind()){              System.out.println("服务器启动失败");          }else{              System.out.println("服务器启动成功");          }      }    }
         2.2.3 建立Client端资源:

                1)自定义IoHandler(MyClientHandleDemo1)

                        实现IoHandler接口

package com.mai.mina.diyChat;    import java.util.logging.Logger;    import org.apache.mina.core.service.IoHandlerAdapter;  import org.apache.mina.core.session.IoSession;    public class MyClientHandleDemo1 extends IoHandlerAdapter{      private ChatPanel messagePanel = null;      private Logger logger = Logger.getLogger(this.getClass().getName());        MyClientHandleDemo1(){                }            MyClientHandleDemo1(ChatPanel messagePanel){          this.messagePanel = messagePanel;      }                  public void messageReceived(IoSession session, Object message) throws Exception {          // TODO Auto-generated method stub          String messageStr = message.toString();          logger.info("receive a message is : " + messageStr);                    if(messagePanel != null)          messagePanel.showMsg(messageStr);      }            public void messageSent(IoSession session , Object message) throws Exception{          logger.info("客户端发了一个信息:" + message.toString());      }        }
                2) 建立Connector   (SimpleMinaClient)
package com.mai.mina.diyChat;    import java.net.InetSocketAddress;  import java.nio.charset.Charset;    import org.apache.mina.core.future.CloseFuture;  import org.apache.mina.core.future.ConnectFuture;  import org.apache.mina.core.session.IoSession;  import org.apache.mina.filter.codec.ProtocolCodecFilter;  import org.apache.mina.filter.codec.textline.LineDelimiter;  import org.apache.mina.filter.codec.textline.TextLineCodecFactory;  import org.apache.mina.filter.logging.LogLevel;  import org.apache.mina.filter.logging.LoggingFilter;  import org.apache.mina.transport.socket.SocketConnector;  import org.apache.mina.transport.socket.nio.NioSocketConnector;    public class SimpleMinaClient {        public SocketConnector connector = null;      public ConnectFuture future;      public IoSession session = null;      private ChatPanel messagePanel = null;            SimpleMinaClient(){                }            SimpleMinaClient(ChatPanel messagePanel){          this.messagePanel = messagePanel;      }                  boolean connect(){          try{              connector = new NioSocketConnector();              connector.setConnectTimeoutMillis(3000);              connector.getFilterChain().addLast("codec", new ProtocolCodecFilter(                  new MyTextLineCodecFactory());              LoggingFilter log = new LoggingFilter();              log.setMessageReceivedLogLevel(LogLevel.INFO);              connector.getFilterChain().addLast("logger", log);              connector.setHandler(new MyClientHandleDemo1(messagePanel));                            future = connector.connect(new InetSocketAddress("127.0.0.1" , 8888));              future.awaitUninterruptibly();              session = future.getSession();              return true;          }catch(Exception e){              e.printStackTrace();              return false;          }                }                  public void setAttribute(Object key , Object value){          session.setAttribute(key, value);      }                  void sentMsg(String message){          session.write(message);      }                  boolean close(){          CloseFuture future = session.getCloseFuture();          future.awaitUninterruptibly(1000);          connector.dispose();          return true;      }            public SocketConnector getConnector() {          return connector;      }        public IoSession getSession() {          return session;      }        /**       * @param args       */      public static void main(String[] args) {          // TODO Auto-generated method stub          SimpleMinaClient client = new SimpleMinaClient();          if(client.connect()){              client.sentMsg("hello , sever !");              client.close();          }                }    }

        到这里,基本的Mina通信基础就建立好了。

        接下来实现一个客户端的GUI界面,方便实际功能的建立和信息交互的演示。

 

        2.2.4 Client Gui界面的建立。(ChatPanel -通过使用SimpleMinaClient来提供实际通信功能)

package com.mai.mina.diyChat;  import java.awt.BorderLayout;  import java.awt.event.ActionEvent;  import java.awt.event.ActionListener;  import java.awt.event.WindowAdapter;  import java.awt.event.WindowEvent;    import javax.swing.JButton;  import javax.swing.JFrame;  import javax.swing.JLabel;  import javax.swing.JPanel;  import javax.swing.JScrollPane;  import javax.swing.JTextArea;  import javax.swing.JTextField;    import org.apache.commons.lang.math.RandomUtils;    public class ChatPanel extends javax.swing.JPanel {      private JPanel northPanel;      private JLabel headLabel;      private JScrollPane jScrollPane1;      private JScrollPane jScrollPane2;      private JButton exitB;      private JButton clearMsgB;      private JButton sentB;      private JButton connectB;      private JTextArea messageText;      private JTextField nameText;      private JLabel nameLabel;      private JTextArea messageArea;      private JPanel southPanel;      private SimpleMinaClient client = null;      private boolean connected = false;      private String username = null;        {          //Set Look & Feel          try {              javax.swing.UIManager.setLookAndFeel("com.sun.java.swing.plaf.windows.WindowsLookAndFeel");          } catch(Exception e) {              e.printStackTrace();          }      }        public void connect(){          if(client.connect()){              username = nameText.getText().trim();              if(username == null || "".equals(username)){                  username = "游客" + RandomUtils.nextInt(1000);                  nameText.setText(username);              }              connected = true;              dealUIWithFlag();          }else{              connected = false;                 dealUIWithFlag();              showMsg("连接服务器失败。。。");          }      }                  public void showMsg(String msg){          messageArea.append(msg);          messageArea.append("\n");          messageArea.selectAll();          messageArea.lostFocus(null, this);      }                  public void sentMsg(){          String message = username + ":" + messageText.getText();          client.sentMsg(message);          messageText.setText("");          messageText.requestFocus();      }            public void dealUIWithFlag(){          if(connected){              nameText.setEnabled(false);              connectB.setEnabled(false);              sentB.setEnabled(true);              clearMsgB.setEnabled(true);              exitB.setEnabled(true);          }else{              nameText.setEnabled(true);              connectB.setEnabled(true);              sentB.setEnabled(false);              clearMsgB.setEnabled(false);              exitB.setEnabled(false);          }      }                  public void closeTheClient(){          if(client.close()){              showMsg("连接已断开...");              connected = false;              dealUIWithFlag();          }else{              showMsg("无法断开连接...");          }      }                    public static void main(String[] args) {          JFrame frame = new JFrame();          frame.getContentPane().add(new ChatPanel());          frame.addWindowListener(new WindowAdapter(){              public void windowClosing(WindowEvent e){                  System.exit(0);              }          });          frame.pack();          frame.setLocationByPlatform(true);          frame.setVisible(true);      }            public ChatPanel() {          super();          client = new SimpleMinaClient(this);          initGUI();          dealUIWithFlag();      }            private void initGUI() {          try {              this.setPreferredSize(new java.awt.Dimension(400, 339));              this.setLayout(null);              {                  northPanel = new JPanel();                  BorderLayout northPanelLayout = new BorderLayout();                  northPanel.setLayout(northPanelLayout);                  this.add(northPanel);                  northPanel.setBounds(0, 0, 400, 188);                  {                      headLabel = new JLabel();                      northPanel.add(headLabel, BorderLayout.NORTH);                      headLabel.setText("\u6b22\u8fce\u4f7f\u7528 (\u6d4b\u8bd5Ip:port --> 127.0.0.1:8888)");                      headLabel.setPreferredSize(new java.awt.Dimension(397, 19));                  }                  {                      jScrollPane1 = new JScrollPane();                      northPanel.add(jScrollPane1, BorderLayout.CENTER);                      jScrollPane1.setPreferredSize(new java.awt.Dimension(400, 169));                      {                          messageArea = new JTextArea();                          jScrollPane1.setViewportView(messageArea);                          messageArea.setPreferredSize(new java.awt.Dimension(398, 145));                          messageArea.setEditable(false);                          messageArea.setLineWrap(true);                          messageArea.setWrapStyleWord(true);                      }                  }              }              {                  southPanel = new JPanel();                  this.add(southPanel);                  southPanel.setBounds(0, 194, 400, 145);                  southPanel.setLayout(null);                  {                      nameLabel = new JLabel();                      southPanel.add(nameLabel);                      nameLabel.setText("\u6635\u79f0:");                      nameLabel.setBounds(10, 12, 35, 15);                  }                  {                      nameText = new JTextField();                      southPanel.add(nameText);                      nameText.setText("\u6e38\u5ba2");                      nameText.setBounds(45, 9, 96, 21);                  }                  {                        jScrollPane2 = new JScrollPane();                      southPanel.add(jScrollPane2);                      jScrollPane2.setBounds(15, 37, 364, 69);                      {                          messageText = new JTextArea();                          jScrollPane2.setViewportView(messageText);                          messageText.setBounds(101, 72, 362, 75);                          messageText.setPreferredSize(new java.awt.Dimension(362, 54));                          messageText.setLineWrap(true);                          messageText.setWrapStyleWord(true);                      }                  }                  {                      connectB = new JButton();                      southPanel.add(connectB);                      connectB.setText("\u8fde\u63a5\u670d\u52a1\u5668");                      connectB.setBounds(179, 8, 93, 23);                      connectB.addActionListener(new ActionListener() {                          public void actionPerformed(ActionEvent evt) {                              System.out.println("connectB.actionPerformed, event="+evt);                              //TODO add your code for connectB.actionPerformed                              connect();                          }                      });                  }                  {                      sentB = new JButton();                      southPanel.add(sentB);                      sentB.setText("\u53d1\u9001");                      sentB.setBounds(261, 116, 57, 23);                      sentB.addActionListener(new ActionListener() {                          public void actionPerformed(ActionEvent evt) {                              System.out.println("sentB.actionPerformed, event="+evt);                              //TODO add your code for sentB.actionPerformed                              sentMsg();                          }                      });                  }                  {                      clearMsgB = new JButton();                      southPanel.add(clearMsgB);                      clearMsgB.setText("\u6e05\u7a7a");                      clearMsgB.setBounds(324, 116, 57, 23);                      clearMsgB.addActionListener(new ActionListener() {                          public void actionPerformed(ActionEvent evt) {                              System.out.println("clearMsgB.actionPerformed, event="+evt);                              //TODO add your code for clearMsgB.actionPerformed                              messageText.setText("");                          }                      });                  }                  {                      exitB = new JButton();                      southPanel.add(exitB);                      exitB.setText("\u6ce8\u9500");                      exitB.setBounds(282, 8, 57, 23);                      exitB.addActionListener(new ActionListener() {                          public void actionPerformed(ActionEvent evt) {                              System.out.println("exitB.actionPerformed, event="+evt);                              //TODO add your code for exitB.actionPerformed                              closeTheClient();                          }                      });                  }              }          } catch (Exception e) {              e.printStackTrace();          }       }    }
3. 运行结果

        首先启动服务器端,即运行SimpleMinaServer类 , 启动成功时会在控制台中打印出“服务器启动成功"

        接下来运行客户端ChatPanel。

Jva NIO框架 Apache Mina 2.x 简易入门解析



note: 上面只是一个简单的信息交互,其实使用Mina比较常用的还是在自定义协议处理这块。

                所以,比较应该注重学习的是Filter这块。有时间大家可以去看看源码。


来自:http://www.cnblogs.com/mailingfeng/archive/2012/02/08/2342522.html