当当 Elastic-job 开源项目的十项特性

jopen 6年前


张亮:当当网架构师、当当技术委员会成员、消息中间件组负责人。对架构设计、分布式、优雅代码等领域兴趣浓厚。目前主导当当应用框架ddframe研发,并负责推广及撰写技术白皮书。

一、为什么需要作业(定时任务)?

作业即定时任务。一般来说,系统可使用消息传递代替部分使用作业的场景。两者确有相似之处。可互相替换的场景,如队列表。将待处理的数据放入队列 表,然后使用频率极短的定时任务拉取队列表的数据并处理。这种情况使用消息中间件的推送模式可更好的处理实时性数据。而且基于数据库的消息存储吞吐量远远 小于基于文件的顺序追加消息存储。

当当 Elastic-job 开源项目的十项特性

但在某些场景下则不能互换:

  1. 时间驱动 OR 事件驱动:内部系统一般可以通过事件来驱动,但涉及到外部系统,则只能使用时间驱动。如:抓取外部系统价格。每小时抓取,由于是外部系统,不能像内部系统一样发送事件触发事件。
  2. 批量处理 OR 逐条处理:批量处理堆积的数据更加高效,在不需要实时性的情况下比消息中间件更有优势。而且有的业务逻辑只能批量处理,如:电商公司与快递公司结算,一个月结算一次,并且根据送货的数量有提成。比如,当月送货超过1000则额外给快递公司多1%优惠。
  3. 非实时性 OR 实时性:虽然消息中间件可以做到实时处理数据,但有的情况并不需要如的实时。如:VIP用户降级,如果超过1年无购买行为,则自动降级。这类需求没有强烈的时间要求,不需要按照时间精确的降级VIP用户。
  4. 系统内部 OR 系统解耦。作业一般封装在系统内部,而消息中间件可用于系统间解耦。

二、当当之前在使用什么作业系统?

当当之前使用的作业系统比较散乱,各自为战,大致分为以下4种:

  1. Quartz:Java事实上的定时任务标准。但Quartz关注点在于定时任务而非数据,并无一套根据数据处理而定制化的流程。虽然Quartz可以基于数据库实现作业的高可用,但缺少分布式并行执行作业的功能。
  2. TBSchedule:阿里早期开源的分布式任务调度系统。代码略陈旧,使用timer而非线程池执行任务调度。众所周知,timer在处理异常状况时是有缺陷的。而且TBSchedule作业类型较为单一,只能是获取/处理数据一种模式。还有就是文档缺失比较严重。
  3. Crontab:Linux系统级的定时任务执行器。缺乏分布式和集中管理功能。
  4. Perl:遗留系统使用,目前已不符合公司的Java化战略。

三、elastic-job的来历

elastic-job原本是当当Java应用框架ddframe的一部分,本名dd-job。

ddframe包括编码规范,开发框架,技术规范,监控以及分布式组件。ddframe规划分为4个演进阶段,目前处于第2阶段。3、4阶段涉及的技术组件不代表当当没有使用,只是ddframe还未统一规划。

当当 Elastic-job 开源项目的十项特性

ddframe由各种模块组成,均已dd-开头,如dd-container,dd-soa,dd-rdb,dd-job等。当当希望将 ddframe的各个模块与公司环境解耦并开源以反馈社区。之前开源的Dubbo扩展版本DubboX即是dd-soa的核心模块。而本次介绍的 elastic-job则是dd-job的开源部分,其中监控(但开源了监控方法)和ddframe核心接入等部分并未开源。

四、elastic-job包含的功能

  • 分布式:最重要的功能,如果任务不能在分布式的环境下执行,那么直接使用Quartz就可以了。
  • 任务分片:是elastic-job中最重要也是最难理解的概念。任务的分布式执行,需要将一个任务拆分为n个独立的任务项,然后由分布式的服务器分别执行某一个或几个分片项。
  • 弹性扩容缩容:将任务拆分为n个任务项后,各个服务器分别执行各自分配到的任务项。一旦有新的服务器加入集群,或现有服务器下 线,elastic-job将在保留本次任务执行不变的情况下,下次任务开始前触发任务重分片。举例说明:有3台服务器,分为10个片。则分片项分配如 下:{server1: [0,1,2], server2: [3,4,5], server3: [6,7,8,9]}。如果一台服务器崩溃,则分片项分配如下:{server1: [0,1,2,3,4], server2: [5,6,7,8,9]}。如果新增一台服务器,则分片项分配如下:{server1: [0,1], server2: [2,3] , server3: [4,5,6] , server4: [7,8,9]}。
  • 稳定性:在服务器无波动的情况下,并不会重新分片;即使服务器有波动,下次分片的结果也会根据服务器IP和作业名称哈希值算出稳定的分片顺序,尽量不做大的变动。
  • 高性能:elastic-job会将作业运行状态的必要信息更新到注册中心,但为了考虑性能问题,可以牺牲一些功能,而换取性能的提升。
  • 幂等性:elastic-job可牺牲部分性能用以保证同一分片项不会同时在两个服务器上运行。
  • 失效转移:弹性扩容缩容在下次作业运行前重分片,但本次作业执行的过程中,下线的服务器所分配的作业将不会重新被分配。失效转移功能可以在本次作业运行中用空闲服务器抓取孤儿作业分片执行。同样失效转移功能也会牺牲部分性能。
  • 状态监控:监控作业的运行状态,可以监控数据处理功能和失败次数,作业运行时间等。是幂等性,失效转移必须的功能。
  • 多作业模式:作业可分为简单和数据流处理两种模式,数据流又分为高吞吐处理模式和顺序性处理模式,其中高吞吐处理模式可以开启足够多的线程快速的处理数据,而顺序性处理模式将每个分片项分配到一个独立线程,用于保证同一分片的顺序性,这点类似于kafka的分区顺序性。
  • 其他一些功能,如错过任务重执行,单机并行处理,容错处理,Spring命名空间支持,运维平台等。

五、elastic-job的部署和使用

将使用elastic-job框架的jar/war连接同一个基于Zookeeper的注册中心即可。

当当 Elastic-job 开源项目的十项特性

作业框架执行数据并不限于数据库,且作业框架本身是不对数据进行关联的。作业可以用于处理数据,文件,API等任何操作。

使用elastic-job所需要关注的仅仅是将业务处理逻辑和框架所分配的分片项匹配并处理,如:如果分片项是1,则获取id以1结尾的数据处理。所以如果是处理数据的话,最佳实践是将作业分片项规则和数据中间层规则对应。

通过上面的部署图可以看出来,作业分片只是个逻辑概念,分片和实际数据其实框架是不做任何匹配关系的。而根据分片项和实际业务如何关联,是成功使 用elastic-job的关键所在。为了不让代码写起来很无聊,看起来像if(shardingItem == 1) {do xxx} else if (shardingItem == 2) {do xxx},elastic-job提供了自定义参数,可将分片项序号和实际业务做映射。比如设置为1=北京,2=上海。那么代码中可以通过北京或是上海的 枚举,从业务中的北京仓库或上海仓库取数据。elastic-job更多的还是关注作业调度和分布式分配,处理数据还是交由数据中间层更好些。

诚如刚才所说,最佳实践是将作业分片项规则和数据中间层规则对应,省去作业分片时,再次适配数据中间层的分片逻辑。