in sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala [56:521]
/**
 * Rewrites `plan` via `resolveOperatorsUp`. Each case below matches one logical DDL/utility
 * node and either replaces it with the corresponding v1 command, returns the node unchanged
 * (CREATE/REPLACE TABLE with a v2 provider), or throws a query compilation/execution error
 * for the combinations that branch rejects.
 *
 * @param plan the logical plan to rewrite
 * @return the plan produced by `resolveOperatorsUp` with the cases below applied
 */
override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
  case AddColumns(ResolvedV1TableIdentifier(ident), cols) =>
    // Reject multi-part (qualified) column names and NOT NULL columns before building the
    // v1 command.
    cols.foreach { c =>
      if (c.name.length > 1) {
        throw QueryCompilationErrors.unsupportedTableOperationError(
          ident, "ADD COLUMN with qualified column")
      }
      if (!c.nullable) {
        throw QueryCompilationErrors.addColumnWithV1TableCannotSpecifyNotNullError()
      }
    }
    AlterTableAddColumnsCommand(ident, cols.map(convertToStructField))

  case ReplaceColumns(ResolvedV1TableIdentifier(ident), _) =>
    throw QueryCompilationErrors.unsupportedTableOperationError(ident, "REPLACE COLUMNS")

  case AlterColumns(ResolvedTable(catalog, ident, table: V1Table, _), specs)
      if supportsV1Command(catalog) =>
    // Only a single, unqualified column may be altered, and neither its nullability nor its
    // position may change; everything else is rejected up front.
    if (specs.size > 1) {
      throw QueryCompilationErrors.unsupportedTableOperationError(
        catalog, ident, "ALTER COLUMN in bulk")
    }
    val s = specs.head
    if (s.column.name.length > 1) {
      throw QueryCompilationErrors.unsupportedTableOperationError(
        catalog, ident, "ALTER COLUMN with qualified column")
    }
    if (s.newNullability.isDefined) {
      throw QueryCompilationErrors.unsupportedTableOperationError(
        catalog, ident, "ALTER COLUMN ... SET NOT NULL")
    }
    if (s.newPosition.isDefined) {
      throw QueryCompilationErrors.unsupportedTableOperationError(
        catalog, ident, "ALTER COLUMN ... FIRST | AFTER")
    }
    val builder = new MetadataBuilder
    // Add comment to metadata. `foreach` rather than `map`: the call is made only for its
    // effect on `builder`, the result is not used.
    s.newComment.foreach(c => builder.putString("comment", c))
    val colName = s.column.name.head
    // Without an explicit new type, reuse the column's current type; for string columns prefer
    // the raw type returned by `CharVarcharUtils.getRawType` for the field metadata, if any.
    val dataType = s.newDataType.getOrElse {
      table.schema.findNestedField(Seq(colName), resolver = conf.resolver)
        .map {
          case (_, StructField(_, st: StringType, _, metadata)) =>
            CharVarcharUtils.getRawType(metadata).getOrElse(st)
          case (_, field) => field.dataType
        }
        .getOrElse {
          throw QueryCompilationErrors.unresolvedColumnError(
            toSQLId(s.column.name), table.schema.fieldNames)
        }
    }
    // Add the current default column value string (if any) to the column metadata.
    s.newDefaultExpression.foreach { c =>
      builder.putString(CURRENT_DEFAULT_COLUMN_METADATA_KEY, c)
    }
    val newColumn = StructField(
      colName,
      dataType,
      nullable = true,
      builder.build())
    AlterTableChangeColumnCommand(table.catalogTable.identifier, colName, newColumn)

  case AlterTableClusterBy(ResolvedTable(catalog, _, table: V1Table, _), clusterBySpecOpt)
      if supportsV1Command(catalog) =>
    // The clustering spec is stored as a table property; a missing spec is written as an
    // empty `ClusterBySpec(Nil)`.
    val prop = Map(ClusterBySpec.toProperty(table.schema,
      clusterBySpecOpt.getOrElse(ClusterBySpec(Nil)), conf.resolver))
    AlterTableSetPropertiesCommand(table.catalogTable.identifier, prop, isView = false)

  case RenameColumn(ResolvedV1TableIdentifier(ident), _, _) =>
    throw QueryCompilationErrors.unsupportedTableOperationError(ident, "RENAME COLUMN")

  case DropColumns(ResolvedV1TableIdentifier(ident), _, _) =>
    throw QueryCompilationErrors.unsupportedTableOperationError(ident, "DROP COLUMN")

  case SetTableProperties(ResolvedV1TableIdentifier(ident), props) =>
    AlterTableSetPropertiesCommand(ident, props, isView = false)

  case UnsetTableProperties(ResolvedV1TableIdentifier(ident), keys, ifExists) =>
    AlterTableUnsetPropertiesCommand(ident, keys, ifExists, isView = false)

  case SetViewProperties(ResolvedViewIdentifier(ident), props) =>
    AlterTableSetPropertiesCommand(ident, props, isView = true)

  case UnsetViewProperties(ResolvedViewIdentifier(ident), keys, ifExists) =>
    AlterTableUnsetPropertiesCommand(ident, keys, ifExists, isView = true)

  case DescribeNamespace(ResolvedV1Database(db), extended, output) if conf.useV1Command =>
    DescribeDatabaseCommand(db, extended, output)

  case SetNamespaceProperties(ResolvedV1Database(db), properties) if conf.useV1Command =>
    AlterDatabasePropertiesCommand(db, properties)

  case SetNamespaceLocation(ResolvedV1Database(db), location) if conf.useV1Command =>
    if (StringUtils.isEmpty(location)) {
      throw QueryExecutionErrors.invalidEmptyLocationError(location)
    }
    AlterDatabaseSetLocationCommand(db, location)

  case RenameTable(ResolvedV1TableOrViewIdentifier(oldIdent), newName, isView) =>
    AlterTableRenameCommand(oldIdent, newName.asTableIdentifier, isView)

  // Use v1 command to describe (temp) view, as v2 catalog doesn't support view yet.
  case DescribeRelation(
      ResolvedV1TableOrViewIdentifier(ident), partitionSpec, isExtended, output) =>
    DescribeTableCommand(ident, partitionSpec, isExtended, output)

  case DescribeColumn(
      ResolvedViewIdentifier(ident), column: UnresolvedAttribute, isExtended, output) =>
    // For views, the column will not be resolved by `ResolveReferences` because
    // `ResolvedView` stores only the identifier.
    DescribeColumnCommand(ident, column.nameParts, isExtended, output)

  case DescribeColumn(ResolvedV1TableIdentifier(ident), column, isExtended, output) =>
    column match {
      // Still unresolved at this point: report the column as not found.
      case u: UnresolvedAttribute =>
        throw QueryCompilationErrors.columnNotFoundError(u.name)
      case a: Attribute =>
        DescribeColumnCommand(ident, a.qualifier :+ a.name, isExtended, output)
      // An `Alias` is reported as an unsupported nested column.
      case Alias(child, _) =>
        throw QueryCompilationErrors.commandNotSupportNestedColumnError(
          "DESC TABLE COLUMN", toPrettySQL(child))
      case _ =>
        throw SparkException.internalError(s"[BUG] unexpected column expression: $column")
    }

  // For CREATE TABLE [AS SELECT], we should use the v1 command if the catalog is resolved to the
  // session catalog and the table provider is not v2.
  case c @ CreateTable(ResolvedV1Identifier(ident), _, _, tableSpec: TableSpec, _)
      if c.resolved =>
    val (storageFormat, provider) = getStorageFormatAndProvider(
      c.tableSpec.provider, tableSpec.options, c.tableSpec.location, c.tableSpec.serde,
      ctas = false)
    if (!isV2Provider(provider)) {
      constructV1TableCmd(None, c.tableSpec, ident, c.tableSchema, c.partitioning,
        c.ignoreIfExists, storageFormat, provider)
    } else {
      c
    }

  case c @ CreateTableAsSelect(
      ResolvedV1Identifier(ident), _, _, tableSpec: TableSpec, writeOptions, _, _) =>
    // For CTAS the write options are merged on top of the table options, and an empty schema
    // is passed to `constructV1TableCmd` together with the query.
    val (storageFormat, provider) = getStorageFormatAndProvider(
      c.tableSpec.provider,
      tableSpec.options ++ writeOptions,
      c.tableSpec.location,
      c.tableSpec.serde,
      ctas = true)
    if (!isV2Provider(provider)) {
      constructV1TableCmd(Some(c.query), c.tableSpec, ident, new StructType, c.partitioning,
        c.ignoreIfExists, storageFormat, provider)
    } else {
      c
    }

  case RefreshTable(ResolvedV1TableOrViewIdentifier(ident)) =>
    RefreshTableCommand(ident)

  // For REPLACE TABLE [AS SELECT], we should fail if the catalog is resolved to the
  // session catalog and the table provider is not v2.
  case c @ ReplaceTable(ResolvedV1Identifier(ident), _, _, _, _) if c.resolved =>
    val provider = c.tableSpec.provider.getOrElse(conf.defaultDataSourceName)
    if (!isV2Provider(provider)) {
      throw QueryCompilationErrors.unsupportedTableOperationError(
        ident, "REPLACE TABLE")
    } else {
      c
    }

  case c @ ReplaceTableAsSelect(ResolvedV1Identifier(ident), _, _, _, _, _, _) =>
    val provider = c.tableSpec.provider.getOrElse(conf.defaultDataSourceName)
    if (!isV2Provider(provider)) {
      throw QueryCompilationErrors.unsupportedTableOperationError(
        ident, "REPLACE TABLE AS SELECT")
    } else {
      c
    }

  case DropTable(ResolvedV1Identifier(ident), ifExists, purge) if conf.useV1Command =>
    DropTableCommand(ident, ifExists, isView = false, purge = purge)

  // v1 DROP TABLE supports temp view.
  case DropTable(ResolvedIdentifier(FakeSystemCatalog, ident), _, _) =>
    DropTempViewCommand(ident)

  case DropView(ResolvedIdentifierInSessionCatalog(ident), ifExists) =>
    DropTableCommand(ident, ifExists, isView = true, purge = false)

  case DropView(ResolvedIdentifier(catalog, ident), _) =>
    // Identifiers under `FakeSystemCatalog` are dropped as temp views; any other catalog
    // reaching this case is rejected.
    if (catalog == FakeSystemCatalog) {
      DropTempViewCommand(ident)
    } else {
      throw QueryCompilationErrors.catalogOperationNotSupported(catalog, "views")
    }

  case c @ CreateNamespace(DatabaseNameInSessionCatalog(name), _, _) if conf.useV1Command =>
    // Comment and location travel as reserved properties; pull them out and strip all
    // reserved keys from the properties passed to the v1 command.
    val comment = c.properties.get(SupportsNamespaces.PROP_COMMENT)
    val location = c.properties.get(SupportsNamespaces.PROP_LOCATION)
    val newProperties = c.properties -- CatalogV2Util.NAMESPACE_RESERVED_PROPERTIES
    // An explicitly specified but empty location is an error; an absent one is fine.
    location.foreach { loc =>
      if (loc.isEmpty) {
        throw QueryExecutionErrors.invalidEmptyLocationError(loc)
      }
    }
    CreateDatabaseCommand(name, c.ifNotExists, location, comment, newProperties)

  case d @ DropNamespace(ResolvedV1Database(db), _, _) if conf.useV1Command =>
    DropDatabaseCommand(db, d.ifExists, d.cascade)

  case ShowTables(ResolvedV1Database(db), pattern, output) if conf.useV1Command =>
    ShowTablesCommand(Some(db), pattern, output)

  case ShowTablesExtended(
      ResolvedV1Database(db),
      pattern,
      output) =>
    // With the legacy output-schema flag set, the first output attribute is renamed to
    // "database".
    val newOutput = if (conf.getConf(SQLConf.LEGACY_KEEP_COMMAND_OUTPUT_SCHEMA)) {
      output.head.withName("database") +: output.tail
    } else {
      output
    }
    ShowTablesCommand(Some(db), Some(pattern), newOutput, isExtended = true)

  case ShowTablePartition(
      ResolvedTable(catalog, _, table: V1Table, _),
      partitionSpec,
      output) if supportsV1Command(catalog) =>
    // Same legacy renaming of the first output attribute as for SHOW TABLE EXTENDED above.
    val newOutput = if (conf.getConf(SQLConf.LEGACY_KEEP_COMMAND_OUTPUT_SCHEMA)) {
      output.head.withName("database") +: output.tail
    } else {
      output
    }
    // `Option(...)` turns a null spec into `None`.
    // NOTE(review): the cast assumes the spec is still an `UnresolvedPartitionSpec` here; any
    // other subtype would fail with a ClassCastException - confirm against rule ordering.
    val tablePartitionSpec = Option(partitionSpec).map(
      _.asInstanceOf[UnresolvedPartitionSpec].spec)
    ShowTablesCommand(table.catalogTable.identifier.database,
      Some(table.catalogTable.identifier.table), newOutput,
      isExtended = true, tablePartitionSpec)

  // ANALYZE TABLE works on permanent views if the views are cached.
  case AnalyzeTable(ResolvedV1TableOrViewIdentifier(ident), partitionSpec, noScan) =>
    if (partitionSpec.isEmpty) {
      AnalyzeTableCommand(ident, noScan)
    } else {
      AnalyzePartitionCommand(ident, partitionSpec, noScan)
    }

  case AnalyzeTables(ResolvedV1Database(db), noScan) =>
    AnalyzeTablesCommand(Some(db), noScan)

  case AnalyzeColumn(ResolvedV1TableOrViewIdentifier(ident), columnNames, allColumns) =>
    AnalyzeColumnCommand(ident, columnNames, allColumns)

  // V2 catalog doesn't support REPAIR TABLE yet, we must use v1 command here.
  case RepairTable(
      ResolvedV1TableIdentifierInSessionCatalog(ident),
      addPartitions,
      dropPartitions) =>
    RepairTableCommand(ident, addPartitions, dropPartitions)

  // V2 catalog doesn't support LOAD DATA yet, we must use v1 command here.
  case LoadData(
      ResolvedV1TableIdentifierInSessionCatalog(ident),
      path,
      isLocal,
      isOverwrite,
      partition) =>
    LoadDataCommand(
      ident,
      path,
      isLocal,
      isOverwrite,
      partition)

  case ShowCreateTable(ResolvedV1TableOrViewIdentifier(ident), asSerde, output) if asSerde =>
    ShowCreateTableAsSerdeCommand(ident, output)

  // If target is view, force use v1 command
  case ShowCreateTable(ResolvedViewIdentifier(ident), _, output) =>
    ShowCreateTableCommand(ident, output)

  case ShowCreateTable(ResolvedV1TableIdentifier(ident), _, output)
      if conf.useV1Command => ShowCreateTableCommand(ident, output)

  // Reached only when the previous case did not match; Hive tables in a catalog that supports
  // v1 commands still go through the v1 command.
  case ShowCreateTable(ResolvedTable(catalog, _, table: V1Table, _), _, output)
      if supportsV1Command(catalog) && DDLUtils.isHiveTable(table.catalogTable) =>
    ShowCreateTableCommand(table.catalogTable.identifier, output)

  case TruncateTable(ResolvedV1TableIdentifier(ident)) =>
    TruncateTableCommand(ident, None)

  case TruncatePartition(ResolvedV1TableIdentifier(ident), partitionSpec) =>
    // Wrap the single spec in a Seq to reuse `asUnresolvedPartitionSpecs`, then unwrap it.
    TruncateTableCommand(
      ident,
      Seq(partitionSpec).asUnresolvedPartitionSpecs.map(_.spec).headOption)

  case ShowPartitions(
      ResolvedV1TableOrViewIdentifier(ident),
      pattern @ (None | Some(UnresolvedPartitionSpec(_, _))), output) =>
    // The cast cannot fail: the pattern above only admits `None` or an
    // `UnresolvedPartitionSpec`.
    ShowPartitionsCommand(
      ident,
      output,
      pattern.map(_.asInstanceOf[UnresolvedPartitionSpec].spec))

  case ShowColumns(ResolvedV1TableOrViewIdentifier(ident), ns, output) =>
    val v1TableName = ident
    val resolver = conf.resolver
    // Fail when the explicitly given namespace disagrees (per the session resolver) with the
    // database already carried by the table identifier.
    val db = ns match {
      case Some(db) if v1TableName.database.exists(!resolver(_, db.head)) =>
        throw QueryCompilationErrors.showColumnsWithConflictNamespacesError(
          Seq(db.head), Seq(v1TableName.database.get))
      case _ => ns.map(_.head)
    }
    ShowColumnsCommand(db, v1TableName, output)

  // V2 catalog doesn't support RECOVER PARTITIONS yet, we must use v1 command here.
  case RecoverPartitions(ResolvedV1TableIdentifierInSessionCatalog(ident)) =>
    RepairTableCommand(
      ident,
      enableAddPartitions = true,
      enableDropPartitions = false,
      "ALTER TABLE RECOVER PARTITIONS")

  case AddPartitions(ResolvedV1TableIdentifier(ident), partSpecsAndLocs, ifNotExists) =>
    AlterTableAddPartitionCommand(
      ident,
      partSpecsAndLocs.asUnresolvedPartitionSpecs.map(spec => (spec.spec, spec.location)),
      ifNotExists)

  case RenamePartitions(
      ResolvedV1TableIdentifier(ident),
      UnresolvedPartitionSpec(from, _),
      UnresolvedPartitionSpec(to, _)) =>
    AlterTableRenamePartitionCommand(ident, from, to)

  case DropPartitions(
      ResolvedV1TableIdentifier(ident), specs, ifExists, purge) =>
    AlterTableDropPartitionCommand(
      ident,
      specs.asUnresolvedPartitionSpecs.map(_.spec),
      ifExists,
      purge,
      retainData = false)

  // V2 catalog doesn't support setting serde properties yet, we must use v1 command here.
  case SetTableSerDeProperties(
      ResolvedV1TableIdentifierInSessionCatalog(ident),
      serdeClassName,
      serdeProperties,
      partitionSpec) =>
    AlterTableSerDePropertiesCommand(
      ident,
      serdeClassName,
      serdeProperties,
      partitionSpec)

  case SetTableLocation(ResolvedV1TableIdentifier(ident), None, location) =>
    AlterTableSetLocationCommand(ident, None, location)

  // V2 catalog doesn't support setting partition location yet, we must use v1 command here.
  case SetTableLocation(
      ResolvedV1TableIdentifierInSessionCatalog(ident),
      Some(partitionSpec),
      location) =>
    AlterTableSetLocationCommand(ident, Some(partitionSpec), location)

  case AlterViewAs(ResolvedViewIdentifier(ident), originalText, query) =>
    AlterViewAsCommand(ident, originalText, query)

  case AlterViewSchemaBinding(ResolvedViewIdentifier(ident), viewSchemaMode) =>
    AlterViewSchemaBindingCommand(ident, viewSchemaMode)

  case CreateView(ResolvedIdentifierInSessionCatalog(ident), userSpecifiedColumns, comment,
      collation, properties, originalText, child, allowExisting, replace, viewSchemaMode) =>
    CreateViewCommand(
      name = ident,
      userSpecifiedColumns = userSpecifiedColumns,
      comment = comment,
      collation = collation,
      properties = properties,
      originalText = originalText,
      plan = child,
      allowExisting = allowExisting,
      replace = replace,
      viewType = PersistedView,
      viewSchemaMode = viewSchemaMode)

  // Any CREATE VIEW identifier not matched by the session-catalog case above is rejected.
  case CreateView(ResolvedIdentifier(catalog, _), _, _, _, _, _, _, _, _, _) =>
    throw QueryCompilationErrors.missingCatalogAbilityError(catalog, "views")

  case ShowViews(ns: ResolvedNamespace, pattern, output) =>
    ns match {
      case ResolvedDatabaseInSessionCatalog(db) => ShowViewsCommand(db, pattern, output)
      case _ =>
        throw QueryCompilationErrors.missingCatalogAbilityError(ns.catalog, "views")
    }

  // If target is view, force use v1 command
  case ShowTableProperties(ResolvedViewIdentifier(ident), propertyKey, output) =>
    ShowTablePropertiesCommand(ident, propertyKey, output)

  case ShowTableProperties(ResolvedV1TableIdentifier(ident), propertyKey, output)
      if conf.useV1Command =>
    ShowTablePropertiesCommand(ident, propertyKey, output)

  case DescribeFunction(ResolvedNonPersistentFunc(_, V1Function(info)), extended) =>
    DescribeFunctionCommand(info, extended)

  case DescribeFunction(ResolvedPersistentFunc(catalog, _, func), extended) =>
    if (isSessionCatalog(catalog)) {
      // NOTE(review): the cast assumes persistent functions resolved from the session catalog
      // are always `V1Function` - confirm against the function resolution code.
      DescribeFunctionCommand(func.asInstanceOf[V1Function].info, extended)
    } else {
      throw QueryCompilationErrors.missingCatalogAbilityError(catalog, "functions")
    }

  case ShowFunctions(
      ResolvedDatabaseInSessionCatalog(db), userScope, systemScope, pattern, output) =>
    ShowFunctionsCommand(db, pattern, userScope, systemScope, output)

  case DropFunction(ResolvedPersistentFunc(catalog, identifier, _), ifExists) =>
    if (isSessionCatalog(catalog)) {
      val funcIdentifier = catalogManager.v1SessionCatalog.qualifyIdentifier(
        identifier.asFunctionIdentifier)
      // NOTE(review): the trailing `false` is presumably the command's `isTemp` flag - confirm
      // against `DropFunctionCommand`'s signature.
      DropFunctionCommand(funcIdentifier, ifExists, false)
    } else {
      throw QueryCompilationErrors.missingCatalogAbilityError(catalog, "DROP FUNCTION")
    }

  case RefreshFunction(ResolvedPersistentFunc(catalog, identifier, _)) =>
    if (isSessionCatalog(catalog)) {
      val funcIdentifier = catalogManager.v1SessionCatalog.qualifyIdentifier(
        identifier.asFunctionIdentifier)
      RefreshFunctionCommand(funcIdentifier.database, funcIdentifier.funcName)
    } else {
      throw QueryCompilationErrors.missingCatalogAbilityError(catalog, "REFRESH FUNCTION")
    }

  case CreateFunction(
      ResolvedIdentifierInSessionCatalog(ident), className, resources, ifExists, replace) =>
    // NOTE(review): the positional `false` is presumably the command's `isTemp` flag - confirm
    // against `CreateFunctionCommand`'s signature.
    CreateFunctionCommand(
      FunctionIdentifier(ident.table, ident.database, ident.catalog),
      className,
      resources,
      false,
      ifExists,
      replace)

  // Any CREATE FUNCTION identifier not matched by the session-catalog case above is rejected.
  case CreateFunction(ResolvedIdentifier(catalog, _), _, _, _, _) =>
    throw QueryCompilationErrors.missingCatalogAbilityError(catalog, "CREATE FUNCTION")

  case c @ CreateUserDefinedFunction(
      ResolvedIdentifierInSessionCatalog(ident), _, _, _, _, _, _, _, _, _, _, _) =>
    CreateUserDefinedFunctionCommand(
      FunctionIdentifier(ident.table, ident.database, ident.catalog),
      c.inputParamText,
      c.returnTypeText,
      c.exprText,
      c.queryText,
      c.comment,
      c.isDeterministic,
      c.containsSQL,
      c.language,
      c.isTableFunc,
      isTemp = false,
      c.ignoreIfExists,
      c.replace)

  // Same rejection as for CREATE FUNCTION when the identifier is not in the session catalog.
  case CreateUserDefinedFunction(
      ResolvedIdentifier(catalog, _), _, _, _, _, _, _, _, _, _, _, _) =>
    throw QueryCompilationErrors.missingCatalogAbilityError(catalog, "CREATE FUNCTION")
}