Spark standalone mode startup source code analysis
18 Feb 2021
Spark currently supports deployment on standalone, Mesos, YARN, and Kubernetes clusters. This post looks at Apache Spark in standalone mode: how resources are initialized, how a user application is submitted, and how options such as extraClassPath given to the spark-submit script are passed along to the Driver.
The analysis starts from the spark-submit script that submits the user app: --class is the main class inside the jar, /path/to/examples.jar is the user's own jar, and 1000 is the argument SparkPi needs (based on Spark 2.4.5).
# Run on a Spark standalone cluster in client deploy mode
./bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master spark://207.184.161.138:7077 \
--executor-memory 20G \
--total-executor-cores 100 \
/path/to/examples.jar \
1000
The spark-submit script in Spark's bin directory calls spark-class and forwards the spark-submit arguments as "$@":
# Invoked when a jar and its arguments are submitted via spark-submit; "$@" (the script's arguments) is passed straight through to spark-class
exec "${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@"
spark-class in turn hands the arguments to org.apache.spark.launcher.Main, which builds the command used to start the program:
# The exit code of the launcher is appended to the output, so the parent shell removes it from the
# command array and checks the value to see if the launcher succeeded.
build_command() {
"$RUNNER" -Xmx128m -cp "$LAUNCH_CLASSPATH" org.apache.spark.launcher.Main "$@"
printf "%d\0" $?
}
# Turn off posix mode since it does not allow process substitution
set +o posix
CMD=()
while IFS= read -d '' -r ARG; do
CMD+=("$ARG")
# build_command() forwards the arguments to org.apache.spark.launcher.Main, the class that builds the command for starting the user program
done < <(build_command "$@")
The arguments end up in org.apache.spark.launcher.Main#main(String[] argsArray), which triggers the actual launch of the Spark application. When the class is SparkSubmit, it parses and validates the request arguments, assembles the classpath jars and the resources to request for executors, and builds the bash command that finally runs the application.
public static void main(String[] argsArray) throws Exception {
/* This is the form used when launching via the spark-submit script: exec "${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@" */
if (className.equals("org.apache.spark.deploy.SparkSubmit")) {
AbstractCommandBuilder builder = new SparkSubmitCommandBuilder(args);
/* Parse the arguments coming from spark-submit and build the launch command, see AbstractCommandBuilder#buildCommand */
cmd = buildCommand(builder, env, printLaunchCommand);
} else {
AbstractCommandBuilder builder = new SparkClassCommandBuilder(className, args);
cmd = buildCommand(builder, env, printLaunchCommand);
}
/*
* /usr/latest/bin/java -cp [classpath options] org.apache.spark.deploy.SparkSubmit --master yarn-cluster
* --num-executors 100 --executor-memory 6G --executor-cores 4 --driver-memory 1G --conf spark.default.parallelism=1000
* --conf spark.storage.memoryFraction=0.5 --conf spark.shuffle.memoryFraction=0.3
* */
// In bash, use NULL as the arg separator since it cannot be used in an argument.
List<String> bashCmd = prepareBashCommand(cmd, env);
for (String c : bashCmd) {
System.out.print(c);
System.out.print('\0');
}
}
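The same launcher library is also available as a public API, so the command built above can be produced programmatically as well. A minimal sketch in Scala, assuming a local Spark install at /opt/spark (all paths and the master URL are placeholders):
import org.apache.spark.launcher.SparkLauncher

object LaunchSparkPi {
  def main(args: Array[String]): Unit = {
    // SparkLauncher builds the same java command as spark-class and spawns
    // spark-submit as a child process.
    val process = new SparkLauncher()
      .setSparkHome("/opt/spark")                        // placeholder install path
      .setAppResource("/path/to/examples.jar")
      .setMainClass("org.apache.spark.examples.SparkPi")
      .setMaster("spark://207.184.161.138:7077")
      .setConf("spark.executor.memory", "20g")
      .addAppArgs("1000")
      .launch()                                          // returns a java.lang.Process
    process.waitFor()
  }
}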
Stepping into org.apache.spark.deploy.SparkSubmit#doSubmit(), parseArguments(args) parses the spark-submit arguments and loads the relevant environment variables (ignoring anything unrelated to Spark). It calls the parent class SparkSubmitOptionParser#parse(List<String> args) to walk the options, and handle(), handleUnknown() and handleExtraArgs() collect the jars the application needs (the --jars option) and its arguments.
def doSubmit(args: Array[String]): Unit = {
// Initialize logging if it hasn't been done yet. Keep track of whether logging needs to
// be reset before the application starts.
val uninitLog = initializeLogIfNecessary(true, silent = true)
val appArgs = parseArguments(args)
if (appArgs.verbose) {
logInfo(appArgs.toString)
}
appArgs.action match {
case SparkSubmitAction.SUBMIT => submit(appArgs, uninitLog)
case SparkSubmitAction.KILL => kill(appArgs)
case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
case SparkSubmitAction.PRINT_VERSION => printVersion()
}
}
When no action is specified at submission time, it defaults to SUBMIT; the following is resolved in SparkSubmitArguments#loadEnvironmentArguments():
// Action should be SUBMIT unless otherwise specified
action = Option(action).getOrElse(SUBMIT)
Continuing into SparkSubmit#submit(SparkSubmitArguments, Boolean): since Spark 1.3 the REST protocol has gradually become the submission channel, so we go straight into doRunMain() and then runMain(args, uninitLog), which calls prepareSubmitEnvironment(args) to resolve the application parameters and uses the custom class loader MutableURLClassLoader to pull in the jars and load classes into the JVM:
private def runMain(args: SparkSubmitArguments, uninitLog: Boolean): Unit = {
val (childArgs, childClasspath, sparkConf, childMainClass) = prepareSubmitEnvironment(args)
/* Set the current thread's class loader; MutableURLClassLoader extends URLClassLoader and is used for custom class loading */
val loader =
if (sparkConf.get(DRIVER_USER_CLASS_PATH_FIRST)) {
new ChildFirstURLClassLoader(new Array[URL](0),
Thread.currentThread.getContextClassLoader)
} else {
new MutableURLClassLoader(new Array[URL](0),
Thread.currentThread.getContextClassLoader)
}
/* If the thread's context class loader is never set, the system class loader is used; a thread inherits the context class loader from its parent thread */
Thread.currentThread.setContextClassLoader(loader)
/* In yarn-client mode the user's jar and everything passed via --jars are all put on this loader's classpath, so as long as no jar is missing,
 * classes referenced implicitly or explicitly will be found.
 * In yarn-cluster mode the jars given via --jars are instead packed into childArgs and handed over to yarn.Client.
 */
for (jar <- childClasspath) {
addJarToClasspath(jar, loader)
}
var mainClass: Class[_] = null
try {
/* Load the class with the class loader set up above */
mainClass = Utils.classForName(childMainClass)
} catch {
case e: ClassNotFoundException =>
logWarning(s"Failed to load $childMainClass.", e)
}
val app: SparkApplication = if (classOf[SparkApplication].isAssignableFrom(mainClass)) {
mainClass.newInstance().asInstanceOf[SparkApplication]
} else {
// SPARK-4170
if (classOf[scala.App].isAssignableFrom(mainClass)) {
logWarning("Subclasses of scala.App may not work correctly. Use a main() method instead.")
}
new JavaMainApplication(mainClass)
}
try {
/* In standalone mode, SparkSubmit#prepareSubmitEnvironment(args) sets childMainClass to the REST submission client (RestSubmissionClientApp) */
app.start(childArgs.toArray, sparkConf)
} catch {
case t: Throwable =>
throw findCause(t)
}
}
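What addJarToClasspath plus Utils.classForName amount to can be reproduced with plain JDK classes. A minimal sketch, with the jar path, class name and argument being placeholders:
import java.io.File
import java.net.URLClassLoader

object ReflectiveMainLauncher {
  def main(args: Array[String]): Unit = {
    // Extend the context class loader with one jar, then load the main class through it
    // and invoke main() reflectively -- the same pattern runMain uses.
    val jarUrl = new File("/path/to/examples.jar").toURI.toURL
    val loader = new URLClassLoader(Array(jarUrl), Thread.currentThread.getContextClassLoader)
    Thread.currentThread.setContextClassLoader(loader)
    val clazz = Class.forName("org.apache.spark.examples.SparkPi", true, loader)
    val mainMethod = clazz.getMethod("main", classOf[Array[String]])
    mainMethod.invoke(null, Array("100"))
  }
}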
prepareSubmitEnvironment(args) has already set childMainClass to the REST client (RestSubmissionClientApp), so app.start(childArgs.toArray, sparkConf) submits the request over REST; RestSubmissionClient.filterSystemEnvironment(sys.env) filters out every environment variable that does not start with SPARK_ or MESOS_.
override def start(args: Array[String], conf: SparkConf): Unit = {
if (args.length < 2) {
sys.error("Usage: RestSubmissionClient [app resource] [main class] [app args*]")
sys.exit(1)
}
val appResource = args(0)
val mainClass = args(1)
val appArgs = args.slice(2, args.length) /* the argument order is (args.primaryResource (the user jar), args.mainClass, args.childArgs) */
// Filter the system environment variables, keeping only those starting with SPARK_ or MESOS_
val env = RestSubmissionClient.filterSystemEnvironment(sys.env)
run(appResource, mainClass, appArgs, conf, env)
}
Following on to RestSubmissionClientApp#run(): the SparkConf is converted into sparkProperties (the conf only ever holds keys starting with spark.), and client.createSubmission(submitRequest) then issues the REST request.
/** Submits a request to run the application and return the response. Visible for testing. */
def run(
appResource: String,
mainClass: String,
appArgs: Array[String],
conf: SparkConf,
env: Map[String, String] = Map()): SubmitRestProtocolResponse = {
val master = conf.getOption("spark.master").getOrElse {
throw new IllegalArgumentException("'spark.master' must be set.")
}
/* The configuration picked up when the SparkConf was created (keys starting with spark.), converted into sparkProperties */
val sparkProperties = conf.getAll.toMap
val client = new RestSubmissionClient(master)
val submitRequest = client.constructSubmitRequest(
appResource, mainClass, appArgs, sparkProperties, env)
/* Send the constructed message (submitRequest) to the Master's REST endpoint; postJson(url, request.toJson) parses the REST response */
client.createSubmission(submitRequest)
}
RestSubmissionClient#createSubmission() validates every master address, builds the submit URL, and posts the request to the masters one by one; each time it checks whether the master is reachable, and an unreachable master is added to the lostMasters set. That completes the flow of submitting a Spark application from the client in standalone mode.
/**
* Submit an application specified by the parameters in the provided request.
* If the submission was successful, poll the status of the submission and report
* it to the user. Otherwise, report the error message provided by the server.
*/
def createSubmission(request: CreateSubmissionRequest): SubmitRestProtocolResponse = {
logInfo(s"Submitting a request to launch an application in $master.")
var handled: Boolean = false
var response: SubmitRestProtocolResponse = null
for (m <- masters if !handled) {
validateMaster(m)
val url = getSubmitUrl(m)
response = postJson(url, request.toJson)
response match {
case s: CreateSubmissionResponse =>
if (s.success) {
reportSubmissionStatus(s)
handleRestResponse(s)
handled = true
}
case unexpected =>
handleUnexpectedRestResponse(unexpected)
}
}
response
}
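For reference, a hedged sketch of what getSubmitUrl(m) and the posted JSON roughly look like for the example at the top of the post (the host, version and property values are illustrative; the field names follow the CreateSubmissionRequest message):
// The standalone Master's REST server listens on spark.master.rest.port (6066 by default):
//   spark://207.184.161.138:6066  ->  http://207.184.161.138:6066/v1/submissions/create
val submitUrl = "http://207.184.161.138:6066/v1/submissions/create"

// Illustrative request body (a CreateSubmissionRequest serialized to JSON):
val requestJson =
  """{
    |  "action": "CreateSubmissionRequest",
    |  "clientSparkVersion": "2.4.5",
    |  "appResource": "file:/path/to/examples.jar",
    |  "mainClass": "org.apache.spark.examples.SparkPi",
    |  "appArgs": ["1000"],
    |  "sparkProperties": {
    |    "spark.master": "spark://207.184.161.138:7077",
    |    "spark.app.name": "org.apache.spark.examples.SparkPi",
    |    "spark.executor.memory": "20g",
    |    "spark.cores.max": "100"
    |  },
    |  "environmentVariables": { "SPARK_ENV_LOADED": "1" }
    |}""".stripMargin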
That covers the client side of submitting an application; now let's see how the Master receives and handles the request. The start-master.sh script contains the following, so org.apache.spark.deploy.master.Master is the entry point for this part of the analysis.
# NOTE: This exact class name is matched downstream by SparkSubmit.
# Any changes need to be reflected there.
CLASS="org.apache.spark.deploy.master.Master"
if [ "$SPARK_MASTER_WEBUI_PORT" = "" ]; then
SPARK_MASTER_WEBUI_PORT=8080
fi
"${SPARK_HOME}/sbin"/spark-daemon.sh start $CLASS 1 \
--host $SPARK_MASTER_HOST --port $SPARK_MASTER_PORT --webui-port $SPARK_MASTER_WEBUI_PORT \
$ORIGINAL_ARGS
Master#main starts the RPC environment and the endpoint. An RpcEndpoint is an RPC endpoint: Spark treats every node role (Client/Master/Worker) as an RPC endpoint implementing the RpcEndpoint interface, each defining its own messages and handling logic as needed; sending (or asking) goes through the Dispatcher.
def main(argStrings: Array[String]) {
Thread.setDefaultUncaughtExceptionHandler(new SparkUncaughtExceptionHandler(
exitOnUncaughtException = false))
Utils.initDaemon(log)
val conf = new SparkConf
val args = new MasterArguments(argStrings, conf)
/* Create the RPC environment and the endpoint (callable over RPC). In Spark the Driver, Master and Worker each have their own endpoint, which acts as their inbox */
val (rpcEnv, _, _) = startRpcEnvAndEndpoint(args.host, args.port, args.webUiPort, conf)
rpcEnv.awaitTermination()
}
Master extends ThreadSafeRpcEndpoint, and the overridden receive method handles messages delivered over Netty; the snippet below shows how an application registration is handled once the Master service is up.
override def receive: PartialFunction[Any, Unit] = {
/* Only after the AppClient registers an Application with the Master does the Master's schedule() launch executors */
case RegisterApplication(description, driver) =>
// TODO Prevent repeated registrations from some driver
if (state == RecoveryState.STANDBY) {
// ignore, don't send response
} else {
logInfo("Registering app " + description.name)
val app = createApplication(description, driver)
registerApplication(app)
logInfo("Registered app " + description.name + " with ID " + app.id)
persistenceEngine.addApplication(app)
driver.send(RegisteredApplication(app.id, self))
schedule() /* todo: schedules Drivers; the details of the scheduling deserve a closer look */
}
}
Requests submitted by the RestSubmissionClient are all handled by StandaloneRestServer#handleSubmit(String, SubmitRestProtocolMessage, HttpServletResponse); the case CreateSubmissionRequest expression matches the request type, and DeployMessages.RequestSubmitDriver(driverDescription) asks the Master to launch a Driver.
// A server that responds to requests submitted by the [[RestSubmissionClient]].
// This is intended to be embedded in the standalone Master and used in cluster mode only.
protected override def handleSubmit(
requestMessageJson: String,
requestMessage: SubmitRestProtocolMessage,
responseServlet: HttpServletResponse): SubmitRestProtocolResponse = {
requestMessage match {
case submitRequest: CreateSubmissionRequest =>
/* Build the full DriverDescription, which will be sent to the Master to request a Driver */
val driverDescription = buildDriverDescription(submitRequest)
/* With the DriverDescription built, formally send a request to the Master asking for resources */
val response = masterEndpoint.askSync[DeployMessages.SubmitDriverResponse](
DeployMessages.RequestSubmitDriver(driverDescription))
val submitResponse = new CreateSubmissionResponse
submitResponse.serverSparkVersion = sparkVersion
submitResponse.message = response.message
submitResponse.success = response.success
submitResponse.submissionId = response.driverId.orNull
val unknownFields = findUnknownFields(requestMessageJson, requestMessage)
if (unknownFields.nonEmpty) {
// If there are fields that the server does not know about, warn the client
submitResponse.unknownFields = unknownFields
}
submitResponse
}
}
In Master#receiveAndReply(), createDriver(description) wraps the DriverDescription once more, schedule() then places the driver on a Worker (inside schedule(), launchDriver() sends a LaunchDriver message to the chosen Worker), and finally reply() answers the REST request.
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
case RequestSubmitDriver(description) =>
if (state != RecoveryState.ALIVE) {
val msg = s"${Utils.BACKUP_STANDALONE_MASTER_PREFIX}: $state. " +
"Can only accept driver submissions in ALIVE state."
context.reply(SubmitDriverResponse(self, false, None, msg))
} else {
logInfo("Driver submitted " + description.command.mainClass)
val driver = createDriver(description)
persistenceEngine.addDriver(driver)
waitingDrivers += driver
drivers.add(driver)
schedule() // run the scheduling logic
// TODO: It might be good to instead have the submission client poll the master to determine
// the current status of the driver. For now it's simply "fire and forget".
context.reply(SubmitDriverResponse(self, true, Some(driver.id),
s"Driver successfully submitted as ${driver.id}"))
}
}
Switching to Worker#receive(): pattern matching on case LaunchDriver(driverId, driverDesc) leads to the code below, and driver.start() then launches the program.
case LaunchDriver(driverId, driverDesc) =>
logInfo(s"Asked to launch driver $driverId")
/*
* After the RestSubmissionClient submits the launch request, StandaloneRestServer actually wraps the user class
* in DriverWrapper, so the class started here is DriverWrapper rather than the user program itself. In its main
* method, a custom class loader loads the user's main class and then the user program is started, initializing
* the SparkContext and so on.
*/
val driver = new DriverRunner(
conf,
driverId,
workDir,
sparkHome,
/*
* The Command here is the one assembled in StandaloneRestServer:
* val command = new Command("org.apache.spark.deploy.worker.DriverWrapper",
*   Seq("{{WORKER_URL}}", "{{USER_JAR}}", mainClass) ++ appArgs, ...) // args to the DriverWrapper
*/
driverDesc.copy(command = Worker.maybeUpdateSSLSettings(driverDesc.command, conf)),
self,
workerUri,
securityMgr)
drivers(driverId) = driver
driver.start()
coresUsed += driverDesc.cores
memoryUsed += driverDesc.mem
Inside driver.start(), the application creates the working directory the Driver needs, downloads the user's jar, and then runs the Driver.
/** Starts a thread to run and manage the driver. */
private[worker] def start() = {
new Thread("DriverRunner for " + driverId) {
override def run() {
// prepare driver jars and run driver: downloads the user's jar; buildProcessBuilder (which has two parameters with default values) mainly prepares the environment the program runs in (not including the application jar itself)
val exitCode = prepareAndRunDriver()
// set final state depending on if forcibly killed and process exit code
finalState = if (exitCode == 0) {
Some(DriverState.FINISHED)
} else if (killed) {
Some(DriverState.KILLED)
} else {
Some(DriverState.FAILED)
}
// notify worker of final driver state, possible exception
worker.send(DriverStateChanged(driverId, finalState.get, finalException))
}
}.start()
}
Going one level deeper into prepareAndRunDriver(): CommandUtils.buildProcessBuilder() rebuilds the command against the environment it will actually run in, e.g. local environment variables and the system classpath, and substitutes the placeholders that were passed along.
private[worker] def prepareAndRunDriver(): Int = {
val driverDir = createWorkingDirectory()
val localJarFilename = downloadUserJar(driverDir) // download the user's jar
def substituteVariables(argument: String): String = argument match {
case "{{WORKER_URL}}" => workerUrl
case "{{USER_JAR}}" => localJarFilename
case other => other
}
// TODO: If we add ability to submit multiple jars they should also be added here
/* buildProcessBuilder has two parameters with default values; it mainly prepares the runtime environment (not including the application jar) */
val builder = CommandUtils.buildProcessBuilder(driverDesc.command, securityManager,
driverDesc.mem, sparkHome.getAbsolutePath, substituteVariables)
runDriver(builder, driverDir, driverDesc.supervise)
}
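A small self-contained sketch of this placeholder mechanism; the workerUrl and jar path below are made up for illustration:
object PlaceholderSubstitutionDemo {
  def main(args: Array[String]): Unit = {
    // The Command built on the Master side carries literal {{WORKER_URL}} / {{USER_JAR}} tokens;
    // only the Worker knows the concrete values, so it swaps them in just before launching.
    val workerUrl = "spark://Worker@10.0.0.5:35123"                   // illustrative value
    val localJarFilename = "/opt/spark/work/driver-xyz/examples.jar"  // illustrative value

    def substituteVariables(argument: String): String = argument match {
      case "{{WORKER_URL}}" => workerUrl
      case "{{USER_JAR}}" => localJarFilename
      case other => other
    }

    val arguments = Seq("{{WORKER_URL}}", "{{USER_JAR}}",
      "org.apache.spark.examples.SparkPi", "1000")
    println(arguments.map(substituteVariables).mkString(" "))
  }
}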
Next is CommandUtils#buildLocalCommand; the -cp argument itself is assembled in buildCommandSeq(Command, Int, String).
/**
* Build a command based on the given one, taking into account the local environment
* of where this command is expected to run, substitute any placeholders, and append
* any extra class paths.
*/
private def buildLocalCommand(
command: Command,
securityMgr: SecurityManager,
substituteArguments: String => String,
classPath: Seq[String] = Seq.empty,
env: Map[String, String]): Command = {
val libraryPathName = Utils.libraryPathEnvName // the name of the OS library path variable, e.g. LD_LIBRARY_PATH
val libraryPathEntries = command.libraryPathEntries
val cmdLibraryPath = command.environment.get(libraryPathName)
var newEnvironment = if (libraryPathEntries.nonEmpty && libraryPathName.nonEmpty) {
val libraryPaths = libraryPathEntries ++ cmdLibraryPath ++ env.get(libraryPathName)
command.environment + ((libraryPathName, libraryPaths.mkString(File.pathSeparator)))
} else {
/*
* The environment variables sent over by the RestSubmissionClient are only those starting with SPARK_ or MESOS_,
* i.e. what System.getenv() returned on the submitting side; the SPARK_ variables initialized by spark-env.sh
* were already in place at submission time.
*/
command.environment
}
Command(
/*
* For the driver the entry point is not the user's command but the wrapper class
* org.apache.spark.deploy.worker.DriverWrapper, which does the further unwrapping;
* for the executor it is org.apache.spark.executor.CoarseGrainedExecutorBackend.
*/
command.mainClass,
command.arguments.map(substituteArguments),
newEnvironment,
command.classPathEntries ++ classPath,
Seq.empty, // library path already captured in environment variable
// filter out auth secret from java options
command.javaOpts.filterNot(_.startsWith("-D" + SecurityManager.SPARK_AUTH_SECRET_CONF))) // spark.jars travels in here
}
StandaloneRestServer#buildDriverDescription() shows how the Command is constructed: what actually gets executed on the command line is the wrapper class org.apache.spark.deploy.worker.DriverWrapper.
/* What is executed directly is this wrapper class, which loads the user's jar through a custom URLClassLoader-backed classpath and then runs the user class via reflection */
val command = new Command(
"org.apache.spark.deploy.worker.DriverWrapper",
Seq("{{WORKER_URL}}", "{{USER_JAR}}", mainClass) ++ appArgs, // args to the DriverWrapper
environmentVariables, extraClassPath, extraLibraryPath, javaOpts) // i.e. spark.jars, the values passed via --jars, rides along in javaOpts
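The extraClassPath and extraLibraryPath fields above are where the options mentioned in the introduction end up. As a hedged sketch, these are the configuration keys a user would set to exercise that path (all paths are placeholders):
import org.apache.spark.SparkConf

// These keys are what spark-submit / the REST server read when assembling the
// DriverDescription and, later, the ApplicationDescription for executors.
val conf = new SparkConf()
  .setAppName("SparkPi")
  .setMaster("spark://207.184.161.138:7077")
  .set("spark.driver.extraClassPath", "/opt/extra-libs/*")
  .set("spark.executor.extraClassPath", "/opt/extra-libs/*")
  .set("spark.jars", "/path/to/dep1.jar,/path/to/dep2.jar") // equivalent to passing --jars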
Moving into DriverWrapper#main(args: Array[String]): it loads the jar with a custom class loader, looks up mainClass, and invokes its main() via reflection, which starts the user program.
def main(args: Array[String]) {
args.toList match {
case workerUrl :: userJar :: mainClass :: extraArgs =>
val conf = new SparkConf()
val host: String = Utils.localHostName()
val port: Int = sys.props.getOrElse("spark.driver.port", "0").toInt
val rpcEnv = RpcEnv.create("Driver", host, port, conf, new SecurityManager(conf))
logInfo(s"Driver address: ${rpcEnv.address}")
rpcEnv.setupEndpoint("workerWatcher", new WorkerWatcher(rpcEnv, workerUrl))
val currentLoader = Thread.currentThread.getContextClassLoader
val userJarUrl = new File(userJar).toURI().toURL()
val loader =
if (sys.props.getOrElse("spark.driver.userClassPathFirst", "false").toBoolean) {
new ChildFirstURLClassLoader(Array(userJarUrl), currentLoader)
} else {
new MutableURLClassLoader(Array(userJarUrl), currentLoader)
}
/*
* At this point the user's entry class is loaded from userJarUrl and invoked via reflection. When the SparkContext
* is initialized, all the jars specified by spark.jars are added to the cluster so the dependency environment is
* ready for the tasks that will run later.
*/
Thread.currentThread.setContextClassLoader(loader)
setupDependencies(loader, userJar)
// Delegate to supplied main class
val clazz = Utils.classForName(mainClass)
val mainMethod = clazz.getMethod("main", classOf[Array[String]])
mainMethod.invoke(null, extraArgs.toArray[String])
rpcEnv.shutdown()
}
}
Now that the Driver is running, let's look at how executors and tasks are started. Executor startup begins in SparkContext#createTaskScheduler(SparkContext, String, String), which instantiates StandaloneSchedulerBackend; once the SparkContext is ready it calls _taskScheduler.start(), which starts the StandaloneSchedulerBackend:
// start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's
// constructor (YarnSchedule)
_taskScheduler.start()
private def createTaskScheduler(
sc: SparkContext,
master: String,
deployMode: String): (SchedulerBackend, TaskScheduler) = {
case SPARK_REGEX(sparkUrl) =>
val scheduler = new TaskSchedulerImpl(sc) /* the task scheduler used in standalone mode */
val masterUrls = sparkUrl.split(",").map("spark://" + _)
/* Key point: registering the user program with the Master and requesting executors are both done by this backend; it is started from TaskSchedulerImpl's start() */
val backend = new StandaloneSchedulerBackend(scheduler, sc, masterUrls)
scheduler.initialize(backend)
(backend, scheduler)
}
Inside StandaloneSchedulerBackend#start(), the command is built around CoarseGrainedExecutorBackend, wrapped in an ApplicationDescription, and handed to the appClient, which registers the application with the Master: StandaloneAppClient#tryRegisterAllMasters() sends RegisterApplication(appDescription, self), and when the Master receives it, it runs schedule() again.
override def start() {
super.start()
// Start executors with a few necessary configs for registering with the scheduler
/* Only the configs needed to start the Executor are collected here; the values passed via --jars are not included */
val sparkJavaOpts = Utils.sparkJavaOpts(conf, SparkConf.isExecutorStartupConf)
val javaOpts = sparkJavaOpts ++ extraJavaOpts
val command = Command("org.apache.spark.executor.CoarseGrainedExecutorBackend",
args, sc.executorEnvs, classPathEntries ++ testingClassPath, libraryPathEntries, javaOpts)
// Note the two options spark.executor.extraLibraryPath and spark.driver.extraLibraryPath
val webUrl = sc.ui.map(_.webUrl).getOrElse("")
val coresPerExecutor = conf.getOption("spark.executor.cores").map(_.toInt)
val appDesc = ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command,
webUrl, sc.eventLogDir, sc.eventLogCodec, coresPerExecutor, initialExecutorLimit)
client = new StandaloneAppClient(sc.env.rpcEnv, masters, appDesc, this, conf)
client.start()
}
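For the spark-submit example at the top of the post, the values packed into this ApplicationDescription come from the usual configuration keys; a brief sketch of the mapping (the executor-cores line is an assumption added for illustration):
import org.apache.spark.SparkConf

// --executor-memory 20G       -> spark.executor.memory  -> sc.executorMemory
// --total-executor-cores 100  -> spark.cores.max        -> maxCores
// --executor-cores 4          -> spark.executor.cores   -> coresPerExecutor (illustrative)
val conf = new SparkConf()
  .set("spark.executor.memory", "20g")
  .set("spark.cores.max", "100")
  .set("spark.executor.cores", "4")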
Back in Worker#receive(), the case matching LaunchExecutor builds an ExecutorRunner and calls its start() method.
case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_) =>
if (masterUrl != activeMasterUrl) {
logWarning("Invalid Master (" + masterUrl + ") attempted to launch executor.")
} else {
logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name))
val manager = new ExecutorRunner(
appId,
execId,
appDesc.copy(command = Worker.maybeUpdateSSLSettings(appDesc.command, conf)),
cores_,
memory_,
self,
workerId,
host,
webUi.boundPort,
publicAddress,
sparkHome,
executorDir,
workerUri,
conf,
appLocalDirs, ExecutorState.RUNNING)
executors(appId + "/" + execId) = manager
manager.start()
coresUsed += cores_
memoryUsed += memory_
sendToMaster(ExecutorStateChanged(appId, execId, manager.state, None, None))
}
ExecutorRunner#start() first spawns a worker thread whose job is to run fetchAndRunExecutor(). That method builds a process with CommandUtils.buildProcessBuilder(), sets the working directory, environment variables and the Spark UI related settings, and then starts the process (the class the process runs is CoarseGrainedExecutorBackend).
/**
* Download and run the executor described in our ApplicationDescription
*/
private def fetchAndRunExecutor() {
// Launch the process
val subsOpts = appDesc.command.javaOpts.map {
Utils.substituteAppNExecIds(_, appId, execId.toString)
}
val subsCommand = appDesc.command.copy(javaOpts = subsOpts)
val builder = CommandUtils.buildProcessBuilder(subsCommand, new SecurityManager(conf),
memory, sparkHome.getAbsolutePath, substituteVariables)
val command = builder.command()
val formattedCommand = command.asScala.mkString("\"", "\" \"", "\"")
logInfo(s"Launch command: $formattedCommand")
// Start the fully built ProcessBuilder
process = builder.start()
val header = "Spark Executor Command: %s\n%s\n\n".format(
formattedCommand, "=" * 40)
// Wait for it to exit; executor may exit with code 0 (when driver instructs it to shutdown)
// or with nonzero exit code
val exitCode = process.waitFor()
state = ExecutorState.EXITED
val message = "Command exited with code " + exitCode
worker.send(ExecutorStateChanged(appId, execId, state, Some(message), Some(exitCode)))
}
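Stripped of the Spark specifics, the pattern is plain java.lang.ProcessBuilder usage; a hedged sketch with a made-up command and working directory:
import java.io.File
import scala.collection.JavaConverters._

object LaunchChildProcessDemo {
  def main(args: Array[String]): Unit = {
    // Build a child command, point it at a working directory, start it and wait for the
    // exit code -- the same shape as CommandUtils.buildProcessBuilder() + builder.start().
    val cmd = Seq("java", "-version")        // placeholder for the executor command
    val builder = new ProcessBuilder(cmd.asJava)
    builder.directory(new File("/tmp"))      // executorDir in the real code
    builder.redirectErrorStream(true)
    val process = builder.start()
    val exitCode = process.waitFor()
    println(s"Command exited with code $exitCode")
  }
}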
CoarseGrainedExecutorBackend#receive() handles the case LaunchTask(data) message; once the executor has been initialized, it calls executor.launchTask(this, taskDesc).
override def receive: PartialFunction[Any, Unit] = {
case LaunchTask(data) =>
if (executor == null) {
exitExecutor(1, "Received LaunchTask command but executor was null")
} else {
val taskDesc = TaskDescription.decode(data.value)
logInfo("Got assigned task " + taskDesc.taskId)
executor.launchTask(this, taskDesc)
}
}
Finally, TaskRunner#run() sets up the TaskMemoryManager, deserializes the task and its dependencies, initializes the various metrics, and then task.run() executes the task as usual. That wraps up the walkthrough of submitting a user application with the spark-submit script in standalone mode.
override def run(): Unit = {
val taskMemoryManager = new TaskMemoryManager(env.memoryManager, taskId)
val deserializeStartTime = System.currentTimeMillis()
val deserializeStartCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
threadMXBean.getCurrentThreadCpuTime
} else 0L
/*
* The context class loader set here is a URL class loader whose parent is the system class loader; currentJars
* holds the URIs of the dependencies, which are added when updateDependencies() is called.
*/
Thread.currentThread.setContextClassLoader(replClassLoader)
val ser = env.closureSerializer.newInstance()
logInfo(s"Running $taskName (TID $taskId)")
// Run the actual task and measure its runtime.
taskStartTime = System.currentTimeMillis()
taskStartCpu = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
threadMXBean.getCurrentThreadCpuTime
} else 0L
var threwException = true
val value = Utils.tryWithSafeFinally {
val res = task.run(
taskAttemptId = taskId,
attemptNumber = taskDescription.attemptNumber,
metricsSystem = env.metricsSystem)
threwException = false
res
}
}