Kaa学习指南(Java版)

mxpje 6年前

来自: http://segmentfault.com/a/1190000004389762

Kaa项目学习有一个月时间, 由于是代码量非常庞大的开源项目, 上手起来还是比较困难的, 本文章主要是要和大家分享我的学习过程,遇到的困难,以及我们应该从哪里入手阅读代码比较好。

环境搭建

部署Kaa官方虚拟机Demo,里面包含很多很有用的事例, 可以直接运行, 会有很直观的感受、方便了解Kaa的各个业务, 起到很好的入门作用。

根据官方文档从编译代码开始, 到安装部署调试环境。在搭建环境中我们可以了解到Kaa是运行在JVM环境中的, 具体使用的外部组件有:Zookeeper、PostgreSQL 、MongoDB。Kaa本身有4个服务, kaa-bootstrap(集群)、 kaa-operations(集群)、 kaa-control(主备)、 kaa-admin(用户管理界面)。我们需要了解这些服务的 具体分工 。了解PostgreSQL、MongoDB的基本查询操作。

阅读文档

阅读Kaa的 设计文档 ,了解各个业务类型的。

阅读代码

从Notification Demo入手(下载源码), 从官方给的源码,我们可以分析出客户端程序运行的入口。其实很简单, 只需要如下两行代码, 就可以将KaaClient启动起来。

/** new KaaClient 实例。 new DesktopKaaPlatformContext初始化Kaa上下文信息,主要是将sdk中client.properties信息同步到对象中;    * 还有初始化4个Executor(KaaClient start时候, 执行init()才将线程实例化)。他们分别是:    * lifeCycleExecutor(Kaa启动依赖本线程,如果该线程执行stop其他所有任务将停止,所以说他是kaaClient的生命周期),    * apiExecutor(),    * callbackExecutor(),              * scheduledExecutor(延时任务,典型例子DefaultChannelManager.onServerFailed())   **/  kaaClient = Kaa.newClient(new DesktopKaaPlatformContext());  kaaClient.start(); // 将其启动起来

我们阅读KaaClient的代码时, 要抛弃读WebServer代码的思想。因为这里没有Spring, 没有依赖注入, 只有setter,对象会传来传去, 传到很深的地方,所以我们会看到很多set对象的地方。客户端是多线程的,我们能看到很多创建线程的地方, 我们要能够把其串联起来。

Kaa.newClient主要执行的代码逻辑在其父类 AbstractKaaClient

