in sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala [111:577]
override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
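// Scans whose V2 Scan wraps a V1 implementation: materialize the V1 relation, check that its
// schema matches the pushed-down read schema, and plan it as a RowDataSourceScanExec.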
case PhysicalOperation(project, filters, DataSourceV2ScanRelation(
v2Relation, V1ScanWrapper(scan, pushed, pushedDownOperators), output, _, _)) =>
val v1Relation = scan.toV1TableScan[BaseRelation with TableScan](session.sqlContext)
if (v1Relation.schema != scan.readSchema()) {
throw QueryExecutionErrors.fallbackV1RelationReportsInconsistentSchemaError(
scan.readSchema(), v1Relation.schema)
}
val rdd = v1Relation.buildScan()
val unsafeRowRDD = DataSourceStrategy.toCatalystRDD(v1Relation, output, rdd)
val catalogName = v2Relation.catalog.map(_.name())
val tableIdentifier = v2Relation.identifier.flatMap(_.asTableIdentifierOpt(catalogName))
val dsScan = RowDataSourceScanExec(
output,
output.toStructType,
Set.empty,
pushed.toSet,
pushedDownOperators,
unsafeRowRDD,
v1Relation,
None,
tableIdentifier)
DataSourceV2Strategy.withProjectAndFilter(
project, filters, dsScan, needsUnsafeConversion = false) :: Nil
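// A LocalScan exposes its rows on the driver, so plan it as a local table scan.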
case PhysicalOperation(project, filters,
DataSourceV2ScanRelation(_, scan: LocalScan, output, _, _)) =>
val localScanExec = LocalTableScanExec(output, scan.rows().toImmutableArraySeq, None)
DataSourceV2Strategy.withProjectAndFilter(
project, filters, localScanExec, needsUnsafeConversion = false) :: Nil
case PhysicalOperation(project, filters, relation: DataSourceV2ScanRelation) =>
// Projection and filters were already pushed down in the optimizer.
// This uses PhysicalOperation to get the projection and to ensure that, if the batch scan
// does not support columnar output, a projection is added to convert the rows to UnsafeRow.
val (runtimeFilters, postScanFilters) = filters.partition {
case _: DynamicPruning => true
case _ => false
}
val batchExec = BatchScanExec(relation.output, relation.scan, runtimeFilters,
relation.ordering, relation.relation.table,
StoragePartitionJoinParams(relation.keyGroupedPartitioning))
DataSourceV2Strategy.withProjectAndFilter(
project, postScanFilters, batchExec, !batchExec.supportsColumnar) :: Nil
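// Streaming scan with both start and end offsets resolved: plan a micro-batch scan.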
case PhysicalOperation(p, f, r: StreamingDataSourceV2ScanRelation)
if r.startOffset.isDefined && r.endOffset.isDefined =>
val microBatchStream = r.stream.asInstanceOf[MicroBatchStream]
val scanExec = MicroBatchScanExec(
r.output, r.scan, microBatchStream, r.startOffset.get, r.endOffset.get)
// Add a Project here to make sure we produce unsafe rows.
DataSourceV2Strategy.withProjectAndFilter(p, f, scanExec, !scanExec.supportsColumnar) :: Nil
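// Streaming scan with only a start offset: plan a continuous-processing scan.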
case PhysicalOperation(p, f, r: StreamingDataSourceV2ScanRelation)
if r.startOffset.isDefined && r.endOffset.isEmpty =>
val continuousStream = r.stream.asInstanceOf[ContinuousStream]
val scanExec = ContinuousScanExec(r.output, r.scan, continuousStream, r.startOffset.get)
// Eagerly initialize the input partitions at planning time.
scanExec.inputPartitions
// Add a Project here to make sure we produce unsafe rows.
DataSourceV2Strategy.withProjectAndFilter(p, f, scanExec, !scanExec.supportsColumnar) :: Nil
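// Internal write node: plan the writer directly and pass a callback that uncaches the target
// relation, if one is known.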
case WriteToDataSourceV2(relationOpt, writer, query, customMetrics) =>
val invalidateCacheFunc: () => Unit = () => relationOpt match {
case Some(r) => session.sharedState.cacheManager.uncacheQuery(session, r, cascade = true)
case None => ()
}
WriteToDataSourceV2Exec(writer, invalidateCacheFunc, planLater(query), customMetrics) :: Nil
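// CREATE TABLE: validate default values, table constraints, generated columns and identity
// columns against the target catalog before planning the exec node.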
case c @ CreateTable(ResolvedIdentifier(catalog, ident), columns, partitioning,
tableSpec: TableSpec, ifNotExists) =>
val tableCatalog = catalog.asTableCatalog
ResolveDefaultColumns.validateCatalogForDefaultValue(columns, tableCatalog, ident)
ResolveTableConstraints.validateCatalogForTableConstraint(
tableSpec.constraints, tableCatalog, ident)
val statementType = "CREATE TABLE"
GeneratedColumn.validateGeneratedColumns(
c.tableSchema, tableCatalog, ident, statementType)
IdentityColumn.validateIdentityColumn(c.tableSchema, tableCatalog, ident)
CreateTableExec(
catalog.asTableCatalog,
ident,
columns.map(_.toV2Column(statementType)).toArray,
partitioning,
qualifyLocInTableSpec(tableSpec),
ifNotExists) :: Nil
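// CREATE TABLE AS SELECT: use the atomic, staged exec when the catalog supports staging.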
case CreateTableAsSelect(ResolvedIdentifier(catalog, ident), parts, query, tableSpec: TableSpec,
options, ifNotExists, true) =>
catalog match {
case staging: StagingTableCatalog =>
AtomicCreateTableAsSelectExec(staging, ident, parts, query,
qualifyLocInTableSpec(tableSpec), options, ifNotExists) :: Nil
case _ =>
CreateTableAsSelectExec(catalog.asTableCatalog, ident, parts, query,
qualifyLocInTableSpec(tableSpec), options, ifNotExists) :: Nil
}
case RefreshTable(r: ResolvedTable) =>
RefreshTableExec(r.catalog, r.identifier, recacheTable(r)) :: Nil
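// REPLACE TABLE: same column and constraint validation as CREATE TABLE, with an atomic exec
// when the catalog supports staging.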
case c @ ReplaceTable(
ResolvedIdentifier(catalog, ident), columns, parts, tableSpec: TableSpec, orCreate) =>
val tableCatalog = catalog.asTableCatalog
ResolveDefaultColumns.validateCatalogForDefaultValue(columns, tableCatalog, ident)
ResolveTableConstraints.validateCatalogForTableConstraint(
tableSpec.constraints, tableCatalog, ident)
val statementType = "REPLACE TABLE"
GeneratedColumn.validateGeneratedColumns(
c.tableSchema, tableCatalog, ident, statementType)
IdentityColumn.validateIdentityColumn(c.tableSchema, tableCatalog, ident)
val v2Columns = columns.map(_.toV2Column(statementType)).toArray
catalog match {
case staging: StagingTableCatalog =>
AtomicReplaceTableExec(staging, ident, v2Columns, parts,
qualifyLocInTableSpec(tableSpec), orCreate = orCreate, invalidateCache) :: Nil
case _ =>
ReplaceTableExec(tableCatalog, ident, v2Columns, parts,
qualifyLocInTableSpec(tableSpec), orCreate = orCreate, invalidateCache) :: Nil
}
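// REPLACE TABLE AS SELECT: atomic when the catalog supports staging, otherwise non-atomic
// drop-then-create.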
case ReplaceTableAsSelect(ResolvedIdentifier(catalog, ident),
parts, query, tableSpec: TableSpec, options, orCreate, true) =>
catalog match {
case staging: StagingTableCatalog =>
AtomicReplaceTableAsSelectExec(
staging,
ident,
parts,
query,
qualifyLocInTableSpec(tableSpec),
options,
orCreate = orCreate,
invalidateCache) :: Nil
case _ =>
ReplaceTableAsSelectExec(
catalog.asTableCatalog,
ident,
parts,
query,
qualifyLocInTableSpec(tableSpec),
options,
orCreate = orCreate,
invalidateCache) :: Nil
}
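// Appends to tables that declare V1_BATCH_WRITE fall back to the V1 write path, so the write
// produced by the optimizer must be a V1Write; any other write type is a capability mismatch.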
case AppendData(r @ DataSourceV2Relation(v1: SupportsWrite, _, _, _, _), _, _,
_, Some(write), analyzedQuery) if v1.supports(TableCapability.V1_BATCH_WRITE) =>
write match {
case v1Write: V1Write =>
assert(analyzedQuery.isDefined)
AppendDataExecV1(v1, analyzedQuery.get, refreshCache(r), v1Write) :: Nil
case v2Write =>
throw QueryCompilationErrors.batchWriteCapabilityError(
v1, v2Write.getClass.getName, classOf[V1Write].getName)
}
case AppendData(r: DataSourceV2Relation, query, _, _, Some(write), _) =>
AppendDataExec(planLater(query), refreshCache(r), write) :: Nil
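// Overwrites follow the same split: a V1 fallback when the table declares V1_BATCH_WRITE,
// otherwise the native V2 write execs.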
case OverwriteByExpression(r @ DataSourceV2Relation(v1: SupportsWrite, _, _, _, _), _, _,
_, _, Some(write), analyzedQuery) if v1.supports(TableCapability.V1_BATCH_WRITE) =>
write match {
case v1Write: V1Write =>
assert(analyzedQuery.isDefined)
OverwriteByExpressionExecV1(v1, analyzedQuery.get, refreshCache(r), v1Write) :: Nil
case v2Write =>
throw QueryCompilationErrors.batchWriteCapabilityError(
v1, v2Write.getClass.getName, classOf[V1Write].getName)
}
case OverwriteByExpression(
r: DataSourceV2Relation, _, query, _, _, Some(write), _) =>
OverwriteByExpressionExec(planLater(query), refreshCache(r), write) :: Nil
case OverwritePartitionsDynamic(r: DataSourceV2Relation, query, _, _, Some(write)) =>
OverwritePartitionsDynamicExec(planLater(query), refreshCache(r), write) :: Nil
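// DELETE: DeleteFromTableWithFilters carries filters already translated by the optimizer, while
// DeleteFromTable still holds a catalyst condition that must be fully translatable here, since
// correctness depends on removing every matching row.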
case DeleteFromTableWithFilters(r: DataSourceV2Relation, filters) =>
DeleteFromTableExec(r.table.asDeletable, filters.toArray, refreshCache(r)) :: Nil
case DeleteFromTable(relation, condition) =>
relation match {
case DataSourceV2ScanRelation(r, _, output, _, _) =>
val table = r.table
if (SubqueryExpression.hasSubquery(condition)) {
throw QueryCompilationErrors.unsupportedDeleteByConditionWithSubqueryError(condition)
}
// Fail if any filter cannot be converted: correctness depends on removing all matching data.
val filters = DataSourceStrategy.normalizeExprs(Seq(condition), output)
.flatMap(splitConjunctivePredicates(_).map {
f => DataSourceV2Strategy.translateFilterV2(f).getOrElse(
throw QueryCompilationErrors.cannotTranslateExpressionToSourceFilterError(f))
}).toArray
table match {
case t: SupportsDeleteV2 if t.canDeleteWhere(filters) =>
DeleteFromTableExec(t, filters, refreshCache(r)) :: Nil
case t: SupportsDeleteV2 =>
throw QueryCompilationErrors.cannotDeleteTableWhereFiltersError(t, filters)
case t: TruncatableTable if condition == TrueLiteral =>
TruncateTableExec(t, refreshCache(r)) :: Nil
case _ =>
throw QueryCompilationErrors.tableDoesNotSupportDeletesError(table)
}
case LogicalRelationWithTable(_, Some(catalogTable)) =>
val tableIdentifier = catalogTable.identifier
throw QueryCompilationErrors.unsupportedTableOperationError(
tableIdentifier,
"DELETE")
case other =>
throw SparkException.internalError("Unexpected table relation: " + other)
}
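// ReplaceData and WriteDelta come from row-level operation rewrites (DELETE/UPDATE/MERGE) and
// refresh the cache of the original relation they rewrite.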
case ReplaceData(_: DataSourceV2Relation, _, query, r: DataSourceV2Relation, projections, _,
Some(write)) =>
// use the original relation to refresh the cache
ReplaceDataExec(planLater(query), refreshCache(r), projections, write) :: Nil
case WriteDelta(_: DataSourceV2Relation, _, query, r: DataSourceV2Relation, projections,
Some(write)) =>
// use the original relation to refresh the cache
WriteDeltaExec(planLater(query), refreshCache(r), projections, write) :: Nil
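// MergeRows applies the MERGE matched / not-matched instructions to each row of the joined
// source and target plan.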
case MergeRows(isSourceRowPresent, isTargetRowPresent, matchedInstructions,
notMatchedInstructions, notMatchedBySourceInstructions, checkCardinality, output, child) =>
MergeRowsExec(isSourceRowPresent, isTargetRowPresent, matchedInstructions,
notMatchedInstructions, notMatchedBySourceInstructions, checkCardinality,
output, planLater(child)) :: Nil
case WriteToContinuousDataSource(writer, query, customMetrics) =>
WriteToContinuousDataSourceExec(writer, planLater(query), customMetrics) :: Nil
case DescribeNamespace(ResolvedNamespace(catalog, ns, _), extended, output) =>
DescribeNamespaceExec(output, catalog.asNamespaceCatalog, ns, extended) :: Nil
case DescribeRelation(r: ResolvedTable, partitionSpec, isExtended, output) =>
if (partitionSpec.nonEmpty) {
throw QueryCompilationErrors.describeDoesNotSupportPartitionForV2TablesError()
}
DescribeTableExec(output, r.table, isExtended) :: Nil
case DescribeColumn(r: ResolvedTable, column, isExtended, output) =>
column match {
case c: Attribute =>
DescribeColumnExec(output, c, isExtended, r.table) :: Nil
case nested =>
throw QueryCompilationErrors.commandNotSupportNestedColumnError(
"DESC TABLE COLUMN", toPrettySQL(nested))
}
case DropTable(r: ResolvedIdentifier, ifExists, purge) =>
val invalidateFunc = () => CommandUtils.uncacheTableOrView(session, r)
DropTableExec(r.catalog.asTableCatalog, r.identifier, ifExists, purge, invalidateFunc) :: Nil
case _: NoopCommand =>
LocalTableScanExec(Nil, Nil, None) :: Nil
case RenameTable(r @ ResolvedTable(catalog, oldIdent, _, _), newIdent, isView) =>
if (isView) {
throw QueryCompilationErrors.cannotRenameTableWithAlterViewError()
}
RenameTableExec(
catalog,
oldIdent,
newIdent.asIdentifier,
invalidateTableCache(r),
session.sharedState.cacheManager.cacheQuery) :: Nil
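// Namespace DDL commands map directly onto exec nodes; location and comment updates are
// expressed through the reserved namespace properties.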
case SetNamespaceProperties(ResolvedNamespace(catalog, ns, _), properties) =>
AlterNamespaceSetPropertiesExec(catalog.asNamespaceCatalog, ns, properties) :: Nil
case SetNamespaceLocation(ResolvedNamespace(catalog, ns, _), location) =>
if (StringUtils.isEmpty(location)) {
throw QueryExecutionErrors.invalidEmptyLocationError(location)
}
AlterNamespaceSetPropertiesExec(
catalog.asNamespaceCatalog,
ns,
Map(SupportsNamespaces.PROP_LOCATION -> makeQualifiedDBObjectPath(location))) :: Nil
case CommentOnNamespace(ResolvedNamespace(catalog, ns, _), comment) =>
AlterNamespaceSetPropertiesExec(
catalog.asNamespaceCatalog,
ns,
Map(SupportsNamespaces.PROP_COMMENT -> comment)) :: Nil
case CreateNamespace(ResolvedNamespace(catalog, ns, _), ifNotExists, properties) =>
val location = properties.get(SupportsNamespaces.PROP_LOCATION)
if (location.isDefined && location.get.isEmpty) {
throw QueryExecutionErrors.invalidEmptyLocationError(location.get)
}
val finalProperties = properties.get(SupportsNamespaces.PROP_LOCATION).map { loc =>
properties + (SupportsNamespaces.PROP_LOCATION -> makeQualifiedDBObjectPath(loc))
}.getOrElse(properties)
CreateNamespaceExec(catalog.asNamespaceCatalog, ns, ifNotExists, finalProperties) :: Nil
case DropNamespace(ResolvedNamespace(catalog, ns, _), ifExists, cascade) =>
DropNamespaceExec(catalog, ns, ifExists, cascade) :: Nil
case ShowTables(ResolvedNamespace(catalog, ns, _), pattern, output) =>
ShowTablesExec(output, catalog.asTableCatalog, ns, pattern) :: Nil
case ShowTablesExtended(
ResolvedNamespace(catalog, ns, _),
pattern,
output) =>
ShowTablesExtendedExec(output, catalog.asTableCatalog, ns, pattern) :: Nil
case ShowTablePartition(r: ResolvedTable, part, output) =>
ShowTablePartitionExec(output, r.catalog, r.identifier,
r.table.asPartitionable, Seq(part).asResolvedPartitionSpecs.head) :: Nil
case SetCatalogAndNamespace(ResolvedNamespace(catalog, ns, _)) =>
val catalogManager = session.sessionState.catalogManager
val namespace = if (ns.nonEmpty) Some(ns) else None
SetCatalogAndNamespaceExec(catalogManager, Some(catalog.name()), namespace) :: Nil
case ShowTableProperties(rt: ResolvedTable, propertyKey, output) =>
ShowTablePropertiesExec(output, rt.table, rt.name, propertyKey) :: Nil
case AnalyzeTable(_: ResolvedTable, _, _) | AnalyzeColumn(_: ResolvedTable, _, _) =>
throw QueryCompilationErrors.analyzeTableNotSupportedForV2TablesError()
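// Partition DDL is only supported for tables that implement SupportsPartitionManagement.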
case AddPartitions(
r @ ResolvedTable(_, _, table: SupportsPartitionManagement, _), parts, ignoreIfExists) =>
AddPartitionExec(
table,
parts.asResolvedPartitionSpecs,
ignoreIfExists,
recacheTable(r)) :: Nil
case DropPartitions(
r @ ResolvedTable(_, _, table: SupportsPartitionManagement, _),
parts,
ignoreIfNotExists,
purge) =>
DropPartitionExec(
table,
parts.asResolvedPartitionSpecs,
ignoreIfNotExists,
purge,
recacheTable(r)) :: Nil
case RenamePartitions(
r @ ResolvedTable(_, _, table: SupportsPartitionManagement, _), from, to) =>
RenamePartitionExec(
table,
Seq(from).asResolvedPartitionSpecs.head,
Seq(to).asResolvedPartitionSpecs.head,
recacheTable(r)) :: Nil
case RecoverPartitions(_: ResolvedTable) =>
throw QueryCompilationErrors.alterTableRecoverPartitionsNotSupportedForV2TablesError()
case SetTableSerDeProperties(_: ResolvedTable, _, _, _) =>
throw QueryCompilationErrors.alterTableSerDePropertiesNotSupportedForV2TablesError()
case LoadData(_: ResolvedTable, _, _, _, _) =>
throw QueryCompilationErrors.loadDataNotSupportedForV2TablesError()
case ShowCreateTable(rt: ResolvedTable, asSerde, output) =>
if (asSerde) {
throw QueryCompilationErrors.showCreateTableAsSerdeNotSupportedForV2TablesError()
}
ShowCreateTableExec(output, rt) :: Nil
case TruncateTable(r: ResolvedTable) =>
TruncateTableExec(
r.table.asTruncatable,
recacheTable(r)) :: Nil
case TruncatePartition(r: ResolvedTable, part) =>
TruncatePartitionExec(
r.table.asPartitionable,
Seq(part).asResolvedPartitionSpecs.head,
recacheTable(r)) :: Nil
case ShowColumns(resolvedTable: ResolvedTable, ns, output) =>
ns match {
case Some(namespace) =>
val tableNamespace = resolvedTable.identifier.namespace()
if (namespace.length != tableNamespace.length ||
!namespace.zip(tableNamespace).forall(SQLConf.get.resolver.tupled)) {
throw QueryCompilationErrors.showColumnsWithConflictNamespacesError(
namespace, tableNamespace.toSeq)
}
case _ =>
}
ShowColumnsExec(output, resolvedTable) :: Nil
case ShowPartitions(
ResolvedTable(catalog, _, table: SupportsPartitionManagement, _),
pattern @ (None | Some(_: ResolvedPartitionSpec)), output) =>
ShowPartitionsExec(
output,
catalog,
table,
pattern.map(_.asInstanceOf[ResolvedPartitionSpec])) :: Nil
case RepairTable(_: ResolvedTable, _, _) =>
throw QueryCompilationErrors.repairTableNotSupportedForV2TablesError()
case r: CacheTable =>
CacheTableExec(r.table, r.multipartIdentifier, r.isLazy, r.options) :: Nil
case r: CacheTableAsSelect =>
CacheTableAsSelectExec(
r.tempViewName, r.plan, r.originalText, r.isLazy, r.options, r.referredTempFunctions) :: Nil
case r: UncacheTable =>
def isTempView(table: LogicalPlan): Boolean = table match {
case SubqueryAlias(_, v: View) => v.isTempView
case _ => false
}
UncacheTableExec(r.table, cascade = !isTempView(r.table)) :: Nil
case a: AlterTableCommand if a.table.resolved =>
val table = a.table.asInstanceOf[ResolvedTable]
ResolveTableConstraints.validateCatalogForTableChange(
a.changes, table.catalog, table.identifier)
AlterTableExec(table.catalog, table.identifier, a.changes) :: Nil
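// Index DDL is only supported for tables that implement SupportsIndex.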
case CreateIndex(ResolvedTable(_, _, table, _),
indexName, indexType, ifNotExists, columns, properties) =>
table match {
case s: SupportsIndex =>
val namedRefs = columns.map { case (field, prop) =>
FieldReference(field.name) -> prop
}
CreateIndexExec(s, indexName, indexType, ifNotExists, namedRefs, properties) :: Nil
case _ => throw QueryCompilationErrors.tableIndexNotSupportedError(
s"CreateIndex is not supported on table ${table.name}.")
}
case DropIndex(ResolvedTable(_, _, table, _), indexName, ifNotExists) =>
table match {
case s: SupportsIndex =>
DropIndexExec(s, indexName, ifNotExists) :: Nil
case _ => throw QueryCompilationErrors.tableIndexNotSupportedError(
s"DropIndex is not supported on table ${table.name}.")
}
case ShowFunctions(
ResolvedNamespace(catalog, ns, _), userScope, systemScope, pattern, output) =>
ShowFunctionsExec(
output,
catalog.asFunctionCatalog,
ns,
userScope,
systemScope,
pattern) :: Nil
case c: Call =>
ExplainOnlySparkPlan(c) :: Nil
case _ => Nil
}