Apache Mina 2 学习笔记


1 Apache Mina2 学习笔记 作者:李少华 邮箱:xiaosanshaoli@126.com QQ:305409913 2010-12-23 初稿 目录 2 引言..........................................................................................................................................................3 一. Mina 入门................................................................................................................................3 第一步.下载使用的 Jar包..........................................................................................................3 第二步.工程创建配置.................................................................................................................. 4 第三步.服务端程序...................................................................................................................... 4 第四步.使用 telnet 命令来测试服务端...................................................................................6 第五步.客户端程序...................................................................................................................... 9 第六步.长连接 VS短连接..........................................................................................................11 第七步.入门程序 02................................................................................................................... 12 第八步.入门程序 03................................................................................................................... 17 二. Mina 基础..............................................................................................................................21 2.1.IoService 接口................................................................................................................... 22 2.1.1 类结构...................................................................................................................... 22 2.1.2 应用.......................................................................................................................... 23 总结.......................................................................................................................................26 2.2.IoFilter 接口..................................................................................................................... 26 2.2.1 类结构...................................................................................................................... 26 2.2.2 应用.......................................................................................................................... 28 总结.......................................................................................................................................62 2.3.IoHandler 接口................................................................................................................... 62 三. Mina 解析..............................................................................................................................63 3.1 NIO 分析................................................................................................................................63 3.1.1 传统阻塞服务器................................................................................................. 64 3.1.2 多线程阻塞服务器............................................................................................. 68 3.1.3 使用 JDK自带线程池的阻塞服务器................................................................74 3.1.4 NIO 基础知识......................................................................................................77 3.1.5 基于 NIO的阻塞服务器.....................................................................................81 3.1.6 基于 NIO的非阻塞服务器.................................................................................85 3.1.7 多线程的基于 NIO 的非阻塞服务器................................................................85 3.2 异步操作分析..................................................................................................................85 3.3 Mina 内部实现分析.........................................................................................................85 3.4 Mina 的线程模型配置.........................................................................................................85 四. Mina 实例..............................................................................................................................87 五. 后记.......................................................................................................................................88 3 引言 最近使用 Mina 开发一个 Java 的 NIO 服务端程序,因此也特意学习了 Apache 的这个 Mina框架。 首先,Mina 是个什么东西?看下官方网站(http://mina.apache.org/)对 它的解释: Apache 的Mina(Multipurpose Infrastructure Networked Applications) 是一个网络应用框架,可以帮助用户开发高性能和高扩展性的网络应用程序;它 提供了一个抽象的、事件驱动的异步 API,使Java NIO在各种传输协议(如 TCP/IP,UDP/IP 协议等)下快速高效开发。 Apache Mina也称为: � NIO框架 � 客户端/服务端框架(典型的 C/S架构) � 网络套接字(networking socket)类库 � 事件驱动的异步 API(注意:在 JDK7中也新增了异步 API) 总之:我们简单理解它是一个封装底层 IO操作,提供高级操作 API的通讯 框架! (本文所有内容仅针对 Mina2.0 在TCP/IP 协议下的应用开发;源码下载: http://download.csdn.net/source/2959399 ) 一. Mina 入门 先用 Mina 做一个简单的应用程序。 第一步.下载使用的 Jar包 a. 登录 http://mina.apache.org/downloads.html 下载 mina2.0.1.zip,解压 获得 mina-core-2.0.0-M1.jar b. 登录 http://www.slf4j.org/download.html 下载 slf4j1.5.2.zip,解压获 得slf4j-api-1.5.2.jar 与 slf4j-log4j12-1.5.2.jar c. 添加Log4j 的jar包,注意如果使用 slf4j-log4j12-XXX.jar,就需要添加 log4j1.2.X。我这里使用的是 log4j-1.2.14.jar (Logger 和slf配置详情 参见 http://mina.apache.org/first-steps.html ) OK,4个jar都完备了。 第二步.工程创建配置 创建一个 Java Project(默认使用 UTF-8 编码格式),添加 log4j.properties log4j.rootLogger=DEBUG,MINA,file log4j.appender.MINA=org.apache.log4j.ConsoleAppender 4 log4j.appender.MINA.layout=org.apache.log4j.PatternLayout log4j.appender.MINA.layout.ConversionPattern=%d{yyyy-MM-dd HH\:mm\:ss,SSS} %-5p %c{1} %x -%m%n log4j.appender.file=org.apache.log4j.RollingFileAppender log4j.appender.file.File=./log/minademos.log log4j.appender.file.MaxFileSize=5120KB log4j.appender.file.MaxBackupIndex=10 log4j.appender.file.layout=org.apache.log4j.PatternLayout log4j.appender.file.layout.ConversionPattern=[VAMS][%d] %p | %m | [%t] %C.%M(%L)%n 第三步.服务端程序 创建一个简单的服务端程序:(服务端绑定 3005 端口) public class Demo1Server { private static Logger logger = Logger.getLogger(Demo1Server.class); private static int PORT = 3005; public static void main(String[] args) { IoAcceptor acceptor = null;// 创建连接 try { // 创建一个非阻塞的server端的Socket acceptor = new NioSocketAcceptor(); // 设置过滤器(使用Mina提供的文本换行符编解码器) acceptor.getFilterChain().addLast( //添加消息过滤器 "codec", new ProtocolCodecFilter(new TextLineCodecFactory(Charset .forName("UTF-8"), LineDelimiter.WINDOWS.getValue(), LineDelimiter.WINDOWS.getValue()))); // 设置读取数据的缓冲区大小 acceptor.getSessionConfig().setReadBufferSize(2048); // 读写通道10秒内无操作进入空闲状态 acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 10); // 绑定逻辑处理器 acceptor.setHandler(new Demo1ServerHandler()); // 添加业务处理 // 绑定端口 acceptor.bind(new InetSocketAddress(PORT)); logger.info("服务端启动成功... 端口号为:" + PORT); } catch (Exception e) { logger.error("服务端启动异常....", e); e.printStackTrace(); } } 5 } 无需解释,大家看代码的注释就了解一二了; 注意:创建服务端最主要的就是绑定服务端的消息编码解码过滤器和业务逻 辑处理器; 什么是编码与解码哪?大家知道,网络传输的数据都是二进制数据,而我们 的程序不可能直接去操作二进制数据;这时候我们就需要来把接收到的字节数组 转换为字符串,当然完全可以转换为任何一个 java基本数据类型或对象,这就 是解码!而编码恰好相反,就是把要传输的字符串转换为字节;编码是在发送消 息时触发的。 上面使用是 Mina 自带的根据文本换行符编解码的 TextLineCodec 过滤器 ------ 指定参数为根据 windows 的换行符编解码,遇到客户端发送来的消息, 看到 windows 换行符(\r\n)就认为是一个完整消息的结束符了;而发送给客户 端的消息,都会在消息末尾添加上(\r\n)文本换行符; 业务逻辑处理器是 Demo1ServerHandler---看它的具体实现: public class Demo1ServerHandler extends IoHandlerAdapter { public static Logger logger = Logger.getLogger(Demo1ServerHandler.class); @Override public void sessionCreated(IoSession session) throws Exception { logger.info("服务端与客户端创建连接..."); } @Override public void sessionOpened(IoSession session) throws Exception { logger.info("服务端与客户端连接打开..."); } @Override public void messageReceived(IoSession session, Object message) throws Exception { String msg = message.toString(); logger.info("服务端接收到的数据为:" + msg); if ("bye".equals(msg)) {// 服务端断开连接的条件 session.close(); } Date date = new Date(); session.write(date); } @Override public void messageSent(IoSession session, Object message) throws Exception { logger.info("服务端发送信息成功..."); } @Override 6 public void sessionClosed(IoSession session) throws Exception { } @Override public void sessionIdle(IoSession session, IdleStatus status) throws Exception { logger.info("服务端进入空闲状态..."); } @Override public void exceptionCaught(IoSession session, Throwable cause) throws Exception { logger.error("服务端发送异常...", cause); } } 自定义的业务逻辑处理器继承了 IoHandlerAdapter 类,它默认覆盖了父类 的7个方法,其实我们最关心最常用的只有一个方法:messageReceived() ---- 服务端接收到一个消息后进行业务处理的方法; 解析下它的实现: @Override public void messageReceived(IoSession session, Object message) throws Exception { String msg = message.toString(); logger.info("服务端接收到的数据为:" + msg); if ("bye".equals(msg)) {// 服务端断开连接的条件 session.close(); } Date date = new Date(); session.write(date); } 接收并打印客户端信息,返回给客户端一个日期字符串;如果客户端传递的 消息为“bye”,就是客户端告诉服务端,可以终止通话了,关闭与客户端的连接。 第四步.使用 telnet 命令来测试服务端 使用我们最常用的 telnet 命令来测试服务端程序。具体步骤如下: a. 启动服务端程序; 2010-12-31 11:06:35,281 INFO Demo1Server - 服务端启动成功... 端口号为:3005 b. Windows 下开始菜单,运行,输入 cmd,回车,进入 DOS界面; 7 c. 输入:telnet 127.0.0.1 3005 回车 d. 连接成功后,服务端程序的后台会打印如下信息:这个就是业务逻辑逻辑处 理器打印的: e. telnet 中随便输入一个字符串,回车;则可以看到返回的日期; f. 输入 bye,回车,提示服务端断开连接 如果需要重新测试,则需要再次重复 C的步骤; 注意: 不要输入中文字符,windows 不会把中文字符用 utf-8 编码再发送给服务端的; 8 这个就是用 Mina实现的服务端程序啦。功能很简单:服务端一直监听 3005 端口,如果有客户端连接上服务端并发送信息,服务端解析信息(以文本换行符 为每条信息的结束符),并返回给服务端一个日期时间。 Mina的底层通信无疑是用 socket 实现的,它封装后提供给我们一个简单易 用的接口。其实基于 Http 协议的通信也是如此,我们完全可以用 mina写一个类 似Servlet 的基类,来响应各种 http请求。上面的第一个例子可以稍加演示: a. 启动服务端程序; 2010-12-31 11:21:33,296 INFO Demo1Server - 服务端启动成功... 端口号为:3005 b. 打开浏览器,输入: http://127.0.0.1:3005/ 回车; c. 服务端响应如下: 2010-12-31 11:21:33,296 INFO Demo1Server - 服务端启动成功... 端口号为:3005 2010-12-31 11:22:06,828 INFO Demo1ServerHandler - 服务端与客户端创建连接... 2010-12-31 11:22:06,828 INFO Demo1ServerHandler - 服务端与客户端连接打开... 2010-12-31 11:22:06,843 INFO Demo1ServerHandler - 服务端接收到的数据为:GET/ HTTP/1.1 2010-12-31 11:22:06,843 INFO Demo1ServerHandler - 服务端接收到的数据为:Accept: image/gif, image/x-xbitmap, image/jpeg, image/pjpeg, application/x-shockwave-flash, application/vnd.ms-excel, application/msword, application/vnd.ms-powerpoint, */* 2010-12-31 11:22:06,859 INFO Demo1ServerHandler - 服务端接收到的数据为: Accept-Language: zh-cn 2010-12-31 11:22:06,859 INFO Demo1ServerHandler - 服务端接收到的数据为: Accept-Encoding: gzip, deflate 2010-12-31 11:22:06,875 INFO Demo1ServerHandler - 服务端接收到的数据为: User-Agent: Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1) 2010-12-31 11:22:06,875 INFO Demo1ServerHandler - 服务端接收到的数据为:Host: 127.0.0.1:3005 2010-12-31 11:22:06,875 INFO Demo1ServerHandler - 服务端接收到的数据为: Connection: Keep-Alive 2010-12-31 11:22:06,875 INFO Demo1ServerHandler - 服务端接收到的数据为: 2010-12-31 11:22:06,875 INFO Demo1ServerHandler - 服务端发送信息成功... 2010-12-31 11:22:06,890 ERROR Demo1ServerHandler - 服务端发送异常... org.apache.mina.common.WriteToClosedSessionException ………异常信息略……… d. 浏览器显示信息如下: 9 仔细看服务端的解析代码会发现,服务器根据 Windows 换行符(\r\n)把 http 请求的头文件给解析啦,并且正确的返回给服务器一个日期字符串。如果我们能 根据 Http协议的解析方式在服务端的业务逻辑中正确解析请求,并且添加上线 程池多线程处理请求,这就是一个基于 Http请求的解析容器啦! Mina够强大吧! 第五步.客户端程序 Mina 能做服务端程序,自然也可以做客户端程度啦。而且可喜的是,客户 端程序和服务端程序写法基本一致,很简单的。 客户端代码: public class MinaClient01 { private static Logger logger = Logger.getLogger(MinaClient01.class); private static String HOST = "127.0.0.1"; private static int PORT = 3005; public static void main(String[] args) { // 创建一个非阻塞的客户端程序 IoConnector connector = new NioSocketConnector(); // 创建连接 // 设置链接超时时间 connector.setConnectTimeout(30000); // 添加过滤器 connector.getFilterChain().addLast( //添加消息过滤器 "codec", new ProtocolCodecFilter(new TextLineCodecFactory(Charset .forName("UTF-8"), LineDelimiter.WINDOWS.getValue(), LineDelimiter.WINDOWS.getValue()))); // 添加业务逻辑处理器类 connector.setHandler(new Demo1ClientHandler());// 添加业务处理 IoSession session = null; try { ConnectFuture future = connector.connect(new InetSocketAddress( HOST,PORT));// 创建连接 future.awaitUninterruptibly();// 等待连接创建完成 session = future.getSession();// 获得session 10 session.write("我爱你mina");// 发送消息 } catch (Exception e) { logger.error("客户端链接异常...", e); } session.getCloseFuture().awaitUninterruptibly();// 等待连接断开 connector.dispose(); } } 和服务端代码极其相似,不同的是服务端是创建 NioSocketAcceptor 对象, 而客户端是创建 NioSocketConnector 对象;同样需要添加编码解码过滤器和业 务逻辑过滤器; 业务逻辑过滤器代码: public class Demo1ClientHandler extends IoHandlerAdapter { private static Logger logger = Logger.getLogger(Demo1ClientHandler.class); @Override public void messageReceived(IoSession session, Object message) throws Exception { String msg = message.toString(); logger.info("客户端接收到的信息为:" + msg); } @Override public void exceptionCaught(IoSession session, Throwable cause) throws Exception { logger.error("客户端发生异常...", cause); } } 它和服务端的业务逻辑处理类一样,继承了 IoHandlerAdapter 类,因此同 样可以覆盖父类的 7个方法,同样最关心 messageReceived 方法,这里的处理是 接收打印了服务端返回的信息;另一个覆盖的方法是异常信息捕获的方法; 测试服务端与客户端程序! a. 启动服务端,然后再启动客户端(客户端发送的消息是“我爱你 mina”) b. 服务端接收消息并处理成功; 客户端接收响应结果 11 第六步.长连接 VS短连接 此时,查看 windows 的任务管理器,会发现:当前操作系统中启动了 3个 java进程(注意其中一个进程是 myEclipse 的)。 我们知道,java 应用程序的入口是 main()方法,启动一个 main()方法相当 于开始运行一个 java 应用程序,此时会运行一个 Java虚拟机,操作系统中会启 动一个进程,就是刚刚看到的“javaw.exe”。也就是每启动一个 Java 应用程序 就是多出一个 Java进程。因为启动了 Mina服务端和客户端 2个服务端程序,所 有其他 2个进程的出现。 测试一下:再次启动一个客户端程序,查看任务管理器,会发现进程又多出 一个,这个是刚刚启动的客户端进程,它和前一个客户端进程一直存在。这就是 一个典型的长连接。 长连接的现象在网络中非常普遍,比如我们的 QQ客户端程序,登录成功后与 腾讯的服务器建立的就是长连接;除非主动关闭掉 QQ客户端,或者是 QQ服务端 挂了,才会断开连接;看我们的服务端程序,就有关闭连接的条件:如果客户端 发送信息“bye”,服务端就会主动断开连接! @Override public void messageReceived(IoSession session, Object message) throws Exception { String msg = message.toString(); logger.info("服务端接收到的数据为:" + msg); if ("bye".equals(msg)) {// 服务端断开连接的条件 session.close(); } Date date = new Date(); session.write(date); } 与长连接相对应的是短连接,比如常说的请求/响应模式(HTTP 协议就是典 型的请求/响应模式)-----客户端向服务端发送一个请求,建立连接后,服务端 处理并响应成功,此时就主动断开连接了! 短连接是一个简单而有效的处理方式,也是应用最广的。Mina是Java NIO 实现的应用框架,更倾向于短连接的服务;问题是哪一方先断开连接呢?可以在 服务端,也可以在客户端,但是提倡在服务端主动断开; Mina 的服务端业务逻辑处理类中有一个方法 messageSent,他是在服务端发 送信息成功后调用的: 12 @Override public void messageSent(IoSession session, Object message) throws Exception { logger.info("服务端发送信息成功..."); } 修改后为 @Override public void messageSent(IoSession session, Object message) throws Exception { session.close(); //发送成功后主动断开与客户端的连接 logger.info("服务端发送信息成功..."); } 这时候客户端与服务端就是典型的短连接了;再次测试,会发现客户端发送 请求,接收成功后就自动关闭了,进程只剩下服务端了! 到此为止,我们已经可以运行一个完整的基于 TCP/IP 协议的应用程序啦! 总结: 服务端程序或客户端程序创建过程: 创建连接---����添加消息过滤器(编码解码等)——>添加业务处理 第七步.入门程序 02 完成了第一个简单的应用程序,是不很有成就感呀!现在就用一个具体应用 案例来热身。 案例: 模拟移动公司收发短信的功能! 短信发送和接收是最广泛的手机操作功能。收发短信有两个对象参与:手机 和移动短信服务器(简称服务器);发送短信时候,手机是客户端,服务器是服 务端,手机主动向服务器发送请求;接收短信时候,手机是服务端,服务器是客 户端,服务器主动向手机发送请求。手机和服务器互为服务端和客户端。 我们先模拟一个手机向服务器发送短信的例子! 实现: 1. 短信有最少有 3部分组成:发送人手机号码,接收人手机号码,短信内容; (其他如发送时间/接收时间等等,作为演示就暂时不考虑啦) 发送的内容以字符串的形式发送,格式如下: 发送人号码;接收人号码;短信内容(文本换行符) 我们可以仍然使用 Mina自带的根据文本换行符编解码的方式接收和发送信 息。 2. 模拟手机客户端发短信 仔细看看客户端代码,基本和第一个例子一模一样呀,就是需要制定黄色标 识部分:创建客户端,指定过滤器,指定业务逻辑处理器,写发送的信息。 import java.net.InetSocketAddress; import java.nio.charset.Charset; 13 import org.apache.log4j.Logger; import org.apache.mina.common.ConnectFuture; import org.apache.mina.common.IoConnector; import org.apache.mina.common.IoSession; import org.apache.mina.filter.codec.ProtocolCodecFilter; import org.apache.mina.filter.codec.textline.LineDelimiter; import org.apache.mina.filter.codec.textline.TextLineCodecFactory; import org.apache.mina.transport.socket.nio.NioSocketConnector; import com.dvn.li.handler.Demo1ClientHandler; /* * 模拟手机发短信 */ public class TestClient03 { private static Logger logger = Logger.getLogger(TestClient03.class); private static String HOST = "127.0.0.1"; private static int PORT = 3005; public static void main(String[] args) { // 创建一个非阻塞的客户端程序 IoConnector connector = new NioSocketConnector(); // 设置链接超时时间 connector.setConnectTimeout(30000); // 设置过滤器 connector.getFilterChain().addLast( "codec", new ProtocolCodecFilter(new TextLineCodecFactory(Charset .forName("UTF-8"), LineDelimiter.WINDOWS.getValue(), LineDelimiter.WINDOWS.getValue()))); // 添加业务逻辑处理器类 connector.setHandler(new Demo1ClientHandler()); IoSession session = null; try { ConnectFuture future = connector.connect(new InetSocketAddress( HOST,PORT));// 创建连接 future.awaitUninterruptibly();// 等待连接创建完成 session = future.getSession();// 获得session String sendPhone = "13681803609";// 当前发送人的手机号码 String receivePhone = "13721427169";// 接收人手机号码 String message = "测试发送短信,这个是短信信息哦,当然长度是有限制的 哦...."; 14 String msg = sendPhone + ";" + receivePhone + ";" + message; session.write(msg);// 发送给移动服务端 } catch (Exception e) { logger.error("客户端链接异常...", e); } session.getCloseFuture().awaitUninterruptibly();// 等待连接断开 connector.dispose(); } } 业务逻辑处理类,这个是重点!业务很简单,如果服务器接收信息成功,就 返回接收信息成功的信息。在今后用 mina 写应用程序时,它是中的之一。 import org.apache.log4j.Logger; import org.apache.mina.common.IoHandlerAdapter; import org.apache.mina.common.IoSession; public class Demo1ClientHandler extends IoHandlerAdapter { private static Logger logger = Logger.getLogger(Demo1ClientHandler.class); @Override public void messageReceived(IoSession session, Object message) throws Exception { String msg = message.toString(); logger.info("客户端接收到的信息为:" + msg); } @Override public void exceptionCaught(IoSession session, Throwable cause) throws Exception { logger.error("客户端发生异常...", cause); } } 3. 模拟移动短信服务器收短信 服务端没啥,和第一个例子一样,只需要指定三个地方:创建服务端,指定 过滤器,指定业务逻辑处理器。 import java.net.InetSocketAddress; import java.nio.charset.Charset; import org.apache.log4j.Logger; import org.apache.mina.common.IdleStatus; import org.apache.mina.common.IoAcceptor; import org.apache.mina.common.IoSessionConfig; import org.apache.mina.filter.codec.ProtocolCodecFilter; import org.apache.mina.filter.codec.textline.LineDelimiter; 15 import org.apache.mina.filter.codec.textline.TextLineCodecFactory; import org.apache.mina.transport.socket.nio.NioSocketAcceptor; import com.dvn.li.handler.Demo3ServerHandler; public class TestServer03 { private static Logger logger = Logger.getLogger(TestServer03.class); private static int PORT = 3005; public static void main(String[] args) { IoAcceptor acceptor = null; try { // 创建一个非阻塞的server端的Socket acceptor = new NioSocketAcceptor(); // 设置过滤器 acceptor.getFilterChain().addLast( "codec", new ProtocolCodecFilter(new TextLineCodecFactory(Charset .forName("UTF-8"), LineDelimiter.WINDOWS.getValue(), LineDelimiter.WINDOWS.getValue()))); // 获得IoSessionConfig对象 IoSessionConfig cfg = acceptor.getSessionConfig(); // 读写通道10秒内无操作进入空闲状态 cfg.setIdleTime(IdleStatus.BOTH_IDLE, 100); // 绑定逻辑处理器 acceptor.setHandler(new Demo3ServerHandler()); // 绑定端口 acceptor.bind(new InetSocketAddress(PORT)); logger.info("服务端启动成功... 端口号为:" + PORT); } catch (Exception e) { logger.error("服务端启动异常....", e); e.printStackTrace(); } } } 业务逻辑处理类,今后写代码的重点之一。逻辑很简单:解析客户端发来的 短信信息,任何做处理,并告诉手机发送成功啦! import org.apache.log4j.Logger; import org.apache.mina.common.IoHandlerAdapter; import org.apache.mina.common.IoSession; public class Demo3ServerHandler extends IoHandlerAdapter { 16 public static Logger logger = Logger.getLogger(Demo3ServerHandler.class); @Override public void messageReceived(IoSession session, Object message) throws Exception { String phoneMes = message.toString(); String[] megs=phoneMes.split(";"); String sendPhone = megs[0]; String receivePhone = megs[1]; String mes = megs[2]; logger.info("发送人手机号码:" + sendPhone); logger.info("接受人手机号码:" + receivePhone); logger.info("发送信息:" + mes); // 短信信息存入移动服务端数据库或者写入手机短信转发队列 //............ session.write("发送成功!");// 告诉手机发送信息成功啦 } @Override public void messageSent(IoSession session, Object message) throws Exception { session.close(); } @Override public void exceptionCaught(IoSession session, Throwable cause) throws Exception { logger.error("服务端发送异常...", cause); } } 4. 运行测试 先启动服务端,然后启动手机客户端,测试成功! 服务器打印: 2010-12-30 16:42:27,156 INFO TestServer03 - 服务端启动成功... 端口号为: 3005 2010-12-30 16:43:01,140 INFO Demo3ServerHandler - 发送人手机号码:13681803609 2010-12-30 16:43:01,140 INFO Demo3ServerHandler - 接受人手机号码:13721427169 2010-12-30 16:43:01,140 INFO Demo3ServerHandler - 发送信息:测试发送短信,这个 是短信信息哦,当然长度是有限制的哦.... 手机客户端打印: 2010-12-30 16:43:01,171 INFO Demo1ClientHandler - 客户端接收到的信息为:发送成 功! 17 注意事项: 客户端发送信息: String sendPhone="13681803609";// 当前发送人的手机号码 String receivePhone="13721427169";// 接收人手机号码 String message="测试发送短信,这个是短信信息哦,当然长度是有限制的 哦...."; String msg=sendPhone+";"+receivePhone+";"+message; session.write(msg);// 发送给移动服务端 此时,调用 session.write(msg)时,我们没有在短信的末尾加上文化换行 符,也不需要添加文本换行符,这些工作是由我们指定的编解码器完成的,他会 自动把发送的信息先转换为底层传输的二进制代码,然后在默认添加上文本换行 符的。 服务端接收信息: public void messageReceived(IoSession session, Object message) throws Exception { String phoneMes = message.toString(); String[] megs=phoneMes.split(";"); String sendPhone = megs[0]; String receivePhone = megs[1]; String mes = megs[2]; logger.info("发送人手机号码:" + sendPhone); logger.info("接受人手机号码:" + receivePhone); logger.info("发送信息:" + mes); 此时,messageReceived()方法就是信息接收到时候调用的,它的参数 Object message 就是手机发送来的信息,但是不是底层的二进制代码了,而且 末尾没有文本换行符了,已经是 java字符串了,因为它被指定的解码器解码啦。 最后: 发送消息,session.write(msg)后,先编码再发送! 接收消息,messageReceived()前,会先解码的。 具体的操作细节,参考第二章 Mina基础部分。 第八步.入门程序 03 仍然研究上面的例子。手机发短信和服务端接收短信时,我们发送的是一个 字符串,它是用“;”号隔开的三部分信息,需要在手机客户端拼装字符串,在 服务端解析字符串;是否可以直接发送一个 java对象呢?当然可以,而且 Mina 已经提供了这个编解码过滤器:ObjectSerializationCodecFactory()接口搞定 一切!不需要你关系对象的序列号问题,它已经实现好啦! 1. 短信对象 把发送短信指定成一个 java 对象,这样就很有面向对象的操作啦。 import java.io.Serializable; /* * 手机短信发送DTO 18 */ public class PhoneMessageDto implements Serializable { private String sendPhone;// 发送人手机号 private String receivePhone;// 接收人手机号 private String message;// 短信信息 public String getSendPhone() { return sendPhone; } public void setSendPhone(String sendPhone) { this.sendPhone = sendPhone; } public String getReceivePhone() { return receivePhone; } public void setReceivePhone(String receivePhone) { this.receivePhone = receivePhone; } public String getMessage() { return message; } public void setMessage(String message) { this.message = message; } } 2. 模拟手机客户端发短信 实在不想写客户端了,基本没变化: /* * 模拟手机发短信 */ public class TestClient { private static Logger logger = Logger.getLogger(TestClient.class); private static String HOST = "127.0.0.1"; private static int PORT = 3005; public static void main(String[] args) { // 创建一个非阻塞的客户端程序 IoConnector connector = new NioSocketConnector(); 19 // 设置链接超时时间 connector.setConnectTimeout(30000); // 设置过滤器 connector.getFilterChain().addLast("codec", new ProtocolCodecFilter(new ObjectSerializationCodecFactory())); // 添加业务逻辑处理器类 connector.setHandler(new Demo1ClientHandler()); IoSession session = null; try { ConnectFuture future = connector.connect(new InetSocketAddress( HOST,PORT));// 创建连接 future.awaitUninterruptibly();// 等待连接创建完成 session = future.getSession();// 获得session PhoneMessageDto sendMes = new PhoneMessageDto(); sendMes.setSendPhone("13681803609");// 当前发送人的手机号码 sendMes.setReceivePhone("13721427169");// 接收人手机号码 sendMes.setMessage("测试发送短信,这个是短信信息哦,当然长度是有限制 的哦...."); session.write(sendMes);// 发送给移动服务端 } catch (Exception e) { logger.error("客户端链接异常...", e); } session.getCloseFuture().awaitUninterruptibly();// 等待连接断开 connector.dispose(); } } 客户端处理类没变化,略…. 3. 模拟移动短信服务器收短信 服务端也基本没啥变化: public class TestServer { private static Logger logger = Logger.getLogger(TestServer.class); private static int PORT = 3005; public static void main(String[] args) { IoAcceptor acceptor = null; try { // 创建一个非阻塞的server端的Socket acceptor = new NioSocketAcceptor(); // 直接发送对象 acceptor.getFilterChain().addLast( "codec", new ProtocolCodecFilter( 20 new ObjectSerializationCodecFactory())); // 获得IoSessionConfig对象 IoSessionConfig cfg = acceptor.getSessionConfig(); // 读写通道10秒内无操作进入空闲状态 cfg.setIdleTime(IdleStatus.BOTH_IDLE, 100); // 绑定逻辑处理器 acceptor.setHandler(new DemoServerHandler()); // 绑定端口 acceptor.bind(new InetSocketAddress(PORT)); logger.info("服务端启动成功... 端口号为:" + PORT); } catch (Exception e) { logger.error("服务端启动异常....", e); e.printStackTrace(); } } } 业务处理类,这个是重点,我们不需要解析字符串了,接收到的数据直接强 制转换为对象就可以! import org.apache.log4j.Logger; import org.apache.mina.common.IoHandlerAdapter; import org.apache.mina.common.IoSession; import com.dvn.li.message.PhoneMessageDto; public class DemoServerHandler extends IoHandlerAdapter { public static Logger logger = Logger.getLogger(DemoServerHandler.class); @Override public void messageReceived(IoSession session, Object message) throws Exception { PhoneMessageDto phoneMes = (PhoneMessageDto) message; String sendPhone = phoneMes.getSendPhone(); String receivePhone = phoneMes.getReceivePhone(); String mes = phoneMes.getMessage(); logger.info("发送人手机号码:" + sendPhone); logger.info("接受人手机号码:" + receivePhone); logger.info("发送信息:" + mes); // 短信信息存入移动服务端数据库 //............ session.write("发送成功!"); } 21 @Override public void messageSent(IoSession session, Object message) throws Exception { session.close(); } @Override public void exceptionCaught(IoSession session, Throwable cause) throws Exception { logger.error("服务端发送异常...", cause); } } 4. 运行测试 啥都不说了,直接运行测试吧,成功的! 其他: 服务端主动给手机发短信的 demo 就不写了,因为到此为止它太 easy啦! 通过上面的练习,基本可以使用 Mina开发小的应用程序啦。但是,如果服 务端和客户端使用不同的语言实现(比如客户端用 C写),则使用 String 或者 Java对象传输数据是不可行的,必须自定义协议,以报文的形式发送二进制数 据。下面就研究 Mina的基础知识;当然,我也是初学者,更多的东西,需要多 查看 mina的官方教程和 API-----它的资料非常丰富! 下面第二章请重点看 IoFilter 接口,它是 Mina的精髓,最好能研究下它的 TextLineCodecFactory()对象的实现源码。 二. Mina 基础 Mina 使用起来多么简洁方便呀,就是不具备 Java NIO的基础,只要了解了 Mina常用的 API,就可以灵活使用并完成应用开发。 首先,看 Mina在项目中所处的位置,如下图: Mina 处于中间层,它不关心底层网络数据如何传输,只负责接收底层数据, 过滤并转换为 Java 对象提供给我们的应用程序,然后把应用程序响应值过滤并 转换为底层识别的字节,提供给底层传输; ------总之:Mina 是底层数据传输和用户应用程序交互的接口! 22 Mina工作流程图如下: 这个流程图不仅很直观的看出了 Mina 的工作流程,也涵盖了 Mina的三个核 心接口:IoService 接口,IoFilter 接口和 IoHandler 接口: � 第一步. 创建服务对象(客户端或服务端) ---IoService 接口实现 � 第二步. 数据过滤(编码解码等) ---IOFilter 接口实现 � 第三步. 业务处理 ---IoHandler 接口实现 Mina的精髓是 IOFilter,它可以进行日志记录,信息过滤,编码解码等操作, 把数据接收发送从业务层独立出来。 创建服务对象,则是把 NIO繁琐的部分进行封装,提供简洁的接口。 业务处理是我们最关心的部分,跟普通的应用程序没任何分别。 2.1.IoService 接口 作用:IoService是创建服务的顶层接口,无论客户端还是服务端,都是从它继 承实现的。 2.1.1 类结构 常用接口为:IoService,IoAcceptor,IoConnector 常用类为:NioSocketAcceptor,NioSocketConnector 类图如下: 23 IoService IoAcceptor IoConnectorAbstractIoService AbstractIoAcceptor AbstractIoConnector 先提出两个问题: 1. 为什么有了 IoService接口还要定义 AbstractIoService 抽象类? 2. AbstractIoService 抽象类与 IoAcceptor(IoConnector)有什么区别? 分析: � IoService接口声明了服务端的共有属性和行为; � IoAcceptor 接口继承了 IoService 接口,并添加了服务端特有的接口属 性及方法,比如 bind()方法,成为典型的服务端接口; � IoConnector 接口同样继承了 IoService 接口,并添加了客户端特有的 接口属性及方法,比如 connect()方法,成为典型的客户端接口; ---- IoService 是IoAcceptor 和IoConnector 父接口,为什么不直接定 义IoAcceptor 和IoConnector 接口呢,因为它们有共同的特点,比如共同属性, 管理服务的方法等,所有 IoService 的出现是为了代码复用。 � AbstractIoService 实现了IoService 中管理服务的方法,比如 getFilterChainBuilder 方法---获得过滤器链; ----为什么有了 IoService 接口还要定义 AbstractIoService 抽象类?一样 为了代码的复用!AbstractIoService 抽象类实现了服务端或客户端的共有 的管理服务的方法,不需要让 IoService接口的子类重复的实现这些方法; � AbstractIoService 抽象类继承了 AbstractIoService 抽象类并实现了 IoAcceptor 接口,成为了拥有管理服务端实现功能的服务端类;我们常用的 NioSocketAcceptor 就是它的子类; � AbstractIoConnector 抽象类继承了 AbstractIoService 抽象类并实现了 IoConnector 接口,成为了拥有管理客户端实现功能的客户端类;我们常用 的NioSocketConnector 就是它的子类; ----AbstractIoService 抽象类与 IoAcceptor(IoConnector)有什么区别? 很清楚,AbstractIoService 抽象类实现的是共有的管理服务的方法,只有管理 功能的一个类;而两个接口却是不同的两个服务角色----一个客户端,一个服务 端。 2.1.2 应用 24 在实际应用中,创建服务端和客户端的代码很简单: 创建服务端: IoAcceptor acceptor = null; try { // 创建一个非阻塞的server端的Socket acceptor = new NioSocketAcceptor(); 创建客户端: // 创建一个非阻塞的客户端程序 IoConnector connector = new NioSocketConnector(); 而我们常常关心的就是服务端和客户端的一些参数信息: 1.IoSessionConfig getSessionConfig() 获得IoSession 的配置对象 IoSessionConfig,通过它可以设置 Socket 连 接的一些选项。 a. void setReadBufferSize(int size) 这个方法设置读取缓冲的字节数,但一般不需要调用这个方法,因为 IoProcessor 会自动调整缓冲的大小。你可以调用 setMinReadBufferSize()、 setMaxReadBufferSize()方法,这样无论 IoProcessor 无论如何自动调整,都 会在你指定的区间。 b. void setIdleTime(IdleStatus status,int idleTime): 这个方法设置关联在通道上的读、写或者是读写事件在指定时间内未发生, 该通道就进入空闲状态。一旦调用这个方法,则每隔 idleTime 都会回调过滤器、 IoHandler 中的 sessionIdle()方法。 c. void setWriteTimeout(int time): 这个方法设置写操作的超时时间。 d. void setUseReadOperation(boolean useReadOperation): 这个方法设置 IoSession 的read()方法是否可用,默认是 false。 // 获得IoSessionConfig对象 IoSessionConfig cfg=acceptor.getSessionConfig(); // 设置读取数据的缓冲区大小() cfg.setReadBufferSize(2048); // 读写通道10秒内无操作进入空闲状态 cfg.setIdleTime(IdleStatus.BOTH_IDLE, 10); // 写操作超时时间10秒 cfg.setWriteTimeout(10); 2.DefaultIoFilterChainBuilder getFilterChain() 获得过滤器链,由此来配置过滤器;非常核心的一个配置!(过滤器是 Mina 的核心,我们重点要学习的就是它,就它参见 2.2 IoFilter) // 创建一个非阻塞的server端的Socket acceptor = new NioSocketAcceptor(); // 设置日志过滤器 acceptor.getFilterChain().addLast( "logger", new LoggingFilter()); 25 // 设置过滤器(使用Mina提供的文本换行符编解码器) acceptor.getFilterChain().addLast( "codec", new ProtocolCodecFilter(new TextLineCodecFactory(Charset .forName("UTF-8"), LineDelimiter.WINDOWS.getValue(), LineDelimiter.WINDOWS.getValue()))); 3.setHandler(IoHandler handler); 向IoService 注册 IoHandler 进行业务处理。这是服务(无聊客户端还是服 务端)必不可少的配置; // 添加业务逻辑处理器类 connector.setHandler(new Demo1ClientHandler()); 4.其他配置 服务端必须指定绑定的端口号: // 绑定端口 acceptor.bind(new InetSocketAddress(PORT)); logger.info("服务端启动成功... 端口号为:" + PORT); 客户端必须指定请求的服务器地址和端口号:(该方法是异步执行的) ConnectFuture future = connector.connect(new InetSocketAddress( HOST,PORT));// 创建连接 future.awaitUninterruptibly();// 等待连接创建完成 session = future.getSession();// 获得session session.write("我爱你mina");// 发送消息 5.关闭客户端 因为客户端的连接是异步的,所有必须先连接上服务端获得了 session 才能 通信;同时,一旦需要关闭,必须指定 disponse()方法关闭客户端,如下: // 添加业务逻辑处理器类 connector.setHandler(new Demo1ClientHandler()); IoSession session = null; try { ConnectFuture future = connector.connect(new InetSocketAddress( HOST,PORT));// 创建连接 future.awaitUninterruptibly();// 等待连接创建完成 session = future.getSession();// 获得session session.write("我爱你mina");// 发送消息 } catch (Exception e) { logger.error("客户端链接异常...", e); } session.getCloseFuture().awaitUninterruptibly();// 等待连接断开 connector.dispose(); 26 这是 Mina2 的处理方式,但在 Mina1.1.7 中,必须使用 setWorkerTimeout() 方法关闭客户端: // 在关闭客户端前进入空闲状态的时间为1秒 //Set how many seconds the connection worker thread should remain alive once idle before terminating itself. connector.setWorkerTimeout(1); 总结 IoService是创建服务端和客户端的接口,实际应用中我们大多都是应用它 的实现类来创建服务对象; 但是,在开发中你最关心的是你究竟处于哪个位置----说直白一点就是:你 究竟是开发客户端还是服务端?比如经常跟银行打交道的朋友都知道,开发中往 往银行是服务端,我们要和它打交道就是要知道银行服务端的 IP,端口号和请 求格式,写一个客户端来请求数据; 但有一点可以肯定:服务端的难度比客户端大!服务端首先要制定协议,其 次是考虑并发量;这些都不是很轻松就可以搞定滴。 IoService就这些啦,下面的 IoFilter接口是Mina的精髓,要关键学习。 2.2.IoFilter 接口 Mina 最主要的工作就是把底层传输的字节码转换为 Java 对象,提供给应用 程序;或者把应用程序返回的结果转换为字节码,交给底层传输。这些都是由 IoFilter 完成的,因此 IoFilter 是Mina的精髓所在。 在Mina程序中,IoFilter 是必不可少的;有了它,Mina的层次结构才异常 清晰: IoFilter ---- 消息过滤 IoHandler ---- 业务处理 Filter,过滤器的意思。IoFilter,I/O操作的过滤器。IoFilter和Servlet 中的过滤器一样,主要用于拦截和过滤网络传输中I/O操作的各种消息。在Mina 的官方文档中已经提到了IoFilter 的作用: (1)记录事件的日志(Mina默认提供了LoggingFilter) (2)测量系统性能 (3)信息验证 (4)过载控制 (5)信息的转换(主要就是编码和解码) (6)和其他更多的信息 IoService 实例会绑定一个 DefaultIoFilterChainBuilder ---- 过滤器链, 我们把自定义的各种过滤器(IoFilter)自由的插放在这个过滤器链上了,类似 于一种可插拔的功能! 2.2.1 类结构 常用接口为:IoFilter,IoFilterChainBuilder 常用类为:IoFilterAdapter,DefaultIoFilterChainBuilder ProtocolCodecFilter,LoggingFilter 类图如下: 27 IoFilter IoFilterAdapter ReferenceCountingFilter ExecutorFilter LoggingFilter ProtocolCodecFilter 同上面,先提出两个问题: 1. 在IoService 中如何添加多个 IoFilter? 2. 如何自定义协议编解码器? 分析: a. IoFilter 有2个实现类:IoFilterAdapter 是个抽象的适配器类,我们可以 根据需要扩展这个类,并且有选择的覆盖过滤器的方法;所有方法的默认把事件 转发到下一个过滤器;查看源码如下: public void sessionOpened(NextFilter nextFilter, IoSession session)throws Exception { nextFilter.sessionOpened(session); } b .ReferenceCountingFilter 封装了 IoFilter 实例,监看调用该 filter 的对 象的个数,如果没有任何对象调用该 IoFilter,就自动销毁 IoFilter;查看源 码如下: public class ReferenceCountingFilter implements IoFilter { private final IoFilter filter; private int count = 0; public ReferenceCountingFilter(IoFilter filter) { this.filter = filter; } public void init() throws Exception { // no-op, will init on-demand in pre-add if count == 0 } public void destroy() throws Exception { }……………略 c. 实现 IoFilterAdapter 的类有多个,但是我们使用最多的就是 28 ProtocolCodecFilter----它是我们自定义编解码器的入口。 2.2.2 应用 我们在应用中解释上面提述的两个问题! 添加过滤器 ----在IoService 中如何添加多个 IoFilter?如何代码,我添加了 2个过 滤器:LoggingFilter 和TextLineCodecFactory(源码为入门的服务端程序) // 创建一个非阻塞的server端的Socket acceptor = new NioSocketAcceptor(); // 设置日志过滤器 acceptor.getFilterChain().addLast("logger",new LoggingFilter()); // 设置过滤器(使用Mina提供的文本换行符编解码器) acceptor.getFilterChain().addLast( "codec", new ProtocolCodecFilter(new TextLineCodecFactory(Charset .forName("UTF-8"), LineDelimiter.WINDOWS.getValue(), LineDelimiter.WINDOWS.getValue()))); // 获得IoSessionConfig对象 IoSessionConfig cfg = acceptor.getSessionConfig(); // 读写通道10秒内无操作进入空闲状态 cfg.setIdleTime(IdleStatus.BOTH_IDLE, 10); // 绑定逻辑处理器 acceptor.setHandler(new Demo1ServerHandler()); // 绑定端口 acceptor.bind(new InetSocketAddress(PORT)); logger.info("服务端启动成功... 端口号为:" + PORT); 运行主程序: 执行 telnet 127.0.0.1 3005,输入 a,回车,后台打印信息如下: 2010-12-16 16:39:27,937 INFO TestServer01 - 服务端启动成功... 端口号为: 3005 2010-12-16 16:39:31,328 INFO LoggingFilter -CREATED 2010-12-16 16:39:31,328 INFO Demo1ServerHandler - 服务端与客户端创建连接... 2010-12-16 16:39:31,328 INFO LoggingFilter -OPENED 2010-12-16 16:39:31,328 INFO Demo1ServerHandler - 服务端与客户端连接打开... 2010-12-16 16:39:32,296 INFO LoggingFilter -RECEIVED: HeapBuffer[pos=0 lim=1 cap=2048: 61] 2010-12-16 16:39:32,718 INFO LoggingFilter -RECEIVED: HeapBuffer[pos=0 lim=2 cap=2048: 0D 0A] 2010-12-16 16:39:32,734 INFO Demo1ServerHandler - 服务端接收到的数据为:a 2010-12-16 16:39:32,750 INFO LoggingFilter -SENT: HeapBuffer[pos=0 lim=30 cap=31: 54 68 75 20 44 65 63 20 31 36 20 31 36 3A 33 39...] 2010-12-16 16:39:32,750 INFO LoggingFilter -SENT: HeapBuffer[pos=0 lim=0 cap=0: 29 empty] 2010-12-16 16:39:32,750 INFO Demo1ServerHandler - 服务端发送信息成功... 2010-12-16 16:39:32,750 INFO LoggingFilter -CLOSED 注意:LoggerFilter 的日志(红色部分) 修改代码,交换 LoggingFilter 和TextLineCodecFactory 的位置,如下所示: // 设置过滤器(使用Mina提供的文本换行符编解码器) acceptor.getFilterChain().addLast( "codec", new ProtocolCodecFilter(new TextLineCodecFactory(Charset .forName("UTF-8"), LineDelimiter.WINDOWS.getValue(), LineDelimiter.WINDOWS.getValue()))); // 设置日志过滤器 acceptor.getFilterChain().addLast("logger",new LoggingFilter()); 启动服务端,执行 telnet 127.0.0.1 3005,后台打印信息如下: 2010-12-16 16:41:36,125 INFO TestServer01 - 服务端启动成功... 端口号为: 3005 2010-12-16 16:41:38,296 INFO LoggingFilter -CREATED 2010-12-16 16:41:38,296 INFO Demo1ServerHandler - 服务端与客户端创建连接... 2010-12-16 16:41:38,296 INFO LoggingFilter -OPENED 2010-12-16 16:41:38,296 INFO Demo1ServerHandler - 服务端与客户端连接打开... 2010-12-16 16:41:39,296 INFO LoggingFilter -RECEIVED: a 2010-12-16 16:41:39,296 INFO Demo1ServerHandler - 服务端接收到的数据为:a 2010-12-16 16:41:39,328 INFO LoggingFilter -SENT: Thu Dec 16 16:41:39 CST 2010 2010-12-16 16:41:39,328 INFO Demo1ServerHandler - 服务端发送信息成功... 2010-12-16 16:41:39,328 INFO LoggingFilter -CLOSED 对比上下日志,会发现,如果 LoggingFilter 在编码器前,它会在编码器处 理前打印请求值和返回值的二进制信息,在编码器之后就不会打印! 在FilterChain 中都是 addLast()的方式添加在过滤链的最后面,这时候, 把那个过滤器放在前面,就会先执行那个过滤器! 同addLast()方法一样,还提供了 addFirst(),addBefore()等方法供 使用。此时,就不难知道如何添加过滤器了吧!它们的顺序如何,就看你的设置 的位置了! 同时发现,日志过滤器是根据IoSession的状态(创建、开启、发送、接收、 异常等等)来记录会话的事件信息的!这对我们跟踪IoSession很有用。当地,也 可以自定义logger的日志级别,定义记录那些状态的日志。比如: // 设置日志过滤器 LoggingFilter lf=new LoggingFilter(); lf.setMessageReceivedLogLevel(LogLevel.DEBUG); acceptor.getFilterChain().addLast("logger",lf); 自定义编解码器 ----如何自定义协议编解码器? 协议编解码器是在使用 Mina 的时候最需要关注的对象,因为网络传输的数 30 据都是二进制数据(byte),而在程序中面向的是 JAVA 对象,这就需要在发送 数据时将 JAVA 对象编码二进制数据,接收数据时将二进制数据解码为 JAVA 对 象。 编解码器同样是以过滤器的形式安插在过滤器链上,如下所示: // 设置过滤器(使用Mina提供的文本换行符编解码器) acceptor.getFilterChain().addLast( "codec", new ProtocolCodecFilter(new TextLineCodecFactory(Charset .forName("UTF-8"), LineDelimiter.WINDOWS.getValue(), LineDelimiter.WINDOWS.getValue()))); 协议编解码器是通过 ProtocolCodecFilter 过滤器构造的,看它的构造方 法,它需要一个 ProtocolCodecFactory 对象: public ProtocolCodecFilter(ProtocolCodecFactory factory) { if (factory == null){ throw new NullPointerException("factory"); } this.factory = factory; } ProtocolCodecFactory 接口非常直接,通过 ProtocolEncoder 和 ProtocolDecoder 对象来构建! public interface ProtocolCodecFactory { /** * Returns a new (or reusable) instance of {@link ProtocolEncoder} which * encodes message objects into binary or protocol-specific data. */ ProtocolEncoder getEncoder(IoSession session) throws Exception; /** * Returns a new (or reusable) instance of {@link ProtocolDecoder} which * decodes binary or protocol-specific data into message objects. */ ProtocolDecoder getDecoder(IoSession session) throws Exception; } ProtocolEncoder 和ProtocolDecoder 接口是 Mina 负责编码和解码的顶级 接口! 编码和解码的前提就是协议的制定:比如上面我们使用的 Mina自带的根据 文本换行符解码的 TextLineCodecFactory(),如果遇到文本换行符就开始编解 码! 为什么要制定协议呢?常用的协议制定方法有哪些? 我们知道,底层传输的都是二进制数据,服务端和客户端建立连接后进行数 据的交互,接受这对方发送来的消息,如何判定发送的请求或者响应的数据结束 了呢?总不能一直傻等着,或者随意的就结束消息接收吧。这就需要一个规则! 比如 QQ聊天工具,当输入完一个消息后,点击发送按钮向对方发送时,此时系 31 统就会在在你的消息后添加一个文本换行符,接收方看到这个文本换行符就认为 这是一个完整的消息,解析成字符串显示出来。而这个规则,就称之为协议! 制定协议的方法: � 定长 消息 法 定长 消息 法 定长 消息 法 定长 消息 法 :这种方式是使用长度固定的数据发送,一般适用于指令发 送。譬如:数据发送端规定发送的数据都是双字节,AA 表示启动、BB 表 示关闭等等。 � 字符 定界 法 字符 定界 法 字符 定界 法 字符 定界 法 :这种方式是使用特殊字符作为数据的结束符,一般适用于 简单数据的发送。譬如:在消息的结尾自动加上文本换行符(Windows 使用\r\n,Linux 使用\n),接收方见到文本换行符就认为是一个完整的 消息,结束接收数据开始解析。注意:这个标识结束的特殊字符一定要 简单,常常使用 ASCII 码中的特殊字符来标识。 � 定长 报文 头法 定长 报文 头法 定长 报文 头法 定长 报文 头法 :使用定长报文头,在报文头的某个域指明报文长度。该 方法最灵活,使用最广。譬如:协议为 – 协议编号(1字节)+数据长 度(4个字节)+真实数据。请求到达后,解析协议编号和数据长度,根 据数据长度来判断后面的真实数据是否接收完整。HTTP 协议的消息报 头中的 Content-Length 也是表示消息正文的长度,这样数据的接收端 就知道到底读到多长的字节数就不用再读取数据了。 根据协议,把二进制数据转换成 Java 对象称为解码(也叫做拆包);把Java 对象转换为二进制数据称为编码(也叫做打包); 我们这里重点讲解下后面两个协议的具体使用! IoBuffer常用方法: Mina中传输的所有二进制信息都存放在 IoBuffer 中,IoBuffer 是对Java NIO中ByteBuffer 的封装(Mina2.0 以前版本这个接口也是 ByteBuffer),提供 了更多操作二进制数据,对象的方法,并且存储空间可以自增长,用起来非常方 便;简单理解,它就是个可变长度的 byte 数组! 1. static IoBuffer allocate(int capacity,boolean useDirectBuffer) 创建 IoBuffer 实例,第一个参数指定初始化容量,第二个参数指定使用直 接缓冲区还是 JAVA 内存堆的缓存区,默认为 false。 2.IoBuffer setAutoExpand(boolean autoExpand) 这个方法设置 IoBuffer 为自动扩展容量,也就是前面所说的长度可变,那 么可以看出长度可变这个特性默认是不开启的。 3. IoBuffer flip() limit=position , position=0,重置 mask,为了读取做好准备,一般是结 束buf操作,将 buf写入输出流时调用;这个必须要调用,否则极有可能 position!=limit,导致 position 后面没有数据;每次写入数据到输出流时,必 须确保 position=limit。 4. IoBuffer clear()与IoBuffer reset() clear:limit=capacity , position=0,重置mark;它是不清空数据,但从 头开始存放数据做准备---相当于覆盖老数据。 reset就是清空数据 5. int remaining()与boolean hasRemaining() 32 这两个方法一般是在调用了flip()后使用的,remaining()是返回 limt-position的值!hasRemaining()则是判断当前是否有数据,返回position < limit的boolean值! Demo1:模拟根据文本换行符编解码 第一步:编写解码器 实现 ProtocolDecoder 接口,覆盖 decode()方法; import java.nio.charset.Charset; import org.apache.mina.common.IoBuffer; import org.apache.mina.common.IoSession; import org.apache.mina.filter.codec.ProtocolDecoder; import org.apache.mina.filter.codec.ProtocolDecoderOutput; public class MyTextLineCodecDecoder implements ProtocolDecoder { private Charset charset = Charset.forName("UTF-8"); IoBuffer buf = IoBuffer.allocate(100).setAutoExpand(true); public void decode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception { while (in.hasRemaining()) { byte b = in.get(); buf.put(b); if (b == '\n'){ buf.flip(); byte[] msg = new byte[buf.limit()]; buf.get(msg); String message = new String(msg, charset); //解码成功,把buf重置 buf = IoBuffer.allocate(100).setAutoExpand(true); out.write(message); } } } public void dispose(IoSession session) throws Exception { } public void finishDecode(IoSession session, ProtocolDecoderOutput out) throws Exception { } } 33 方法解释: public void decode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception { decode 方法的参数 IoBuffer 是建立连接后接收数据的字节数组;我们不断 的从它里面读数据,直到遇上\r\n就停止读取数据,把上面累加的所有数据转 换为一个字符串,输出! 第二步:编写编码器: 实现 ProtocolEncoder 接口,覆盖 encode()方法; import java.nio.charset.Charset; import java.nio.charset.CharsetEncoder; import org.apache.mina.common.IoBuffer; import org.apache.mina.common.IoSession; import org.apache.mina.filter.codec.ProtocolEncoder; import org.apache.mina.filter.codec.ProtocolEncoderOutput; public class MyTextLineCodecEncoder implements ProtocolEncoder { private Charset charset = Charset.forName("UTF-8"); public void encode(IoSession session, Object message, ProtocolEncoderOutput out) throws Exception { IoBuffer buf = IoBuffer.allocate(100).setAutoExpand(true); CharsetEncoder ce = charset.newEncoder(); buf.putString(message.toString(), ce); // buf.put(message.toString().getBytes(charset)); buf.put((byte)'\r'); buf.put((byte)'\n'); buf.flip(); out.write(buf); } public void dispose(IoSession session) throws Exception { } } 实现很简单,把要编码的数据放进一个 IoBuffer 中,并在 IoBuffer 结尾添 加\r\n,输出。 第三步:编辑编解码器工厂类 实现ProtocolCodecFactory 接口,覆盖其 getDecoder()和getEncoder() 方法; import org.apache.mina.common.IoSession; import org.apache.mina.filter.codec.ProtocolCodecFactory; import org.apache.mina.filter.codec.ProtocolDecoder; import org.apache.mina.filter.codec.ProtocolEncoder; 34 public class MyTextLineCodecFactory implements ProtocolCodecFactory { public ProtocolDecoder getDecoder(IoSession session) throws Exception { return new MyTextLineCodecDecoder(); } public ProtocolEncoder getEncoder(IoSession session) throws Exception { return new MyTextLineCodecEncoder(); } } 第四步:测试 到现在,一个简单的根据\r\n 换行符编解码的过滤器实现了;添加到一个 服务端中,测试: // 设置过滤器(使用Mina提供的文本换行符编解码器) acceptor.getFilterChain().addLast( "codec", new ProtocolCodecFilter(new MyTextLineCodecFactory())); 启动服务端,用 telnet 测试一把,成功编解码啦! 也可以把编解码器绑定和服务端和客户端,测试后也无疑是成功的! Demo2:改进 Demo1的代码 Demo1 作为一个简单的例子,虽然实现了根据\r\n换行符编解码的功能,但 是却存在以下问题: 1. 编解码器中编码类型 Charset 硬编码,不便调整; 2. 只能根据 Windows 的换行符\r\n解码,没有考虑其他操作系统的换行符,不 灵活; 3. 解码器中定义了成员变量 IoBuffer,但 Decoder 实例是单例的,因此Decoder 实例中的成员变量可以被多线程共享访问,可能会因为变量的可见性而造成 数据异常; 第3个bug是致命的,因此,必须首先解决; 为什么要定义成员变量 IoBuffer 呢?因为数据接收并不是一次完成的;比 如客户端发送一个请求有 400个字节,先发送了 200个字节,这时暂停某段时间, 然后又发送了剩余 200字节;在解码时,Decode 的IoBuffer 中先解码 200 个接 收到的字节,此时,解码工作并未完成;但因为使用了 java NIO,发生 IO阻塞 时会处理其他请求,此时就需要把先接收到的数据暂存在某个变量中,当剩余数 据到达时,解码后追加在原来数据后面; 这就是我们定义成员变量 IoBuffer 的理由! IoBuffer buf = IoBuffer.allocate(100).setAutoExpand(true); public void decode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception { 此时,问题出现了! 每个IoSession都需要有自己的解码器实例;MINA确保同一时刻只有一个线 35 程在执行decode() 函数——不允许多线程并发地执行解码函数,但它并不能保 证每次解码过程都是同一线程在执行(两次解码用的可能是不同的线程)。假设 第一块数据被线程1管理,这时还没接收到足够的数据以供解码,当接收到第二 块数据时,被另一个线程2管理,此时可能会出现变量的可视化(Visibility)问 题。 因此,每个IoSession都需要独立保存解码器所解码时未完成的数据。办法 就是保存在IoSession的属性中,每次解码时,都先从它的属性中拿出上次未完 成的任务数据,把新数据追加在它的后面; 源码如下: public class MyTextLineCodecDecoder implements ProtocolDecoder { private Charset charset = Charset.forName("utf-8"); // 定义常量值,作为每个IoSession中保存解码内容的key值 private static String CONTEXT = MyTextLineCodecDecoder.class.getName() + ".context"; public void decode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception { Context ctx = getContext(session); decodeAuto(ctx, in, out); } private Context getContext(IoSession session) { Context ctx = (Context) session.getAttribute(CONTEXT); if (ctx == null){ ctx = new Context(); session.setAttribute(CONTEXT, ctx); } return ctx; } private void decodeAuto(Context ctx, IoBuffer in, ProtocolDecoderOutput out) throws CharacterCodingException { boolean mark = false; while (in.hasRemaining()) { byte b = in.get(); switch (b) { case '\r': break; case '\n': mark = true; break;// 跳出switch default: ctx.getBuf().put(b); } 36 if (mark) { IoBuffer t_buf = ctx.getBuf(); t_buf.flip(); try { out.write(t_buf.getString(charset.newDecoder())); } finally { t_buf.clear(); } } } } public void dispose(IoSession session) throws Exception { Context ctx = (Context) session.getAttribute(CONTEXT); if (ctx != null){ session.removeAttribute(CONTEXT); } } public void finishDecode(IoSession session, ProtocolDecoderOutput out) throws Exception { } private class Context { private IoBuffer buf; public Context() { buf = IoBuffer.allocate(100).setAutoExpand(true); } public IoBuffer getBuf() { return buf; } } } 代码解释: 1. 在解码器中定义一个内部类,内部类中有一个成员变量IoBuffer,用来存储 每个IoSesssion解码的内容; private class Context { private IoBuffer buf; public Context() { buf = IoBuffer.allocate(100).setAutoExpand(true); 37 } public IoBuffer getBuf() { return buf; } } 2. 当IoSession使用解码实例时,第一次使用则新建一个Context对象,保存在 IoSession的Attribute中,把解码内容保存在Context对象的成员变量 IoBuffer中;如果解码没结束,第二次使用解码实例时,从IoSession的 Attribute取出Context对象,保额解码内容追加在Context对象的成员变量 IoBuffer中; 注意:IoSession的Attribute使用用一个同步的HashMap 保存对象,因此定 义了常量CONTEXT作为保存Context对象的Key值; private static String CONTEXT = MyTextLineCodecDecoder.class.getName() + ".context"; public void decode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception { Context ctx = getContext(session); decodeAuto(ctx, in, out); } private Context getContext(IoSession session) { Context ctx = (Context) session.getAttribute(CONTEXT); if (ctx == null){ ctx = new Context(); session.setAttribute(CONTEXT, ctx); } return ctx; } 3. 解码时,解码内容保存在Context对象的成员变量IoBuffer中,因此,一旦解 码成功,要把成员变量IoBuffer重置;-----现在是请求/响应的单模式,不 存在一个请求过来发送了多条记录的情况,所有重置前其实IoBuffer缓存内 容已经为空; private void decodeAuto(Context ctx, IoBuffer in, ProtocolDecoderOutput out) throws CharacterCodingException { boolean mark = false; while (in.hasRemaining()) { byte b = in.get(); switch (b) { case '\r': break; case '\n': mark = true; 38 break;// 跳出switch default: ctx.getBuf().put(b); } if (mark) { IoBuffer t_buf = ctx.getBuf(); t_buf.flip(); try { out.write(t_buf.getString(charset.newDecoder())); } finally { t_buf.clear(); } } } } 4. 解码成功,则从IoSession的Attribute删除Context对象; public void dispose(IoSession session) throws Exception { Context ctx = (Context) session.getAttribute(CONTEXT); if (ctx != null){ session.removeAttribute(CONTEXT); } } 查看 Mina 对TextLineCodec 的实现源码会发现,根据换行符解码的消息最 大长度是受限制的,默认最大长度是 1024,相当于缓冲区最大能存放 1K的数据。 因此使用时,建议调整参数为 2K; 如果我们希望根据我们自己定义的文本换行符及编码格式编解码,则需要把 它们作为参数传递给编解码器;完整代码如下: 解码器: import java.nio.charset.CharacterCodingException; import java.nio.charset.Charset; import java.nio.charset.CharsetDecoder; import org.apache.mina.common.IoBuffer; import org.apache.mina.common.IoSession; import org.apache.mina.filter.codec.ProtocolDecoder; import org.apache.mina.filter.codec.ProtocolDecoderOutput; public class MyTextLineCodecDecoderII implements ProtocolDecoder { private Charset charset;// 编码格式 private String delimiter;// 文本分隔符 private IoBuffer delimBuf;// 文本分割符匹配的变量 39 // 定义常量值,作为每个IoSession中保存解码任务的key值 private static String CONTEXT = MyTextLineCodecDecoder.class.getName() + ".context"; // 构造函数,必须指定Charset和文本分隔符 public MyTextLineCodecDecoderII(Charset charset, String delimiter) { this.charset = charset; this.delimiter = delimiter; } public void decode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception { Context ctx = getContext(session); if (delimiter == null || "".equals(delimiter)){// 如果文本换行符未指定, 使用默认值 delimiter = "\r\n"; } if (charset == null){ charset = Charset.forName("utf-8"); } decodeNormal(ctx, in, out); } // 从IoSession中获取Context对象 private Context getContext(IoSession session) { Context ctx; ctx = (Context) session.getAttribute(CONTEXT); if (ctx == null){ ctx = new Context(); session.setAttribute(CONTEXT, ctx); } return ctx; } // 解码 private void decodeNormal(Context ctx, IoBuffer in, ProtocolDecoderOutput out) throws CharacterCodingException { // 取出未完成任务中已经匹配的文本换行符的个数 int matchCount = ctx.getMatchCount(); // 设置匹配文本换行符的IoBuffer变量 if (delimBuf == null){ IoBuffer tmp = IoBuffer.allocate(2).setAutoExpand(true); 40 tmp.putString(delimiter, charset.newEncoder()); tmp.flip(); delimBuf = tmp; } int oldPos = in.position(); // 解码的IoBuffer中数据的原始信息 int oldLimit = in.limit(); while (in.hasRemaining()) {// 变量解码的IoBuffer byte b = in.get(); if (delimBuf.get(matchCount) == b) {// 匹配第matchCount位换行符成功 matchCount++; if (matchCount == delimBuf.limit()) {// 当前匹配到字节个数与文 本换行符字节个数相同,匹配结束 int pos = in.position(); // 获得当前匹配到的position (position前所有数据有效) in.limit(pos); in.position(oldPos); // position回到原始位置 ctx.append(in); // 追加到Context对象未完成数据后面 in.limit(oldLimit); // in中匹配结束后剩余数据 in.position(pos); IoBuffer buf = ctx.getBuf(); buf.flip(); buf.limit(buf.limit() - matchCount);// 去掉匹配数据中的文本 换行符 try { out.write(buf.getString(ctx.getDecoder())); // 输出解码 内容 } finally { buf.clear(); // 释放缓存空间 } oldPos = pos; matchCount = 0; } } else { // 如果matchCount==0,则继续匹配 // 如果matchCount>0,说明没有匹配到文本换行符的中的前一个匹配成 功字节的下一个字节, // 跳转到匹配失败字符处,并置matchCount=0,继续匹配 in.position(in.position()-matchCount); matchCount = 0; // 匹配成功后,matchCount置空 41 } } // 把in中未解码内容放回buf中 in.position(oldPos); ctx.append(in); ctx.setMatchCount(matchCount); } public void dispose(IoSession session) throws Exception { } public void finishDecode(IoSession session, ProtocolDecoderOutput out) throws Exception { } // 内部类,保存IoSession解码时未完成的任务 private class Context { private CharsetDecoder decoder; private IoBuffer buf;// 保存真实解码内容 private int matchCount = 0; // 匹配到的文本换行符个数 private Context() { decoder = charset.newDecoder(); buf = IoBuffer.allocate(80).setAutoExpand(true); } // 重置 public void reset() { matchCount = 0; decoder.reset(); } // 追加数据 public void append(IoBuffer in) { getBuf().put(in); } // ======get/set方法===================== public CharsetDecoder getDecoder() { return decoder; } 42 public IoBuffer getBuf() { return buf; } public int getMatchCount() { return matchCount; } public void setMatchCount(int matchCount) { this.matchCount = matchCount; } }// end class Context; } 编码器: import java.nio.charset.Charset; import org.apache.mina.common.IoBuffer; import org.apache.mina.common.IoSession; import org.apache.mina.filter.codec.ProtocolEncoder; import org.apache.mina.filter.codec.ProtocolEncoderOutput; public class MyTextLineCodecEncoderII implements ProtocolEncoder { private Charset charset;// 编码格式 private String delimiter;// 文本分隔符 public MyTextLineCodecEncoderII(Charset charset, String delimiter) { this.charset = charset; this.delimiter = delimiter; } public void encode(IoSession session, Object message, ProtocolEncoderOutput out) throws Exception { if (delimiter == null || "".equals(delimiter)){// 如果文本换行符未指定, 使用默认值 delimiter = "\r\n"; } if (charset == null){ charset = Charset.forName("utf-8"); } String value = message.toString(); IoBuffer buf = IoBuffer.allocate(value.length()).setAutoExpand(true); 43 buf.putString(value, charset.newEncoder()); // 真实数据 buf.putString(delimiter, charset.newEncoder()); // 文本换行符 buf.flip(); out.write(buf); } public void dispose(IoSession session) throws Exception { } } 编解码器工厂: import java.nio.charset.Charset; import org.apache.mina.common.IoSession; import org.apache.mina.filter.codec.ProtocolCodecFactory; import org.apache.mina.filter.codec.ProtocolDecoder; import org.apache.mina.filter.codec.ProtocolEncoder; public class MyTextLineCodecFactoryII implements ProtocolCodecFactory { private Charset charset;// 编码格式 private String delimiter;// 文本分隔符 public MyTextLineCodecFactoryII(Charset charset, String delimiter) { this.charset = charset; this.delimiter = delimiter; } public ProtocolDecoder getDecoder(IoSession session) throws Exception { return new MyTextLineCodecDecoderII(charset, delimiter); } public ProtocolEncoder getEncoder(IoSession session) throws Exception { return new MyTextLineCodecEncoderII(charset, delimiter); } } 服务端或客户端绑定过滤器: // 添加过滤器 connector.getFilterChain().addLast( "codec", new ProtocolCodecFilter(new MyTextLineCodecFactoryII(Charset .forName("utf-8"),"\r\n"))); Demo3:自定义协议编解码 自定义协议是使用最广泛的,因为它非常的灵活! 第一步:制定协议: 44 协议需求:向服务端发送请求(频道的 ID和说明文字),返回响应结果(该频道 下所有的节目信息); 协议格式如下: 请求格式 Syntax No. of Bits Identifier _ description (){ Descriptor tag 16 0x0001 descriptor length 32 从下一字节开始至末尾的数据长度 ID 16 channel ID 值 chanel_des_len 8 频道说明文字 for(i=0;i< chanel_des_len;i++){ Byte_data 8 } }// end _description 响应格式 Syntax No. of Bits Identifier _ description (){ Tag 16 0x8001 Data_length 32 从下一字节开始至末尾的数据长度 channel_addr 32 频道名称的地址 channel _len 8 频道名称的字符串长度 programme_count 16 节目个数 for(i=0;i< programme_count;i++){ dayIndex 8 属于哪一天(以当日为基准)(-1 表示 前一天;0 表示当天;1 表示下一天;2 表示 后两天) event_addr 32 节目名称的地址 event _len 8 节目名称的字符串长度 StartTime 32 节目偏移开始时间 TotalTime 16 节目总时长(以秒为单位) Status 8 节目当前状态(已录制【0x01】//待录 制【0x00】) url_addr 32 节目播放地址的 addr url _len 8 节目播放地址的长度 }// end for For(j=0;j<;j++){ Byte_data 8 真实数据 } }// end _description 协议解释如下: 1. 协议前两个字节(16Bits)是协议的唯一标识值; 如上:请求部分的 tag = 0x0001,响应部分的 tag = 0x8001 45 2. 接着四个字节(32Bits)是传输消息的长度; 3. 接下来是数据区; 分析请求部分: Syntax No. of Bits Identifier _ description (){ Descriptor tag 16 0x0001 descriptor length 32 从下一字节开始至末尾的数据长度 ID 16 channel ID 值 chanel_des_len 8 频道说明文字 for(i=0;i< chanel_des_len;i++){ Byte_data 8 } }// end _description 请求部分是客户端(机顶盒)向服务端发送的请求;协议 I的请求只发送了 两个个参数:channelID 和channel_dec(频道描述信息) 各个参数分析: a. descriptor tag:请求的唯一标识; -- 2个字节 b. descriptor length:数据区长度; -- 4个字节 c. ID:channelID; -- 2个字节 d. channel_dec_len:频道说明信息的字节长度 -- 1个字节 e. for循环:存放频道说明信息的真实数据(字节数组中) 响应部分分析略…… // ===================协议格式总结=========================== 前面 2个绿色部分称为报文头,固定 6个字节; 中间 2个蓝色部分称为基本数据区,用 Java的8个基本数据类型描述; 最后的红色部分称为真实数据区,所有 String 类型的信息都放在这里; 基本数据区+真实数据区 =数据区 协议格式:报文头+数据区 图示如下: T a g 数 据 区 L ength 基 本 数 据 区 真 实 数 据 区 数 据 区报 文 头 总之,对于基本数据类型,直接存放在基本数据区,对于 String 类型,在 基本数据区描述它的长度和在真实数据区的地址,然后存在在真实数据区;而 Java对象,则是把对象属性分解为基本数据类型和 String 类型发送; 因此,解码必须获得三个信息: a. 请求标识:根据请求的不同进行不同的解码 46 b. 数据区总长度:定是否接受数据成功; c. 偏移地址:知道真实数据区位置,就可以解码 String 数据; 图示如下: T a g 数 据 区 L ength 基 本 数 据 区 真 实 数 据 区 数 据 区 L en thL en thL en thL en th 偏 移 地 址 代码实现: 1. 首先定义消息的抽象类,定义获取 3个解码信息的方法; import java.nio.charset.Charset; public abstract class AbstrMessage { // 协议编号 public abstract short getTag(); // 数据区长度 public abstract int getLen(Charset charset); // 真实数据偏移地址 public abstract int getDataOffset(); } 定义请求对象和响应对象; 请求的 Java对象: import java.nio.charset.Charset; import org.apache.log4j.Logger; /* * 请求的Java对象 */ public class ChannelInfoRequest extends AbstrMessage { private Logger logger = Logger.getLogger(ChannelInfoRequest.class); private String channel_desc; private int channel_id; @Override public short getTag() { return (short) 0x0001; } @Override public int getLen(Charset charset) { 47 int len = 2 + 1; try { if (channel_desc != null &&!"".equals(channel_desc)){ len += channel_desc.getBytes(charset).length; } } catch (Exception e) { logger.error("频道说明转换为字节码错误...", e); } return len; } @Override public int getDataOffset() { int len = 2 + 4 + 2 + 1; return len; } public String getChannel_desc() { return channel_desc; } public void setChannel_desc(String channel_desc) { this.channel_desc = channel_desc; } public int getChannel_id() { return channel_id; } public void setChannel_id(int channel_id) { this.channel_id = channel_id; } } 响应的 Java对象: import java.nio.charset.Charset; import org.apache.log4j.Logger; /* * 响应的Java对象 */ public class ChannelInfoResponse extends AbstrMessage { private Logger logger = Logger.getLogger(ChannelInfoResponse.class); private String ChannelName; 48 private EventDto[] events; @Override public short getTag() { return (short) 0x8001; } @Override public int getLen(Charset charset) { int len = 4 + 1 + 2; try { if (events != null && events.length > 0) { for (int i = 0; i < events.length; i++) { EventDto edt = events[i]; len += 1 + 4 + 1 + 4 + 2 + 1 + 4 + 1 + edt.getLen(charset); } } if (ChannelName != null &&!"".equals(ChannelName)){ len += ChannelName.getBytes(charset).length; } } catch (Exception e) { logger.error("频道信息转换为字节码错误...", e); } return len; } @Override public int getDataOffset() { int len = 2 + 4 + 4 + 1 + 2; if (events != null && events.length > 0) { len += events.length *(1 + 4 + 1 + 4 + 2 + 1 + 4 + 1); } return len; } public String getChannelName() { return ChannelName; } public void setChannelName(String channelName) { ChannelName = channelName; } public EventDto[] getEvents() { 49 return events; } public void setEvents(EventDto[] events) { this.events = events; } } import java.nio.charset.Charset; import org.apache.log4j.Logger; public class EventDto { private Logger logger = Logger.getLogger(EventDto.class); private String eventName; private int beginTime; private int totalTime; private int dayIndex; private int status; private String url; // 节目中字符数据的字节长度 public int getLen(Charset charset) { int len = 0; try { if (eventName != null &&!"".equals(eventName)){ len += eventName.getBytes(charset).length; } if (url != null &&!"".equals(url)){ len += url.getBytes(charset).length; } } catch (Exception e) { logger.error("节目信息转换为字节码错误...", e); } return len; } //………get/set方法,略……… } 解码器: import java.nio.charset.Charset; import java.nio.charset.CharsetDecoder; 50 import org.apache.log4j.Logger; import org.apache.mina.common.IoBuffer; import org.apache.mina.common.IoSession; import org.apache.mina.filter.codec.ProtocolDecoderOutput; import org.apache.mina.filter.codec.demux.MessageDecoder; import org.apache.mina.filter.codec.demux.MessageDecoderResult; import com.dvn.li.message.AbstrMessage; import com.dvn.li.message.ChannelInfoRequest; import com.dvn.li.message.ChannelInfoResponse; import com.dvn.li.message.EventDto; public class MyMessageDecoder implements MessageDecoder { private Logger logger = Logger.getLogger(MyMessageDecoder.class); private Charset charset; public MyMessageDecoder(Charset charset) { this.charset = charset; } // 检查给定的IoBuffer是否适合解码 public MessageDecoderResult decodable(IoSession session, IoBuffer in) { // 报头长度==6 if (in.remaining() < 6) { return MessageDecoderResult.NEED_DATA; } // tag正常 short tag = in.getShort(); // 注意先把16进制标识值转换为short类型的十进制数据,然后与tag比较 if (tag == (short) 0x0001 || tag == (short) 0x8001) { logger.info("请求标识符:" + tag); } else { logger.error("未知的解码类型...."); return MessageDecoderResult.NOT_OK; } // 真实数据长度 int len = in.getInt(); if (in.remaining() < len) { return MessageDecoderResult.NEED_DATA; } 51 return MessageDecoderResult.OK; } public MessageDecoderResult decode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception { logger.info("解码:" + in.toString()); CharsetDecoder decoder = charset.newDecoder(); AbstrMessage message = null; short tag = in.getShort(); // tag int len = in.getInt(); // len byte[] temp = new byte[len]; in.get(temp); // 数据区 // ===============解析数据做准备====================== IoBuffer buf = IoBuffer.allocate(100).setAutoExpand(true); buf.put(temp); buf.flip(); // 为获取基本数据区长度做准备 IoBuffer databuf = IoBuffer.allocate(100).setAutoExpand(true); databuf.putShort(tag); databuf.putInt(len); databuf.put(temp); databuf.flip(); // 为获取真实数据区长度做准备 // ================开始解码========================= // 注意先把16进制标识值转换为short类型的十进制数据,然后与tag比较 if (tag == (short) 0x0001) {// 服务端解码 ChannelInfoRequest req = new ChannelInfoRequest(); short channel_id = buf.getShort(); byte channel_desc_len = buf.get(); String channel_desc = null; if (channel_desc_len > 0) { channel_desc = buf.getString(channel_desc_len, decoder); } req.setChannel_id(channel_id); req.setChannel_desc(channel_desc); message = req; } else if (tag == (short) 0x8001) {// 客户端解码 ChannelInfoResponse res = new ChannelInfoResponse(); 52 int channel_addr = buf.getInt(); byte channel_len = buf.get(); if (databuf.position() == 0) { databuf.position(channel_addr); } String channelName = null; if (channel_len > 0) { channelName = databuf.getString(channel_len, decoder); } res.setChannelName(channelName); short event_num = buf.getShort(); EventDto[] events = new EventDto[event_num]; for (int i = 0; i < event_num; i++) { EventDto edt = new EventDto(); byte dayIndex = buf.get(); buf.getInt(); byte eventName_len = buf.get(); String eventName = null; if (eventName_len > 0) { eventName = databuf.getString(eventName_len, decoder); } int beginTime = buf.getInt(); short totalTime = buf.getShort(); byte status = buf.get(); buf.getInt(); byte url_len = buf.get(); String url = null; if (url_len > 0) { url = databuf.getString(url_len, decoder); } edt.setDayIndex(dayIndex); edt.setEventName(eventName); edt.setBeginTime(beginTime); edt.setTotalTime(totalTime); edt.setStatus(status); edt.setUrl(url); events[i] = edt; } 53 res.setEvents(events); message = res; } else { logger.error("未找到解码器...."); } out.write(message); // ================解码成功========================= return MessageDecoderResult.OK; } public void finishDecode(IoSession session, ProtocolDecoderOutput out) throws Exception { } } 编码器: import java.nio.charset.Charset; import java.nio.charset.CharsetEncoder; import org.apache.log4j.Logger; import org.apache.mina.common.IoBuffer; import org.apache.mina.common.IoSession; import org.apache.mina.filter.codec.ProtocolEncoderOutput; import org.apache.mina.filter.codec.demux.MessageEncoder; import com.dvn.li.message.AbstrMessage; import com.dvn.li.message.ChannelInfoRequest; import com.dvn.li.message.ChannelInfoResponse; import com.dvn.li.message.EventDto; public class MyMessageEncoder implements MessageEncoder { private Logger logger = Logger.getLogger(MyMessageEncoder.class); private Charset charset; public MyMessageEncoder(Charset charset) { this.charset = charset; } public void encode(IoSession session, AbstrMessage message, ProtocolEncoderOutput out) throws Exception { IoBuffer buf = IoBuffer.allocate(100).setAutoExpand(true); buf.putShort(message.getTag()); buf.putInt(message.getLen(charset)); 54 // ===========编码数据区=============== if (message instanceof ChannelInfoRequest) { ChannelInfoRequest req = (ChannelInfoRequest) message; buf.putShort((short) req.getChannel_id()); buf.put((byte) req.getChannel_desc().getBytes(charset).length); buf.putString(req.getChannel_desc(), charset.newEncoder()); } else if (message instanceof ChannelInfoResponse) { ChannelInfoResponse res = (ChannelInfoResponse) message; CharsetEncoder encoder = charset.newEncoder(); IoBuffer dataBuffer = IoBuffer.allocate(100).setAutoExpand(true);// 定义真实数据区 int offset = res.getDataOffset(); // 偏移地址 buf.putInt(offset); // 频道名称地址(偏移开始位置) byte channelName_len = 0; if (res.getChannelName() != null){ channelName_len = (byte) res.getChannelName().getBytes(charset).length; } buf.put(channelName_len); offset += channelName_len; if (channelName_len > 0) { dataBuffer.putString(res.getChannelName(), encoder); } EventDto[] events = res.getEvents(); if (events != null){ buf.putShort((short) events.length); for (int i = 0; i < events.length; i++) { EventDto edt = events[i]; buf.put((byte) edt.getDayIndex()); buf.putInt(offset); String eventName = edt.getEventName(); byte eventName_len = 0; if (eventName != null){ eventName_len = (byte) eventName.getBytes(charset).length; } offset += eventName_len; buf.put(eventName_len); if (eventName_len > 0) { dataBuffer.putString(eventName, encoder); 55 } buf.putInt(edt.getBeginTime()); buf.putShort((short) edt.getTotalTime()); buf.put((byte) edt.getStatus()); buf.putInt(offset); String url = edt.getUrl(); byte url_len = 0; if (url != null){ url_len = (byte) url.getBytes(charset).length; } offset += url_len; buf.put(url_len); if (url_len > 0) { dataBuffer.putString(url, encoder); } } } // 真实数据追加在基本数据后面 if (dataBuffer.position() > 0) { buf.put(dataBuffer.flip()); } } // ==========编码成功================= buf.flip(); logger.info("编码" + buf.toString()); out.write(buf); } } 编解码器工厂: import org.apache.mina.filter.codec.demux.DemuxingProtocolCodecFactory; import org.apache.mina.filter.codec.demux.MessageDecoder; import org.apache.mina.filter.codec.demux.MessageEncoder; import com.dvn.li.message.AbstrMessage; public class MyMessageCodecFactory extends DemuxingProtocolCodecFactory { private MessageDecoder decoder; private MessageEncoder encoder; // 注册编解码器 56 public MyMessageCodecFactory(MessageDecoder decoder, MessageEncoder encoder) { this.decoder = decoder; this.encoder = encoder; addMessageDecoder(this.decoder); addMessageEncoder(AbstrMessage.class, this.encoder); } } 服务端和服务端处理类: import java.net.InetSocketAddress; import java.nio.charset.Charset; import org.apache.log4j.Logger; import org.apache.mina.common.IdleStatus; import org.apache.mina.common.IoAcceptor; import org.apache.mina.common.IoSessionConfig; import org.apache.mina.filter.codec.ProtocolCodecFilter; import org.apache.mina.filter.logging.LogLevel; import org.apache.mina.filter.logging.LoggingFilter; import org.apache.mina.transport.socket.nio.NioSocketAcceptor; import com.dvn.li.codec.MyMessageCodecFactory; import com.dvn.li.codec.MyMessageDecoder; import com.dvn.li.codec.MyMessageEncoder; import com.dvn.li.handler.Demo2ServerHandler; public class TestServer02 { private static Logger logger = Logger.getLogger(TestServer02.class); private static int PORT = 3005; public static void main(String[] args) { IoAcceptor acceptor = null; try { // 创建一个非阻塞的server端的Socket acceptor = new NioSocketAcceptor(); // 设置过滤器(添加自带的编解码器) acceptor.getFilterChain().addLast( "codec", new ProtocolCodecFilter(new MyMessageCodecFactory( new MyMessageDecoder(Charset.forName("utf-8")), new MyMessageEncoder(Charset.forName("utf-8"))))); // 设置日志过滤器 57 LoggingFilter lf = new LoggingFilter(); lf.setMessageReceivedLogLevel(LogLevel.DEBUG); acceptor.getFilterChain().addLast("logger", lf); // 获得IoSessionConfig对象 IoSessionConfig cfg = acceptor.getSessionConfig(); // 读写通道10秒内无操作进入空闲状态 cfg.setIdleTime(IdleStatus.BOTH_IDLE, 100); // 绑定逻辑处理器 acceptor.setHandler(new Demo2ServerHandler()); // 绑定端口 acceptor.bind(new InetSocketAddress(PORT)); logger.info("服务端启动成功... 端口号为:" + PORT); } catch (Exception e) { logger.error("服务端启动异常....", e); e.printStackTrace(); } } } import org.apache.log4j.Logger; import org.apache.mina.common.IdleStatus; import org.apache.mina.common.IoHandlerAdapter; import org.apache.mina.common.IoSession; import com.dvn.li.message.ChannelInfoRequest; import com.dvn.li.message.ChannelInfoResponse; import com.dvn.li.message.EventDto; public class Demo2ServerHandler extends IoHandlerAdapter { public static Logger logger = Logger.getLogger(Demo2ServerHandler.class); @Override public void sessionCreated(IoSession session) throws Exception { logger.info("服务端与客户端创建连接..."); } @Override public void sessionOpened(IoSession session) throws Exception { logger.info("服务端与客户端连接打开..."); } @Override public void messageReceived(IoSession session, Object message) throws Exception { 58 if (message instanceof ChannelInfoRequest) { ChannelInfoRequest req = (ChannelInfoRequest) message; int channel_id = req.getChannel_id(); String channel_desc = req.getChannel_desc(); logger.info("服务端接收到的数据为:channel_id=" + channel_id + " channel_desc=" + channel_desc); // ================具体操作,比如查询数据库等,这里略....============= ChannelInfoResponse res = new ChannelInfoResponse(); res.setChannelName("CCTV1高清频道"); EventDto[] events = new EventDto[2]; for (int i = 0; i < events.length; i++) { EventDto edt = new EventDto(); edt.setBeginTime(10); edt.setDayIndex(1); edt.setEventName("风云第一的" + i); edt.setStatus(1); edt.setTotalTime(100 + i); edt.setUrl("www.baidu.com"); events[i] = edt; } res.setEvents(events); session.write(res); } else { logger.info("未知请求!"); } } @Override public void messageSent(IoSession session, Object message) throws Exception { session.close(); logger.info("服务端发送信息成功..."); } @Override public void sessionClosed(IoSession session) throws Exception { } @Override public void sessionIdle(IoSession session, IdleStatus status) throws Exception { logger.info("服务端进入空闲状态..."); } 59 @Override public void exceptionCaught(IoSession session, Throwable cause) throws Exception { logger.error("服务端发送异常...", cause); } } 客户端和客户端处理类: import java.net.InetSocketAddress; import java.nio.charset.Charset; import org.apache.log4j.Logger; import org.apache.mina.common.ConnectFuture; import org.apache.mina.common.IoConnector; import org.apache.mina.common.IoSession; import org.apache.mina.filter.codec.ProtocolCodecFilter; import org.apache.mina.transport.socket.nio.NioSocketConnector; import com.dvn.li.codec.MyMessageCodecFactory; import com.dvn.li.codec.MyMessageDecoder; import com.dvn.li.codec.MyMessageEncoder; import com.dvn.li.handler.Demo2ClientHandler; import com.dvn.li.message.ChannelInfoRequest; public class TestClient02 { private static Logger logger = Logger.getLogger(TestClient02.class); private static String HOST = "127.0.0.1"; private static int PORT = 3005; public static void main(String[] args) { // 创建一个非阻塞的客户端程序 IoConnector connector = new NioSocketConnector(); // 设置链接超时时间 connector.setConnectTimeout(30000); // 添加过滤器 connector.getFilterChain().addLast( "codec", new ProtocolCodecFilter(new MyMessageCodecFactory( new MyMessageDecoder(Charset.forName("utf-8")), new MyMessageEncoder(Charset.forName("utf-8"))))); // 添加业务逻辑处理器类 connector.setHandler(new Demo2ClientHandler()); 60 IoSession session = null; try { ConnectFuture future = connector.connect(new InetSocketAddress( HOST,PORT));// 创建连接 future.awaitUninterruptibly();// 等待连接创建完成 session = future.getSession();// 获得session ChannelInfoRequest req = new ChannelInfoRequest(); // 发送请求 req.setChannel_id(12345); req.setChannel_desc("mina在做测试哦哦....哇呀呀!!!"); session.write(req);// 发送消息 } catch (Exception e) { logger.error("客户端链接异常...", e); } session.getCloseFuture().awaitUninterruptibly();// 等待连接断开 connector.dispose(); } } import org.apache.log4j.Logger; import org.apache.mina.common.IoHandlerAdapter; import org.apache.mina.common.IoSession; import com.dvn.li.message.ChannelInfoResponse; import com.dvn.li.message.EventDto; public class Demo2ClientHandler extends IoHandlerAdapter { private static Logger logger = Logger.getLogger(Demo2ClientHandler.class); @Override public void messageReceived(IoSession session, Object message) throws Exception { if (message instanceof ChannelInfoResponse) { ChannelInfoResponse res = (ChannelInfoResponse) message; String channelName = res.getChannelName(); EventDto[] events = res.getEvents(); logger.info("客户端接收到的消息为:channelName=" + channelName); if(events!=null && events.length>0){ for (int i = 0; i < events.length; i++) { EventDto edt = events[i]; logger.info("客户端接收到的消息为:BeginTime=" + edt.getBeginTime()); logger.info("客户端接收到的消息为:DayIndex=" + edt.getDayIndex()); logger.info("客户端接收到的消息为:EventName=" + edt.getEventName()); logger.info("客户端接收到的消息为:Status=" + edt.getStatus()); logger.info("客户端接收到的消息为:TotalTime=" + edt.getTotalTime()); 61 logger.info("客户端接收到的消息为:url=" + edt.getUrl()); } } }else{ logger.info("未知类型!"); } } @Override public void exceptionCaught(IoSession session, Throwable cause) throws Exception { logger.error("客户端发生异常...", cause); } } 测试…… 服务端打印信息: 2010-12-23 13:57:36,533 INFO TestServer02 - 服务端启动成功... 端口号为:3005 2010-12-23 13:57:42,049 INFO LoggingFilter -CREATED 2010-12-23 13:57:42,049 INFO Demo2ServerHandler - 服务端与客户端创建连接... 2010-12-23 13:57:42,049 INFO LoggingFilter -OPENED 2010-12-23 13:57:42,049 INFO Demo2ServerHandler - 服务端与客户端连接打开... 2010-12-23 13:57:42,112 INFO MyMessageDecoder - 请求标识符:1 2010-12-23 13:57:42,112 INFO MyMessageDecoder - 解码:HeapBuffer[pos=0 lim=53 cap=2048: 00 01 00 00 00 2F 30 39 2C 6D 69 6E 61 E5 9C A8...] 2010-12-23 13:57:42,127 DEBUG LoggingFilter -RECEIVED: com.dvn.li.message.ChannelInfoRequest@1e3118a 2010-12-23 13:57:42,127 INFO Demo2ServerHandler - 服务端接收到的数据为:channel_id=12345 channel_desc=mina在做测试哦哦....哇呀呀!!! 2010-12-23 13:57:42,158 INFO MyMessageEncoder - 编码HeapBuffer[pos=0 lim=124 cap=124: 80 01 00 00 00 76 00 00 00 31 11 00 02 01 00 00...] 2010-12-23 13:57:42,174 INFO LoggingFilter -SENT: com.dvn.li.message.ChannelInfoResponse@be0e27 2010-12-23 13:57:42,174 INFO Demo2ServerHandler - 服务端发送信息成功... 2010-12-23 13:57:42,174 INFO LoggingFilter -CLOSED 客户端打印信息: 2010-12-23 13:57:42,080 INFO MyMessageEncoder - 编码HeapBuffer[pos=0 lim=53 cap=100: 00 01 00 00 00 2F 30 39 2C 6D 69 6E 61 E5 9C A8...] 2010-12-23 13:57:42,158 INFO MyMessageDecoder - 请求标识符:-32767 2010-12-23 13:57:42,174 INFO MyMessageDecoder - 解码:HeapBuffer[pos=0 lim=124 cap=2048: 80 01 00 00 00 76 00 00 00 31 11 00 02 01 00 00...] 2010-12-23 13:57:42,174 INFO Demo2ClientHandler - 客户端接收到的消息为:channelName=CCTV1 高清频道 2010-12-23 13:57:42,190 INFO Demo2ClientHandler - 客户端接收到的消息为:BeginTime=10 2010-12-23 13:57:42,190 INFO Demo2ClientHandler - 客户端接收到的消息为:DayIndex=1 62 2010-12-23 13:57:42,190 INFO Demo2ClientHandler - 客户端接收到的消息为:EventName=风云第一 的0 2010-12-23 13:57:42,190 INFO Demo2ClientHandler - 客户端接收到的消息为:Status=1 2010-12-23 13:57:42,190 INFO Demo2ClientHandler - 客户端接收到的消息为:TotalTime=100 2010-12-23 13:57:42,190 INFO Demo2ClientHandler - 客户端接收到的消息为:url=www.baidu.com 2010-12-23 13:57:42,190 INFO Demo2ClientHandler - 客户端接收到的消息为:BeginTime=10 2010-12-23 13:57:42,190 INFO Demo2ClientHandler - 客户端接收到的消息为:DayIndex=1 2010-12-23 13:57:42,190 INFO Demo2ClientHandler - 客户端接收到的消息为:EventName=风云第一 的1 2010-12-23 13:57:42,190 INFO Demo2ClientHandler - 客户端接收到的消息为:Status=1 2010-12-23 13:57:42,205 INFO Demo2ClientHandler - 客户端接收到的消息为:TotalTime=101 2010-12-23 13:57:42,205 INFO Demo2ClientHandler - 客户端接收到的消息为:url=www.baidu.com 总结 IoFilter是转码和解码用滴,它是 Mina 最值得研究的地方,建议阅读它的 源码! 在实际的应用开发中,自定义协议是必用的,因为很多客户端和服务端是不 同语言实现的。 2.3.IoHandler 接口 IoHandler 是Mina实现其业务逻辑的顶级接口;它相当简单,你就理解它是 根据事件触发的简单应用程序即可。 在IoHandler 中定义了 7个方法,根据 I/O事件来触发对应的方法: import java.io.IOException; public interface IoHandler { void sessionCreated(IoSession session) throws Exception; void sessionOpened(IoSession session) throws Exception; void sessionClosed(IoSession session) throws Exception; void sessionIdle(IoSession session, IdleStatus status) throws Exception; void exceptionCaught(IoSession session, Throwable cause) throws Exception; void messageReceived(IoSession session, Object message) throws Exception; void messageSent(IoSession session, Object message) throws Exception; } sessionCreated:当一个新的连接建立时,由 I/O processor thread 调用; sessionOpened:当连接打开是调用; messageReceived:当接收了一个消息时调用; messageSent:当一个消息被(IoSession#write)发送出去后调用; sessionIdle:当连接进入空闲状态时调用; sessionClosed:当连接关闭时调用; exceptionCaught:当实现 IoHandler 的类抛出异常时调用; 一般情况下,我们最关心的只有 messageReceived 方法,接收消息并处理, 然后调用 IoSession 的write 方法发送出消息!(注意:这里接收到的消息都是 Java对象,在 IoFilter 中所有二进制数据都被解码啦!) 一般情况下很少有人实现 IoHandler 接口,而是继承它的一个实现类 IoHandlerAdapter,这样不用覆盖它的 7个方法,只需要根据具体需求覆盖其中 63 的几个方法就可以! Iohandler 的7个方法其实是根据 session 的4个状态值间变化来调用的: � Connected:会话被创建并使用; � Idle:会话在一段时间(可配置)内没有任何请求到达,进入空闲状态; � Closing:会话将被关闭(剩余 message 将被强制 flush); � Closed:会话被关闭; 状态转换图如下: 三. Mina 解析 Java NIO是相对于传统的 IO操作而言的,因为提出了缓冲池等概念,使它 的处理数据的效率大大提高; 多线程是并发处理的明智选择;为减少系统开销,线程池是并发应用中是经 常使用的技术; 而异步处理机制可以大大缩短每个请求的响应时间; Mina2 中就大量使用了这三项技术,使得它成为优秀的网络应用框架。 (这一章并非描述 Mina 的实际应用,而是对它的内部处理机制做分析;但 是本人水平有限,可能有的观点并非正确滴,错误之处望多指正,更希望能邮件 联系沟通; 我们对 Mina的解析也只对服务端而言:因为无论是 Mina也好,NIO也好, 多线程也好,异步处理机制也好,都是解决高并发问题的;高并发却是对服务端 而言的!因此,服务端才是重点呀。) 3.1 NIO分析 Mina 是一个 Java NIO 框架;而 NIO 的基本思想是:服务器程序只需要一个 线程就能同时负责接收客户的连接、客户发送的数据,以及向各个客户发送响应 数据。服务器程序的处理流程如下: //阻塞 while(一直等待,直到有接收连接就绪事件、读就绪事件或写就绪事件发生){ if(有客户连接) 接收客户的连接;//非阻塞 if(某个 Socket 的输入流中有可读数据) 64 从输入流中读数据;//非阻塞 if(某个 Socket 的输出流可以写数据) 向输出流写数据;//非阻塞 } 而传统的并发型服务器则是采用多线程的模式响应用户请求的; //阻塞 while(一直等待){ if(有客户连接) 启动新线程,与客户的通信;//可能会阻塞 } 但是,无论如何,服务端共同的结构如下: 1. Read Request; 接受请求 2. Decode Request; 请求值解码(读) 3. Process Service;请求处理 4. Encode Reply; 响应值编码(写) 5. Send Reply; 发送响应 是不是感觉太抽象了?OK,我们从传统的 IO模式的并发服务器说起。 3.1.1 传统阻塞服务器 传统的服务端一次只能处理一个请求,其他请求需要排队等待;示例如下: /* * 服务端只能一次处理一个客户端的请求 * 多个请求到达后需要排队 */ public class EchoServer01 { private Logger logger = Logger.getLogger(EchoServer01.class); private int PORT = 3015; private ServerSocket serverSocket; public EchoServer01() throws IOException { // 请求队列最大长度为5 serverSocket = new ServerSocket(PORT,5); logger.info("服务端启动... 端口号:" + PORT); } public void service() { while (true){ Socket socket = null; try { socket = serverSocket.accept(); logger.info("一个新的连接到达,地址为:" + socket.getInetAddress() 65 + ":" + socket.getPort()); // 获得客户端发送信息的输入流 InputStream socketIn = socket.getInputStream(); BufferedReader br = new BufferedReader(new InputStreamReader( socketIn)); // 给客户端响应信息的输出流 OutputStream socketOut = socket.getOutputStream(); PrintWriter pw = new PrintWriter(socketOut, true); String msg = null; while ((msg = br.readLine()) != null){ logger.info("服务端接受到的信息为:" + msg); pw.println("响应信息:" + new Date().toString());// 给客户端 一个日期字符串 if (msg.equals("bye")){ logger.info("客户端请求断开"); break; } } } catch (IOException e) { e.printStackTrace(); } finally { try { if (socket != null) socket.close(); } catch (IOException e) { e.printStackTrace(); } } } } public static void main(String args[]) throws IOException { new EchoServer01().service(); } } 代码就不解释啦,直接看注释吧,没有任何玄妙的地方。我们直接用 telnet 做测试吧。 a. 启动服务端; 2011-01-19 11:16:35,250 INFO EchoServer01 - 服务端启动... 端口号:3015 b. 启动,cmd,telnet 127.0.0.1 3015,回车; 66 c.测试; 客户端输入: 服务端响应: 2011-01-19 11:17:53,343 INFO EchoServer01 - 服务端启动... 端口号:3015 2011-01-19 11:18:24,156 INFO EchoServer01 - 一个新的连接到达,地址为: /127.0.0.1:3250 2011-01-19 11:18:26,187 INFO EchoServer01 - 服务端接受到的信息为:111 2011-01-19 11:18:27,968 INFO EchoServer01 - 服务端接受到的信息为:222 2011-01-19 11:18:30,718 INFO EchoServer01 - 服务端接受到的信息为:qqqqqqq 2011-01-19 11:18:32,593 INFO EchoServer01 - 服务端接受到的信息为:eeeeee 2011-01-19 11:18:33,984 INFO EchoServer01 - 服务端接受到的信息为:bye 2011-01-19 11:18:33,984 INFO EchoServer01 - 客户端请求断开 使用客户端代码做测试;EchoClient01 代码如下: /* * 使用Socket创建客户端请求 */ public class EchoClient01 { private Logger logger = Logger.getLogger(EchoClient01.class); private String HOST = "localhost"; private int PORT = 3015; private Socket socket; public EchoClient01() throws IOException { socket = new Socket(HOST,PORT); 67 } public void talk() throws IOException { try { // 获得服务端响应信息的输入流 InputStream socketIn = socket.getInputStream(); BufferedReader br = new BufferedReader(new InputStreamReader( socketIn)); // 给服务端发送信息的输出流 OutputStream socketOut = socket.getOutputStream(); PrintWriter pw = new PrintWriter(socketOut, true); BufferedReader localReader = new BufferedReader( new InputStreamReader(System.in)); String msg = null; while ((msg = localReader.readLine()) != null){ pw.println(msg); logger.info(br.readLine()); if (msg.equals("bye")) break; } } catch (IOException e) { e.printStackTrace(); } finally { try { socket.close(); } catch (IOException e) { e.printStackTrace(); } } } public static void main(String args[]) throws IOException { new EchoClient01().talk(); } } 测试吧,无疑是通过的;但是,有两个问题出现了: a. 当前服务端一下只能处理一个请求;我靠,这还是服务器吗?做 web 开 发的人肯定有这样的想法; b. 服务端的请求队列最多只有是 5个,也就是一下能连 6个请求(1个处理, 5个等待),第 7个请求到达后会被拒绝; 68 注意看,第 7个客户端请求是无法成功的。服务端的异常信息如下: Exception in thread "main" java.net.ConnectException: Connection refused: connect at java.net.PlainSocketImpl.socketConnect(Native Method) at java.net.PlainSocketImpl.doConnect(PlainSocketImpl.java:333) at java.net.PlainSocketImpl.connectToAddress(PlainSocketImpl.java:195) at java.net.PlainSocketImpl.connect(PlainSocketImpl.java:182) at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:366) at java.net.Socket.connect(Socket.java:519) at java.net.Socket.connect(Socket.java:469) at java.net.Socket.(Socket.java:366) at java.net.Socket.(Socket.java:180) at com.dvn.li.socketdemo.EchoClient01.(EchoClient01.java:26) at com.dvn.li.socketdemo.EchoClient01.main(EchoClient01.java:59) 异常提示也很明确:拒绝连接!因为我们在服务端建立时做了请求队列最大 长度的限制; public EchoServer01() throws IOException { // 请求队列最大长度为5 serverSocket = new ServerSocket(PORT,5); logger.info("服务端启动... 端口号:" + PORT); } c. 服务端容易阻塞;最显著的阻塞是 IO操作,比如客户端与服务端建立连 接后,向服务端发送一条消息,客户端因为人为操作很久没有输入结束; 此时其他的连接只好等待在队列中; 这样的服务端我们称之为传统阻塞服务端,大凡 Socket 的入门示例都是这 样的;但是,在实际的生产应用环境中用的非常少(注意:并不是不用哦,在特 殊的环境是还是可以使用的,比如手机终端,你接听电话肯定只能一下接受一个 请求,其他如短信接收,是要排队等待的)。 3.1.2 多线程阻塞服务器 在实际的应用开发中,我们更多采用的是多线程阻塞服务器,即每一个客户 端请求到达,就建立一个线程单独的处理它与服务端的通信,如下图所示: 69 H andlerC lientC lientC lientC lient C lientC lientC lientC lient C lientC lientC lientC lient ServerServerServerServer H andler H andler 好处就是可以并发处理每一个到达的请求!服务端代码如下: /* * 为每个客户端分配一个线程 * 服务器的主线程负责接收客户的连接 * 每次接收到一个客户连接,就会创建一个工作线程,由它负责与客户的通信 */ public class EchoServer02 { private Logger logger = Logger.getLogger(EchoServer02.class); private int PORT = 3015; private ServerSocket serverSocket; public EchoServer02() throws IOException { serverSocket = new ServerSocket(PORT); logger.info("服务器端启动.... 端口号:" + PORT); } public void service() { while (true){ Socket socket = null; try { socket = serverSocket.accept(); // 请求到达 Thread workThread = new Thread(new Server02Handler(socket)); // 创 建线程 workThread.start(); // 启动线程 } catch (IOException e) { e.printStackTrace(); } } } public static void main(String args[]) throws IOException { new EchoServer02().service(); } 70 } 很明显,每到达一个请求,就交给一个线程单独的处理;处理方法如下: public class Server02Handler implements Runnable { private Logger logger = Logger.getLogger(Server02Handler.class); private Socket socket; public Server02Handler(Socket socket) { this.socket = socket; } public void run() { try { logger.info("一个新的请求达到并创建 " + socket.getInetAddress() + ":" + socket.getPort()); InputStream socketIn = socket.getInputStream(); BufferedReader br = new BufferedReader(new InputStreamReader( socketIn)); OutputStream socketOut = socket.getOutputStream(); PrintWriter pw = new PrintWriter(socketOut, true); String msg = null; while ((msg = br.readLine()) != null){ logger.info("服务端受到的信息为:" + msg); pw.println(new Date()); // 给客户端响应日期字符串 if (msg.equals("bye")) break; } } catch (IOException e) { e.printStackTrace(); } finally { try { if (socket != null) socket.close(); } catch (IOException e) { e.printStackTrace(); } } } } 启动服务端,使用 EchoClient01 客户端测试成功! 虽然它没有了传统阻塞服务端的单处理弊端,但是却有一个致命的危险存 在:大量请求到达时,不断的创建线程,很容易耗尽系统资源造成服务器崩溃; 71 而且每个线程的创建与销毁都很浪费资源; 解决的办法就是使用线程池!(是不是很熟悉?我们经常接触的数据库连接 池就是这样实现的。) 服务端代码如下: /* * 自定义线程池 * 多线程处理客户端请求 */ public class EchoServer03 { private Logger logger = Logger.getLogger(EchoServer03.class); private int PORT = 3015; private ServerSocket serverSocket; private ThreadPool threadPool;// 线程池 private final int POOL_SIZE = 4; // 单个CPU时线程池中的工作线程个数 public EchoServer03() throws IOException { serverSocket = new ServerSocket(PORT); // 创建线程池 // Runtime的availableProcessors()方法返回当前系统的CPU格式 // 系统的CPU越多,线程池中工作线程的数目也越多 threadPool = new ThreadPool(Runtime.getRuntime().availableProcessors() *POOL_SIZE); logger.info("服务端启动.... 端口号:" + PORT); } public void service() { while (true){ Socket socket = null; try { socket = serverSocket.accept(); // 把与客户通信的任务交给线程池 threadPool.execute(new Server02Handler(socket)); } catch (IOException e) { e.printStackTrace(); } } } public static void main(String args[]) throws IOException { new EchoServer03().service(); 72 } } 服务端没有什么可解释的地方,关键是线程池的实现代码: /* * 自定义线程池 */ public class ThreadPool extends ThreadGroup { private Logger logger = Logger.getLogger(ThreadPool.class); private boolean isClosed = false;// 线程池是否关闭 // 将任务放在LinkedList中,LinkedList不支持同步, // 所以在添加任务和获取任务的方法声明中必须使用synchronized关键字 private LinkedList workQueue;// 表示工作队列 private static int threadPoolID;// 表示线程池ID private int threadID;// 表示工作线程ID // 构建一个线程组 public ThreadPool(int poolSize) {// poolSize是指线程池中工作线程的数目 super("ThreadPool-" + (threadPoolID++)); // 线程组名 setDaemon(true); workQueue = new LinkedList();// 创建工作队列 for (int i = 0; i < poolSize; i++) new WorkThread().start(); // 创建并启动工作线程(如果工作队列为空, 则所有工作线程处于阻塞状态) } // 向工作队列中添加一个任务,由工作线程去执行该任务 public synchronized void execute(Runnable task) { if (isClosed){// 线程池关闭则抛出IllegalStateException异常 throw new IllegalStateException(); } if (task != null){ workQueue.add(task); notify(); // 唤醒正在getTask()方法中等待任务的工作线程 } } // 从工作队列中取出一个任务 ----工作线程会调用此方法 protected synchronized Runnable getTask() throws InterruptedException { while (workQueue.size() == 0) { if (isClosed) 73 return null; wait(); // 如果工作队列没有任务,就等待任务 } return workQueue.removeFirst(); } // 关闭线程池 public synchronized void close() { if (!isClosed){ isClosed = true; workQueue.clear(); // 清空工作队列 interrupt();// 中断所有工作线程,该方法继承自ThreadGroup类 } } // 等待工作线程把所有任务执行完 public void join() { synchronized (this){ isClosed = true; notifyAll(); // 唤醒还在getTask()方法中等待任务的工作线程 } // activeCount()方法是ThreadGroup类的,获得线程组中当前所有活着的工作线 程数目 Thread[] threads = new Thread[activeCount()]; // enumerate方法继承自ThreadGroup类,获得线程组中当前所有活着的工作线程 int count = enumerate(threads); for (int i = 0; i < count; i++) {// 等待所有工作线程运行结束 try { threads[i].join(); // 等待工作线程运行结束 } catch (InterruptedException ex) { logger.error("工作线程出错...", ex); } } } // 内部类,工作线程 private class WorkThread extends Thread { public WorkThread() { // 加入当前的ThreadPool线程组中 // Thread(ThreadGroup group, String name) super(ThreadPool.this,"WorkThread-" + (threadID++)); } public void run() { 74 // isInterrupted()方法继承自ThreadGroup类,判断线程是否中断 while (!isInterrupted()) { Runnable task = null; try { task = getTask(); // 得到任务 } catch (InterruptedException ex) { logger.error("获得任务异常...", ex); } // 如果getTask()返回null或者线程执行getTask()时被中断,则结束此 线程 if (task == null) return; try { // 运行任务,捕获异常 task.run(); // 直接调用task的run方法 } catch (Throwable t) { logger.error("任务执行异常...", t); } }//#while end }//#run end }//# WorkThread class end } 启动服务端,使用 EchoClient01 客户端测试成功! 很多的服务端程序的实现思想就是基于该理念! 3.1.3 使用 JDK自带线程池的阻塞服务器 上面那个多线程阻塞服务器使用的是自定义的线程池,但是它的代码可能不 是很健壮,在更多的实际开发应用中,我们都是使用 JDK自带的线程池的。 java.util.concurrent 包提供了现成的线程池的实现。 75 � Executor 接口表示线程池,它的 execute(Runnable task)方法用来执行 Runnable 类型的任务。Executor 的子接口 � ExecutorService 中声明了管理线程池的一些方法,比如用于关闭线程池的 shutdown()方法等。 � Executors 类中包含一些静态方法,它们负责生成各种类型的线程池 ExecutorService 实例。 我们现在使用它来实现一个多线程阻塞服务器,服务端代码如下: /* * 使用JDK自带的线程池ExecutorService * 多线程处理客户端请求 */ public class EchoServer04 { private Logger logger = Logger.getLogger(EchoServer04.class); private int PORT = 3015; private ServerSocket serverSocket; private ExecutorService executorService;// 线程池 private final int POOL_SIZE = 4; // 单个CPU时线程池中的工作线程个数 public EchoServer04() throws IOException { serverSocket = new ServerSocket(PORT); // 创建线程池 // Runtime的availableProcessors()方法返回当前系统的CPU格式 76 // 系统的CPU越多,线程池中工作线程的数目也越多 executorService = Executors.newFixedThreadPool(Runtime.getRuntime() .availableProcessors() *POOL_SIZE); logger.info("服务端启动.... 端口号:" + PORT); } public void service() { while (true){ Socket socket = null; try { socket = serverSocket.accept(); executorService.execute(new Server02Handler(socket)); } catch (IOException e) { e.printStackTrace(); } } } public static void main(String args[]) throws IOException { new EchoServer04().service(); } } 怎么样,代码很简单吧! 启动服务端,使用 EchoClient01 客户端测试成功! 使用线程池时需要遵循以下原则: � 如果任务 A在执行过程中需要同步等待任务 B的执行结果,那么 任务 A不适合加入到线程池的工作队列中。 � (2)如果执行某个任务时可能会阻塞,并且是长时间的阻塞, 则应该设定超时时间,避免工作线程永久的阻塞下去而导致线程 泄漏。 � (3)根据任务的特点,对任务进行分类,然后把不同类型的任 务分别加入到不同线程池的工作队列中,这样可以根据任务的特 点,分别调整每个线程池。 � (4)调整线程池的大小。线程池的最佳大小主要取决于系统的 可用 CPU的数目以及工作队列中任务的特点。 � (5)避免任务过载。 现在基本上可以解决并发处理客户端的问题啦。但是它依然存在不足: 1. 并发量激增的情况下,一台服务器很难应付海量的多并发;这就需要提高服 务器并发处理能力和服务器个数;常见的解决方案是集群。 2. 服务器的好坏有 2个取决因素:一个是并发能力,一个是响应速度;在并发 能力有保障的情况下,每个工作线程,大部分的处理时间都浪费在 IO操作上, 因为 CPU的处理能力比 IO快太多,而 IO却存在太多的局限因素,造成线程 77 阻塞在 IO操作上,大大降低了响应速度;而且会造成资源的浪费,就好比 2 个同学,一个负责烧水,一个负责挑水,烧水的人一直守在炉子前等待水开, 一个却一直挑水;虽然烧水的人可以腾出时间帮助挑水的人,但是他却不能 这样做,因为他固定的只能负责一个任务; 对于高并发,我们很有必要提高 IO的操作效率,同时也应该改善我们处理 每个任务的原则,提高 CPU的利用率;Java NIO就是解决方案; 3.1.4 NIO 基础知识 Java NIO(new IO)是JDK1.4 引入的非阻塞 IO机制,具体它如何的好,我就 不说啦,百度一下看看就了解啦。 Java NIO引入了两个新的概念:通道 Channel 和选择器 Selector; 通道是服务端和客户端进行通信的接口-----原来是直接的 IO操作,客户端发信 息给服务端,服务端从 OutputStream 中读取,然后向 InputStream 中写数据; 现在则直接从 Channel 中读取或写入数据; 选择器是一个多路复用器:所有的通道向它注册事件,因此它管理了所有的通道 信息,并轮询各个通道的状态,一旦某个通道某事件发生(比如有数据读或可以 写入数据),则通知该管道对应事件的处理器去处理它; 如下图所示: Selector C lientC lientC lientC lient C lientC lientC lientC lient C lientC lientC lientC lient ServerServerServerServer SocketChannel SocketChannel SocketChannel H andler register register register 客户端连接上服务端后,首先每个客户端都要与服务端建立一个通道 (SocketChannel);然后每个通道向选择器(Selector)注册事件,注册器就会轮 询查看每个通道是否有事件发生,一旦某通道有事件发生,比如 Client1 的 SocketChannel 有数据了,就触发了读就绪事件,可以进行读取的操作啦。 选择器(Selector)是个典型的反应器模式(Reactor Pattern),它的实现原 理可以参考该文章: http://www.jdon.com/concurrent/reactor.htm 先介绍下 NIO的几个常用类: a. Buffer 缓冲区 Java NIO的缓冲区 Buffer 基本和 Mina 的IoBuffer 一样,但是准确的说, Mina的IoBuffer 就是对 Java NIO buffer 的二次封装,使得它的功能更加强大。 数据输入和输出往往是比较耗时的操作。缓冲区从两个方面提高 I/O操作的 效率: � 减少实际的物理读写次数。 78 这一点比较容易理解,我们经常使用的缓冲数组就是这个道理: public void ioRead(String filePath) throws IOException { FileInputStream in = new FileInputStream(filePath); byte[] b = new byte[1024]; int i = 0; while ((i = in.read(b)) != -1) { logger.info(new String(b, 0, i)); } } � 缓冲区在创建时被分配内存,这块内存区域一直被重用,这可以减少动 态分配和回收内存区域的次数。 这一点是 Buffer 的优势,也是 buffer 性能较高的一个原因。 java.nio.Buffer 类是一个抽象类,不能被实例化。共有 8个具体的缓冲区类, 其中最基本的缓冲区是 ByteBuffer,它存放的数据单元是字节。ByteBuffer 类 并没有提供公开的构造方法,但是提供了两个获得 ByteBuffer 实例的静态工厂 方法: � allocate(int capacity):返回一个 ByteBuffer 对象,参数 capacity 指定缓冲区的容量。 � directAllocate(int capacity): 返回一个 ByteBuffer 对象,参数 capacity 指定缓冲区的容量。该方法返回的缓冲区称为直接缓冲区,它 与当前操作系统能够更好的耦合,因此能进一步提高 I/O操作的速度。 但是分配直接缓冲区的系统开销很大,因此只有在缓冲区较大并且长期 存在,或者需要经常重用时,才使用这种缓冲区。 同样常用的是 CharBuffer,使用基本和上面一样,所有不多说啦!其他实现 类我也基本不怎么用。 常用的方法也不多做解释了,翻看 API文档吧。 b. Charset 字符编码 Charset 就是根据指定的编码个数进行编解码的一个接口,不多作解释。 � Charset 类的静态 forName(String encode)方法返回一个 Charset 对象,它 代表参数 encode 指定的编码类型。 � ByteBuffer encode(String str):对参数 str 指定的字符串进行编码,把 得到的字节序列存放在一个 ByteBuffer 对象中,并将其返回。 � ByteBuffer encode(CharBuffer cb):对参数 cb指定的字符缓冲区中的字 符进行编码,把得到的字节序列存放在一个 ByteBuffer 对象中,并将其返 79 回。 � CharBuffer decode(ByteBuffer bb):把参数 bb指定的 ByteBuffer 中的字 节序列进行解码,把得到的字符序列存放在一个 CharBuffer 对象中,并将 其返回。 c. Channel 通道 通道在 Java NIO的开始就做个解释,它是服务端和客户端进行通信的接口; 通道 Channel 用来连接缓冲区与数据源或数据汇(即数据目的地)。如下图所示, 数据源的数据经过通道到达缓冲区,缓冲区的数据经过通道到达数据汇 但是在实际的开发中,我们基本都是使用 Channel 的实现类: ServerSocketChannel 和SocketChannel,类图如下: ServerSocketChannel 从SelectableChannel 中继承了 configureBlocking()和 register()方法。ServerSocketChannel 是ServerSocket 的替代类,也具有负 责接收客户连接的 accept()方法; ServerSocketChannel 并没有 public 类型的构造方法,必须通过它的静态方 法open()来创建 ServerSocketChannel 对象; 每个ServerSocketChannel 对象都与一个 ServerSocket 对象关联。 ServerSocketChannel 的socket()方法返回与它关联的 ServerSocket 对象; 80 private Selector selector; private int PORT = 3015; private ServerSocketChannel serverSocketChannel = null; // 创建一个Selector对象 selector = Selector.open(); // 创建一个ServerSocketChannel对象 serverSocketChannel = ServerSocketChannel.open(); // 使得在同一个主机上关闭了服务器程序,紧接着再启动该服务器程序时, // 可以顺利绑定到相同的端口 serverSocketChannel.socket().setReuseAddress(true); // 使ServerSocketChannel工作于非阻塞模式 serverSocketChannel.configureBlocking(false); serverSocketChannel.socket().bind(new InetSocketAddress(PORT)); SocketChannel可看作是Socket的替代类,但它比Socket具有更多的功能; SocketChannel不仅从SelectableChannel父类中继承了configureBlocking()和 register()方法,而且实现了ByteChannel接口,因此具有用于读写数据的 read(ByteBuffer dst)和write(ByteBuffer src)方法; SocketChannel没有public类型的构造方法,必须通过它的静态方法open()来创 建SocketChannel对象。 private String HOST = "127.0.0.1"; private int PORT = 3015; private SocketChannel socketChannel = null; // 创建一个SocketChannel对象 socketChannel = SocketChannel.open(); // 使SocketChannel工作于非阻塞模式 socketChannel.configureBlocking(false); // InetAddress ia = InetAddress.getLocalHost(); // InetSocketAddress isa = new InetSocketAddress(ia, 3015); InetSocketAddress isa = new InetSocketAddress(HOST,PORT); socketChannel.connect(isa); logger.info("与服务器建立连接成功...."); // 创建一个Selector对象 selector = Selector.open(); 其他不多做解释了,请参看NIO的API; d. Selector 选择器 选择器在 Java NIO的开始就做个解释,它是一个多路复用器,所有的通道 向它注册事件;只要 ServerSocketChannel 以及SocketChannel 向Selector 注 册了特定的事件,Selector 就会监控这些事件是否发生。 ServerSocketChannel以及SelectableChannel 的register()方法负责注册 事件,该方法返回一个 SelectionKey 对象,该对象是用于跟踪这些被注册事件 的句柄。 在SelectionKey 对象的有效期间,Selector 会一直监控与 SelectionKey 对象相关的事件,如果事件发生,就会把 SelectionKey 对象加入到 selected-keys 集合中。 81 在以下情况,SelectionKey 对象会失效,这意味着 Selector 再也不会监控 与它相关的事件 (1)程序调用 SelectionKey 的cancel()方法; (2)关闭与 SelectionKey 关联的 Channel; (3)与 SelectionKey 关联的 Selector 被关闭; 在SelectionKey 中定义了四种事件,分别用 4个int 类型的常量来表示: � SelectionKey.OP_ACCEPT:接收连接就绪事件,表示服务器监听到了客户连 接,服务器可以接收这个连接了。常量值为 16 � SelectionKey.OP_CONNECT:连接就绪事件,表示客户与服务器的连接已经 建立成功。常量值为 8。 � SelectionKey.OP_READ:读就绪事件,表示通道中已经有了可读数据,可以 执行读操作了。常量值为 1。 � SelectionKey.OP_WRITE:写就绪事件,表示已经可以向通道写数据了。常 量值为 4。 最常用的就是后三种事件:连接就绪事件和读写就绪事件; 下面就是它们的具体应用啦,请结合实例理解 NIO的用法; 3.1.5 基于 NIO的阻塞服务器 ServerSocketChannel 与SocketChannel 采用默认的阻塞模式,因此我们用 NIO 提供的 API做一个阻塞服务器,和以前的阻塞服务器做个对比,以加深它们的区 别; 服务端代码: /* * 使用NIO的ServerSocketChannel创建阻塞的Socket服务端 * 使用JDK自带的线程池ExecutorService,多线程处理客户端请求 */ public class EchoServer05 { private Logger logger = Logger.getLogger(EchoServer05.class); private int PORT = 3015; private ServerSocketChannel serverSocketChannel = null; private ExecutorService executorService;// 线程池 private static final int POOL_MULTIPLE = 4; // 单个CPU时线程池中的工作线程 个数 public EchoServer05() throws IOException { // 创建线程池 // Runtime的availableProcessors()方法返回当前系统的CPU格式 // 系统的CPU越多,线程池中工作线程的数目也越多 executorService = Executors.newFixedThreadPool(Runtime.getRuntime() .availableProcessors() *POOL_MULTIPLE); 82 // ServerSocketChannel并没有public类型的构造方法, // 必须通过它的静态方法open()来创建ServerSocketChannel对象 // 默认是阻塞模式的,通过configureBlocking(false)设置为非阻塞模式 serverSocketChannel = ServerSocketChannel.open(); // 使得在同一个主机上关闭了服务器程序,紧接着再启动该服务器程序时, // 可以顺利绑定到相同的端口 serverSocketChannel.socket().setReuseAddress(true); // 每个ServerSocketChannel对象都与一个ServerSocket对象关联 // ServerSocketChannel的socket()方法返回与它关联的ServerSocket对象 serverSocketChannel.socket().bind(new InetSocketAddress(PORT)); logger.info("服务端启动.... 端口号:" + PORT); } public void service() { while (true){// 阻塞 SocketChannel socketChannel = null; try { socketChannel = serverSocketChannel.accept(); // 等待连接 // 多线程处理 executorService.execute(new Server05Handler(socketChannel)); } catch (IOException e) { e.printStackTrace(); } } } public static void main(String args[]) throws IOException { new EchoServer05().service(); } } 服务器端的业务处理代码: public class Server05Handler implements Runnable { private Logger logger = Logger.getLogger(Server05Handler.class); private SocketChannel socketChannel; public Server05Handler(SocketChannel socketChannel) { this.socketChannel = socketChannel; } public void run() { try { Socket socket = socketChannel.socket(); logger.info("一个新的请求达到并创建 " + socket.getInetAddress() + 83 ":" + socket.getPort()); InputStream socketIn = socket.getInputStream(); BufferedReader br = new BufferedReader(new InputStreamReader( socketIn)); OutputStream socketOut = socket.getOutputStream(); PrintWriter pw = new PrintWriter(socketOut, true); String msg = null; while ((msg = br.readLine()) != null){ logger.info("服务端受到的信息为:" + msg); pw.println(new Date()); // 给客户端响应日期字符串 if (msg.equals("bye")) break; } } catch (IOException e) { e.printStackTrace(); } finally { try { if (socketChannel != null) socketChannel.close(); } catch (IOException e) { e.printStackTrace(); } } } } 客户端代码: /* * 使用NIO的SocketChannel创建阻塞的客户端 */ public class EchoClient05 { private Logger logger = Logger.getLogger(EchoClient05.class); private String HOST = "localhost"; private int PORT = 3015; private SocketChannel socketChannel; public EchoClient05() throws IOException { socketChannel = SocketChannel.open(); // InetAddress ia = InetAddress.getLocalHost(); InetSocketAddress isa = new InetSocketAddress(HOST,PORT); 84 // socketChannel.connect()与远程主机建立连接 // 默认采用阻塞模式 socketChannel.connect(isa); } public void talk() throws IOException { try { // 通过socketChannel.socket()方法获得与SocketChannel关联的Socket对 象, // 然后从这个Socket中获得输出流与输入流,再一行行的发送和接受数据。 // 获得服务端响应信息的输入流 InputStream socketIn = socketChannel.socket().getInputStream(); BufferedReader br = new BufferedReader(new InputStreamReader( socketIn)); // 给服务端发送信息的输出流 OutputStream socketOut = socketChannel.socket().getOutputStream(); PrintWriter pw = new PrintWriter(socketOut, true); BufferedReader localReader = new BufferedReader( new InputStreamReader(System.in)); String msg = null; while ((msg = localReader.readLine()) != null){ pw.println(msg); logger.info(br.readLine()); if (msg.equals("bye")) break; } } catch (IOException e) { e.printStackTrace(); } finally { try { socketChannel.close(); } catch (IOException e) { e.printStackTrace(); } } } public static void main(String[] args) throws IOException { new EchoClient05().talk(); } } 启动服务端和客户端,测试,无疑是成功的! 是不是感觉很别扭呀,明明两个 ServerSocket 和Socket 解决的问题,却创 建了一大堆的对象。呵呵。。。NIO编程刚开始的却是感觉不爽呀,不过慢慢就 85 适应啦。但如果使用 Mina 框架,你会发现,根本不需要关心什么 NIO,它已经 给你封装好啦。 3.1.6 基于 NIO的非阻塞服务器 待完善…… 3.1.7 多线程的基于 NIO的非阻塞服务器 NIO有效解决了多线程服务器存在的线程开销问题,但在使用上略显得复杂 一些。许多基于 NIO 的多线程服务器程序往往直接基于选择器(Selector)的 Reactor 模式实现。这种简单的事件机制对于较复杂的服务器应用,显然缺乏扩 展性和可维护性, 而且缺乏直观清晰的结构层次。因此, 3.2 异步操作分析 待完善…… 3.3 Mina内部实现分析 待完善…… 3.4 Mina的线程模型配置 先看官方文档的描述:(这里我就纯粹翻译一下吧,注意 Mina 的线程模型配 置是针对 Mina2.0 以前的版本而言的,使用 2.0以后版本的可以跳过) 1、禁止缺省的 ThreadModel 设置 MINA2.0 及以后版本已经没有 ThreadModel 了,如果使用这些版本的话,可 以跳过本节。 ThreadModel 设置是在 MINA1.0 以后引入的,但是使用 ThreadModel 增加了 配置的复杂性,推荐禁止掉缺省的 TheadModel 配置。 IoAcceptor acceptor = new SocketAcceptor(); SocketAcceptorConfig cfg = new SocketAcceptorConfig(); cfg.setThreadModel(ThreadModel.MANUAL);// 禁止掉ThreadModel的缺省配置 2、配置 I/O工作线程的数量 这节只是 NIO实现相关的,NIO数据包以及虚拟机管道等的实现没有这个配 置。 在MINA的NIO实现中,有三种 I/O工作线程: >>Acceptor 线程:接受进入连接,并且转给 I/O处理器线程来进行读写操作。 每一个 SocketAcceptor 产生一个 Acceptor 线程,线程的数目不能配置。 >>Connector 线程:尝试连接远程对等机,并且将成功的连接转给 I/O处理器线 程来进行读写操作。每一个 SocketConnector 产生一个 Connector 线程,这个的 数目也不可以配置。 >>I/O 处理器线程:执行实际上的读写操作直到连接关闭。每一个 SocketAcceptor 或SocketConnector 都产生它们自己的 I/O处理线程。这个数 目可以配置,缺省是 1。 因此,对于每个 IoService,可以配置的就是 I/O处理线程的数目。下面的 代码产生一个有四个 I/O处理线程的 SocketAcceptor: IoAcceptor acceptor = new SocketAcceptor(4,Executors.newCachedThreadPool()); 没有单凭经验来决定 I/O处理线程数目的方法,一般设置为当前服务器 CPU 86 个数+1: IoAcceptor acceptor = new SocketAcceptor(Runtime.getRuntime() .availableProcessors() + 1, Executors.newCachedThreadPool()); 3、增加一个 ExecutorFilter 到IoFilterChain 中 ExecutorFilter 是一个IoFilter,用于将进入的 I/O事件转到一个 java.util.concurrent.Executor 实现。事件会从这个 Executor 转到下一个 IoFilter,通常是一个线程池。可以在 IoFilterChain 的任何地方增加任意数 目的ExecutorFilter,实现任何类型的线程模型,从简单的线程池到复杂的 SEDA。 到现在为止我们还没有增加 ExecutorFilter ,如果没有增加 ExecutorFilter,事件会通过方法调用转到一个 IoHandler,这意味着在 IoHandler 实现中的业务逻辑会在 I/O处理线程里运行。我们叫这种线程模型为 "单线程模型"。单线程模型可以用来就会低反应网络应用程序,受CPU限制的业 务逻辑(如,游戏服务器). 典型的网络应用需要一个 ExecutorFilter 插入到 IoFilterChain 中,因为 业务逻辑和 I/O处理线程有不同的资源使用模式。如果你用 IoHandler 的实现来 执行数据库操作,而没有增加一个 ExecutorFilter 的话,那么,你整个服务器 会在执行数据库操作的时候锁定,特别是数据库性能低的时候。下面的例子配置 一个 IoService 在一个新的 IoSession 建立时增加一个 ExecutorFilter。 cfg.getFilterChain().addLast("threadPool",new ExecutorFilter(Executors.newCachedThreadPool())); 如果 server 关闭,则 execute 也需要关闭。 使用一个 ExecutorFilter 通常不意味着要用一个线程池,对于 Executor 的实现没有任何限制。 4、应该把 ExecutorFilter 放在 IoFilterChain 的什么地方 这个要根据于具体应用的情况来定。如果一个应用有一个 ProtocolCodecFilter 实现和一个常用的有数据库操作的 IoHandler 实现的话, 那么就建议在 ProtocolCodecFilter 实现的后面增加一个 ExecutorFilter,这 是因为大部分的协议解码实现的性能特性是受 CPU限制的,和I/O处理线程是一 样的。 // Add CPU-bound job first cfg.getFilterChain().addLast( "codec", new ProtocolCodecFilter(new VamsCodecFactory(Charset.forName("utf-8"), true))); // and then a thread pool cfg.getFilterChain().addLast("threadPool",new ExecutorFilter(Executors.newCachedThreadPool())); 5、选择 IoService的线程池类型时要小心 Executors.newCachedThreadPool()经常是 IoService 首选的。因为如果使 用其它类型的话,可能会对 IoService 产生不可预知的性能方面的影响。一旦 池中的所有线程都在使用中,IoService 会在向池尝试请求一个线程时开始锁 定,然后会出现一个奇怪的性能下降,这有时是很难跟踪的。 87 6、不推荐 IoServices 和ExecutorFilters 共享一个线程池 你可以想让 IoServices 和ExecutorFilters 共享一个线程池,而不是一家 一个。这个是不禁止的,但是会出现很多问题,在这种情况下,除非你为 IoServices 建立一个缓冲线程池。 四. Mina 实例 实例 1:继承 CumulativeProtocolDecoder 类实现根据文本换行符编解码; 实例 2:根据协议编写 Mina应用程序; 请求格式 Syntax No. of Bits Identifier _descript (){ Descriptor tag 16 0x0008 descriptor length 32 从下一字节开始至末尾的数据长度 ID 32 节目的 ID Program_code_addr 32 Asset_code Program_code_len 8 EPIODES_addr 32 是第几集节目字符地址 EPIODES_length 8 长度 For(j=0;j<;j++){ byte_data 8 数据 } } 响应格式 Syntax No. of Bits _descript (){ Tag 16 数据结构标志(0x8008) Data_length 32 从下一字节开始至末尾的数据长度 Count 16 关联节目节目个数 for(i=0;i< count;i++){ Programme_Type 32 节目类型 0x01 标示电影 0x02 标示电视剧 0x03 标示新闻 0x04 标示时移 TitleID 32 节目的 ID Program_code_addr 32 Asset_code Program_code_len 8 EPIODES_addr 32 是第几集节目字符地址 EPIODES_length 8 长度 TotalTime 16 节目总时长 88 Offset_time 16 节目偏移时间(非新闻类该值为 0x0) name_addr 32 节目名称的地址 name_len 8 节目名称的字符串长度 StringID_addr 32 节目媒资 ID 地址 StringID_len 8 节目媒资 ID 长度 } For(j=0;j<;j++){ Byte_data 8 数据 } } 具体实现代码,参考下载的源码。 五. 后记 Socket 编程俗称 Java网络编程,是 Java Web开发的精髓!做 J2EE的人可 能很少关心多线程,NIO等等这些东西,但是不可否认它却实时与我们打交道, 比如常用的 web容器 Tomcat。
还剩87页未读

继续阅读

下载pdf到电脑,查找使用更方便

pdf的实际排版效果,会与网站的显示效果略有不同!!

需要 8 金币 [ 分享pdf获得金币 ] 4 人已下载

下载pdf

pdf贡献者

daniu2003

贡献于2014-05-07

下载需要 8 金币 [金币充值 ]
亲,您也可以通过 分享原创pdf 来获得金币奖励!
下载pdf