in backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHTransformerApi.scala [77:161]
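/**
 * Post-processes the native config map before it is handed to the CH backend: derives
 * spill and memory defaults from the Spark off-heap size, forwards HDFS client settings,
 * disables CH query plan optimizations by default, and maps the ORC compression codec.
 */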
override def postProcessNativeConfig(
    nativeConfMap: util.Map[String, String],
    backendPrefix: String): Unit = {
  val settingPrefix = backendPrefix + ".runtime_settings."
  if (nativeConfMap.getOrDefault("spark.memory.offHeap.enabled", "false").toBoolean) {
    val offHeapSize =
      nativeConfMap.getOrDefault("spark.gluten.memory.offHeap.size.in.bytes", "0").toLong
    if (offHeapSize > 0) {
      // Only set default max_bytes_before_external_sort for CH when it is not set explicitly.
      val sortSpillKey = settingPrefix + "max_bytes_before_external_sort"
      if (!nativeConfMap.containsKey(sortSpillKey)) {
        val sortSpillValue = offHeapSize * 0.5
        nativeConfMap.put(sortSpillKey, sortSpillValue.toLong.toString)
      }
      // Only set default max_bytes_before_external_group_by for CH when it is not set explicitly.
      val groupBySpillKey = settingPrefix + "max_bytes_before_external_group_by"
      if (!nativeConfMap.containsKey(groupBySpillKey)) {
        val groupBySpillValue = offHeapSize * 0.5
        nativeConfMap.put(groupBySpillKey, groupBySpillValue.toLong.toString)
      }
      val maxMemoryUsageKey = settingPrefix + "max_memory_usage"
      if (!nativeConfMap.containsKey(maxMemoryUsageKey)) {
        nativeConfMap.put(maxMemoryUsageKey, offHeapSize.toString)
      }
      // Only set default max_bytes_in_join for CH when join_algorithm is grace_hash
      // and it is not set explicitly.
      val joinAlgorithmKey = settingPrefix + "join_algorithm"
      if (
        nativeConfMap.containsKey(joinAlgorithmKey) &&
        nativeConfMap.get(joinAlgorithmKey) == "grace_hash"
      ) {
        val joinSpillKey = settingPrefix + "max_bytes_in_join"
        if (!nativeConfMap.containsKey(joinSpillKey)) {
          val joinSpillValue = offHeapSize * 0.7
          nativeConfMap.put(joinSpillKey, joinSpillValue.toLong.toString)
        }
      }
    }
  }
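  // Copy a Spark config value to the corresponding CH config key,
  // but never override a value that was already set explicitly.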
  val injectConfig: (String, String) => Unit = (srcKey, dstKey) => {
    if (nativeConfMap.containsKey(srcKey) && !nativeConfMap.containsKey(dstKey)) {
      nativeConfMap.put(dstKey, nativeConfMap.get(srcKey))
    }
  }
  val hdfsConfigPrefix = backendPrefix + ".runtime_config.hdfs."
  injectConfig("spark.hadoop.input.connect.timeout", hdfsConfigPrefix + "input_connect_timeout")
  injectConfig("spark.hadoop.input.read.timeout", hdfsConfigPrefix + "input_read_timeout")
  injectConfig("spark.hadoop.input.write.timeout", hdfsConfigPrefix + "input_write_timeout")
  injectConfig(
    "spark.hadoop.dfs.client.log.severity",
    hdfsConfigPrefix + "dfs_client_log_severity")
  // TODO: set the default to true once metrics can be collected
  // while CH query plan optimization is enabled.
  val planOptKey = settingPrefix + "query_plan_enable_optimizations"
  if (!nativeConfMap.containsKey(planOptKey)) {
    nativeConfMap.put(planOptKey, "false")
  }
  // Respect the Spark config spark.sql.orc.compression.codec for the CH backend.
  // TODO: consider compression or orc.compression in table options.
  val orcCompressionKey = settingPrefix + "output_format_orc_compression_method"
  if (!nativeConfMap.containsKey(orcCompressionKey)) {
    if (nativeConfMap.containsKey("spark.sql.orc.compression.codec")) {
      val compression = nativeConfMap.get("spark.sql.orc.compression.codec").toLowerCase()
      val orcCompressionMethod = compression match {
        // CH has no separate "uncompressed" method; both map to "none".
        case "none" | "uncompressed" => "none"
        case "snappy" => "snappy"
        case "zlib" => "zlib"
        case "zstd" => "zstd"
        case "lz4" => "lz4"
        case _ =>
          throw new UnsupportedOperationException(
            s"Unsupported ORC compression codec: $compression")
      }
      nativeConfMap.put(orcCompressionKey, orcCompressionMethod)
    } else {
      // Default to snappy when the Spark codec config is absent.
      nativeConfMap.put(orcCompressionKey, "snappy")
    }
  }
}