in core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala [244:923]
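/**
 * Prepare the environment for submitting an application.
 *
 * Returns a 4-tuple: the arguments for the child process, the classpath entries for the
 * child, the resolved SparkConf, and the main class to run in the child.
 */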
private[deploy] def prepareSubmitEnvironment(
args: SparkSubmitArguments,
conf: Option[HadoopConfiguration] = None)
: (Seq[String], Seq[String], SparkConf, String) = {
// Return values
val childArgs = new ArrayBuffer[String]()
val childClasspath = new ArrayBuffer[String]()
val sparkConf = args.toSparkConf()
var childMainClass = ""
// Set the cluster manager
val clusterManager: Int = args.maybeMaster match {
case Some(v) =>
assert(args.maybeRemote.isEmpty)
v match {
case "yarn" => YARN
case m if m.startsWith("spark") => STANDALONE
case m if m.startsWith("k8s") => KUBERNETES
case m if m.startsWith("local") => LOCAL
case _ =>
error("Master must either be yarn or start with spark, k8s, or local")
-1
}
case None => LOCAL // default master or remote mode.
}
// Set the deploy mode; default is client mode
val deployMode: Int = args.deployMode match {
case "client" | null => CLIENT
case "cluster" => CLUSTER
case _ =>
error("Deploy mode must be either client or cluster")
-1
}
if (clusterManager == YARN) {
// Make sure YARN is included in our build if we're trying to use it
if (!Utils.classIsLoadable(YARN_CLUSTER_SUBMIT_CLASS) && !Utils.isTesting) {
error(
"Could not load YARN classes. " +
"This copy of Spark may not have been compiled with YARN support.")
}
}
if (clusterManager == KUBERNETES) {
args.maybeMaster = Option(Utils.checkAndGetK8sMasterUrl(args.master))
// Make sure KUBERNETES is included in our build if we're trying to use it
if (!Utils.classIsLoadable(KUBERNETES_CLUSTER_SUBMIT_CLASS) && !Utils.isTesting) {
error(
"Could not load KUBERNETES classes. " +
"This copy of Spark may not have been compiled with KUBERNETES support.")
}
}
// Fail fast: the following mode combinations are not supported or applicable
(clusterManager, deployMode) match {
case (STANDALONE, CLUSTER) if args.isPython =>
error("Cluster deploy mode is currently not supported for python " +
"applications on standalone clusters.")
case (STANDALONE, CLUSTER) if args.isR =>
error("Cluster deploy mode is currently not supported for R " +
"applications on standalone clusters.")
case (LOCAL, CLUSTER) =>
error("Cluster deploy mode is not compatible with master \"local\"")
case (_, CLUSTER) if isShell(args.primaryResource) =>
error("Cluster deploy mode is not applicable to Spark shells.")
case (_, CLUSTER) if isSqlShell(args.mainClass) =>
error("Cluster deploy mode is not applicable to Spark SQL shell.")
case (_, CLUSTER) if isThriftServer(args.mainClass) =>
error("Cluster deploy mode is not applicable to Spark Thrift server.")
case (_, CLUSTER) if isConnectServer(args.mainClass) =>
error("Cluster deploy mode is not applicable to Spark Connect server.")
case _ =>
}
// Update args.deployMode if it is null. It will be passed down as a Spark property later.
(args.deployMode, deployMode) match {
case (null, CLIENT) => args.deployMode = "client"
case (null, CLUSTER) => args.deployMode = "cluster"
case _ =>
}
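// Convenience flags for the (cluster manager, deploy mode) combinations handled below.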
val isYarnCluster = clusterManager == YARN && deployMode == CLUSTER
val isStandAloneCluster = clusterManager == STANDALONE && deployMode == CLUSTER
val isKubernetesCluster = clusterManager == KUBERNETES && deployMode == CLUSTER
val isKubernetesClient = clusterManager == KUBERNETES && deployMode == CLIENT
val isKubernetesClusterModeDriver = isKubernetesClient &&
sparkConf.getBoolean("spark.kubernetes.submitInDriver", false)
val isCustomClasspathInClusterModeDisallowed =
!sparkConf.get(ALLOW_CUSTOM_CLASSPATH_BY_PROXY_USER_IN_CLUSTER_MODE) &&
args.proxyUser != null &&
(isYarnCluster || isStandAloneCluster || isKubernetesCluster)
if (!isStandAloneCluster) {
// Resolve Maven dependencies if there are any, and add the resolved jars to args.jars. Add
// them to py-files too, for packages that include Python code.
val resolvedMavenCoordinates = DependencyUtils.resolveMavenDependencies(
packagesTransitive = true, args.packagesExclusions, args.packages,
args.repositories, args.ivyRepoPath, args.ivySettingsPath)
if (resolvedMavenCoordinates.nonEmpty) {
if (isKubernetesCluster) {
// We need these on the classpath in K8s cluster mode so that we can upload local deps
// via the k8s application, as in the cluster-mode driver.
childClasspath ++= resolvedMavenCoordinates
} else {
// In K8s client mode, when running in the driver, add the resolved jars early as we might
// need them at submit time for artifact downloading.
// For example, we might use the dependencies for downloading
// files from a Hadoop-compatible fs, e.g. S3. In this case the user might pass:
// --packages com.amazonaws:aws-java-sdk:1.7.4,org.apache.hadoop:hadoop-aws:2.7.6
if (isKubernetesClusterModeDriver) {
val loader = getSubmitClassLoader(sparkConf)
for (jar <- resolvedMavenCoordinates) {
addJarToClasspath(jar, loader)
}
}
args.jars = mergeFileLists(args.jars, mergeFileLists(resolvedMavenCoordinates: _*))
if (args.isPython || isInternal(args.primaryResource)) {
args.pyFiles = mergeFileLists(args.pyFiles,
mergeFileLists(resolvedMavenCoordinates: _*))
}
}
}
// install any R packages that may have been passed through --jars or --packages.
// Spark Packages may contain R source code inside the jar.
if (args.isR && !StringUtils.isBlank(args.jars)) {
RPackageUtils.checkAndBuildRPackage(args.jars, printStream, args.verbose)
}
}
// update spark config from args
args.toSparkConf(Option(sparkConf))
val hadoopConf = conf.getOrElse(SparkHadoopUtil.newConfiguration(sparkConf))
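// Temporary directory used as the local target for any remote resources downloaded below.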
val targetDir = Utils.createTempDir()
// Kerberos is not supported in standalone mode
if (clusterManager != STANDALONE
&& args.principal != null
&& args.keytab != null) {
// If client mode, make sure the keytab is just a local path.
if (deployMode == CLIENT && Utils.isLocalUri(args.keytab)) {
args.keytab = new URI(args.keytab).getPath()
}
if (!Utils.isLocalUri(args.keytab)) {
require(new File(args.keytab).exists(), s"Keytab file: ${args.keytab} does not exist")
UserGroupInformation.loginUserFromKeytab(args.principal, args.keytab)
}
}
// Resolve glob path for different resources.
args.jars = Option(args.jars).map(resolveGlobPaths(_, hadoopConf)).orNull
args.files = Option(args.files).map(resolveGlobPaths(_, hadoopConf)).orNull
args.pyFiles = Option(args.pyFiles).map(resolveGlobPaths(_, hadoopConf)).orNull
args.archives = Option(args.archives).map(resolveGlobPaths(_, hadoopConf)).orNull
// In client mode, download remote files.
var localPrimaryResource: String = null
var localJars: String = null
var localPyFiles: String = null
if (deployMode == CLIENT) {
localPrimaryResource = Option(args.primaryResource).map {
downloadFile(_, targetDir, sparkConf, hadoopConf)
}.orNull
localJars = Option(args.jars).map {
downloadFileList(_, targetDir, sparkConf, hadoopConf)
}.orNull
localPyFiles = Option(args.pyFiles).map {
downloadFileList(_, targetDir, sparkConf, hadoopConf)
}.orNull
if (isKubernetesClusterModeDriver) {
// SPARK-33748: this mimics the behaviour of Yarn cluster mode. If the driver is running
// in cluster mode, the archives should be available in the driver's current working
// directory too.
// SPARK-33782: this downloads all the files, jars, archive files and pyfiles to the
// current working directory.
// SPARK-43540: add the current working directory to the driver classpath.
// SPARK-47475: make the download to the driver optional so executors may fetch resources
// directly from the remote URL, to avoid overwhelming the driver network when resources
// are big and the executor count is high.
val workingDirectory = "."
childClasspath += workingDirectory
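// Download the given comma-separated URIs into the current working directory, unpacking
// archives when requested and using any URI fragment as the local file name. URIs whose
// scheme matches avoidDownload are left untouched and returned as-is.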
def downloadResourcesToCurrentDirectory(
uris: String,
isArchive: Boolean = false,
avoidDownload: String => Boolean = _ => false): String = {
val resolvedUris = Utils.stringToSeq(uris).map(Utils.resolveURI)
val (avoidDownloads, toDownloads) =
resolvedUris.partition(uri => avoidDownload(uri.getScheme))
val localResources = downloadFileList(
toDownloads.map(
Utils.getUriBuilder(_).fragment(null).build().toString).mkString(","),
targetDir, sparkConf, hadoopConf)
(Utils.stringToSeq(localResources).map(Utils.resolveURI).zip(toDownloads).map {
case (localResources, resolvedUri) =>
val source = new File(localResources.getPath).getCanonicalFile
val dest = new File(
workingDirectory,
if (resolvedUri.getFragment != null) resolvedUri.getFragment else source.getName)
.getCanonicalFile
logInfo(log"Files ${MDC(LogKeys.URI, resolvedUri)}" +
log" from ${MDC(LogKeys.SOURCE_PATH, source)}" +
log" to ${MDC(LogKeys.DESTINATION_PATH, dest)}")
Utils.deleteRecursively(dest)
if (isArchive) {
Utils.unpack(source, dest)
} else {
Files.copy(source.toPath, dest.toPath)
}
// Keep the URIs of local files with the given fragments.
Utils.getUriBuilder(
localResources).fragment(resolvedUri.getFragment).build().toString
} ++ avoidDownloads.map(_.toString)).mkString(",")
}
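// Jars whose scheme is listed in KUBERNETES_JARS_AVOID_DOWNLOAD_SCHEMES (or "*") are kept
// remote so that executors can fetch them directly instead of going through the driver.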
val avoidJarDownloadSchemes = sparkConf.get(KUBERNETES_JARS_AVOID_DOWNLOAD_SCHEMES)
def avoidJarDownload(scheme: String): Boolean =
avoidJarDownloadSchemes.contains("*") || avoidJarDownloadSchemes.contains(scheme)
val filesLocalFiles = Option(args.files).map {
downloadResourcesToCurrentDirectory(_)
}.orNull
val updatedJars = Option(args.jars).map {
downloadResourcesToCurrentDirectory(_, avoidDownload = avoidJarDownload)
}.orNull
val archiveLocalFiles = Option(args.archives).map {
downloadResourcesToCurrentDirectory(_, true)
}.orNull
val pyLocalFiles = Option(args.pyFiles).map {
downloadResourcesToCurrentDirectory(_)
}.orNull
args.files = filesLocalFiles
args.archives = archiveLocalFiles
args.pyFiles = pyLocalFiles
args.jars = updatedJars
}
}
// When running on YARN, some remote resources need special handling because of their scheme:
// 1. The Hadoop FileSystem doesn't support the scheme, or
// 2. we explicitly bypass the Hadoop FileSystem via "spark.yarn.dist.forceDownloadSchemes".
// We download such resources to the local disk before adding them to YARN's distributed
// cache. For yarn client mode, since the code above has already downloaded them, we only
// need to figure out the local path and replace the remote one.
if (clusterManager == YARN) {
val forceDownloadSchemes = sparkConf.get(FORCE_DOWNLOAD_SCHEMES)
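// A resource must be downloaded if its scheme is force-listed, or if Hadoop has no
// FileSystem implementation for it.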
def shouldDownload(scheme: String): Boolean = {
forceDownloadSchemes.contains("*") || forceDownloadSchemes.contains(scheme) ||
Try { FileSystem.getFileSystemClass(scheme, hadoopConf) }.isFailure
}
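// Download a single resource to targetDir unless it is already local ("local"/"file") or a
// file with the same name has been downloaded there already.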
def downloadResource(resource: String): String = {
val uri = Utils.resolveURI(resource)
uri.getScheme match {
case "local" | "file" => resource
case e if shouldDownload(e) =>
val file = new File(targetDir, new Path(uri).getName)
if (file.exists()) {
file.toURI.toString
} else {
downloadFile(resource, targetDir, sparkConf, hadoopConf)
}
case _ => uri.toString
}
}
args.primaryResource = Option(args.primaryResource).map { downloadResource }.orNull
args.files = Option(args.files).map { files =>
Utils.stringToSeq(files).map(downloadResource).mkString(",")
}.orNull
args.pyFiles = Option(args.pyFiles).map { pyFiles =>
Utils.stringToSeq(pyFiles).map(downloadResource).mkString(",")
}.orNull
args.jars = Option(args.jars).map { jars =>
Utils.stringToSeq(jars).map(downloadResource).mkString(",")
}.orNull
args.archives = Option(args.archives).map { archives =>
Utils.stringToSeq(archives).map(downloadResource).mkString(",")
}.orNull
}
// At this point, we have attempted to download all remote resources.
// Now we try to resolve the main class if our primary resource is a JAR.
if (args.mainClass == null && !args.isPython && !args.isR) {
try {
val uri = new URI(
Option(localPrimaryResource).getOrElse(args.primaryResource)
)
val fs = FileSystem.get(uri, hadoopConf)
Utils.tryWithResource(new JarInputStream(fs.open(new Path(uri)))) { jar =>
args.mainClass = jar.getManifest.getMainAttributes.getValue("Main-Class")
}
} catch {
case e: Throwable =>
error(
s"Failed to get main class in JAR with error '${e.getMessage}'. " +
" Please specify one with --class."
)
}
if (args.mainClass == null) {
// If we still can't figure out the main class at this point, blow up.
error("No main class set in JAR; please specify one with --class.")
}
}
// If we're running a python app, set the main class to our specific python runner
if (args.isPython && deployMode == CLIENT) {
if (args.primaryResource == PYSPARK_SHELL) {
args.mainClass = "org.apache.spark.api.python.PythonGatewayServer"
} else {
// If a python file is provided, add it to the child arguments and list of files to deploy.
// Usage: PythonAppRunner <main python file> <extra python files> [app arguments]
args.mainClass = "org.apache.spark.deploy.PythonRunner"
args.childArgs = ArrayBuffer(localPrimaryResource, localPyFiles) ++ args.childArgs
}
}
// Non-PySpark applications may also need Python dependencies.
if (deployMode == CLIENT && clusterManager != YARN) {
// The YARN backend handles python files differently, so don't merge the lists.
args.files = mergeFileLists(args.files, args.pyFiles)
}
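// Record the locally downloaded Python files so they can be added to the PYTHONPATH later.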
if (localPyFiles != null) {
sparkConf.set(SUBMIT_PYTHON_FILES, localPyFiles.split(",").toImmutableArraySeq)
}
// In YARN mode for an R app, add the SparkR package archive and the R package
// archive containing all of the built R libraries to archives so that they can
// be distributed with the job
if (args.isR && clusterManager == YARN) {
val sparkRPackagePath = RUtils.localSparkRPackagePath
if (sparkRPackagePath.isEmpty) {
error("SPARK_HOME does not exist for R application in YARN mode.")
}
val sparkRPackageFile = new File(sparkRPackagePath.get, SPARKR_PACKAGE_ARCHIVE)
if (!sparkRPackageFile.exists()) {
error(s"$SPARKR_PACKAGE_ARCHIVE does not exist for R application in YARN mode.")
}
val sparkRPackageURI = Utils.resolveURI(sparkRPackageFile.getAbsolutePath).toString
// Distribute the SparkR package.
// Assigns a symbolic link name "sparkr" to the shipped package.
args.archives = mergeFileLists(args.archives, sparkRPackageURI + "#sparkr")
// Distribute the R package archive containing all the built R packages.
if (!RUtils.rPackages.isEmpty) {
val rPackageFile =
RPackageUtils.zipRLibraries(new File(RUtils.rPackages.get), R_PACKAGE_ARCHIVE)
if (!rPackageFile.exists()) {
error("Failed to zip all the built R packages.")
}
val rPackageURI = Utils.resolveURI(rPackageFile.getAbsolutePath).toString
// Assigns a symbolic link name "rpkg" to the shipped package.
args.archives = mergeFileLists(args.archives, rPackageURI + "#rpkg")
}
}
// TODO: Support distributing R packages with standalone cluster
if (args.isR && clusterManager == STANDALONE && !RUtils.rPackages.isEmpty) {
error("Distributing R packages with standalone cluster is not supported.")
}
// If we're running an R app, set the main class to our specific R runner
if (args.isR && deployMode == CLIENT) {
if (args.primaryResource == SPARKR_SHELL) {
args.mainClass = "org.apache.spark.api.r.RBackend"
} else {
// If an R file is provided, add it to the child arguments and list of files to deploy.
// Usage: RRunner <main R file> [app arguments]
args.mainClass = "org.apache.spark.deploy.RRunner"
args.childArgs = ArrayBuffer(localPrimaryResource) ++ args.childArgs
args.files = mergeFileLists(args.files, args.primaryResource)
}
}
if (isYarnCluster && args.isR) {
// In yarn-cluster mode for an R app, add primary resource to files
// that can be distributed with the job
args.files = mergeFileLists(args.files, args.primaryResource)
}
// Special flag to avoid deprecation warnings at the client
sys.props("SPARK_SUBMIT") = "true"
// A list of rules to map each argument to system properties or command-line options in
// each deploy mode; we iterate through these below
val options = List[OptionAssigner](
// All cluster managers
OptionAssigner(
// If remote is not set, set the master.
if (args.maybeRemote.isEmpty) args.master
else args.maybeMaster.orNull,
ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES, confKey = "spark.master"),
OptionAssigner(
args.maybeRemote.orNull, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES, confKey = "spark.remote"),
OptionAssigner(args.deployMode, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES,
confKey = SUBMIT_DEPLOY_MODE.key),
OptionAssigner(args.name, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES, confKey = "spark.app.name"),
OptionAssigner(args.ivyRepoPath, ALL_CLUSTER_MGRS, CLIENT,
confKey = JAR_IVY_REPO_PATH.key),
OptionAssigner(args.driverMemory, ALL_CLUSTER_MGRS, CLIENT,
confKey = DRIVER_MEMORY.key),
OptionAssigner(args.driverExtraClassPath, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES,
confKey = DRIVER_CLASS_PATH.key),
OptionAssigner(args.driverExtraJavaOptions, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES,
confKey = DRIVER_JAVA_OPTIONS.key),
OptionAssigner(args.driverExtraLibraryPath, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES,
confKey = DRIVER_LIBRARY_PATH.key),
OptionAssigner(args.principal, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES,
confKey = PRINCIPAL.key),
OptionAssigner(args.keytab, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES,
confKey = KEYTAB.key),
OptionAssigner(args.pyFiles, ALL_CLUSTER_MGRS, CLUSTER, confKey = SUBMIT_PYTHON_FILES.key),
// Propagate attributes for dependency resolution at the driver side
OptionAssigner(args.packages, STANDALONE | KUBERNETES,
CLUSTER, confKey = JAR_PACKAGES.key),
OptionAssigner(args.repositories, STANDALONE | KUBERNETES,
CLUSTER, confKey = JAR_REPOSITORIES.key),
OptionAssigner(args.ivyRepoPath, STANDALONE | KUBERNETES,
CLUSTER, confKey = JAR_IVY_REPO_PATH.key),
OptionAssigner(args.packagesExclusions, STANDALONE | KUBERNETES,
CLUSTER, confKey = JAR_PACKAGES_EXCLUSIONS.key),
// Yarn only
OptionAssigner(args.queue, YARN, ALL_DEPLOY_MODES, confKey = "spark.yarn.queue"),
OptionAssigner(args.pyFiles, YARN, ALL_DEPLOY_MODES, confKey = "spark.yarn.dist.pyFiles",
mergeFn = Some(mergeFileLists(_, _))),
OptionAssigner(args.jars, YARN, ALL_DEPLOY_MODES, confKey = "spark.yarn.dist.jars",
mergeFn = Some(mergeFileLists(_, _))),
OptionAssigner(args.files, YARN, ALL_DEPLOY_MODES, confKey = "spark.yarn.dist.files",
mergeFn = Some(mergeFileLists(_, _))),
OptionAssigner(args.archives, YARN, ALL_DEPLOY_MODES, confKey = "spark.yarn.dist.archives",
mergeFn = Some(mergeFileLists(_, _))),
// Other options
OptionAssigner(args.numExecutors, YARN | KUBERNETES, ALL_DEPLOY_MODES,
confKey = EXECUTOR_INSTANCES.key),
OptionAssigner(args.executorCores, STANDALONE | YARN | KUBERNETES, ALL_DEPLOY_MODES,
confKey = EXECUTOR_CORES.key),
OptionAssigner(args.executorMemory, STANDALONE | YARN | KUBERNETES, ALL_DEPLOY_MODES,
confKey = EXECUTOR_MEMORY.key),
OptionAssigner(args.totalExecutorCores, STANDALONE, ALL_DEPLOY_MODES,
confKey = CORES_MAX.key),
OptionAssigner(args.files, LOCAL | STANDALONE | KUBERNETES, ALL_DEPLOY_MODES,
confKey = FILES.key),
OptionAssigner(args.archives, LOCAL | STANDALONE | KUBERNETES, ALL_DEPLOY_MODES,
confKey = ARCHIVES.key),
OptionAssigner(args.jars, LOCAL, CLIENT, confKey = JARS.key),
OptionAssigner(args.jars, STANDALONE | KUBERNETES, ALL_DEPLOY_MODES,
confKey = JARS.key),
OptionAssigner(args.driverMemory, STANDALONE | YARN | KUBERNETES, CLUSTER,
confKey = DRIVER_MEMORY.key),
OptionAssigner(args.driverCores, STANDALONE | YARN | KUBERNETES, CLUSTER,
confKey = DRIVER_CORES.key),
OptionAssigner(args.supervise.toString, STANDALONE, CLUSTER,
confKey = DRIVER_SUPERVISE.key),
OptionAssigner(args.ivyRepoPath, STANDALONE, CLUSTER, confKey = JAR_IVY_REPO_PATH.key),
// An internal option used only by spark-shell to add user jars to the repl's classloader.
// Previously it used "spark.jars" or "spark.yarn.dist.jars", which may now point to remote
// jars, so a new option was added to specify only local jars for spark-shell internally.
OptionAssigner(localJars, ALL_CLUSTER_MGRS, CLIENT, confKey = "spark.repl.local.jars")
)
// In client mode, launch the application main class directly
// In addition, add the main application jar and any added jars (if any) to the classpath
if (deployMode == CLIENT) {
childMainClass = args.mainClass
if (localPrimaryResource != null && isUserJar(localPrimaryResource)) {
childClasspath += localPrimaryResource
}
if (localJars != null) { childClasspath ++= localJars.split(",") }
}
// Add the main application jar and any added jars to the classpath in case the YARN client
// requires these jars.
// This assumes both the primaryResource and the user jars are local jars, or have already
// been downloaded locally via "spark.yarn.dist.forceDownloadSchemes"; otherwise they will
// not be added to the classpath of the YARN client.
if (isYarnCluster) {
if (isUserJar(args.primaryResource)) {
childClasspath += args.primaryResource
}
if (args.jars != null) { childClasspath ++= args.jars.split(",") }
}
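// In client mode, pass the user's application arguments straight through to the child.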
if (deployMode == CLIENT) {
if (args.childArgs != null) { childArgs ++= args.childArgs }
}
// Map all arguments to command-line options or system properties for our chosen mode
for (opt <- options) {
if (opt.value != null &&
(deployMode & opt.deployMode) != 0 &&
(clusterManager & opt.clusterManager) != 0) {
if (opt.clOption != null) { childArgs += opt.clOption += opt.value }
if (opt.confKey != null) {
// Used in SparkConnectClient because Spark Connect client does not have SparkConf.
if (opt.confKey == "spark.remote") System.setProperty("spark.remote", opt.value)
if (opt.mergeFn.isDefined && sparkConf.contains(opt.confKey)) {
sparkConf.set(opt.confKey, opt.mergeFn.get.apply(sparkConf.get(opt.confKey), opt.value))
} else {
sparkConf.set(opt.confKey, opt.value)
}
}
}
}
// In the case of shells, spark.ui.showConsoleProgress can be true by default or set by the
// user, except when Spark Connect is in local mode, because Spark Connect supports its own
// progress reporting.
if (isShell(args.primaryResource) && !sparkConf.contains(UI_SHOW_CONSOLE_PROGRESS)) {
sparkConf.set(UI_SHOW_CONSOLE_PROGRESS, true)
}
// Add the application jar automatically so the user doesn't have to call sc.addJar
// For isKubernetesClusterModeDriver, the jar is already added in the previous spark-submit
// For YARN cluster mode, the jar is already distributed on each node as "app.jar"
// For python and R files, the primary resource is already distributed as a regular file
if (!isKubernetesClusterModeDriver && !isYarnCluster && !args.isPython && !args.isR) {
var jars = sparkConf.get(JARS)
if (isUserJar(args.primaryResource)) {
jars = jars ++ Seq(args.primaryResource)
}
sparkConf.set(JARS, jars)
}
// In standalone cluster mode, use the REST client to submit the application (Spark 1.3+).
// All Spark parameters are expected to be passed to the client through system properties.
if (args.isStandaloneCluster) {
if (args.useRest) {
childMainClass = REST_CLUSTER_SUBMIT_CLASS
childArgs += args.primaryResource += args.mainClass
} else {
// In legacy standalone cluster mode, use Client as a wrapper around the user class
childMainClass = STANDALONE_CLUSTER_SUBMIT_CLASS
if (args.supervise) { childArgs += "--supervise" }
Option(args.driverMemory).foreach { m => childArgs += "--memory" += m }
Option(args.driverCores).foreach { c => childArgs += "--cores" += c }
childArgs += "launch"
childArgs += args.master += args.primaryResource += args.mainClass
}
if (args.childArgs != null) {
childArgs ++= args.childArgs
}
}
// Let YARN know it's a pyspark app, so it distributes needed libraries.
if (clusterManager == YARN) {
if (args.isPython) {
sparkConf.set("spark.yarn.isPython", "true")
}
}
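// With Hadoop security enabled, Kubernetes mode also calls setRMPrincipal to configure the
// RM principal on the SparkConf.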
if (clusterManager == KUBERNETES && UserGroupInformation.isSecurityEnabled) {
setRMPrincipal(sparkConf)
}
// In yarn-cluster mode, use yarn.Client as a wrapper around the user class
if (isYarnCluster) {
childMainClass = YARN_CLUSTER_SUBMIT_CLASS
if (args.isPython) {
childArgs += "--primary-py-file" += args.primaryResource
childArgs += "--class" += "org.apache.spark.deploy.PythonRunner"
} else if (args.isR) {
val mainFile = new Path(args.primaryResource).getName
childArgs += "--primary-r-file" += mainFile
childArgs += "--class" += "org.apache.spark.deploy.RRunner"
} else {
if (args.primaryResource != SparkLauncher.NO_RESOURCE) {
childArgs += "--jar" += args.primaryResource
}
childArgs += "--class" += args.mainClass
}
if (args.childArgs != null) {
args.childArgs.foreach { arg => childArgs += "--arg" += arg }
}
}
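// In Kubernetes cluster mode, use the Kubernetes submit client as a wrapper around the
// user class, mirroring the YARN cluster handling above.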
if (isKubernetesCluster) {
childMainClass = KUBERNETES_CLUSTER_SUBMIT_CLASS
if (args.primaryResource != SparkLauncher.NO_RESOURCE) {
if (args.isPython) {
childArgs ++= Array("--primary-py-file", args.primaryResource)
childArgs ++= Array("--main-class", "org.apache.spark.deploy.PythonRunner")
} else if (args.isR) {
childArgs ++= Array("--primary-r-file", args.primaryResource)
childArgs ++= Array("--main-class", "org.apache.spark.deploy.RRunner")
}
else {
childArgs ++= Array("--primary-java-resource", args.primaryResource)
childArgs ++= Array("--main-class", args.mainClass)
}
} else {
childArgs ++= Array("--main-class", args.mainClass)
}
if (args.childArgs != null) {
args.childArgs.foreach { arg =>
childArgs += "--arg" += arg
}
}
// Pass the proxyUser to the k8s app so it is possible to add it to the driver args
if (args.proxyUser != null) {
childArgs += "--proxy-user" += args.proxyUser
}
}
// Load any properties specified through --conf and the default properties file
for ((k, v) <- args.sparkProperties) {
sparkConf.setIfMissing(k, v)
}
// Ignore invalid spark.driver.host in cluster modes.
if (deployMode == CLUSTER) {
sparkConf.remove(DRIVER_HOST_ADDRESS)
}
// Resolve paths in certain spark properties
val pathConfigs = Seq(
JARS.key,
FILES.key,
ARCHIVES.key,
"spark.yarn.dist.files",
"spark.yarn.dist.archives",
"spark.yarn.dist.jars")
pathConfigs.foreach { config =>
// Replace old URIs with resolved URIs, if they exist
sparkConf.getOption(config).foreach { oldValue =>
sparkConf.set(config, Utils.resolveURIs(oldValue))
}
}
// Resolve and format python file paths properly before adding them to the PYTHONPATH.
// The resolving part is redundant in the case of --py-files, but necessary if the user
// explicitly sets `spark.submit.pyFiles` in his/her default properties file.
val pyFiles = sparkConf.get(SUBMIT_PYTHON_FILES)
val resolvedPyFiles = Utils.resolveURIs(pyFiles.mkString(","))
val formattedPyFiles = if (deployMode != CLUSTER) {
PythonRunner.formatPaths(resolvedPyFiles).mkString(",")
} else {
// Skip formatting the python path in cluster mode; cluster deploy modes support dealing
// with remote python files, and they can distribute and add python files
// locally themselves.
resolvedPyFiles
}
sparkConf.set(SUBMIT_PYTHON_FILES, formattedPyFiles.split(",").toImmutableArraySeq)
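// Propagate --verbose to the Spark SQL shell so that it also runs in verbose mode.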
if (args.verbose && isSqlShell(childMainClass)) {
childArgs ++= Seq("--verbose")
}
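// Record the submit time; in the K8s cluster-mode driver it is refreshed unless
// spark.kubernetes.setSubmitTimeInDriver is disabled.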
val setSubmitTimeInClusterModeDriver =
sparkConf.getBoolean("spark.kubernetes.setSubmitTimeInDriver", true)
if (!sparkConf.contains("spark.app.submitTime")
|| isKubernetesClusterModeDriver && setSubmitTimeInClusterModeDriver) {
sparkConf.set("spark.app.submitTime", System.currentTimeMillis().toString)
}
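// Drop any user-supplied classpath entries when a proxy user is specified in cluster mode
// and custom classpaths are disallowed.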
if (childClasspath.nonEmpty && isCustomClasspathInClusterModeDisallowed) {
// Log the ignored entries before clearing them, so the warning actually lists them.
logWarning(log"Ignore classpath " +
log"${MDC(LogKeys.CLASS_PATH, childClasspath.mkString(", "))} " +
log"with proxy user specified in Cluster mode when " +
log"${MDC(LogKeys.CONFIG, ALLOW_CUSTOM_CLASSPATH_BY_PROXY_USER_IN_CLUSTER_MODE.key)} is " +
log"disabled")
childClasspath.clear()
}
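// Return the child arguments, classpath, resolved SparkConf, and main class.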
(childArgs.toSeq, childClasspath.toSeq, sparkConf, childMainClass)
}