AbstractKaaClient(KaaClientPlatformContext context, KaaClientStateListener listener) throws IOException, GeneralSecurityException {          /*          * KaaClientPlatformContext 中包含:          * 3个线程 + 1个定时任务线程 (Java Executor), 但未启动 SimpleExecutorContext          * */          this.context = context;          this.stateListener = listener;          if (context.getProperties() != null) {              this.properties = context.getProperties();          } else { // 客户端SDK示例代码 仅仅new了一个 DesktopKaaPlatformContext, 需要初始化client properties              /*              *  是对client.properties 文件进行解析              *  1.获取client.properties              *  2.解析bootstrap server 列表              *  3.等其他sdk中包含的信息              */              this.properties = new KaaClientProperties();          }            // 传递 Base64 编解码能力, BootstrapServers信息是Base64编码记录的          this.properties.setBase64(context.getBase64());            // 获取bootstrapServers信息          Map<TransportProtocolId, List<TransportConnectionInfo>> bootstrapServers = this.properties.getBootstrapServers();          if (bootstrapServers == null || bootstrapServers.isEmpty()) {              throw new RuntimeException("Unable to obtain list of bootstrap servers."); // NOSONAR          }            // 对bootstrapServer列表进行打乱操作          for (Map.Entry<TransportProtocolId, List<TransportConnectionInfo>> cursor : bootstrapServers.entrySet()) {              Collections.shuffle(cursor.getValue());          }            /*          * kaaClientState 数据初始化. kaaClient第一次启动是没有status.properties文件的, 当应用关闭时, Kaa status信息会持久化到status.properties文件中          * */          kaaClientState = new KaaClientPropertiesState(context.createPersistentStorage(), context.getBase64(), this.properties);            /*          * transportContext 实例化, 每个transport都有的功能是生成sync请求信息, 和接收sync请求服务器端的返回信息后,交给对应的manger处理具体业务.          * notes: sync操作就是客户端向服务器发起请求的操作. 当请求内容要有定制的是, 这里是修改点.          * */          TransportContext transportContext = buildTransportContext(properties, kaaClientState);            /*          * bootstrapManager主要功能是, 向bootstrapServer获取 operationServer的服务器列表, 和对operationServer列表的管理          * */          bootstrapManager = buildBootstrapManager(properties, kaaClientState, transportContext);            /*          * channelManager是对channel的管理, 目前有两个channel, bootStrapChannel(HTTP协议, 客户端和bootstrap交互的消息通道), operationChannel(MQTT协议, 客户端和operationServer交互的消息通道)          * notes: mqtt协议是包装在TCP协议上次的协议, 实际编程就是sock编程, 只是多了一次用mqtt协议编解码的过程.          * */          channelManager = buildChannelManager(bootstrapManager, bootstrapServers);            /*          * failoverManager是对channel发生故障后进行的业务操作          * */          failoverManager = buildFailoverManager(channelManager);            /*          * channelManager 和 failoverManager是相互依赖的          * */          channelManager.setFailoverManager(failoverManager);            /*          *  初始化 bootstrapChannel,operationsChannel. 将这两个channel添加到channelManger中管理.          *  添加channel到channelManager的channel list中之后, 每添加一个channel会启动一个使用该channel的线程, 线程会去处理一个阻塞队列. 也就是说会有会有地方去往这个队列中放消息的.          *  其实这两个阻塞队列就是sync操作的中转站. 当需要向服务端获取信息的时候, 向队列中放不同类型TransportType的SyncTask就会自动去执行sync操作了.          * */          initializeChannels(channelManager, transportContext);            bootstrapManager.setChannelManager(channelManager);          bootstrapManager.setFailoverManager(failoverManager);            /*          * 如下manger是处理来着各自transport的操作. transport是来着          * */          profileManager = buildProfileManager(properties, kaaClientState, transportContext);          notificationManager = buildNotificationManager(properties, kaaClientState, transportContext);          eventManager = buildEventManager(properties, kaaClientState, transportContext);          endpointRegistrationManager = buildRegistrationManager(properties, kaaClientState, transportContext);          logCollector = buildLogCollector(properties, kaaClientState, transportContext);          configurationManager = buildConfigurationManager(properties, kaaClientState, transportContext, context.getExecutorContext());            /*          * 将manager注入到transport中, 为了将manger的业务处理能力透传给transport          * */          transportContext.getRedirectionTransport().setBootstrapManager(bootstrapManager);          transportContext.getBootstrapTransport().setBootstrapManager(bootstrapManager);          transportContext.getProfileTransport().setProfileManager(profileManager);          transportContext.getEventTransport().setEventManager(eventManager);          transportContext.getNotificationTransport().setNotificationProcessor(notificationManager);          transportContext.getConfigurationTransport().setConfigurationHashContainer(configurationManager.getConfigurationHashContainer());          transportContext.getConfigurationTransport().setConfigurationProcessor(configurationManager.getConfigurationProcessor());          transportContext.getUserTransport().setEndpointRegistrationProcessor(endpointRegistrationManager);          transportContext.getLogTransport().setLogProcessor(logCollector);          transportContext.initTransports(this.channelManager, this.kaaClientState);            /*          * 构建EventFamily, 给客户端编程暴露接口          * */          eventFamilyFactory = new EventFamilyFactory(eventManager, context.getExecutorContext());  }
</div>