def getCommands()

in linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/launch/SparkSubmitProcessEngineConnLaunchBuilder.scala [57:214]


  def getCommands(
      engineConnBuildRequest: EngineConnBuildRequest,
      mainClass: String,
      gcLogDir: String,
      logDir: String
  ): Array[String] = {
    val properties = engineConnBuildRequest.engineConnCreationDesc.properties
    val sparkConf = getValueAndRemove(properties, LINKIS_SPARK_CONF)
    // sparkConf example: spark.sql.shuffle.partitions=10;spark.memory.fraction=0.6
    if (StringUtils.isNotBlank(sparkConf)) {
      sparkConf.split(";").filter(StringUtils.isNotBlank(_)).foreach { keyAndValue =>
        val values = keyAndValue.split("=")
        if (values.length != 2) {
          logger.warn(s"spark conf has invalid value, keyAndValue:${keyAndValue}")
        } else {
          val key = values(0).trim
          val value = values(1).trim
          if (StringUtils.isNotBlank(key) && StringUtils.isNotBlank(value)) {
            engineConnBuildRequest.engineConnCreationDesc.properties.put(key, value)
          } else {
            logger.warn(s"spark conf has empty value, key:${key}, value:${value}")
          }
        }
      }
    }
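    // Illustrative parse result (not from the source): the example string above yields
    // properties += ("spark.sql.shuffle.partitions" -> "10", "spark.memory.fraction" -> "0.6");
    // an entry such as "a=b=c" splits into 3 parts and is skipped with a warning.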
    val className = getValueAndRemove(properties, "className", mainClass)
    val driverCores = getValueAndRemove(properties, LINKIS_SPARK_DRIVER_CORES)
    val driverMemory = getValueAndRemove(properties, LINKIS_SPARK_DRIVER_MEMORY)
    val executorCores = getValueAndRemove(properties, LINKIS_SPARK_EXECUTOR_CORES)
    val executorMemory = getValueAndRemove(properties, LINKIS_SPARK_EXECUTOR_MEMORY)
    val numExecutors = getValueAndRemove(properties, LINKIS_SPARK_EXECUTOR_INSTANCES)

    val userEngineResource = engineConnBuildRequest.engineResource
    val darResource = userEngineResource.getLockedResource.asInstanceOf[DriverAndYarnResource]
    val files: ArrayBuffer[String] = ArrayBuffer(
      getValueAndRemove(properties, "files", "").split(",").filter(isNotBlankPath): _*
    )
    val jars = new ArrayBuffer[String]()
    jars ++= getValueAndRemove(properties, "jars", "").split(",").filter(isNotBlankPath)
    jars ++= getValueAndRemove(properties, SPARK_DEFAULT_EXTERNAL_JARS_PATH)
      .split(",")
      .filter(x => {
        val isPath = isNotBlankPath(x)
        // filtering by isFile cannot cover the case where the
        // cg-linkismanager startup user differs from the engineconn startup user

        // val isFile = (new java.io.File(x)).isFile
        logger.info(s"file:${x}, check isPath:${isPath}")
        isPath
      })
    val pyFiles = getValueAndRemove(properties, "py-files", "").split(",").filter(isNotBlankPath)
    val archives = getValueAndRemove(properties, "archives", "").split(",").filter(isNotBlankPath)

    val queue = if (null != darResource) {
      darResource.getYarnResource.getQueueName
    } else {
      "default"
    }

    val driverClassPath =
      Array(getValueAndRemove(properties, SPARK_DRIVER_CLASSPATH), variable(CLASSPATH))

    var userWithCreator: UserWithCreator = UserWithCreator("DefaultUser", "DefaultCreator")
    engineConnBuildRequest.labels.asScala.foreach {
      case label: UserCreatorLabel =>
        userWithCreator = UserWithCreator(label.getUser, label.getCreator)
      case _ =>
    }
    val appName = getValueAndRemove(properties, SPARK_APP_NAME) + "_" + userWithCreator.creator

    val commandLine: ArrayBuffer[String] = ArrayBuffer[String]()
    commandLine += SPARK_SUBMIT_PATH.getValue

    def addOpt(option: String, value: String): Unit = {
      if (StringUtils.isNotBlank(value)) {
        commandLine += option
        commandLine += value
      }
    }
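    // For example, addOpt("--queue", "default") appends the two tokens "--queue" and
    // "default", while a blank value leaves the command line untouched.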

    def addProxyUser(): Unit = {
      if (!HadoopConf.KEYTAB_PROXYUSER_ENABLED.getValue) return
      val proxyUser = getValueAndRemove(properties, "proxyUser", "")
      if (StringUtils.isNotBlank(proxyUser)) {
        addOpt("--proxy-user", proxyUser)
      } else {
        addOpt("--proxy-user", userWithCreator.user)
      }
    }
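    // If KEYTAB_PROXYUSER_ENABLED is off this is a no-op; otherwise an explicit
    // "proxyUser" property wins over the submitting user taken from the label.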

    def getMemory(memory: String): String = if (StringUtils.isNumeric(memory)) {
      memory + "g"
    } else {
      memory
    }
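    // e.g. getMemory("2") returns "2g" (a bare number gets the unit appended),
    // while getMemory("2g") is returned unchanged since "2g" is not purely numeric.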

    var deployMode: String = SparkConfiguration.SPARK_YARN_CLIENT

    val label = LabelUtil.getEngingeConnRuntimeModeLabel(engineConnBuildRequest.labels)
    val isYarnClusterMode: Boolean =
      null != label && label.getModeValue.equals(LabelValueConstant.YARN_CLUSTER_VALUE)

    if (isYarnClusterMode) {
      deployMode = SparkConfiguration.SPARK_YARN_CLUSTER
      files ++= Array(s"${variable(PWD)}/conf/linkis-engineconn.properties")

      var clusterJars: String = getValueAndRemove(properties, SPARK_YARN_CLUSTER_JARS)

      if (StringUtils.isBlank(clusterJars)) {
        throw new SparkEngineException(
          SparkErrorCodeSummary.LINKIS_SPARK_YARN_CLUSTER_JARS_ERROR.getErrorCode,
          SparkErrorCodeSummary.LINKIS_SPARK_YARN_CLUSTER_JARS_ERROR.getErrorDesc
        )
      }

      if (clusterJars.endsWith("/")) {
        clusterJars = clusterJars.dropRight(1)
      }
      jars += s"$clusterJars/*"
    }
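    // Illustration with a hypothetical value: SPARK_YARN_CLUSTER_JARS = "hdfs:///linkis/cluster-jars/"
    // loses its trailing slash and contributes "hdfs:///linkis/cluster-jars/*" to jars.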

    addOpt("--master", "yarn")
    addOpt("--deploy-mode", deployMode)
    addOpt("--name", appName)
    addProxyUser()

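    // When no jars were configured, an empty entry is added first so the joined
    // --jars value becomes ",${UDF_JARS}" rather than the bare UDF jars variable.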
    if (jars.isEmpty) {
      jars += ""
    }
    jars += variable(UDF_JARS)

    addOpt("--jars", jars.mkString(","))
    addOpt("--py-files", pyFiles.mkString(","))
    addOpt("--files", files.mkString(","))
    addOpt("--archives", archives.mkString(","))
    addOpt("--driver-class-path", driverClassPath.mkString(":"))
    addOpt("--driver-memory", getMemory(driverMemory))
    addOpt("--driver-cores", driverCores.toString)
    addOpt("--executor-memory", getMemory(executorMemory))
    addOpt("--executor-cores", executorCores.toString)
    addOpt("--num-executors", numExecutors.toString)
    addOpt("--queue", queue)

    getConf(engineConnBuildRequest, gcLogDir, logDir, isYarnClusterMode).foreach {
      case (key, value) =>
        addOpt("--conf", s"""$key="$value"""")
    }
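    // Each conf entry is rendered as a quoted pair, e.g. (hypothetical key/value):
    // --conf spark.sql.shuffle.partitions="10"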

    addOpt("--class", className)
    addOpt("1>", s"${variable(LOG_DIRS)}/stdout")
    addOpt("2>>", s"${variable(LOG_DIRS)}/stderr")
    addOpt("", s" ${variable(PWD)}/lib/${ENGINE_JAR.getValue}")

    commandLine.toArray.filter(StringUtils.isNotEmpty)
  }
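
A rough sketch (not produced by the source) of the assembled command in the default
client deploy mode, with hypothetical values for the placeholders; the ${...}
variables are resolved by the launch environment, not by this method:

  ${SPARK_SUBMIT_PATH} --master yarn --deploy-mode client --name appName_IDE \
    --jars ,${UDF_JARS} --driver-class-path <driver-classpath>:${CLASSPATH} \
    --driver-memory 2g --driver-cores 1 --executor-memory 4g --executor-cores 2 \
    --num-executors 2 --queue default --conf spark.sql.shuffle.partitions="10" \
    --class <mainClass> 1> ${LOG_DIRS}/stdout 2>> ${LOG_DIRS}/stderr \
    ${PWD}/lib/<engine-jar>

Options with blank values (here --py-files, --files, and --archives) are dropped by
addOpt, and the final filter removes any empty tokens before the array is returned.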