在YARN上运行Spark

jopen 8年前

在YARN 上运行 Spark

在Spark0.6.0 版本开始支持 YARN 模式,随后的版本在逐渐地完善。

在YARN 上启动 Spark

确保HADOOP_CONF_DIR或YARN_CONF_DIR属性的值已经指向了Hadoop 集群的配置文件。 Spark 通常使用这些配置信息来向 HDFS 写入数据和连接到 YARN 资源管理器。这个目录下所有的文件将会被分发到 YARN 集群中,所以所有应用使用的容器都使用同样的配置。如果 Java 的系统属性或 YARN 没有管理的环境变量等配置,它们应该在 Spark  的应用配置项中配置。

在YARN 上启动 Spark 有两种部署模式。在Cluster 模式中, Spark 的 driver 程序运行在被 YARN 管理的集群中的任何一个 master 进程中,并且 client 初始化应用后可以退出。在 Client 模式中, driver 程序运行在 client 进程中,并且这个应用程序的 master 只能被用来从 YARN 上请求资源。

和Spark Standalone 和 Mesos 模式不同的是, master 的地址被指定在 --master 参数中,在 YARN 模式中, ResourceManager 的地址可以在 Hadoop 的配置文件中找到。这样, --master 的的参数是 yarn 。

在cluster 模式中启动 Spark 应用程序:

$ ./bin/spark-submit --class path.to.your.Class --master yarn --deploy-mode cluster [options] <app jar> [app options]

举例:

$ ./bin/spark-submit --class org.apache.spark.examples.SparkPi \

--master yarn \

--deploy-mode cluster \

--driver-memory 4g \

--executor-memory 2g \

--executor-cores 1 \

--queue thequeue \

lib/spark-examples*.jar \

10

上面的应用例子将会启动一个YARN client 程序,它将会启动默认的应用 Master 。而 SparkPi 将会作为应用 Master 的一个子线程运行。 client 将会周期性地轮询应用 Master 来达到转态的更新并把它们显示在 console 终端。一旦你的应用程序运行完毕, client 将会退出。

在client 模式中启动 Spark 应用和 cluster 模式一样,只是将 cluster 替换为 client 。如下所示:

$ ./bin/spark-shell --master yarn --deploy-mode client

添加其他Jar

在cluster 模式中, driver 程序和 client 在不同的机器上,所以只对于本机的可行的 SparkContext.addJar 将会失效。为了使 client 继续能使用 SparkContext.addJar, 可以在创建命令时给 --jars 选项赋值。

$ ./bin/spark-submit --class my.main.Class \

--master yarn \

--deploy-mode cluster \

--jars my-other-jar.jar,my-other-other-jar.jar

my-main-jar.jar

app_arg1 app_arg2

预备

在YARN 上运行 Spark 要求一个支持 YARN 的一个二进制发布包。你可以在官网上下载,也可以自己编译一个。

配置

Spark on YARN 上的许多配置和其他模式基本上一样。

调试你的应用程序

在YARN 中, executor 和应用 master 运行在“ containers ”(容器)中。 应用程序运行完毕后, YARN 提供了两种存放容器日志的方式。如果日志聚合服务被开启的话(通过 yarn.log-aggregation-enable来配置),容器日志将会被拷贝到 HDFS 中并且删除本机上的日志文件。这些日志文件使用 yarn logs 命令可以在任何一台集群中的机器看到。如下:

yarn logs -applicationId <app ID>

上面的命令将会打印出应用程序申请到的所有容器日志文件的内容。你也可以通过HDFS shell 或 API 来直接看这些容器文件。这些日志文件的目录可以查看 YARN 配置( yarn.nodemanager.remote-app-log-dir and yarn.nodemanager.remote-app-log-dir-suffix)。这些日志在 Spark Web UI 的“ Executors ”的选项卡中也能看到。你需要启动 Spark history server 和 MapReduce history server 并且正确地在 yarn-site.xml配置好 yarn.log.server.url选项。这个Spark history server UI 的日志 URL 将会把重定向到 MapReduce 的 history server ,从而显示日志信息。

