override def postProcessNativeConfig()

in backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHTransformerApi.scala [77:161]


  /**
   * Post-processes the native configuration map before it is handed to the CH backend.
   *
   * Derives default spill/memory limits from Spark's off-heap pool size, forwards
   * selected Hadoop client options into the native HDFS config, disables CH query-plan
   * optimizations by default, and maps `spark.sql.orc.compression.codec` onto the
   * native ORC writer setting. A key that the user set explicitly is never overridden.
   *
   * @param nativeConfMap the config map passed to the native engine; mutated in place
   * @param backendPrefix prefix under which runtime settings/configs are namespaced
   */
  override def postProcessNativeConfig(
      nativeConfMap: util.Map[String, String],
      backendPrefix: String): Unit = {
    val settingPrefix = backendPrefix + ".runtime_settings."

    // Insert `value` only when the key was not configured explicitly.
    def setDefault(key: String, value: => String): Unit = {
      if (!nativeConfMap.containsKey(key)) {
        nativeConfMap.put(key, value)
      }
    }

    if (nativeConfMap.getOrDefault("spark.memory.offHeap.enabled", "false").toBoolean) {
      val offHeapBytes =
        nativeConfMap.getOrDefault("spark.gluten.memory.offHeap.size.in.bytes", "0").toLong
      if (offHeapBytes > 0) {
        // Default external-sort / external-group-by spill thresholds to half of
        // the off-heap pool, and cap total memory usage at the pool size.
        setDefault(
          settingPrefix + "max_bytes_before_external_sort",
          (offHeapBytes * 0.5).toLong.toString)
        setDefault(
          settingPrefix + "max_bytes_before_external_group_by",
          (offHeapBytes * 0.5).toLong.toString)
        setDefault(settingPrefix + "max_memory_usage", offHeapBytes.toString)

        // max_bytes_in_join is only meaningful when CH joins via grace_hash.
        if (nativeConfMap.get(settingPrefix + "join_algorithm") == "grace_hash") {
          setDefault(
            settingPrefix + "max_bytes_in_join",
            (offHeapBytes * 0.7).toLong.toString)
        }
      }
    }

    // Copy a Spark-side option to its native counterpart when the source exists
    // and the destination has not been set explicitly.
    def forward(srcKey: String, dstKey: String): Unit = {
      if (nativeConfMap.containsKey(srcKey) && !nativeConfMap.containsKey(dstKey)) {
        nativeConfMap.put(dstKey, nativeConfMap.get(srcKey))
      }
    }

    val hdfsConfigPrefix = backendPrefix + ".runtime_config.hdfs."
    forward("spark.hadoop.input.connect.timeout", hdfsConfigPrefix + "input_connect_timeout")
    forward("spark.hadoop.input.read.timeout", hdfsConfigPrefix + "input_read_timeout")
    forward("spark.hadoop.input.write.timeout", hdfsConfigPrefix + "input_write_timeout")
    forward(
      "spark.hadoop.dfs.client.log.severity",
      hdfsConfigPrefix + "dfs_client_log_severity")

    // TODO: set default to true when metrics could be collected
    // while ch query plan optimization is enabled.
    setDefault(settingPrefix + "query_plan_enable_optimizations", "false")

    // Respect spark config spark.sql.orc.compression.codec for CH backend
    // TODO: consider compression or orc.compression in table options.
    val orcCompressionKey = settingPrefix + "output_format_orc_compression_method"
    if (!nativeConfMap.containsKey(orcCompressionKey)) {
      if (nativeConfMap.containsKey("spark.sql.orc.compression.codec")) {
        nativeConfMap.get("spark.sql.orc.compression.codec").toLowerCase() match {
          // Spark's "uncompressed" and "none" both mean no compression in CH.
          case "none" | "uncompressed" => nativeConfMap.put(orcCompressionKey, "none")
          case codec @ ("snappy" | "zlib" | "zstd" | "lz4") =>
            nativeConfMap.put(orcCompressionKey, codec)
          case unsupported =>
            throw new UnsupportedOperationException(
              s"Not supported ORC compression: $unsupported")
        }
      } else {
        // No Spark-side codec configured: fall back to snappy.
        nativeConfMap.put(orcCompressionKey, "snappy")
      }
    }
  }