mina2.0框架分析

yuqicaida 贡献于2016-07-20

作者 微软用户  创建于2011-05-28 05:29:00   修改者微软用户  修改于2011-05-28 06:34:00字数28642

文档摘要:整个框架最核心的几个包是:org.apache.mina.core.service, org.apache.mina.core.session, org.apache.mina.core.polling以及org.apache.mina.transport.socket。
关键词:

mina2.0框架源码剖析 1. org.apache.mina.core.service 整个框架最核心的几个包是:org.apache.mina.core.service, org.apache.mina.core.session, org.apache.mina.core.polling以及org.apache.mina.transport.socket。       这一篇先来看org.apache.mina.core.service。第一个要说的接口是IoService,它是所有IoAcceptor和IoConnector的基接口.对于一个IoService,有哪些信息需要我们关注呢? 1.1. 底层的元数据信息TransportMetadata,比如底层的网络服务提供(NIO,ARP,RXTX等); 1.2. 通过这个服务创建一个新会话时,新会话的默认配置IoSessionConfig; 1.3. 此服务所管理的所有会话; 1.4. 与这个服务相关所产生的事件所对应的监听者(IoServiceListener); 1.5. 处理这个服务所管理的所有连接的处理器(IoHandler); 1.6. 每个会话都有一个过滤器链(IoFilterChain),每个过滤器链通过其对应的IoFilterChainBuilder来负责构建; 1.7. 由于此服务管理了一系列会话,因此可以通过广播的方式向所有会话发送消息,返回结果是一个WriteFuture集,后者是一种表示未来预期结果的数据结构; 1.8. 服务创建的会话(IoSession)相关的数据通过IoSessionDataStructureFactory来提供; 1.9. 发送消息时有一个写缓冲队列。 1.10. 服务的闲置状态有三种:读端空闲,写端空闲,双端空闲。 1.11. 还提供服务的一些统计信息,比如时间,数据量等。       IoService这个服务是对于服务器端的接受连接和客户端发起连接这两种行为的抽象。      再来从服务器看起,IoAcceptor是IoService 的子接口,它用于绑定到指定的ip和端口,从而接收来自客户端的连接请求,同时会fire相应的客户端连接成功接收/取消/失败等事件给自己的IoHandle去处理。当服务器端的Accpetor从早先绑定的ip和端口上取消绑定时,默认是所有的客户端会话会被关闭,这种情况一般出现在服务器挂掉了,则客户端收到连接关闭的提示。这个接口最重要的两个方法是bind()和unbind(),当这两个方法被调用时,服务端的连接接受线程就启动或关闭了。      再来看一看客户端的连接发起者接口IoConnector,它的功能和IoAcceptor基本对应的,它用于尝试连接到服务器指定的ip和端口,同时会fire相应的客户端连接事件给自己的IoHandle去处理。当connet方法被调用后用于连接服务器端的线程就启动了,而当所有的连接尝试都结束时线程就停止。尝试连接的超时时间可以自行设置。Connect方法返回的结果是ConnectFuture,这和前面说的WriteFuture类似,在后面会有一篇专门讲这个模式的应用。      前面的IoAcceptor和IoConnector就好比是两个负责握手的仆人,而真正代表会话的实际I/O操作的接口是IoProcessor,它对现有的Reactor模式架构的Java NIO框架继续做了一层封装。它的泛型参数指明了它能处理的会话类型。接口中最重要的几个方法, add用于将指定会话加入到此Processor中, 让它负责处理与此会话相关的所有I/O操作。由于写操作会有一个写请求队列,flush就用于对指定会话的写请求队列进行强制刷数据。remove方法用于从此Processor中移除和关闭指定会话,这样就可以关闭会话相关联的连接并释放所有相关资源。updateTrafficMask方法用于控制会话的I/O行为,比如是否允许读/写。        然后来说说IoHandle接口,Mina中的所有I/O事件都是通过这个接口来处理的,这些事件都是上面所说的I/O Processor发出来的,要注意的一点是同一个I/O Processor线程是负责处理多个会话的。包括下面这几个事件的处理: public interface IoHandler  {     void sessionCreated(IoSession session) throws Exception;//会话创建     void sessionOpened(IoSession session) throws Exception;//打开会话,与sessionCreated最大的区别是它是从另一个线程处调用的     void sessionClosed(IoSession session) throws Exception;//会话结束,当连接关闭时被调用     void sessionIdle(IoSession session, IdleStatus status) throws Exception;//会话空闲     void exceptionCaught(IoSession session, Throwable cause) throws Exception;//异常捕获,Mina会自动关闭此连接     void messageReceived(IoSession session, Object message) throws Exception;//接收到消息     void messageSent(IoSession session, Object message) throws Exception;//发送消息  } oHandlerAdapter就不说了,简单地对IoHandler使用适配器模式封装了下,让具体的IoHandler子类从其继承后,从而可以对自身需要哪些事件处理拥有自主权。       来看看IoServiceListener接口,它用于监听IoService相关的事件。 public interface IoServiceListener extends EventListener  {     void serviceActivated(IoService service) throws Exception;//激活了一个新service     void serviceIdle(IoService service, IdleStatus idleStatus) throws Exception; // service闲置     void serviceDeactivated(IoService service) throws Exception;//挂起一个service     void sessionCreated(IoSession session) throws Exception;//创建一个新会话     void sessionDestroyed(IoSession session) throws Exception;//摧毁一个新会话 } IoServiceListenerSupport类就是负责将上面的IoService和其对应的各个IoServiceListener包装到一起进行管理。下面是它的成员变量: private final IoService service;     private final List listeners = new CopyOnWriteArrayList();     private final ConcurrentMap managedSessions = new ConcurrentHashMap();//被管理的会话集(其实就是服务所管理的会话集)     private final Map readOnlyManagedSessions = Collections.unmodifiableMap(managedSessions);//上面的会话集的只读版     private final AtomicBoolean activated = new AtomicBoolean();//被管理的服务是否处于激活状态 激活事件就以会话创建为例来说明: public void fireSessionCreated(IoSession session)  {         boolean firstSession = false;         if (session.getService() instanceof IoConnector)  {//若服务类型是Connector,则说明是客户端的连接服务               synchronized (managedSessions)  {//锁住当前已经建立的会话集                 firstSession = managedSessions.isEmpty();//看服务所管理的会话集是否为空集            }         }         if (managedSessions.putIfAbsent(Long.valueOf(session.getId()), session) != null)  { // If already registered, ignore.             return;   }   if (firstSession) {//第一个连接会话,fire一个虚拟的服务激活事件       fireServiceActivated();   }   //呼叫过滤器的事件处理  session.getFilterChain().fireSessionCreated();// 会话创建  session.getFilterChain().fireSessionOpened();//会话打开  int managedSessionCount = managedSessions.size();   //统计管理的会话数目  if (managedSessionCount > largestManagedSessionCount) {      largestManagedSessionCount = managedSessionCount;  } cumulativeManagedSessionCount ++;  //呼叫监听者的事件处理函数  for (IoServiceListener l : listeners)  {     try   {        l.sessionCreated(session);     } catch (Throwable e) {        ExceptionMonitor.getInstance().exceptionCaught(e);     }  }         } 这里值得注意的一个地方是断开连接会话,设置了一个监听锁,直到所有连接会话被关闭后才放开这个锁。 private void disconnectSessions()      {         if (!(service instanceof IoAcceptor))          {//确保服务类型是IoAcceptor             return;         }         if (!((IoAcceptor) service).isCloseOnDeactivation())          {// IoAcceptor是否设置为在服务失效时关闭所有连接会话             return;         }         Object lock = new Object();//监听锁         IoFutureListener listener = new LockNotifyingListener(lock);         for (IoSession s : managedSessions.values())         {             s.close().addListener(listener);//为每个会话的close动作增加一个监听者         }         try          {             synchronized (lock)             {                 while (!managedSessions.isEmpty())                 {//所管理的会话还没有全部结束,持锁等待                     lock.wait(500);                 }             }         } catch (InterruptedException ie)         {             // Ignored         }     }     private static class LockNotifyingListener implements IoFutureListener      {         private final Object lock;         public LockNotifyingListener(Object lock)          {             this.lock = lock;         }         public void operationComplete(IoFuture future)  {                      synchronized (lock)              {                 lock.notifyAll();             }         }     } 2. mina2.0框架源码剖析二 上一篇介绍了几个核心的接口,这一篇主要介绍实现这些接口的抽象基类。首先是实现IoService接口的AbstractIoService类。它包含了一个Executor来处理到来的事件。每个AbstractIoService都一个AtomicInteger类型的id号,确保每个id的唯一性。 它内部的Executor可以选择是从外部传递进构造函数中,也可以在实例内部自行构造,若是后者,则它将是ThreadPoolExecutor类的一个实例,即是Executor线程池中的一员。代码如下: if (executor == null)          {             this.executor = Executors.newCachedThreadPool();             createdExecutor = true;         }          else          {             this.executor = executor;             createdExecutor = false;         } 其中有一个IdleStatusChecker成员,它用来对服务的空闲状态进行检查,在一个服务激活时会将服务纳入到检查名单中,而在服务失效时会将服务从名单中剔除。会单独开一个线程进行具体的空闲检查,这是通过下面这个线程类来负责的: private class NotifyingTaskImpl implements NotifyingTask  {         private volatile boolean cancelled;//取消检查标志         private volatile Thread thread;         public void run() {             thread = Thread.currentThread();             try {                 while (!cancelled)  {                     //每隔1秒检查一次空闲状态                     long currentTime = System.currentTimeMillis();                     notifyServices(currentTime);                     notifySessions(currentTime);                     try  {                         Thread.sleep(1000);                     } catch (InterruptedException e)  {                         // will exit the loop if interrupted from interrupt()                     }                 }             }  Finally  {                 thread = null;             }         } } 具体的空闲检查代码如下,超过能容忍的最大空闲时间,就会fire出SessionIdle事件,上文也说过空闲有三种类型:读端空,写端空,双端空。 notifyIdleSession1(s, currentTime,s.getConfig().getIdleTimeInMillis(IdleStatus.BOTH_IDLE),IdleStatus.BOTH_IDLE,Math.max(s.getLastIoTime(),s.getLastIdleTime(IdleStatus.BOTH_IDLE))); private static void notifyIdleSession1(AbstractIoSession session, long currentTime,long idleTime, IdleStatus status, long lastIoTime)  {         if (idleTime > 0 && lastIoTime != 0                 && currentTime - lastIoTime >= idleTime)  {             session.getFilterChain().fireSessionIdle(status);         } }  在释放资源的方法时,首先去获取释放锁disposalLock才行,然后具体的释放动作是通过dispose0完成的,接着取消掉空闲检查线程,此外,若线程是内部创建的线程池中的一员,则通过线程池去关闭线程。 public final void dispose()  {         IoFuture disposalFuture;         synchronized (disposalLock)  {//获取释放锁             disposalFuture = this.disposalFuture;             if (!disposing) {                 disposing = true;                 try {                     this.disposalFuture = disposalFuture = dispose0();//具体释放动作                 } catch (Exception e) {                     ExceptionMonitor.getInstance().exceptionCaught(e);                 } finally {                     if (disposalFuture == null) {                         disposed = true;                     }                 }             }         }         idleStatusChecker.getNotifyingTask().cancel();         if (disposalFuture != null) {//无中断地等待释放动作完成             disposalFuture.awaitUninterruptibly();          }         if (createdExecutor)  {通过线程池去关闭线程             ExecutorService e = (ExecutorService) executor;             e.shutdown();             while (!e.isTerminated())  {      try {          e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);          } catch (InterruptedException e1) {             // Ignore; it should end shortly.          }     } }         disposed = true; } 再来看会话初始化完成后的动作每个session都保持有自己的属性映射图,在会话结束初始化时,应该设置这个AttributeMap。 ((AbstractIoSession) session).setAttributeMap(session.getService().getSessionDataStructureFactory().getAttributeMap(session)); 除此以为,还应该为会话配置写请求队列: ((AbstractIoSession) session).setWriteRequestQueue(session.getService().getSessionDataStructureFactory().getWriteRequestQueue(session)); 在初始化时会在会话的属性中加入一项SESSION_CREATED_FUTURE,这个属性会在连接真正建立后从会话中去除。  if (future != null && future instanceof ConnectFuture)  {            session.setAttribute(DefaultIoFilterChain.SESSION_CREATED_FUTURE, future);  } 用户特定的初始化动作在finishSessionInitialization0这个方法中自行实现。 3. mina2.0框架源码剖析三 AbstractIoAcceptor类继承自AbstractIoService基类,并实现了IoAcceptor接口,它主要的成员变量是本地绑定地址。  private final List defaultLocalAddresses = new ArrayList();  private final List unmodifiableDefaultLocalAddresses = Collections.unmodifiableList(defaultLocalAddresses);  private final Set boundAddresses = new HashSet(); 在调用bind或unbind方法时需要先获取绑定锁bindLock,具体的绑定操作还是在bind0这个方法中实现的。一旦绑定成功后,就会向服务监听者发出服务激活的事件(ServiceActivated),同理,解除绑定也是在unbind0这个方法中具体实现的。一旦解除绑定成功后,就会向服务监听者发出服务激活的事件(ServiceDeActivated)。       AbstractIoConnector类继承自AbstractIoService基类,并实现了IoConnect接口,连接超时检查间隔时间默认是50毫秒,超时时间默认为1分钟,用户可以自行配置。此类中重要的方法就是connect方法,其中调用了具体的连接逻辑实现connect0, protected abstract ConnectFuture connect0(SocketAddress remoteAddress,SocketAddress localAddress, IoSessionInitializer sessionInitializer); AbstractIoConnector在AbstractIoService的基础上,在会话初始化结束时增加了一个功能,就是加入了一个监听者,当连接请求被取消时立即结束此会话。 protected final void finishSessionInitialization0(             final IoSession session, IoFuture future) {         // In case that ConnectFuture.cancel() is invoked before         // setSession() is invoked, add a listener that closes the         // connection immediately on cancellation.         future.addListener(new IoFutureListener() {             public void operationComplete(ConnectFuture future) {                 if (future.isCanceled()) {                     session.close();                 }             }         }); } 下面再来看一个IoProcessor接口的基本实现类SimpleIoProcessorPool,它的泛型参数是AbstractIoSession的子类,表示此Processor管理的具体会话类型。并且这个类还实现了池化,它会将多个IoSession分布到多个IoProcessor上去管理。下面是文档中给出的一个示例: // Create a shared pool.  SimpleIoProcessorPool pool =           new SimpleIoProcessorPool(NioProcessor.class, 16);    // Create two services that share the same pool.  SocketAcceptor acceptor = new NioSocketAcceptor(pool);  SocketConnector connector = new NioSocketConnector(pool);    // Release related resources.  connector.dispose();  acceptor.dispose();  pool.dispose(); 与Processor池有关的包括如下这些成员变量: private static final int DEFAULT_SIZE = Runtime.getRuntime().availableProcessors() + 1;//处理池大小,默认是处理器数+1, 便于多核分布处理 private final IoProcessor[] pool;//IoProcessor池 private final AtomicInteger processorDistributor = new AtomicInteger(); Processor池的构造过程,其中有三种构造函数供选择来构造一个Processor : 1. 带参数 ExecutorService 的构造函数. 2. 带参数为 Executor的构造函数. 3. 默认构造函数 pool = new IoProcessor[size];//构建池                  boolean success = false;         try {             for (int i = 0; i < pool.length; i ++) {                 IoProcessor processor = null;                  //有三种构造函数供选择来构造一个Processor                 try {                     try {                         processor = processorType.getConstructor(ExecutorService.class).newInstance(executor);                     } catch (NoSuchMethodException e) {                         // To the next step                     }                                          if (processor == null) {                         try {                             processor = processorType.getConstructor(Executor.class).newInstance(executor);                         } catch (NoSuchMethodException e) {                             // To the next step                         }                     }                                          if (processor == null) {                         try {                             processor = processorType.getConstructor().newInstance();                         } catch (NoSuchMethodException e) {                             // To the next step                         }                     }                 } catch (RuntimeException e) {                     throw e;                 } catch (Exception e) {                     throw new RuntimeIoException(                             "Failed to create a new instance of " + processorType.getName(), e);                 }                 pool[i] = processor;             }                          success = true;         } finally {             if (!success) {                 dispose();             }         } 从Processor池中分配一个processor的过程,注意一个processor是可以同时管理多个session的。 private IoProcessor getProcessor(T session)  {//返回session所在的processor,若没分配,则为之分配一个         IoProcessor p = (IoProcessor) session.getAttribute(PROCESSOR);//看session的属性中是否保存对应的Processor         if (p == null)  {//还没为此session分配processor             p = nextProcessor();//从池中取一个processor             IoProcessor oldp =                 (IoProcessor) session.setAttributeIfAbsent(PROCESSOR, p);             if (oldp != null)  {//原来的processor                 p = oldp;             }         }         return p;     }     private IoProcessor nextProcessor()      {//从池中分配一个Processor         checkDisposal();         return pool[Math.abs(processorDistributor.getAndIncrement()) % pool.length];     } 4. mina2.0框架源码剖析四 前面几篇介绍完了org.apache.mina.core.service这个包,现在进入org.apache.mina.core.session,这个包主要是围绕IoSession展开的,包括会话的方方面面。 IoSession接口与底层的传输层类型无关(也就是不管是TCP还是UDP),它表示通信双端的连接。它提供用户自定义属性,可以用于在过滤器和处理器之间交换用户自定义协议相关的信息。 每个会话都有一个Service为之提供服务,同时有一个Handler负责此会话的I/O事件处理。最重要的两个方法是read和write,这两个方法都是异步执行,若要真正完成必须在其返回结果上进行等待。关闭会话的方法close是异步执行的,也就是应当等待返回的CloseFuture,此外,还有另一种关闭方式closeOnFlush,它和close的区别是会先flush掉写请求队列中的请求数据, 再关闭会话,但同样是异步的。会话的读写类型是可配置的,在运行中可设置此端是否可读写。 一个会话主要包含两个方面的数据,属性映射图,写请求队列,在这里作者使用了工厂模式来为新创建的会话提供这些数据结构。 public interface IoSessionDataStructureFactory  {     IoSessionAttributeMap getAttributeMap(IoSession session) throws Exception;     WriteRequestQueue getWriteRequestQueue(IoSession session) throws Exception; } IoSessionConfig接口用于表示会话的配置信息,主要包括:读缓冲区大小,会话数据吞吐量,计算吞吐量时间间隔,指定会话端的空闲时间,写请求操作超时时间。在这个接口中有一个方法值得注意 void setUseReadOperation(boolean useReadOperation); 通过它来设置IoSession的read方法是否启用,若启用的话,则所有接收到的消息都会存储在内部的一个阻塞队列中,好处在于可以更方便用户对信息的处理,但对于某些应用来说并不管用,而且还会造成内存泄露,因此默认情况下这个选项是不开启的。 IoSessionInitializer接口定义了一个回调函数,这在AbstractIoService这个类中的finishSessionInitialization方法中已经见识过它的使用了,用于把用户自定义的会话初始化行为剥离出来。 public interface IoSessionInitializer  {     void initializeSession(IoSession session, T future); } IoSessionRecycler接口为一个无连接的传输服务提供回收现有会话的服务,主要的方法是: IoSession recycle(SocketAddress localAddress, SocketAddress remoteAddress); 一个会话的读写能力控制通过TrafficMask类来描述,主要是SelectionKey.OP_READ和SelectionKey.OP_WRITE结合。此类使用单例模式实现,还提供了与,或,非,异或等位操作来动态控制会话读写能力。 Mina中的I/O事件类型如下: public enum IoEventType {     SESSION_CREATED,//会话创建     SESSION_OPENED,//会话打开     SESSION_CLOSED,//会话关闭     MESSAGE_RECEIVED,//接收到消息     MESSAGE_SENT,//发送消息     SESSION_IDLE,//空闲     EXCEPTION_CAUGHT,//异常捕获     WRITE,     CLOSE,     SET_TRAFFIC_MASK,//设置读写能力 }  IoEvent类实现了Runnable接口,表示一个I/O事件或一个I/O请求,包括事件类型,所属的会话,事件参数值。最重要的方法就是fire,根据事件类型向会话的过滤器链上的众多监听者发出事件到来的信号。 public void fire() {         switch (getType()) {         case MESSAGE_RECEIVED:             getSession().getFilterChain().fireMessageReceived(getParameter());             break;         case MESSAGE_SENT:             getSession().getFilterChain().fireMessageSent((WriteRequest) getParameter());             break;         case WRITE:             getSession().getFilterChain().fireFilterWrite((WriteRequest) getParameter());             break;         case SET_TRAFFIC_MASK:             getSession().getFilterChain().fireFilterSetTrafficMask((TrafficMask) getParameter());             break;         case CLOSE:             getSession().getFilterChain().fireFilterClose();             break;         case EXCEPTION_CAUGHT:             getSession().getFilterChain().fireExceptionCaught((Throwable) getParameter());             break;         case SESSION_IDLE:             getSession().getFilterChain().fireSessionIdle((IdleStatus) getParameter());             break;         case SESSION_OPENED:             getSession().getFilterChain().fireSessionOpened();             break;         case SESSION_CREATED:             getSession().getFilterChain().fireSessionCreated();             break;         case SESSION_CLOSED:             getSession().getFilterChain().fireSessionClosed();             break;         default:             throw new IllegalArgumentException("Unknown event type: " + getType());         }     } Mina的会话中,有三种类型的闲置状态:1)READER_IDLE ,这表示从远端没有数据到来,读端空闲。2)WRITER_IDLE ,这表示写端没有在写数据。3)BOTH_IDLE,读端和写端都空闲。 为了节约会话资源,可以让用户设置当空闲超过一定时间后关闭此会话,因为此会话可能在某一端出问题了,从而导致另一端空闲超过太长时间。这可以通过使用IoSessionConfig.setIdleTime(IdleStatus,int)来完成,空闲时间阀值在会话配(IoSessionConfig)中设置。 前面介绍过IoSessionDataStructureFactor接口为会话提供所需要的数据结构,DefaultIoSessionDataStructureFactory是其一个默认实现类。它提供的写请求队列内部是一个初始大小为16的循环队列,并且在插入队列尾部和从队列头部取数据时都必须满足互斥同步。 private static class DefaultWriteRequestQueue implements WriteRequestQueue  {         private final Queue q = new CircularQueue(16);         public void dispose(IoSession session) {         }         public void clear(IoSession session) {             q.clear();         }         public synchronized boolean isEmpty(IoSession session) {             return q.isEmpty();         }         public synchronized void offer(IoSession session, WriteRequest writeRequest) {             q.offer(writeRequest);         }         public synchronized WriteRequest poll(IoSession session) {             return q.poll();         } } 5. mina2.0框架源码剖析五 前面介绍过IoSessionRecycler是负责回收不再使用的会话的接口,ExpiringSessionRecycler是其一个实现类,用于回收超时失效的会话。 private ExpiringMap sessionMap;//待处理的会话集 private ExpiringMap.Expirer mapExpirer;//负责具体的回收工作 sessionMap的键是由本地地址和远端地址共同组成的,值是这两个地址对应的会话。 Expirer类实现了Runnable接口,这个线程负责监控ExpiringMap,并把ExpiringMap中超过阀值的元素从ExpiringMap中移除。这个线程调用了setDaemon(true),因此是作为守护线程在后台运行。具体的处理过程如下: private void processExpires()  {   long timeNow = System.currentTimeMillis();//当前时间   for (ExpiringObject o : delegate.values())    {       if (timeToLiveMillis <= 0)        {           continue;       }       long timeIdle = timeNow - o.getLastAccessTime();//时间差       if (timeIdle >= timeToLiveMillis)        {//超时         delegate.remove(o.getKey());         for (ExpirationListener listener : expirationListeners)          {//呼叫监听者            listener.expired(o.getValue());         }       }    } } 启动/关闭超时检查线程都需要进行封锁机制,这里使用的是读写锁: private final ReadWriteLock stateLock = new ReentrantReadWriteLock();         public void startExpiring()          {             stateLock.writeLock().lock();             try              {                 if (!running)                  {                     running = true;                     expirerThread.start();                 }             }              finally              {                 stateLock.writeLock().unlock();             }         }         public void stopExpiring()          {             stateLock.writeLock().lock();             try              {                 if (running)                  {                     running = false;                     expirerThread.interrupt();                 }             }              finally              {                 stateLock.writeLock().unlock();             }         } 会话超时监听者: private class DefaultExpirationListener implements ExpirationListener {         public void expired(IoSession expiredSession) {             expiredSession.close();//关闭超时的会话         } } 6. org.apache.mina.core.session 上文的内容还有一些没有结尾,这篇补上。在ExpiringMap类中,使用了一个私有内部类ExpiringObject来表示待检查超时的对象,它包括三个域,键,值,上次访问时间,以及用于上次访问时间这个域的读写锁:  private K key;  private V value;  private long lastAccessTime;  private final ReadWriteLock lastAccessTimeLock = new ReentrantReadWriteLock(); 而ExpiringMap中包括了下述几个变量: private final ConcurrentHashMap delegate;//超时代理集合,保存待检查对象 private final CopyOnWriteArrayList> expirationListeners;//超时监听者 private final Expirer expirer;//超时检查线程 现在再来看看IoSession的一个抽象实现类AbstractIoSession。这是它的几个重要的成员变量: private IoSessionAttributeMap attributes;//会话属性映射图 private WriteRequestQueue writeRequestQueue;//写请求队列 private WriteRequest currentWriteRequest;//当前写请求 当要结束当前会话时,会发送一个一个写请求CLOSE_REQUEST。而closeFuture这个CloseFuture会在连接关闭时状态被设置为”closed”,它的监听器是SCHEDULED_COUNTER_RESETTER。 close和closeOnFlush都是异步的关闭操作,区别是前者立即关闭连接,而后者是在写请求队列中放入一个CLOSE_REQUEST,并将其即时刷新出去,若要真正等待关闭完成,需要调用方在返回的CloseFuture等待 public final CloseFuture close() {         synchronized (lock) {             if (isClosing()) {                 return closeFuture;             } else {                 closing = true;             }         }         getFilterChain().fireFilterClose();//fire出关闭事件         return closeFuture;     }     public final CloseFuture closeOnFlush() {         getWriteRequestQueue().offer(this, CLOSE_REQUEST);         getProcessor().flush(this);         return closeFuture;     } 下面来看看读数据的过程: public final CloseFuture close() {         synchronized (lock) {             if (isClosing()) {                 return closeFuture;             } else {                 closing = true;             }         }         getFilterChain().fireFilterClose();//fire出关闭事件         return closeFuture;     }     public final CloseFuture closeOnFlush() {         getWriteRequestQueue().offer(this, CLOSE_REQUEST);         getProcessor().flush(this);         return closeFuture;     }     private Queue getReadyReadFutures() {//返回可被读数据队列         Queue readyReadFutures =             (Queue) getAttribute(READY_READ_FUTURES_KEY);//从会话映射表中取出可被读数据队列         if (readyReadFutures == null) {//第一次读数据             readyReadFutures = new CircularQueue();//构造一个新读数据队列             Queue oldReadyReadFutures =                 (Queue) setAttributeIfAbsent(                         READY_READ_FUTURES_KEY, readyReadFutures);             if (oldReadyReadFutures != null) {                 readyReadFutures = oldReadyReadFutures;             }         }         return readyReadFutures;     }     public final ReadFuture read() {//读数据         if (!getConfig().isUseReadOperation()) {//会话配置不允许 读数据(这是默认情况)             throw new IllegalStateException("useReadOperation is not enabled.");         }         Queue readyReadFutures = getReadyReadFutures();//获取已经可被读数据队列         ReadFuture future;         synchronized (readyReadFutures) {//锁住读数据队列             future = readyReadFutures.poll();//取队头数据             if (future != null) {                 if (future.isClosed()) {//关联的会话已经关闭了,让读者知道此情况                     readyReadFutures.offer(future);                 }             } else {                 future = new DefaultReadFuture(this);                 getWaitingReadFutures().offer(future); //将此数据插入等待被读取数据的队列,这个代码和上面的getReadyReadFutures类似,只是键值不同而已             }         }         return future;     } 再来看写数据到指定远端地址的过程,可以写三种类型数据:IoBuffer,整个文件或文件的部分区域,这会通过传递写请求给过滤器链条来完成数据向目的远端的传输。 public final WriteFuture write(Object message, SocketAddress remoteAddress) {         FileChannel openedFileChannel = null;         try          {             if (message instanceof IoBuffer&& !((IoBuffer) message).hasRemaining())              {// 空消息                 throw new IllegalArgumentException(                 "message is empty. Forgot to call flip()?");             }              else if (message instanceof FileChannel)              {//要发送的是文件的某一区域                 FileChannel fileChannel = (FileChannel) message;                 message = new DefaultFileRegion(fileChannel, 0, fileChannel.size());             }             else if (message instanceof File)              {//要发送的是文件,打开文件通道                 File file = (File) message;                 openedFileChannel = new FileInputStream(file).getChannel();                 message = new DefaultFileRegion(openedFileChannel, 0, openedFileChannel.size());             }         }          catch (IOException e)          {             ExceptionMonitor.getInstance().exceptionCaught(e);             return DefaultWriteFuture.newNotWrittenFuture(this, e);         }         WriteFuture future = new DefaultWriteFuture(this);          getFilterChain().fireFilterWrite(                 new DefaultWriteRequest(message, future, remoteAddress)); //构造写请求,通过过滤器链发送出去,写请求中指明了要发送的消息,目的地址,以及返回的结果   //如果打开了一个文件通道(发送的文件的部分区域或全部),就必须在写请求完成时关闭文件通道         if (openedFileChannel != null) {             final FileChannel finalChannel = openedFileChannel;             future.addListener(new IoFutureListener() {                 public void operationComplete(WriteFuture future) {                     try {                         finalChannel.close();//关闭文件通道                     } catch (IOException e) {                         ExceptionMonitor.getInstance().exceptionCaught(e);                     }                 }             });         }         return future;//写请求成功完成     } 最后,来看看一个WriteRequestQueue的实现,唯一加入的一个功能就是若在队头发现是请求关闭,则会去关闭会话。 private class CloseRequestAwareWriteRequestQueue implements WriteRequestQueue {         private final WriteRequestQueue q;//内部实际的写请求队列         public CloseRequestAwareWriteRequestQueue(WriteRequestQueue q) {             this.q = q;         }         public synchronized WriteRequest poll(IoSession session) {             WriteRequest answer = q.poll(session);             if (answer == CLOSE_REQUEST) {                 AbstractIoSession.this.close();                 dispose(session);                 answer = null;             }             return answer;         }         public void offer(IoSession session, WriteRequest e) {             q.offer(session, e);         }         public boolean isEmpty(IoSession session) {             return q.isEmpty(session);         }         public void clear(IoSession session) {             q.clear(session);         }         public void dispose(IoSession session) {             q.dispose(session);         }     } 7. org.apache.mina.core. polling 前面介绍完了org.apache.mina.core.session这个包,现在开始进入org.apache.mina.core. polling包。这个包里包含了实现基于轮询策略(比如NIO的select调用或其他类型的I/O轮询系统调用(如epoll,poll,kqueue等)的基类。 先来看AbstractPollingIoAcceptor这个抽象基类,它继承自AbstractIoAcceptor,两个泛型参数分别是所处理的会话和服务器端socket连接。底层的sockets会被不断检测,并当有任何一个socket需要被处理时就会被唤醒去处理。这个类封装了服务器端socket的bind,accept和dispose等动作,其成员变量Executor负责接受来自客户端的连接请求,另一个AbstractPollingIoProcessor用于处理客户端的I/O操作请求,如读写和关闭连接。 其最重要的几个成员变量是: private final Queue registerQueue = new ConcurrentLinkedQueue();//注册队列 private final Queue cancelQueue = new ConcurrentLinkedQueue();//取消注册队列 private final Map boundHandles = Collections.synchronizedMap(new HashMap());//本地地址到服务器socket的映射表 先来看看当服务端调用bind后的处理过程: protected final Set bind0(List localAddresses) throws Exception {         AcceptorOperationFuture request = new AcceptorOperationFuture(localAddresses);//注册请求         registerQueue.add(request);//加入注册队列中,等待worker处理         //创建一个Worker实例,开始工作         startupWorker();         wakeup();         request.awaitUninterruptibly();         // 更新本地绑定地址         Set newLocalAddresses = new HashSet();         for (H handle : boundHandles.values()) {             newLocalAddresses.add(localAddress(handle));         }         return newLocalAddresses;     } 真正的负责接收客户端请求的工作都是Worker线程完成的: private class Worker implements Runnable {         public void run() {             int nHandles = 0;             while (selectable) {                 try {                     // Detect if we have some keys ready to be processed                     boolean selected = select();//检测是否有 SelectionKey已经可以被处理了                     nHandles += registerHandles();//注册服务器sockets句柄,这样做的目的是将Selector的状态置于OP_ACCEPT,并绑定到所监听的端口上,表明接受了可以接收的来自客户端的连接请求,                     if (selected) {                         processHandles(selectedHandles());//处理可以被处理的SelectionKey状态为OP_ACCEPT的服务器socket句柄集(即真正处理来自客户端的连接请求)                     }                     nHandles -= unregisterHandles();//检查是否有取消连接的客户端请求                     if (nHandles == 0) {                         synchronized (lock) {                             if (registerQueue.isEmpty()                                     && cancelQueue.isEmpty()) {//完成工作                                 worker = null;                                 break;                             }                         }                     }                 } catch (Throwable e) {                     ExceptionMonitor.getInstance().exceptionCaught(e);                     try {                         Thread.sleep(1000);//线程休眠一秒                     } catch (InterruptedException e1) {                         ExceptionMonitor.getInstance().exceptionCaught(e1);                     }                 }             }             if (selectable && isDisposing()) {//释放资源                 selectable = false;                 try {                     if (createdProcessor) {                         processor.dispose();                     }                 } finally {                     try {                         synchronized (disposalLock) {                             if (isDisposing()) {                                 destroy();                             }                         }                     } catch (Exception e) {                         ExceptionMonitor.getInstance().exceptionCaught(e);                     } finally {                         disposalFuture.setDone();                     }                 }             }         } private int registerHandles() {//注册服务器sockets句柄         for (;;) {             AcceptorOperationFuture future = registerQueue.poll();             Map newHandles = new HashMap();             List localAddresses = future.getLocalAddresses();             try {                 for (SocketAddress a : localAddresses) {                     H handle = open(a);//打开指定地址,返回服务器socket句柄                     newHandles.put(localAddress(handle), handle);//加入地址—服务器socket映射表中                 }                 boundHandles.putAll(newHandles);//更新本地绑定地址集                 // and notify.                 future.setDone();//完成注册过程                 return newHandles.size();             } catch (Exception e) {                 future.setException(e);             } finally {                 // Roll back if failed to bind all addresses.                 if (future.getException() != null) {                     for (H handle : newHandles.values()) {                         try {                             close(handle);//关闭服务器socket句柄                         } catch (Exception e) {                             ExceptionMonitor.getInstance().exceptionCaught(e);                         }                     }                     wakeup();                 }             }         }     }         private void processHandles(Iterator handles) throws Exception {//处理来自客户端的连接请求             while (handles.hasNext()) {                 H handle = handles.next();                 handles.remove();                 T session = accept(processor, handle);//为一个服务器socket句柄handle真正接收来自客户端的请求,在给定的所关联的processor上返回会话session                 if (session == null) {                     break;                 }                 finishSessionInitialization(session, null, null);//结束会话初始化                 // add the session to the SocketIoProcessor                 session.getProcessor().add(session);             }         }     } 这个类中有个地方值得注意,就是wakeup方法,它是用来中断select方法的,当注册队列或取消注册队列发生变化时需要调用它,可以参看本类的一个子类NioSocketAcceptor的实现: protected boolean select() throws Exception {         return selector.select() > 0;     }     protected void wakeup() {         selector.wakeup();     } 我们可以查阅jdk文档,它对Selector的select方法有如下解释:选择一组键,其相应的通道已为 I/O 操作准备就绪。 此方法执行处于阻塞模式的选择操作。仅在至少选择一个通道、调用此选择器的 wakeup 方法、当前的线程已中断,或者给定的超时期满(以先到者为准)后此方法才返回。 参考资料《Java NIO非阻塞服务器示例》 8. org.apache.mina.transport.socket 这篇来看看AbstractPollingIoConnector抽象类,它用于用于实现客户端连接的轮询策略。处理逻辑基本上和上一篇文章说的AbstractPollingIoAcceptor类似,它继承自AbstractIoConnector,两个泛型参数分别是所处理的会话和客户端socket连接。底层的sockets会被不断检测,并当有任何一个socket需要被处理时就会被唤醒去处理。这个类封装了客户端socket的bind,connect和dispose等动作,其成员变量Executor用于发起连接请求,另一个AbstractPollingIoProcessor用于处理已经连接客户端的I/O操作请求,如读写和关闭连接。 其最重要的几个成员变量是: private final Queue connectQueue = new ConcurrentLinkedQueue();//连接队列 private final Queue cancelQueue = new ConcurrentLinkedQueue();// 取消连接队列 先来看看当服务端调用connect后的处理过程: protected final ConnectFuture connect0(             SocketAddress remoteAddress, SocketAddress localAddress,             IoSessionInitializer sessionInitializer) {         H handle = null;         boolean success = false;         try {             handle = newHandle(localAddress);             if (connect(handle, remoteAddress)) {//若已经连接服务器成功                 ConnectFuture future = new DefaultConnectFuture();                 T session = newSession(processor, handle);//创建新会话                 finishSessionInitialization(session, future, sessionInitializer);//结束会话初始化                 session.getProcessor().add(session);//将剩下的处理交给IoProcessor                 success = true;                 return future;             }             success = true;         } catch (Exception e) {             return DefaultConnectFuture.newFailedFuture(e);         } finally {             if (!success && handle != null) {                 try {                     close(handle);                 } catch (Exception e) {                     ExceptionMonitor.getInstance().exceptionCaught(e);                 }             }         }         ConnectionRequest request = new ConnectionRequest(handle, sessionInitializer);         connectQueue.add(request);//连接请求加入连接队列中         startupWorker();//开启工作线程处理连接请求         wakeup();//中断select操作         return request;     } 真正的负责处理客户端请求的工作都是Worker线程完成的: private class Worker implements Runnable {         public void run() {             int nHandles = 0;             while (selectable) {                 try {                       int timeout = (int)Math.min(getConnectTimeoutMillis(), 1000L);//等待超时时间                     boolean selected = select(timeout);//在超时时限内查看是否有可以被处理的选择键(状态                     nHandles += registerNew();//取出连接队列队头的连接请求,将其注册一个用于连接的新的客户端socket, 并把它加入连接轮询池中                     if (selected) {                         nHandles-= processSessions(selectedHandles());//处理连接请求                     }                     processTimedOutSessions(allHandles());//处理超时连接请求                     nHandles -= cancelKeys();                     if (nHandles == 0) {                         synchronized (lock) {                             if (connectQueue.isEmpty()) {                                 worker = null;                                 break;                             }                         }                     }                 } catch (Throwable e) {                     ExceptionMonitor.getInstance().exceptionCaught(e);                     try {                         Thread.sleep(1000);                     } catch (InterruptedException e1) {                         ExceptionMonitor.getInstance().exceptionCaught(e1);                     }                 }             }             if (selectable && isDisposing()) {                 selectable = false;                 try {                     if (createdProcessor) {                         processor.dispose();                     }                 } finally {                     try {                         synchronized (disposalLock) {                             if (isDisposing()) {                                 destroy();                             }                         }                     } catch (Exception e) {                         ExceptionMonitor.getInstance().exceptionCaught(e);                     } finally {                         disposalFuture.setDone();                     }                 }             }         }     } private int registerNew() {         int nHandles = 0;         for (; ;) {             ConnectionRequest req = connectQueue.poll();//取连接队列队头请求             if (req == null) {                 break;             }             H handle = req.handle;             try {                 register(handle, req);//注册一个用于连接的新的客户端socket, 并把它加入连接轮询池中                 nHandles ++;             } catch (Exception e) {                 req.setException(e);                 try {                     close(handle);                 } catch (Exception e2) {                     ExceptionMonitor.getInstance().exceptionCaught(e2);                 }             }         }         return nHandles;     } private int processSessions(Iterator handlers) {//处理连接请求         int nHandles = 0;         while (handlers.hasNext()) {             H handle = handlers.next();             handlers.remove();             ConnectionRequest entry = connectionRequest(handle);             boolean success = false;             try {                 if (finishConnect(handle)) {//连接请求成功完成,创建一个新会话                     T session = newSession(processor, handle);                     finishSessionInitialization(session, entry, entry.getSessionInitializer());//结束会话初始化                     session.getProcessor().add(session);//将剩下的工作交给IoProcessor去处理                     nHandles ++;                 }                 success = true;             } catch (Throwable e) {                 entry.setException(e);             } finally {                 if (!success) {//若连接失败,则将此连接请求放到取消连接队列中                     cancelQueue.offer(entry);                 }             }         }         return nHandles;     } private void processTimedOutSessions(Iterator handles) {//处理超时的连接请求         long currentTime = System.currentTimeMillis();//当前时间         while (handles.hasNext()) {             H handle = handles.next();             ConnectionRequest entry = connectionRequest(handle);             if (currentTime >= entry.deadline) {//当前时间已经超出了连接请求的底限                 entry.setException(                         new ConnectException("Connection timed out."));                 cancelQueue.offer(entry);//将此连接请求放入取消连接队列中             }         }     } private int cancelKeys() {//把取消队列中的连接请求给cancel掉         int nHandles = 0;         for (; ;) {             ConnectionRequest req = cancelQueue.poll();             if (req == null) {                 break;             }             H handle = req.handle;             try {                 close(handle);//关闭对应的客户端socket             } catch (Exception e) {                 ExceptionMonitor.getInstance().exceptionCaught(e);             } finally {                 nHandles ++;             }         }         return nHandles;     }

下载文档到电脑,查找使用更方便

文档的实际排版效果,会与网站的显示效果略有不同!!

需要 10 金币 [ 分享文档获得金币 ] 0 人已下载

下载文档