in linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/launch/SparkSubmitProcessEngineConnLaunchBuilder.scala [57:214]
def getCommands(
engineConnBuildRequest: EngineConnBuildRequest,
mainClass: String,
gcLogDir: String,
logDir: String
): Array[String] = {
val properties = engineConnBuildRequest.engineConnCreationDesc.properties
val sparkConf = getValueAndRemove(properties, LINKIS_SPARK_CONF)
// spark conf example: spark.sql.shuffle.partitions=10;spark.memory.fraction=0.6
if (StringUtils.isNotBlank(sparkConf)) {
sparkConf.split(";").filter(StringUtils.isNotBlank(_)).foreach { keyAndValue =>
val values = keyAndValue.split("=")
if (values.length != 2) {
logger.warn(s"spark conf has invalid value, keyAndValue:${keyAndValue}")
} else {
val key = values(0).trim
val value = values(1).trim
if (StringUtils.isNotBlank(key) && StringUtils.isNotBlank(value)) {
engineConnBuildRequest.engineConnCreationDesc.properties.put(key, value)
} else {
logger.warn(s"spark conf has empty value, key:${key}, value:${value}")
}
}
}
}
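// Resolve the main class and the driver/executor resources from the creation properties;
// getValueAndRemove consumes each key as it is read.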
val className = getValueAndRemove(properties, "className", mainClass)
val driverCores = getValueAndRemove(properties, LINKIS_SPARK_DRIVER_CORES)
val driverMemory = getValueAndRemove(properties, LINKIS_SPARK_DRIVER_MEMORY)
val executorCores = getValueAndRemove(properties, LINKIS_SPARK_EXECUTOR_CORES)
val executorMemory = getValueAndRemove(properties, LINKIS_SPARK_EXECUTOR_MEMORY)
val numExecutors = getValueAndRemove(properties, LINKIS_SPARK_EXECUTOR_INSTANCES)
val userEngineResource = engineConnBuildRequest.engineResource
val darResource = userEngineResource.getLockedResource.asInstanceOf[DriverAndYarnResource]
val files: ArrayBuffer[String] = ArrayBuffer(
getValueAndRemove(properties, "files", "").split(",").filter(isNotBlankPath): _*
)
val jars = new ArrayBuffer[String]()
jars ++= getValueAndRemove(properties, "jars", "").split(",").filter(isNotBlankPath)
jars ++= getValueAndRemove(properties, SPARK_DEFAULT_EXTERNAL_JARS_PATH)
.split(",")
.filter(x => {
val isPath = isNotBlankPath(x)
// Filtering with java.io.File#isFile is not reliable here, because the user that
// starts cg-linkismanager may differ from the user that starts the engineconn.
// val isFile = (new java.io.File(x)).isFile
logger.info(s"file:${x}, check isPath:${isPath}")
isPath
})
val pyFiles = getValueAndRemove(properties, "py-files", "").split(",").filter(isNotBlankPath)
val archives = getValueAndRemove(properties, "archives", "").split(",").filter(isNotBlankPath)
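// Use the YARN queue from the locked DriverAndYarnResource if one was allocated, otherwise fall back to the default queue.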
val queue = if (null != darResource) {
darResource.getYarnResource.getQueueName
} else {
"default"
}
val driverClassPath =
Array(getValueAndRemove(properties, SPARK_DRIVER_CLASSPATH), variable(CLASSPATH))
var userWithCreator: UserWithCreator = UserWithCreator("DefaultUser", "DefaultCreator")
engineConnBuildRequest.labels.asScala.foreach {
case label: UserCreatorLabel =>
userWithCreator = UserWithCreator(label.getUser, label.getCreator)
case _ =>
}
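// The YARN application name is the configured app name suffixed with the creator from the UserCreatorLabel.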
val appName = getValueAndRemove(properties, SPARK_APP_NAME) + "_" + userWithCreator.creator
val commandLine: ArrayBuffer[String] = ArrayBuffer[String]()
commandLine += SPARK_SUBMIT_PATH.getValue
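// Append an option and its value to the command line only when the value is non-blank.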
def addOpt(option: String, value: String): Unit = {
if (StringUtils.isNotBlank(value)) {
commandLine += option
commandLine += value
}
}
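// When keytab proxy-user mode is enabled, submit with --proxy-user, preferring an explicit
// proxyUser property and falling back to the requesting user.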
def addProxyUser(): Unit = {
if (!HadoopConf.KEYTAB_PROXYUSER_ENABLED.getValue) return
val proxyUser = getValueAndRemove(properties, "proxyUser", "")
if (StringUtils.isNotBlank(proxyUser)) {
addOpt("--proxy-user", proxyUser)
} else {
addOpt("--proxy-user", userWithCreator.user)
}
}
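// A bare numeric memory value is treated as gigabytes; values that already carry a unit are passed through unchanged.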
def getMemory(memory: String): String = if (StringUtils.isNumeric(memory)) {
memory + "g"
} else {
memory
}
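// Default to yarn-client deploy mode; the engineconn runtime-mode label can switch it to yarn-cluster,
// which also ships the engineconn properties file and requires SPARK_YARN_CLUSTER_JARS to be set.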
var deployMode: String = SparkConfiguration.SPARK_YARN_CLIENT
val label = LabelUtil.getEngingeConnRuntimeModeLabel(engineConnBuildRequest.labels)
val isYarnClusterMode: Boolean =
null != label && label.getModeValue.equals(LabelValueConstant.YARN_CLUSTER_VALUE)
if (isYarnClusterMode) {
deployMode = SparkConfiguration.SPARK_YARN_CLUSTER
files ++= Array(s"${variable(PWD)}/conf/linkis-engineconn.properties")
var clusterJars: String = getValueAndRemove(properties, SPARK_YARN_CLUSTER_JARS)
if (StringUtils.isBlank(clusterJars)) {
throw new SparkEngineException(
SparkErrorCodeSummary.LINKIS_SPARK_YARN_CLUSTER_JARS_ERROR.getErrorCode,
SparkErrorCodeSummary.LINKIS_SPARK_YARN_CLUSTER_JARS_ERROR.getErrorDesc
)
}
if (clusterJars.endsWith("/")) {
clusterJars = clusterJars.dropRight(1)
}
jars += s"$clusterJars/*"
}
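// Build the spark-submit argument list: master, deploy mode, application name, proxy user
// and the jar/py-file/file/archive lists, followed by the resource options and the queue.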
addOpt("--master", "yarn")
addOpt("--deploy-mode", deployMode)
addOpt("--name", appName)
addProxyUser()
if (jars.isEmpty) {
jars += ""
}
jars += variable(UDF_JARS)
addOpt("--jars", jars.mkString(","))
addOpt("--py-files", pyFiles.mkString(","))
addOpt("--files", files.mkString(","))
addOpt("--archives", archives.mkString(","))
addOpt("--driver-class-path", driverClassPath.mkString(":"))
addOpt("--driver-memory", getMemory(driverMemory))
addOpt("--driver-cores", driverCores.toString)
addOpt("--executor-memory", getMemory(executorMemory))
addOpt("--executor-cores", executorCores.toString)
addOpt("--num-executors", numExecutors.toString)
addOpt("--queue", queue)
getConf(engineConnBuildRequest, gcLogDir, logDir, isYarnClusterMode).foreach {
case (key, value) =>
addOpt("--conf", s"""$key="$value"""")
}
addOpt("--class", className)
addOpt("1>", s"${variable(LOG_DIRS)}/stdout")
addOpt("2>>", s"${variable(LOG_DIRS)}/stderr")
addOpt("", s" ${variable(PWD)}/lib/${ENGINE_JAR.getValue}")
commandLine.toArray.filter(StringUtils.isNotEmpty)
}