MINA使用心得及相关要点

sdshw 贡献于2013-01-25

作者 Lawnstein.Chan  创建于2013-01-08 04:02:00   修改者Administrator  修改于2013-01-08 04:02:00字数69179

文档摘要:笔者之前的工作主要是做 java 的 web 端开发,后因工作原因参与了一个国家级的大项目,主要负责其中底层通讯的前置机模块。几经波折,将该系统完成后,结果在第一轮的测试中就惨败退回。其根本原因就在于原设计文档的要求单“通信机”与“终端”(注一)之间的并发量要达到 2W 以上的连接通信,而实际运行并发量只能达到 2600 个相差了近十倍左右。经过代码调优、扩展 JVM 内存等等手段,但因基础数据相差过大,所取得的优化效果十分有限。后考虑在根本着手,只有更改整个系统的通信接口,才有可能达到设计文档上的要求。某天在某个技术 QQ 群里一次讨论中,有网友向我推荐了一个框架,这就是本文要介绍的主角 -MINA 。
关键词:

MINA使用心得及相关要点 目录 前言 What is MINA 使用案例: Apache 直属 MINA 的子项目: 同类框架: MINA 快速入门 预备知识: 资源下载: Hello world 的关键关键代码: MINA 深度了解 Mina 的应用层 MINA 的内部流程 选择 MINA 的理由 传统 socket 编程 改进的 Socket 编程 使用 MINA 框架的编程 扩展知识 IoBuffer 接口 sendUrgentData() 方法的使用 Concurrent 包下的一些类 结尾语     前言 笔者之前的工作主要是做 java 的 web 端开发,后因工作原因参与了一个国家级的大项目,主要负责其中底层通讯的前置机模块。几经波折,将该系统完成后,结果在第一轮的测试中就惨败退回。其根本原因就在于原设计文档的要求单“通信机”与“终端”(注一)之间的并发量要达到 2W 以上的连接通信,而实际运行并发量只能达到 2600 个相差了近十倍左右。经过代码调优、扩展 JVM 内存等等手段,但因基础数据相差过大,所取得的优化效果十分有限。后考虑在根本着手,只有更改整个系统的通信接口,才有可能达到设计文档上的要求。某天在某个技术 QQ 群里一次讨论中,有网友向我推荐了一个框架,这就是本文要介绍的主角 -MINA 。   注一:前置机分成了三个部分,其设计的结构图如下所示: What is MINA Apache MINA(Multipurpose Infrastructure for Network Applications) 是 Apache 组织一个较新的项目,它为开发高性能和高可用性的网络应用程序提供了非常便利的框架。当前发行的 MINA 版本支持基于 Java NIO 技术的 TCP/UDP 应用程序开发、串口通讯程序(只在最新的预览版中提供), MINA 所支持的功能也在进一步的扩展中 使用案例: 目前正在使用 MINA 的软件包括有: Apache Directory Project 、 AMQP ( Advanced Message Queuing Protocol )、 RED5 Server ( Macromedia Flash Media RTMP )、 ObjectRADIUS 、 Openfire 等等。 Apache 直属 MINA 的子项目: FTPServer , AsyncWeb , SSHD 其实在有人推荐了 MINA 之后,本人就上网 google 了一把,搜索一番下来之后才发现自己的眼界过于狭隘了,原来有相同功能的开源框架还真不少,看来以后得继续多泡论坛和 QQ 了(有正当理由上班泡坛子,聊 QQ 了,偷笑一个 ^_^ 。。。。。。) 同类框架: Netty2: 具有很好的构架,并且该框架非常有名,使用该框架的项目不少,这意味着开发团队持续更新的动力非常大。同时 Netty2 的文档非常齐全,并且支持 JMX 。 Netty2 的缺点就是实现代码的质量不是非常好,最大的缺点就是只支持普通的 TCP 。 Cindy :起源于 Netty2 之后,借鉴了 Netty2 中 MessageRecognizer 类的设计,在当前的版本中已经全面支持普通 TCP/Secure TCP/UDP 单播 /UDP 多播 /Pipe ,可以使用同一个模型进行同步 / 异步 IO 处理。 Cindy 目前缺点是文档相对较少以及应用的项目比较少。 Grizzl :的设计与一般的 nio 框架相比是比较不同的,主要不同点在于读和写都是采用 blocking 方式,并且使用临时 selector ;线程模型高度可配置。性能据说比 MINA 还高,但是学习曲线很高。 QuickServer: http://www.quickserver.org/ Xscocket :是一个轻量级的解决方案,核心思想是屏蔽,简化 nio 方式的的开发,并不需要过多的学习。 对于这些框架的基本使用和基础架构,本人都经过了一番研究,发现这些框架要么重者过重,要么轻者过轻。鉴于项目的规模及时间的紧迫,再三比较之下,本人还是选择了学习曲线低,性能优异的 MINA. 至于这些优点是如何体现出来的,本文以下内容将继续为您解读。 MINA 快速入门 闲话少说,借用大师级的写书经验,咱们先来一个 hello world 。当然在这前我们还是得先做一些准备的。 预备知识: JAVA NIO 多线程 Socket 以上知识对本文的阅读理解有一定帮助,但并非一定必需。   资源下载: MINA2.0 :暂时分为 1.x 和 2.x 两个主版本,本文只涉及 2.X 的版本,至于为什么只讲 2.X 而不讲 1.X ,比较冠冕堂皇的回答是 ---- 因为有一位伟人曾经说过:要以发展的眼光看世界。。。。。。而真实原因嘛。。。。。。咳咳。。。。大家都知道的。。我就不便多言了。下载地址:http://mina.apache.org/downloads.html。项目中使用的是 2.03, 截止本文发稿为止,最新版本为: 2.04 log4j :因为其中缺少 log4j 的包,所以做试验的朋友还需要去下一个 log4j 的包。 开发工具:eclipse Jdk: 1.6x 监视测试工具:Oracle JRockit Mission Control 4.0.1 强烈推荐,简称 JRMC ,开发过程中,用它解决了很多性能瓶颈的问题,具体使用方法,因为篇幅所限在此不做详述,请自行查询相关文档。 Hello world 的关键关键代码: Server 端的 Main 函数:       IoAcceptor acceptor = newNioSocketAcceptor(); // 建立监控器 //acceptor.getSessionConfig().setReadBufferSize(2048); //acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 10); acceptor.getFilterChain().addLast("codec ",       New ProtocolCodecFilter(new TextLineCodecFactory(Charset.forName("UTF-8"), LineDelimiter.WINDOWS.getValue(), LineDelimiter.WINDOWS.getValue())));//加载解/编码工厂 acceptor.setHandler(newStandBaowenHandler ()); // 设置处理类,其主要内容见下。 acceptor.bind(newInetSocketAddress(9998));// 绑定的监听端口,可多次绑定,也可同时绑定多个。   StandBaowenHandler 的关键代码 publicvoidmessageReceived(IoSession session, Object message) throwsException {           session.getService().getManagedSessions();               String baowenSrc = message.toString();// 原始报文               System.out.println(baowenSrc );        }   鉴于篇幅关系,类没有写全,更具体的内容,请大家参考 mina 压缩包里自带的 demo 。但实际上服务端需要手写的部分确实只有以上二块内容,服务端就算是写好了,由次可以看出 mina 的入门之快。现在我们来测试。打开 cmd( 这个对于同仁们来说,就不用详述了吧。。。 - - !! ) ,输入 telnet 127.0.0.1 9998 (这里的 9998 要与上面代码绑定的端口一致)随便输入一些字符,敲下回车后就会在控制台显示您所输入的信息了。 做得这一步的童鞋可以说对 MINA 的使用可以说已经初步入门了,是不是觉得太简单了?这也太假了吧,这就算入门了?呵呵。。。个人认为这就是 MINA 的强悍之一,简单的几行代码就搞定了一个服务端,要入门简直是太简单了。但任何事物都有其二面性,真要把一个东西学好,用好,还要不出错,我们需要了解的东西就太多太多了。下面我们就从 MINA 框架的底层说起,因为在实际应用当中,要解决一些碰到的难点及问题,对框架的整个运作体系必须要有个全面深入的了解。只有这样才能在碰到问题时,有目的,有针对性的去找到症结所在。 MINA 深度了解 Mina 的应用层                                          图 1   一个设计成熟的开源框架,总是会仅可能的减少侵入性,并在整个项目中找到合适的位置,而不应对整个项目的构架设计产生过多的影响,图 1 就是 MINA 的应用层示意图。从图中和上节的 DEMO 中我们可以看到, MINA 很好的把业务代码和底层的通信隔离了开来,我们要做的仅仅是建立好监听,然后写上我们需要实现的业务逻辑就 OK 了。   MINA 的内部流程                                        图 2 (1) IoService :这个接口在一个线程上负责套接字的建立,拥有自己的 Selector ,监听是否有连接被建立。 (2) IoProcessor :这个接口在另一个线程上负责检查是否有数据在通道上读写,也就是说它也拥有自己的 Selector ,这是与我们使用 JAVA NIO 编码时的一个不同之处, 通常在 JAVA NIO 编码中,我们都是使用一个 Selector ,也就是不区分 IoService 与 IoProcessor 两个功能接口。另外, IoProcessor 也是 MINA 框架的核心组件之一 . 在 MINA 框架启动时,会用一个线程池来专门生成线程,来负责调用注册在 IoService 上的过滤器,并在过滤器链之后调用 IoHandler 。在默认情况 IoProcessor 会用 N+1 个线程来轮流询问监视的端口是否有数据传送,其中 n 为 cpu 的内核个数。按一般的多线程设计概念来说, IoProcessor 的线程数是越多越好,但实际上并非如此,因为大家都知道, IO 的操作是非常占用资源的,所以项目中的 IoProcessor 的线程数应该根据实际需要来定,而这个数字可以在生成 IoAcceptor 对象时进行设定。 Eg IoAcceptor acceptor = newNioSocketAcceptor( N );   (3.) IoFilter :这个接口定义一组拦截器,这些拦截器可以包括日志输出、黑名单过滤 ( 参见之前代码示例的注释部份) ,甚至是在过滤器链中利用 AOP 写上权限控制(笔者负责的部分没有涉及到这权限部分,但根据笔者对框架的理解要实现这一点应该问题不大,有需要的可以自行试验)。数据的编码( write 方向)与解码( read 方向)等功能,其中数据的 encode 与 decode 是最为重要的、也是您在使用 Mina 时最主要关注的地方(笔者曾经为了 decode 的解码编写,重写了不下十几种实现方式才找到准确无误适合项目的解码方法)。 (4.) IoHandler :这个接口负责编写业务逻辑,也就是接收、发送数据的地方。只本文的代码实例中,可以看到真正的业务代码只有一句 :System.out.println(str); 真实项目中当然不可能如此简单,但如果大家把业务处理写好,并写好业务接口,真正要用时,呆需要在此处替换即可,再次见证了 MINA 分层的彻底。 说了这么多,以上内容也只能让大家对 MINA 有个基础的了解,对于 MINA 框架优势的认识可能还不是很多,那么下面的内容,就此展开对比讨论,以便大家对 MINA 的适用场景及优点有个更全面的了解。 选择 MINA 的理由 传统socket 编程 在传统 I/O 中,最简单实现高并发服务器的编程方式就是对每一个客户开启一个线程。但是这种方式有如下几个弊端: 客户端上限很大的情况下不能及时响应 服务器硬件资源受限,性能也会急剧下降 受制于操作系统的限制 但这种设计方式优点还是有的 : 编码简单,实现容易 一定数量的连接性能比较好。 笔者的项目开发中,最开始就是采用的这种方式,写起来方便,控制起来也方便,但遗憾的是 JVM 最多只能开到 2K 多的线程,就会报 - can not create new thread 的错误。   (实现结构图见下:) 图 3 (一对一的结构图。) 改进的Socket 编程 为了解决每一个线程一个连接的模型,笔者最开始想到用多个线程处理 N 个用户,这样既可以保证处理多个用户的同时,线程开销降到系统的临界点。 这样的方式与前一个模型优势在于同样的多线程,但线程数固定,充分运用系统的优势性能,又不存在多余的开销。但是缺点也是显而易见的: 轮询的消耗不可避免。 一但产生 io 阻塞,其阻塞的时间纯属浪费。 客户数量固定的时候没有前一模型响应快 编码更加复杂。 图 4( 一对多 ) 使用MINA 框架的编程 为了解决上述的矛盾,最终的解决方案只能是异步的 NIO, 而随着笔者对 JAVA  NIO 的研究发现,要实现异步的 NIO ,并应用到实际项目中,必须对 NIO 有着比较深刻的了解和把握,笔者曾尝试着利用 JAVA 原生 NIO 接口写了一个 DEMO (具体的使用方法,感兴趣的童鞋可以 GOOGLE 一把,你会发现用原生 NIO 写程序与使用 MINA 写程序对比起来是多么的痛苦。。。。 -_-!! ),但由于笔者在这方面的底子过薄,试验结果不如人意,但要对 NIO 进行更为深入的学习,时间上面也不允许。直到 MINA 框架的映入眼帘,以上难题不再是问题。。。。以下是利用 MINA 的实现方式的一个简图。                                        图 5   其中 IoService 接口会专门起一个线程来轮询是否有新的连接产生,一旦有连接产生则通知 IoProcessor, 而 IoProcessor 则起 n+1 个线程来检查连接是否有数据在上面读写 (注二) 。一旦有连接产生,并有数据读写,则通知 decode 或 ENCODE ,进行报文的解码或编码,将处理后的报文再交给业务类进行业务处理。其中 IoProcessor 是处理请求的分配,包括选择 Selector ,超时验证,状态记录等。总之这个类和 IoService 一起配合工作,封装了 NIO 底层的实现以及 MINA 框架内部的功能的支持 . 由于过于复杂,篇幅所限所以不作详细讲解 . 结合实例,并根据以上的图文讲解,我们可以很轻易的总结出利用 MINA 编程的几个大致步骤: 创建一个实现了 IoService 接口的类 设置一个实现了 IoFilter 接口的过滤器(如果有需要的情况下) 设置一个 IoHandler 接口实现的处理类,用于处理事件(必须) 对 IoService 绑定一个端口开始工作 关于 MINA 的大致运行流程及使用步骤,我们就暂时分析到这,具体更细节的关于一些核心类的使用方法及自定义编码器的方法,大家可以直接参考 mina 中所带的几个案例,写得非常详细,足够解决大家在项目中碰到的大部分问题,接下来要与大家交流的是使用 MINA 时非常有可能遇到的一些扩展知识。   注二 :这一点请特别注意,因 IoProcessor 也是相当于轮询机制,这导致在报文过长时,或其它原因导致报文不能一次传输完毕的情况下,必须保存同一连接 ( 在 MINA 中是以 IoSession 类生成的对象 ) 下的上一次状态,这样才能截取到一个完成的报文,而这也是 Decode( 编码器 ) 需要做的核心工作 , 新手往往就在这上面要跌跟斗。   扩展知识 这部分的内容要说起来跟 MINA 的使用关联不大,但实际情况是用上了 MINA 框架的项目基本上多多少少都会涉及到这一块,那就是多线程的编程。多线程的编程历来是 JAVA 编程中的重难点,很多新手碰到此类编程问题,往往都找不出原因所在,甚至一些有多年编程经验的程序员也会在这上面偶而犯下错,在这里笔者也没有能力通过很短的篇来解说多线程,那么就向大家介绍几个类及一些小常识吧,希望能给大家带来帮助。 MINA框架使用总结 参考:http://xinsync.xju.edu.cn/index.php/archives/category/prglang/java/mina 简单介绍:MINA框架是对java的NIO包的一个封装,简化了NIO程序开发的难度,封装了很多底层的细节,然开发者把精力集中到业务逻辑上来,最近做了一个相关的项目,为了备忘对MINA做一个总结。 下面这个start方法用来初始化MINA: Java代码   1. private void start(int port, WebContext ctx)    2.       throws IOException, InstantiationException   3.         , IllegalAccessException, ClassNotFoundException {   4.     //初始化Acceptor   5.     NioSocketAcceptor acceptor = new NioSocketAcceptor(5);   6.            7.        java.util.concurrent.Executor threadPool = Executors.newFixedThreadPool(1500);//建立线程池   8.        //加入过滤器(Filter)到Acceptor   9.        acceptor.getFilterChain().addLast("exector", new ExecutorFilter(threadPool));   10. acceptor.getFilterChain().addLast("codec",    11.        new ProtocolCodecFilter(new WebDecoder(),new WebEncoder()));   12.        LoggingFilter filter = new LoggingFilter();   13.     filter.setExceptionCaughtLogLevel(LogLevel.DEBUG);   14.     filter.setMessageReceivedLogLevel(LogLevel.DEBUG);   15.     filter.setMessageSentLogLevel(LogLevel.DEBUG);   16.     filter.setSessionClosedLogLevel(LogLevel.DEBUG);   17.     filter.setSessionCreatedLogLevel(LogLevel.DEBUG);   18.     filter.setSessionIdleLogLevel(LogLevel.DEBUG);   19.     filter.setSessionOpenedLogLevel(LogLevel.DEBUG);   20.     acceptor.getFilterChain().addLast("logger", filter);    21.            22.        acceptor.setReuseAddress(true);//设置的是主服务监听的端口可以重用   23.            24.        acceptor.getSessionConfig().setReuseAddress(true);//设置每一个非主监听连接的端口可以重用   25.        acceptor.getSessionConfig().setReceiveBufferSize(1024);//设置输入缓冲区的大小   26.        acceptor.getSessionConfig().setSendBufferSize(10240);//设置输出缓冲区的大小   27. //设置为非延迟发送,为true则不组装成大包发送,收到东西马上发出   28.        acceptor.getSessionConfig().setTcpNoDelay(true);   29. //设置主服务监听端口的监听队列的最大值为100,如果当前已经有100个连接,再新的连接来将被服务器拒绝   30.        acceptor.setBacklog(100);   31.        acceptor.setDefaultLocalAddress(new InetSocketAddress(port));   32.        //加入处理器(Handler)到Acceptor   33.        acceptor.setHandler(new WebHandler());   34.     acceptor.bind();   35. }    NioSocketAcceptor是MINA的适配器,一切都是从这里开始的。MINA中有个过滤器和处理器的概念,过滤器用来过滤数据,处理 器用来处理数据。具体来说MINA的处理模型就是request->过滤器A->过滤器B->处理器->过滤器B->过滤 器A->response,这里的request和response类似serlvet的request和response。 Java代码   1. acceptor.getFilterChain().addLast("exector", new ExecutorFilter(threadPool));   2. //加入一个线程池到适配器,这里用的是jdk自带的线程池     Java代码   1.  acceptor.getFilterChain().addLast("codec",    2.         new ProtocolCodecFilter(new WebDecoder(),new WebEncoder()));   3. // 这里是处理逻辑的关键部位,请求的处理都是在WebDecoder类和WebEncoder类中处理,可以明显从命名上看出来一个是用来解码,另一个是用 来编码,requet过来后先进入WebDecoder类(实现了ProtocolDecoder接口)进行解码处理,这里可以加入自己的逻辑把传进来的 流解码成自己需要的信息。而WebEncoder类(实现了ProtocolEncoder接口)是进行编码,在这个类里面加入自己的逻辑把处理后的信息 组装发送给客户端(response)。而在解码和编码过程中WebHandler(扩展了IoHandlerAdapter抽象类)起到了处理器的作 用。   4. //request->WebDecoder->WebHandler->WebEncode->response     现在详细描述一下request->WebDecoder->WebHandler->WebEncode->response的过程: 客户端发送一个请求到MINA服务器,这里相当于来了一个requet。请求首先来到 Java代码   1. WebDecoder类(实现了ProtocolDecoder接口)中的    2. boolean decode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception{}方法   3. /*  4. 参数in:用户请求信息全存在这里,读数据就从in这里读。  5. 参数out:用来输出处理后的数据到Filter的下一个过滤器,如果没有过滤器了就输出到WebHandler,这里有点和  6. servelt的过滤器类似。利用out.write(Object object);这个函数可以把数据传到下一个Filter。我们可以自己定义  7. 一个对象,我们假设为Request,用它来传递消息,那末这里就可以写成out.write(new RequsetMessage());  8. 如果这个方法返回false,就是说当前逻辑包还没接收完(也就是当前的IoBuffer并没有包含足够的数据),需要再次  9. 执行decode方法(再次获取新的IoBuffer),用来获取足够的数据。如果返回值为true就表示可以不执行decode方  10. 法了,但是要激活handler方法,必须要调用out.write方法。  11. public class RequestMessage{}//这里什么也不做  12. */    然后到 Java代码   1. WebHandler(扩展了IoHandlerAdapter抽象类)中的   2. void messageReceived(IoSession session, Object message) throws Exception{}方法   3. WriteFuture future = session.write(response);//session中必须加入这个代码,才会激活encode方法   4. future.addListener(IoFutureListener.CLOSE);//这个的作用是发送完毕后关闭连接,加了就是短连接,不然是长连接   5. IoFutureListener里面有个operationComplete(IoFuture future)方法,当流发送完成之后才调用这个方法。   6. /*  7. 参数message:用来获取Filter传递过来的对象.对应代码RequestMessage request = (RequestMessage) message;  8. 参数session:用来发送数据到Filter.对应代码session.write(new ResponseMessage());  9. public class ResponseMessage{}//这里什么也不做,假设存放处理后的数据  10. 注意:对于一个MINA程序而言,对于WebHandler类只生成一个对象,所以要考虑线程安全问题  11.  */     然后到 Java代码   1. WebEncoder类(实现了ProtocolEncoder接口)中的   2. boolean encode(IoSession session, Object message, ProtocolEncoderOutput out) throws Exception{}    3. 方法   4. /*  5. 参数message:用来获取上一个Filter节点的数据或者处理器的数据(如果这个过滤器为最靠近处理器的那个)  6. ResponseMessage response = (ResponseMessage)message;  7. 参数out:用来输出数据到下一个Filter节点过或者到客户端,用out.write(Object encodedMessage)把数据发送  8. 出去,但是要注意的是,如果这个Filter下一个节点如果是客户端的话,那个这个encodedMessage数据必须为  9. IoBuffer类型的,可以利用IoBuffer.wrap(byte[] byteArray)这个方法来格式化输出数据  10. */     · NIO_TEST.rar (13.9 KB) · 描述: 阻塞和非阻塞io的简单通信程序,自己写的(写的不好)放这里做个备份 · 下载次数: 1092 mina使用UDP协议的小结 mina是一个优秀的网络应用框架,优点当然是多多的, 目前使用中,发生的一些问题主要集中在线程的注销上,发现mina自身线程的注销方面做得不是太好.这里总结下UDP协议上的一些线程注销上需要注意的东西. DatagramConnectorConfig对象:           config.setSessionRecycler(IoSessionRecycler.NOOP);       这里面实际上已经有一个超时重发的线程被mina丢掉了,但是它还在运行. 改进方法:  ((ExpiringSessionRecycler)config.getSessionRecycler()).stopExpiring();                 config.setSessionRecycler(IoSessionRecycler.NOOP);       先禁掉系统默认的超时重发线程.   config的ThreadModel里的Executor也不能自己注销的,注销时,需要这样: ((ThreadPoolExecutor)((ExecutorThreadModel)config.getThreadModel()).getExecutor()).shutdown();     DatagramConnector对象:     connector = new DatagramConnector(); 这里面也有一个NewThreadExecutor线程处理对象在运行,也是关不掉的.   改进方法: ScheduledThreadPoolExecutor threadPoolExecutor = new ScheduledThreadPoolExecutor(5,     new TaskThreadFactory("protocol-pool-"),     new ThreadPoolExecutor.DiscardOldestPolicy());     connector = new DatagramConnector(threadPoolExecutor); 自己定义连接使用的线程池,自己可以控制注销线程池.   ByteBuffer对象: 这个对象里有这么一个东西     private static ByteBufferAllocator allocator = new PooledByteBufferAllocator();   这个东西里面有一个糟糕的东西 PooledByteBufferAllocator里有一个Expirer对象,它也是一个死线程,mina没有直接的方法可以停掉它,在注销时,可以用下面这种笨办法让mina处理掉自己   ByteBuffer.setAllocator(new ByteBufferAllocator(){    public ByteBuffer allocate(int capacity, boolean direct) {     return null;    }    public void dispose() {    }    public ByteBuffer wrap(java.nio.ByteBuffer nioBuffer) {     return null;    }});   一般应用是可以不管这些东西的. 基于MINA构建简单高性能的NIO应用 前言 MINA是Trustin Lee最新制作的Java通讯框架。通讯框架的主要作用是封装底层IO操作,提供高级的操作API。比较出名的通讯框架有C++的ACE、Python的 Twisted,而Java的通讯框架还有QuickServer、Netty2、Cindy、Grizzly等。 2004年6月,Trustin Lee发布了一个通讯框架Netty2,是Java界第一个事件模型架构的通讯框架,Cindy也从中借鉴了不少思想。由于Netty2的架构不是很 好,Trustin Lee在2004年底加入Apache Directory组之后,重写了整个框架,取名为MINA。MINA是一个基于Java NIO的通讯框架,Java从1.4开始引入NIO,提供了一个非阻塞、高性能的IO底层。 目前使用MINA的产品并不是很多,比较出名的就有Apache Directory、Openfire(Jive出品的一个XMPP产品)、red5(研究flash流媒体flv技术的朋友应该很清楚这个东 西,adobe fms的竞争者,国内也有视频网站在使用)等等。 笔者在07年初的时候,公司新项目需要用Java实现一个Socket Server,对比了Netty2、Cindy、QuickServer和MINA。当时Netty2已经停止开发,也找不到官方网站和代码,比较了另外 三个框架之后,毅然选择了当时文档比较缺乏和使用群较少的MINA,一年以来的使用经验来看,感觉还是很不错的,MINA有着清晰的架构,很方便做自定义 的扩充。在1.0发布之后,官方网站充实了很多,增加了不少文档,也听到越来越多的朋友开始使用MINA。后来专门针对JDK 1.5发布了1.1的版本,使用JDK内置的concurrent代替backport-util-concurrent。目前1.0和1.1同时存在, 但已经不再增加新功能,仅仅发布bug fix的版本,新功能都在2.0中实现,2.0调整了架构,性能有更大的提升,目前还在开发中。 基本特性 通过Java NIO支持TCP和UDP协议,另外还支持RS232和VM内通讯。由于MINA有清晰的架构,你也能很简单地实现一个底层网络协议。目前不支持阻塞 IO,似乎还没有计划支持,当然你可以在其之上实现一个阻塞的模型,不过按照笔者的经验来说,非阻塞IO更适合Server端编程。 一个类似ServletFilter的过滤器模型。这是笔者认为MINA的精髓所在,通过引入过滤器模型,可以将一些非业务的功能独立开来,层次更清晰,很有AOP的思想,可以很方便地进行日志、协议转换、压缩等等功能,还能在运行中动态增加或去掉功能。 可以直接使用底层的ByteBuffer,也可以使用用户定义的消息Object和编码方式。 高度可定制的线程模型,单线程、一个线程池,或者类似SEDA的多个线程池。 SSL支持,攻击防御和流量控制,mock测试友好,JMX支持,Spring集成,你还需要更多吗。 一个简单的例子 MINA使用非常简单,笔者以前做过一段时间传统的Java Socket开发,不过一直对Java NIO不是很理解,但是MINA很快就上手了,MINA封装了NIO繁琐的部分,使你可以更专注于业务功能实现。话不多说,让我们来看一个简单的例子,一 个很常见的例子,时间服务器。 我们的实现目标是一个能响应多个客户端的请求,然后返回服务器当前的系统时间的功能。传统的Java Socket程序,我们需要每accept一个客户端连接,就创建一个新的线程来响应,这会令到系统整体负载能力有较大的限制,而且我们必须手工编写连接 管理等代码。让我们来看看MINA是怎么处理的。 首先我们从官方网站下载MINA 1.1,这里我们假设JDK为1.5以上的版本,如果你使用的是JDK 1.4,请下载MINA 1.0,MINA 1.0跟1.1几乎一样,但是强烈建议使用JDK 1.5以上以获得更好的性能。 解开压缩包之后,能看见很多jar包,这里暂不介绍每个包的具体作用,可以把所有包都导入项目。值得留意的是MINA使用了一个slf4j的日志库,该日志库大有取缔common-logging之势。 这里是我们的主程序,非常简单。 首先我们需要一个IoAcceptor,这里我们选择了一个SocketAcceptor,也就是TCP协议。 然后,我们给应用加上日志过滤器和协议编码过滤器。 最后,我们把acceptor bind到本机的8123端口,并且使用TimeServerHandler来实现协议。 TimeServerHandler是我们实现具体业务功能的地方。 IoHandlerAdapter提供了7个事件方法,我们要做的事情仅仅是挑选我们需要做出响应的事件进行重载。在我这个例子了,我重载了两个方法。 sessionCreated会在客户端连接的时候调用,通常我们会在这里进行一些初始化操作,我这里仅仅是打印一条信息。 messageReceived就是整个Handler的中心部分,每一个从客户端发过来的消息都会转化成对该方法的调用。由于我们加入了协议编码过滤 器,因此这里获得的Object msg是一个String,而不是默认的ByteBuffer(下文会详细介绍ProtocolCodecFilter)。这里我们实现了一个很简单的业 务功能,如果用户输入的是quit,就断开连接,否则就输入当前时间。可以看出,IoSession封装了对当前连接的操作。 至此,我们就实现了一个时间服务器。 01.public class TimeServer { 02.  public static void main(String[] args) throws IOException { 03.    IoAcceptor acceptor = new SocketAcceptor(); 04.   05.    SocketAcceptorConfig cfg = new SocketAcceptorConfig(); 06.    cfg.getFilterChain().addLast( "logger", new LoggingFilter() ); 07.    cfg.getFilterChain().addLast( "codec", new ProtocolCodecFilter( new TextLineCodecFactory())); 08.   09.    acceptor.bind( new InetSocketAddress(8123), new TimeServerHandler(), cfg); 10.    System.out.println("Time server started."); 11.  } 12.} 01.public class TimeServerHandler extends IoHandlerAdapter { 02.  public void messageReceived(IoSession session, Object msg) throws Exception { 03.    String str = (String) msg; 04.    if( "quit".equalsIgnoreCase(str) ) { 05.    session.close(); 06.    return; 07.  } 08.   09.  Date date = new Date(); 10.  session.write( date.toString() ); 11.  System.out.println("Message written..."); 12.  } 13.   14.  public void sessionCreated(IoSession session) throws Exception { 15.  System.out.println("Session created..."); 16.  } 17.} MINA架构 这里,我借用了一张Trustin Lee在Asia 2006的ppt里面的图片来介绍MINA的架构。 Remote Peer就是客户端,而下方的框是MINA的主要结构,各个框之间的箭头代表数据流向。 大家可以对比刚刚的例子来看这个架构图,IoService就是整个MINA的入口,负责底层的IO操作,客户端发过来的消息就是由它处理。刚刚我们使用 的IoAcceptor就是一个IoService,之所以抽象成IoService,是因为MINA用同样的架构来处理服务器和客户端编程,IoService的另一个子类就是IoConnector,用于客户端。不过根据笔者的使用经验,使用非阻塞的模型进行客户端编程非常的不方便,你最好寻求其他的阻塞通讯框架。 IoService把数据转化成一个一个的事件,传递给IoFilterChain。你可以加入一连串的IoFilter,进行各种功能。笔者的建议是将一些功能性的,业务不相关的代码,用IoFilter来实现,使得整个应用结构更清晰,也方便代码重用。 被IoFilter处理过的事件,发送给 IoHandler,然后我们在这里实现具体的业务逻辑。这个部分很简单,如果你有Swing的使用经验的话,你会发现它跟Swing的事件非常相像,你 要做的事情,仅仅是重载你需要的方法,然后编写具体的业务功能。在这其中,最重要的一个方法就是messageReceived了。 值得留意的是一个IoSession的类,每一个IoSession实例代表这一个连接,我们需要对连接进行的任何操作都通过这个类来实现。 从IoHandler通过调用IoSession.write等方法向客户端发送的消息,会通过跟输入数据相反的次序依次传递,直至由IoService负责把数据发送给客户端。 这就已经是MINA的全部,是不是很简单。 接下来,我会详细介绍我们编写具体代码的时候主要涉及到的三个类,IoHandler、IoSession和IoFilter。 IoHandler MINA的内部实现了一个事件模型,而IoHanlder则是所有事件最终产生响应的位置。每一个方法的名字很明确表明该事件的含义。 messageReceived是接收客户端消息的事件,我们应该在这里实现业务逻辑。messageSent是服务器发送消息的事件,一般情况下我们不 会使用它。sessionClosed是客户端断开连接的事件,可以在这里进行一些资源回收等操作。值得留意的是,客户端连接有两个事 件,sessionCreated和sessionOpened,两者稍有不同,sessionCreated是由I/O processor线程触发的,而sessionOpened在其后,由业务线程触发的,由于MINA的I/O processor线程非常少,因此如果我们真的需要使用sessionCreated,也必须是耗时短的操作,一般情况下,我们应该把业务初始化的功能 放在sessionOpened事件中。 细心的读者可能会发现,我们刚刚的例子继承的是IoHandlerAdapter,IoHandlerAdapter其实就是一个IoHanlder的空的实现,这样我们就可以不用重载不感兴趣的事件。 IoSession IoSession是一个接口,MINA里很多的地方都使用接口,很好地体现了面向接口编程的思想。它提供了对当前连接的操作功能,还有用户定义属性的存 储功能,这点非常重要。IoSession是线程安全的,也就是我们能够在多线程环境中随意操作IoSession,这点给开发带来很大的好处。我们来看 看具体提供的方法,笔者列举一些比较常用和重要的方法 在这里,笔者把IoSession的方法大致分成三类 第一类,连接操作功能。 最主要的方法有两个,向客户端发送消息和断开连接。可以看的出,write接受的变量是一个Object,但是实际上应该传入什么类型呢?具体还得看你是 否使用了ProtocolCodecFilter(下面会详细介绍),如果使用了ProtocolCodecFilter,那这个message将可能是 一个String,或者是一个用户定义的JavaBean。默认的情况,message是一个ByteBuffer。ByteBuffer是MINA的一 个类,跟java.nio.ByteBuffer类同名,MINA 2.0将会将它改成IoBuffer,以避免讨论上的误会。 另一个值得留意的是Future类,MINA是一个非阻塞的通信框架,其中一个明显的体现就是调用IoSession.write方法是不会阻塞的。用户 调用了write方法之后,消息内容会发到底层等候发送,至于什么时候发出,就不得而知了。当然,实际上调用了write之后,数据几乎是立刻发出的,这 得益与NIO的高性能。但是,如果我们必须确认了消息发出,然后进行某些处理,我们就需要使用Future类,以下是一个很常见的代码   通过调用future.join,程序就会阻塞,直至消息处理结束。我们还能通过future.isWritten得知消息是否成功发送。 在这里,笔者顺便说一个实际使用的发现,消息发送是会自动合并的,简单来说,如果在很短的时间里,对同一个IoSession进行了两次write操作, 客户端有可能只收到一条消息,而这条消息就是服务器发出的两条消息前后接起来。这样的设计可以在高并发的时候节省网络开销,而笔者的实际使用过程中,效果 也相当好。但是如果这样行为会导致客户端工作不正常,你也可以通过参数关闭它。 第二类,属性存储操作。 通常来说,我们的系统是有用户状态的,我们就需要在连接上存储用户属性,IoSession的Attribute就是这样一个功能。例如两个连接同时连入 服务器,一个连接是用户A,用户ID是13,另一个连接是用户B,用户ID是14,我们就可以在用户登录成功之后,调用 IoSession.setAttribute(“login_id”,13),然后在其后的操作中,通过 IoSession.getAttribute(“login_id”)获得当前登录用户ID,并进行相应的操作。简单来说,就是一个类似 HttpSession的功能,当然具体的实现方法不一样。 第三类,连接状态。 这里就不多说了,从方法名上我们就能知道它具体的功能。 IoFilter 过滤器是MINA的一个很重要的功能。IoFilter也是一个接口,但是相对比较复杂,这里就不列举它的方法了。简单来说IoFilter就像 ServletFilter,在事件被IoHandler处理之前或之后进行一些特定的操作,但是它比ServletFilter复杂,可以处理很多种事 件,除了包括IoHandler的7个事件以外,还有一些内部的事件可以进行操作。 MINA提供了一些常用的IoFilter实现,例如有LoggingFilter(日志功能)、BlacklistFilter(黑名单功能)、 CompressionFilter(压缩功能)、SSLFilter(SSL支持),这些过滤器比较简单,通过阅读它们的源代码,能够更进一步理解过滤 器的实现。笔者在这里要重点介绍两个过滤器,ProtocolCodecFilter和ExecutorFilter ProtocolCodecFilter 网络传输的内容其实本质是一个二进制流,但是我们的业务功能不会,或者说不应该去直接操作二进制流。MINA默认向IoHandler传入的 message是一个ByteBuffer,如果我们直接在IoHandler操作ByteBuffer,会导致大量协议分析的代码和实际的业务代码混杂 在一起。最适合的做法,就是在IoFilter把ByteBuffer转换成String或者JavaBean,ProtocolCodecFilter 正是这样的一个功能的过滤器。 使用ProtocolCodecFilter很简单,我们只要把ProtocolCodecFilter加入到FilterChain就可以了,但是我们 需要提供一个ProtocolCodecFactory。其实ProtocolCodecFilter仅仅是实现了过滤器部分的功能,它会将最终的转换工 作,交给从ProtocolCodecFactory获得的Encode和Decode。如果我们需要编写自己的ProtocolCodec,就应该从 ProtocolCodecFactory入手。MINA内置了几个ProtocolCodecFactory,比较常用的就是 ObjectSerializationCodecFactory和TextLineCodecFactory。 ObjectSerializationCodecFactory是Java Object序列化之后的内容直接跟ByteBuffer互相转化,比较适合两端都是Java的情况使用。TextLineCodecFactory就是 String跟ByteBuffer的转化,说白了就是文本,例如你要实现一个SMTP服务器,或者POP服务器,就可以使用它。而笔者的实际使用,大多 数情况都是使用 TextLineCodecFactory。 这里提及一下IoFilter的顺序问题,IoFilter是有加入顺序的,例如,先加入LoggingFilter再加入 ProtocolCodecFilter,和先加入ProtocolCodecFilter再加入LoggingFilter的效果是不一样的,前者 LoggingFilter写入日志的内容是ByteBuffer,而后者写入日志的是转换后具体的类,例如String。实际使用的时候,一定要处理好 过滤器的顺序。 ExecutorFilter 另一个重要的过滤器就是ExecutorFilter。这里,我需要先说明一下MINA的线程工作模式,MINA默认是单线程处理所有客户端的消息,也就 是说,即使你在一台8CPU的机器上面跑,可能也只用到一个CPU,另外,如果某次消息处理太耗时,就会导致其他消息等待,整体的吞吐量下降。很多朋友抱 怨MINA的性能差,其实是因为他们没有加入ExecutorFilter的缘故。ExecutorFilter设计的很精巧,大家可以仔细阅读一下源代 码,它会将同一个连接的消息合并起来按顺序调用,不会出现两个线程同时处理同一个连接的情况。 1.IoAcceptor acceptor = ...; 2.IoServiceConfig acceptorConfig = acceptor.getDefaultConfig(); 3.acceptorConfig.setThreadModel(ThreadModel.MANUAL); 这里再次提及IoFitler的顺序问题,一般情况下,我们会将ExecutorFilter放在ProtocolCodecFilter之后,因为我们 不需要多线程地执行ProtocolCodec操作,用单一线程来进行ProtocolCodec性能会比较高,而具体的业务逻辑可能还设计数据库操作, 因此更适合放在不同的线程中运行。 优化指南 MINA默认配置的性能并不是很高的,部分原因是MINA目前还保留初期版本的架构,另外一个原因是因为JVM的发展。 1.IoAcceptor acceptor = new SocketAcceptor(Runtime.getRuntime().availableProcessors() + 1, Executors.newCachedThreadPool()); 首先我们关闭默认的ThreadModel设置 ThreadModel是一个很简单的线程实现,用于IoService。但是它实在太弱,以至于在并发环境产生大量问题。在MINA 2.0中,ThreadModel直接被取消。你应该使用ExecutorFilter来实现线程。 1.acceptor.getDefaultConfig().getFilterChain().addLast("threadPool", new ExecutorFilter(Executors.newCachedThreadPool()); 然后我们增加I/O处理线程 每一个Acceptor/Connector都使用一个线程来处理连接,然后把连接发送给I/O processor进行读写操作,我们只可以修改I/O processor使用的线程数,用以下代码设置 当然是要将ExecutorFilter加入,上文已经很详细地描述了 笔者在开发过程中,多次遇到OutOfMemoryError,经过研究之后才发现原因。MINA默认是使用direct memory实现ByteBuffer池的方案(以下简称direct buffer),通过JNI在内存开辟一段空间来使用,该方案在早期的MINA版本中是一个非常好的特性,那是因为MINA开发初期,JVM并没有现在的 强大,带有池效果的direct buffer性能比较好。但是当我们使用-Xms -Xmx等指令增加JVM可使用的内存,那仅仅增加了堆的内存空间,而direct memory的空间并没有增加,导致MINA实际使用的时候经常出现OutOfMemoryError。如果你的确想使用direct memory,可以通过-XX:MaxDirectMemorySize选项来设置。不过笔者不建议这样做,因为最新的测试表明,在现代的JVM里 面,direct memory比堆的表现更差。这里可能有读者会觉得奇怪,为什么不用池,而要用堆呢,而且还需要gc。那是因为现在的JVM gc能力已经很强了,而且在并发环境里面,pool的同步也是一个性能的问题。我们可以通过这样的代码进行设置 MINA 2.0已经默认把直接内存分配改成堆,为了提供最好的性能和稳定性。 1.ByteBuffer.setUseDirectBuffers(false); 2.ByteBuffer.setAllocator(new SimpleByteBufferAllocator()); 最后一条优化技巧就是,把你的应用部署在 Linux上, 并且打开Java NIO使用Linux epoll的功能。可能你还没听过epoll,但是你应该听过Lighttpd、Nginx、Squid等,得益于epoll,它们提供很高的网络性能, 还占用非常少的系统资源。JDK6已经默认把epoll配置打开,因此笔者建议把你的应用部署在JDK6上面,也同时因为JDK6还有别的优化特性。如果 你的应用必须部署在JDK5上,你也可以通过参数把epoll支持打开。 使用Mina传输Java对象 下面是所要传输的实体类UserInfo.java 01 package com.mina.model; 02   03 import java.io.Serializable; 04   05 /** 06  * @see Mina传输的实体类,要求其实现Serializable接口 07  */ 08 @SuppressWarnings("serial") 09 public class UserInfo implements Serializable{ 10     private String name; 11       12     public UserInfo(String name){ 13         this.name = name; 14     } 15       16     public String getName() { 17         return name; 18     } 19 } 下面是Mina编写的服务端主类MyServer.java 01 package com.mina.server; 02   03 import java.io.IOException; 04 import java.net.InetSocketAddress; 05   06 import org.apache.mina.core.service.IoAcceptor; 07 import org.apache.mina.core.session.IdleStatus; 08 import org.apache.mina.filter.codec.ProtocolCodecFilter; 09 import org.apache.mina.filter.codec.serialization.ObjectSerializationCodecFactory; 10 import org.apache.mina.transport.socket.nio.NioSocketAcceptor; 11   12 public class MyServer { 13     public static void main(String[] args) throws IOException { 14         int bindPort = 9876; 15           16         IoAcceptor acceptor = new NioSocketAcceptor(); 17           18         acceptor.getSessionConfig().setReadBufferSize(2048); 19         acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 10); 20           21         //设定服务器解析消息的规则是以Object对象为单位进行传输,注意此时该对象需实现Serializable接口 22         acceptor.getFilterChain().addLast("codec", new ProtocolCodecFilter(new ObjectSerializationCodecFactory())); 23           24         acceptor.setHandler(new ServerHandler()); 25           26         acceptor.bind(new InetSocketAddress(bindPort)); 27           28         System.out.println("MinaServer is startup, and it`s listing on := " + bindPort); 29     } 30 } 下面是服务端的消息处理器ServerHandler.java 01 package com.mina.server; 02   03 import org.apache.mina.core.service.IoHandlerAdapter; 04 import org.apache.mina.core.session.IoSession; 05   06 import com.mina.model.UserInfo; 07   08 public class ServerHandler extends IoHandlerAdapter { 09     @Override 10     public void messageReceived(IoSession session, Object message) throws Exception { 11         UserInfo ui = (UserInfo)message; //我们已设定了服务器解析消息的规则是以UserInfo对象为单位进行传输 12         System.out.println("收到客户机发来的用户名:" + ui.getName()); 13         session.write(new UserInfo(ui.getName() + "==>>是个神秘的人")); 14     } 15       16     @Override 17     public void sessionOpened(IoSession session) throws Exception{ 18         System.out.println("InComing Client:" + session.getRemoteAddress()); 19     } 20 } 接下来是Mina编写的客户端主类MyClient.java 01 package com.mina.client; 02   03 import java.net.InetSocketAddress; 04   05 import org.apache.mina.core.service.IoConnector; 06 import org.apache.mina.filter.codec.ProtocolCodecFilter; 07 import org.apache.mina.filter.codec.serialization.ObjectSerializationCodecFactory; 08 import org.apache.mina.transport.socket.nio.NioSocketConnector; 09   10 import com.mina.model.UserInfo; 11   12 public class MyClient { 13     public static void main(String[] args) { 14         IoConnector connector = new NioSocketConnector(); 15           16         connector.setConnectTimeoutMillis(30000); 17           18         connector.getFilterChain().addLast("codec", new ProtocolCodecFilter(new ObjectSerializationCodecFactory())); 19           20         connector.setHandler(new ClientHandler(new UserInfo("张起灵"))); 21           22         connector.connect(new InetSocketAddress("127.0.0.1", 9876)); 23     } 24 } 最后是客户端的消息处理器ClientHandler.java 查看源码 打印? 01 package com.mina.client; 02   03 import org.apache.mina.core.service.IoHandlerAdapter; 04 import org.apache.mina.core.session.IoSession; 05   06 import com.mina.model.UserInfo; 07   08 public class ClientHandler extends IoHandlerAdapter { 09     private final UserInfo ui; 10       11     public ClientHandler(UserInfo ui){ 12         this.ui = ui; 13     } 14   15     @Override 16     public void sessionOpened(IoSession session) throws Exception { 17         session.write(ui); 18     } 19       20     @Override 21     public void messageReceived(IoSession session, Object message) throws Exception { 22         UserInfo ui = (UserInfo)message; 23         System.out.println("收到服务机发来的用户名:" + ui.getName()); 24     } 25   26     @Override 27     public void exceptionCaught(IoSession session, Throwable cause) throws Exception { 28         System.out.println("与" + session.getRemoteAddress() + "通信过程中出现错误:[" + cause.getMessage() + "]..连接即将关闭...."); 29         session.close(false); 30         session.getService().dispose(); 31     } 32 } 使用mina 作代理服务器例子 Main 01 importjava.net.InetSocketAddress; 02   03 importorg.apache.mina.core.service.IoConnector; 04 importorg.apache.mina.transport.socket.nio.NioSocketAcceptor; 05 importorg.apache.mina.transport.socket.nio.NioSocketConnector; 06   07 /** 08  * (Entry point) Demonstrates how to write a very simple tunneling proxy 09  * using MINA. The proxy only logs all data passing through it. This is only 10  * suitable for text based protocols since received data will be converted into 11  * strings before being logged. 12  *

