override def doApply()

in src/spark-project/engine-spark/src/main/scala/org/apache/spark/conf/rule/SparkConfRule.scala [51:84]


  /**
   * Derives a value for `SparkConfHelper.EXECUTOR_MEMORY` from the source table size
   * and stores it on `helper`.
   *
   * Nothing is written when the user has already set executor memory, or when the
   * `SparkConfHelper.SOURCE_TABLE_SIZE` option is blank (the latter is logged).
   *
   * Otherwise a tier ("20GB" / "16GB" / "10GB" / "4GB" / "1GB") is picked from the
   * source size and from whether the job has a count-distinct measure. When the
   * cluster manager reports a maximum resource allocation, the tier is capped just
   * below that allocation's memory multiplied by the configured proportion.
   *
   * @param helper conf helper supplying the options, the cluster manager and the
   *               target configuration
   */
  override def doApply(helper: SparkConfHelper): Unit = {
    val userDefinedMemory = helper.getConf(SparkConfHelper.EXECUTOR_MEMORY)
    // An explicit user setting always wins; only derive a value when none is present.
    if (StringUtils.isBlank(userDefinedMemory)) {
      // Read the option once and reuse it for both the blank check and the parse.
      val sourceTableSize = helper.getOption(SparkConfHelper.SOURCE_TABLE_SIZE)
      if (StringUtils.isBlank(sourceTableSize)) {
        logInfo(s"Source table size is Empty, skip ${getClass.getName}")
      } else {
        val sourceMB = Utils.byteStringAsMb(sourceTableSize)
        // Note: the GB figure is derived with a divisor of 1000, not 1024.
        val sourceGB = sourceMB / 1000
        val hasCountDistinct = helper.hasCountDistinct

        // Tiers are checked from largest to smallest; count-distinct jobs are bumped
        // up one tier for the same source size.
        val recommended =
          if (sourceGB >= 100 && hasCountDistinct) "20GB"
          else if (sourceGB >= 100 || (sourceGB >= 10 && hasCountDistinct)) "16GB"
          else if (sourceGB >= 10 || (sourceGB >= 1 && hasCountDistinct)) "10GB"
          else if (sourceMB >= 10) "4GB"
          else "1GB"

        // Fetch the allocation exactly once so the null check and the memory read
        // are guaranteed to see the same result; Option(...) maps null to None.
        val memory = Option(helper.getClusterManager.fetchMaximumResourceAllocation) match {
          case Some(maxAllocation) =>
            val proportion = KylinBuildEnv.get().kylinConfig.getMaxAllocationResourceProportion
            val maxMemoryMb = maxAllocation.memory * proportion
            if (Utils.byteStringAsMb(recommended) > maxMemoryMb) {
              // Stay one MB under the allowed maximum.
              s"${(maxMemoryMb - 1).toInt}MB"
            } else {
              recommended
            }
          case None =>
            recommended
        }

        helper.setConf(SparkConfHelper.EXECUTOR_MEMORY, memory)
      }
    }
  }