【MINA】用mina做业务服之间的通信,实现业务负载均衡思路

学习mina目的还是搭建通信架构,学完mina我们了解了如何实现客户端和服务端,也就是一个正常channel我们是知道怎么建立的

但是问题是,我们应用环境通信分为两种

1.前后端通信


 

其实这个比较好实现,提供一个mina server端,供前端语言通过socket建连接就行,这个通信就算是ok了,编解码等通信解析的细节这里不讲了

以前的游戏服务端架构业务多用短连接,聊天用长连接,聊天的部分其实就是上面表述的情况

现在是长连接的天下,聊天依旧是长连接,业务也做成长连接,实现了真正意义上的长连接游戏架构,这其实就表述了一种当下典型架构,

就是后端提供两个开放的通信端口【即两个mina server】,供前端的socket连接,一个负责聊天,登录,注册,另一个负责其他业务,这样就实现了协议通信的负载均衡

2.后端的业务服通信【这是本文的重点】


 

那么后端的业务就不需要负载均衡吗?比如job,异步更新db,活动副本等

当然也是需要的,怎么做那,先拿1中的做个解释

                         mainserevr[聊天,登录,注册]---nodeserver[其他业务]

这两个mina sever端已经建立起来了,但是两个server之间还不能通信,我们有两个选择,要么在mainserevr上起个mina client去连nodeserver,要么在nodeserver

上起个mina client去连mainserevr,思路肯定是这样的,一旦这个通道建立了,其实互为server和client的,会有一个iosession被通道持有,只要有这个iosession,

就可以主动write,当然对于通道的另一端可以response,也可以通过取得iosession来主动写

实现方式,我们在nodeserevr上提供一个mainserverClient这样一个spring的bean去连接mainserver,这样在nodeserver上就可以向mainserevr发消息了

 

3.带着这个思路设计一下


 

我把游戏中的业务分为

     public static final String SERVER_TYPE_NODE_STR = "nodeserver";// game node
	public static final String SERVER_TYPE_MAIN_STR = "mainserver";// 主server
	public static final String SERVER_TYPE_JOB_STR = "jobserver";// job server
	public static final String SERVER_TYPE_ASYNCDB_STR = "asyncdbserver";// 异步DB
	public static final String SERVER_TYPE_ACTIVE_STR = "activityserver";// 活动
	public static final String SERVER_TYPE_OTHER_STR = "other";// 其他
	public static final String SERVER_TYPE_GM_STR = "GM";//管理端

 

每次启动一种server时,首先启动一次mina serevr,然后启动多个mina client去连接其他的mina server,

比如启动nodeserevr 服务端,然后启动多个client分别连接mainserevr,jobserevr等的服务端,这样我就可以

在nodeserver上给其他业务serevr发请求了,具体启动哪些client看需要

 

搞一个启动server类型的方法

public static ClassPathXmlApplicationContext start(String serverTypeStr) {
        try {
                        //关闭连接池的钩子线程
            ProxoolFacade.disableShutdownHook();
                        //spring 的核心配置文件
            String xmlFile = "applicationContext.xml";

            ....
            log.info("启动 {} server................", serverTypeName);

            // 设置到系统环境变量
            System.setProperty(NodeSessionMgr.SERVER_TYPE_KEY, serverType + "");
            System.setProperty(NodeSessionMgr.SERVER_TYPE_NAME_KEY,
                    serverTypeName);

            // final ClassPathXmlApplicationContext parent = new
            // ClassPathXmlApplicationContext(
            // xmlFile);
            String fileName = null;

              //这是把spring的住配置文件拆分了一部分内容出来,目前是只加载本server需要的bean
            if (serverType == NodeSessionMgr.SERVER_TYPE_NODE) {
                fileName = "wolf/app_nodeserver.xml";
            } else {
                fileName = "wolf/app_server.xml";
            }

            //手动启动spring
            final ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(
                    new String[] { xmlFile, fileName });

            if (context != null) {
                ServiceLocator.getInstance().setApplicationContext(context);
            }

            // 启动socket server
            final WolfServer server = (WolfServer) ServiceLocator
                    .getSpringBean("wolf_server");
            server.setServerType(serverType);
                        //这个调用就是我们熟悉的启动mina server端
            server.start();

            //这个动用做两件事,选区需要的serevr类型建立mina client连接
            startClient(server);

                        //钩子线程用来监听应用停止,为了做停止时的后续处理
            Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
                public void run() {
                    _shutdown();
                }
            }, "shutdownHookThread"));

              //为了支持web,springMVC,内置一个web server
            if (NodeSessionMgr.SERVER_TYPE_MAIN_STR
                    .equalsIgnoreCase(serverTypeStr)) {
                JettyServer jettyServer = (JettyServer) ServiceLocator
                        .getSpringBean("jettyServer");
                jettyServer.start();
            }

            log.info("start {} end................", serverTypeName);
            return context;

        } catch (Exception e) {
            e.printStackTrace();
            shutdown();
        } finally {

        }
        return null;
    }

 