13  * Start a proxy like this:
14  * org.apache.mina.example.proxy.Main 12345 www.google.com 80
15  * and open http://localhost:12345 in a 16  * browser window. 17  *

18  * 19  * @author The Apache MINA Project (dev@mina.apache.org) 20  * @version $Rev$, $Date$ 21  */ 22 publicclassMain { 23   24     publicstaticvoidmain(String[] args) throwsException { 25         if(args.length != 3) { 26             System.out.println(Main.class.getName() 27                     + " "); 28             return; 29         } 30   31         // Create TCP/IP acceptor. 32         NioSocketAcceptor acceptor = newNioSocketAcceptor(); 33   34         // Create TCP/IP connector. 35         IoConnector connector = newNioSocketConnector(); 36   37         // Set connect timeout. 38         connector.setConnectTimeoutMillis(30*1000L); 39   40         ClientToProxyIoHandler handler = newClientToProxyIoHandler(connector, 41                 newInetSocketAddress(args[1], Integer.parseInt(args[2]))); 42   43         // Start proxy. 44         acceptor.setHandler(handler); 45         acceptor.bind(newInetSocketAddress(Integer.parseInt(args[0]))); 46   47         System.out.println("Listening on port "+ Integer.parseInt(args[0])); 48     } 49   50 } [代码] ServerToProxyIoHandler 1 /** 2  * Handles the server to proxy part of the proxied connection. 3  * 4  * @author The Apache MINA Project (dev@mina.apache.org) 5  * @version $Rev$, $Date$ 6  * 7  */ 8 publicclassServerToProxyIoHandler extendsAbstractProxyIoHandler { 9 } [代码] AbstractProxyIoHandler 01 importjava.nio.charset.Charset; 02   03 importorg.apache.mina.core.buffer.IoBuffer; 04 importorg.apache.mina.core.service.IoHandlerAdapter; 05 importorg.apache.mina.core.session.IoSession; 06 importorg.slf4j.Logger; 07 importorg.slf4j.LoggerFactory; 08   09 /** 10  * Base class of {@link org.apache.mina.core.service.IoHandler} classes which handle 11  * proxied connections. 12  * 13  * @author The Apache MINA Project (dev@mina.apache.org) 14  * @version $Rev$, $Date$ 15  * 16  */ 17 publicabstractclassAbstractProxyIoHandler extendsIoHandlerAdapter { 18     privatestaticfinalCharset CHARSET = Charset.forName("iso8859-1"); 19     publicstaticfinalString OTHER_IO_SESSION = AbstractProxyIoHandler.class.getName()+".OtherIoSession"; 20   21     privatefinalLogger logger = LoggerFactory.getLogger(getClass()); 22       23     @Override 24     publicvoidsessionCreated(IoSession session) throwsException { 25         session.suspendRead(); 26         session.suspendWrite(); 27     } 28   29     @Override 30     publicvoidsessionClosed(IoSession session) throwsException { 31         if(session.getAttribute( OTHER_IO_SESSION ) != null) { 32             IoSession sess = (IoSession) session.getAttribute(OTHER_IO_SESSION); 33             sess.setAttribute(OTHER_IO_SESSION, null); 34             sess.close(false); 35             session.setAttribute(OTHER_IO_SESSION, null); 36         } 37     } 38   39     @Override 40     publicvoidmessageReceived(IoSession session, Object message) 41             throwsException { 42         IoBuffer rb = (IoBuffer) message; 43         IoBuffer wb = IoBuffer.allocate(rb.remaining()); 44         rb.mark(); 45         wb.put(rb); 46         wb.flip(); 47         ((IoSession) session.getAttribute(OTHER_IO_SESSION)).write(wb); 48         rb.reset(); 49         logger.info(rb.getString(CHARSET.newDecoder())); 50     } 51 } [代码] ClientToProxyIoHandler 01 importjava.net.SocketAddress; 02   03 importorg.apache.mina.core.RuntimeIoException; 04 importorg.apache.mina.core.future.ConnectFuture; 05 importorg.apache.mina.core.future.IoFutureListener; 06 importorg.apache.mina.core.service.IoConnector; 07 importorg.apache.mina.core.session.IoSession; 08   09 /** 10  * Handles the client to proxy part of the proxied connection. 11  * 12  * @author The Apache MINA Project (dev@mina.apache.org) 13  * @version $Rev$, $Date$ 14  * 15  */ 16 publicclassClientToProxyIoHandler extendsAbstractProxyIoHandler { 17     privatefinalServerToProxyIoHandler connectorHandler = newServerToProxyIoHandler(); 18   19     privatefinalIoConnector connector; 20   21     privatefinalSocketAddress remoteAddress; 22   23     publicClientToProxyIoHandler(IoConnector connector, 24             SocketAddress remoteAddress) { 25         this.connector = connector; 26         this.remoteAddress = remoteAddress; 27         connector.setHandler(connectorHandler); 28     } 29   30     @Override 31     publicvoidsessionOpened(finalIoSession session) throwsException { 32   33         connector.connect(remoteAddress).addListener(newIoFutureListener() { 34             publicvoidoperationComplete(ConnectFuture future) { 35                 try{ 36                     future.getSession().setAttribute(OTHER_IO_SESSION, session); 37                     session.setAttribute(OTHER_IO_SESSION, future.getSession()); 38                     IoSession session2 = future.getSession(); 39                     session2.resumeRead(); 40                     session2.resumeWrite(); 41                 } catch(RuntimeIoException e) { 42                     // Connect failed 43                     session.close(true); 44                 } finally{ 45                     session.resumeRead(); 46                     session.resumeWrite(); 47                 } 48             } 49         }); 50     } 51 } MINA 实现的简易 HTTP 服务器 [Java]代码 01 publicclassTest { 02     publicstaticvoidmain(String[] args) throwsIOException { 03         HttpServer httpServer; 04         httpServer = HttpServer.create(newInetSocketAddress(81), 5); 05         httpServer.createContext("/", newHandler()); 06         httpServer.start(); 07     } 08   09     staticclassHandler implementsHttpHandler { 10         publicvoidhandle(HttpExchange exchange) throwsIOException { 11             Headers requestHeaders = exchange.getRequestHeaders(); 12             Headers responseHeaders = exchange.getResponseHeaders(); 13             responseHeaders.set("Content-Type", "text/plain"); 14             exchange.sendResponseHeaders(200, 0L); 15             OutputStream responseBody = newBufferedOutputStream(exchange.getResponseBody(), 64*1024); 16             responseBody.write("Hello!".getBytes()); 17             responseBody.close(); 18             exchange.close(); 19         } 20     } 21 } Java 通过 HTTP 下载文件 Download.java 001 packagecore.spider; 002   003 importjava.io.*; 004 importjava.net.*; 005 importjava.util.*; 006   007 // This class downloads a file from a URL. 008 classDownload extendsObservable implementsRunnable { 009       010     // Max size of download buffer. 011     privatestaticfinalintMAX_BUFFER_SIZE = 1024; 012       013     // These are the status names. 014     publicstaticfinalString STATUSES[] = {"Downloading", 015     "Paused", "Complete", "Cancelled", "Error"}; 016       017     // These are the status codes. 018     publicstaticfinalintDOWNLOADING = 0; 019     publicstaticfinalintPAUSED = 1; 020     publicstaticfinalintCOMPLETE = 2; 021     publicstaticfinalintCANCELLED = 3; 022     publicstaticfinalintERROR = 4; 023       024     privateURL url; // download URL 025     privateintsize; // size of download in bytes 026     privateintdownloaded; // number of bytes downloaded 027     privateintstatus; // current status of download 028       029     // Constructor for Download. 030     publicDownload(URL url) { 031         this.url = url; 032         size = -1; 033         downloaded = 0; 034         status = DOWNLOADING; 035           036         // Begin the download. 037         download(); 038     } 039       040     // Get this download's URL. 041     publicString getUrl() { 042         returnurl.toString(); 043     } 044       045     // Get this download's size. 046     publicintgetSize() { 047         returnsize; 048     } 049       050     // Get this download's progress. 051     publicfloatgetProgress() { 052         return((float) downloaded / size) * 100; 053     } 054       055     // Get this download's status. 056     publicintgetStatus() { 057         returnstatus; 058     } 059       060     // Pause this download. 061     publicvoidpause() { 062         status = PAUSED; 063         stateChanged(); 064     } 065       066     // Resume this download. 067     publicvoidresume() { 068         status = DOWNLOADING; 069         stateChanged(); 070         download(); 071     } 072       073     // Cancel this download. 074     publicvoidcancel() { 075         status = CANCELLED; 076         stateChanged(); 077     } 078       079     // Mark this download as having an error. 080     privatevoiderror() { 081         status = ERROR; 082         stateChanged(); 083     } 084       085     // Start or resume downloading. 086     privatevoiddownload() { 087         Thread thread = newThread(this); 088         thread.start(); 089     } 090       091     // Get file name portion of URL. 092     privateString getFileName(URL url) { 093         String fileName = url.getFile(); 094         returnfileName.substring(fileName.lastIndexOf('/') + 1); 095     } 096       097     // Download file. 098     publicvoidrun() { 099         RandomAccessFile file = null; 100         InputStream stream = null; 101           102         try{ 103             // Open connection to URL. 104             HttpURLConnection connection = 105                     (HttpURLConnection) url.openConnection(); 106               107             // Specify what portion of file to download. 108             connection.setRequestProperty("Range", 109                     "bytes="+ downloaded + "-"); 110               111             // Connect to server. 112             connection.connect(); 113               114             // Make sure response code is in the 200 range. 115             if(connection.getResponseCode() / 100!= 2) { 116                 error(); 117             } 118               119             // Check for valid content length. 120             intcontentLength = connection.getContentLength(); 121             if(contentLength < 1) { 122                 error(); 123             } 124               125             // Set the size for this download if it hasn't been already set. 126             if(size == -1) { 127                 size = contentLength; 128                 stateChanged(); 129             } 130               131             // Open file and seek to the end of it. 132             file = newRandomAccessFile(getFileName(url), "rw"); 133             file.seek(downloaded); 134               135             stream = connection.getInputStream(); 136             while(status == DOWNLOADING) { 137             // Size buffer according to how much of the file is left to download. 138                 bytebuffer[]; 139                 if(size - downloaded > MAX_BUFFER_SIZE) { 140                     buffer = newbyte[MAX_BUFFER_SIZE]; 141                 } else{ 142                     buffer = newbyte[size - downloaded]; 143                 } 144                   145                 // Read from server into buffer. 146                 intread = stream.read(buffer); 147                 if(read == -1) 148                     break; 149                   150                 // Write buffer to file. 151                 file.write(buffer, 0, read); 152                 downloaded += read; 153                 stateChanged(); 154             } 155               156       // Change status to complete if this point was reached because downloading has finished. 157             if(status == DOWNLOADING) { 158                 status = COMPLETE; 159                 stateChanged(); 160             } 161         } catch(Exception e) { 162             error(); 163         } finally{ 164             // Close file. 165             if(file != null) { 166                 try{ 167                     file.close(); 168                 } catch(Exception e) {} 169             } 170               171             // Close connection to server. 172             if(stream != null) { 173                 try{ 174                     stream.close(); 175                 } catch(Exception e) {} 176             } 177         } 178     } 179       180     // Notify observers that this download's status has changed. 181     privatevoidstateChanged() { 182         setChanged(); 183         notifyObservers(); 184     } 185   186     publicstaticvoidmain(String args[]){ 187         try{ 188         Download d = newDownload(newURL("http://www.oschina.net/img/logo.gif")); 189         d.run(); 190     } catch(MalformedURLException e) { 191         // TODO Auto-generated catch block 192         e.printStackTrace(); 193     } 194     } 195 } 使用MINA框架的转发服务器和客户端 [文件] SmsObject.java ~ 803B    下载(29) 01 publicclassSmsObject { 02     privateintreceiver; 03     privateintdata_type; 04     privateintdata_receiver; 05     privateintdata_sender; 06     privateString data; 07       08     publicintgetReceiver(){ 09         returnreceiver; 10     } 11       12     publicintgetDataType(){ 13         returndata_type; 14     } 15       16     publicintgetDataReceiver(){ 17         returndata_receiver; 18     } 19       20     publicintgetDataSender(){ 21         returndata_sender; 22     } 23       24     publicString getData(){ 25         returndata; 26     } 27       28     publicvoidsetReceiver(intreceiver){ 29         this.receiver = receiver; 30     } 31       32     publicvoidsetDataType(intdata_type){ 33         this.data_type = data_type; 34     } 35       36     publicvoidsetDataReceiver(intdata_receiver){ 37         this.data_receiver = data_receiver; 38     } 39       40     publicvoidsetDataSender(intdata_sender){ 41         this.data_sender = data_sender; 42     } 43       44     publicvoidsetData(String data){ 45         this.data = data; 46     } 47       48 } [文件] Server.java ~ 1KB    下载(19) 01 importjava.net.InetSocketAddress; 02 importjava.nio.charset.Charset; 03   04 importorg.apache.mina.core.filterchain.DefaultIoFilterChainBuilder; 05 importorg.apache.mina.core.session.IdleStatus; 06 importorg.apache.mina.filter.codec.ProtocolCodecFilter; 07 importorg.apache.mina.transport.socket.SocketAcceptor; 08 importorg.apache.mina.transport.socket.nio.NioSocketAcceptor; 09   10 /** 11  * MINA Server 12  * @author Lazy 13  */ 14   15 publicclassServer { 16       17     privatestaticintbindPort = 9999; 18   19     /** 20      * @param args 21      */ 22     publicstaticvoidmain(String[] args) throwsException{ 23         // TODO Auto-generated method stub 24         SocketAcceptor acceptor = newNioSocketAcceptor(); 25           26         //setMinReadBufferSize(), setMaxReadBufferSize() 27         acceptor.getSessionConfig().setReadBufferSize(2048); 28         acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 10); 29         //acceptor.getManagedSessions() 30           31         //set MinaServerFilter 32         DefaultIoFilterChainBuilder chain = acceptor.getFilterChain(); 33         chain.addLast("codec", newProtocolCodecFilter(newCmccCodecFactory(Charset.forName("UTF-8")))); 34           35         //set MinaServerHandler 36         acceptor.setHandler(newServerHandler()); 37           38         //bind port     39         try 40         { 41             acceptor.bind(newInetSocketAddress(bindPort)); 42             System.out.println("Mina Server is Listing on:= "+ bindPort); 43         } 44         catch(Exception e) 45         { 46             e.printStackTrace(); 47         } 48   49     } 50 } [文件] ServerHandler.java ~ 3KB    下载(19) 001 importjava.util.ArrayList; 002   003 importorg.apache.mina.core.service.IoHandler; 004 importorg.apache.mina.core.session.IdleStatus; 005 importorg.apache.mina.core.session.IoSession; 006   007 //import org.slf4j.Logger; 008 //import org.slf4j.LoggerFactory; 009   010 publicclassServerHandler implementsIoHandler { 011       012     //SLF4J 013     //private final static Logger log = LoggerFactory.getLogger(ServerHandler.class); 014       015     privatestaticfinalCliLocator cli = newCliLocator(); 016       017     @Override 018     publicvoidexceptionCaught(IoSession arg0, Throwable arg1) 019             throwsException { 020         // TODO Auto-generated method stub 021   022     } 023   024     @Override 025     publicvoidmessageReceived(IoSession session, Object message) throwsException { 026         // TODO Auto-generated method stub 027           028         SmsObject sms = (SmsObject) message; 029         //log.info("The message is [" + sms.getMessage() + "]"); 030   031         /* 032          * other operate 033   034         System.out.println("================================================"); 035         System.out.println("Data From : " + session.getRemoteAddress()); 036         System.out.println("Receiver : [" + sms.getReceiver() + "]"); 037         System.out.println("Data Type : [" + sms.getDataType() + "]"); 038         System.out.println("Data Receiver : [" + sms.getDataReceiver() + "]"); 039         System.out.println("Data Sender : [" + sms.getDataSender() + "]"); 040         System.out.println("Data : [" + sms.getData() + "]"); 041         System.out.println("================================================"); 042           043          * */   044           045         //The processing of registration information 046         Integer i = new Integer(255); 047         if( i.equals(sms.getReceiver()) && 048             i.equals(sms.getDataType()) && 049             i.equals(sms.getDataReceiver()) && 050             i.equals(sms.getDataSender())) { 051               052             cli.addCli(session, sms.getData()); 053             System.out.println("Client : " + session.getRemoteAddress() + "  DONE"); 054         } else { 055             //Forwarding 056             ArrayList tempList = new ArrayList(); 057             tempList = cli.getCli(sms.getReceiver()); 058               059             System.out.println("tempting=======>" + session.getRemoteAddress() + "  with receiver : " + sms.getReceiver()); 060             if(tempList != null) { 061                 //System.out.println("true"); 062                 for (IoSession session1 : tempList){ 063                     System.out.println("Send =========>" + session1.getRemoteAddress()); 064                     session1.write(sms); 065                 } 066                 System.out.println("================================================"); 067             } 068             else System.out.println("forwarding false"); 069         } 070           071         //Trigger the client 072         sms.setReceiver(i); 073         sms.setDataType(i); 074         sms.setDataReceiver(i); 075         sms.setDataSender(i); 076         sms.setData(" "); 077         session.write(sms); 078   079     } 080   081     @Override 082     public void messageSent(IoSession arg0, Object arg1) throws Exception { 083         // TODO Auto-generated method stub 084   085     } 086   087     @Override 088     public void sessionClosed(IoSession session) throws Exception { 089         // TODO Auto-generated method stub 090           091         //delete the timeout address 092         System.out.println("Client Closed :" + session.getRemoteAddress()); 093         cli.delCli(session); 094         /* 095          * other operate 096          * */ 097     } 098   099     @Override 100     public void sessionCreated(IoSession arg0) throws Exception { 101         // TODO Auto-generated method stub 102   103     } 104   105     @Override 106     public void sessionIdle(IoSession arg0, IdleStatus arg1) throws Exception { 107         // TODO Auto-generated method stub 108   109     } 110   111     @Override 112     public void sessionOpened(IoSession session) throws Exception { 113         // TODO Auto-generated method stub 114         System.out.println("Come :" + session.getRemoteAddress()); 115         /* 116          * other operate 117          * */ 118     } 119   120 } [文件] CliLocator.java ~ 2KB    下载(27) 01 importjava.util.ArrayList; 02 importjava.util.HashMap; 03   04 importorg.apache.mina.core.session.IoSession; 05   06 publicclassCliLocator { 07       08     privatestaticHashMap> cliLocation; 09       10     publicCliLocator() { 11         cliLocation = newHashMap>(); 12     } 13       14     publicvoidaddCli(IoSession session, String s) { 15         String[] sa = s.split(" "); 16         ArrayList tempList = newArrayList(); 17           18         for(inti=0; i"); 23             try { 24                 System.out.print(cliLocation.containsKey(key)); 25             } 26             catch (Exception E) { 27                 System.out.println(E.toString()); 28             } 29 */          30             if(cliLocation.containsKey(key)) { 31                 if(cliLocation.get(key) != null) 32                     tempList = cliLocation.get(key); 33             } else{ 34                 tempList.add(session); 35                 cliLocation.put(key, tempList); 36                 continue; 37             } 38             tempList.add(session); 39             cliLocation.put(key, tempList); 40         } 41     } 42       43     publicArrayList getCli(intreceiver) { 44           45         //System.out.println("Getting=====>" + receiver); 46         try{ 47             if(cliLocation.containsKey(receiver)) { 48                 //System.out.println("true"); 49                 returncliLocation.get(receiver); 50             } 51             else{ 52                 //System.out.println("false"); 53                 returnnull; 54             } 55         } 56         catch(Exception E) { 57             System.out.println(E.toString()); 58             returnnull; 59         } 60     } 61       62     publicvoiddelCli(IoSession session){ 63         ArrayList tempList = newArrayList(); 64           65         for(Integer inte : cliLocation.keySet()) { 66             tempList = cliLocation.get(inte); 67             if(tempList.contains(session)) { 68                 tempList.remove(session); 69             } 70         } 71     } 72       73 } [文件] CmccCodecFactory.java ~ 792B    下载(21) 01 importjava.nio.charset.Charset; 02   03 importorg.apache.mina.core.session.IoSession; 04 importorg.apache.mina.filter.codec.ProtocolCodecFactory; 05 importorg.apache.mina.filter.codec.ProtocolDecoder; 06 importorg.apache.mina.filter.codec.ProtocolEncoder; 07   08 publicclassCmccCodecFactory implementsProtocolCodecFactory{ 09   10     privatefinalCmccEncoder encoder; 11     privatefinalCmccDecoder decoder; 12   13     publicCmccCodecFactory(){ 14         this(Charset.defaultCharset()); 15     } 16   17     publicCmccCodecFactory(Charset charset){ 18         this.encoder = newCmccEncoder(charset); 19         this.decoder = newCmccDecoder(charset); 20     } 21   22     @Override 23     publicProtocolDecoder getDecoder(IoSession session) throwsException{ 24         returndecoder; 25     } 26   27     @Override 28     publicProtocolEncoder getEncoder(IoSession session) throwsException{ 29         returnencoder; 30     } 31 } [文件] CmccDecoder.java ~ 2KB    下载(18) 01 importjava.nio.charset.Charset; 02 importjava.nio.charset.CharsetDecoder; 03   04 importorg.apache.mina.core.buffer.IoBuffer; 05 importorg.apache.mina.core.session.IoSession; 06 importorg.apache.mina.filter.codec.CumulativeProtocolDecoder; 07 importorg.apache.mina.filter.codec.ProtocolDecoderOutput; 08   09   10 publicclassCmccDecoder extendsCumulativeProtocolDecoder{ 11       12     privatefinalCharset charset; 13   14     publicCmccDecoder(Charset charset){ 15         this.charset = charset; 16     } 17       18     @Override 19     protectedbooleandoDecode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throwsException{ 20         IoBuffer buffer = IoBuffer.allocate(300).setAutoExpand(true); 21         CharsetDecoder cd = charset.newDecoder(); 22           23         bytehead = (byte)192; 24         byteend = (byte)193; 25         intreceiver = -1, data_type = -1, data_receiver = -1, data_sender = -1; 26         String data=""; 27         intmatchCount = 0; 28         booleanflag = false; 29           30         while(in.hasRemaining()){ 31             byteb = in.get(); 32             buffer.put(b); 33               34             if(b == head){ 35                 buffer.flip(); 36                 buffer.clear(); 37                 matchCount = 0; 38                 flag = true; 39             }elseif(b == end){ 40                 if(flag){ 41                     buffer.flip(); 42                     //System.out.println(buffer); 43                     receiver = buffer.getInt(0); 44                     data_type = buffer.getInt(4); 45                     data_receiver = buffer.getInt(8); 46                     data_sender = buffer.getInt(12); 47 /*                  48                     System.out.println(buffer); 49                     System.out.println(matchCount); 50 */                  51                     buffer.skip(16); 52                     data = buffer.getString(matchCount-16, cd); 53 /*                  54                     System.out.println("====================================="); 55                     for (int i = 16; i < 24; i++) 56                         System.out.println((char)buffer.get(i)); 57                     System.out.println("====================================="); 58                     System.out.println(buffer); 59 */                  60                     buffer.clear(); 61                     matchCount = 0; 62                     break; 63                 } 64             }else{ 65                 matchCount++; 66             } 67         } 68           69         SmsObject smsObject = newSmsObject(); 70         smsObject.setReceiver(receiver); 71         smsObject.setDataType(data_type); 72         smsObject.setDataReceiver(data_receiver); 73         smsObject.setDataSender(data_sender); 74         smsObject.setData(data); 75         out.write(smsObject); 76         returnfalse; 77     } 78 } [文件] CmccEncoder.java ~ 1KB    下载(18) 01 importjava.nio.charset.Charset; 02 importjava.nio.charset.CharsetEncoder; 03   04 importorg.apache.mina.core.buffer.IoBuffer; 05 importorg.apache.mina.core.session.IoSession; 06 importorg.apache.mina.filter.codec.ProtocolEncoderAdapter; 07 importorg.apache.mina.filter.codec.ProtocolEncoderOutput; 08   09   10 publicclassCmccEncoder extendsProtocolEncoderAdapter{ 11     privatefinalCharset charset; 12       13     publicCmccEncoder(Charset charset){ 14         this.charset = charset; 15     } 16       17     @Override 18     publicvoidencode(IoSession session, Object message, ProtocolEncoderOutput out) throwsException{ 19         SmsObject sms = (SmsObject)message; 20         CharsetEncoder ce = charset.newEncoder(); 21         IoBuffer buffer = IoBuffer.allocate(100).setAutoExpand(true); 22           23         bytehead = (byte)192; 24         byteend = (byte)193; 25         intreceiver = sms.getReceiver(); 26         intdata_type = sms.getDataType(); 27         intdata_receiver = sms.getDataReceiver(); 28         intdata_sender = sms.getDataSender(); 29         String data = sms.getData(); 30           31         buffer.put(head); 32         buffer.putInt(receiver); 33         buffer.putInt(data_type); 34         buffer.putInt(data_receiver); 35         buffer.putInt(data_sender); 36         buffer.putString(data, ce); 37         buffer.put(end); 38           39         buffer.flip(); 40         session.write(buffer); 41     } 42 } 使用mina框架实现cmpp2.0服务端 我自己写的使用mina框架实现cmpp2.0服务端,经过一段使用解决了几个bug现在比较稳定 [文件]源代码 ~ 4KB    下载(19) 001 packagecmpp; 002   003 importorg.apache.mina.core.service.IoHandlerAdapter; 004 importorg.apache.mina.core.session.IoSession; 005 importorg.apache.mina.filter.codec.ProtocolCodecFilter; 006 importorg.apache.mina.filter.executor.ExecutorFilter; 007 importorg.apache.mina.filter.executor.OrderedThreadPoolExecutor; 008 importorg.apache.mina.integration.jmx.IoServiceMBean; 009   010 importorg.apache.mina.transport.socket.SocketAcceptor; 011 importorg.apache.mina.transport.socket.SocketConnector; 012 importorg.apache.mina.transport.socket.nio.NioSocketAcceptor; 013   014 importorg.slf4j.Logger; 015 importorg.slf4j.LoggerFactory; 016   017 importjava.io.IOException; 018 importjava.lang.management.ManagementFactory; 019 importjava.net.InetSocketAddress; 020   021 importjava.util.concurrent.ThreadFactory; 022 importjava.util.concurrent.TimeUnit; 023 importjava.util.concurrent.atomic.AtomicInteger; 024   025 importjavax.management.InstanceAlreadyExistsException; 026 importjavax.management.MBeanRegistrationException; 027 importjavax.management.MBeanServer; 028 importjavax.management.MalformedObjectNameException; 029 importjavax.management.NotCompliantMBeanException; 030 importjavax.management.ObjectName; 031   032 /** 033  * TODO : Add documentation 034  * 035  * @author Apache MINA Project 036  * 037  */ 038 publicclassMinaCmpp extendsIoHandlerAdapter { 039     privatestaticfinalLogger logger = LoggerFactory 040             .getLogger(MinaCmpp.class); 041   042     publicstaticfinalintMSG_SIZE = 5000; 043     publicstaticfinalintMSG_COUNT = 10; 044     privatestaticfinalintPORT = 7890; 045     privatestaticfinalintBUFFER_SIZE = 8192; 046   047     publicstaticfinalString OPEN = "open"; 048   049     publicSocketAcceptor acceptor; 050     publicSocketConnector connector; 051   052     privatefinalObject LOCK = newObject(); 053   054     privatestaticfinalThreadFactory THREAD_FACTORY = newThreadFactory() { 055         publicThread newThread(finalRunnable r) { 056             returnnewThread(null, r, "MinaThread", 64* 1024); 057         } 058     }; 059   060     privateOrderedThreadPoolExecutor executor; 061   062     publicstaticAtomicInteger sent = newAtomicInteger(0); 063   064     publicMinaCmpp() throwsIOException { 065         executor = newOrderedThreadPoolExecutor(0, 1000, 60, TimeUnit.SECONDS, 066                 THREAD_FACTORY); 067   068         acceptor = newNioSocketAcceptor(Runtime.getRuntime() 069                 .availableProcessors() + 1); 070         acceptor.setReuseAddress(true); 071         acceptor.getSessionConfig().setReceiveBufferSize(BUFFER_SIZE); 072   073         acceptor.getFilterChain().addLast("threadPool", 074                 newExecutorFilter(executor)); 075         acceptor.getFilterChain().addLast("codec", 076                 newProtocolCodecFilter(newCmppProtocolCodecFactory())); 077   078         MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer(); 079         IoServiceMBean acceptorMBean = newIoServiceMBean(acceptor); 080         ObjectName acceptorName; 081         try{ 082             acceptorName = newObjectName(acceptor.getClass().getPackage() 083                     .getName() 084                     + ":type=acceptor,name=" 085                     + acceptor.getClass().getSimpleName()); 086             mBeanServer.registerMBean(acceptorMBean, acceptorName); 087   088             // MBeanServer mBeanServer = 089             // ManagementFactory.getPlatformMBeanServer(); 090             // mBeanServer.registerMBean(new IoSessionMBean(session), 091             // new ObjectName(session.getClass() 092             // .getPackage().getName() 093             // + ":type=session,name=" + session.getClass().getSimpleName())); 094   095         } catch(MalformedObjectNameException e) { 096             // TODO Auto-generated catch block 097             e.printStackTrace(); 098         } catch(NullPointerException e) { 099             // TODO Auto-generated catch block 100             e.printStackTrace(); 101         } catch(InstanceAlreadyExistsException e) { 102             // TODO Auto-generated catch block 103             e.printStackTrace(); 104         } catch(MBeanRegistrationException e) { 105             // TODO Auto-generated catch block 106             e.printStackTrace(); 107         } catch(NotCompliantMBeanException e) { 108             // TODO Auto-generated catch block 109             e.printStackTrace(); 110         } 111   112     } 113   114     publicvoidstart() throwsException { 115         finalInetSocketAddress socketAddress = newInetSocketAddress( 116                 "0.0.0.0", PORT); 117         acceptor.setHandler(newCmppIoHandler(LOCK)); 118         acceptor.bind(socketAddress); 119         logger.info("MinaCmpp�������������˿���"+ PORT); 120     } 121   122     @Override 123     publicvoidexceptionCaught(IoSession session, Throwable cause) { 124         if(!(cause instanceofIOException)) { 125             logger.error("Exception: ", cause); 126         } else{ 127             logger.info("I/O error: "+ cause.getMessage()); 128         } 129         session.close(true); 130     } 131   132     publicstaticvoidmain(String[] args) throwsException { 133         newMinaCmpp().start(); 134     } 135 } [文件]源代码 ~ 2KB    下载(16) 01 packagecmpp; 02   03   04   05 importorg.apache.mina.core.session.IoSession; 06 importorg.slf4j.Logger; 07 importorg.slf4j.LoggerFactory; 08   09 importcmpp.pdu.ActiveTest; 10   11 publicclassActiveThread extendsThread { 12     privateIoSession session = null; 13     privatestaticfinalLogger logger = LoggerFactory 14             .getLogger(ActiveThread.class); 15     privatelongheartbeatInterval = 60000; 16     privatelongheartbeatRetry = 3; 17     privatelongreconnectInterval = 10000; 18     publicstaticlonglastActiveTime = 0; 19     privatelonglastCheckTime = 0; 20   21     publicActiveThread(IoSession s) { 22         setDaemon(true); 23         this.session = s; 24         lastCheckTime = System.currentTimeMillis(); 25         lastActiveTime = System.currentTimeMillis(); 26     } 27   28     publicvoidrun() { 29         try{ 30             while(session.isConnected()) { 31                 longcurrentTime = System.currentTimeMillis(); 32                 if((currentTime - lastCheckTime) > heartbeatInterval) { 33                     logger.info("CmppSession.checkConnection"); 34                     if((currentTime - lastActiveTime) < (heartbeatInterval * heartbeatRetry)) { 35                         logger.info("send ActiveTest"); 36                         lastCheckTime = currentTime; 37                         ActiveTest activeTest = newActiveTest(); 38                         activeTest.assignSequenceNumber(); 39                         activeTest.timeStamp = currentTime; 40                         session.write(activeTest); 41                     } else{ 42                         logger.info("connection lost!"); 43                         session.close(true); 44                         break; 45                     } 46                 } 47                 try{ 48                     Thread.sleep(reconnectInterval); 49                 } catch(InterruptedException e) { 50                     // 51                 } 52             } 53         } catch(Exception e) { 54             e.printStackTrace(); 55         } 56     } 57 } [文件]源代码 ~ 909B    下载(14) 01 packagecmpp; 02   03 importorg.apache.mina.core.buffer.IoBuffer; 04 importorg.apache.mina.core.session.AttributeKey; 05 importorg.apache.mina.core.session.IoSession; 06 importorg.apache.mina.filter.codec.ProtocolCodecFactory; 07 importorg.apache.mina.filter.codec.ProtocolDecoder; 08 importorg.apache.mina.filter.codec.ProtocolDecoderOutput; 09 importorg.apache.mina.filter.codec.ProtocolEncoder; 10   11 /** 12  * TODO: Document me 13  * @author Apache MINA Project 14  * 15  */ 16 publicclassCmppProtocolCodecFactory implementsProtocolCodecFactory { 17   privateProtocolDecoder decoder = newCmppRequestDecoder(); 18   privateProtocolEncoder encoder = newCmppResponseEncoder(); 19   20   publicProtocolDecoder getDecoder(IoSession sessionIn) throwsException { 21     returndecoder; 22   } 23   24   publicProtocolEncoder getEncoder(IoSession sessionIn) throwsException { 25     returnencoder; 26   } 27 } [文件]源代码 ~ 862B    下载(12) 01 packagecmpp; 02   03 importorg.apache.mina.core.buffer.IoBuffer; 04 importorg.apache.mina.core.session.IoSession; 05 importorg.apache.mina.filter.codec.ProtocolEncoder; 06 importorg.apache.mina.filter.codec.ProtocolEncoderOutput; 07   08 importcmpp.pdu.CmppPDU; 09   10 /** 11  * TODO: Document me ! 12  * 13  * @author Apache MINA Project 14  */ 15 publicclassCmppResponseEncoder implementsProtocolEncoder { 16     publicvoidencode(IoSession session, Object message, 17             ProtocolEncoderOutput out) throwsException { 18         CmppPDU pdu = (CmppPDU) message; 19         byte[] bytes = pdu.getData().getBuffer(); 20         IoBuffer buf = IoBuffer.allocate(bytes.length, false); 21   22         buf.setAutoExpand(true); 23 //      buf.putInt(bytes.length); 24         buf.put(bytes); 25   26         buf.flip(); 27         out.write(buf); 28   29     } 30   31     publicvoiddispose(IoSession session) throwsException { 32   33     } 34 } [文件]源代码 ~ 2KB    下载(16) 01 packagecmpp; 02   03   04 importorg.apache.mina.core.buffer.IoBuffer; 05 importorg.apache.mina.core.session.IoSession; 06 importorg.apache.mina.filter.codec.CumulativeProtocolDecoder; 07 importorg.apache.mina.filter.codec.ProtocolDecoderOutput; 08 importorg.slf4j.Logger; 09 importorg.slf4j.LoggerFactory; 10   11 importcmpp.pdu.CmppPDU; 12 importcmpp.pdu.CmppPDUParser; 13 importcmpp.sms.ByteBuffer; 14   15   16 /** 17  * TODO: Document me ! 18  * 19  * @author Apache MINA Project 20  */ 21 publicclassCmppRequestDecoder extendsCumulativeProtocolDecoder { 22     privatestaticfinalLogger logger = LoggerFactory 23             .getLogger(CmppRequestDecoder.class); 24   25     @Override 26     protectedbooleandoDecode(finalIoSession session, IoBuffer in, 27             ProtocolDecoderOutput out) throwsException { 28   29         // �������¼�������� 30         // 1. һ��ip����ֻ��һ��������Ϣ 31         // 2. һ��ip���а�һ��������Ϣ����һ����Ϣ��һ���� 32         // 3. һ��ip���а�һ����Ϣ��һ���� 33         // 4. һ��ip���а���������������Ϣ���ࣨѭ�������ڸ����decode�У� 34   35         if(in.remaining() > 4) { 36             logger.info("resv msg "+ in.toString()); 37             in.mark(); 38             intlength = in.getInt();           39             logger.info("length="+ length + ",in.limit="+ in.limit()+",in.remaining="+in.remaining()); 40             if(length > (in.remaining()+4)) 41             { 42                 in.rewind(); 43                 returnfalse; 44             } 45   46             byte[] bytedata = newbyte[length-4]; 47             in.get(bytedata);   48             ByteBuffer buffer = newByteBuffer(); 49             buffer.appendInt(length); 50             buffer.appendBytes(bytedata); 51             CmppPDU pdu = CmppPDUParser.createPDUFromBuffer(buffer); 52             if(pdu == null) returnfalse; 53             logger.info(pdu.dump()); 54             out.write(pdu); 55             returntrue; 56   57         } 58   59         returnfalse; 60     } 61 } [文件] CmppIoHandler.java ~ 6KB    下载(14) 001 packagecmpp; 002   003 importorg.apache.mina.core.buffer.IoBuffer; 004 importorg.apache.mina.core.service.IoHandlerAdapter; 005 importorg.apache.mina.core.session.IoSession; 006 importorg.apache.mina.integration.jmx.IoSessionMBean; 007 importorg.slf4j.Logger; 008 importorg.slf4j.LoggerFactory; 009   010 importcmpp.CmppConstant; 011   012 importcmpp.ActiveThread; 013 importcmpp.pdu.CmppPDU; 014 importcmpp.sms.ByteBuffer; 015 importcmpp.sms.ShortMessage; 016 importstaticcmpp.MinaCmpp.MSG_COUNT; 017 importstaticcmpp.MinaCmpp.OPEN; 018   019 importjava.io.IOException; 020 importjava.lang.management.ManagementFactory; 021   022 importjava.text.SimpleDateFormat; 023 importjava.util.Date; 024 importjava.util.concurrent.ExecutorService; 025 importjava.util.concurrent.Executors; 026 importjava.util.concurrent.atomic.AtomicInteger; 027   028 importjavax.management.MBeanServer; 029 importjavax.management.ObjectName; 030   031 /** 032  * TODO: Document me ! 033  * 034  * @author Apache MINA Project 035  * 036  */ 037 publicclassCmppIoHandler extendsIoHandlerAdapter { 038     privatestaticfinalLogger logger = LoggerFactory 039             .getLogger(CmppIoHandler.class); 040     publicstaticAtomicInteger received = newAtomicInteger(0); 041     publicstaticAtomicInteger closed = newAtomicInteger(0); 042     privatefinalObject LOCK; 043     publicstaticbooleanConnect = false; 044     publicstaticbooleanFirstmsg = true; 045     privateExecutorService exec = Executors.newSingleThreadExecutor(); 046     publicCmppIoHandler(Object lock) { 047         LOCK = lock; 048     } 049   050     @Override 051     publicvoidexceptionCaught(IoSession session, Throwable cause) { 052         if(!(cause instanceofIOException)) { 053             logger.error("Exception: ", cause); 054         } else{ 055             logger.info("I/O error: "+ cause.getMessage()); 056         } 057         session.close(true); 058     } 059   060     @Override 061     publicvoidsessionOpened(IoSession session) throwsException { 062         logger.info("Session "+ session.getId() + " is opened"); 063   064         // ����ActivePDU-Thread 065         // ExecutorService exec = Executors.newSingleThreadExecutor(); 066         // exec.execute(new ActiveThread(session)); 067         Thread t = newThread(newActiveThread(session)); 068         t.setDaemon(true); 069         t.start(); 070         session.resumeRead(); 071   072     } 073   074     @Override 075     publicvoidsessionCreated(IoSession session) throwsException { 076         logger.info("Creation of session "+ session.getId()); 077         session.setAttribute(OPEN); 078         session.suspendRead(); 079   080     } 081   082     @Override 083     publicvoidsessionClosed(IoSession session) throwsException { 084         session.removeAttribute(OPEN); 085         logger.info("{}> Session closed", session.getId()); 086         finalintclsd = closed.incrementAndGet(); 087   088         if(clsd == MSG_COUNT) { 089             synchronized(LOCK) { 090                 LOCK.notifyAll(); 091             } 092         } 093     } 094   095     @Override 096     publicvoidmessageReceived(IoSession session, Object message) 097             throwsException { 098         CmppPDU pdu = (CmppPDU) message; 099         logger.info("MESSAGE: "+ pdu.header.getCommandId() + ":" 100                 + pdu.header.getSequenceNumber() + "on session " 101                 + session.getId()); 102         finalintrec = received.incrementAndGet(); 103         if(Firstmsg == true|| Connect == true) { 104             Firstmsg = false; 105             switch(pdu.header.getCommandId()) { 106             caseCmppConstant.CMD_CONNECT: 107                 cmpp.pdu.Connect con = (cmpp.pdu.Connect) pdu; 108                 cmpp.pdu.ConnectResp conresp = (cmpp.pdu.ConnectResp) con 109                         .getResponse(); 110                 session.write(conresp); 111                 logger.info("conresp:"+ pdu.header.getSequenceNumber() 112                         + " on session "+ session.getId()); 113                 Connect = true; 114 //              Thread t2 = new Thread(new SendMoThread(session)); 115 //              t2.setDaemon(true); 116 //              t2.start(); 117                 exec.execute(newSendMoThread(session)); 118                 break; 119             caseCmppConstant.CMD_ACTIVE_TEST: 120                 cmpp.pdu.ActiveTest activeTest = (cmpp.pdu.ActiveTest) pdu; 121                 cmpp.pdu.ActiveTestResp activeTestResp = (cmpp.pdu.ActiveTestResp) activeTest 122                         .getResponse(); 123                 session.write(activeTestResp); 124                 logger.info("active_test:"+ pdu.header.getSequenceNumber() 125                         + " on session "+ session.getId()); 126                 break; 127             caseCmppConstant.CMD_ACTIVE_TEST_RESP: 128                 cmpp.pdu.ActiveTestResp activeTestRsp = (cmpp.pdu.ActiveTestResp) pdu; 129                 pdu.dump(); 130                 logger.info("activeTestRsp:"+ pdu.header.getSequenceNumber() 131                         + " on session "+ session.getId()); 132                 ActiveThread.lastActiveTime = System.currentTimeMillis(); 133                 break; 134             caseCmppConstant.CMD_SUBMIT: 135                 cmpp.pdu.Submit submit = (cmpp.pdu.Submit) pdu; 136                 submit.dump(); 137                 cmpp.pdu.SubmitResp subresp = (cmpp.pdu.SubmitResp) submit 138                         .getResponse(); 139                 subresp.setMsgId(cmpp.pdu.Tools.GetRspid()); 140                 session.write(subresp); 141                 logger.info("subresp:"+ pdu.header.getSequenceNumber() 142                         + " on session "+ session.getId()); 143                 // ����״̬���� 144                 cmpp.pdu.Deliver deliver = sendMsgStat(submit); 145                 session.write(deliver); 146                 break; 147             caseCmppConstant.CMD_DELIVER_RESP: 148                 cmpp.pdu.DeliverResp delresp = (cmpp.pdu.DeliverResp) pdu; 149                 delresp.dump(); 150                 logger.info("DeliverResp:"+ pdu.header.getSequenceNumber() 151                         + " on session "+ session.getId()); 152                 break; 153             default: 154                 logger.warn("Unexpected PDU received! PDU Header: " 155                         + pdu.header.getData().getHexDump()); 156                 break; 157             } 158         } 159         if(rec == MSG_COUNT) { 160             synchronized(LOCK) { 161                 LOCK.notifyAll(); 162             } 163         } 164   165         // session.close(true); 166     } 167   168     privatecmpp.pdu.Deliver sendMsgStat(cmpp.pdu.Submit submit) { 169         // TODO Auto-generated method stub 170         cmpp.pdu.Deliver delive = newcmpp.pdu.Deliver(); 171         delive.setMsgId(cmpp.pdu.Tools.GetMsgid()); 172         delive.setDstId(submit.getMsgSrc()); 173         delive.setServiceId(submit.getServiceId()); 174         delive.setSrcTermId(submit.getDestTermId()[0]); 175         delive.setIsReport((byte) 1); 176         delive.assignSequenceNumber(); 177         ShortMessage sm = newShortMessage(); 178         ByteBuffer messageData = newByteBuffer(); 179         messageData.appendBytes(submit.getMsgId(), 8); 180         messageData.appendString("0", 7); 181         SimpleDateFormat sdf = newSimpleDateFormat("yyMMddHHmm"); 182         messageData.appendString(sdf.format(newDate()), 10); 183         messageData.appendString(sdf.format(newDate()), 10); 184         messageData.appendString(submit.getDestTermId()[0], 32); 185         messageData.appendInt(30); 186         sm.setMessage(messageData.getBuffer(), (byte) 04); 187         delive.setSm(sm); 188         delive.setLinkId(submit.getLinkId()); 189         returndelive; 190     } 191 } [文件] CmppConstant.java ~ 4KB    下载(15) 001 packagecmpp; 002   003   004 publicclassCmppConstant { 005     publicstaticfinalbyteCOMMAND_NAME_LENGTH=12; 006     publicstaticfinalbyteTRANSMITTER = (byte) 0x00; 007     publicstaticfinalbyteRECEIVER = (byte) 0x01; 008     publicstaticfinalbyteTRANSCEIVER = (byte) 0x02; 009       010     publicstaticfinalintCONNECTION_CLOSED = 0; 011     publicstaticfinalintCONNECTION_OPENED = 1; 012   013     publicstaticfinallongACCEPT_TIMEOUT = 60000; 014     publicstaticfinallongRECEIVER_TIMEOUT = 60000; 015     publicstaticfinallongCOMMS_TIMEOUT = 60000; 016     publicstaticfinallongQUEUE_TIMEOUT = 60000; 017     publicstaticfinallongRECEIVE_BLOCKING = 0; 018     publicstaticfinallongCONNECTION_RECEIVE_TIMEOUT = 0; 019     publicstaticintPDU_HEADER_SIZE = 12; 020       021     //CMPP Command Set 022     publicstaticfinalintCMD_CONNECT = 0x00000001; 023     publicstaticfinalintCMD_CONNECT_RESP = 0x80000001; 024     publicstaticfinalintCMD_TERMINATE = 0x00000002; 025     publicstaticfinalintCMD_TERMINATE_RESP = 0x80000002; 026     publicstaticfinalintCMD_SUBMIT = 0x00000004; 027     publicstaticfinalintCMD_SUBMIT_RESP = 0x80000004; 028     publicstaticfinalintCMD_DELIVER = 0x00000005; 029     publicstaticfinalintCMD_DELIVER_RESP = 0x80000005; 030     publicstaticfinalintCMD_QUERY = 0x00000006; 031     publicstaticfinalintCMD_QUERY_RESP = 0x80000006; 032     publicstaticfinalintCMD_CANCEL = 0x00000007; 033     publicstaticfinalintCMD_CANCEL_RESP = 0x80000007; 034     publicstaticfinalintCMD_ACTIVE_TEST = 0x00000008; 035     publicstaticfinalintCMD_ACTIVE_TEST_RESP = 0x80000008; 036   037   038       039   040     //Command_Status Error Codes 041     publicstaticfinalintESME_ROK = 0x00000000; 042   043     //Interface_Version 044     publicstaticfinalbyteVERSION = 0x20; 045   046     //Address_TON 047     publicstaticfinalbyteGSM_TON_UNKNOWN = 0x00; 048     publicstaticfinalbyteGSM_TON_INTERNATIONAL = 0x01; 049     publicstaticfinalbyteGSM_TON_NATIONAL = 0x02; 050     publicstaticfinalbyteGSM_TON_NETWORK = 0x03; 051     publicstaticfinalbyteGSM_TON_SUBSCRIBER = 0x04; 052     publicstaticfinalbyteGSM_TON_ALPHANUMERIC = 0x05; 053     publicstaticfinalbyteGSM_TON_ABBREVIATED = 0x06; 054     publicstaticfinalbyteGSM_TON_RESERVED_EXTN = 0x07; 055   056     //Address_NPI 057     publicstaticfinalbyteGSM_NPI_UNKNOWN = 0x00; 058     publicstaticfinalbyteGSM_NPI_E164 = 0x01; 059     publicstaticfinalbyteGSM_NPI_ISDN = GSM_NPI_E164; 060     publicstaticfinalbyteGSM_NPI_X121 = 0x03; 061     publicstaticfinalbyteGSM_NPI_TELEX = 0x04; 062     publicstaticfinalbyteGSM_NPI_LAND_MOBILE = 0x06; 063     publicstaticfinalbyteGSM_NPI_NATIONAL = 0x08; 064     publicstaticfinalbyteGSM_NPI_PRIVATE = 0x09; 065     publicstaticfinalbyteGSM_NPI_ERMES = 0x0A; 066     publicstaticfinalbyteGSM_NPI_INTERNET = 0x0E; 067     publicstaticfinalbyteGSM_NPI_WAP_CLIENT_ID = 0x12; 068     publicstaticfinalbyteGSM_NPI_RESERVED_EXTN = 0x0F; 069   070     //Port Value 071     publicstaticfinalintMIN_VALUE_PORT = 1024; 072     publicstaticfinalintMAX_VALUE_PORT = 65535; 073       074     //Address Length 075     publicstaticfinalintMIN_LENGTH_ADDRESS = 7; 076       077     // list of character encodings 078     // see http://java.sun.com/j2se/1.3/docs/guide/intl/encoding.doc.html 079     // from rt.jar 080   081     // American Standard Code for Information Interchange 082     publicstaticfinalString ENC_ASCII = "ASCII"; 083     // Windows Latin-1 084     publicstaticfinalString ENC_CP1252 = "Cp1252"; 085     // ISO 8859-1, Latin alphabet No. 1 086     publicstaticfinalString ENC_ISO8859_1 = "ISO8859_1"; 087     // Sixteen-bit Unicode Transformation Format, big-endian byte order 088     // with byte-order mark 089     publicstaticfinalString ENC_UTF16_BEM = "UnicodeBig"; 090     // Sixteen-bit Unicode Transformation Format, big-endian byte order 091     publicstaticfinalString ENC_UTF16_BE = "UnicodeBigUnmarked"; 092     // Sixteen-bit Unicode Transformation Format, little-endian byte order 093     // with byte-order mark 094     publicstaticfinalString ENC_UTF16_LEM = "UnicodeLittle"; 095     // Sixteen-bit Unicode Transformation Format, little-endian byte order 096     publicstaticfinalString ENC_UTF16_LE = "UnicodeLittleUnmarked"; 097     // Eight-bit Unicode Transformation Format 098     publicstaticfinalString ENC_UTF8 = "UTF8"; 099     // Sixteen-bit Unicode Transformation Format, byte order specified by 100     // a mandatory initial byte-order mark 101     publicstaticfinalString ENC_UTF16 = "UTF-16"; 102   103 } [文件] CmppPDUParser.java ~ 3KB    下载(6) 001 /* 002  * Created on 2005-5-30 003  * 004  * TODO To change the template for this generated file go to 005  * Window - Preferences - Java - Code Style - Code Templates 006  */ 007 package cmpp.pdu; 008   009 import cmpp.CmppConstant; 010 import cmpp.sms.ByteBuffer; 011 import cmpp.sms.PDUException; 012 import cmpp.sms.SmsObject; 013   014 /** 015  * @author intermax 016  * 017  * TODO To change the template for this generated type comment go to 018  * Window - Preferences - Java - Code Style - Code Templates 019  */ 020 publicclassCmppPDUParser extendsSmsObject { 021       022     publicstaticCmppPDU createPDUFromBuffer(ByteBuffer buffer) { 023         CmppPDU pdu = null; 024         CmppPDUHeader pduHeader = newCmppPDUHeader(); 025         try{ 026             pduHeader.setData(buffer); 027             switch(pduHeader.getCommandId()) { 028             caseCmppConstant.CMD_SUBMIT_RESP: 029                 SubmitResp submitResp = newSubmitResp(); 030                 submitResp.header = pduHeader; 031                 submitResp.setBody(buffer); 032                 pdu = submitResp; 033                 break; 034             caseCmppConstant.CMD_DELIVER: 035                 Deliver deliver = newDeliver(); 036                 deliver.header = pduHeader; 037                 deliver.setBody(buffer); 038                 pdu = deliver; 039                 break; 040             caseCmppConstant.CMD_ACTIVE_TEST: 041                 ActiveTest activeTest = newActiveTest(); 042                 activeTest.header = pduHeader; 043                 pdu = activeTest; 044                 break; 045             caseCmppConstant.CMD_ACTIVE_TEST_RESP: 046                 ActiveTestResp activeTestResp = newActiveTestResp(); 047                 activeTestResp.header = pduHeader; 048                 pdu = activeTestResp; 049                 break; 050             caseCmppConstant.CMD_DELIVER_RESP: 051                 DeliverResp deliverResp = newDeliverResp(); 052                 deliverResp.header = pduHeader; 053                 deliverResp.setBody(buffer); 054                 pdu = deliverResp; 055                 break; 056             caseCmppConstant.CMD_SUBMIT: 057                 Submit submit = newSubmit(); 058                 submit.header = pduHeader; 059                 submit.setBody(buffer); 060                 pdu = submit; 061                 break; 062             caseCmppConstant.CMD_QUERY: 063                 Query query  = newQuery(); 064                 query.header = pduHeader; 065                 query.setBody(buffer); 066                 pdu = query; 067                 break; 068             caseCmppConstant.CMD_QUERY_RESP: 069                 QueryResp queryResp  = newQueryResp(); 070                 queryResp.header = pduHeader; 071                 queryResp.setBody(buffer); 072                 pdu = queryResp; 073                 break; 074             caseCmppConstant.CMD_CONNECT: 075                 Connect login = newConnect(); 076                 login.header = pduHeader; 077                 login.setBody(buffer); 078                 pdu = login; 079                 break; 080             caseCmppConstant.CMD_CONNECT_RESP: 081                 ConnectResp loginResp = newConnectResp(); 082                 loginResp.header = pduHeader; 083                 loginResp.setBody(buffer); 084                 pdu = loginResp; 085                 break; 086             caseCmppConstant.CMD_CANCEL: 087                 Cancel cancel = newCancel(); 088                 cancel.header = pduHeader; 089                 cancel.setBody(buffer); 090                 pdu = cancel; 091                 break; 092             caseCmppConstant.CMD_CANCEL_RESP: 093                 CancelResp cancelResp = newCancelResp(); 094                 cancelResp.header = pduHeader; 095                 cancelResp.setBody(buffer); 096                 pdu = cancelResp; 097                 break; 098             default: 099                 logger.error("Unknown Command! PDU Header: "+ pduHeader.getData().getHexDump()); 100                 break; 101             } 102         } catch(PDUException e) { 103             logger.error("Error parsing PDU: ", e); 104         } 105         returnpdu; 106     } 107 } [文件] CmppPDU.java ~ 3KB    下载(7) 001 /* 002  * Created on 2005-5-7 003  * 004  * TODO To change the template for this generated file go to 005  * Window - Preferences - Java - Code Style - Code Templates 006  */ 007 package cmpp.pdu; 008   009 import cmpp.sms.PDU; 010   011   012 /** 013  * @author lucien 014  * 015  * TODO To change the template for this generated type comment go to Window - 016  * Preferences - Java - Code Style - Code Templates 017  */ 018 public abstract class CmppPDU extends PDU  { 019   020     private static int sequenceNumber = 0; 021   022     private boolean sequenceNumberChanged = false; 023   024     public CmppPDUHeader header = null; 025   026     public CmppPDU() { 027         header = new CmppPDUHeader(); 028     } 029   030     public CmppPDU(int commandId) { 031         header = new CmppPDUHeader(); 032         header.setCommandId(commandId); 033     } 034   035     /** Checks if the header field is null and if not, creates it. */ 036     privatevoidcheckHeader() { 037         if(header == null) { 038             header = newCmppPDUHeader(); 039         } 040     } 041   042     publicintgetCommandLength() { 043         checkHeader(); 044         returnheader.getCommandLength(); 045     } 046   047     publicintgetCommandId() { 048         checkHeader(); 049         returnheader.getCommandId(); 050     } 051   052     publicintgetSequenceNumber() { 053         checkHeader(); 054         returnheader.getSequenceNumber(); 055     } 056   057     publicvoidsetCommandLength(intcmdLen) { 058         checkHeader(); 059         header.setCommandLength(cmdLen); 060     } 061   062     publicvoidsetCommandId(intcmdId) { 063         checkHeader(); 064         header.setCommandId(cmdId); 065     } 066   067     publicvoidsetSequenceNumber(intseqNr) { 068         checkHeader(); 069         header.setSequenceNumber(seqNr); 070     } 071   072     publicvoidassignSequenceNumber() { 073         assignSequenceNumber(false); 074     } 075   076     publicvoidassignSequenceNumber(booleanalways) { 077         if((!sequenceNumberChanged) || always) { 078             synchronized(this) { 079                 setSequenceNumber(++sequenceNumber); 080             } 081             sequenceNumberChanged = true; 082         } 083     } 084   085     publicvoidresetSequenceNumber() { 086         setSequenceNumber(0); 087         sequenceNumberChanged = false; 088     } 089   090     publicbooleanequals(Object object) { 091         if((object != null) && (object instanceofCmppPDU)) { 092             CmppPDU pdu = (CmppPDU) object; 093             returnpdu.getSequenceNumber() == getSequenceNumber(); 094         } else{ 095             returnfalse; 096         } 097     } 098   099     publicString getSequenceNumberAsString() { 100         intdata = header.getSequenceNumber(); 101         byte[] intBuf = newbyte[4]; 102         intBuf[3] = (byte) (data & 0xff); 103         intBuf[2] = (byte) ((data >>> 8) & 0xff); 104         intBuf[1] = (byte) ((data >>> 16) & 0xff); 105         intBuf[0] = (byte) ((data >>> 24) & 0xff); 106         returnnewString(intBuf); 107     } 108   109     publicabstractbooleanisRequest(); 110   111     publicabstractbooleanisResponse(); 112   113     publicString dump() { 114         returnname() + " dump() unimplemented"; 115     } 116   117   118 } [文件] CmppPDUHeader.java ~ 2KB    下载(7) view source print? 01 /* 02  * Created on 2005-5-7 03  * 04  * TODO To change the template for this generated file go to 05  * Window - Preferences - Java - Code Style - Code Templates 06  */ 07 package cmpp.pdu; 08   09   10 import cmpp.CmppConstant; 11 import cmpp.sms.ByteBuffer; 12 import cmpp.sms.ByteData; 13 import cmpp.sms.NotEnoughDataInByteBufferException; 14 import cmpp.sms.PDUException; 15   16   17 /** 18  * @author lucien 19  * 20  * TODO To change the template for this generated type comment go to 21  * Window - Preferences - Java - Code Style - Code Templates 22  */ 23 publicclassCmppPDUHeader extendsByteData { 24     privateintcommandLength = CmppConstant.PDU_HEADER_SIZE; 25     privateintcommandId = 0; 26     privateintsequenceNumber = 0; 27   28     publicByteBuffer getData() { 29         ByteBuffer buffer = newByteBuffer(); 30         buffer.appendInt(getCommandLength()); 31         buffer.appendInt(getCommandId()); 32         buffer.appendInt(getSequenceNumber()); 33         returnbuffer; 34     } 35   36     publicvoidsetData(ByteBuffer buffer) throwsPDUException { 37         try{ 38             commandLength = buffer.removeInt(); 39             commandId = buffer.removeInt(); 40             sequenceNumber = buffer.removeInt(); 41         } catch(NotEnoughDataInByteBufferException e) { 42             thrownewPDUException(e); 43         } 44     } 45   46     publicintgetCommandLength() { 47         returncommandLength; 48     } 49   50     publicintgetCommandId() { 51         returncommandId; 52     } 53   54     publicintgetSequenceNumber() { 55         returnsequenceNumber; 56     } 57   58     publicvoidsetCommandLength(intcmdLen) { 59         commandLength = cmdLen; 60     } 61   62     publicvoidsetCommandId(intcmdId) { 63         commandId = cmdId; 64     } 65   66     publicvoidsetSequenceNumber(intseqNr) { 67         sequenceNumber = seqNr; 68     } 69   70 }

下载文档到电脑,查找使用更方便

文档的实际排版效果,会与网站的显示效果略有不同!!

需要 10 金币 [ 分享文档获得金币 ] 1 人已下载

下载文档