in backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala [101:211]
/**
 * Validates whether a native Velox scan can handle the given read.
 *
 * Runs three independent checks — file-system scheme support, file-format /
 * schema support, and Parquet encryption detection — and fails with the first
 * check that reports a problem.
 *
 * @param format the file format of the scan (Parquet, ORC, DWRF, ...)
 * @param fields the schema fields the scan will read
 * @param rootPaths root paths of the scanned files
 * @param properties read options (e.g. Parquet data-source options)
 * @param hadoopConf Hadoop configuration used to inspect files on disk
 * @return a succeeded result, or a failed result carrying the first reason found
 */
override def validateScanExec(
    format: ReadFileFormat,
    fields: Array[StructField],
    rootPaths: Seq[String],
    properties: Map[String, String],
    hadoopConf: Configuration): ValidationResult = {

  // Rejects the scan when some root path uses a scheme that none of the
  // registered Velox file systems can serve.
  def validateScheme(): Option[String] = {
    val filteredRootPaths = distinctRootPaths(rootPaths)
    if (
      filteredRootPaths.nonEmpty &&
      !VeloxFileSystemValidationJniWrapper.allSupportedByRegisteredFileSystems(
        filteredRootPaths.toArray)
    ) {
      Some(s"Scheme of [$filteredRootPaths] is not supported by registered file systems.")
    } else {
      None
    }
  }

  // Rejects the scan for unsupported formats, or for schemas containing
  // types the Velox reader for that format cannot handle.
  def validateFormat(): Option[String] = {
    // Applies a per-format type validator over all fields and aggregates the
    // reasons for any unsupported ones.
    def validateTypes(validatorFunc: PartialFunction[StructField, String]): Option[String] = {
      val unsupportedDataTypeReason = fields.collect(validatorFunc)
      if (unsupportedDataTypeReason.nonEmpty) {
        Some(
          s"Found unsupported data type in $format: ${unsupportedDataTypeReason.mkString(", ")}.")
      } else {
        None
      }
    }

    // True when the field is a CHAR(n) column and the config forces ORC
    // char-type scans to fall back to vanilla Spark.
    def isCharType(stringType: StringType, metadata: Metadata): Boolean = {
      val charTypePattern = "char\\((\\d+)\\)".r
      GlutenConfig.get.forceOrcCharTypeScanFallbackEnabled && charTypePattern
        .findFirstIn(
          CharVarcharUtils
            .getRawTypeString(metadata)
            .getOrElse(stringType.catalogString))
        .isDefined
    }

    format match {
      case ParquetReadFormat =>
        val parquetOptions = new ParquetOptions(CaseInsensitiveMap(properties), SQLConf.get)
        if (parquetOptions.mergeSchema) {
          // https://github.com/apache/incubator-gluten/issues/7174
          Some("not support when merge schema is true")
        } else {
          None
        }
      case DwrfReadFormat => None
      case OrcReadFormat =>
        if (!VeloxConfig.get.veloxOrcScanEnabled) {
          Some(s"Velox ORC scan is turned off, ${VeloxConfig.VELOX_ORC_SCAN_ENABLED.key}")
        } else {
          val typeValidator: PartialFunction[StructField, String] = {
            case StructField(_, arrayType: ArrayType, _, _)
                if arrayType.elementType.isInstanceOf[StructType] =>
              "StructType as element in ArrayType"
            case StructField(_, arrayType: ArrayType, _, _)
                if arrayType.elementType.isInstanceOf[ArrayType] =>
              "ArrayType as element in ArrayType"
            case StructField(_, mapType: MapType, _, _)
                if mapType.keyType.isInstanceOf[StructType] =>
              "StructType as Key in MapType"
            case StructField(_, mapType: MapType, _, _)
                if mapType.valueType.isInstanceOf[ArrayType] =>
              "ArrayType as Value in MapType"
            case StructField(_, stringType: StringType, _, metadata)
                if isCharType(stringType, metadata) =>
              // NOTE(review): in Spark, getRawTypeString returns an Option, so via
              // any2stringadd this renders as "Some(char(n))(force fallback)" —
              // confirm the "Some(...)" wrapper in the message is intended.
              CharVarcharUtils.getRawTypeString(metadata) + "(force fallback)"
            case StructField(_, TimestampType, _, _) => "TimestampType"
          }
          validateTypes(typeValidator)
        }
      case _ => Some(s"Unsupported file format $format.")
    }
  }

  // Rejects the scan when encrypted Parquet files are detected (Velox cannot
  // read them). Skipped entirely unless validation is enabled by config.
  def validateEncryption(): Option[String] = {
    if (!GlutenConfig.get.parquetEncryptionValidationEnabled) {
      None
    } else {
      // Only inspect up to `fileLimit` files to bound validation cost.
      val fileLimit = GlutenConfig.get.parquetEncryptionValidationFileLimit
      val encryptionResult =
        ParquetMetadataUtils.validateEncryption(format, rootPaths, hadoopConf, fileLimit)
      if (encryptionResult.ok()) {
        None
      } else {
        Some(s"Detected encrypted parquet files: ${encryptionResult.reason()}")
      }
    }
  }

  // All checks are evaluated eagerly (as before); the first reported failure
  // decides the result. Using collectFirst avoids the nonlocal `return` inside
  // a for-loop body, which desugars to an exception-based early exit.
  Seq(validateScheme(), validateFormat(), validateEncryption())
    .collectFirst { case Some(reason) => ValidationResult.failed(reason) }
    .getOrElse(ValidationResult.succeeded)
}