in src/spark-project/engine-spark/src/main/scala/org/apache/spark/conf/rule/SparkConfRule.scala [51:84]
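/**
 * Estimates spark.executor.memory for a build job from the size of the source
 * table, unless the user has configured it explicitly. Larger sources and
 * count-distinct measures get bigger executors; the estimate is then capped by
 * the cluster's maximum resource allocation.
 */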
override def doApply(helper: SparkConfHelper): Unit = {
  // Respect an explicit user setting: if spark.executor.memory is already
  // configured, this rule does nothing.
  val userDefinedMemory = helper.getConf(SparkConfHelper.EXECUTOR_MEMORY)
  if (StringUtils.isNotBlank(userDefinedMemory)) {
    return
  }
  // Without a source table size there is nothing to base an estimate on.
  if (StringUtils.isBlank(helper.getOption(SparkConfHelper.SOURCE_TABLE_SIZE))) {
    logInfo(s"Source table size is Empty, skip ${getClass.getName}")
    return
  }
  val sourceMB = Utils.byteStringAsMb(helper.getOption(SparkConfHelper.SOURCE_TABLE_SIZE))
  // Spark byte strings use binary units (1 GB = 1024 MB), so divide by 1024,
  // not 1000, to stay consistent with byteStringAsMb.
  val sourceGB = sourceMB / 1024
  val hasCountDistinct = helper.hasCountDistinct
  // Pick a memory tier from the source size; count-distinct measures are
  // memory-hungry, so they promote the estimate by one tier.
  var memory = sourceGB match {
    case _ if sourceGB >= 100 && hasCountDistinct =>
      "20GB"
    case _ if sourceGB >= 100 || (sourceGB >= 10 && hasCountDistinct) =>
      "16GB"
    case _ if sourceGB >= 10 || (sourceGB >= 1 && hasCountDistinct) =>
      "10GB"
    case _ if sourceMB >= 10 =>
      "4GB"
    case _ =>
      "1GB"
  }
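  // Example: a 50 GB source with a count-distinct measure misses the first
  // tier but satisfies sourceGB >= 10 && hasCountDistinct, yielding "16GB".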
  // Cap the estimate at the cluster's maximum container allocation, scaled by
  // the configured proportion, so the request can actually be scheduled.
  val maxAllocation = helper.getClusterManager.fetchMaximumResourceAllocation
  if (maxAllocation != null) {
    val proportion = KylinBuildEnv.get().kylinConfig.getMaxAllocationResourceProportion
    val maxMemoryMb = maxAllocation.memory * proportion
    if (Utils.byteStringAsMb(memory) > maxMemoryMb) {
      memory = (maxMemoryMb - 1).toInt + "MB"
    }
  }
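  // Example: with a 16384 MB cluster maximum and a 0.9 proportion, the cap is
  // 14745.6 MB, so a "16GB" (16384 MB) estimate is rewritten to "14744MB".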
  helper.setConf(SparkConfHelper.EXECUTOR_MEMORY, memory)
}
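
For reference, a minimal, self-contained sketch of the same sizing heuristic, decoupled from SparkConfHelper and the cluster manager. The object and method names (ExecutorMemoryEstimate, estimateExecutorMemory) and the Option-based cap parameter are illustrative assumptions, not part of the Kylin codebase; binary units (1 GB = 1024 MB) are assumed to match Spark's byte-string parsing.

// Hypothetical standalone sketch of the tiering logic above; names and
// signature are illustrative only.
object ExecutorMemoryEstimate {
  def estimateExecutorMemory(sourceMB: Long,
                             hasCountDistinct: Boolean,
                             maxMemoryMb: Option[Double] = None): String = {
    val sourceGB = sourceMB / 1024
    val tier = (sourceGB, hasCountDistinct) match {
      case (gb, cd) if gb >= 100 && cd => "20GB"
      case (gb, cd) if gb >= 100 || (gb >= 10 && cd) => "16GB"
      case (gb, cd) if gb >= 10 || (gb >= 1 && cd) => "10GB"
      case _ if sourceMB >= 10 => "4GB"
      case _ => "1GB"
    }
    // Mirror the rule's final step: shrink the tier if it exceeds the cap.
    val tierMb = tier.dropRight(2).toLong * 1024
    maxMemoryMb match {
      case Some(cap) if tierMb > cap => s"${(cap - 1).toInt}MB"
      case _ => tier
    }
  }

  def main(args: Array[String]): Unit = {
    // 50 GB source with a count-distinct measure -> "16GB"
    println(estimateExecutorMemory(50 * 1024, hasCountDistinct = true))
    // Same input capped by a 16384 MB * 0.9 cluster limit -> "14744MB"
    println(estimateExecutorMemory(50 * 1024, hasCountDistinct = true,
      maxMemoryMb = Some(16384 * 0.9)))
  }
}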