in backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala [244:371]
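/**
 * Validates whether a WriteFilesExec with the given file format, output schema, bucket
 * spec, partitioning and write options can be offloaded to the Velox native writer.
 * Returns the first failure reason found, or ValidationResult.succeeded.
 */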
override def supportWriteFilesExec(
format: FileFormat,
fields: Array[StructField],
bucketSpec: Option[BucketSpec],
isPartitionedTable: Boolean,
options: Map[String, String]): ValidationResult = {
// Validate if HiveFileFormat write is supported based on output file type
def validateHiveFileFormat(hiveFileFormat: HiveFileFormat): Option[String] = {
// Reflect to get access to fileSinkConf which contains the output file format
val fileSinkConfField = hiveFileFormat.getClass.getDeclaredField("fileSinkConf")
fileSinkConfField.setAccessible(true)
val fileSinkConf = fileSinkConfField.get(hiveFileFormat)
val tableInfoField = fileSinkConf.getClass.getDeclaredField("tableInfo")
tableInfoField.setAccessible(true)
val tableInfo = tableInfoField.get(fileSinkConf)
val getOutputFileFormatClassNameMethod = tableInfo.getClass
.getDeclaredMethod("getOutputFileFormatClassName")
val outputFileFormatClassName = getOutputFileFormatClassNameMethod.invoke(tableInfo)
// Match based on the output file format class name
outputFileFormatClassName match {
case "org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat" =>
None
case _ =>
Some(
"HiveFileFormat is supported only with Parquet as the output file type"
) // Unsupported format
}
}
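// For reference, the reflective chain above corresponds to the direct access
// hiveFileFormat.fileSinkConf.tableInfo.getOutputFileFormatClassName, which cannot be
// written as-is because fileSinkConf is private to Spark's HiveFileFormat.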
def validateCompressionCodec(): Option[String] = {
// Velox doesn't support the brotli, lzo, lz4raw and lz4_raw compression codecs.
val unSupportedCompressions = Set("brotli", "lzo", "lz4raw", "lz4_raw")
val compressionCodec = WriteFilesExecTransformer.getCompressionCodec(options)
if (unSupportedCompressions.contains(compressionCodec)) {
Some("The brotli, lzo, lz4raw and lz4_raw compression codecs are unsupported in the Velox backend.")
} else {
None
}
}
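// Illustrative usage, assuming the codec is resolved from the standard Spark write
// option, e.g. df.write.option("compression", "brotli").parquet(path): such a write
// fails this check, while codecs like snappy, gzip or zstd pass through.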
// Validate if all types are supported.
def validateDataTypes(): Option[String] = {
val unsupportedTypes = format match {
case _: ParquetFileFormat =>
fields.flatMap {
case StructField(_, _: YearMonthIntervalType, _, _) =>
Some("YearMonthIntervalType")
case StructField(_, _: StructType, _, _) =>
Some("StructType")
case _ => None
}
case _ =>
fields.flatMap {
field =>
field.dataType match {
case _: StructType => Some("StructType")
case _: ArrayType => Some("ArrayType")
case _: MapType => Some("MapType")
case _: YearMonthIntervalType => Some("YearMonthIntervalType")
case _ => None
}
}
}
if (unsupportedTypes.nonEmpty) {
Some(unsupportedTypes.mkString("Found unsupported types: ", ", ", ""))
} else {
None
}
}
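// Example: writing a schema with a nested struct column, e.g.
//   StructType(Seq(StructField("point", StructType(Seq(StructField("x", IntegerType)))))),
// is rejected here with "Found unsupported types: StructType"; for non-Parquet formats,
// array and map columns are rejected as well.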
def validateFieldMetadata(): Option[String] = {
fields.find(_.metadata != Metadata.empty).map {
field =>
s"StructField contains metadata information: $field, metadata: ${field.metadata}"
}
}
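// Example: a column carrying metadata, e.g. a comment attached via
//   StructField("id", LongType, nullable = true,
//     new MetadataBuilder().putString("comment", "row id").build()),
// has non-empty metadata and fails this check.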
def validateFileFormat(): Option[String] = {
format match {
case _: ParquetFileFormat => None // Parquet is directly supported
case h: HiveFileFormat if GlutenConfig.get.enableHiveFileFormatWriter =>
validateHiveFileFormat(h) // Parquet via Hive SerDe
case _ =>
Some(
"Only ParquetFileFormat and HiveFileFormat are supported."
) // Unsupported format
}
}
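// In short: ParquetFileFormat is always eligible; HiveFileFormat is eligible only when
// the GlutenConfig flag enableHiveFileFormatWriter is on and the Hive output format is
// Parquet (checked above); any other FileFormat is reported unsupported, so the write
// stays on vanilla Spark.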
def validateWriteFilesOptions(): Option[String] = {
val maxRecordsPerFile = options
.get("maxRecordsPerFile")
.map(_.toLong)
.getOrElse(SQLConf.get.maxRecordsPerFile)
if (maxRecordsPerFile > 0) {
Some("Unsupported native write: maxRecordsPerFile not supported.")
} else {
None
}
}
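// maxRecordsPerFile > 0 asks the writer to roll to a new file every N records, which
// the native writer does not implement. Sketch of a session-level workaround, assuming
// file rotation is not actually needed:
//   spark.conf.set("spark.sql.files.maxRecordsPerFile", "0") // 0 disables rotation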
def validateBucketSpec(): Option[String] = {
val isHiveCompatibleBucketTable = bucketSpec.nonEmpty && options
.getOrElse("__hive_compatible_bucketed_table_insertion__", "false")
.equals("true")
// Currently, the Velox backend supports bucketed writes only for Hive-compatible
// bucketed tables, and only when the table is also partitioned. Once Velox supports
// bucketed non-partitioned tables, the partitioned-table restriction can be removed.
if (bucketSpec.isEmpty || (isHiveCompatibleBucketTable && isPartitionedTable)) {
None
} else {
Some("Unsupported native write: non-compatible hive bucket write is not supported.")
}
}
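// Example: an INSERT into a Hive table declared with
//   PARTITIONED BY (dt) CLUSTERED BY (id) INTO 4 BUCKETS
// passes, because Spark marks such writes with the
// __hive_compatible_bucketed_table_insertion__ option; a Spark-native bucketed write
// (DataFrameWriter.bucketBy) is not marked this way and fails this check.
// The checks below run in order and short-circuit on the first failure.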
validateCompressionCodec()
.orElse(validateFileFormat())
.orElse(validateFieldMetadata())
.orElse(validateDataTypes())
.orElse(validateWriteFilesOptions())
.orElse(validateBucketSpec()) match {
case Some(reason) => ValidationResult.failed(reason)
case _ => ValidationResult.succeeded
}
}