在看下startClient(server);

private static void startClient(WolfServer server) {
        // asyncdbServer只会被连接,不会主动连接其他server
                // 这部分目的是过滤那些不需要主动连比人的serevr,比武我这里的异步db,和活动服
        if (server.getServerType() == NodeSessionMgr.SERVER_TYPE_ASYNCDB
                || server.getServerType() == NodeSessionMgr.SERVER_TYPE_ACTIVE) {
            return;
        }

        // 发送game Server ip port到mainserver
        Map<String, Object> params = new HashMap<String, Object>();
        params.put("nodeServerIp", server.getIp());
        params.put("nodeServerPort", server.getPort());
        params.put("serverType", server.getServerType());

        //我需要mainserevr的client,就弄个bean在本服
        final IWolfClientService mainServerClient = (IWolfClientService) ServiceLocator
                .getSpringBean("mainServerClient");

        //这个位置其实就是mina的client连server端
        mainServerClient.init();
        Object localAddress = mainServerClient.registerNode(params);

                
         //同上,需要jobserevr的client
        final IWolfClientService jobServerClient = (IWolfClientService) ServiceLocator
                .getSpringBean("jobServerClient");
        if (jobServerClient != null) {
            jobServerClient.init();
            Map<String, Object> params1 = new HashMap<String, Object>();
            params1.putAll(params);
            jobServerClient.registerNode(params1);
        }
        // }

        .....

    }

 

再看下WolfClientService.init()

public void init() {
        if (start)
            return;
        if (wolfClient == null) {
            log.error("wolf client is null");
            return;
        }
         //mina 的client 连接 mina server
        wolfClient.start();
        if (wolfClient.isConnected())
            start = true;
    }

 

再看下wolfclient.start()