当日志聚合服务关闭时,日志被保留在每台机器的YARN_APP_LOGS_DIR目录下,该目录通常被用来配置为/tmp/logs或$HADOOP_HOME/logs/userlogs,这取决于Hadoop 的版本和安装。查看一个容器的日志信息需要到对应的主机上的这个目录下查找。子目录名称通过应用 ID 和容器 ID 来构成。这种日志在 Spark WebUI 的 Executors 选项卡中也能看到并且不要求启动 MapReduce history server ,因为不需要读取 HDFS 上的数据。

回顾一下每个容器创建的环境,增加yarn.nodemanager.delete.debug-delay-sec到一个大数值(比如36000),并且在容器上创建的节点上的yarn.nodemanager.local-dirs中得到应用程序的缓存。这个目录包括创建的脚本,JARs 和用于创建每个容器的所有环境变量。它对于调试 classpath 问题是特别有用的。(注意允许这种方式在集群的设置和所有节点的重启需要管理员权限,这样的话它宿主机上不可用。)

对每个应用的 master 或 executors 使用自定义的 log4j 配置的话, 请看下:

  • 用spark-submit 上传一个自己编写的 log4j.properties 文件,通过 --files 参数把它和应用一起提交。
  • 给每个driver 添加值 -Dlog4j.configuration=<location of configuration file>到spark.driver.extraJavaOptions选项中。注意如果使用该选项的话,该文件需要在所有的节点上都存在。

l 上传$SPARK_CONF_DIR/log4j.properties文件后,它会和其他的配置一样自己更新。注意如果多个option 指定时,上面介绍的那种 option 比这种有更高的优先权。

Note that for the first option, both executors and the application master will share the same log4j configuration, which may cause issues when they run on the same node (e.g. trying to write to the same log file).

注意,对于第一种option 而言,所有的 executors 和应用程序 master 将会使用同样的 log4j 配置,当他们运行在一样的节点上可能会出问题(例如:写入到同样的日志文件中,也就是并发写,不难理解吧)

如果在 Yarn 中你需要一个合适的位置来存放日志文件,通过在你的 log4j.properties 中配置 spark.yarn.app.container.log.dir,那么 yarn 可以更好的聚合它们并展示。例如:

log4j.appender.file_appender.File=${spark.yarn.app.container.log.dir}/spark.log.对于Streaming 程序而言,配置 RollingFileAppender和yarn 的日志文件目录 将避免大日志文件造成的磁盘移除,而且,日志也可以很好地被 YARN 使用。

重点提示

  • 在调度决策中主要的资源请求是否得到,取决于正在使用的调度器和它的具体配置。
  • 在cluster 模式中, Spark executors 和 driver 将会使用为 YARN 配置的本地文件目录( Hadoop YARN 配置项  yarn.nodemanager.local-dirs)。如果使用特定的spark.local.dir,它将会失效。在client 模式中, Spark executors 将会使用 YARN 配置的本地目录,但 Spark driver 将使用 spark.local.dir选项定义好的。这是因为Client 模式下 Spark driver 只是 Spark 的 executor 在执行,没有运行在 YARN 集群中。
  • --files和--archives选项支持和 Hadoop 一样使用 # 来指定文件别名。例如:你可以指定 --files localtest.txt#appSees.txt,那么它将会把本地文件的 localtest.txt 文件上传到 HDFS 中,可理解为,它在 HDFS 中文件名将是 appSees.tx ,在 YARN 中使用 appSees.txt 文件名即可。
  • --jars 选项,如果你在使用本地文件和运行在cluster 模式时, SparkContext.addJar函数将会起作用。如果你正在用HDFS 、 HTTP 、 HTTPS 、 FTP 的文件时,它不需要。

Spark 属性配置项,可根据如下列表进行参数的调整:

Property Name Default Meaning
spark.yarn.am.memory 512m Amount of memory to use for the YARN Application Master in client mode, in the same format as JVM memory strings (e.g. 512m , 2g ). In cluster mode, use spark.driver.memory instead.

Use lower-case suffixes, e.g. k , m , g , t , and p , for kibi-, mebi-, gibi-, tebi-, and pebibytes, respectively.

