private[deploy] def prepareSubmitEnvironment()

in core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala [244:923]
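
A quick note before the listing: the four-element result is what actually launches the user application. A minimal sketch of the assumed call site (roughly what `runMain` in the same file does; not verbatim):

    // sketch only; the tuple drives the child application that SparkSubmit starts
    val (childArgs, childClasspath, sparkConf, childMainClass) =
      prepareSubmitEnvironment(args)
    // childMainClass is loaded via a classloader built from childClasspath, childArgs become
    // its application arguments, and sparkConf carries every "spark.*" setting derived from
    // the command line, --conf options and properties files.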


  private[deploy] def prepareSubmitEnvironment(
      args: SparkSubmitArguments,
      conf: Option[HadoopConfiguration] = None)
      : (Seq[String], Seq[String], SparkConf, String) = {
    // Return values
    val childArgs = new ArrayBuffer[String]()
    val childClasspath = new ArrayBuffer[String]()
    val sparkConf = args.toSparkConf()
    var childMainClass = ""

    // Set the cluster manager
    val clusterManager: Int = args.maybeMaster match {
      case Some(v) =>
        assert(args.maybeRemote.isEmpty)
        v match {
          case "yarn" => YARN
          case m if m.startsWith("spark") => STANDALONE
          case m if m.startsWith("k8s") => KUBERNETES
          case m if m.startsWith("local") => LOCAL
          case _ =>
            error("Master must either be yarn or start with spark, k8s, or local")
            -1
        }
      case None => LOCAL // default master or remote mode.
    }

    // Set the deploy mode; default is client mode
    val deployMode: Int = args.deployMode match {
      case "client" | null => CLIENT
      case "cluster" => CLUSTER
      case _ =>
        error("Deploy mode must be either client or cluster")
        -1
    }
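    // Note that the cluster manager constants (YARN, STANDALONE, KUBERNETES, LOCAL) and the
    // deploy mode constants (CLIENT, CLUSTER) are bit flags, so masks such as
    // STANDALONE | KUBERNETES and the bitwise tests in the OptionAssigner loop below act as
    // set-membership checks.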

    if (clusterManager == YARN) {
      // Make sure YARN is included in our build if we're trying to use it
      if (!Utils.classIsLoadable(YARN_CLUSTER_SUBMIT_CLASS) && !Utils.isTesting) {
        error(
          "Could not load YARN classes. " +
          "This copy of Spark may not have been compiled with YARN support.")
      }
    }

    if (clusterManager == KUBERNETES) {
      args.maybeMaster = Option(Utils.checkAndGetK8sMasterUrl(args.master))
      // Make sure KUBERNETES is included in our build if we're trying to use it
      if (!Utils.classIsLoadable(KUBERNETES_CLUSTER_SUBMIT_CLASS) && !Utils.isTesting) {
        error(
          "Could not load KUBERNETES classes. " +
            "This copy of Spark may not have been compiled with KUBERNETES support.")
      }
    }

    // Fail fast; the following modes are not supported or applicable
    (clusterManager, deployMode) match {
      case (STANDALONE, CLUSTER) if args.isPython =>
        error("Cluster deploy mode is currently not supported for python " +
          "applications on standalone clusters.")
      case (STANDALONE, CLUSTER) if args.isR =>
        error("Cluster deploy mode is currently not supported for R " +
          "applications on standalone clusters.")
      case (LOCAL, CLUSTER) =>
        error("Cluster deploy mode is not compatible with master \"local\"")
      case (_, CLUSTER) if isShell(args.primaryResource) =>
        error("Cluster deploy mode is not applicable to Spark shells.")
      case (_, CLUSTER) if isSqlShell(args.mainClass) =>
        error("Cluster deploy mode is not applicable to Spark SQL shell.")
      case (_, CLUSTER) if isThriftServer(args.mainClass) =>
        error("Cluster deploy mode is not applicable to Spark Thrift server.")
      case (_, CLUSTER) if isConnectServer(args.mainClass) =>
        error("Cluster deploy mode is not applicable to Spark Connect server.")
      case _ =>
    }

    // Update args.deployMode if it is null. It will be passed down as a Spark property later.
    (args.deployMode, deployMode) match {
      case (null, CLIENT) => args.deployMode = "client"
      case (null, CLUSTER) => args.deployMode = "cluster"
      case _ =>
    }
    val isYarnCluster = clusterManager == YARN && deployMode == CLUSTER
    val isStandAloneCluster = clusterManager == STANDALONE && deployMode == CLUSTER
    val isKubernetesCluster = clusterManager == KUBERNETES && deployMode == CLUSTER
    val isKubernetesClient = clusterManager == KUBERNETES && deployMode == CLIENT
    val isKubernetesClusterModeDriver = isKubernetesClient &&
      sparkConf.getBoolean("spark.kubernetes.submitInDriver", false)
    val isCustomClasspathInClusterModeDisallowed =
      !sparkConf.get(ALLOW_CUSTOM_CLASSPATH_BY_PROXY_USER_IN_CLUSTER_MODE) &&
      args.proxyUser != null &&
      (isYarnCluster || isStandAloneCluster || isKubernetesCluster)
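    // If a proxy user submits in YARN/standalone/K8s cluster mode and the config above is
    // disabled, any user-supplied childClasspath entries are dropped near the end of this method
    // (see the logWarning just before the return value).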

    if (!isStandAloneCluster) {
      // Resolve Maven dependencies, if any, and add the resolved jars to the classpath and jar
      // list. Add them to py-files too for packages that include Python code.
      val resolvedMavenCoordinates = DependencyUtils.resolveMavenDependencies(
        packagesTransitive = true, args.packagesExclusions, args.packages,
        args.repositories, args.ivyRepoPath, args.ivySettingsPath)

      if (resolvedMavenCoordinates.nonEmpty) {
        if (isKubernetesCluster) {
          // We need this in K8s cluster mode so that we can upload local deps
          // via the k8s application, like in cluster mode driver
          childClasspath ++= resolvedMavenCoordinates
        } else {
          // In K8s client mode, when in the driver, add resolved jars early as we might need
          // them at the submit time for artifact downloading.
          // For example we might use the dependencies for downloading
          // files from a Hadoop Compatible fs e.g. S3. In this case the user might pass:
          // --packages com.amazonaws:aws-java-sdk:1.7.4,org.apache.hadoop:hadoop-aws:2.7.6
          if (isKubernetesClusterModeDriver) {
            val loader = getSubmitClassLoader(sparkConf)
            for (jar <- resolvedMavenCoordinates) {
              addJarToClasspath(jar, loader)
            }
          }

          args.jars = mergeFileLists(args.jars, mergeFileLists(resolvedMavenCoordinates: _*))
          if (args.isPython || isInternal(args.primaryResource)) {
            args.pyFiles = mergeFileLists(args.pyFiles,
              mergeFileLists(resolvedMavenCoordinates: _*))
          }
        }
      }
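      // Illustrative example (coordinates are hypothetical): "--packages
      // org.apache.kafka:kafka-clients:3.4.0" resolves to jar paths in the local Ivy cache
      // (by default under ~/.ivy2/jars), and those paths are what get merged into args.jars,
      // args.pyFiles or childClasspath above.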

      // install any R packages that may have been passed through --jars or --packages.
      // Spark Packages may contain R source code inside the jar.
      if (args.isR && !StringUtils.isBlank(args.jars)) {
        RPackageUtils.checkAndBuildRPackage(args.jars, printStream, args.verbose)
      }
    }

    // update spark config from args
    args.toSparkConf(Option(sparkConf))
    val hadoopConf = conf.getOrElse(SparkHadoopUtil.newConfiguration(sparkConf))
    val targetDir = Utils.createTempDir()

    // Kerberos is not supported in standalone mode
    if (clusterManager != STANDALONE
        && args.principal != null
        && args.keytab != null) {
      // If client mode, make sure the keytab is just a local path.
      if (deployMode == CLIENT && Utils.isLocalUri(args.keytab)) {
        args.keytab = new URI(args.keytab).getPath()
      }

      if (!Utils.isLocalUri(args.keytab)) {
        require(new File(args.keytab).exists(), s"Keytab file: ${args.keytab} does not exist")
        UserGroupInformation.loginUserFromKeytab(args.principal, args.keytab)
      }
    }

    // Resolve glob path for different resources.
    args.jars = Option(args.jars).map(resolveGlobPaths(_, hadoopConf)).orNull
    args.files = Option(args.files).map(resolveGlobPaths(_, hadoopConf)).orNull
    args.pyFiles = Option(args.pyFiles).map(resolveGlobPaths(_, hadoopConf)).orNull
    args.archives = Option(args.archives).map(resolveGlobPaths(_, hadoopConf)).orNull
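    // Illustrative example (path is hypothetical): "--jars libs/*.jar" is expanded here into a
    // comma-separated list of the concrete jar paths matching the glob on the target filesystem.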


    // In client mode, download remote files.
    var localPrimaryResource: String = null
    var localJars: String = null
    var localPyFiles: String = null
    if (deployMode == CLIENT) {
      localPrimaryResource = Option(args.primaryResource).map {
        downloadFile(_, targetDir, sparkConf, hadoopConf)
      }.orNull
      localJars = Option(args.jars).map {
        downloadFileList(_, targetDir, sparkConf, hadoopConf)
      }.orNull
      localPyFiles = Option(args.pyFiles).map {
        downloadFileList(_, targetDir, sparkConf, hadoopConf)
      }.orNull

      if (isKubernetesClusterModeDriver) {
        // SPARK-33748: this mimics the behaviour of Yarn cluster mode. If the driver is running
        // in cluster mode, the archives should be available in the driver's current working
        // directory too.
        // SPARK-33782: this downloads all the files, jars, archives and py-files to the current
        // working directory
        // SPARK-43540: add the current working directory to the driver classpath
        // SPARK-47475: make the download to the driver optional so executors can fetch a resource
        // directly from its remote URL, avoiding overwhelming the driver's network when the
        // resource is large and the executor count is high
        val workingDirectory = "."
        childClasspath += workingDirectory
        def downloadResourcesToCurrentDirectory(
            uris: String,
            isArchive: Boolean = false,
            avoidDownload: String => Boolean = _ => false): String = {
          val resolvedUris = Utils.stringToSeq(uris).map(Utils.resolveURI)
          val (avoidDownloads, toDownloads) =
            resolvedUris.partition(uri => avoidDownload(uri.getScheme))
          val localResources = downloadFileList(
            toDownloads.map(
              Utils.getUriBuilder(_).fragment(null).build().toString).mkString(","),
            targetDir, sparkConf, hadoopConf)
          (Utils.stringToSeq(localResources).map(Utils.resolveURI).zip(toDownloads).map {
            case (localResources, resolvedUri) =>
              val source = new File(localResources.getPath).getCanonicalFile
              val dest = new File(
                workingDirectory,
                if (resolvedUri.getFragment != null) resolvedUri.getFragment else source.getName)
                .getCanonicalFile
              logInfo(log"Files ${MDC(LogKeys.URI, resolvedUri)}" +
                log" from ${MDC(LogKeys.SOURCE_PATH, source)}" +
                log" to ${MDC(LogKeys.DESTINATION_PATH, dest)}")
              Utils.deleteRecursively(dest)
              if (isArchive) {
                Utils.unpack(source, dest)
              } else {
                Files.copy(source.toPath, dest.toPath)
              }
              // Keep the URIs of local files with the given fragments.
              Utils.getUriBuilder(
                localResources).fragment(resolvedUri.getFragment).build().toString
          } ++ avoidDownloads.map(_.toString)).mkString(",")
        }

        val avoidJarDownloadSchemes = sparkConf.get(KUBERNETES_JARS_AVOID_DOWNLOAD_SCHEMES)

        def avoidJarDownload(scheme: String): Boolean =
          avoidJarDownloadSchemes.contains("*") || avoidJarDownloadSchemes.contains(scheme)

        val filesLocalFiles = Option(args.files).map {
          downloadResourcesToCurrentDirectory(_)
        }.orNull
        val updatedJars = Option(args.jars).map {
          downloadResourcesToCurrentDirectory(_, avoidDownload = avoidJarDownload)
        }.orNull
        val archiveLocalFiles = Option(args.archives).map {
          downloadResourcesToCurrentDirectory(_, true)
        }.orNull
        val pyLocalFiles = Option(args.pyFiles).map {
          downloadResourcesToCurrentDirectory(_)
        }.orNull
        args.files = filesLocalFiles
        args.archives = archiveLocalFiles
        args.pyFiles = pyLocalFiles
        args.jars = updatedJars
      }
    }
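    // From this point on, in client mode localPrimaryResource/localJars/localPyFiles point at
    // copies under targetDir, while args.* still hold the original (possibly rewritten) URIs.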

    // When running on YARN, some remote resources have schemes that:
    //   1. Hadoop FileSystem doesn't support, or
    //   2. we explicitly bypass via "spark.yarn.dist.forceDownloadSchemes".
    // We download these to local disk before adding them to YARN's distributed cache.
    // In yarn client mode they have already been downloaded by the code above, so we only need to
    // figure out the local path and replace the remote one.
    if (clusterManager == YARN) {
      val forceDownloadSchemes = sparkConf.get(FORCE_DOWNLOAD_SCHEMES)

      def shouldDownload(scheme: String): Boolean = {
        forceDownloadSchemes.contains("*") || forceDownloadSchemes.contains(scheme) ||
          Try { FileSystem.getFileSystemClass(scheme, hadoopConf) }.isFailure
      }

      def downloadResource(resource: String): String = {
        val uri = Utils.resolveURI(resource)
        uri.getScheme match {
          case "local" | "file" => resource
          case e if shouldDownload(e) =>
            val file = new File(targetDir, new Path(uri).getName)
            if (file.exists()) {
              file.toURI.toString
            } else {
              downloadFile(resource, targetDir, sparkConf, hadoopConf)
            }
          case _ => uri.toString
        }
      }

      args.primaryResource = Option(args.primaryResource).map { downloadResource }.orNull
      args.files = Option(args.files).map { files =>
        Utils.stringToSeq(files).map(downloadResource).mkString(",")
      }.orNull
      args.pyFiles = Option(args.pyFiles).map { pyFiles =>
        Utils.stringToSeq(pyFiles).map(downloadResource).mkString(",")
      }.orNull
      args.jars = Option(args.jars).map { jars =>
        Utils.stringToSeq(jars).map(downloadResource).mkString(",")
      }.orNull
      args.archives = Option(args.archives).map { archives =>
        Utils.stringToSeq(archives).map(downloadResource).mkString(",")
      }.orNull
    }
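    // Illustrative example (values are hypothetical): with
    // spark.yarn.dist.forceDownloadSchemes=http,https, an "https://host/dep.jar" dependency is
    // downloaded into targetDir here rather than handed to YARN's distributed cache as-is.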

    // At this point, we have attempted to download all remote resources.
    // Now we try to resolve the main class if our primary resource is a JAR.
    if (args.mainClass == null && !args.isPython && !args.isR) {
      try {
        val uri = new URI(
          Option(localPrimaryResource).getOrElse(args.primaryResource)
        )
        val fs = FileSystem.get(uri, hadoopConf)

        Utils.tryWithResource(new JarInputStream(fs.open(new Path(uri)))) { jar =>
          args.mainClass = jar.getManifest.getMainAttributes.getValue("Main-Class")
        }
      } catch {
        case e: Throwable =>
          error(
            s"Failed to get main class in JAR with error '${e.getMessage}'. " +
            " Please specify one with --class."
          )
      }

      if (args.mainClass == null) {
        // If we still can't figure out the main class at this point, blow up.
        error("No main class set in JAR; please specify one with --class.")
      }
    }
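    // Illustrative example (class name is hypothetical): a jar whose MANIFEST.MF contains
    // "Main-Class: com.example.MyApp" lets the user omit --class entirely.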

    // If we're running a python app, set the main class to our specific python runner
    if (args.isPython && deployMode == CLIENT) {
      if (args.primaryResource == PYSPARK_SHELL) {
        args.mainClass = "org.apache.spark.api.python.PythonGatewayServer"
      } else {
        // If a python file is provided, add it to the child arguments and list of files to deploy.
        // Usage: PythonAppRunner <main python file> <extra python files> [app arguments]
        args.mainClass = "org.apache.spark.deploy.PythonRunner"
        args.childArgs = ArrayBuffer(localPrimaryResource, localPyFiles) ++ args.childArgs
      }
    }

    // Non-PySpark applications can need Python dependencies.
    if (deployMode == CLIENT && clusterManager != YARN) {
      // The YARN backend handles python files differently, so don't merge the lists.
      args.files = mergeFileLists(args.files, args.pyFiles)
    }

    if (localPyFiles != null) {
      sparkConf.set(SUBMIT_PYTHON_FILES, localPyFiles.split(",").toImmutableArraySeq)
    }
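    // SUBMIT_PYTHON_FILES is "spark.submit.pyFiles"; in client mode it now holds the localized
    // copies so they can later be resolved and put on the PYTHONPATH (see the formatting step
    // near the end of this method).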

    // In YARN mode for an R app, add the SparkR package archive and the R package
    // archive containing all of the built R libraries to archives so that they can
    // be distributed with the job
    if (args.isR && clusterManager == YARN) {
      val sparkRPackagePath = RUtils.localSparkRPackagePath
      if (sparkRPackagePath.isEmpty) {
        error("SPARK_HOME does not exist for R application in YARN mode.")
      }
      val sparkRPackageFile = new File(sparkRPackagePath.get, SPARKR_PACKAGE_ARCHIVE)
      if (!sparkRPackageFile.exists()) {
        error(s"$SPARKR_PACKAGE_ARCHIVE does not exist for R application in YARN mode.")
      }
      val sparkRPackageURI = Utils.resolveURI(sparkRPackageFile.getAbsolutePath).toString

      // Distribute the SparkR package.
      // Assigns the symbolic link name "sparkr" to the shipped package.
      args.archives = mergeFileLists(args.archives, sparkRPackageURI + "#sparkr")

      // Distribute the R package archive containing all the built R packages.
      if (!RUtils.rPackages.isEmpty) {
        val rPackageFile =
          RPackageUtils.zipRLibraries(new File(RUtils.rPackages.get), R_PACKAGE_ARCHIVE)
        if (!rPackageFile.exists()) {
          error("Failed to zip all the built R packages.")
        }

        val rPackageURI = Utils.resolveURI(rPackageFile.getAbsolutePath).toString
        // Assigns the symbolic link name "rpkg" to the shipped package.
        args.archives = mergeFileLists(args.archives, rPackageURI + "#rpkg")
      }
    }

    // TODO: Support distributing R packages with standalone cluster
    if (args.isR && clusterManager == STANDALONE && !RUtils.rPackages.isEmpty) {
      error("Distributing R packages with standalone cluster is not supported.")
    }

    // If we're running an R app, set the main class to our specific R runner
    if (args.isR && deployMode == CLIENT) {
      if (args.primaryResource == SPARKR_SHELL) {
        args.mainClass = "org.apache.spark.api.r.RBackend"
      } else {
        // If an R file is provided, add it to the child arguments and list of files to deploy.
        // Usage: RRunner <main R file> [app arguments]
        args.mainClass = "org.apache.spark.deploy.RRunner"
        args.childArgs = ArrayBuffer(localPrimaryResource) ++ args.childArgs
        args.files = mergeFileLists(args.files, args.primaryResource)
      }
    }

    if (isYarnCluster && args.isR) {
      // In yarn-cluster mode for an R app, add primary resource to files
      // that can be distributed with the job
      args.files = mergeFileLists(args.files, args.primaryResource)
    }

    // Special flag to avoid deprecation warnings at the client
    sys.props("SPARK_SUBMIT") = "true"

    // A list of rules to map each argument to system properties or command-line options in
    // each deploy mode; we iterate through these below
    val options = List[OptionAssigner](

      // All cluster managers
      OptionAssigner(
        // If remote is not set, use the resolved master; otherwise only propagate an
        // explicitly set master.
        if (args.maybeRemote.isEmpty) args.master
        else args.maybeMaster.orNull,
        ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES, confKey = "spark.master"),
      OptionAssigner(
        args.maybeRemote.orNull, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES, confKey = "spark.remote"),
      OptionAssigner(args.deployMode, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES,
        confKey = SUBMIT_DEPLOY_MODE.key),
      OptionAssigner(args.name, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES, confKey = "spark.app.name"),
      OptionAssigner(args.ivyRepoPath, ALL_CLUSTER_MGRS, CLIENT,
        confKey = JAR_IVY_REPO_PATH.key),
      OptionAssigner(args.driverMemory, ALL_CLUSTER_MGRS, CLIENT,
        confKey = DRIVER_MEMORY.key),
      OptionAssigner(args.driverExtraClassPath, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES,
        confKey = DRIVER_CLASS_PATH.key),
      OptionAssigner(args.driverExtraJavaOptions, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES,
        confKey = DRIVER_JAVA_OPTIONS.key),
      OptionAssigner(args.driverExtraLibraryPath, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES,
        confKey = DRIVER_LIBRARY_PATH.key),
      OptionAssigner(args.principal, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES,
        confKey = PRINCIPAL.key),
      OptionAssigner(args.keytab, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES,
        confKey = KEYTAB.key),
      OptionAssigner(args.pyFiles, ALL_CLUSTER_MGRS, CLUSTER, confKey = SUBMIT_PYTHON_FILES.key),

      // Propagate attributes for dependency resolution at the driver side
      OptionAssigner(args.packages, STANDALONE | KUBERNETES,
        CLUSTER, confKey = JAR_PACKAGES.key),
      OptionAssigner(args.repositories, STANDALONE | KUBERNETES,
        CLUSTER, confKey = JAR_REPOSITORIES.key),
      OptionAssigner(args.ivyRepoPath, STANDALONE | KUBERNETES,
        CLUSTER, confKey = JAR_IVY_REPO_PATH.key),
      OptionAssigner(args.packagesExclusions, STANDALONE | KUBERNETES,
        CLUSTER, confKey = JAR_PACKAGES_EXCLUSIONS.key),

      // Yarn only
      OptionAssigner(args.queue, YARN, ALL_DEPLOY_MODES, confKey = "spark.yarn.queue"),
      OptionAssigner(args.pyFiles, YARN, ALL_DEPLOY_MODES, confKey = "spark.yarn.dist.pyFiles",
        mergeFn = Some(mergeFileLists(_, _))),
      OptionAssigner(args.jars, YARN, ALL_DEPLOY_MODES, confKey = "spark.yarn.dist.jars",
        mergeFn = Some(mergeFileLists(_, _))),
      OptionAssigner(args.files, YARN, ALL_DEPLOY_MODES, confKey = "spark.yarn.dist.files",
        mergeFn = Some(mergeFileLists(_, _))),
      OptionAssigner(args.archives, YARN, ALL_DEPLOY_MODES, confKey = "spark.yarn.dist.archives",
        mergeFn = Some(mergeFileLists(_, _))),

      // Other options
      OptionAssigner(args.numExecutors, YARN | KUBERNETES, ALL_DEPLOY_MODES,
        confKey = EXECUTOR_INSTANCES.key),
      OptionAssigner(args.executorCores, STANDALONE | YARN | KUBERNETES, ALL_DEPLOY_MODES,
        confKey = EXECUTOR_CORES.key),
      OptionAssigner(args.executorMemory, STANDALONE | YARN | KUBERNETES, ALL_DEPLOY_MODES,
        confKey = EXECUTOR_MEMORY.key),
      OptionAssigner(args.totalExecutorCores, STANDALONE, ALL_DEPLOY_MODES,
        confKey = CORES_MAX.key),
      OptionAssigner(args.files, LOCAL | STANDALONE | KUBERNETES, ALL_DEPLOY_MODES,
        confKey = FILES.key),
      OptionAssigner(args.archives, LOCAL | STANDALONE | KUBERNETES, ALL_DEPLOY_MODES,
        confKey = ARCHIVES.key),
      OptionAssigner(args.jars, LOCAL, CLIENT, confKey = JARS.key),
      OptionAssigner(args.jars, STANDALONE | KUBERNETES, ALL_DEPLOY_MODES,
        confKey = JARS.key),
      OptionAssigner(args.driverMemory, STANDALONE | YARN | KUBERNETES, CLUSTER,
        confKey = DRIVER_MEMORY.key),
      OptionAssigner(args.driverCores, STANDALONE | YARN | KUBERNETES, CLUSTER,
        confKey = DRIVER_CORES.key),
      OptionAssigner(args.supervise.toString, STANDALONE, CLUSTER,
        confKey = DRIVER_SUPERVISE.key),
      OptionAssigner(args.ivyRepoPath, STANDALONE, CLUSTER, confKey = JAR_IVY_REPO_PATH.key),

      // An internal option used only by spark-shell to add user jars to the REPL's classloader.
      // Previously "spark.jars" or "spark.yarn.dist.jars" was used, but those may now point to
      // remote jars, so this new option carries only the local jars for spark-shell internally.
      OptionAssigner(localJars, ALL_CLUSTER_MGRS, CLIENT, confKey = "spark.repl.local.jars")
    )
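    // Illustrative example (invocation is hypothetical): for
    // "spark-submit --master k8s://https://host:443 --deploy-mode cluster --driver-memory 2g ...",
    // the driver-memory assigner above matches (KUBERNETES, CLUSTER) and the loop below sets
    // spark.driver.memory=2g in sparkConf.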

    // In client mode, launch the application main class directly
    // In addition, add the main application jar and any added jars (if any) to the classpath
    if (deployMode == CLIENT) {
      childMainClass = args.mainClass
      if (localPrimaryResource != null && isUserJar(localPrimaryResource)) {
        childClasspath += localPrimaryResource
      }
      if (localJars != null) { childClasspath ++= localJars.split(",") }
    }
    // Add the main application jar and any added jars to classpath in case YARN client
    // requires these jars.
    // This assumes both primaryResource and user jars are local jars, or already downloaded
    // to local by configuring "spark.yarn.dist.forceDownloadSchemes", otherwise it will not be
    // added to the classpath of YARN client.
    if (isYarnCluster) {
      if (isUserJar(args.primaryResource)) {
        childClasspath += args.primaryResource
      }
      if (args.jars != null) { childClasspath ++= args.jars.split(",") }
    }

    if (deployMode == CLIENT) {
      if (args.childArgs != null) { childArgs ++= args.childArgs }
    }

    // Map all arguments to command-line options or system properties for our chosen mode
    for (opt <- options) {
      if (opt.value != null &&
          (deployMode & opt.deployMode) != 0 &&
          (clusterManager & opt.clusterManager) != 0) {
        if (opt.clOption != null) { childArgs += opt.clOption += opt.value }
        if (opt.confKey != null) {
          // Used in SparkConnectClient because Spark Connect client does not have SparkConf.
          if (opt.confKey == "spark.remote") System.setProperty("spark.remote", opt.value)
          if (opt.mergeFn.isDefined && sparkConf.contains(opt.confKey)) {
            sparkConf.set(opt.confKey, opt.mergeFn.get.apply(sparkConf.get(opt.confKey), opt.value))
          } else {
            sparkConf.set(opt.confKey, opt.value)
          }
        }
      }
    }
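    // The mergeFn branch applies to the "spark.yarn.dist.*" assigners above: values already in
    // sparkConf (e.g. from --conf or a properties file) are merged with the submit arguments
    // instead of being overwritten.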

    // For shells, spark.ui.showConsoleProgress can be true by default or set by the user, except
    // when Spark Connect runs in local mode, because Spark Connect provides its own progress
    // reporting.
    if (isShell(args.primaryResource) && !sparkConf.contains(UI_SHOW_CONSOLE_PROGRESS)) {
      sparkConf.set(UI_SHOW_CONSOLE_PROGRESS, true)
    }

    // Add the application jar automatically so the user doesn't have to call sc.addJar
    // For isKubernetesClusterModeDriver, the jar is already added in the previous spark-submit
    // For YARN cluster mode, the jar is already distributed on each node as "app.jar"
    // For python and R files, the primary resource is already distributed as a regular file
    if (!isKubernetesClusterModeDriver && !isYarnCluster && !args.isPython && !args.isR) {
      var jars = sparkConf.get(JARS)
      if (isUserJar(args.primaryResource)) {
        jars = jars ++ Seq(args.primaryResource)
      }
      sparkConf.set(JARS, jars)
    }

    // In standalone cluster mode, use the REST client to submit the application (Spark 1.3+).
    // All Spark parameters are expected to be passed to the client through system properties.
    if (args.isStandaloneCluster) {
      if (args.useRest) {
        childMainClass = REST_CLUSTER_SUBMIT_CLASS
        childArgs += args.primaryResource += args.mainClass
      } else {
        // In legacy standalone cluster mode, use Client as a wrapper around the user class
        childMainClass = STANDALONE_CLUSTER_SUBMIT_CLASS
        if (args.supervise) { childArgs += "--supervise" }
        Option(args.driverMemory).foreach { m => childArgs += "--memory" += m }
        Option(args.driverCores).foreach { c => childArgs += "--cores" += c }
        childArgs += "launch"
        childArgs += args.master += args.primaryResource += args.mainClass
      }
      if (args.childArgs != null) {
        childArgs ++= args.childArgs
      }
    }
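    // useRest picks the REST submission gateway first; if the REST endpoint is unreachable,
    // submit() is expected to fall back to the legacy gateway and re-run this preparation with
    // useRest = false (behavior of the caller, not shown in this excerpt).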

    // Let YARN know it's a pyspark app, so it distributes needed libraries.
    if (clusterManager == YARN) {
      if (args.isPython) {
        sparkConf.set("spark.yarn.isPython", "true")
      }
    }

    if (clusterManager == KUBERNETES && UserGroupInformation.isSecurityEnabled) {
      setRMPrincipal(sparkConf)
    }
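    // Not shown here: setRMPrincipal supplies a stand-in ResourceManager principal so that
    // Hadoop's delegation-token fetching, which assumes YARN, keeps working on a Kerberized
    // Kubernetes submission.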

    // In yarn-cluster mode, use yarn.Client as a wrapper around the user class
    if (isYarnCluster) {
      childMainClass = YARN_CLUSTER_SUBMIT_CLASS
      if (args.isPython) {
        childArgs += "--primary-py-file" += args.primaryResource
        childArgs += "--class" += "org.apache.spark.deploy.PythonRunner"
      } else if (args.isR) {
        val mainFile = new Path(args.primaryResource).getName
        childArgs += "--primary-r-file" += mainFile
        childArgs += "--class" += "org.apache.spark.deploy.RRunner"
      } else {
        if (args.primaryResource != SparkLauncher.NO_RESOURCE) {
          childArgs += "--jar" += args.primaryResource
        }
        childArgs += "--class" += args.mainClass
      }
      if (args.childArgs != null) {
        args.childArgs.foreach { arg => childArgs += "--arg" += arg }
      }
    }

    if (isKubernetesCluster) {
      childMainClass = KUBERNETES_CLUSTER_SUBMIT_CLASS
      if (args.primaryResource != SparkLauncher.NO_RESOURCE) {
        if (args.isPython) {
          childArgs ++= Array("--primary-py-file", args.primaryResource)
          childArgs ++= Array("--main-class", "org.apache.spark.deploy.PythonRunner")
        } else if (args.isR) {
          childArgs ++= Array("--primary-r-file", args.primaryResource)
          childArgs ++= Array("--main-class", "org.apache.spark.deploy.RRunner")
        } else {
          childArgs ++= Array("--primary-java-resource", args.primaryResource)
          childArgs ++= Array("--main-class", args.mainClass)
        }
      } else {
        childArgs ++= Array("--main-class", args.mainClass)
      }
      if (args.childArgs != null) {
        args.childArgs.foreach { arg =>
          childArgs += "--arg" += arg
        }
      }
      // Pass the proxyUser to the k8s app so it is possible to add it to the driver args
      if (args.proxyUser != null) {
        childArgs += "--proxy-user" += args.proxyUser
      }
    }

    // Load any properties specified through --conf and the default properties file
    for ((k, v) <- args.sparkProperties) {
      sparkConf.setIfMissing(k, v)
    }

    // Ignore invalid spark.driver.host in cluster modes.
    if (deployMode == CLUSTER) {
      sparkConf.remove(DRIVER_HOST_ADDRESS)
    }

    // Resolve paths in certain spark properties
    val pathConfigs = Seq(
      JARS.key,
      FILES.key,
      ARCHIVES.key,
      "spark.yarn.dist.files",
      "spark.yarn.dist.archives",
      "spark.yarn.dist.jars")
    pathConfigs.foreach { config =>
      // Replace old URIs with resolved URIs, if they exist
      sparkConf.getOption(config).foreach { oldValue =>
        sparkConf.set(config, Utils.resolveURIs(oldValue))
      }
    }

    // Resolve and format python file paths properly before adding them to the PYTHONPATH.
    // The resolving part is redundant in the case of --py-files, but necessary if the user
    // explicitly sets `spark.submit.pyFiles` in his/her default properties file.
    val pyFiles = sparkConf.get(SUBMIT_PYTHON_FILES)
    val resolvedPyFiles = Utils.resolveURIs(pyFiles.mkString(","))
    val formattedPyFiles = if (deployMode != CLUSTER) {
      PythonRunner.formatPaths(resolvedPyFiles).mkString(",")
    } else {
      // Skip formatting the python path in cluster mode: cluster managers can handle remote
      // python files and will distribute and add them locally on the driver.
      resolvedPyFiles
    }
    sparkConf.set(SUBMIT_PYTHON_FILES, formattedPyFiles.split(",").toImmutableArraySeq)

    if (args.verbose && isSqlShell(childMainClass)) {
      childArgs ++= Seq("--verbose")
    }

    val setSubmitTimeInClusterModeDriver =
      sparkConf.getBoolean("spark.kubernetes.setSubmitTimeInDriver", true)
    if (!sparkConf.contains("spark.app.submitTime")
      || isKubernetesClusterModeDriver && setSubmitTimeInClusterModeDriver) {
      sparkConf.set("spark.app.submitTime", System.currentTimeMillis().toString)
    }

    if (childClasspath.nonEmpty && isCustomClasspathInClusterModeDisallowed) {
      logWarning(log"Ignore classpath " +
        log"${MDC(LogKeys.CLASS_PATH, childClasspath.mkString(", "))} " +
        log"with proxy user specified in Cluster mode when " +
        log"${MDC(LogKeys.CONFIG, ALLOW_CUSTOM_CLASSPATH_BY_PROXY_USER_IN_CLUSTER_MODE.key)} is " +
        log"disabled")
      childClasspath.clear()
    }

    (childArgs.toSeq, childClasspath.toSeq, sparkConf, childMainClass)
  }