override def validateScanExec()

in backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala [101:211]


  /**
   * Validates whether a file scan can be handled by the Velox backend.
   *
   * Three checks are applied in order and the first failing one determines the result:
   *   1. scheme: the root paths returned by `distinctRootPaths` must all be accepted by
   *      `VeloxFileSystemValidationJniWrapper.allSupportedByRegisteredFileSystems`;
   *   1. format: Parquet is rejected when `mergeSchema` is set, DWRF is always accepted, ORC is
   *      accepted only when the Velox ORC scan is enabled and no field has an unsupported type,
   *      and every other format is rejected;
   *   1. encryption: when enabled in `GlutenConfig`, delegates to
   *      `ParquetMetadataUtils.validateEncryption`.
   *
   * The checks are evaluated lazily: a later check is not executed once an earlier one has failed.
   *
   * @param format
   *   file format of the scan
   * @param fields
   *   fields whose data types are inspected by the ORC type validation
   * @param rootPaths
   *   root paths of the scan, used by the scheme and encryption checks
   * @param properties
   *   scan options, read case-insensitively to build the `ParquetOptions`
   * @param hadoopConf
   *   Hadoop configuration forwarded to the encryption validation
   * @return
   *   `ValidationResult.failed` carrying the reason of the first failing check, otherwise
   *   `ValidationResult.succeeded`
   */
  override def validateScanExec(
      format: ReadFileFormat,
      fields: Array[StructField],
      rootPaths: Seq[String],
      properties: Map[String, String],
      hadoopConf: Configuration): ValidationResult = {

    // Fails when the paths produced by `distinctRootPaths` are non-empty and the JNI check
    // reports that not all of them are supported by the registered file systems.
    def validateScheme(): Option[String] = {
      val filteredRootPaths = distinctRootPaths(rootPaths)
      if (
        filteredRootPaths.nonEmpty &&
        !VeloxFileSystemValidationJniWrapper.allSupportedByRegisteredFileSystems(
          filteredRootPaths.toArray)
      ) {
        Some(s"Scheme of [$filteredRootPaths] is not supported by registered file systems.")
      } else {
        None
      }
    }

    // Format-specific validation; returns the fallback reason, if any.
    def validateFormat(): Option[String] = {
      // Applies `validatorFunc` to every field and reports all matched reasons in one message.
      def validateTypes(validatorFunc: PartialFunction[StructField, String]): Option[String] = {
        // Collect unsupported types.
        val unsupportedDataTypeReason = fields.collect(validatorFunc)
        if (unsupportedDataTypeReason.nonEmpty) {
          Some(
            s"Found unsupported data type in $format: ${unsupportedDataTypeReason.mkString(", ")}.")
        } else {
          None
        }
      }

      // Raw (declared) type string stored in the field metadata, falling back to the catalog
      // string of the string type when the metadata carries none.
      def rawTypeString(stringType: StringType, metadata: Metadata): String =
        CharVarcharUtils.getRawTypeString(metadata).getOrElse(stringType.catalogString)

      // True when the forced fallback is enabled and the raw type string contains `char(<n>)`.
      def isCharType(stringType: StringType, metadata: Metadata): Boolean = {
        val charTypePattern = "char\\((\\d+)\\)".r
        GlutenConfig.get.forceOrcCharTypeScanFallbackEnabled &&
        charTypePattern.findFirstIn(rawTypeString(stringType, metadata)).isDefined
      }

      format match {
        case ParquetReadFormat =>
          val parquetOptions = new ParquetOptions(CaseInsensitiveMap(properties), SQLConf.get)
          if (parquetOptions.mergeSchema) {
            // https://github.com/apache/incubator-gluten/issues/7174
            Some("not support when merge schema is true")
          } else {
            None
          }
        case DwrfReadFormat => None
        case OrcReadFormat =>
          if (!VeloxConfig.get.veloxOrcScanEnabled) {
            Some(s"Velox ORC scan is turned off, ${VeloxConfig.VELOX_ORC_SCAN_ENABLED.key}")
          } else {
            // Only the top-level field type and its immediate element/key/value type are inspected.
            val typeValidator: PartialFunction[StructField, String] = {
              case StructField(_, arrayType: ArrayType, _, _)
                  if arrayType.elementType.isInstanceOf[StructType] =>
                "StructType as element in ArrayType"
              case StructField(_, arrayType: ArrayType, _, _)
                  if arrayType.elementType.isInstanceOf[ArrayType] =>
                "ArrayType as element in ArrayType"
              case StructField(_, mapType: MapType, _, _)
                  if mapType.keyType.isInstanceOf[StructType] =>
                "StructType as Key in MapType"
              case StructField(_, mapType: MapType, _, _)
                  if mapType.valueType.isInstanceOf[ArrayType] =>
                "ArrayType as Value in MapType"
              case StructField(_, stringType: StringType, _, metadata)
                  if isCharType(stringType, metadata) =>
                // Unwrap the Option so the reason reads "char(n)(force fallback)" rather than
                // "Some(char(n))(force fallback)".
                s"${rawTypeString(stringType, metadata)}(force fallback)"
              case StructField(_, TimestampType, _, _) => "TimestampType"
            }
            validateTypes(typeValidator)
          }
        case _ => Some(s"Unsupported file format $format.")
      }
    }

    // Skipped entirely unless enabled in the configuration; otherwise the decision is delegated
    // to `ParquetMetadataUtils.validateEncryption`.
    def validateEncryption(): Option[String] = {
      if (!GlutenConfig.get.parquetEncryptionValidationEnabled) {
        None
      } else {
        val fileLimit = GlutenConfig.get.parquetEncryptionValidationFileLimit
        val encryptionResult =
          ParquetMetadataUtils.validateEncryption(format, rootPaths, hadoopConf, fileLimit)
        if (encryptionResult.ok()) {
          None
        } else {
          Some(s"Detected encrypted parquet files: ${encryptionResult.reason()}")
        }
      }
    }

    // `Option.orElse` takes its argument by name, so each later check only runs when all earlier
    // checks passed. NOTE(review): the encryption check presumably touches the file system, which
    // is why skipping it after an earlier failure matters - confirm against ParquetMetadataUtils.
    val firstFailure =
      validateScheme().orElse(validateFormat()).orElse(validateEncryption())

    firstFailure match {
      case Some(reason) => ValidationResult.failed(reason)
      case None => ValidationResult.succeeded
    }
  }