MINA使用心得及相关要点 目录 前言 What is MINA 使用案例: Apache 直属 MINA 的子项目: 同类框架: MINA 快速入门 预备知识: 资源下载: Hello world 的关键关键代码: MINA 深度了解 Mina 的应用层 MINA 的内部流程 选择 MINA 的理由 传统 socket 编程 改进的 Socket 编程 使用 MINA 框架的编程 扩展知识 IoBuffer 接口 sendUrgentData() 方法的使用 Concurrent 包下的一些类 结尾语 前言 笔者之前的工作主要是做 java 的 web 端开发,后因工作原因参与了一个国家级的大项目,主要负责其中底层通讯的前置机模块。几经波折,将该系统完成后,结果在第一轮的测试中就惨败退回。其根本原因就在于原设计文档的要求单“通信机”与“终端”(注一)之间的并发量要达到 2W 以上的连接通信,而实际运行并发量只能达到 2600 个相差了近十倍左右。经过代码调优、扩展 JVM 内存等等手段,但因基础数据相差过大,所取得的优化效果十分有限。后考虑在根本着手,只有更改整个系统的通信接口,才有可能达到设计文档上的要求。某天在某个技术 QQ 群里一次讨论中,有网友向我推荐了一个框架,这就是本文要介绍的主角 -MINA 。 注一:前置机分成了三个部分,其设计的结构图如下所示: What is MINA Apache MINA(Multipurpose Infrastructure for Network Applications) 是 Apache 组织一个较新的项目,它为开发高性能和高可用性的网络应用程序提供了非常便利的框架。当前发行的 MINA 版本支持基于 Java NIO 技术的 TCP/UDP 应用程序开发、串口通讯程序(只在最新的预览版中提供), MINA 所支持的功能也在进一步的扩展中 使用案例: 目前正在使用 MINA 的软件包括有: Apache Directory Project 、 AMQP ( Advanced Message Queuing Protocol )、 RED5 Server ( Macromedia Flash Media RTMP )、 ObjectRADIUS 、 Openfire 等等。 Apache 直属 MINA 的子项目: FTPServer , AsyncWeb , SSHD 其实在有人推荐了 MINA 之后,本人就上网 google 了一把,搜索一番下来之后才发现自己的眼界过于狭隘了,原来有相同功能的开源框架还真不少,看来以后得继续多泡论坛和 QQ 了(有正当理由上班泡坛子,聊 QQ 了,偷笑一个 ^_^ 。。。。。。) 同类框架: Netty2: 具有很好的构架,并且该框架非常有名,使用该框架的项目不少,这意味着开发团队持续更新的动力非常大。同时 Netty2 的文档非常齐全,并且支持 JMX 。 Netty2 的缺点就是实现代码的质量不是非常好,最大的缺点就是只支持普通的 TCP 。 Cindy :起源于 Netty2 之后,借鉴了 Netty2 中 MessageRecognizer 类的设计,在当前的版本中已经全面支持普通 TCP/Secure TCP/UDP 单播 /UDP 多播 /Pipe ,可以使用同一个模型进行同步 / 异步 IO 处理。 Cindy 目前缺点是文档相对较少以及应用的项目比较少。 Grizzl :的设计与一般的 nio 框架相比是比较不同的,主要不同点在于读和写都是采用 blocking 方式,并且使用临时 selector ;线程模型高度可配置。性能据说比 MINA 还高,但是学习曲线很高。 QuickServer: http://www.quickserver.org/ Xscocket :是一个轻量级的解决方案,核心思想是屏蔽,简化 nio 方式的的开发,并不需要过多的学习。 对于这些框架的基本使用和基础架构,本人都经过了一番研究,发现这些框架要么重者过重,要么轻者过轻。鉴于项目的规模及时间的紧迫,再三比较之下,本人还是选择了学习曲线低,性能优异的 MINA. 至于这些优点是如何体现出来的,本文以下内容将继续为您解读。 MINA 快速入门 闲话少说,借用大师级的写书经验,咱们先来一个 hello world 。当然在这前我们还是得先做一些准备的。 预备知识: JAVA NIO 多线程 Socket 以上知识对本文的阅读理解有一定帮助,但并非一定必需。 资源下载: MINA2.0 :暂时分为 1.x 和 2.x 两个主版本,本文只涉及 2.X 的版本,至于为什么只讲 2.X 而不讲 1.X ,比较冠冕堂皇的回答是 ---- 因为有一位伟人曾经说过:要以发展的眼光看世界。。。。。。而真实原因嘛。。。。。。咳咳。。。。大家都知道的。。我就不便多言了。下载地址:http://mina.apache.org/downloads.html。项目中使用的是 2.03, 截止本文发稿为止,最新版本为: 2.04 log4j :因为其中缺少 log4j 的包,所以做试验的朋友还需要去下一个 log4j 的包。 开发工具:eclipse Jdk: 1.6x 监视测试工具:Oracle JRockit Mission Control 4.0.1 强烈推荐,简称 JRMC ,开发过程中,用它解决了很多性能瓶颈的问题,具体使用方法,因为篇幅所限在此不做详述,请自行查询相关文档。 Hello world 的关键关键代码: Server 端的 Main 函数: IoAcceptor acceptor = newNioSocketAcceptor(); // 建立监控器 //acceptor.getSessionConfig().setReadBufferSize(2048); //acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 10); acceptor.getFilterChain().addLast("codec ", New ProtocolCodecFilter(new TextLineCodecFactory(Charset.forName("UTF-8"), LineDelimiter.WINDOWS.getValue(), LineDelimiter.WINDOWS.getValue())));//加载解/编码工厂 acceptor.setHandler(newStandBaowenHandler ()); // 设置处理类,其主要内容见下。 acceptor.bind(newInetSocketAddress(9998));// 绑定的监听端口,可多次绑定,也可同时绑定多个。 StandBaowenHandler 的关键代码 publicvoidmessageReceived(IoSession session, Object message) throwsException { session.getService().getManagedSessions(); String baowenSrc = message.toString();// 原始报文 System.out.println(baowenSrc ); } 鉴于篇幅关系,类没有写全,更具体的内容,请大家参考 mina 压缩包里自带的 demo 。但实际上服务端需要手写的部分确实只有以上二块内容,服务端就算是写好了,由次可以看出 mina 的入门之快。现在我们来测试。打开 cmd( 这个对于同仁们来说,就不用详述了吧。。。 - - !! ) ,输入 telnet 127.0.0.1 9998 (这里的 9998 要与上面代码绑定的端口一致)随便输入一些字符,敲下回车后就会在控制台显示您所输入的信息了。 做得这一步的童鞋可以说对 MINA 的使用可以说已经初步入门了,是不是觉得太简单了?这也太假了吧,这就算入门了?呵呵。。。个人认为这就是 MINA 的强悍之一,简单的几行代码就搞定了一个服务端,要入门简直是太简单了。但任何事物都有其二面性,真要把一个东西学好,用好,还要不出错,我们需要了解的东西就太多太多了。下面我们就从 MINA 框架的底层说起,因为在实际应用当中,要解决一些碰到的难点及问题,对框架的整个运作体系必须要有个全面深入的了解。只有这样才能在碰到问题时,有目的,有针对性的去找到症结所在。 MINA 深度了解 Mina 的应用层 图 1 一个设计成熟的开源框架,总是会仅可能的减少侵入性,并在整个项目中找到合适的位置,而不应对整个项目的构架设计产生过多的影响,图 1 就是 MINA 的应用层示意图。从图中和上节的 DEMO 中我们可以看到, MINA 很好的把业务代码和底层的通信隔离了开来,我们要做的仅仅是建立好监听,然后写上我们需要实现的业务逻辑就 OK 了。 MINA 的内部流程 图 2 (1) IoService :这个接口在一个线程上负责套接字的建立,拥有自己的 Selector ,监听是否有连接被建立。 (2) IoProcessor :这个接口在另一个线程上负责检查是否有数据在通道上读写,也就是说它也拥有自己的 Selector ,这是与我们使用 JAVA NIO 编码时的一个不同之处, 通常在 JAVA NIO 编码中,我们都是使用一个 Selector ,也就是不区分 IoService 与 IoProcessor 两个功能接口。另外, IoProcessor 也是 MINA 框架的核心组件之一 . 在 MINA 框架启动时,会用一个线程池来专门生成线程,来负责调用注册在 IoService 上的过滤器,并在过滤器链之后调用 IoHandler 。在默认情况 IoProcessor 会用 N+1 个线程来轮流询问监视的端口是否有数据传送,其中 n 为 cpu 的内核个数。按一般的多线程设计概念来说, IoProcessor 的线程数是越多越好,但实际上并非如此,因为大家都知道, IO 的操作是非常占用资源的,所以项目中的 IoProcessor 的线程数应该根据实际需要来定,而这个数字可以在生成 IoAcceptor 对象时进行设定。 Eg IoAcceptor acceptor = newNioSocketAcceptor( N ); (3.) IoFilter :这个接口定义一组拦截器,这些拦截器可以包括日志输出、黑名单过滤 ( 参见之前代码示例的注释部份) ,甚至是在过滤器链中利用 AOP 写上权限控制(笔者负责的部分没有涉及到这权限部分,但根据笔者对框架的理解要实现这一点应该问题不大,有需要的可以自行试验)。数据的编码( write 方向)与解码( read 方向)等功能,其中数据的 encode 与 decode 是最为重要的、也是您在使用 Mina 时最主要关注的地方(笔者曾经为了 decode 的解码编写,重写了不下十几种实现方式才找到准确无误适合项目的解码方法)。 (4.) IoHandler :这个接口负责编写业务逻辑,也就是接收、发送数据的地方。只本文的代码实例中,可以看到真正的业务代码只有一句 :System.out.println(str); 真实项目中当然不可能如此简单,但如果大家把业务处理写好,并写好业务接口,真正要用时,呆需要在此处替换即可,再次见证了 MINA 分层的彻底。 说了这么多,以上内容也只能让大家对 MINA 有个基础的了解,对于 MINA 框架优势的认识可能还不是很多,那么下面的内容,就此展开对比讨论,以便大家对 MINA 的适用场景及优点有个更全面的了解。 选择 MINA 的理由 传统socket 编程 在传统 I/O 中,最简单实现高并发服务器的编程方式就是对每一个客户开启一个线程。但是这种方式有如下几个弊端: 客户端上限很大的情况下不能及时响应 服务器硬件资源受限,性能也会急剧下降 受制于操作系统的限制 但这种设计方式优点还是有的 : 编码简单,实现容易 一定数量的连接性能比较好。 笔者的项目开发中,最开始就是采用的这种方式,写起来方便,控制起来也方便,但遗憾的是 JVM 最多只能开到 2K 多的线程,就会报 - can not create new thread 的错误。 (实现结构图见下:) 图 3 (一对一的结构图。) 改进的Socket 编程 为了解决每一个线程一个连接的模型,笔者最开始想到用多个线程处理 N 个用户,这样既可以保证处理多个用户的同时,线程开销降到系统的临界点。 这样的方式与前一个模型优势在于同样的多线程,但线程数固定,充分运用系统的优势性能,又不存在多余的开销。但是缺点也是显而易见的: 轮询的消耗不可避免。 一但产生 io 阻塞,其阻塞的时间纯属浪费。 客户数量固定的时候没有前一模型响应快 编码更加复杂。 图 4( 一对多 ) 使用MINA 框架的编程 为了解决上述的矛盾,最终的解决方案只能是异步的 NIO, 而随着笔者对 JAVA NIO 的研究发现,要实现异步的 NIO ,并应用到实际项目中,必须对 NIO 有着比较深刻的了解和把握,笔者曾尝试着利用 JAVA 原生 NIO 接口写了一个 DEMO (具体的使用方法,感兴趣的童鞋可以 GOOGLE 一把,你会发现用原生 NIO 写程序与使用 MINA 写程序对比起来是多么的痛苦。。。。 -_-!! ),但由于笔者在这方面的底子过薄,试验结果不如人意,但要对 NIO 进行更为深入的学习,时间上面也不允许。直到 MINA 框架的映入眼帘,以上难题不再是问题。。。。以下是利用 MINA 的实现方式的一个简图。 图 5 其中 IoService 接口会专门起一个线程来轮询是否有新的连接产生,一旦有连接产生则通知 IoProcessor, 而 IoProcessor 则起 n+1 个线程来检查连接是否有数据在上面读写 (注二) 。一旦有连接产生,并有数据读写,则通知 decode 或 ENCODE ,进行报文的解码或编码,将处理后的报文再交给业务类进行业务处理。其中 IoProcessor 是处理请求的分配,包括选择 Selector ,超时验证,状态记录等。总之这个类和 IoService 一起配合工作,封装了 NIO 底层的实现以及 MINA 框架内部的功能的支持 . 由于过于复杂,篇幅所限所以不作详细讲解 . 结合实例,并根据以上的图文讲解,我们可以很轻易的总结出利用 MINA 编程的几个大致步骤: 创建一个实现了 IoService 接口的类 设置一个实现了 IoFilter 接口的过滤器(如果有需要的情况下) 设置一个 IoHandler 接口实现的处理类,用于处理事件(必须) 对 IoService 绑定一个端口开始工作 关于 MINA 的大致运行流程及使用步骤,我们就暂时分析到这,具体更细节的关于一些核心类的使用方法及自定义编码器的方法,大家可以直接参考 mina 中所带的几个案例,写得非常详细,足够解决大家在项目中碰到的大部分问题,接下来要与大家交流的是使用 MINA 时非常有可能遇到的一些扩展知识。 注二 :这一点请特别注意,因 IoProcessor 也是相当于轮询机制,这导致在报文过长时,或其它原因导致报文不能一次传输完毕的情况下,必须保存同一连接 ( 在 MINA 中是以 IoSession 类生成的对象 ) 下的上一次状态,这样才能截取到一个完成的报文,而这也是 Decode( 编码器 ) 需要做的核心工作 , 新手往往就在这上面要跌跟斗。 扩展知识 这部分的内容要说起来跟 MINA 的使用关联不大,但实际情况是用上了 MINA 框架的项目基本上多多少少都会涉及到这一块,那就是多线程的编程。多线程的编程历来是 JAVA 编程中的重难点,很多新手碰到此类编程问题,往往都找不出原因所在,甚至一些有多年编程经验的程序员也会在这上面偶而犯下错,在这里笔者也没有能力通过很短的篇来解说多线程,那么就向大家介绍几个类及一些小常识吧,希望能给大家带来帮助。 MINA框架使用总结 参考:http://xinsync.xju.edu.cn/index.php/archives/category/prglang/java/mina 简单介绍:MINA框架是对java的NIO包的一个封装,简化了NIO程序开发的难度,封装了很多底层的细节,然开发者把精力集中到业务逻辑上来,最近做了一个相关的项目,为了备忘对MINA做一个总结。 下面这个start方法用来初始化MINA: Java代码 1. private void start(int port, WebContext ctx) 2. throws IOException, InstantiationException 3. , IllegalAccessException, ClassNotFoundException { 4. //初始化Acceptor 5. NioSocketAcceptor acceptor = new NioSocketAcceptor(5); 6. 7. java.util.concurrent.Executor threadPool = Executors.newFixedThreadPool(1500);//建立线程池 8. //加入过滤器(Filter)到Acceptor 9. acceptor.getFilterChain().addLast("exector", new ExecutorFilter(threadPool)); 10. acceptor.getFilterChain().addLast("codec", 11. new ProtocolCodecFilter(new WebDecoder(),new WebEncoder())); 12. LoggingFilter filter = new LoggingFilter(); 13. filter.setExceptionCaughtLogLevel(LogLevel.DEBUG); 14. filter.setMessageReceivedLogLevel(LogLevel.DEBUG); 15. filter.setMessageSentLogLevel(LogLevel.DEBUG); 16. filter.setSessionClosedLogLevel(LogLevel.DEBUG); 17. filter.setSessionCreatedLogLevel(LogLevel.DEBUG); 18. filter.setSessionIdleLogLevel(LogLevel.DEBUG); 19. filter.setSessionOpenedLogLevel(LogLevel.DEBUG); 20. acceptor.getFilterChain().addLast("logger", filter); 21. 22. acceptor.setReuseAddress(true);//设置的是主服务监听的端口可以重用 23. 24. acceptor.getSessionConfig().setReuseAddress(true);//设置每一个非主监听连接的端口可以重用 25. acceptor.getSessionConfig().setReceiveBufferSize(1024);//设置输入缓冲区的大小 26. acceptor.getSessionConfig().setSendBufferSize(10240);//设置输出缓冲区的大小 27. //设置为非延迟发送,为true则不组装成大包发送,收到东西马上发出 28. acceptor.getSessionConfig().setTcpNoDelay(true); 29. //设置主服务监听端口的监听队列的最大值为100,如果当前已经有100个连接,再新的连接来将被服务器拒绝 30. acceptor.setBacklog(100); 31. acceptor.setDefaultLocalAddress(new InetSocketAddress(port)); 32. //加入处理器(Handler)到Acceptor 33. acceptor.setHandler(new WebHandler()); 34. acceptor.bind(); 35. } NioSocketAcceptor是MINA的适配器,一切都是从这里开始的。MINA中有个过滤器和处理器的概念,过滤器用来过滤数据,处理 器用来处理数据。具体来说MINA的处理模型就是request->过滤器A->过滤器B->处理器->过滤器B->过滤 器A->response,这里的request和response类似serlvet的request和response。 Java代码 1. acceptor.getFilterChain().addLast("exector", new ExecutorFilter(threadPool)); 2. //加入一个线程池到适配器,这里用的是jdk自带的线程池 Java代码 1. acceptor.getFilterChain().addLast("codec", 2. new ProtocolCodecFilter(new WebDecoder(),new WebEncoder())); 3. // 这里是处理逻辑的关键部位,请求的处理都是在WebDecoder类和WebEncoder类中处理,可以明显从命名上看出来一个是用来解码,另一个是用 来编码,requet过来后先进入WebDecoder类(实现了ProtocolDecoder接口)进行解码处理,这里可以加入自己的逻辑把传进来的 流解码成自己需要的信息。而WebEncoder类(实现了ProtocolEncoder接口)是进行编码,在这个类里面加入自己的逻辑把处理后的信息 组装发送给客户端(response)。而在解码和编码过程中WebHandler(扩展了IoHandlerAdapter抽象类)起到了处理器的作 用。 4. //request->WebDecoder->WebHandler->WebEncode->response 现在详细描述一下request->WebDecoder->WebHandler->WebEncode->response的过程: 客户端发送一个请求到MINA服务器,这里相当于来了一个requet。请求首先来到 Java代码 1. WebDecoder类(实现了ProtocolDecoder接口)中的 2. boolean decode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception{}方法 3. /* 4. 参数in:用户请求信息全存在这里,读数据就从in这里读。 5. 参数out:用来输出处理后的数据到Filter的下一个过滤器,如果没有过滤器了就输出到WebHandler,这里有点和 6. servelt的过滤器类似。利用out.write(Object object);这个函数可以把数据传到下一个Filter。我们可以自己定义 7. 一个对象,我们假设为Request,用它来传递消息,那末这里就可以写成out.write(new RequsetMessage()); 8. 如果这个方法返回false,就是说当前逻辑包还没接收完(也就是当前的IoBuffer并没有包含足够的数据),需要再次 9. 执行decode方法(再次获取新的IoBuffer),用来获取足够的数据。如果返回值为true就表示可以不执行decode方 10. 法了,但是要激活handler方法,必须要调用out.write方法。 11. public class RequestMessage{}//这里什么也不做 12. */ 然后到 Java代码 1. WebHandler(扩展了IoHandlerAdapter抽象类)中的 2. void messageReceived(IoSession session, Object message) throws Exception{}方法 3. WriteFuture future = session.write(response);//session中必须加入这个代码,才会激活encode方法 4. future.addListener(IoFutureListener.CLOSE);//这个的作用是发送完毕后关闭连接,加了就是短连接,不然是长连接 5. IoFutureListener里面有个operationComplete(IoFuture future)方法,当流发送完成之后才调用这个方法。 6. /* 7. 参数message:用来获取Filter传递过来的对象.对应代码RequestMessage request = (RequestMessage) message; 8. 参数session:用来发送数据到Filter.对应代码session.write(new ResponseMessage()); 9. public class ResponseMessage{}//这里什么也不做,假设存放处理后的数据 10. 注意:对于一个MINA程序而言,对于WebHandler类只生成一个对象,所以要考虑线程安全问题 11. */ 然后到 Java代码 1. WebEncoder类(实现了ProtocolEncoder接口)中的 2. boolean encode(IoSession session, Object message, ProtocolEncoderOutput out) throws Exception{} 3. 方法 4. /* 5. 参数message:用来获取上一个Filter节点的数据或者处理器的数据(如果这个过滤器为最靠近处理器的那个) 6. ResponseMessage response = (ResponseMessage)message; 7. 参数out:用来输出数据到下一个Filter节点过或者到客户端,用out.write(Object encodedMessage)把数据发送 8. 出去,但是要注意的是,如果这个Filter下一个节点如果是客户端的话,那个这个encodedMessage数据必须为 9. IoBuffer类型的,可以利用IoBuffer.wrap(byte[] byteArray)这个方法来格式化输出数据 10. */ · NIO_TEST.rar (13.9 KB) · 描述: 阻塞和非阻塞io的简单通信程序,自己写的(写的不好)放这里做个备份 · 下载次数: 1092 mina使用UDP协议的小结 mina是一个优秀的网络应用框架,优点当然是多多的, 目前使用中,发生的一些问题主要集中在线程的注销上,发现mina自身线程的注销方面做得不是太好.这里总结下UDP协议上的一些线程注销上需要注意的东西. DatagramConnectorConfig对象: config.setSessionRecycler(IoSessionRecycler.NOOP); 这里面实际上已经有一个超时重发的线程被mina丢掉了,但是它还在运行. 改进方法: ((ExpiringSessionRecycler)config.getSessionRecycler()).stopExpiring(); config.setSessionRecycler(IoSessionRecycler.NOOP); 先禁掉系统默认的超时重发线程. config的ThreadModel里的Executor也不能自己注销的,注销时,需要这样: ((ThreadPoolExecutor)((ExecutorThreadModel)config.getThreadModel()).getExecutor()).shutdown(); DatagramConnector对象: connector = new DatagramConnector(); 这里面也有一个NewThreadExecutor线程处理对象在运行,也是关不掉的. 改进方法: ScheduledThreadPoolExecutor threadPoolExecutor = new ScheduledThreadPoolExecutor(5, new TaskThreadFactory("protocol-pool-"), new ThreadPoolExecutor.DiscardOldestPolicy()); connector = new DatagramConnector(threadPoolExecutor); 自己定义连接使用的线程池,自己可以控制注销线程池. ByteBuffer对象: 这个对象里有这么一个东西 private static ByteBufferAllocator allocator = new PooledByteBufferAllocator(); 这个东西里面有一个糟糕的东西 PooledByteBufferAllocator里有一个Expirer对象,它也是一个死线程,mina没有直接的方法可以停掉它,在注销时,可以用下面这种笨办法让mina处理掉自己 ByteBuffer.setAllocator(new ByteBufferAllocator(){ public ByteBuffer allocate(int capacity, boolean direct) { return null; } public void dispose() { } public ByteBuffer wrap(java.nio.ByteBuffer nioBuffer) { return null; }}); 一般应用是可以不管这些东西的. 基于MINA构建简单高性能的NIO应用 前言 MINA是Trustin Lee最新制作的Java通讯框架。通讯框架的主要作用是封装底层IO操作,提供高级的操作API。比较出名的通讯框架有C++的ACE、Python的 Twisted,而Java的通讯框架还有QuickServer、Netty2、Cindy、Grizzly等。 2004年6月,Trustin Lee发布了一个通讯框架Netty2,是Java界第一个事件模型架构的通讯框架,Cindy也从中借鉴了不少思想。由于Netty2的架构不是很 好,Trustin Lee在2004年底加入Apache Directory组之后,重写了整个框架,取名为MINA。MINA是一个基于Java NIO的通讯框架,Java从1.4开始引入NIO,提供了一个非阻塞、高性能的IO底层。 目前使用MINA的产品并不是很多,比较出名的就有Apache Directory、Openfire(Jive出品的一个XMPP产品)、red5(研究flash流媒体flv技术的朋友应该很清楚这个东 西,adobe fms的竞争者,国内也有视频网站在使用)等等。 笔者在07年初的时候,公司新项目需要用Java实现一个Socket Server,对比了Netty2、Cindy、QuickServer和MINA。当时Netty2已经停止开发,也找不到官方网站和代码,比较了另外 三个框架之后,毅然选择了当时文档比较缺乏和使用群较少的MINA,一年以来的使用经验来看,感觉还是很不错的,MINA有着清晰的架构,很方便做自定义 的扩充。在1.0发布之后,官方网站充实了很多,增加了不少文档,也听到越来越多的朋友开始使用MINA。后来专门针对JDK 1.5发布了1.1的版本,使用JDK内置的concurrent代替backport-util-concurrent。目前1.0和1.1同时存在, 但已经不再增加新功能,仅仅发布bug fix的版本,新功能都在2.0中实现,2.0调整了架构,性能有更大的提升,目前还在开发中。 基本特性 通过Java NIO支持TCP和UDP协议,另外还支持RS232和VM内通讯。由于MINA有清晰的架构,你也能很简单地实现一个底层网络协议。目前不支持阻塞 IO,似乎还没有计划支持,当然你可以在其之上实现一个阻塞的模型,不过按照笔者的经验来说,非阻塞IO更适合Server端编程。 一个类似ServletFilter的过滤器模型。这是笔者认为MINA的精髓所在,通过引入过滤器模型,可以将一些非业务的功能独立开来,层次更清晰,很有AOP的思想,可以很方便地进行日志、协议转换、压缩等等功能,还能在运行中动态增加或去掉功能。 可以直接使用底层的ByteBuffer,也可以使用用户定义的消息Object和编码方式。 高度可定制的线程模型,单线程、一个线程池,或者类似SEDA的多个线程池。 SSL支持,攻击防御和流量控制,mock测试友好,JMX支持,Spring集成,你还需要更多吗。 一个简单的例子 MINA使用非常简单,笔者以前做过一段时间传统的Java Socket开发,不过一直对Java NIO不是很理解,但是MINA很快就上手了,MINA封装了NIO繁琐的部分,使你可以更专注于业务功能实现。话不多说,让我们来看一个简单的例子,一 个很常见的例子,时间服务器。 我们的实现目标是一个能响应多个客户端的请求,然后返回服务器当前的系统时间的功能。传统的Java Socket程序,我们需要每accept一个客户端连接,就创建一个新的线程来响应,这会令到系统整体负载能力有较大的限制,而且我们必须手工编写连接 管理等代码。让我们来看看MINA是怎么处理的。 首先我们从官方网站下载MINA 1.1,这里我们假设JDK为1.5以上的版本,如果你使用的是JDK 1.4,请下载MINA 1.0,MINA 1.0跟1.1几乎一样,但是强烈建议使用JDK 1.5以上以获得更好的性能。 解开压缩包之后,能看见很多jar包,这里暂不介绍每个包的具体作用,可以把所有包都导入项目。值得留意的是MINA使用了一个slf4j的日志库,该日志库大有取缔common-logging之势。 这里是我们的主程序,非常简单。 首先我们需要一个IoAcceptor,这里我们选择了一个SocketAcceptor,也就是TCP协议。 然后,我们给应用加上日志过滤器和协议编码过滤器。 最后,我们把acceptor bind到本机的8123端口,并且使用TimeServerHandler来实现协议。 TimeServerHandler是我们实现具体业务功能的地方。 IoHandlerAdapter提供了7个事件方法,我们要做的事情仅仅是挑选我们需要做出响应的事件进行重载。在我这个例子了,我重载了两个方法。 sessionCreated会在客户端连接的时候调用,通常我们会在这里进行一些初始化操作,我这里仅仅是打印一条信息。 messageReceived就是整个Handler的中心部分,每一个从客户端发过来的消息都会转化成对该方法的调用。由于我们加入了协议编码过滤 器,因此这里获得的Object msg是一个String,而不是默认的ByteBuffer(下文会详细介绍ProtocolCodecFilter)。这里我们实现了一个很简单的业 务功能,如果用户输入的是quit,就断开连接,否则就输入当前时间。可以看出,IoSession封装了对当前连接的操作。 至此,我们就实现了一个时间服务器。 01.public class TimeServer { 02. public static void main(String[] args) throws IOException { 03. IoAcceptor acceptor = new SocketAcceptor(); 04. 05. SocketAcceptorConfig cfg = new SocketAcceptorConfig(); 06. cfg.getFilterChain().addLast( "logger", new LoggingFilter() ); 07. cfg.getFilterChain().addLast( "codec", new ProtocolCodecFilter( new TextLineCodecFactory())); 08. 09. acceptor.bind( new InetSocketAddress(8123), new TimeServerHandler(), cfg); 10. System.out.println("Time server started."); 11. } 12.} 01.public class TimeServerHandler extends IoHandlerAdapter { 02. public void messageReceived(IoSession session, Object msg) throws Exception { 03. String str = (String) msg; 04. if( "quit".equalsIgnoreCase(str) ) { 05. session.close(); 06. return; 07. } 08. 09. Date date = new Date(); 10. session.write( date.toString() ); 11. System.out.println("Message written..."); 12. } 13. 14. public void sessionCreated(IoSession session) throws Exception { 15. System.out.println("Session created..."); 16. } 17.} MINA架构 这里,我借用了一张Trustin Lee在Asia 2006的ppt里面的图片来介绍MINA的架构。 Remote Peer就是客户端,而下方的框是MINA的主要结构,各个框之间的箭头代表数据流向。 大家可以对比刚刚的例子来看这个架构图,IoService就是整个MINA的入口,负责底层的IO操作,客户端发过来的消息就是由它处理。刚刚我们使用 的IoAcceptor就是一个IoService,之所以抽象成IoService,是因为MINA用同样的架构来处理服务器和客户端编程,IoService的另一个子类就是IoConnector,用于客户端。不过根据笔者的使用经验,使用非阻塞的模型进行客户端编程非常的不方便,你最好寻求其他的阻塞通讯框架。 IoService把数据转化成一个一个的事件,传递给IoFilterChain。你可以加入一连串的IoFilter,进行各种功能。笔者的建议是将一些功能性的,业务不相关的代码,用IoFilter来实现,使得整个应用结构更清晰,也方便代码重用。 被IoFilter处理过的事件,发送给 IoHandler,然后我们在这里实现具体的业务逻辑。这个部分很简单,如果你有Swing的使用经验的话,你会发现它跟Swing的事件非常相像,你 要做的事情,仅仅是重载你需要的方法,然后编写具体的业务功能。在这其中,最重要的一个方法就是messageReceived了。 值得留意的是一个IoSession的类,每一个IoSession实例代表这一个连接,我们需要对连接进行的任何操作都通过这个类来实现。 从IoHandler通过调用IoSession.write等方法向客户端发送的消息,会通过跟输入数据相反的次序依次传递,直至由IoService负责把数据发送给客户端。 这就已经是MINA的全部,是不是很简单。 接下来,我会详细介绍我们编写具体代码的时候主要涉及到的三个类,IoHandler、IoSession和IoFilter。 IoHandler MINA的内部实现了一个事件模型,而IoHanlder则是所有事件最终产生响应的位置。每一个方法的名字很明确表明该事件的含义。 messageReceived是接收客户端消息的事件,我们应该在这里实现业务逻辑。messageSent是服务器发送消息的事件,一般情况下我们不 会使用它。sessionClosed是客户端断开连接的事件,可以在这里进行一些资源回收等操作。值得留意的是,客户端连接有两个事 件,sessionCreated和sessionOpened,两者稍有不同,sessionCreated是由I/O processor线程触发的,而sessionOpened在其后,由业务线程触发的,由于MINA的I/O processor线程非常少,因此如果我们真的需要使用sessionCreated,也必须是耗时短的操作,一般情况下,我们应该把业务初始化的功能 放在sessionOpened事件中。 细心的读者可能会发现,我们刚刚的例子继承的是IoHandlerAdapter,IoHandlerAdapter其实就是一个IoHanlder的空的实现,这样我们就可以不用重载不感兴趣的事件。 IoSession IoSession是一个接口,MINA里很多的地方都使用接口,很好地体现了面向接口编程的思想。它提供了对当前连接的操作功能,还有用户定义属性的存 储功能,这点非常重要。IoSession是线程安全的,也就是我们能够在多线程环境中随意操作IoSession,这点给开发带来很大的好处。我们来看 看具体提供的方法,笔者列举一些比较常用和重要的方法 在这里,笔者把IoSession的方法大致分成三类 第一类,连接操作功能。 最主要的方法有两个,向客户端发送消息和断开连接。可以看的出,write接受的变量是一个Object,但是实际上应该传入什么类型呢?具体还得看你是 否使用了ProtocolCodecFilter(下面会详细介绍),如果使用了ProtocolCodecFilter,那这个message将可能是 一个String,或者是一个用户定义的JavaBean。默认的情况,message是一个ByteBuffer。ByteBuffer是MINA的一 个类,跟java.nio.ByteBuffer类同名,MINA 2.0将会将它改成IoBuffer,以避免讨论上的误会。 另一个值得留意的是Future类,MINA是一个非阻塞的通信框架,其中一个明显的体现就是调用IoSession.write方法是不会阻塞的。用户 调用了write方法之后,消息内容会发到底层等候发送,至于什么时候发出,就不得而知了。当然,实际上调用了write之后,数据几乎是立刻发出的,这 得益与NIO的高性能。但是,如果我们必须确认了消息发出,然后进行某些处理,我们就需要使用Future类,以下是一个很常见的代码 通过调用future.join,程序就会阻塞,直至消息处理结束。我们还能通过future.isWritten得知消息是否成功发送。 在这里,笔者顺便说一个实际使用的发现,消息发送是会自动合并的,简单来说,如果在很短的时间里,对同一个IoSession进行了两次write操作, 客户端有可能只收到一条消息,而这条消息就是服务器发出的两条消息前后接起来。这样的设计可以在高并发的时候节省网络开销,而笔者的实际使用过程中,效果 也相当好。但是如果这样行为会导致客户端工作不正常,你也可以通过参数关闭它。 第二类,属性存储操作。 通常来说,我们的系统是有用户状态的,我们就需要在连接上存储用户属性,IoSession的Attribute就是这样一个功能。例如两个连接同时连入 服务器,一个连接是用户A,用户ID是13,另一个连接是用户B,用户ID是14,我们就可以在用户登录成功之后,调用 IoSession.setAttribute(“login_id”,13),然后在其后的操作中,通过 IoSession.getAttribute(“login_id”)获得当前登录用户ID,并进行相应的操作。简单来说,就是一个类似 HttpSession的功能,当然具体的实现方法不一样。 第三类,连接状态。 这里就不多说了,从方法名上我们就能知道它具体的功能。 IoFilter 过滤器是MINA的一个很重要的功能。IoFilter也是一个接口,但是相对比较复杂,这里就不列举它的方法了。简单来说IoFilter就像 ServletFilter,在事件被IoHandler处理之前或之后进行一些特定的操作,但是它比ServletFilter复杂,可以处理很多种事 件,除了包括IoHandler的7个事件以外,还有一些内部的事件可以进行操作。 MINA提供了一些常用的IoFilter实现,例如有LoggingFilter(日志功能)、BlacklistFilter(黑名单功能)、 CompressionFilter(压缩功能)、SSLFilter(SSL支持),这些过滤器比较简单,通过阅读它们的源代码,能够更进一步理解过滤 器的实现。笔者在这里要重点介绍两个过滤器,ProtocolCodecFilter和ExecutorFilter ProtocolCodecFilter 网络传输的内容其实本质是一个二进制流,但是我们的业务功能不会,或者说不应该去直接操作二进制流。MINA默认向IoHandler传入的 message是一个ByteBuffer,如果我们直接在IoHandler操作ByteBuffer,会导致大量协议分析的代码和实际的业务代码混杂 在一起。最适合的做法,就是在IoFilter把ByteBuffer转换成String或者JavaBean,ProtocolCodecFilter 正是这样的一个功能的过滤器。 使用ProtocolCodecFilter很简单,我们只要把ProtocolCodecFilter加入到FilterChain就可以了,但是我们 需要提供一个ProtocolCodecFactory。其实ProtocolCodecFilter仅仅是实现了过滤器部分的功能,它会将最终的转换工 作,交给从ProtocolCodecFactory获得的Encode和Decode。如果我们需要编写自己的ProtocolCodec,就应该从 ProtocolCodecFactory入手。MINA内置了几个ProtocolCodecFactory,比较常用的就是 ObjectSerializationCodecFactory和TextLineCodecFactory。 ObjectSerializationCodecFactory是Java Object序列化之后的内容直接跟ByteBuffer互相转化,比较适合两端都是Java的情况使用。TextLineCodecFactory就是 String跟ByteBuffer的转化,说白了就是文本,例如你要实现一个SMTP服务器,或者POP服务器,就可以使用它。而笔者的实际使用,大多 数情况都是使用 TextLineCodecFactory。 这里提及一下IoFilter的顺序问题,IoFilter是有加入顺序的,例如,先加入LoggingFilter再加入 ProtocolCodecFilter,和先加入ProtocolCodecFilter再加入LoggingFilter的效果是不一样的,前者 LoggingFilter写入日志的内容是ByteBuffer,而后者写入日志的是转换后具体的类,例如String。实际使用的时候,一定要处理好 过滤器的顺序。 ExecutorFilter 另一个重要的过滤器就是ExecutorFilter。这里,我需要先说明一下MINA的线程工作模式,MINA默认是单线程处理所有客户端的消息,也就 是说,即使你在一台8CPU的机器上面跑,可能也只用到一个CPU,另外,如果某次消息处理太耗时,就会导致其他消息等待,整体的吞吐量下降。很多朋友抱 怨MINA的性能差,其实是因为他们没有加入ExecutorFilter的缘故。ExecutorFilter设计的很精巧,大家可以仔细阅读一下源代 码,它会将同一个连接的消息合并起来按顺序调用,不会出现两个线程同时处理同一个连接的情况。 1.IoAcceptor acceptor = ...; 2.IoServiceConfig acceptorConfig = acceptor.getDefaultConfig(); 3.acceptorConfig.setThreadModel(ThreadModel.MANUAL); 这里再次提及IoFitler的顺序问题,一般情况下,我们会将ExecutorFilter放在ProtocolCodecFilter之后,因为我们 不需要多线程地执行ProtocolCodec操作,用单一线程来进行ProtocolCodec性能会比较高,而具体的业务逻辑可能还设计数据库操作, 因此更适合放在不同的线程中运行。 优化指南 MINA默认配置的性能并不是很高的,部分原因是MINA目前还保留初期版本的架构,另外一个原因是因为JVM的发展。 1.IoAcceptor acceptor = new SocketAcceptor(Runtime.getRuntime().availableProcessors() + 1, Executors.newCachedThreadPool()); 首先我们关闭默认的ThreadModel设置 ThreadModel是一个很简单的线程实现,用于IoService。但是它实在太弱,以至于在并发环境产生大量问题。在MINA 2.0中,ThreadModel直接被取消。你应该使用ExecutorFilter来实现线程。 1.acceptor.getDefaultConfig().getFilterChain().addLast("threadPool", new ExecutorFilter(Executors.newCachedThreadPool()); 然后我们增加I/O处理线程 每一个Acceptor/Connector都使用一个线程来处理连接,然后把连接发送给I/O processor进行读写操作,我们只可以修改I/O processor使用的线程数,用以下代码设置 当然是要将ExecutorFilter加入,上文已经很详细地描述了 笔者在开发过程中,多次遇到OutOfMemoryError,经过研究之后才发现原因。MINA默认是使用direct memory实现ByteBuffer池的方案(以下简称direct buffer),通过JNI在内存开辟一段空间来使用,该方案在早期的MINA版本中是一个非常好的特性,那是因为MINA开发初期,JVM并没有现在的 强大,带有池效果的direct buffer性能比较好。但是当我们使用-Xms -Xmx等指令增加JVM可使用的内存,那仅仅增加了堆的内存空间,而direct memory的空间并没有增加,导致MINA实际使用的时候经常出现OutOfMemoryError。如果你的确想使用direct memory,可以通过-XX:MaxDirectMemorySize选项来设置。不过笔者不建议这样做,因为最新的测试表明,在现代的JVM里 面,direct memory比堆的表现更差。这里可能有读者会觉得奇怪,为什么不用池,而要用堆呢,而且还需要gc。那是因为现在的JVM gc能力已经很强了,而且在并发环境里面,pool的同步也是一个性能的问题。我们可以通过这样的代码进行设置 MINA 2.0已经默认把直接内存分配改成堆,为了提供最好的性能和稳定性。 1.ByteBuffer.setUseDirectBuffers(false); 2.ByteBuffer.setAllocator(new SimpleByteBufferAllocator()); 最后一条优化技巧就是,把你的应用部署在 Linux上, 并且打开Java NIO使用Linux epoll的功能。可能你还没听过epoll,但是你应该听过Lighttpd、Nginx、Squid等,得益于epoll,它们提供很高的网络性能, 还占用非常少的系统资源。JDK6已经默认把epoll配置打开,因此笔者建议把你的应用部署在JDK6上面,也同时因为JDK6还有别的优化特性。如果 你的应用必须部署在JDK5上,你也可以通过参数把epoll支持打开。 使用Mina传输Java对象 下面是所要传输的实体类UserInfo.java 01 package com.mina.model; 02 03 import java.io.Serializable; 04 05 /** 06 * @see Mina传输的实体类,要求其实现Serializable接口 07 */ 08 @SuppressWarnings("serial") 09 public class UserInfo implements Serializable{ 10 private String name; 11 12 public UserInfo(String name){ 13 this.name = name; 14 } 15 16 public String getName() { 17 return name; 18 } 19 } 下面是Mina编写的服务端主类MyServer.java 01 package com.mina.server; 02 03 import java.io.IOException; 04 import java.net.InetSocketAddress; 05 06 import org.apache.mina.core.service.IoAcceptor; 07 import org.apache.mina.core.session.IdleStatus; 08 import org.apache.mina.filter.codec.ProtocolCodecFilter; 09 import org.apache.mina.filter.codec.serialization.ObjectSerializationCodecFactory; 10 import org.apache.mina.transport.socket.nio.NioSocketAcceptor; 11 12 public class MyServer { 13 public static void main(String[] args) throws IOException { 14 int bindPort = 9876; 15 16 IoAcceptor acceptor = new NioSocketAcceptor(); 17 18 acceptor.getSessionConfig().setReadBufferSize(2048); 19 acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 10); 20 21 //设定服务器解析消息的规则是以Object对象为单位进行传输,注意此时该对象需实现Serializable接口 22 acceptor.getFilterChain().addLast("codec", new ProtocolCodecFilter(new ObjectSerializationCodecFactory())); 23 24 acceptor.setHandler(new ServerHandler()); 25 26 acceptor.bind(new InetSocketAddress(bindPort)); 27 28 System.out.println("MinaServer is startup, and it`s listing on := " + bindPort); 29 } 30 } 下面是服务端的消息处理器ServerHandler.java 01 package com.mina.server; 02 03 import org.apache.mina.core.service.IoHandlerAdapter; 04 import org.apache.mina.core.session.IoSession; 05 06 import com.mina.model.UserInfo; 07 08 public class ServerHandler extends IoHandlerAdapter { 09 @Override 10 public void messageReceived(IoSession session, Object message) throws Exception { 11 UserInfo ui = (UserInfo)message; //我们已设定了服务器解析消息的规则是以UserInfo对象为单位进行传输 12 System.out.println("收到客户机发来的用户名:" + ui.getName()); 13 session.write(new UserInfo(ui.getName() + "==>>是个神秘的人")); 14 } 15 16 @Override 17 public void sessionOpened(IoSession session) throws Exception{ 18 System.out.println("InComing Client:" + session.getRemoteAddress()); 19 } 20 } 接下来是Mina编写的客户端主类MyClient.java 01 package com.mina.client; 02 03 import java.net.InetSocketAddress; 04 05 import org.apache.mina.core.service.IoConnector; 06 import org.apache.mina.filter.codec.ProtocolCodecFilter; 07 import org.apache.mina.filter.codec.serialization.ObjectSerializationCodecFactory; 08 import org.apache.mina.transport.socket.nio.NioSocketConnector; 09 10 import com.mina.model.UserInfo; 11 12 public class MyClient { 13 public static void main(String[] args) { 14 IoConnector connector = new NioSocketConnector(); 15 16 connector.setConnectTimeoutMillis(30000); 17 18 connector.getFilterChain().addLast("codec", new ProtocolCodecFilter(new ObjectSerializationCodecFactory())); 19 20 connector.setHandler(new ClientHandler(new UserInfo("张起灵"))); 21 22 connector.connect(new InetSocketAddress("127.0.0.1", 9876)); 23 } 24 } 最后是客户端的消息处理器ClientHandler.java 查看源码 打印? 01 package com.mina.client; 02 03 import org.apache.mina.core.service.IoHandlerAdapter; 04 import org.apache.mina.core.session.IoSession; 05 06 import com.mina.model.UserInfo; 07 08 public class ClientHandler extends IoHandlerAdapter { 09 private final UserInfo ui; 10 11 public ClientHandler(UserInfo ui){ 12 this.ui = ui; 13 } 14 15 @Override 16 public void sessionOpened(IoSession session) throws Exception { 17 session.write(ui); 18 } 19 20 @Override 21 public void messageReceived(IoSession session, Object message) throws Exception { 22 UserInfo ui = (UserInfo)message; 23 System.out.println("收到服务机发来的用户名:" + ui.getName()); 24 } 25 26 @Override 27 public void exceptionCaught(IoSession session, Throwable cause) throws Exception { 28 System.out.println("与" + session.getRemoteAddress() + "通信过程中出现错误:[" + cause.getMessage() + "]..连接即将关闭...."); 29 session.close(false); 30 session.getService().dispose(); 31 } 32 } 使用mina 作代理服务器例子 Main 01 importjava.net.InetSocketAddress; 02 03 importorg.apache.mina.core.service.IoConnector; 04 importorg.apache.mina.transport.socket.nio.NioSocketAcceptor; 05 importorg.apache.mina.transport.socket.nio.NioSocketConnector; 06 07 /** 08 * (
Entry point) Demonstrates how to write a very simple tunneling proxy 09 * using MINA. The proxy only logs all data passing through it. This is only 10 * suitable for text based protocols since received data will be converted into 11 * strings before being logged. 12 *
13 * Start a proxy like this:
14 * org.apache.mina.example.proxy.Main 12345 www.google.com 80
15 * and open http://localhost:12345 in a 16 * browser window. 17 *
18 * 19 * @author The Apache MINA Project (dev@mina.apache.org) 20 * @version $Rev$, $Date$ 21 */ 22 publicclassMain { 23 24 publicstaticvoidmain(String[] args) throwsException { 25 if(args.length != 3) { 26 System.out.println(Main.class.getName() 27 + "
"); 28 return; 29 } 30 31 // Create TCP/IP acceptor. 32 NioSocketAcceptor acceptor = newNioSocketAcceptor(); 33 34 // Create TCP/IP connector. 35 IoConnector connector = newNioSocketConnector(); 36 37 // Set connect timeout. 38 connector.setConnectTimeoutMillis(30*1000L); 39 40 ClientToProxyIoHandler handler = newClientToProxyIoHandler(connector, 41 newInetSocketAddress(args[1], Integer.parseInt(args[2]))); 42 43 // Start proxy. 44 acceptor.setHandler(handler); 45 acceptor.bind(newInetSocketAddress(Integer.parseInt(args[0]))); 46 47 System.out.println("Listening on port "+ Integer.parseInt(args[0])); 48 } 49 50 } [代码] ServerToProxyIoHandler 1 /** 2 * Handles the server to proxy part of the proxied connection. 3 * 4 * @author The Apache MINA Project (dev@mina.apache.org) 5 * @version $Rev$, $Date$ 6 * 7 */ 8 publicclassServerToProxyIoHandler extendsAbstractProxyIoHandler { 9 } [代码] AbstractProxyIoHandler 01 importjava.nio.charset.Charset; 02 03 importorg.apache.mina.core.buffer.IoBuffer; 04 importorg.apache.mina.core.service.IoHandlerAdapter; 05 importorg.apache.mina.core.session.IoSession; 06 importorg.slf4j.Logger; 07 importorg.slf4j.LoggerFactory; 08 09 /** 10 * Base class of {@link org.apache.mina.core.service.IoHandler} classes which handle 11 * proxied connections. 12 * 13 * @author The Apache MINA Project (dev@mina.apache.org) 14 * @version $Rev$, $Date$ 15 * 16 */ 17 publicabstractclassAbstractProxyIoHandler extendsIoHandlerAdapter { 18 privatestaticfinalCharset CHARSET = Charset.forName("iso8859-1"); 19 publicstaticfinalString OTHER_IO_SESSION = AbstractProxyIoHandler.class.getName()+".OtherIoSession"; 20 21 privatefinalLogger logger = LoggerFactory.getLogger(getClass()); 22 23 @Override 24 publicvoidsessionCreated(IoSession session) throwsException { 25 session.suspendRead(); 26 session.suspendWrite(); 27 } 28 29 @Override 30 publicvoidsessionClosed(IoSession session) throwsException { 31 if(session.getAttribute( OTHER_IO_SESSION ) != null) { 32 IoSession sess = (IoSession) session.getAttribute(OTHER_IO_SESSION); 33 sess.setAttribute(OTHER_IO_SESSION, null); 34 sess.close(false); 35 session.setAttribute(OTHER_IO_SESSION, null); 36 } 37 } 38 39 @Override 40 publicvoidmessageReceived(IoSession session, Object message) 41 throwsException { 42 IoBuffer rb = (IoBuffer) message; 43 IoBuffer wb = IoBuffer.allocate(rb.remaining()); 44 rb.mark(); 45 wb.put(rb); 46 wb.flip(); 47 ((IoSession) session.getAttribute(OTHER_IO_SESSION)).write(wb); 48 rb.reset(); 49 logger.info(rb.getString(CHARSET.newDecoder())); 50 } 51 } [代码] ClientToProxyIoHandler 01 importjava.net.SocketAddress; 02 03 importorg.apache.mina.core.RuntimeIoException; 04 importorg.apache.mina.core.future.ConnectFuture; 05 importorg.apache.mina.core.future.IoFutureListener; 06 importorg.apache.mina.core.service.IoConnector; 07 importorg.apache.mina.core.session.IoSession; 08 09 /** 10 * Handles the client to proxy part of the proxied connection. 11 * 12 * @author The Apache MINA Project (dev@mina.apache.org) 13 * @version $Rev$, $Date$ 14 * 15 */ 16 publicclassClientToProxyIoHandler extendsAbstractProxyIoHandler { 17 privatefinalServerToProxyIoHandler connectorHandler = newServerToProxyIoHandler(); 18 19 privatefinalIoConnector connector; 20 21 privatefinalSocketAddress remoteAddress; 22 23 publicClientToProxyIoHandler(IoConnector connector, 24 SocketAddress remoteAddress) { 25 this.connector = connector; 26 this.remoteAddress = remoteAddress; 27 connector.setHandler(connectorHandler); 28 } 29 30 @Override 31 publicvoidsessionOpened(finalIoSession session) throwsException { 32 33 connector.connect(remoteAddress).addListener(newIoFutureListener() { 34 publicvoidoperationComplete(ConnectFuture future) { 35 try{ 36 future.getSession().setAttribute(OTHER_IO_SESSION, session); 37 session.setAttribute(OTHER_IO_SESSION, future.getSession()); 38 IoSession session2 = future.getSession(); 39 session2.resumeRead(); 40 session2.resumeWrite(); 41 } catch(RuntimeIoException e) { 42 // Connect failed 43 session.close(true); 44 } finally{ 45 session.resumeRead(); 46 session.resumeWrite(); 47 } 48 } 49 }); 50 } 51 } MINA 实现的简易 HTTP 服务器 [Java]代码 01 publicclassTest { 02 publicstaticvoidmain(String[] args) throwsIOException { 03 HttpServer httpServer; 04 httpServer = HttpServer.create(newInetSocketAddress(81), 5); 05 httpServer.createContext("/", newHandler()); 06 httpServer.start(); 07 } 08 09 staticclassHandler implementsHttpHandler { 10 publicvoidhandle(HttpExchange exchange) throwsIOException { 11 Headers requestHeaders = exchange.getRequestHeaders(); 12 Headers responseHeaders = exchange.getResponseHeaders(); 13 responseHeaders.set("Content-Type", "text/plain"); 14 exchange.sendResponseHeaders(200, 0L); 15 OutputStream responseBody = newBufferedOutputStream(exchange.getResponseBody(), 64*1024); 16 responseBody.write("Hello!".getBytes()); 17 responseBody.close(); 18 exchange.close(); 19 } 20 } 21 } Java 通过 HTTP 下载文件 Download.java 001 packagecore.spider; 002 003 importjava.io.*; 004 importjava.net.*; 005 importjava.util.*; 006 007 // This class downloads a file from a URL. 008 classDownload extendsObservable implementsRunnable { 009 010 // Max size of download buffer. 011 privatestaticfinalintMAX_BUFFER_SIZE = 1024; 012 013 // These are the status names. 014 publicstaticfinalString STATUSES[] = {"Downloading", 015 "Paused", "Complete", "Cancelled", "Error"}; 016 017 // These are the status codes. 018 publicstaticfinalintDOWNLOADING = 0; 019 publicstaticfinalintPAUSED = 1; 020 publicstaticfinalintCOMPLETE = 2; 021 publicstaticfinalintCANCELLED = 3; 022 publicstaticfinalintERROR = 4; 023 024 privateURL url; // download URL 025 privateintsize; // size of download in bytes 026 privateintdownloaded; // number of bytes downloaded 027 privateintstatus; // current status of download 028 029 // Constructor for Download. 030 publicDownload(URL url) { 031 this.url = url; 032 size = -1; 033 downloaded = 0; 034 status = DOWNLOADING; 035 036 // Begin the download. 037 download(); 038 } 039 040 // Get this download's URL. 041 publicString getUrl() { 042 returnurl.toString(); 043 } 044 045 // Get this download's size. 046 publicintgetSize() { 047 returnsize; 048 } 049 050 // Get this download's progress. 051 publicfloatgetProgress() { 052 return((float) downloaded / size) * 100; 053 } 054 055 // Get this download's status. 056 publicintgetStatus() { 057 returnstatus; 058 } 059 060 // Pause this download. 061 publicvoidpause() { 062 status = PAUSED; 063 stateChanged(); 064 } 065 066 // Resume this download. 067 publicvoidresume() { 068 status = DOWNLOADING; 069 stateChanged(); 070 download(); 071 } 072 073 // Cancel this download. 074 publicvoidcancel() { 075 status = CANCELLED; 076 stateChanged(); 077 } 078 079 // Mark this download as having an error. 080 privatevoiderror() { 081 status = ERROR; 082 stateChanged(); 083 } 084 085 // Start or resume downloading. 086 privatevoiddownload() { 087 Thread thread = newThread(this); 088 thread.start(); 089 } 090 091 // Get file name portion of URL. 092 privateString getFileName(URL url) { 093 String fileName = url.getFile(); 094 returnfileName.substring(fileName.lastIndexOf('/') + 1); 095 } 096 097 // Download file. 098 publicvoidrun() { 099 RandomAccessFile file = null; 100 InputStream stream = null; 101 102 try{ 103 // Open connection to URL. 104 HttpURLConnection connection = 105 (HttpURLConnection) url.openConnection(); 106 107 // Specify what portion of file to download. 108 connection.setRequestProperty("Range", 109 "bytes="+ downloaded + "-"); 110 111 // Connect to server. 112 connection.connect(); 113 114 // Make sure response code is in the 200 range. 115 if(connection.getResponseCode() / 100!= 2) { 116 error(); 117 } 118 119 // Check for valid content length. 120 intcontentLength = connection.getContentLength(); 121 if(contentLength < 1) { 122 error(); 123 } 124 125 // Set the size for this download if it hasn't been already set. 126 if(size == -1) { 127 size = contentLength; 128 stateChanged(); 129 } 130 131 // Open file and seek to the end of it. 132 file = newRandomAccessFile(getFileName(url), "rw"); 133 file.seek(downloaded); 134 135 stream = connection.getInputStream(); 136 while(status == DOWNLOADING) { 137 // Size buffer according to how much of the file is left to download. 138 bytebuffer[]; 139 if(size - downloaded > MAX_BUFFER_SIZE) { 140 buffer = newbyte[MAX_BUFFER_SIZE]; 141 } else{ 142 buffer = newbyte[size - downloaded]; 143 } 144 145 // Read from server into buffer. 146 intread = stream.read(buffer); 147 if(read == -1) 148 break; 149 150 // Write buffer to file. 151 file.write(buffer, 0, read); 152 downloaded += read; 153 stateChanged(); 154 } 155 156 // Change status to complete if this point was reached because downloading has finished. 157 if(status == DOWNLOADING) { 158 status = COMPLETE; 159 stateChanged(); 160 } 161 } catch(Exception e) { 162 error(); 163 } finally{ 164 // Close file. 165 if(file != null) { 166 try{ 167 file.close(); 168 } catch(Exception e) {} 169 } 170 171 // Close connection to server. 172 if(stream != null) { 173 try{ 174 stream.close(); 175 } catch(Exception e) {} 176 } 177 } 178 } 179 180 // Notify observers that this download's status has changed. 181 privatevoidstateChanged() { 182 setChanged(); 183 notifyObservers(); 184 } 185 186 publicstaticvoidmain(String args[]){ 187 try{ 188 Download d = newDownload(newURL("http://www.oschina.net/img/logo.gif")); 189 d.run(); 190 } catch(MalformedURLException e) { 191 // TODO Auto-generated catch block 192 e.printStackTrace(); 193 } 194 } 195 } 使用MINA框架的转发服务器和客户端 [文件] SmsObject.java ~ 803B 下载(29) 01 publicclassSmsObject { 02 privateintreceiver; 03 privateintdata_type; 04 privateintdata_receiver; 05 privateintdata_sender; 06 privateString data; 07 08 publicintgetReceiver(){ 09 returnreceiver; 10 } 11 12 publicintgetDataType(){ 13 returndata_type; 14 } 15 16 publicintgetDataReceiver(){ 17 returndata_receiver; 18 } 19 20 publicintgetDataSender(){ 21 returndata_sender; 22 } 23 24 publicString getData(){ 25 returndata; 26 } 27 28 publicvoidsetReceiver(intreceiver){ 29 this.receiver = receiver; 30 } 31 32 publicvoidsetDataType(intdata_type){ 33 this.data_type = data_type; 34 } 35 36 publicvoidsetDataReceiver(intdata_receiver){ 37 this.data_receiver = data_receiver; 38 } 39 40 publicvoidsetDataSender(intdata_sender){ 41 this.data_sender = data_sender; 42 } 43 44 publicvoidsetData(String data){ 45 this.data = data; 46 } 47 48 } [文件] Server.java ~ 1KB 下载(19) 01 importjava.net.InetSocketAddress; 02 importjava.nio.charset.Charset; 03 04 importorg.apache.mina.core.filterchain.DefaultIoFilterChainBuilder; 05 importorg.apache.mina.core.session.IdleStatus; 06 importorg.apache.mina.filter.codec.ProtocolCodecFilter; 07 importorg.apache.mina.transport.socket.SocketAcceptor; 08 importorg.apache.mina.transport.socket.nio.NioSocketAcceptor; 09 10 /** 11 * MINA Server 12 * @author Lazy 13 */ 14 15 publicclassServer { 16 17 privatestaticintbindPort = 9999; 18 19 /** 20 * @param args 21 */ 22 publicstaticvoidmain(String[] args) throwsException{ 23 // TODO Auto-generated method stub 24 SocketAcceptor acceptor = newNioSocketAcceptor(); 25 26 //setMinReadBufferSize(), setMaxReadBufferSize() 27 acceptor.getSessionConfig().setReadBufferSize(2048); 28 acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 10); 29 //acceptor.getManagedSessions() 30 31 //set MinaServerFilter 32 DefaultIoFilterChainBuilder chain = acceptor.getFilterChain(); 33 chain.addLast("codec", newProtocolCodecFilter(newCmccCodecFactory(Charset.forName("UTF-8")))); 34 35 //set MinaServerHandler 36 acceptor.setHandler(newServerHandler()); 37 38 //bind port 39 try 40 { 41 acceptor.bind(newInetSocketAddress(bindPort)); 42 System.out.println("Mina Server is Listing on:= "+ bindPort); 43 } 44 catch(Exception e) 45 { 46 e.printStackTrace(); 47 } 48 49 } 50 } [文件] ServerHandler.java ~ 3KB 下载(19) 001 importjava.util.ArrayList; 002 003 importorg.apache.mina.core.service.IoHandler; 004 importorg.apache.mina.core.session.IdleStatus; 005 importorg.apache.mina.core.session.IoSession; 006 007 //import org.slf4j.Logger; 008 //import org.slf4j.LoggerFactory; 009 010 publicclassServerHandler implementsIoHandler { 011 012 //SLF4J 013 //private final static Logger log = LoggerFactory.getLogger(ServerHandler.class); 014 015 privatestaticfinalCliLocator cli = newCliLocator(); 016 017 @Override 018 publicvoidexceptionCaught(IoSession arg0, Throwable arg1) 019 throwsException { 020 // TODO Auto-generated method stub 021 022 } 023 024 @Override 025 publicvoidmessageReceived(IoSession session, Object message) throwsException { 026 // TODO Auto-generated method stub 027 028 SmsObject sms = (SmsObject) message; 029 //log.info("The message is [" + sms.getMessage() + "]"); 030 031 /* 032 * other operate 033 034 System.out.println("================================================"); 035 System.out.println("Data From : " + session.getRemoteAddress()); 036 System.out.println("Receiver : [" + sms.getReceiver() + "]"); 037 System.out.println("Data Type : [" + sms.getDataType() + "]"); 038 System.out.println("Data Receiver : [" + sms.getDataReceiver() + "]"); 039 System.out.println("Data Sender : [" + sms.getDataSender() + "]"); 040 System.out.println("Data : [" + sms.getData() + "]"); 041 System.out.println("================================================"); 042 043 * */ 044 045 //The processing of registration information 046 Integer i = new Integer(255); 047 if( i.equals(sms.getReceiver()) && 048 i.equals(sms.getDataType()) && 049 i.equals(sms.getDataReceiver()) && 050 i.equals(sms.getDataSender())) { 051 052 cli.addCli(session, sms.getData()); 053 System.out.println("Client : " + session.getRemoteAddress() + " DONE"); 054 } else { 055 //Forwarding 056 ArrayList tempList = new ArrayList(); 057 tempList = cli.getCli(sms.getReceiver()); 058 059 System.out.println("tempting=======>" + session.getRemoteAddress() + " with receiver : " + sms.getReceiver()); 060 if(tempList != null) { 061 //System.out.println("true"); 062 for (IoSession session1 : tempList){ 063 System.out.println("Send =========>" + session1.getRemoteAddress()); 064 session1.write(sms); 065 } 066 System.out.println("================================================"); 067 } 068 else System.out.println("forwarding false"); 069 } 070 071 //Trigger the client 072 sms.setReceiver(i); 073 sms.setDataType(i); 074 sms.setDataReceiver(i); 075 sms.setDataSender(i); 076 sms.setData(" "); 077 session.write(sms); 078 079 } 080 081 @Override 082 public void messageSent(IoSession arg0, Object arg1) throws Exception { 083 // TODO Auto-generated method stub 084 085 } 086 087 @Override 088 public void sessionClosed(IoSession session) throws Exception { 089 // TODO Auto-generated method stub 090 091 //delete the timeout address 092 System.out.println("Client Closed :" + session.getRemoteAddress()); 093 cli.delCli(session); 094 /* 095 * other operate 096 * */ 097 } 098 099 @Override 100 public void sessionCreated(IoSession arg0) throws Exception { 101 // TODO Auto-generated method stub 102 103 } 104 105 @Override 106 public void sessionIdle(IoSession arg0, IdleStatus arg1) throws Exception { 107 // TODO Auto-generated method stub 108 109 } 110 111 @Override 112 public void sessionOpened(IoSession session) throws Exception { 113 // TODO Auto-generated method stub 114 System.out.println("Come :" + session.getRemoteAddress()); 115 /* 116 * other operate 117 * */ 118 } 119 120 } [文件] CliLocator.java ~ 2KB 下载(27) 01 importjava.util.ArrayList; 02 importjava.util.HashMap; 03 04 importorg.apache.mina.core.session.IoSession; 05 06 publicclassCliLocator { 07 08 privatestaticHashMap> cliLocation; 09 10 publicCliLocator() { 11 cliLocation = newHashMap>(); 12 } 13 14 publicvoidaddCli(IoSession session, String s) { 15 String[] sa = s.split(" "); 16 ArrayList tempList = newArrayList(); 17 18 for(inti=0; i"); 23 try { 24 System.out.print(cliLocation.containsKey(key)); 25 } 26 catch (Exception E) { 27 System.out.println(E.toString()); 28 } 29 */ 30 if(cliLocation.containsKey(key)) { 31 if(cliLocation.get(key) != null) 32 tempList = cliLocation.get(key); 33 } else{ 34 tempList.add(session); 35 cliLocation.put(key, tempList); 36 continue; 37 } 38 tempList.add(session); 39 cliLocation.put(key, tempList); 40 } 41 } 42 43 publicArrayList getCli(intreceiver) { 44 45 //System.out.println("Getting=====>" + receiver); 46 try{ 47 if(cliLocation.containsKey(receiver)) { 48 //System.out.println("true"); 49 returncliLocation.get(receiver); 50 } 51 else{ 52 //System.out.println("false"); 53 returnnull; 54 } 55 } 56 catch(Exception E) { 57 System.out.println(E.toString()); 58 returnnull; 59 } 60 } 61 62 publicvoiddelCli(IoSession session){ 63 ArrayList tempList = newArrayList(); 64 65 for(Integer inte : cliLocation.keySet()) { 66 tempList = cliLocation.get(inte); 67 if(tempList.contains(session)) { 68 tempList.remove(session); 69 } 70 } 71 } 72 73 } [文件] CmccCodecFactory.java ~ 792B 下载(21) 01 importjava.nio.charset.Charset; 02 03 importorg.apache.mina.core.session.IoSession; 04 importorg.apache.mina.filter.codec.ProtocolCodecFactory; 05 importorg.apache.mina.filter.codec.ProtocolDecoder; 06 importorg.apache.mina.filter.codec.ProtocolEncoder; 07 08 publicclassCmccCodecFactory implementsProtocolCodecFactory{ 09 10 privatefinalCmccEncoder encoder; 11 privatefinalCmccDecoder decoder; 12 13 publicCmccCodecFactory(){ 14 this(Charset.defaultCharset()); 15 } 16 17 publicCmccCodecFactory(Charset charset){ 18 this.encoder = newCmccEncoder(charset); 19 this.decoder = newCmccDecoder(charset); 20 } 21 22 @Override 23 publicProtocolDecoder getDecoder(IoSession session) throwsException{ 24 returndecoder; 25 } 26 27 @Override 28 publicProtocolEncoder getEncoder(IoSession session) throwsException{ 29 returnencoder; 30 } 31 } [文件] CmccDecoder.java ~ 2KB 下载(18) 01 importjava.nio.charset.Charset; 02 importjava.nio.charset.CharsetDecoder; 03 04 importorg.apache.mina.core.buffer.IoBuffer; 05 importorg.apache.mina.core.session.IoSession; 06 importorg.apache.mina.filter.codec.CumulativeProtocolDecoder; 07 importorg.apache.mina.filter.codec.ProtocolDecoderOutput; 08 09 10 publicclassCmccDecoder extendsCumulativeProtocolDecoder{ 11 12 privatefinalCharset charset; 13 14 publicCmccDecoder(Charset charset){ 15 this.charset = charset; 16 } 17 18 @Override 19 protectedbooleandoDecode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throwsException{ 20 IoBuffer buffer = IoBuffer.allocate(300).setAutoExpand(true); 21 CharsetDecoder cd = charset.newDecoder(); 22 23 bytehead = (byte)192; 24 byteend = (byte)193; 25 intreceiver = -1, data_type = -1, data_receiver = -1, data_sender = -1; 26 String data=""; 27 intmatchCount = 0; 28 booleanflag = false; 29 30 while(in.hasRemaining()){ 31 byteb = in.get(); 32 buffer.put(b); 33 34 if(b == head){ 35 buffer.flip(); 36 buffer.clear(); 37 matchCount = 0; 38 flag = true; 39 }elseif(b == end){ 40 if(flag){ 41 buffer.flip(); 42 //System.out.println(buffer); 43 receiver = buffer.getInt(0); 44 data_type = buffer.getInt(4); 45 data_receiver = buffer.getInt(8); 46 data_sender = buffer.getInt(12); 47 /* 48 System.out.println(buffer); 49 System.out.println(matchCount); 50 */ 51 buffer.skip(16); 52 data = buffer.getString(matchCount-16, cd); 53 /* 54 System.out.println("====================================="); 55 for (int i = 16; i < 24; i++) 56 System.out.println((char)buffer.get(i)); 57 System.out.println("====================================="); 58 System.out.println(buffer); 59 */ 60 buffer.clear(); 61 matchCount = 0; 62 break; 63 } 64 }else{ 65 matchCount++; 66 } 67 } 68 69 SmsObject smsObject = newSmsObject(); 70 smsObject.setReceiver(receiver); 71 smsObject.setDataType(data_type); 72 smsObject.setDataReceiver(data_receiver); 73 smsObject.setDataSender(data_sender); 74 smsObject.setData(data); 75 out.write(smsObject); 76 returnfalse; 77 } 78 } [文件] CmccEncoder.java ~ 1KB 下载(18) 01 importjava.nio.charset.Charset; 02 importjava.nio.charset.CharsetEncoder; 03 04 importorg.apache.mina.core.buffer.IoBuffer; 05 importorg.apache.mina.core.session.IoSession; 06 importorg.apache.mina.filter.codec.ProtocolEncoderAdapter; 07 importorg.apache.mina.filter.codec.ProtocolEncoderOutput; 08 09 10 publicclassCmccEncoder extendsProtocolEncoderAdapter{ 11 privatefinalCharset charset; 12 13 publicCmccEncoder(Charset charset){ 14 this.charset = charset; 15 } 16 17 @Override 18 publicvoidencode(IoSession session, Object message, ProtocolEncoderOutput out) throwsException{ 19 SmsObject sms = (SmsObject)message; 20 CharsetEncoder ce = charset.newEncoder(); 21 IoBuffer buffer = IoBuffer.allocate(100).setAutoExpand(true); 22 23 bytehead = (byte)192; 24 byteend = (byte)193; 25 intreceiver = sms.getReceiver(); 26 intdata_type = sms.getDataType(); 27 intdata_receiver = sms.getDataReceiver(); 28 intdata_sender = sms.getDataSender(); 29 String data = sms.getData(); 30 31 buffer.put(head); 32 buffer.putInt(receiver); 33 buffer.putInt(data_type); 34 buffer.putInt(data_receiver); 35 buffer.putInt(data_sender); 36 buffer.putString(data, ce); 37 buffer.put(end); 38 39 buffer.flip(); 40 session.write(buffer); 41 } 42 } 使用mina框架实现cmpp2.0服务端 我自己写的使用mina框架实现cmpp2.0服务端,经过一段使用解决了几个bug现在比较稳定 [文件]源代码 ~ 4KB 下载(19) 001 packagecmpp; 002 003 importorg.apache.mina.core.service.IoHandlerAdapter; 004 importorg.apache.mina.core.session.IoSession; 005 importorg.apache.mina.filter.codec.ProtocolCodecFilter; 006 importorg.apache.mina.filter.executor.ExecutorFilter; 007 importorg.apache.mina.filter.executor.OrderedThreadPoolExecutor; 008 importorg.apache.mina.integration.jmx.IoServiceMBean; 009 010 importorg.apache.mina.transport.socket.SocketAcceptor; 011 importorg.apache.mina.transport.socket.SocketConnector; 012 importorg.apache.mina.transport.socket.nio.NioSocketAcceptor; 013 014 importorg.slf4j.Logger; 015 importorg.slf4j.LoggerFactory; 016 017 importjava.io.IOException; 018 importjava.lang.management.ManagementFactory; 019 importjava.net.InetSocketAddress; 020 021 importjava.util.concurrent.ThreadFactory; 022 importjava.util.concurrent.TimeUnit; 023 importjava.util.concurrent.atomic.AtomicInteger; 024 025 importjavax.management.InstanceAlreadyExistsException; 026 importjavax.management.MBeanRegistrationException; 027 importjavax.management.MBeanServer; 028 importjavax.management.MalformedObjectNameException; 029 importjavax.management.NotCompliantMBeanException; 030 importjavax.management.ObjectName; 031 032 /** 033 * TODO : Add documentation 034 * 035 * @author Apache MINA Project 036 * 037 */ 038 publicclassMinaCmpp extendsIoHandlerAdapter { 039 privatestaticfinalLogger logger = LoggerFactory 040 .getLogger(MinaCmpp.class); 041 042 publicstaticfinalintMSG_SIZE = 5000; 043 publicstaticfinalintMSG_COUNT = 10; 044 privatestaticfinalintPORT = 7890; 045 privatestaticfinalintBUFFER_SIZE = 8192; 046 047 publicstaticfinalString OPEN = "open"; 048 049 publicSocketAcceptor acceptor; 050 publicSocketConnector connector; 051 052 privatefinalObject LOCK = newObject(); 053 054 privatestaticfinalThreadFactory THREAD_FACTORY = newThreadFactory() { 055 publicThread newThread(finalRunnable r) { 056 returnnewThread(null, r, "MinaThread", 64* 1024); 057 } 058 }; 059 060 privateOrderedThreadPoolExecutor executor; 061 062 publicstaticAtomicInteger sent = newAtomicInteger(0); 063 064 publicMinaCmpp() throwsIOException { 065 executor = newOrderedThreadPoolExecutor(0, 1000, 60, TimeUnit.SECONDS, 066 THREAD_FACTORY); 067 068 acceptor = newNioSocketAcceptor(Runtime.getRuntime() 069 .availableProcessors() + 1); 070 acceptor.setReuseAddress(true); 071 acceptor.getSessionConfig().setReceiveBufferSize(BUFFER_SIZE); 072 073 acceptor.getFilterChain().addLast("threadPool", 074 newExecutorFilter(executor)); 075 acceptor.getFilterChain().addLast("codec", 076 newProtocolCodecFilter(newCmppProtocolCodecFactory())); 077 078 MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer(); 079 IoServiceMBean acceptorMBean = newIoServiceMBean(acceptor); 080 ObjectName acceptorName; 081 try{ 082 acceptorName = newObjectName(acceptor.getClass().getPackage() 083 .getName() 084 + ":type=acceptor,name=" 085 + acceptor.getClass().getSimpleName()); 086 mBeanServer.registerMBean(acceptorMBean, acceptorName); 087 088 // MBeanServer mBeanServer = 089 // ManagementFactory.getPlatformMBeanServer(); 090 // mBeanServer.registerMBean(new IoSessionMBean(session), 091 // new ObjectName(session.getClass() 092 // .getPackage().getName() 093 // + ":type=session,name=" + session.getClass().getSimpleName())); 094 095 } catch(MalformedObjectNameException e) { 096 // TODO Auto-generated catch block 097 e.printStackTrace(); 098 } catch(NullPointerException e) { 099 // TODO Auto-generated catch block 100 e.printStackTrace(); 101 } catch(InstanceAlreadyExistsException e) { 102 // TODO Auto-generated catch block 103 e.printStackTrace(); 104 } catch(MBeanRegistrationException e) { 105 // TODO Auto-generated catch block 106 e.printStackTrace(); 107 } catch(NotCompliantMBeanException e) { 108 // TODO Auto-generated catch block 109 e.printStackTrace(); 110 } 111 112 } 113 114 publicvoidstart() throwsException { 115 finalInetSocketAddress socketAddress = newInetSocketAddress( 116 "0.0.0.0", PORT); 117 acceptor.setHandler(newCmppIoHandler(LOCK)); 118 acceptor.bind(socketAddress); 119 logger.info("MinaCmpp�������������˿���"+ PORT); 120 } 121 122 @Override 123 publicvoidexceptionCaught(IoSession session, Throwable cause) { 124 if(!(cause instanceofIOException)) { 125 logger.error("Exception: ", cause); 126 } else{ 127 logger.info("I/O error: "+ cause.getMessage()); 128 } 129 session.close(true); 130 } 131 132 publicstaticvoidmain(String[] args) throwsException { 133 newMinaCmpp().start(); 134 } 135 } [文件]源代码 ~ 2KB 下载(16) 01 packagecmpp; 02 03 04 05 importorg.apache.mina.core.session.IoSession; 06 importorg.slf4j.Logger; 07 importorg.slf4j.LoggerFactory; 08 09 importcmpp.pdu.ActiveTest; 10 11 publicclassActiveThread extendsThread { 12 privateIoSession session = null; 13 privatestaticfinalLogger logger = LoggerFactory 14 .getLogger(ActiveThread.class); 15 privatelongheartbeatInterval = 60000; 16 privatelongheartbeatRetry = 3; 17 privatelongreconnectInterval = 10000; 18 publicstaticlonglastActiveTime = 0; 19 privatelonglastCheckTime = 0; 20 21 publicActiveThread(IoSession s) { 22 setDaemon(true); 23 this.session = s; 24 lastCheckTime = System.currentTimeMillis(); 25 lastActiveTime = System.currentTimeMillis(); 26 } 27 28 publicvoidrun() { 29 try{ 30 while(session.isConnected()) { 31 longcurrentTime = System.currentTimeMillis(); 32 if((currentTime - lastCheckTime) > heartbeatInterval) { 33 logger.info("CmppSession.checkConnection"); 34 if((currentTime - lastActiveTime) < (heartbeatInterval * heartbeatRetry)) { 35 logger.info("send ActiveTest"); 36 lastCheckTime = currentTime; 37 ActiveTest activeTest = newActiveTest(); 38 activeTest.assignSequenceNumber(); 39 activeTest.timeStamp = currentTime; 40 session.write(activeTest); 41 } else{ 42 logger.info("connection lost!"); 43 session.close(true); 44 break; 45 } 46 } 47 try{ 48 Thread.sleep(reconnectInterval); 49 } catch(InterruptedException e) { 50 // 51 } 52 } 53 } catch(Exception e) { 54 e.printStackTrace(); 55 } 56 } 57 } [文件]源代码 ~ 909B 下载(14) 01 packagecmpp; 02 03 importorg.apache.mina.core.buffer.IoBuffer; 04 importorg.apache.mina.core.session.AttributeKey; 05 importorg.apache.mina.core.session.IoSession; 06 importorg.apache.mina.filter.codec.ProtocolCodecFactory; 07 importorg.apache.mina.filter.codec.ProtocolDecoder; 08 importorg.apache.mina.filter.codec.ProtocolDecoderOutput; 09 importorg.apache.mina.filter.codec.ProtocolEncoder; 10 11 /** 12 * TODO: Document me 13 * @author Apache MINA Project 14 * 15 */ 16 publicclassCmppProtocolCodecFactory implementsProtocolCodecFactory { 17 privateProtocolDecoder decoder = newCmppRequestDecoder(); 18 privateProtocolEncoder encoder = newCmppResponseEncoder(); 19 20 publicProtocolDecoder getDecoder(IoSession sessionIn) throwsException { 21 returndecoder; 22 } 23 24 publicProtocolEncoder getEncoder(IoSession sessionIn) throwsException { 25 returnencoder; 26 } 27 } [文件]源代码 ~ 862B 下载(12) 01 packagecmpp; 02 03 importorg.apache.mina.core.buffer.IoBuffer; 04 importorg.apache.mina.core.session.IoSession; 05 importorg.apache.mina.filter.codec.ProtocolEncoder; 06 importorg.apache.mina.filter.codec.ProtocolEncoderOutput; 07 08 importcmpp.pdu.CmppPDU; 09 10 /** 11 * TODO: Document me ! 12 * 13 * @author Apache MINA Project 14 */ 15 publicclassCmppResponseEncoder implementsProtocolEncoder { 16 publicvoidencode(IoSession session, Object message, 17 ProtocolEncoderOutput out) throwsException { 18 CmppPDU pdu = (CmppPDU) message; 19 byte[] bytes = pdu.getData().getBuffer(); 20 IoBuffer buf = IoBuffer.allocate(bytes.length, false); 21 22 buf.setAutoExpand(true); 23 // buf.putInt(bytes.length); 24 buf.put(bytes); 25 26 buf.flip(); 27 out.write(buf); 28 29 } 30 31 publicvoiddispose(IoSession session) throwsException { 32 33 } 34 } [文件]源代码 ~ 2KB 下载(16) 01 packagecmpp; 02 03 04 importorg.apache.mina.core.buffer.IoBuffer; 05 importorg.apache.mina.core.session.IoSession; 06 importorg.apache.mina.filter.codec.CumulativeProtocolDecoder; 07 importorg.apache.mina.filter.codec.ProtocolDecoderOutput; 08 importorg.slf4j.Logger; 09 importorg.slf4j.LoggerFactory; 10 11 importcmpp.pdu.CmppPDU; 12 importcmpp.pdu.CmppPDUParser; 13 importcmpp.sms.ByteBuffer; 14 15 16 /** 17 * TODO: Document me ! 18 * 19 * @author Apache MINA Project 20 */ 21 publicclassCmppRequestDecoder extendsCumulativeProtocolDecoder { 22 privatestaticfinalLogger logger = LoggerFactory 23 .getLogger(CmppRequestDecoder.class); 24 25 @Override 26 protectedbooleandoDecode(finalIoSession session, IoBuffer in, 27 ProtocolDecoderOutput out) throwsException { 28 29 // �������¼�������� 30 // 1. һ��ip����ֻ��һ��������Ϣ 31 // 2. һ��ip���а�һ��������Ϣ����һ����Ϣ��һ���� 32 // 3. һ��ip���а�һ����Ϣ��һ���� 33 // 4. һ��ip���а���������������Ϣ���ࣨѭ�������ڸ����decode�У� 34 35 if(in.remaining() > 4) { 36 logger.info("resv msg "+ in.toString()); 37 in.mark(); 38 intlength = in.getInt(); 39 logger.info("length="+ length + ",in.limit="+ in.limit()+",in.remaining="+in.remaining()); 40 if(length > (in.remaining()+4)) 41 { 42 in.rewind(); 43 returnfalse; 44 } 45 46 byte[] bytedata = newbyte[length-4]; 47 in.get(bytedata); 48 ByteBuffer buffer = newByteBuffer(); 49 buffer.appendInt(length); 50 buffer.appendBytes(bytedata); 51 CmppPDU pdu = CmppPDUParser.createPDUFromBuffer(buffer); 52 if(pdu == null) returnfalse; 53 logger.info(pdu.dump()); 54 out.write(pdu); 55 returntrue; 56 57 } 58 59 returnfalse; 60 } 61 } [文件] CmppIoHandler.java ~ 6KB 下载(14) 001 packagecmpp; 002 003 importorg.apache.mina.core.buffer.IoBuffer; 004 importorg.apache.mina.core.service.IoHandlerAdapter; 005 importorg.apache.mina.core.session.IoSession; 006 importorg.apache.mina.integration.jmx.IoSessionMBean; 007 importorg.slf4j.Logger; 008 importorg.slf4j.LoggerFactory; 009 010 importcmpp.CmppConstant; 011 012 importcmpp.ActiveThread; 013 importcmpp.pdu.CmppPDU; 014 importcmpp.sms.ByteBuffer; 015 importcmpp.sms.ShortMessage; 016 importstaticcmpp.MinaCmpp.MSG_COUNT; 017 importstaticcmpp.MinaCmpp.OPEN; 018 019 importjava.io.IOException; 020 importjava.lang.management.ManagementFactory; 021 022 importjava.text.SimpleDateFormat; 023 importjava.util.Date; 024 importjava.util.concurrent.ExecutorService; 025 importjava.util.concurrent.Executors; 026 importjava.util.concurrent.atomic.AtomicInteger; 027 028 importjavax.management.MBeanServer; 029 importjavax.management.ObjectName; 030 031 /** 032 * TODO: Document me ! 033 * 034 * @author Apache MINA Project 035 * 036 */ 037 publicclassCmppIoHandler extendsIoHandlerAdapter { 038 privatestaticfinalLogger logger = LoggerFactory 039 .getLogger(CmppIoHandler.class); 040 publicstaticAtomicInteger received = newAtomicInteger(0); 041 publicstaticAtomicInteger closed = newAtomicInteger(0); 042 privatefinalObject LOCK; 043 publicstaticbooleanConnect = false; 044 publicstaticbooleanFirstmsg = true; 045 privateExecutorService exec = Executors.newSingleThreadExecutor(); 046 publicCmppIoHandler(Object lock) { 047 LOCK = lock; 048 } 049 050 @Override 051 publicvoidexceptionCaught(IoSession session, Throwable cause) { 052 if(!(cause instanceofIOException)) { 053 logger.error("Exception: ", cause); 054 } else{ 055 logger.info("I/O error: "+ cause.getMessage()); 056 } 057 session.close(true); 058 } 059 060 @Override 061 publicvoidsessionOpened(IoSession session) throwsException { 062 logger.info("Session "+ session.getId() + " is opened"); 063 064 // ����ActivePDU-Thread 065 // ExecutorService exec = Executors.newSingleThreadExecutor(); 066 // exec.execute(new ActiveThread(session)); 067 Thread t = newThread(newActiveThread(session)); 068 t.setDaemon(true); 069 t.start(); 070 session.resumeRead(); 071 072 } 073 074 @Override 075 publicvoidsessionCreated(IoSession session) throwsException { 076 logger.info("Creation of session "+ session.getId()); 077 session.setAttribute(OPEN); 078 session.suspendRead(); 079 080 } 081 082 @Override 083 publicvoidsessionClosed(IoSession session) throwsException { 084 session.removeAttribute(OPEN); 085 logger.info("{}> Session closed", session.getId()); 086 finalintclsd = closed.incrementAndGet(); 087 088 if(clsd == MSG_COUNT) { 089 synchronized(LOCK) { 090 LOCK.notifyAll(); 091 } 092 } 093 } 094 095 @Override 096 publicvoidmessageReceived(IoSession session, Object message) 097 throwsException { 098 CmppPDU pdu = (CmppPDU) message; 099 logger.info("MESSAGE: "+ pdu.header.getCommandId() + ":" 100 + pdu.header.getSequenceNumber() + "on session " 101 + session.getId()); 102 finalintrec = received.incrementAndGet(); 103 if(Firstmsg == true|| Connect == true) { 104 Firstmsg = false; 105 switch(pdu.header.getCommandId()) { 106 caseCmppConstant.CMD_CONNECT: 107 cmpp.pdu.Connect con = (cmpp.pdu.Connect) pdu; 108 cmpp.pdu.ConnectResp conresp = (cmpp.pdu.ConnectResp) con 109 .getResponse(); 110 session.write(conresp); 111 logger.info("conresp:"+ pdu.header.getSequenceNumber() 112 + " on session "+ session.getId()); 113 Connect = true; 114 // Thread t2 = new Thread(new SendMoThread(session)); 115 // t2.setDaemon(true); 116 // t2.start(); 117 exec.execute(newSendMoThread(session)); 118 break; 119 caseCmppConstant.CMD_ACTIVE_TEST: 120 cmpp.pdu.ActiveTest activeTest = (cmpp.pdu.ActiveTest) pdu; 121 cmpp.pdu.ActiveTestResp activeTestResp = (cmpp.pdu.ActiveTestResp) activeTest 122 .getResponse(); 123 session.write(activeTestResp); 124 logger.info("active_test:"+ pdu.header.getSequenceNumber() 125 + " on session "+ session.getId()); 126 break; 127 caseCmppConstant.CMD_ACTIVE_TEST_RESP: 128 cmpp.pdu.ActiveTestResp activeTestRsp = (cmpp.pdu.ActiveTestResp) pdu; 129 pdu.dump(); 130 logger.info("activeTestRsp:"+ pdu.header.getSequenceNumber() 131 + " on session "+ session.getId()); 132 ActiveThread.lastActiveTime = System.currentTimeMillis(); 133 break; 134 caseCmppConstant.CMD_SUBMIT: 135 cmpp.pdu.Submit submit = (cmpp.pdu.Submit) pdu; 136 submit.dump(); 137 cmpp.pdu.SubmitResp subresp = (cmpp.pdu.SubmitResp) submit 138 .getResponse(); 139 subresp.setMsgId(cmpp.pdu.Tools.GetRspid()); 140 session.write(subresp); 141 logger.info("subresp:"+ pdu.header.getSequenceNumber() 142 + " on session "+ session.getId()); 143 // ����״̬���� 144 cmpp.pdu.Deliver deliver = sendMsgStat(submit); 145 session.write(deliver); 146 break; 147 caseCmppConstant.CMD_DELIVER_RESP: 148 cmpp.pdu.DeliverResp delresp = (cmpp.pdu.DeliverResp) pdu; 149 delresp.dump(); 150 logger.info("DeliverResp:"+ pdu.header.getSequenceNumber() 151 + " on session "+ session.getId()); 152 break; 153 default: 154 logger.warn("Unexpected PDU received! PDU Header: " 155 + pdu.header.getData().getHexDump()); 156 break; 157 } 158 } 159 if(rec == MSG_COUNT) { 160 synchronized(LOCK) { 161 LOCK.notifyAll(); 162 } 163 } 164 165 // session.close(true); 166 } 167 168 privatecmpp.pdu.Deliver sendMsgStat(cmpp.pdu.Submit submit) { 169 // TODO Auto-generated method stub 170 cmpp.pdu.Deliver delive = newcmpp.pdu.Deliver(); 171 delive.setMsgId(cmpp.pdu.Tools.GetMsgid()); 172 delive.setDstId(submit.getMsgSrc()); 173 delive.setServiceId(submit.getServiceId()); 174 delive.setSrcTermId(submit.getDestTermId()[0]); 175 delive.setIsReport((byte) 1); 176 delive.assignSequenceNumber(); 177 ShortMessage sm = newShortMessage(); 178 ByteBuffer messageData = newByteBuffer(); 179 messageData.appendBytes(submit.getMsgId(), 8); 180 messageData.appendString("0", 7); 181 SimpleDateFormat sdf = newSimpleDateFormat("yyMMddHHmm"); 182 messageData.appendString(sdf.format(newDate()), 10); 183 messageData.appendString(sdf.format(newDate()), 10); 184 messageData.appendString(submit.getDestTermId()[0], 32); 185 messageData.appendInt(30); 186 sm.setMessage(messageData.getBuffer(), (byte) 04); 187 delive.setSm(sm); 188 delive.setLinkId(submit.getLinkId()); 189 returndelive; 190 } 191 } [文件] CmppConstant.java ~ 4KB 下载(15) 001 packagecmpp; 002 003 004 publicclassCmppConstant { 005 publicstaticfinalbyteCOMMAND_NAME_LENGTH=12; 006 publicstaticfinalbyteTRANSMITTER = (byte) 0x00; 007 publicstaticfinalbyteRECEIVER = (byte) 0x01; 008 publicstaticfinalbyteTRANSCEIVER = (byte) 0x02; 009 010 publicstaticfinalintCONNECTION_CLOSED = 0; 011 publicstaticfinalintCONNECTION_OPENED = 1; 012 013 publicstaticfinallongACCEPT_TIMEOUT = 60000; 014 publicstaticfinallongRECEIVER_TIMEOUT = 60000; 015 publicstaticfinallongCOMMS_TIMEOUT = 60000; 016 publicstaticfinallongQUEUE_TIMEOUT = 60000; 017 publicstaticfinallongRECEIVE_BLOCKING = 0; 018 publicstaticfinallongCONNECTION_RECEIVE_TIMEOUT = 0; 019 publicstaticintPDU_HEADER_SIZE = 12; 020 021 //CMPP Command Set 022 publicstaticfinalintCMD_CONNECT = 0x00000001; 023 publicstaticfinalintCMD_CONNECT_RESP = 0x80000001; 024 publicstaticfinalintCMD_TERMINATE = 0x00000002; 025 publicstaticfinalintCMD_TERMINATE_RESP = 0x80000002; 026 publicstaticfinalintCMD_SUBMIT = 0x00000004; 027 publicstaticfinalintCMD_SUBMIT_RESP = 0x80000004; 028 publicstaticfinalintCMD_DELIVER = 0x00000005; 029 publicstaticfinalintCMD_DELIVER_RESP = 0x80000005; 030 publicstaticfinalintCMD_QUERY = 0x00000006; 031 publicstaticfinalintCMD_QUERY_RESP = 0x80000006; 032 publicstaticfinalintCMD_CANCEL = 0x00000007; 033 publicstaticfinalintCMD_CANCEL_RESP = 0x80000007; 034 publicstaticfinalintCMD_ACTIVE_TEST = 0x00000008; 035 publicstaticfinalintCMD_ACTIVE_TEST_RESP = 0x80000008; 036 037 038 039 040 //Command_Status Error Codes 041 publicstaticfinalintESME_ROK = 0x00000000; 042 043 //Interface_Version 044 publicstaticfinalbyteVERSION = 0x20; 045 046 //Address_TON 047 publicstaticfinalbyteGSM_TON_UNKNOWN = 0x00; 048 publicstaticfinalbyteGSM_TON_INTERNATIONAL = 0x01; 049 publicstaticfinalbyteGSM_TON_NATIONAL = 0x02; 050 publicstaticfinalbyteGSM_TON_NETWORK = 0x03; 051 publicstaticfinalbyteGSM_TON_SUBSCRIBER = 0x04; 052 publicstaticfinalbyteGSM_TON_ALPHANUMERIC = 0x05; 053 publicstaticfinalbyteGSM_TON_ABBREVIATED = 0x06; 054 publicstaticfinalbyteGSM_TON_RESERVED_EXTN = 0x07; 055 056 //Address_NPI 057 publicstaticfinalbyteGSM_NPI_UNKNOWN = 0x00; 058 publicstaticfinalbyteGSM_NPI_E164 = 0x01; 059 publicstaticfinalbyteGSM_NPI_ISDN = GSM_NPI_E164; 060 publicstaticfinalbyteGSM_NPI_X121 = 0x03; 061 publicstaticfinalbyteGSM_NPI_TELEX = 0x04; 062 publicstaticfinalbyteGSM_NPI_LAND_MOBILE = 0x06; 063 publicstaticfinalbyteGSM_NPI_NATIONAL = 0x08; 064 publicstaticfinalbyteGSM_NPI_PRIVATE = 0x09; 065 publicstaticfinalbyteGSM_NPI_ERMES = 0x0A; 066 publicstaticfinalbyteGSM_NPI_INTERNET = 0x0E; 067 publicstaticfinalbyteGSM_NPI_WAP_CLIENT_ID = 0x12; 068 publicstaticfinalbyteGSM_NPI_RESERVED_EXTN = 0x0F; 069 070 //Port Value 071 publicstaticfinalintMIN_VALUE_PORT = 1024; 072 publicstaticfinalintMAX_VALUE_PORT = 65535; 073 074 //Address Length 075 publicstaticfinalintMIN_LENGTH_ADDRESS = 7; 076 077 // list of character encodings 078 // see http://java.sun.com/j2se/1.3/docs/guide/intl/encoding.doc.html 079 // from rt.jar 080 081 // American Standard Code for Information Interchange 082 publicstaticfinalString ENC_ASCII = "ASCII"; 083 // Windows Latin-1 084 publicstaticfinalString ENC_CP1252 = "Cp1252"; 085 // ISO 8859-1, Latin alphabet No. 1 086 publicstaticfinalString ENC_ISO8859_1 = "ISO8859_1"; 087 // Sixteen-bit Unicode Transformation Format, big-endian byte order 088 // with byte-order mark 089 publicstaticfinalString ENC_UTF16_BEM = "UnicodeBig"; 090 // Sixteen-bit Unicode Transformation Format, big-endian byte order 091 publicstaticfinalString ENC_UTF16_BE = "UnicodeBigUnmarked"; 092 // Sixteen-bit Unicode Transformation Format, little-endian byte order 093 // with byte-order mark 094 publicstaticfinalString ENC_UTF16_LEM = "UnicodeLittle"; 095 // Sixteen-bit Unicode Transformation Format, little-endian byte order 096 publicstaticfinalString ENC_UTF16_LE = "UnicodeLittleUnmarked"; 097 // Eight-bit Unicode Transformation Format 098 publicstaticfinalString ENC_UTF8 = "UTF8"; 099 // Sixteen-bit Unicode Transformation Format, byte order specified by 100 // a mandatory initial byte-order mark 101 publicstaticfinalString ENC_UTF16 = "UTF-16"; 102 103 } [文件] CmppPDUParser.java ~ 3KB 下载(6) 001 /* 002 * Created on 2005-5-30 003 * 004 * TODO To change the template for this generated file go to 005 * Window - Preferences - Java - Code Style - Code Templates 006 */ 007 package cmpp.pdu; 008 009 import cmpp.CmppConstant; 010 import cmpp.sms.ByteBuffer; 011 import cmpp.sms.PDUException; 012 import cmpp.sms.SmsObject; 013 014 /** 015 * @author intermax 016 * 017 * TODO To change the template for this generated type comment go to 018 * Window - Preferences - Java - Code Style - Code Templates 019 */ 020 publicclassCmppPDUParser extendsSmsObject { 021 022 publicstaticCmppPDU createPDUFromBuffer(ByteBuffer buffer) { 023 CmppPDU pdu = null; 024 CmppPDUHeader pduHeader = newCmppPDUHeader(); 025 try{ 026 pduHeader.setData(buffer); 027 switch(pduHeader.getCommandId()) { 028 caseCmppConstant.CMD_SUBMIT_RESP: 029 SubmitResp submitResp = newSubmitResp(); 030 submitResp.header = pduHeader; 031 submitResp.setBody(buffer); 032 pdu = submitResp; 033 break; 034 caseCmppConstant.CMD_DELIVER: 035 Deliver deliver = newDeliver(); 036 deliver.header = pduHeader; 037 deliver.setBody(buffer); 038 pdu = deliver; 039 break; 040 caseCmppConstant.CMD_ACTIVE_TEST: 041 ActiveTest activeTest = newActiveTest(); 042 activeTest.header = pduHeader; 043 pdu = activeTest; 044 break; 045 caseCmppConstant.CMD_ACTIVE_TEST_RESP: 046 ActiveTestResp activeTestResp = newActiveTestResp(); 047 activeTestResp.header = pduHeader; 048 pdu = activeTestResp; 049 break; 050 caseCmppConstant.CMD_DELIVER_RESP: 051 DeliverResp deliverResp = newDeliverResp(); 052 deliverResp.header = pduHeader; 053 deliverResp.setBody(buffer); 054 pdu = deliverResp; 055 break; 056 caseCmppConstant.CMD_SUBMIT: 057 Submit submit = newSubmit(); 058 submit.header = pduHeader; 059 submit.setBody(buffer); 060 pdu = submit; 061 break; 062 caseCmppConstant.CMD_QUERY: 063 Query query = newQuery(); 064 query.header = pduHeader; 065 query.setBody(buffer); 066 pdu = query; 067 break; 068 caseCmppConstant.CMD_QUERY_RESP: 069 QueryResp queryResp = newQueryResp(); 070 queryResp.header = pduHeader; 071 queryResp.setBody(buffer); 072 pdu = queryResp; 073 break; 074 caseCmppConstant.CMD_CONNECT: 075 Connect login = newConnect(); 076 login.header = pduHeader; 077 login.setBody(buffer); 078 pdu = login; 079 break; 080 caseCmppConstant.CMD_CONNECT_RESP: 081 ConnectResp loginResp = newConnectResp(); 082 loginResp.header = pduHeader; 083 loginResp.setBody(buffer); 084 pdu = loginResp; 085 break; 086 caseCmppConstant.CMD_CANCEL: 087 Cancel cancel = newCancel(); 088 cancel.header = pduHeader; 089 cancel.setBody(buffer); 090 pdu = cancel; 091 break; 092 caseCmppConstant.CMD_CANCEL_RESP: 093 CancelResp cancelResp = newCancelResp(); 094 cancelResp.header = pduHeader; 095 cancelResp.setBody(buffer); 096 pdu = cancelResp; 097 break; 098 default: 099 logger.error("Unknown Command! PDU Header: "+ pduHeader.getData().getHexDump()); 100 break; 101 } 102 } catch(PDUException e) { 103 logger.error("Error parsing PDU: ", e); 104 } 105 returnpdu; 106 } 107 } [文件] CmppPDU.java ~ 3KB 下载(7) 001 /* 002 * Created on 2005-5-7 003 * 004 * TODO To change the template for this generated file go to 005 * Window - Preferences - Java - Code Style - Code Templates 006 */ 007 package cmpp.pdu; 008 009 import cmpp.sms.PDU; 010 011 012 /** 013 * @author lucien 014 * 015 * TODO To change the template for this generated type comment go to Window - 016 * Preferences - Java - Code Style - Code Templates 017 */ 018 public abstract class CmppPDU extends PDU { 019 020 private static int sequenceNumber = 0; 021 022 private boolean sequenceNumberChanged = false; 023 024 public CmppPDUHeader header = null; 025 026 public CmppPDU() { 027 header = new CmppPDUHeader(); 028 } 029 030 public CmppPDU(int commandId) { 031 header = new CmppPDUHeader(); 032 header.setCommandId(commandId); 033 } 034 035 /** Checks if the header field is null and if not, creates it. */ 036 privatevoidcheckHeader() { 037 if(header == null) { 038 header = newCmppPDUHeader(); 039 } 040 } 041 042 publicintgetCommandLength() { 043 checkHeader(); 044 returnheader.getCommandLength(); 045 } 046 047 publicintgetCommandId() { 048 checkHeader(); 049 returnheader.getCommandId(); 050 } 051 052 publicintgetSequenceNumber() { 053 checkHeader(); 054 returnheader.getSequenceNumber(); 055 } 056 057 publicvoidsetCommandLength(intcmdLen) { 058 checkHeader(); 059 header.setCommandLength(cmdLen); 060 } 061 062 publicvoidsetCommandId(intcmdId) { 063 checkHeader(); 064 header.setCommandId(cmdId); 065 } 066 067 publicvoidsetSequenceNumber(intseqNr) { 068 checkHeader(); 069 header.setSequenceNumber(seqNr); 070 } 071 072 publicvoidassignSequenceNumber() { 073 assignSequenceNumber(false); 074 } 075 076 publicvoidassignSequenceNumber(booleanalways) { 077 if((!sequenceNumberChanged) || always) { 078 synchronized(this) { 079 setSequenceNumber(++sequenceNumber); 080 } 081 sequenceNumberChanged = true; 082 } 083 } 084 085 publicvoidresetSequenceNumber() { 086 setSequenceNumber(0); 087 sequenceNumberChanged = false; 088 } 089 090 publicbooleanequals(Object object) { 091 if((object != null) && (object instanceofCmppPDU)) { 092 CmppPDU pdu = (CmppPDU) object; 093 returnpdu.getSequenceNumber() == getSequenceNumber(); 094 } else{ 095 returnfalse; 096 } 097 } 098 099 publicString getSequenceNumberAsString() { 100 intdata = header.getSequenceNumber(); 101 byte[] intBuf = newbyte[4]; 102 intBuf[3] = (byte) (data & 0xff); 103 intBuf[2] = (byte) ((data >>> 8) & 0xff); 104 intBuf[1] = (byte) ((data >>> 16) & 0xff); 105 intBuf[0] = (byte) ((data >>> 24) & 0xff); 106 returnnewString(intBuf); 107 } 108 109 publicabstractbooleanisRequest(); 110 111 publicabstractbooleanisResponse(); 112 113 publicString dump() { 114 returnname() + " dump() unimplemented"; 115 } 116 117 118 } [文件] CmppPDUHeader.java ~ 2KB 下载(7) view source print? 01 /* 02 * Created on 2005-5-7 03 * 04 * TODO To change the template for this generated file go to 05 * Window - Preferences - Java - Code Style - Code Templates 06 */ 07 package cmpp.pdu; 08 09 10 import cmpp.CmppConstant; 11 import cmpp.sms.ByteBuffer; 12 import cmpp.sms.ByteData; 13 import cmpp.sms.NotEnoughDataInByteBufferException; 14 import cmpp.sms.PDUException; 15 16 17 /** 18 * @author lucien 19 * 20 * TODO To change the template for this generated type comment go to 21 * Window - Preferences - Java - Code Style - Code Templates 22 */ 23 publicclassCmppPDUHeader extendsByteData { 24 privateintcommandLength = CmppConstant.PDU_HEADER_SIZE; 25 privateintcommandId = 0; 26 privateintsequenceNumber = 0; 27 28 publicByteBuffer getData() { 29 ByteBuffer buffer = newByteBuffer(); 30 buffer.appendInt(getCommandLength()); 31 buffer.appendInt(getCommandId()); 32 buffer.appendInt(getSequenceNumber()); 33 returnbuffer; 34 } 35 36 publicvoidsetData(ByteBuffer buffer) throwsPDUException { 37 try{ 38 commandLength = buffer.removeInt(); 39 commandId = buffer.removeInt(); 40 sequenceNumber = buffer.removeInt(); 41 } catch(NotEnoughDataInByteBufferException e) { 42 thrownewPDUException(e); 43 } 44 } 45 46 publicintgetCommandLength() { 47 returncommandLength; 48 } 49 50 publicintgetCommandId() { 51 returncommandId; 52 } 53 54 publicintgetSequenceNumber() { 55 returnsequenceNumber; 56 } 57 58 publicvoidsetCommandLength(intcmdLen) { 59 commandLength = cmdLen; 60 } 61 62 publicvoidsetCommandId(intcmdId) { 63 commandId = cmdId; 64 } 65 66 publicvoidsetSequenceNumber(intseqNr) { 67 sequenceNumber = seqNr; 68 } 69 70 }