spark.driver.cores 1 Number of cores used by the driver in YARN cluster mode. Since the driver is run in the same JVM as the YARN Application Master in cluster mode, this also controls the cores used by the YARN Application Master. In client mode, use spark.yarn.am.cores to control the number of cores used by the YARN Application Master instead.
spark.yarn.am.cores 1 Number of cores to use for the YARN Application Master in client mode. In cluster mode, use spark.driver.cores instead.
spark.yarn.am.waitTime 100s In cluster mode, time for the YARN Application Master to wait for the SparkContext to be initialized. In client mode, time for the YARN Application Master to wait for the driver to connect to it.
spark.yarn.submit.file.replication The default HDFS replication (usually 3 ) HDFS replication level for the files uploaded into HDFS for the application. These include things like the Spark jar, the app jar, and any distributed cache files/archives.
spark.yarn.preserve.staging.files false Set to true to preserve the staged files (Spark jar, app jar, distributed cache files) at the end of the job rather than delete them.
spark.yarn.scheduler.heartbeat.interval-ms 3000 The interval in ms in which the Spark application master heartbeats into the YARN ResourceManager. The value is capped at half the value of YARN's configuration for the expiry interval, i.e. yarn.am.liveness-monitor.expiry-interval-ms .
spark.yarn.scheduler.initial-allocation.interval 200ms The initial interval in which the Spark application master eagerly heartbeats to the YARN ResourceManager when there are pending container allocation requests. It should be no larger than spark.yarn.scheduler.heartbeat.interval-ms . The allocation interval will doubled on successive eager heartbeats if pending containers still exist, until spark.yarn.scheduler.heartbeat.interval-ms is reached.
spark.yarn.max.executor.failures numExecutors * 2, with minimum of 3 The maximum number of executor failures before failing the application.
spark.yarn.historyServer.address (none) The address of the Spark history server, e.g. host.com:18080 . The address should not contain a scheme ( http:// ). Defaults to not being set since the history server is an optional service. This address is given to the YARN ResourceManager when the Spark application finishes to link the application from the ResourceManager UI to the Spark history server UI. For this property, YARN properties can be used as variables, and these are substituted by Spark at runtime. For example, if the Spark history server runs on the same node as the YARN ResourceManager, it can be set to ${hadoopconf-yarn.resourcemanager.hostname}:18080 .
spark.yarn.dist.archives (none) Comma separated list of archives to be extracted into the working directory of each executor.
spark.yarn.dist.files (none) Comma-separated list of files to be placed in the working directory of each executor.
spark.executor.instances 2 The number of executors. Note that this property is incompatible with spark.dynamicAllocation.enabled . If both spark.dynamicAllocation.enabled and spark.executor.instances are specified, dynamic allocation is turned off and the specified number of spark.executor.instances is used.
spark.yarn.executor.memoryOverhead executorMemory * 0.10, with minimum of 384 The amount of off-heap memory (in megabytes) to be allocated per executor. This is memory that accounts for things like VM overheads, interned strings, other native overheads, etc. This tends to grow with the executor size (typically 6-10%).
spark.yarn.driver.memoryOverhead driverMemory * 0.10, with minimum of 384 The amount of off-heap memory (in megabytes) to be allocated per driver in cluster mode. This is memory that accounts for things like VM overheads, interned strings, other native overheads, etc. This tends to grow with the container size (typically 6-10%).
spark.yarn.am.memoryOverhead AM memory * 0.10, with minimum of 384 Same as spark.yarn.driver.memoryOverhead , but for the YARN Application Master in client mode.
spark.yarn.am.port (random) Port for the YARN Application Master to listen on. In YARN client mode, this is used to communicate between the Spark driver running on a gateway and the YARN Application Master running on YARN. In YARN cluster mode, this is used for the dynamic executor feature, where it handles the kill from the scheduler backend.
spark.yarn.queue default The name of the YARN queue to which the application is submitted.
spark.yarn.jar (none) The location of the Spark jar file, in case overriding the default location is desired. By default, Spark on YARN will use a Spark jar installed locally, but the Spark jar can also be in a world-readable location on HDFS. This allows YARN to cache it on nodes so that it doesn't need to be distributed each time an application runs. To point to a jar on HDFS, for example, set this configuration to hdfs:///some/path .
spark.yarn.access.namenodes (none) A comma-separated list of secure HDFS namenodes your Spark application is going to access. For example, spark.yarn.access.namenodes=hdfs://nn1.com:8032,hdfs://nn2.com:8032 . The Spark application must have access to the namenodes listed and Kerberos must be properly configured to be able to access them (either in the same realm or in a trusted realm). Spark acquires security tokens for each of the namenodes so that the Spark application can access those remote HDFS clusters.
spark.yarn.appMasterEnv.[EnvironmentVariableName] (none) Add the environment variable specified by EnvironmentVariableName to the Application Master process launched on YARN. The user can specify multiple of these and to set multiple environment variables. In cluster mode this controls the environment of the Spark driver and in client mode it only controls the environment of the executor launcher.
spark.yarn.containerLauncherMaxThreads 25 The maximum number of threads to use in the YARN Application Master for launching executor containers.
spark.yarn.am.extraJavaOptions (none) A string of extra JVM options to pass to the YARN Application Master in client mode. In cluster mode, use spark.driver.extraJavaOptions instead.
spark.yarn.am.extraLibraryPath (none) Set a special library path to use when launching the YARN Application Master in client mode.
spark.yarn.maxAppAttempts yarn.resourcemanager.am.max-attempts in YARN The maximum number of attempts that will be made to submit the application. It should be no larger than the global number of max attempts in the YARN configuration.
spark.yarn.am.attemptFailuresValidityInterval (none) Defines the validity interval for AM failure tracking. If the AM has been running for at least the defined interval, the AM failure count will be reset. This feature is not enabled if not configured, and only supported in Hadoop 2.6+.
spark.yarn.submit.waitAppCompletion true In YARN cluster mode, controls whether the client waits to exit until the application completes. If set to true , the client process will stay alive reporting the application's status. Otherwise, the client process will exit after submission.
spark.yarn.am.nodeLabelExpression (none) A YARN node label expression that restricts the set of nodes AM will be scheduled on. Only versions of YARN greater than or equal to 2.6 support node label expressions, so when running against earlier versions, this property will be ignored.
spark.yarn.executor.nodeLabelExpression (none) A YARN node label expression that restricts the set of nodes executors will be scheduled on. Only versions of YARN greater than or equal to 2.6 support node label expressions, so when running against earlier versions, this property will be ignored.
spark.yarn.tags (none) Comma-separated list of strings to pass through as YARN application tags appearing in YARN ApplicationReports, which can be used for filtering when querying YARN apps.
spark.yarn.keytab (none) The full path to the file that contains the keytab for the principal specified above. This keytab will be copied to the node running the YARN Application Master via the Secure Distributed Cache, for renewing the login tickets and the delegation tokens periodically. (Works also with the "local" master)
spark.yarn.principal (none) Principal to be used to login to KDC, while running on secure HDFS. (Works also with the "local" master)
spark.yarn.config.gatewayPath (none) A path that is valid on the gateway host (the host where a Spark application is started) but may differ for paths for the same resource in other nodes in the cluster. Coupled with spark.yarn.config.replacementPath

, this is used to support clusters with heterogeneous configurations, so that Spark can correctly launch remote processes.

The replacement path normally will contain a reference to some environment variable exported by YARN (and, thus, visible to Spark containers).

For example, if the gateway node has Hadoop libraries installed on /disk1/hadoop , and the location of the Hadoop install is exported by YARN as the HADOOP_HOME environment variable, setting this value to /disk1/hadoop and the replacement path to $HADOOP_HOME will make sure that paths used to launch remote processes properly reference the local YARN configuration.

spark.yarn.config.replacementPath (none) See spark.yarn.config.gatewayPath .
spark.yarn.security.tokens.${service}.enabled

true

Controls whether to retrieve delegation tokens for non-HDFS services when security is enabled. By default, delegation tokens for all supported services are retrieved when those services are configured, but it's possible to disable that behavior if it somehow conflicts with the application being run.

Currently supported services are: hive , hbase

来自: http://www.cnblogs.com/yourarebest/p/5115512.html