# Submit the SparkPi example to YARN.
# --deploy-mode may be "client" instead of "cluster" to run the driver in client mode.
# Note: a trailing comment must not follow a line-continuation backslash, because the
# backslash then escapes the following space instead of the newline and the command is
# cut short at that line.
./bin/spark-submit \
  --class org.apache.spark.examples.SparkPi \
  --master yarn \
  --deploy-mode cluster \
  --executor-memory 20G \
  --num-executors 50 \
  /path/to/examples.jar


与standalone模式下的应用启动一样,SparkSubmit#runMain(SparkSubmitArguments, Boolean)是应用程序的入口。由于是在yarn环境下启动,在前期准备submit环境时会有差异,差异点在prepareSubmitEnvironment(SparkSubmitArguments, Option[HadoopConfiguration])方法:在方法中会依据args.master和args.deployMode进行模式匹配,当master为yarn且deployMode为cluster时,会将childMainClass设置为org.apache.spark.deploy.yarn.YarnClusterApplication作为资源调度的启动类;在client模式下,childMainClass则直接取args.mainClass。

/**
 * Prepares the environment for submitting an application (abridged excerpt: several
 * intermediate statements and closing braces of the full method are not shown here).
 *
 * @param args the parsed spark-submit arguments
 * @param conf optional Hadoop configuration; defaults to None
 * @return a 4-tuple of (child arguments, child classpath entries, SparkConf, child main class)
 */
private[deploy] def prepareSubmitEnvironment(
   args: SparkSubmitArguments,
   conf: Option[HadoopConfiguration] = None)
   : (Seq[String], Seq[String], SparkConf, String) = {
    // Set the cluster manager
    val clusterManager: Int = args.master match {
      case "yarn" => YARN
      case "yarn-client" | "yarn-cluster" =>
        logWarning(s"Master ${args.master} is deprecated since 2.0." +
          " Please use master \"yarn\" with specified deploy mode instead.")
    if (deployMode == CLIENT) {
    /* In client mode the user program is executed directly inside the submit process via
       reflection; both the user's own jar and the jars given with --jars are added to the
       classpath. */
    	childMainClass = args.mainClass
    	if (localPrimaryResource != null && isUserJar(localPrimaryResource)) {
       	childClasspath += localPrimaryResource
    	if (localJars != null) { childClasspath ++= localJars.split(",") }
    // In yarn-cluster mode, use yarn.Client as a wrapper around the user class
    if (isYarnCluster) {
      /* In cluster mode YARN_CLUSTER_SUBMIT_CLASS is
         org.apache.spark.deploy.yarn.YarnClusterApplication. */
      childMainClass = YARN_CLUSTER_SUBMIT_CLASS
    (childArgs, childClasspath, sparkConf, childMainClass) 

submit()需要的环境准备好之后,通过mainClass构建spark应用。由于目前分析的是yarn client模式下的启动,mainClass并不是SparkApplication的子类,因而app的类型为JavaMainApplication。

// Chooses the SparkApplication wrapper for mainClass and starts it (abridged excerpt: the
// body of the first branch and some closing braces are not shown here).
val app: SparkApplication = if (classOf[SparkApplication].isAssignableFrom(mainClass)) {
} else {
  // SPARK-4170
  if (classOf[scala.App].isAssignableFrom(mainClass)) {
  logWarning("Subclasses of scala.App may not work correctly. Use a main() method instead.")
  // Any other main class is wrapped in a JavaMainApplication.
  new JavaMainApplication(mainClass)
/* In standalone mode, SparkSubmit#prepareSubmitEnvironment(args) sets childMainClass to
   RestSubmissionClient. */
app.start(childArgs.toArray, sparkConf)


/**
 * Runs the wrapped class through its static main(String[]) method (abridged excerpt: some
 * closing braces are not shown here).
 *
 * @param args arguments passed on to the main method
 * @param conf configuration whose entries are copied into the JVM system properties
 * @throws IllegalStateException if the main method found on the class is not static
 */
override def start(args: Array[String], conf: SparkConf): Unit = {
  // Look up main(String[]) reflectively on the wrapped class.
  val mainMethod = klass.getMethod("main", new Array[String](0).getClass)
  if (!Modifier.isStatic(mainMethod.getModifiers)) {
    throw new IllegalStateException("The main method in the given main class must be static")
  // Export every configuration entry as a system property before invoking main.
  val sysProps = conf.getAll.toMap
  sysProps.foreach { case (k, v) =>
    sys.props(k) = v
  // Static method, hence the null receiver.
  mainMethod.invoke(null, args)
/* Inside the Spark application, SparkConf and SparkContext are used to load the
   environment-related configuration. */
// NOTE(review): the key passed to set() below is an empty string — it looks like the
// configuration key was lost; confirm the intended key.
val config = new SparkConf().setAppName("spark-app")
	.set("", "spark-mongo-connector")
val sparkContext = new SparkContext(config)

SparkContext#createTaskScheduler(SparkContext, String, String)方法中会根据master确定scheduler和backend。由于master为yarn,在getClusterManager(String)中确定cm的类型为YarnClusterManager。在yarn-client模式下调用createTaskScheduler()和createSchedulerBackend(),通过masterUrl和deployMode可得:scheduler为YarnScheduler,backend为YarnClientSchedulerBackend。

/* When masterUrl refers to an external resource manager (Yarn, Mesos, K8s) this branch is
   taken: (yarn) cluster mode uses YarnClusterScheduler and (yarn) client mode uses
   YarnScheduler for resource scheduling. */
case masterUrl =>
  // Resolve the cluster manager for this master URL; fail if none is found.
  val cm = getClusterManager(masterUrl) match {
    case Some(clusterMgr) => clusterMgr
    case None => throw new SparkException("Could not parse Master URL: '" + master + "'")
  // The cluster manager creates the scheduler first, then a backend bound to it.
  val scheduler = cm.createTaskScheduler(sc, masterUrl)
  val backend = cm.createSchedulerBackend(sc, masterUrl, scheduler)
  cm.initialize(scheduler, backend)


// Starts the YARN client-mode scheduler backend (abridged excerpt: intermediate statements
// and the closing brace are not shown here).
override def start() {
  /* Original note: SchedulerBackendUtils#getInitialTargetExecutorNumber is only relevant
     when resources are requested dynamically.
     NOTE(review): the call below is unconditional in this excerpt — confirm. */
  totalExpectedExecutors = SchedulerBackendUtils.getInitialTargetExecutorNumber(conf)
  client = new Client(args, conf)
  /* Submits the application; for what happens afterwards see the startup in
     ApplicationMaster#main(). */
  bindToYarn(client.submitApplication(), None)
  // SPARK-8687: Ensure all necessary properties have already been set before
  // we initialize our driver scheduler backend, which serves these properties
  // to the executors
  /* Next, start() of CoarseGrainedSchedulerBackend (the parent class of
     YarnSchedulerBackend) is called and does its own setup there; that call is not
     shown in this excerpt. */

进一步看client.submitApplication()在提交应用、拉起AppMaster之前,如何初始化ContainerLaunchContext的运行环境、java opts和运行AM的指令:进入createContainerLaunchContext()方法,client模式下amClass为org.apache.spark.deploy.yarn.ExecutorLauncher。在yarn client模式下,executor都是由appMaster向resourceManager申请的,数目由--num-executors NUM参数指定。

/**
 * Set up a ContainerLaunchContext to launch our ApplicationMaster container.
 * This sets up the launch environment, java options, and the command for launching the AM.
 */
private def createContainerLaunchContext(newAppResponse: GetNewApplicationResponse) {
  // Set up the environment variables and the Spark Java options
  val launchEnv = setupLaunchEnv(appStagingDirPath, pySparkArchives)
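  /*
   * The call below mainly ships the user's own jar (and the jars given with --jars) to the
   * distributed cache and sets the two properties spark.yarn.user.jar and
   * spark.yarn.secondary.jars; these two values are then wrapped into --user-class-path
   * and passed on to the executors.
   */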
  val localResources = prepareLocalResources(appStagingDirPath, pySparkArchives)
  // Empty YARN record that will describe the AM container.
  val amContainer = Records.newRecord(classOf[ContainerLaunchContext])
  // Add Xmx for AM memory
  javaOpts += "-Xmx" + amMemory + "m"
  val tmpDir = new Path(Environment.PWD.$$(), YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR)
  // NOTE(review): the string literal below is empty, so only the bare path is appended to
  // javaOpts; presumably a JVM option prefix (e.g. a java.io.tmpdir setting) was lost —
  // confirm against the upstream source.
  javaOpts += "" + tmpDir
  /* Decide the AM class depending on whether we run in cluster mode; in client mode it is
     ExecutorLauncher. The AM class together with some arguments forms the command that is
     then used to build amContainer. (Both branch bodies are elided in this excerpt.) */
  val amClass =
  if (isClusterMode) {
  } else {


/* Instantiated when YarnSchedulerBackend starts; responsible for communicating with the
   ApplicationMaster. */
private val yarnSchedulerEndpoint = new YarnSchedulerEndpoint(rpcEnv)
// Abridged excerpt of start(): only the creation of the driver endpoint is shown.
override def start() {
	// TODO (prashant) send conf instead of properties
	driverEndpoint = createDriverEndpointRef(properties)


/**
 * Client-mode ApplicationMaster entry: creates the "sparkYarnAM" RpcEnv, registers the AM,
 * resolves a reference to the driver from args.userArgs(0) and creates the allocator
 * (abridged excerpt: some statements and closing delimiters are not shown here).
 */
private def runExecutorLauncher(): Unit = {
  val hostname = Utils.localHostName
  val amCores = sparkConf.get(AM_CORES)
  rpcEnv = RpcEnv.create("sparkYarnAM", hostname, hostname, -1, sparkConf, securityMgr,
                         amCores, true)
  // The client-mode AM doesn't listen for incoming connections, so report an invalid port.
  registerAM(hostname, -1, sparkConf, sparkConf.getOption("spark.driver.appUIAddress"))
  // The driver should be up and listening, so unlike cluster mode, just try to connect to it
  // with no waiting or retrying.
  val (driverHost, driverPort) = Utils.parseHostPort(args.userArgs(0))
  val driverRef = rpcEnv.setupEndpointRef(
    RpcAddress(driverHost, driverPort),
  /* Request from the ResourceManager the resources matching the --num-executors value
     given at launch. */
  createAllocator(driverRef, sparkConf)
  // In client mode the actor will stop the reporter thread.


/** Launches executors in the allocated containers. */
// Abridged excerpt: only the construction of the ExecutorRunnable for a container is shown;
// the surrounding statements and closing delimiters are not visible here.
private def runAllocatedContainers(containersToUse: ArrayBuffer[Container]): Unit = {
  new ExecutorRunnable(
    Some(container), conf, sparkConf, driverUrl, executorId, executorHostname,executorMemory,
    executorCores, appAttemptId.getApplicationId.toString, securityMgr, localResources


/**
 * Asks the node manager client to start the container; any exception raised while doing so
 * is rethrown as a SparkException naming the container id and host (abridged excerpt).
 */
def startContainer(): java.util.Map[String, ByteBuffer] = {
  /* Original note: spark.executor.extraClassPath is set up here as a system environment
     variable. The code doing so is not shown in this excerpt. */
  // Send the start request to the ContainerManager
  try {
    nmClient.startContainer(container.get, ctx)
  } catch {
    case ex: Exception =>
    throw new SparkException(s"Exception while starting container ${container.get.getId}" +
                             s" on host $hostname", ex)

CoarseGrainedExecutorBackend#main(Array[String])启动时会执行run(driverUrl, executorId, hostname, cores, appId, workerUrl, userClassPath)方法:先创建env,然后通过env.rpcEnv将CoarseGrainedExecutorBackend注册为名为Executor的rpc endpoint。

/* The env is created mainly for submitting RPC-related requests. */
val env = SparkEnv.createExecutorEnv(
driverConf, executorId, hostname, cores, cfg.ioEncryptionKey, isLocal = false)

// Register the executor backend as the RPC endpoint named "Executor".
env.rpcEnv.setupEndpoint("Executor", new CoarseGrainedExecutorBackend(
	env.rpcEnv, driverUrl, executorId, hostname, cores, userClassPath, env))
// If a worker URL is present, additionally register a "WorkerWatcher" endpoint for it.
workerUrl.foreach { url =>
	env.rpcEnv.setupEndpoint("WorkerWatcher", new WorkerWatcher(env.rpcEnv, url))


/**
 * Resolves the driver endpoint from driverUrl asynchronously and asks it to register this
 * executor; if that fails the executor exits with code 1 (abridged excerpt: closing braces
 * are not shown here).
 */
override def onStart() {
  logInfo("Connecting to driver: " + driverUrl)
  rpcEnv.asyncSetupEndpointRefByURI(driverUrl).flatMap { ref =>
    // This is a very fast action so we can use "ThreadUtils.sameThread"
    // Remember the driver reference, then send the registration request.
    driver = Some(ref)
    ref.ask[Boolean](RegisterExecutor(executorId, self, hostname, cores, extractLogUrls))
  }(ThreadUtils.sameThread).onComplete {
    // This is a very fast action so we can use "ThreadUtils.sameThread"
    case Success(msg) =>
    // Always receive `true`. Just ignore it
    case Failure(e) =>
    exitExecutor(1, s"Cannot register with driver: $driverUrl", e, notifyDriver = false)


// NOTE(review): excerpt — the enclosing call that the final closing parenthesis belongs to
// is not shown here.
// Note: some tests expect the reply to come after we put the executor in the map
SparkListenerExecutorAdded(System.currentTimeMillis(), executorId, data))

executor端收到响应后会创建一个Executor,接下来就是等待Driver发送过来要进行调度的任务(用case LaunchTask匹配请求)。executor执行launchTask()并创建TaskRunner,之后任务运行的流程与standalone模式相同。yarn-client模式下spark任务提交以及运行的流程就是这样。

/**
 * Message handler of the executor backend (abridged excerpt: further cases and closing
 * braces are not shown here).
 */
override def receive: PartialFunction[Any, Unit] = {
  /* Message received when the driver confirms that the executor was registered
     successfully. */
  case RegisteredExecutor =>
    logInfo("Successfully registered with driver")
    try {
      executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false)
    } catch {
      case NonFatal(e) =>
      exitExecutor(1, "Unable to create executor due to " + e.getMessage, e)
  /* A task sent by the driver to be scheduled on this executor. */
  case LaunchTask(data) =>
    // A task can only run once the Executor has been created on registration.
    if (executor == null) {
      exitExecutor(1, "Received LaunchTask command but executor was null")
    } else {
      val taskDesc = TaskDescription.decode(data.value)
      logInfo("Got assigned task " + taskDesc.taskId)
      executor.launchTask(this, taskDesc)

接下来分析spark app在yarn cluster模式下的启动流程。主要流程和client模式一样,都是从SparkSubmit开始分析,启动环境的差异在于prepareSubmitEnvironment()方法:在cluster模式下会设置childMainClass为org.apache.spark.deploy.yarn.YarnClusterApplication。

// In yarn-cluster mode, use yarn.Client as a wrapper around the user class
// (abridged excerpt: some closing braces are not shown here).
if (isYarnCluster) {
  if (args.isPython) {
     // Python applications are launched through org.apache.spark.deploy.PythonRunner.
     childArgs += ("--primary-py-file", args.primaryResource)
     childArgs += ("--class", "org.apache.spark.deploy.PythonRunner")
  } else if (args.isR) {
     // R applications are launched through org.apache.spark.deploy.RRunner; only the file
     // name of the primary resource is passed.
     val mainFile = new Path(args.primaryResource).getName
     childArgs += ("--primary-r-file", mainFile)
     childArgs += ("--class", "org.apache.spark.deploy.RRunner")
  } else {
     // Otherwise pass the primary resource as --jar (unless there is none) and the user's
     // main class as --class.
     if (args.primaryResource != SparkLauncher.NO_RESOURCE) {
        childArgs += ("--jar", args.primaryResource)
     childArgs += ("--class", args.mainClass)
  // Every user argument is forwarded as its own "--arg <value>" pair.
  if (args.childArgs != null) {
     args.childArgs.foreach { arg => childArgs += ("--arg", arg) }