/**
     * 连接一个服务器,并指定处理接收到的消息的处理方法
     * 
     */
    public void start() {
        // this.context.put("resultMgr", this.resultMgr);

        logger.info(com.youxigu.dynasty2.i18n.MarkupMessages
                .getString("WolfClient_9"), processorNum);
        logger.info(com.youxigu.dynasty2.i18n.MarkupMessages
                .getString("WolfClient_0"), corePoolSize);
        logger.info(com.youxigu.dynasty2.i18n.MarkupMessages
                .getString("WolfClient_4"), maxPoolSize);

        if (this.serverIp == null || this.serverIp.equals("")) {
            logger.error(clientName + "没有配置serverIp,不启动.........");
            return;
        }
        String threadPrefix = clientName + "[" + this.serverIp + ":"
                + this.serverPort + "]";
        // exector = Executors.newCachedThreadPool(new
        // NamingThreadFactory(threadPrefix));
        processor = new SimpleIoProcessorPool<NioSession>(NioProcessor.class,
                processorNum);

        // connector = new NioSocketConnector((Executor) exector, processor);
        connector = new NioSocketConnector(processor);

        // connector.getSessionConfig().setReuseAddress(true);
        DefaultIoFilterChainBuilder chain = connector.getFilterChain();

        if (useLogFilter == 2) {
            chain.addLast("logging", new LoggingFilter());
        }
        // codec filter要放在ExecutorFilter前,因为读写同一个socket connection的socket
        // buf不能并发(事实上主要是读,写操作mina已经封装成一个write Queue)
        chain.addLast("codec", new ProtocolCodecFilter(codecFactory)); // 设置编码过滤器

        // 添加心跳过滤器,客户端只接受服务端的心跳请求,不发送心跳请求
        // connector.getSessionConfig().setReaderIdleTime(readIdleTimeOut);
        // 这里的KeepAliveFilter必须在codec之后,因为KeepAliveMessageFactoryImpl返回的是Object,如果KeepAliveMessageFactoryImpl返回的是IOBuffer,则可以在codec之前
        // KeepAliveFilter到底在ExecutorFilter之前好还是之后好,我也不确定
        KeepAliveFilter filter = new KeepAliveFilter(
                new KeepAliveMessageFactoryImpl(keepAliveRequestInterval <= 0),
                IdleStatus.READER_IDLE, new RequestTimeoutCloseHandler(),
                keepAliveRequestInterval <= 0 ? 600 : keepAliveRequestInterval,
                30);
        chain.addLast("ping", filter);

        // 添加执行线程池
        executor = new UnorderedThreadPoolExecutor(corePoolSize, maxPoolSize,
                keepAliveTime, TimeUnit.SECONDS, new NamingThreadFactory(
                        threadPrefix));

        // 这里是预先启动corePoolSize个处理线程
        executor.prestartAllCoreThreads();

        chain.addLast("exec", new ExecutorFilter(executor,
                IoEventType.EXCEPTION_CAUGHT, IoEventType.MESSAGE_RECEIVED,
                IoEventType.SESSION_CLOSED, IoEventType.SESSION_IDLE,
                IoEventType.SESSION_OPENED));

        if (useWriteThreadPool) {
            executorWrite = new UnorderedThreadPoolExecutor(corePoolSize,
                    maxPoolSize, keepAliveTime, TimeUnit.SECONDS,
                    new NamingThreadFactory(threadPrefix + "write"));
            executorWrite.prestartAllCoreThreads();
            chain.addLast("execWrite", new ExecutorFilter(executorWrite,
                    IoEventType.WRITE, IoEventType.MESSAGE_SENT));

        }
        // ,logger.isDebugEnabled() ? new
        // LoggingIoEventQueueHandler("execWrite") : nulls

        // 配置handler的 logger,在codec之后,打印的是decode前或者encode后的消息的log
        // 可以配置在ExecutorFilter之后:是为了在工作线程中打印log,不是在NioProcessor中打印
        if (useLogFilter == 1) {
            chain.addLast("logging", new LoggingFilter());
        }

        connector.setHandler(handler);

        connector.getSessionConfig().setReuseAddress(true);
        connector.getSessionConfig().setTcpNoDelay(tcpNoDelay);
        logger.info(com.youxigu.dynasty2.i18n.MarkupMessages
                .getString("WolfClient_1")
                + serverIp + ":" + serverPort);
        ConnectFuture cf = null;

        long start = System.currentTimeMillis();
        while (true) {
                        //这地很关键,是个无线循环,每10秒连接一次,直到可以和服务端建立连接,否则一支循环下去
            cf = connector.connect(serverAddress);// 建立连接
            cf.awaitUninterruptibly(10000L);
            if (!cf.isConnected()) {
                if ((System.currentTimeMillis() - start) > timeout) {
                    throw new RuntimeException(
                            com.youxigu.dynasty2.i18n.MarkupMessages
                                    .getString("WolfClient_5")
                                    + serverIp + ":" + serverPort);
                }
                if (cf.getException() != null) {
                    logger.error(com.youxigu.dynasty2.i18n.MarkupMessages
                            .getString("WolfClient_6"), serverIp + ":"
                            + serverPort, cf.getException().getMessage());
                }
                try {
                    Thread.sleep(10000);
                } catch (Exception e) {
                }

                continue;
            }

                        //这就是终极目标了,我们的目的就是在serevr的客户端的bean里,可以拿到这个iosession
            this.setSession(cf.getSession());

            logger.info(com.youxigu.dynasty2.i18n.MarkupMessages
                    .getString("WolfClient_10")
                    + serverIp + ":" + serverPort);
            shutDown = false;
            if (handler instanceof WolfMessageChain) {
                WolfMessageChain wmc = WolfMessageChain.class.cast(handler);
                wmc.init(context);
            }

            break;
        }

    }

 

这样后端的业务通信网就可以轻松的建立起来,之后想怎么通信就看你的了  

 

posted on 2015-08-18 11:14  dagangzi  阅读(2417)  评论(0编辑  收藏  举报