mina中的aceptor模式实现

lhwyflyl01 贡献于2014-06-21

作者 李红  创建于2012-02-12 09:38:00   修改者李红  修改于2012-02-12 09:40:00字数7204

文档摘要:Reactor模式实现不同,mina中采用了Multiple Reactor的方式,由NioSocketAcceptor和IoProcessor分别承担多个Reactor的职责。NioSocketAcceptor和NioProcessor使用不同selector,能够更加充分的榨取服务器的性能。
关键词:

mina中的aceptor模式实现参考了Doug Lea 在《Scalable IO in Java》中的reactor。     从上面来两个图可以看出:与传统的单个Reactor模式实现不同,mina中采用了Multiple Reactor的方式,由NioSocketAcceptor和IoProcessor分别承担多个Reactor的职责。NioSocketAcceptor和NioProcessor使用不同selector,能够更加充分的榨取服务器的性能。 acctptor主要负责 1. 绑定一个/多个端口,开始监听 2. 处理客户端的建链请求 3. 关闭一个/多个监听端口 processor主要负责 1. 接受客户端发送的数据,并转发给业务逻辑成处理 2. 发送数据到客户端 首先看一个最简单的mina服务端程序, SocketAcceptor acceptor = new NioSocketAcceptor(); // 设定一个事件处理器 acceptor.setHandler(new EchoProtocolHandler()); // 绑定一个监听端口 acceptor.bind(new InetSocketAddress(PORT)); 就这3句话一个服务端程序就ok了,让我们看看,mina在背后做了点什么。 1、NioSocketAcceptor的初始化 1.1 在NioSocketAcceptor的构造函数中,把NioProcessor也构造出来了 protected AbstractPollingIoAcceptor(IoSessionConfig sessionConfig, Class> processorClass) { this(sessionConfig, null, new SimpleIoProcessorPool(processorClass), true); } 1.2 SimpleIoProcessorPool是一个NioProcessor池,默认大小是cpu个数+1,这样能够充分利用多核的cpu private static final int DEFAULT_SIZE = Runtime.getRuntime().availableProcessors() + 1; public SimpleIoProcessorPool(Class> processorType) { this(processorType, null, DEFAULT_SIZE); } 1.3 调用init方法,准备selector protected void init() throws Exception { selector = Selector.open(); } 到此,生产线已经装配好,就等按下开关,就可以正常运行了。 2.绑定端口 2.1 在调用NioSocketAcceptor.bind()函数的时候最终调用AbstractPollingIoAcceptor.bind0,这是NioSocketAcceptor的关键所在 protected final Set bind0( List localAddresses) throws Exception { /*2.1.1registerQueue是一个待监听动作列表,每次要新绑定一个端口,增加一个register动作,在工作者线程中处理*/ AcceptorOperationFuture request = new AcceptorOperationFuture( localAddresses); registerQueue.add(request); /*2.1.2 创建并启动工作者线程*/ startupWorker(); wakeup(); //堵塞直至监听成功 request.awaitUninterruptibly(); //以下省略 } 2.1.1 通过实现生产者/消费者问题来处理绑定端口开始监听动作,其中生产者是bind动作,消费者在Work.run方法中,registQueue是消息队列 2.1.2 在线程池中启动工作者线程 2.1.3 堵塞直至绑定监听端口成功 public class DefaultIoFuture implements IoFuture { public IoFuture awaitUninterruptibly() { synchronized (lock) { while (!ready) {//当ready为true时候,跳出循环 waiters++; try { lock.wait(DEAD_LOCK_CHECK_INTERVAL); } catch (InterruptedException ie) { // Do nothing : this catch is just mandatory by contract } finally { waiters--; if (!ready) { checkDeadLock(); } } } } return this; } } 真正的绑定端口开始监听动作是在Woker线程中执行的 取消绑定操作与绑定操作类似,暂时先不描述。 3.AbstractPollingIoAcceptor的工作者线程是NioSocketAcceptor的核心所在,完成了以下三个主要功能: 1、处理绑定监听端口请求 2、处理取消监听端口绑定请求 3、处理socket连接请求 1,2主要是在系统初始化或者系统关闭的时候在registerQuerue/cancelQueue中增加一个消息,3是系统运行时NioSocketAcceptor处理socket建链请求的关键。1,2,3的主要内容在工作者线程Work.run方法中。 NioSocketAcceptor继承自父类AbstractPollingIoAcceptor的Worker.run private class Worker implements Runnable { public void run() { int nHandles = 0; while (selectable) { try { // Detect if we have some keys ready to be processed boolean selected = select(); // this actually sets the selector to OP_ACCEPT, // and binds to the port in which this class will // listen on nHandles += registerHandles(); if (selected) { processHandles(selectedHandles()); } // check to see if any cancellation request has been made. nHandles -= unregisterHandles(); if (nHandles == 0) { synchronized (lock) { if (registerQueue.isEmpty() && cancelQueue.isEmpty()) { worker = null; break; } } } } catch (Throwable e) { ExceptionMonitor.getInstance().exceptionCaught(e); try { Thread.sleep(1000); } catch (InterruptedException e1) { ExceptionMonitor.getInstance().exceptionCaught(e1); } } } if (selectable && isDisposing()) { selectable = false; try { if (createdProcessor) { processor.dispose(); } } finally { try { synchronized (disposalLock) { if (isDisposing()) { destroy(); } } } catch (Exception e) { Except ionMonitor.getInstance().exceptionCaught(e); } finally { disposalFuture.setDone(); } } } } 3.1 首先系统在一个无限循环中不停的允许,处理socket建链请求/端口监听/取消端口监听请求,selectable是一个标志位,初始化的时候置为true,要关闭时候置为false,则系统退出循环。 3.2 使用selector监听是否有连接请求操作。 public void run() { int nHandles = 0; while (selectable) { try { // Detect if we have some keys ready to be processed boolean selected = select(); //以下省略 } 注意,虽然 boolean selected = select();是一个堵塞操作,但是run方法不会陷入死循环。因为即使没有新的连接请求到达,但是每次bind/unbind都会调用NioSocketAccepto.wakeup唤醒处于select状态的selctor。 protected void wakeup() { selector.wakeup(); } 而系统退出是,关闭的acceptor的dispose方法最终会调用unbind,所以退出时不会有问题。 3.2 从registerQueue中获取绑定请求消息,开始绑定某个端口,并开始监听,准备相关上下文信息 public void run() { int nHandles = 0; while (selectable) { try { // Detect if we have some keys ready to be processed boolean selected = select(); // this actually sets the selector to OP_ACCEPT, // and binds to the port in which this class will // listen on nHandles += registerHandles(); //以下省略 } [/code] 对于registerHandleres真正完成了对端口的监听 [code="java"] private int registerHandles() { for (;;) { AcceptorOperationFuture future = registerQueue.poll(); if (future == null) { return 0; } Map newHandles = new HashMap(); List localAddresses = future.getLocalAddresses(); try { for (SocketAddress a : localAddresses) { H handle = open(a); newHandles.put(localAddress(handle), handle); } boundHandles.putAll(newHandles); // and notify. future.setDone(); return newHandles.size(); } catch (Exception e) { future.setException(e); } finally { // Roll back if failed to bind all addresses. if (future.getException() != null) { for (H handle : newHandles.values()) { try { close(handle); } catch (Exception e) { ExceptionMonitor.getInstance().exceptionCaught(e); } } wakeup(); } } } } 3.2.1 registerHandlers将registerQueue中的所有绑定监听消息取出来,在循环中处理,注意一个细节是,一个绑定监听消息AcceptorOperationFuture可能要绑定监听多个ip的同一个端口 3.2.2 registerHandlers的一个关键动作是 [code="java"] H handle = open(a);[/code] 参看NioSocketAcceptor.open方法的实现,发现这就是关键 protected ServerSocketChannel open(SocketAddress localAddress) throws Exception { ServerSocketChannel c = ServerSocketChannel.open(); boolean success = false; try { c.configureBlocking(false); // Configure the server socket, c.socket().setReuseAddress(isReuseAddress()); // XXX: Do we need to provide this property? (I think we need to remove it.) c.socket().setReceiveBufferSize( getSessionConfig().getReceiveBufferSize()); // and bind. c.socket().bind(localAddress, getBacklog()); c.register(selector, SelectionKey.OP_ACCEPT); success = true; } finally { if (!success) { close(c); } } return c; } 在这个方法中真正开始监听一个端口,并设置相关细节 1.采用非堵塞方式 2.可从用端口 3.socket读取的缓冲区大小 4.listen队列的长度 并且在selector中注册了对SelectionKey.OP_ACCEPT的关注。 3.2.3 通过 future.setDone();唤醒bind线程,告知绑定操作已经成功,否则bind线程还将继续堵塞。 3.3 如果发现有连接请求过来,则处理之 private class Worker implements Runnable { public void run() { int nHandles = 0; while (selectable) { try { // Detect if we have some keys ready to be processed boolean selected = select(); // this actually sets the selector to OP_ACCEPT, // and binds to the port in which this class will // listen on nHandles += registerHandles(); if (selected) { processHandles(selectedHandles()); } //以下省略 processHandles是负责完成新建一个连接,并将这个连接交给NioProcessor监控 private void processHandles(Iterator handles) throws Exception { while (handles.hasNext()) { H handle = handles.next(); handles.remove(); T session = accept(processor, handle); if (session == null) { break; } finishSessionInitialization(session, null, null); // add the session to the SocketIoProcessor session.getProcessor().add(session); } } 3.3.1 accept方法创建了一个NioSocketSession, 3.3.2 finishSessionInitialization对这个session进行初始化 3.3.3 将session交给NioProcessor管理,这里有两个地方需要注意 3.3.3.1 processor是一个SimpleIoProcessorPool,里面有个多个NioProcessor,SimpleIoProcessor将轮询取一个processor负责管理新建的NioSocketSession 3.3.3.2 NioProcessor.add方法只是将NioSocketSession放到一个newSessions的队列中,并启动NioProcessor的工作者线程。不会马上生效,要等NioProcessor的worker线程执行addNew的时候,才会真正开始管理新增的session,这个与Acceptor的bind类似 public final void add(T session) { if (isDisposing()) { throw new IllegalStateException("Already disposed."); } newSessions.add(session); startupWorker(); } 至此NioSocketAcceptor的基本实现已经描述完毕,相信读者对也有一个初步的认识。 思考题: 1. 描述一下NioSocketAcceptor处理一个新连接请求的全过程; 2. 下面代码中,mina做了什么,是怎么关闭监听端口的。 [code="java"] NioSocketAcceptor acceptor = new NioSocketAcceptor(); //省略 accetpro.dispose(); [/code]

下载文档到电脑,查找使用更方便

文档的实际排版效果,会与网站的显示效果略有不同!!

需要 6 金币 [ 分享文档获得金币 ] 0 人已下载

下载文档