backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/VeloxBackend.scala (334 lines of code) (raw):

/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package io.glutenproject.backendsapi.velox import io.glutenproject.{GlutenConfig, GlutenPlugin, VELOX_BRANCH, VELOX_REVISION, VELOX_REVISION_TIME} import io.glutenproject.backendsapi._ import io.glutenproject.execution.WriteFilesExecTransformer import io.glutenproject.expression.WindowFunctionsBuilder import io.glutenproject.extension.ValidationResult import io.glutenproject.sql.shims.SparkShimLoader import io.glutenproject.substrait.rel.LocalFilesNode.ReadFileFormat import io.glutenproject.substrait.rel.LocalFilesNode.ReadFileFormat.{DwrfReadFormat, OrcReadFormat, ParquetReadFormat} import org.apache.spark.sql.catalyst.catalog.BucketSpec import org.apache.spark.sql.catalyst.expressions.{Alias, CumeDist, DenseRank, Descending, Expression, Literal, NamedExpression, NthValue, PercentRank, Rand, RangeFrame, Rank, RowNumber, SortOrder, SpecialFrameBoundary, SpecifiedWindowFrame} import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, Count, Sum} import org.apache.spark.sql.catalyst.plans.JoinType import org.apache.spark.sql.catalyst.util.CharVarcharUtils import org.apache.spark.sql.execution.{ProjectExec, SparkPlan} import org.apache.spark.sql.execution.aggregate.HashAggregateExec import org.apache.spark.sql.execution.command.CreateDataSourceTableAsSelectCommand import org.apache.spark.sql.execution.datasources.{FileFormat, InsertIntoHadoopFsRelationCommand} import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat import org.apache.spark.sql.expression.UDFResolver import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ import scala.util.control.Breaks.breakable class VeloxBackend extends Backend { override def name(): String = VeloxBackend.BACKEND_NAME override def buildInfo(): GlutenPlugin.BackendBuildInfo = GlutenPlugin.BackendBuildInfo("Velox", VELOX_BRANCH, VELOX_REVISION, VELOX_REVISION_TIME) override def iteratorApi(): IteratorApi = new IteratorApiImpl override def sparkPlanExecApi(): SparkPlanExecApi = new SparkPlanExecApiImpl override def transformerApi(): TransformerApi = new TransformerApiImpl override def validatorApi(): ValidatorApi = new ValidatorApiImpl override def metricsApi(): MetricsApi = new MetricsApiImpl override def listenerApi(): ListenerApi = new ListenerApiImpl override def broadcastApi(): BroadcastApi = new BroadcastApiImpl override def settings(): BackendSettingsApi = BackendSettings } object VeloxBackend { val BACKEND_NAME = "velox" } object BackendSettings extends BackendSettingsApi { val SHUFFLE_SUPPORTED_CODEC = Set("lz4", "zstd") val GLUTEN_VELOX_UDF_LIB_PATHS = getBackendConfigPrefix() + ".udfLibraryPaths" val GLUTEN_VELOX_DRIVER_UDF_LIB_PATHS = getBackendConfigPrefix() + ".driver.udfLibraryPaths" val MAXIMUM_BATCH_SIZE: Int = 32768 override def supportFileFormatRead( format: ReadFileFormat, fields: Array[StructField], partTable: Boolean, paths: Seq[String]): ValidationResult = { // Validate if all types are supported. def validateTypes(validatorFunc: PartialFunction[StructField, String]): ValidationResult = { // Collect unsupported types. val unsupportedDataTypeReason = fields.collect(validatorFunc) if (unsupportedDataTypeReason.isEmpty) { ValidationResult.ok } else { ValidationResult.notOk( s"Found unsupported data type in $format: ${unsupportedDataTypeReason.mkString(", ")}.") } } format match { case ParquetReadFormat => val typeValidator: PartialFunction[StructField, String] = { // Parquet scan of nested array with struct/array as element type is unsupported in Velox. case StructField(_, arrayType: ArrayType, _, _) if arrayType.elementType.isInstanceOf[StructType] => "StructType as element in ArrayType" case StructField(_, arrayType: ArrayType, _, _) if arrayType.elementType.isInstanceOf[ArrayType] => "ArrayType as element in ArrayType" // Parquet scan of nested map with struct as key type, // or array type as value type is not supported in Velox. case StructField(_, mapType: MapType, _, _) if mapType.keyType.isInstanceOf[StructType] => "StructType as Key in MapType" case StructField(_, mapType: MapType, _, _) if mapType.valueType.isInstanceOf[ArrayType] => "ArrayType as Value in MapType" } validateTypes(typeValidator) case DwrfReadFormat => ValidationResult.ok case OrcReadFormat => if (!GlutenConfig.getConf.veloxOrcScanEnabled) { ValidationResult.notOk(s"Velox ORC scan is turned off.") } else { val typeValidator: PartialFunction[StructField, String] = { case StructField(_, ByteType, _, _) => "ByteType not support" case StructField(_, arrayType: ArrayType, _, _) if arrayType.elementType.isInstanceOf[StructType] => "StructType as element in ArrayType" case StructField(_, arrayType: ArrayType, _, _) if arrayType.elementType.isInstanceOf[ArrayType] => "ArrayType as element in ArrayType" case StructField(_, mapType: MapType, _, _) if mapType.keyType.isInstanceOf[StructType] => "StructType as Key in MapType" case StructField(_, mapType: MapType, _, _) if mapType.valueType.isInstanceOf[ArrayType] => "ArrayType as Value in MapType" case StructField(_, stringType: StringType, _, metadata) if CharVarcharUtils .getRawTypeString(metadata) .getOrElse(stringType.catalogString) != stringType.catalogString => CharVarcharUtils.getRawTypeString(metadata) + " not support" case StructField(_, TimestampType, _, _) => "TimestampType not support" } validateTypes(typeValidator) } case _ => ValidationResult.notOk(s"Unsupported file format for $format.") } } override def supportWriteFilesExec( format: FileFormat, fields: Array[StructField], bucketSpec: Option[BucketSpec], options: Map[String, String]): Option[String] = { def validateCompressionCodec(): Option[String] = { // Velox doesn't support brotli and lzo. val unSupportedCompressions = Set("brotli, lzo") val compressionCodec = WriteFilesExecTransformer.getCompressionCodec(options) if (unSupportedCompressions.contains(compressionCodec)) { Some("brotli or lzo compression codec is not support in velox backend.") } else { None } } // Validate if all types are supported. def validateDateTypes(): Option[String] = { fields.flatMap { field => field.dataType match { case _: TimestampType => Some("TimestampType") case _: StructType => Some("StructType") case _: ArrayType => Some("ArrayType") case _: MapType => Some("MapType") case _ => None } }.headOption } def validateFieldMetadata(): Option[String] = { if (fields.exists(!_.metadata.equals(Metadata.empty))) { Some("StructField contain the metadata information.") } else None } def validateFileFormat(): Option[String] = { format match { case _: ParquetFileFormat => None case _: FileFormat => Some("Only parquet fileformat is supported in native write.") } } def validateWriteFilesOptions(): Option[String] = { val maxRecordsPerFile = options .get("maxRecordsPerFile") .map(_.toLong) .getOrElse(SQLConf.get.maxRecordsPerFile) if (maxRecordsPerFile > 0) { Some("Unsupported native write: maxRecordsPerFile not supported.") } else { None } } def validateBucketSpec(): Option[String] = { if (bucketSpec.nonEmpty) { Some("Unsupported native write: bucket write is not supported.") } else { None } } validateCompressionCodec() .orElse(validateFileFormat()) .orElse(validateFieldMetadata()) .orElse(validateDateTypes()) .orElse(validateWriteFilesOptions()) .orElse(validateBucketSpec()) } override def supportExpandExec(): Boolean = true override def supportSortExec(): Boolean = true override def supportSortMergeJoinExec(): Boolean = { GlutenConfig.getConf.enableColumnarSortMergeJoin } override def supportWindowExec(windowFunctions: Seq[NamedExpression]): Boolean = { var allSupported = true breakable { windowFunctions.foreach( func => { val windowExpression = func match { case alias: Alias => WindowFunctionsBuilder.extractWindowExpression(alias.child) case _ => throw new UnsupportedOperationException(s"$func is not supported.") } // Block the offloading by checking Velox's current limitations // when literal bound type is used for RangeFrame. def checkLimitations(swf: SpecifiedWindowFrame, orderSpec: Seq[SortOrder]): Unit = { def doCheck(bound: Expression, isUpperBound: Boolean): Unit = { bound match { case _: SpecialFrameBoundary => case e if e.foldable => orderSpec.foreach( order => order.direction match { case Descending => throw new UnsupportedOperationException( "DESC order is not supported when" + " literal bound type is used!") case _ => }) orderSpec.foreach( order => order.dataType match { case ByteType | ShortType | IntegerType | LongType | DateType => case _ => throw new UnsupportedOperationException( "Only integral type & date type are" + " supported for sort key when literal bound type is used!") }) val rawValue = e.eval().toString.toLong if (isUpperBound && rawValue < 0) { throw new UnsupportedOperationException( "Negative upper bound is not supported!") } else if (!isUpperBound && rawValue > 0) { throw new UnsupportedOperationException( "Positive lower bound is not supported!") } case _ => } } doCheck(swf.upper, true) doCheck(swf.lower, false) } windowExpression.windowSpec.frameSpecification match { case swf: SpecifiedWindowFrame => swf.frameType match { case RangeFrame => checkLimitations(swf, windowExpression.windowSpec.orderSpec) case _ => } case _ => } windowExpression.windowFunction match { case _: RowNumber | _: AggregateExpression | _: Rank | _: CumeDist | _: DenseRank | _: PercentRank | _: NthValue => case _ => allSupported = false } }) } allSupported } override def supportColumnarShuffleExec(): Boolean = { GlutenConfig.getConf.isUseColumnarShuffleManager || GlutenConfig.getConf.isUseCelebornShuffleManager } override def enableJoinKeysRewrite(): Boolean = false override def supportHashBuildJoinTypeOnLeft: JoinType => Boolean = { t => if (super.supportHashBuildJoinTypeOnLeft(t)) { true } else { t match { // OPPRO-266: For Velox backend, build right and left are both supported for // LeftOuter and LeftSemi. // FIXME Hongze 22/12/06 // HashJoin.scala in shim was not always loaded by class loader. // The file should be removed and we temporarily disable the improvement // introduced by OPPRO-266 by commenting out the following prerequisite // condition. // case LeftOuter | LeftSemi => true case _ => false } } } override def supportHashBuildJoinTypeOnRight: JoinType => Boolean = { t => if (super.supportHashBuildJoinTypeOnRight(t)) { true } else { t match { // OPPRO-266: For Velox backend, build right and left are both supported for RightOuter. // FIXME Hongze 22/12/06 // HashJoin.scala in shim was not always loaded by class loader. // The file should be removed and we temporarily disable the improvement // introduced by OPPRO-266 by commenting out the following prerequisite // condition. // case RightOuter => true case _ => false } } } /** * Check whether a plan needs to be offloaded even though they have empty input schema, e.g, * Sum(1), Count(1), rand(), etc. * @param plan: * The Spark plan to check. */ private def mayNeedOffload(plan: SparkPlan): Boolean = { def checkExpr(expr: Expression): Boolean = { expr match { // Block directly falling back the below functions by FallbackEmptySchemaRelation. case alias: Alias => checkExpr(alias.child) case _: Rand => true case _ => false } } plan match { case exec: HashAggregateExec if exec.aggregateExpressions.nonEmpty => // Check Sum(1) or Count(1). exec.aggregateExpressions.forall( expression => { val aggFunction = expression.aggregateFunction aggFunction match { case _: Sum | _: Count => aggFunction.children.size == 1 && aggFunction.children.head.equals(Literal(1)) case _ => false } }) case p: ProjectExec if p.projectList.nonEmpty => p.projectList.forall(checkExpr(_)) case _ => false } } override def fallbackOnEmptySchema(plan: SparkPlan): Boolean = { // Count(1) and Sum(1) are special cases that Velox backend can handle. // Do not fallback it and its children in the first place. !mayNeedOffload(plan) } override def fallbackAggregateWithChild(): Boolean = true override def recreateJoinExecOnFallback(): Boolean = true override def removeHashColumnFromColumnarShuffleExchangeExec(): Boolean = true override def rescaleDecimalLiteral(): Boolean = true /** Get the config prefix for each backend */ override def getBackendConfigPrefix(): String = GlutenConfig.GLUTEN_CONFIG_PREFIX + VeloxBackend.BACKEND_NAME override def rescaleDecimalIntegralExpression(): Boolean = true override def shuffleSupportedCodec(): Set[String] = SHUFFLE_SUPPORTED_CODEC override def resolveNativeConf(nativeConf: java.util.Map[String, String]): Unit = { checkMaxBatchSize(nativeConf) UDFResolver.resolveUdfConf(nativeConf) } override def insertPostProjectForGenerate(): Boolean = true override def skipNativeCtas(ctas: CreateDataSourceTableAsSelectCommand): Boolean = true override def skipNativeInsertInto(insertInto: InsertIntoHadoopFsRelationCommand): Boolean = { insertInto.partitionColumns.nonEmpty && insertInto.staticPartitions.size < insertInto.partitionColumns.size || insertInto.bucketSpec.nonEmpty } override def alwaysFailOnMapExpression(): Boolean = true override def requiredChildOrderingForWindow(): Boolean = true override def staticPartitionWriteOnly(): Boolean = true override def supportTransformWriteFiles: Boolean = true override def allowDecimalArithmetic: Boolean = SQLConf.get.decimalOperationsAllowPrecisionLoss override def enableNativeWriteFiles(): Boolean = { GlutenConfig.getConf.enableNativeWriter.getOrElse( SparkShimLoader.getSparkShims.enableNativeWriteFilesByDefault() ) } private def checkMaxBatchSize(nativeConf: java.util.Map[String, String]): Unit = { if (nativeConf.containsKey(GlutenConfig.GLUTEN_MAX_BATCH_SIZE_KEY)) { val maxBatchSize = nativeConf.get(GlutenConfig.GLUTEN_MAX_BATCH_SIZE_KEY).toInt if (maxBatchSize > MAXIMUM_BATCH_SIZE) { throw new IllegalArgumentException( s"The maximum value of ${GlutenConfig.GLUTEN_MAX_BATCH_SIZE_KEY}" + s" is $MAXIMUM_BATCH_SIZE for Velox backend.") } } } override def shouldRewriteCount(): Boolean = { // Velox backend does not support count if it has more that one child, // so we should rewrite it. true } override def supportCartesianProductExec(): Boolean = true }