Spark的任务调度

jopen 9年前

本文尝试从源码层面梳理Spark在任务调度与资源分配上的做法。




先从Executor和SchedulerBackend说起。Executor是真正执行任务的进程,本身拥有若干cpu和内存,可以执行以线程为单位的计算任务,它是资源管理系统能够给予的最小单位。SchedulerBackend是spark提供的接口,定义了许多与Executor事件相关的处理,包括:新的executor注册进来的时候记录executor的信息,增加全局的资源量(核数),进行一次makeOffer;executor更新状态,若任务完成的话,回收core,进行一次makeOffer;其他停止executor、remove executor等事件。下面由makeOffer展开。

makeOffer的目的是在有资源更新的情况下,通过调用scheduler的resourceOffers方法来触发它对现有的任务进行一次分配,最终launch新的tasks。这里的全局 scheduler就是TaskScheduler,实现是TaskSchedulerImpl,它可以对接各种SchedulerBackend的实现,包括standalone的,yarn的,mesos的。SchedulerBackend在做makeOffer的时候,会把现有的 executor资源以WorkerOfffer列表的方式传给scheduler,即以worker为单位,将worker信息及其内的资源交给 scheduler。scheduler拿到这一些集群的资源后,去遍历已提交的tasks并根据locality决定如何launch tasks。

TaskScheduler里,resourceOffers方法会将已经提交的tasks进行一次优先级排序,这个排序算法目前是两种:FIFO或FAIR。得到这一份待运行的tasks后,接下里就是要把schedulerBackend交过来的worker资源信息合理分配给这些tasks。分配前,为了避免每次都是前几个worker被分到tasks,所以先对WorkerOffer列表进行一次随机洗牌。接下来就是遍历tasks,看workers的资源“够不够”,“符不符合”task,ok 的话task就被正式launch起来。注意,这里资源"够不够"是很好判断的,在TaskScheduler里设置了每个task启动需要的cpu个数,默认是1,所以只需要做核数的大小判断和减1操作就可以遍历分配下去。而"符不符合"这件事情,取决于每个tasks的locality设置。

task 的locality有五种,按优先级高低排:PROCESS_LOCAL,NODE_LOCAL,NO_PREF,RACK_LOCAL,ANY。也就是最好在同个进程里,次好是同个node(即机器)上,再次是同机架,或任意都行。task有自己的locality,如果本次资源里没有想要的 locality资源,怎么办呢?spark有一个spark.locality.wait参数,默认是3000ms。对于 process,node,rack,默认都使用这个时间作为locality资源的等待时间。所以一旦task需要locality,就可能会触发delay scheduling。

到这里,对于任务的分配,资源的使用大致有个了解。实际上,TaskScheduler的resourceOffer里还触发了 TaskSetManager的resourceOffer方法,TaskSetManager的resourceOffer是会检查task的 locality并最终调用DAGScheduler去launch这个task。这些类的名字以及他们彼此的调用关系,看起来是比较乱的。我简单梳理下。

这件事情要从Spark的DAG切割说起。Spark RDD通过其transaction和action操作,串起来形成了一个DAG。action的调用,触发了DAG的提交和整个job的执行。触发之后,由DAGScheduler这个全局唯一的面向stage的DAG调度器来切分DAG,根据是否 shuffle来切成多个小DAG,即stage。凡是RDD之间是窄依赖的,都归到一个stage里,这里面的每个操作都对应成MapTask,并行度就是各自RDD的partition数目。凡是遇到宽依赖的操作,那么就把这一次操作切为一个stage,这里面的操作对应成ResultTask,结果 RDD的partition数就是并行度。MapTask和ResultTask分别可以简单理解为传统MR的Map和Reduce,切分他们的依据本质上就是shuffle。所以shuffle之前,大量的map是可以同partition内操作的。每个stage对应的是多个MapTask或多个 ResultTask,这一个stage内的task集合成一个TaskSet类,由TaskSetManager来管理这些task的运行状态,locality处理(比如需要delay scheduling)。这个TaskSetManager是Spark层面上的,如何管理自己的tasks,即任务线程,这一层与底下资源管理是剥离的。我们上面提到的TaskSetManager的resourceOffer方法,是task与底下资源的交互,这个资源交互的协调人是 TaskScheduler,也是全局的,TaskScheduler对接的是不同的SchedulerBackend的实现(比如 mesos,yarn,standalone),如此来对接不同的资源管理系统。同时,对资源管理系统来说,他们要负责的是进程,是worker上起几个进程,每个进程分配多少资源。所以这两层很清楚,spark本身计算框架内管理线程级别的task,每个stage都有一个TaskSet,本身是个小DAG,可以丢到全局可用的资源池里跑;spark下半身的双层资源管理部分掌控的是进程级别的executor,不关心task怎么摆放,也不关心task运行状态,这是TaskSetManager管理的事情,两者的协调者就是TaskScheduler及其内的SchedulerBackend实现。

SchedulerBackend 的实现,除去local模式的不说,分为细粒度和粗粒度两种。细粒度只有Mesos(mesos有粗细两种粒度的使用方式)实现了,粗粒度的实现者有 yarn,mesos,standalone。拿standalone模式来说粗粒度,每台物理机器是一个worker,worker一共可以使用多少 cpu和内存,启动时候可以指定每个worker起几个executor,即进程,每个executor的cpu和内存是多少。在我看来,粗粒度与细粒度的主要区别,就是粗粒度是进程long-running的,计算线程可以调到executor上跑,但executor的cpu和内存更容易浪费。细粒度的话,可以存在复用,可以实现抢占等等更加苛刻但促进资源利用率的事情。这俩概念还是AMPLab论文里最先提出来并在Mesos里实现的。AMPLab 在资源使用粒度甚至任务分配最优的这块领域有不少论文,包括Mesos的DRF算法、Sparrow调度器等。所以standalone模式下,根据 RDD的partition数,以及每个task需要的cpu数,可以很容易计算每台物理机器的负载量、资源的消耗情况、甚至知道TaskSet要分几批才能跑完一个stage。

来自:http://blog.csdn.net/pelick/article/details/41866845
</div>