京东弹性数据库之BinLake订阅服务

SandyBejah 2年前
   <p style="text-align: center;"><img src="https://simg.open-open.com/show/322c4cef521dac1d3b0940790cebeb4f.jpg"></p>    <p><strong>京东弹性数据库</strong></p>    <p>京东弹性数据库是京东商城十年数据库生产经验总结与升华,兼容MySQL协议,适合海量数据的事务处理、分析计算、动态扩展、灵活复制协议、自动备份恢复、自动历史结转、日志订阅、可全面容器化部署的分布式数据库产品系列。</p>    <p><strong>BinLake订阅服务</strong></p>    <p>一站式MySQL BinLog日志实时采集、统一分发、消费订阅和监控服务。是京东弹性数据库产品系列中首先推出的产品。</p>    <p>BinLake订阅服务平台具有以下优势:</p>    <p style="text-align: center;"><img src="https://simg.open-open.com/show/14093552ec468475f6b5b5067dd63d37.jpg"></p>    <p><strong>BinLake架构设计</strong></p>    <p>BinLake整体架构设计如下:</p>    <p style="text-align: center;"><img src="https://simg.open-open.com/show/2e42c01e1d32c8622ff94a32732aa995.jpg"></p>    <p>说明如下:</p>    <p>1 <strong> BinLake总共包括三大服务组件 </strong></p>    <p>1.1 Wave服务</p>    <p>Wave服务完成实际的数据库Binary Log的持续采集、管理和分发写入到下游的消息发布和订阅系统中。在BinLake集群中会存在N个Wave服务,这些Wave服务共同组成一个无状态集群。</p>    <p>1.2 Tower服务</p>    <p>Tower服务是整个BinLake的管理中心,提供BinLake接入服务的申请、完成Wave服务、数据源、接入应用的管理。当用户申请接入到BinLake中时,会登录到Tower服务提供的申请界面,填写申请接入BinLake的应用信息、数据源信息和Topic信息,Tower服务会按照用户提供的信息做如下判断,并完成用户接入申请,接入流程如下:</p>    <p style="text-align: center;"><img src="https://simg.open-open.com/show/7c1a158b87b121c94b039132f1dfc45c.jpg"></p>    <p>如果不同申请者申请相同数据源的数据采集,由Tower管理端依据其申请的采集规则(如指定表,指定库),如果规则相同,默认复用相同规则的Topic,也可强制生成新的Topic进行订阅。</p>    <p>1.3 Judge服务</p>    <p>Judge服务主要完成两个功能:Wave节点监控信息采集和loadBalance决策。</p>    <ul>     <li> <p>Wave节点监控信息采集:</p> </li>    </ul>    <p>通过在各个Wave服务节点部署agent采集各个Wave服务节点上的监控信息,包括:服务器的内存使用、系统负载、CPU负载、网络负载、JVM的堆内存使用、GC信息、每个Wave服务中的instance个数等,采集到的所有这些信息都会在后续的loadBalance中作为基础metics,参与到最终的loadBalance决策中。</p>    <ul>     <li> <p>loadBalance决策:</p> </li>    </ul>    <p>新应用接入到BinLake时,若需要采集的数据源在BinLake现有的数据源池中不存在,则需要针对于新的数据源在相应的Wave服务上创建对应的instance(数据源与instance是1对1的关系)。那么在创建instance的时候,就需要选在在哪个Wave服务上创建。这时就会请求Judge服务提供的loadBalance决策接口,若Judge服务中没有配置loadBalance plugin,则会返回一个随机的Wave服务节点的IP,那么就会在该随机的Wave服务上创建instance;若配置了loadBalance的plugin,则从Judge服务提供的loadBalance决策接口获得建议Wave服务节点,并从该节点创建新的instance。</p>    <p>2 <strong> <strong>BinLake</strong> <strong>依赖于两大外部服务</strong> </strong></p>    <p>2.1 ZooKeeper</p>    <p>BinLake使用zookeeper服务进行Wave无状态集群的管理、状态同步和消息通知等,包括:</p>    <p>【1】Instance的自动化创建与初始化</p>    <p>【2】Instance的HA</p>    <p>【3】数据源offset实时追踪</p>    <p>【4】binlog分发失败重试</p>    <p>【5】数据源切换自适配</p>    <p>【6】Tower元数据管理</p>    <p>【7】instance消息通知</p>    <p>2.2 消息发布与订阅系统</p>    <p>目前BinLake可以无缝集成JMQ和Kafka,从而进行消息的发布和订阅管理。instance采集到的BinLog Event会发布到JMQ或者Kafka的Topic中,实际的业务应用只需要订阅和消费对应的topic,既可以实时的获得BinLog Event,并在后续的业务逻辑中对获得的Binlog Event进行处理即可。</p>    <p><strong>BinLake部署拓扑</strong></p>    <p>在BinLake服务实际部署时,其拓扑结构如下:</p>    <p style="text-align: center;"><img src="https://simg.open-open.com/show/5566690c6fb32c9ec457c8c222b733b4.jpg"></p>    <p>对上述部署拓扑图说明如下:</p>    <p>(1)一台Tower服务器:用于用户元数据、过滤规则、应用和订阅信息管理</p>    <p>(2)2N+1台ZooKeeper服务器:用于构建一个zookeeper集群,从而进行Wave集群管理和消息通知等</p>    <p>(3)一台Judge服务器:用户采集负载信息,并提供负载均衡建议决策。其中负载信息的采集是通过部署在各个Wave服务器上的Judge-Agent进程定期推送给Judge服务的</p>    <p>(4)N台Wave服务器:构成Wave集群。每台Wave服务器上部署两种服务:</p>    <ul>     <li> <p>Wave服务:用于数据库binary log的采集并分发给下游MQ集群(Kafka或者JMQ)</p> </li>     <li> <p>Judge-Agent服务:用于定期采集Wave服务器的系统以及Wave服务的负载和监控信息,并调用Judge服务提供的Restful接口,推送给Judge服务</p> </li>    </ul>    <p>(5)N台已经存在的线上MySQL服务器:不属于BinLake提供的服务器,是使用的已经存在的MySQL服务器,作为BinLake的数据源</p>    <p>(6)N台已经存在的MQ服务器:不属于BinLake提供的服务器,是已经存在的MQ服务器,处于Wave服务的下游,Wave服务会将采集到的Bianry Log Events分发给MQ集群中的Topic</p>    <p>BinLake实现原理</p>    <p>1 <strong>Wave服务实现原理</strong></p>    <p>Instance的高可用</p>    <p style="text-align: center;"><img src="https://simg.open-open.com/show/844d58c3ef7cf5976d51fd6997efc5b4.jpg"></p>    <p>BinLake采用ZooKeeper来实现Instance的高可用。每个wave服务在ZooKeeper上都会对应有一个节点,整体流程如下:</p>    <p>1、一个新的topic产生后,Tower将会创建一个新的topic节点。</p>    <p>2、所有的wave服务都会在新的topic节点下创建对应的节点,ZooKeeper将会根据loadBalance决策从这些节点中选取一个作为主,然后在主对应的wave上将创建instance为topic提供服务。</p>    <p>3、当提供服务的主节点对应的wave宕机或不可用时,该wave与节点的连接将被断开,然后该节点会被从可用的节点信息中移除,移除后ZooKeeper集群会选出一个新的可用的节点成为主,其对应的wave会创建instance为该topic提供服务,从而实现高可用。</p>    <h3><strong>高效采集BinaryLog</strong></h3>    <p>(1) 异步NIO网络服务器</p>    <p>采用java nio高性能网络模型,尽可能提高资源利用率,增大服务吞吐量,并发量</p>    <p style="text-align: center;"><img src="https://simg.open-open.com/show/21fc4132e060fc0860456de17107282a.jpg"></p>    <p>整个工作过程如下:</p>    <p>1.与任何MySQL实例(数据源MySQL)都会创建一个socket长链接,而每个socket长链接都会由Wave服务进程中的一个监听线程,这里暂称为:WatchTread(Wave服务中有一个与CPU个数相符的线程池,该线程就从该线程池中取出)进行持续监听</p>    <p>2.任何一个与MySQL实例相连的长链接产生EventLog,WatchTread都会得到通知(异步notify机制),然后从将该EventLog放入队列中。</p>    <p>3.线程池中的Worker Thread从队列中取出EventLog进行处理(将EventLog格式化为protobuffer格式,然后将格式化之后的消息放入Wave服务端的buffer)。</p>    <p>4.buffer size打满之后,批量发往MQ的消息队列中。</p>    <p>各个线程之间工作互不影响,不会因为转换导致socket读事件阻塞,整个设计采用生产消费模型。</p>    <h3><strong>分发BinLog Event到Topic</strong></h3>    <p>(1) 批量发送到每个topic</p>    <p>binlog event采用批量发送的方式发往消息队列MQ,消除发送端性能瓶颈</p>    <p style="text-align: center;"><img src="https://simg.open-open.com/show/0bdfa30df5039d995648d3d5f87af787.jpg"></p>    <p>流程如下:</p>    <p>1.获取从转换器converter获取到一条消息,放入buffer</p>    <p>2.重复步骤1,直到buffer的size达到指定阈值,然后执行步骤3</p>    <p>3.将buffer中的消息批量发往MQ消息队列,并重复步骤1,2</p>    <p>批量发送过程采用异步notify机制,不会导致循环cpu彪高发生。</p>    <p>2 <strong>Tower服务实现原理</strong></p>    <p>instance复用及日志过滤</p>    <p style="text-align: center;"><img src="https://simg.open-open.com/show/1477ba59cea75682d5b6169d84f6eb91.jpg"></p>    <p>说明如下:</p>    <p>(1)Wave中的每个instance与实际的数据源MySQL实例一一对应,针对于同一个MySQL实例,无论多少个业务方采集,都会只有一个instance进行采集</p>    <p>(2)每个业务方都可以针对于一个MySQL实例中的数据设置filter,从而只过滤出自己感兴趣的EventLog,也可以查阅某个instance对应的所有的过滤规则,并复用过滤规则</p>    <p>(3)Wave中的instance对采集到的Eventlog,使用设置的过滤规则依次进行过滤,然后发送到指定的一个或者多个Topic中。</p>    <p><strong>Judge服务实现原理</strong></p>    <p><img src="https://simg.open-open.com/show/0edf79193f7db509a046c04988c4d843.jpg"></p>    <p>从图可以看到整个监控和负载主要由Agent、Alarm和Judge几个模块构成,它们作用的对象都是Wave服务, 此外还依赖JMQ、ES、Jimdb等中间件。</p>    <p>整个工作流程可以归纳为:</p>    <p>1.Agent定时采集JVM、网络、CPU、wave服务TPS等信息,封装成Metric发到JMQ</p>    <p>2.Alarm服务监听JMQ,取到Metric后匹配报警策略,匹配成功则实时向用户发告警信息</p>    <p>3.Judge服务监听JMQ,取到Metric后:</p>    <p>3.1.先将原始数据存入ES</p>    <p>3.2. 根据Metric当前一个小时内的负载进行增量分析,分析结果放入缓存</p>    <p>3.3.Judge服务定期从ES获取一个小时前Metric历史数据分析,分析结果存入缓存</p>    <p>4.Wave服务调用Judge服务的获取负载均衡接口,Judge服务取得3.2和3.3中的分析结果,通过指定的负载算法 计算得到获取负载结果,返回给Wave。 </p>    <p>其中,Agent中采集器模块和Judge中的负载均衡等模块都设计成了插件模式,可以灵活的设置或切换。</p>    <p><strong>BinLake特性</strong></p>    <p>(1)保证Binary Log Event的 At-Least-Once Delivery</p>    <p>(2)集群化服务:提供无状态集群服务,可以通过硬件的横向扩展提高性能</p>    <p>(3)高可用:在无人工参与的情况下,对宕机的服务节点自动恢复</p>    <p>(4)负载自适应:整个集群可以根据每个服务节点的负载情况,自动调整各个集群的工作负荷,从而使集群中的每个节点的负载相当</p>    <p>(5)动态扩容:借助于弹性集群,当集群整体负载升高时,自动创建Wave 容器,并自动加入到集群</p>    <p><strong>BinLake使用限制</strong></p>    <p>(1)目前版本只能采集和订阅MySQL的Binary Log</p>    <p>(2)MySQL的Binary Log的Format只能是Row Base的</p>    <p>(3)MySQL中的表必须要有主键</p>    <p>(4)目前的过滤规则最细粒度支持到表级别</p>    <p><strong>运维支持</strong></p>    <p>京东弹性数据库研发团队为业务部门提供如下服务:</p>    <p style="text-align: center;"><img src="https://simg.open-open.com/show/b9fdc5c293fc8f690e611fe324855022.jpg"></p>    <p> </p>    <p> </p>    <p>来自:http://mp.weixin.qq.com/s?__biz=MzI1NzQyOTM3Ng==&mid=2247484319&idx=1&sn=781a06627a31cdb55cbef344c7888f97&chksm=ea16d970dd6150661487ddfecc53640395e5faabe6341f2dcb44d3c0081e3a7c6c40c8dc3370&mpshare=1&scene=1&srcid=0302CV8KFh6W89hjfZoyY2ik#rd</p>    <p> </p>