Analyzing Spark Startup in yarn-client and yarn-cluster Modes
23 Feb 2021
This article analyzes how Spark starts up in yarn-client and yarn-cluster modes. YARN is an open-source resource-management component from Apache. The responsibilities of the old JobTracker are roughly split into three parts in YARN: the ResourceManager, which hosts the Scheduler and the ApplicationsManager; the ApplicationMaster, which manages the lifecycle of a job; and the JobHistoryServer, which serves job history and logs.
Let's start from the spark-submit script from the Spark website that submits a user application through YARN, and trace the execution flow in a YARN environment from there.
./bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master yarn \
--deploy-mode cluster \ # can be client for client mode
--executor-memory 20G \
--num-executors 50 \
/path/to/examples.jar \
1000
Before digging into the source, the YARN resource-manager module needs to be enabled in the parent pom.xml so that its class files end up on the classpath.
<!-- See additional modules enabled by profiles below -->
<module>resource-managers/yarn</module>
As in standalone mode, SparkSubmit#runMain(SparkSubmitArguments, Boolean) is the entry point of the application. Because we are launching on YARN, preparing the submit environment differs, and the difference lies in the prepareSubmitEnvironment(SparkSubmitArguments, Option[HadoopConfiguration]) method. This method pattern-matches on args.master and args.deployMode; when master is yarn and the deploy mode is cluster, childMainClass is set to org.apache.spark.deploy.yarn.YarnClusterApplication as the entry class for resource scheduling (in client mode childMainClass remains the user's own main class).
private[deploy] def prepareSubmitEnvironment(
args: SparkSubmitArguments,
conf: Option[HadoopConfiguration] = None)
: (Seq[String], Seq[String], SparkConf, String) = {
// Set the cluster manager
val clusterManager: Int = args.master match {
case "yarn" => YARN
case "yarn-client" | "yarn-cluster" =>
logWarning(s"Master ${args.master} is deprecated since 2.0." +
" Please use master \"yarn\" with specified deploy mode instead.")
YARN
}
if (deployMode == CLIENT) {
/* In client mode the user program runs directly inside the submit JVM via reflection; both the user's own jar and the jars given via --jars are added to the classpath */
childMainClass = args.mainClass
if (localPrimaryResource != null && isUserJar(localPrimaryResource)) {
childClasspath += localPrimaryResource
}
if (localJars != null) { childClasspath ++= localJars.split(",") }
}
// In yarn-cluster mode, use yarn.Client as a wrapper around the user class
if (isYarnCluster) {
/* In cluster mode YARN_CLUSTER_SUBMIT_CLASS is org.apache.spark.deploy.yarn.YarnClusterApplication */
childMainClass = YARN_CLUSTER_SUBMIT_CLASS
}
(childArgs, childClasspath, sparkConf, childMainClass)
}
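For context on how this tuple is consumed, runMain then builds a classloader from childClasspath and loads childMainClass. A condensed sketch, paraphrased from the Spark 2.x SparkSubmit#runMain:
val (childArgs, childClasspath, sparkConf, childMainClass) = prepareSubmitEnvironment(args)
// Build a classloader that contains the user's jars and make it the context classloader
val loader = new MutableURLClassLoader(new Array[URL](0),
  Thread.currentThread.getContextClassLoader)
Thread.currentThread.setContextClassLoader(loader)
for (jar <- childClasspath) { addJarToClasspath(jar, loader) }
// childMainClass is the user's class (client mode) or YarnClusterApplication (cluster mode)
val mainClass = Utils.classForName(childMainClass)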
Once the environment needed by submit() is ready, the Spark application is constructed from mainClass. Since we are analyzing the yarn-client startup here, mainClass (the user's class) does not implement SparkApplication, so app ends up being a JavaMainApplication.
val app: SparkApplication = if (classOf[SparkApplication].isAssignableFrom(mainClass)) {
mainClass.newInstance().asInstanceOf[SparkApplication]
} else {
// SPARK-4170
if (classOf[scala.App].isAssignableFrom(mainClass)) {
logWarning("Subclasses of scala.App may not work correctly. Use a main() method instead.")
}
new JavaMainApplication(mainClass)
}
/* In standalone mode, SparkSubmit#prepareSubmitEnvironment(args) sets childMainClass to RestSubmissionClient */
app.start(childArgs.toArray, sparkConf)
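For reference, SparkApplication is a small internal trait, and JavaMainApplication is just an adapter over a plain main() class. Condensed from the Spark source (paraphrased):
// org.apache.spark.deploy.SparkApplication (condensed)
private[spark] trait SparkApplication {
  def start(args: Array[String], conf: SparkConf): Unit
}
// Wraps a class that only exposes a static main(String[]); its start() is shown below
private[deploy] class JavaMainApplication(klass: Class[_]) extends SparkApplication { /* ... */ }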
start() obtains the main method via reflection and invokes it, executing the code in the user's jar. Inside the user program (the main method) two important classes come into play, SparkConf and SparkContext: the context is instantiated from the config.
override def start(args: Array[String], conf: SparkConf): Unit = {
val mainMethod = klass.getMethod("main", new Array[String](0).getClass)
if (!Modifier.isStatic(mainMethod.getModifiers)) {
throw new IllegalStateException("The main method in the given main class must be static")
}
val sysProps = conf.getAll.toMap
sysProps.foreach { case (k, v) =>
sys.props(k) = v
}
mainMethod.invoke(null, args)
}
/* A Spark application uses SparkConf and SparkContext to load environment-related configuration */
val config = new SparkConf().setAppName("spark-app")
.set("spark.app.id", "spark-mongo-connector")
val sparkContext = new SparkContext(config)
In SparkContext#createTaskScheduler(SparkContext, String, String), the scheduler and backend are chosen according to master. Since master is yarn, getClusterManager(String) resolves the cluster manager (cm) to YarnClusterManager. In yarn-client mode, createTaskScheduler() and createSchedulerBackend() use the masterUrl and deployMode to produce YarnScheduler as the scheduler and YarnClientSchedulerBackend as the backend.
/* When masterUrl points to an external cluster manager (YARN, Mesos, K8s), this branch is taken:
   yarn-cluster mode gets YarnClusterScheduler, yarn-client gets YarnScheduler for scheduling */
case masterUrl =>
val cm = getClusterManager(masterUrl) match {
case Some(clusterMgr) => clusterMgr
case None => throw new SparkException("Could not parse Master URL: '" + master + "'")
}
val scheduler = cm.createTaskScheduler(sc, masterUrl)
val backend = cm.createSchedulerBackend(sc, masterUrl, scheduler)
cm.initialize(scheduler, backend)
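Both lookups are mechanical: getClusterManager uses Java's ServiceLoader to find an ExternalClusterManager whose canCreate accepts the URL (the YARN module registers YarnClusterManager), and YarnClusterManager then picks the scheduler by deploy mode. Condensed from the Spark source (paraphrased; may vary slightly by version):
private def getClusterManager(url: String): Option[ExternalClusterManager] = {
  val loader = Utils.getContextOrSparkClassLoader
  // Discover all registered ExternalClusterManager implementations on the classpath
  val serviceLoaders =
    ServiceLoader.load(classOf[ExternalClusterManager], loader).asScala.filter(_.canCreate(url))
  if (serviceLoaders.size > 1) {
    throw new SparkException(s"Multiple external cluster managers registered for the url $url")
  }
  serviceLoaders.headOption
}

// YarnClusterManager#createTaskScheduler branches on the deploy mode
override def createTaskScheduler(sc: SparkContext, masterURL: String): TaskScheduler = {
  sc.deployMode match {
    case "cluster" => new YarnClusterScheduler(sc)
    case "client" => new YarnScheduler(sc)
    case _ => throw new SparkException(s"Unknown deploy mode '${sc.deployMode}' for Yarn")
  }
}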
Now enter YarnClientSchedulerBackend#start(): it creates a Client object and calls client.submitApplication(), which submits the application to the ResourceManager so that an ApplicationMaster can be launched to request resources. super.start() then starts CoarseGrainedSchedulerBackend, and waitForApplication() blocks until the app has started successfully.
override def start() {
/* SchedulerBackendUtils#getInitialTargetExecutorNumber only comes into play when resources are requested dynamically */
totalExpectedExecutors = SchedulerBackendUtils.getInitialTargetExecutorNumber(conf)
client = new Client(args, conf)
/* After the application is submitted; see ApplicationMaster#main() for the AM startup */
bindToYarn(client.submitApplication(), None)
// SPARK-8687: Ensure all necessary properties have already been set before
// we initialize our driver scheduler backend, which serves these properties
// to the executors
/* Calls start() of YarnSchedulerBackend's parent class CoarseGrainedSchedulerBackend, which creates the driver endpoint (see below) */
super.start()
waitForApplication()
}
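waitForApplication() blocks by polling the ResourceManager until the application leaves the submitted/accepted states. A condensed sketch, paraphrased from the Spark 2.x source:
private def waitForApplication(): Unit = {
  assert(client != null && appId.isDefined, "Application has not been submitted yet!")
  // Blocks until the app reaches RUNNING or a terminal state
  val (state, _) = client.monitorApplication(appId.get, returnOnRunning = true)
  if (state == YarnApplicationState.FINISHED ||
      state == YarnApplicationState.FAILED ||
      state == YarnApplicationState.KILLED) {
    throw new SparkException("Yarn application has already ended! " +
      "It might have been killed or unable to launch application master.")
  }
  if (state == YarnApplicationState.RUNNING) {
    logInfo(s"Application ${appId.get} has started running.")
  }
}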
Before client.submitApplication() hands the application over to YARN, look at how createContainerLaunchContext() sets up the ContainerLaunchContext runtime environment, the java opts, and the command that runs the AM. In client mode amClass is org.apache.spark.deploy.yarn.ExecutorLauncher. In yarn-client mode it is still the ApplicationMaster that requests from the ResourceManager the number of executors specified by --num-executors NUM.
/**
* Set up a ContainerLaunchContext to launch our ApplicationMaster container.
* This sets up the launch environment, java options, and the command for launching the AM.
*/
private def createContainerLaunchContext(newAppResponse: GetNewApplicationResponse): ContainerLaunchContext = {
// Set up environment variables and the spark java opts
val launchEnv = setupLaunchEnv(appStagingDirPath, pySparkArchives)
/*
 * The main job of this function is to ship the user's own jar, and the jars given via --jars,
 * to the distributed cache, and to set the spark.yarn.user.jar and spark.yarn.secondary.jars
 * properties; these are later packaged as --user-class-path and passed to the executors
 */
val localResources = prepareLocalResources(appStagingDirPath, pySparkArchives)
val amContainer = Records.newRecord(classOf[ContainerLaunchContext])
amContainer.setLocalResources(localResources.asJava)
amContainer.setEnvironment(launchEnv.asJava)
// Add Xmx for AM memory
javaOpts += "-Xmx" + amMemory + "m"
val tmpDir = new Path(Environment.PWD.$$(), YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR)
javaOpts += "-Djava.io.tmpdir=" + tmpDir
/* Whether we are in cluster mode determines the AM class; in client mode it is ExecutorLauncher. The command is built from the AM class plus arguments, and from it the amContainer */
val amClass =
if (isClusterMode) {
Utils.classForName("org.apache.spark.deploy.yarn.ApplicationMaster").getName
} else {
Utils.classForName("org.apache.spark.deploy.yarn.ExecutorLauncher").getName
}
amContainer
}
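The pieces above are finally assembled into the AM launch command, roughly as follows (condensed and paraphrased from the same method):
// Arguments for the AM, followed by the full container command
val amArgs =
  Seq(amClass) ++ userClass ++ userJar ++ primaryPyFile ++ primaryRFile ++ userArgs ++
  Seq("--properties-file", buildPath(Environment.PWD.$$(), LOCALIZED_CONF_DIR, SPARK_CONF_FILE))
val commands = prefixEnv ++
  Seq(Environment.JAVA_HOME.$$() + "/bin/java", "-server") ++
  javaOpts ++ amArgs ++
  Seq("1>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout",
      "2>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr")
amContainer.setCommands(commands.asJava)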
super.start() deserves a closer look: it is CoarseGrainedSchedulerBackend#start(), the parent class of YarnSchedulerBackend, and its body creates the driverEndpoint RPC endpoint reference. YarnSchedulerBackend overrides the driver-endpoint creation (createDriverEndpointRef()) so that the subclass YarnDriverEndpoint replaces DriverEndpoint, overriding its onDisconnected() method (because a disconnect means something different under YARN).
/* Instantiated when YarnSchedulerBackend starts; responsible for communicating with the ApplicationMaster */
private val yarnSchedulerEndpoint = new YarnSchedulerEndpoint(rpcEnv)
override def start() {
// TODO (prashant) send conf instead of properties
driverEndpoint = createDriverEndpointRef(properties)
}
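A condensed sketch of that override (paraphrased from YarnSchedulerBackend; the exact hook name varies slightly across Spark versions):
private class YarnDriverEndpoint(rpcEnv: RpcEnv, sparkProperties: Seq[(String, String)])
    extends DriverEndpoint(rpcEnv, sparkProperties) {
  // Under YARN an executor disconnect may just mean its container exited or was pre-empted,
  // so the executor is disabled (pending removal) rather than removed outright
  override def onDisconnected(rpcAddress: RpcAddress): Unit = {
    addressToExecutorId.get(rpcAddress).foreach(disableExecutor)
  }
}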
With the yarn-client side covered, move on to ApplicationMaster#main(Array[String]). Recall that client#createContainerLaunchContext() set amClass to org.apache.spark.deploy.yarn.ExecutorLauncher (whose main method simply wraps ApplicationMaster); it eventually calls runExecutorLauncher() to run the executors.
private def runExecutorLauncher(): Unit = {
val hostname = Utils.localHostName
val amCores = sparkConf.get(AM_CORES)
rpcEnv = RpcEnv.create("sparkYarnAM", hostname, hostname, -1, sparkConf, securityMgr,
amCores, true)
// The client-mode AM doesn't listen for incoming connections, so report an invalid port.
registerAM(hostname, -1, sparkConf, sparkConf.getOption("spark.driver.appUIAddress"))
// The driver should be up and listening, so unlike cluster mode, just try to connect to it
// with no waiting or retrying.
val (driverHost, driverPort) = Utils.parseHostPort(args.userArgs(0))
val driverRef = rpcEnv.setupEndpointRef(
RpcAddress(driverHost, driverPort),
YarnSchedulerBackend.ENDPOINT_NAME)
addAmIpFilter(Some(driverRef))
/* Requests from the ResourceManager the same number of containers as --num-executors specified at launch */
createAllocator(driverRef, sparkConf)
// In client mode the actor will stop the reporter thread.
reporterThread.join()
}
appMaster#createAllocator() leads into allocator#allocateResources() to request resources, and then into handleAllocatedContainers(Seq[Container]). Finally, runAllocatedContainers() runs executors in the containers that were granted.
/**
* Launches executors in the allocated containers.
*/
private def runAllocatedContainers(containersToUse: ArrayBuffer[Container]): Unit = {
new ExecutorRunnable(
Some(container), conf, sparkConf, driverUrl, executorId, executorHostname, executorMemory,
executorCores, appAttemptId.getApplicationId.toString, securityMgr, localResources
).run()
}
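For context, the allocation step that precedes it looks roughly like this (condensed and paraphrased from YarnAllocator):
def allocateResources(): Unit = synchronized {
  // Sync container requests with the RM: ask for (target - running) more executors
  updateResourceRequests()
  // Heartbeat to the ResourceManager; the response carries newly granted containers
  val allocateResponse = amClient.allocate(0.1f)
  val allocatedContainers = allocateResponse.getAllocatedContainers()
  if (allocatedContainers.size > 0) {
    // Match containers to requests by host/rack, then launch executors in them
    handleAllocatedContainers(allocatedContainers.asScala)
  }
}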
ExecutorRunnable#startContainer() sets up the container-local environment, and then nmClient starts the container.
def startContainer(): java.util.Map[String, ByteBuffer] = {
/* spark.executor.extraClassPath is set into the container's environment here */
ctx.setLocalResources(localResources.asJava)
// Send the start request to the ContainerManager
try {
nmClient.startContainer(container.get, ctx)
} catch {
case ex: Exception =>
throw new SparkException(s"Exception while starting container ${container.get.getId}" +
s" on host $hostname", ex)
}
}
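The command placed into the ContainerLaunchContext launches CoarseGrainedExecutorBackend, roughly as follows (condensed and paraphrased from ExecutorRunnable#prepareCommand):
val commands = prefixEnv ++
  Seq(Environment.JAVA_HOME.$$() + "/bin/java", "-server") ++
  javaOpts ++
  Seq("org.apache.spark.executor.CoarseGrainedExecutorBackend",
    "--driver-url", masterAddress,
    "--executor-id", executorId,
    "--hostname", hostname,
    "--cores", executorCores.toString,
    "--app-id", appId) ++
  userClassPath ++
  Seq(s"1>${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/stdout",
      s"2>${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/stderr")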
When CoarseGrainedExecutorBackend#main(Array[String]) starts, it executes run(driverUrl, executorId, hostname, cores, appId, workerUrl, userClassPath): it first creates a SparkEnv, then uses it to register a CoarseGrainedExecutorBackend instance as the executor's RPC endpoint.
/* The env is created mainly for issuing RPC requests */
val env = SparkEnv.createExecutorEnv(
driverConf, executorId, hostname, cores, cfg.ioEncryptionKey, isLocal = false)
env.rpcEnv.setupEndpoint("Executor", new CoarseGrainedExecutorBackend(
env.rpcEnv, driverUrl, executorId, hostname, cores, userClassPath, env))
workerUrl.foreach { url =>
env.rpcEnv.setupEndpoint("WorkerWatcher", new WorkerWatcher(env.rpcEnv, url))
}
env.rpcEnv.awaitTermination()
When the endpoint's onStart() runs, it sends a RegisterExecutor request to register the executor's details with the driver.
override def onStart() {
logInfo("Connecting to driver: " + driverUrl)
rpcEnv.asyncSetupEndpointRefByURI(driverUrl).flatMap { ref =>
// This is a very fast action so we can use "ThreadUtils.sameThread"
driver = Some(ref)
ref.ask[Boolean](RegisterExecutor(executorId, self, hostname, cores, extractLogUrls))
}(ThreadUtils.sameThread).onComplete {
// This is a very fast action so we can use "ThreadUtils.sameThread"
case Success(msg) =>
// Always receive `true`. Just ignore it
case Failure(e) =>
exitExecutor(1, s"Cannot register with driver: $driverUrl", e, notifyDriver = false)
}(ThreadUtils.sameThread)
}
On the driver side, CoarseGrainedSchedulerBackend#receiveAndReply(RpcCallContext) replies to the executor's registration request with a registration-success response.
executorRef.send(RegisteredExecutor)
// Note: some tests expect the reply to come after we put the executor in the map
context.reply(true)
listenerBus.post(
SparkListenerExecutorAdded(System.currentTimeMillis(), executorId, data))
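Those lines sit inside the RegisterExecutor case; condensed, the full handler looks roughly like this (paraphrased from the Spark 2.x source):
case RegisterExecutor(executorId, executorRef, hostname, cores, logUrls) =>
  if (executorDataMap.contains(executorId)) {
    executorRef.send(RegisterExecutorFailed("Duplicate executor ID: " + executorId))
    context.reply(true)
  } else {
    // Record the executor, then acknowledge the registration
    val data = new ExecutorData(executorRef, executorRef.address, hostname,
      cores, cores, logUrls)
    CoarseGrainedSchedulerBackend.this.synchronized {
      executorDataMap.put(executorId, data)
    }
    executorRef.send(RegisteredExecutor)
    context.reply(true)
    listenerBus.post(
      SparkListenerExecutorAdded(System.currentTimeMillis(), executorId, data))
    makeOffers()  // give the new executor a chance to receive tasks right away
  }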
After receiving the response, the backend creates an Executor and then waits for the driver to send over tasks to schedule (matched via case LaunchTask). launchTask() is then called, creating a TaskRunner; from this point the task-execution flow is identical to standalone mode. This completes the submission and execution flow of a Spark job in yarn-client mode.
override def receive: PartialFunction[Any, Unit] = {
/* Received when the driver acknowledges a successful executor registration */
case RegisteredExecutor =>
logInfo("Successfully registered with driver")
try {
executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false)
} catch {
case NonFatal(e) =>
exitExecutor(1, "Unable to create executor due to " + e.getMessage, e)
}
/* Tasks sent over by the driver to be scheduled */
case LaunchTask(data) =>
if (executor == null) {
exitExecutor(1, "Received LaunchTask command but executor was null")
} else {
val taskDesc = TaskDescription.decode(data.value)
logInfo("Got assigned task " + taskDesc.taskId)
executor.launchTask(this, taskDesc)
}
}
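launchTask() itself is tiny: it wraps the task in a TaskRunner and hands it to the executor's thread pool (paraphrased from Executor):
def launchTask(context: ExecutorBackend, taskDescription: TaskDescription): Unit = {
  val tr = new TaskRunner(context, taskDescription)
  runningTasks.put(taskDescription.taskId, tr)
  threadPool.execute(tr)  // TaskRunner#run deserializes and executes the task
}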
Next comes the startup flow of a Spark app in yarn-cluster mode. The main flow matches client mode and again starts from SparkSubmit; the launch-environment difference is in prepareSubmitEnvironment(), which in cluster mode sets childMainClass to org.apache.spark.deploy.yarn.YarnClusterApplication.
// In yarn-cluster mode, use yarn.Client as a wrapper around the user class
if (isYarnCluster) {
childMainClass = YARN_CLUSTER_SUBMIT_CLASS
if (args.isPython) {
childArgs += ("--primary-py-file", args.primaryResource)
childArgs += ("--class", "org.apache.spark.deploy.PythonRunner")
} else if (args.isR) {
val mainFile = new Path(args.primaryResource).getName
childArgs += ("--primary-r-file", mainFile)
childArgs += ("--class", "org.apache.spark.deploy.RRunner")
} else {
if (args.primaryResource != SparkLauncher.NO_RESOURCE) {
childArgs += ("--jar", args.primaryResource)
}
childArgs += ("--class", args.mainClass)
}
if (args.childArgs != null) {
args.childArgs.foreach { arg => childArgs += ("--arg", arg) }
}
}
When the SparkContext creates the TaskScheduler, the scheduler is set to YarnClusterScheduler and the SchedulerBackend to YarnClusterSchedulerBackend, so the container for task scheduling differs from client mode.
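Both come from the same YarnClusterManager seen earlier; its backend factory branches on deploy mode the same way (paraphrased; details vary by version):
override def createSchedulerBackend(
    sc: SparkContext,
    masterURL: String,
    scheduler: TaskScheduler): SchedulerBackend = {
  sc.deployMode match {
    case "cluster" =>
      new YarnClusterSchedulerBackend(scheduler.asInstanceOf[TaskSchedulerImpl], sc)
    case "client" =>
      new YarnClientSchedulerBackend(scheduler.asInstanceOf[TaskSchedulerImpl], sc)
    case _ =>
      throw new SparkException(s"Unknown deploy mode '${sc.deployMode}' for Yarn")
  }
}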