override def apply()

in sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala [56:521]


  /**
   * Rewrites commands that target the session catalog into their v1 command equivalents.
   *
   * The plan is traversed with `resolveOperatorsUp`. Each case matches a command node whose
   * target was resolved through one of the v1 / session-catalog extractors (for example
   * `ResolvedV1TableIdentifier` or `ResolvedV1Database`) and either returns the corresponding
   * v1 command, returns the node unchanged (v2 providers in CREATE/REPLACE TABLE), or throws
   * an error for an operation that is not supported on that target.
   *
   * Case order is significant: several commands have a specific case followed by a more
   * general fallback (for example `DropView`, `CreateView`, `CreateFunction`,
   * `DescribeColumn` and `ShowCreateTable`), so new cases must be inserted with care.
   *
   * @param plan the logical plan to rewrite
   * @return the plan obtained by applying the cases below to `plan`
   */
  override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
    case AddColumns(ResolvedV1TableIdentifier(ident), cols) =>
      // v1 ADD COLUMN accepts only top-level, nullable columns.
      cols.foreach { c =>
        if (c.name.length > 1) {
          throw QueryCompilationErrors.unsupportedTableOperationError(
            ident, "ADD COLUMN with qualified column")
        }
        if (!c.nullable) {
          throw QueryCompilationErrors.addColumnWithV1TableCannotSpecifyNotNullError()
        }
      }
      AlterTableAddColumnsCommand(ident, cols.map(convertToStructField))

    case ReplaceColumns(ResolvedV1TableIdentifier(ident), _) =>
      throw QueryCompilationErrors.unsupportedTableOperationError(ident, "REPLACE COLUMNS")

    case AlterColumns(ResolvedTable(catalog, ident, table: V1Table, _), specs)
        if supportsV1Command(catalog) =>
      // Reject everything the v1 command cannot express: bulk changes, nested columns,
      // nullability changes and column re-positioning.
      if (specs.size > 1) {
        throw QueryCompilationErrors.unsupportedTableOperationError(
          catalog, ident, "ALTER COLUMN in bulk")
      }
      val s = specs.head
      if (s.column.name.length > 1) {
        throw QueryCompilationErrors.unsupportedTableOperationError(
          catalog, ident, "ALTER COLUMN with qualified column")
      }
      if (s.newNullability.isDefined) {
        throw QueryCompilationErrors.unsupportedTableOperationError(
          catalog, ident, "ALTER COLUMN ... SET NOT NULL")
      }
      if (s.newPosition.isDefined) {
        throw QueryCompilationErrors.unsupportedTableOperationError(
          catalog, ident, "ALTER COLUMN ... FIRST | AFTER")
      }
      val builder = new MetadataBuilder
      // Add comment to metadata
      s.newComment.foreach(comment => builder.putString("comment", comment))
      val colName = s.column.name.head
      // Without an explicit new type, reuse the column's current type. For string columns the
      // raw type recorded in the field metadata (if any) takes precedence.
      val dataType = s.newDataType.getOrElse {
        table.schema.findNestedField(Seq(colName), resolver = conf.resolver)
          .map {
            case (_, StructField(_, st: StringType, _, metadata)) =>
              CharVarcharUtils.getRawType(metadata).getOrElse(st)
            case (_, field) => field.dataType
          }
          .getOrElse {
            throw QueryCompilationErrors.unresolvedColumnError(
              toSQLId(s.column.name), table.schema.fieldNames)
          }
      }
      // Add the current default column value string (if any) to the column metadata.
      s.newDefaultExpression.foreach { default =>
        builder.putString(CURRENT_DEFAULT_COLUMN_METADATA_KEY, default)
      }
      // Nullability changes were rejected above; the replacement field is built as nullable.
      val newColumn = StructField(
        colName,
        dataType,
        nullable = true,
        builder.build())
      AlterTableChangeColumnCommand(table.catalogTable.identifier, colName, newColumn)

    case AlterTableClusterBy(ResolvedTable(catalog, _, table: V1Table, _), clusterBySpecOpt)
        if supportsV1Command(catalog) =>
      // A missing spec is encoded as an empty ClusterBySpec and stored as a table property.
      val prop = Map(ClusterBySpec.toProperty(table.schema,
        clusterBySpecOpt.getOrElse(ClusterBySpec(Nil)), conf.resolver))
      AlterTableSetPropertiesCommand(table.catalogTable.identifier, prop, isView = false)

    case RenameColumn(ResolvedV1TableIdentifier(ident), _, _) =>
      throw QueryCompilationErrors.unsupportedTableOperationError(ident, "RENAME COLUMN")

    case DropColumns(ResolvedV1TableIdentifier(ident), _, _) =>
      throw QueryCompilationErrors.unsupportedTableOperationError(ident, "DROP COLUMN")

    case SetTableProperties(ResolvedV1TableIdentifier(ident), props) =>
      AlterTableSetPropertiesCommand(ident, props, isView = false)

    case UnsetTableProperties(ResolvedV1TableIdentifier(ident), keys, ifExists) =>
      AlterTableUnsetPropertiesCommand(ident, keys, ifExists, isView = false)

    case SetViewProperties(ResolvedViewIdentifier(ident), props) =>
      AlterTableSetPropertiesCommand(ident, props, isView = true)

    case UnsetViewProperties(ResolvedViewIdentifier(ident), keys, ifExists) =>
      AlterTableUnsetPropertiesCommand(ident, keys, ifExists, isView = true)

    case DescribeNamespace(ResolvedV1Database(db), extended, output) if conf.useV1Command =>
      DescribeDatabaseCommand(db, extended, output)

    case SetNamespaceProperties(ResolvedV1Database(db), properties) if conf.useV1Command =>
      AlterDatabasePropertiesCommand(db, properties)

    case SetNamespaceLocation(ResolvedV1Database(db), location) if conf.useV1Command =>
      if (StringUtils.isEmpty(location)) {
        throw QueryExecutionErrors.invalidEmptyLocationError(location)
      }
      AlterDatabaseSetLocationCommand(db, location)

    case RenameTable(ResolvedV1TableOrViewIdentifier(oldIdent), newName, isView) =>
      AlterTableRenameCommand(oldIdent, newName.asTableIdentifier, isView)

    // Use v1 command to describe (temp) view, as v2 catalog doesn't support view yet.
    case DescribeRelation(
        ResolvedV1TableOrViewIdentifier(ident), partitionSpec, isExtended, output) =>
      DescribeTableCommand(ident, partitionSpec, isExtended, output)

    case DescribeColumn(
        ResolvedViewIdentifier(ident), column: UnresolvedAttribute, isExtended, output) =>
      // For views, the column will not be resolved by `ResolveReferences` because
      // `ResolvedView` stores only the identifier.
      DescribeColumnCommand(ident, column.nameParts, isExtended, output)

    case DescribeColumn(ResolvedV1TableIdentifier(ident), column, isExtended, output) =>
      // The order of these cases is part of the behavior; do not reorder them.
      column match {
        case u: UnresolvedAttribute =>
          throw QueryCompilationErrors.columnNotFoundError(u.name)
        case a: Attribute =>
          DescribeColumnCommand(ident, a.qualifier :+ a.name, isExtended, output)
        case Alias(child, _) =>
          throw QueryCompilationErrors.commandNotSupportNestedColumnError(
            "DESC TABLE COLUMN", toPrettySQL(child))
        case _ =>
          throw SparkException.internalError(s"[BUG] unexpected column expression: $column")
      }

    // For CREATE TABLE [AS SELECT], we should use the v1 command if the catalog is resolved to the
    // session catalog and the table provider is not v2.
    case c @ CreateTable(ResolvedV1Identifier(ident), _, _, tableSpec: TableSpec, _)
        if c.resolved =>
      val (storageFormat, provider) = getStorageFormatAndProvider(
        c.tableSpec.provider, tableSpec.options, c.tableSpec.location, c.tableSpec.serde,
        ctas = false)
      if (!isV2Provider(provider)) {
        constructV1TableCmd(None, c.tableSpec, ident, c.tableSchema, c.partitioning,
          c.ignoreIfExists, storageFormat, provider)
      } else {
        c
      }

    case c @ CreateTableAsSelect(
        ResolvedV1Identifier(ident), _, _, tableSpec: TableSpec, writeOptions, _, _) =>
      // For CTAS the write options are merged on top of the table options.
      val (storageFormat, provider) = getStorageFormatAndProvider(
        c.tableSpec.provider,
        tableSpec.options ++ writeOptions,
        c.tableSpec.location,
        c.tableSpec.serde,
        ctas = true)

      if (!isV2Provider(provider)) {
        // CTAS passes the query together with an empty schema.
        constructV1TableCmd(Some(c.query), c.tableSpec, ident, new StructType, c.partitioning,
          c.ignoreIfExists, storageFormat, provider)
      } else {
        c
      }

    case RefreshTable(ResolvedV1TableOrViewIdentifier(ident)) =>
      RefreshTableCommand(ident)

    // For REPLACE TABLE [AS SELECT], we should fail if the catalog is resolved to the
    // session catalog and the table provider is not v2.
    case c @ ReplaceTable(ResolvedV1Identifier(ident), _, _, _, _) if c.resolved =>
      val provider = c.tableSpec.provider.getOrElse(conf.defaultDataSourceName)
      if (!isV2Provider(provider)) {
        throw QueryCompilationErrors.unsupportedTableOperationError(
          ident, "REPLACE TABLE")
      } else {
        c
      }

    case c @ ReplaceTableAsSelect(ResolvedV1Identifier(ident), _, _, _, _, _, _) =>
      val provider = c.tableSpec.provider.getOrElse(conf.defaultDataSourceName)
      if (!isV2Provider(provider)) {
        throw QueryCompilationErrors.unsupportedTableOperationError(
          ident, "REPLACE TABLE AS SELECT")
      } else {
        c
      }

    case DropTable(ResolvedV1Identifier(ident), ifExists, purge) if conf.useV1Command =>
      DropTableCommand(ident, ifExists, isView = false, purge = purge)

    // v1 DROP TABLE supports temp view.
    case DropTable(ResolvedIdentifier(FakeSystemCatalog, ident), _, _) =>
      DropTempViewCommand(ident)

    case DropView(ResolvedIdentifierInSessionCatalog(ident), ifExists) =>
      DropTableCommand(ident, ifExists, isView = true, purge = false)

    // Fallback for DROP VIEW on anything that did not match the session-catalog case above.
    case DropView(ResolvedIdentifier(catalog, ident), _) =>
      if (catalog == FakeSystemCatalog) {
        DropTempViewCommand(ident)
      } else {
        throw QueryCompilationErrors.catalogOperationNotSupported(catalog, "views")
      }

    case c @ CreateNamespace(DatabaseNameInSessionCatalog(name), _, _) if conf.useV1Command =>
      val comment = c.properties.get(SupportsNamespaces.PROP_COMMENT)
      val location = c.properties.get(SupportsNamespaces.PROP_LOCATION)
      val newProperties = c.properties -- CatalogV2Util.NAMESPACE_RESERVED_PROPERTIES
      // A location that is present but empty is rejected; an absent location is allowed.
      location.filter(_.isEmpty).foreach { emptyLocation =>
        throw QueryExecutionErrors.invalidEmptyLocationError(emptyLocation)
      }
      CreateDatabaseCommand(name, c.ifNotExists, location, comment, newProperties)

    case d @ DropNamespace(ResolvedV1Database(db), _, _) if conf.useV1Command =>
      DropDatabaseCommand(db, d.ifExists, d.cascade)

    case ShowTables(ResolvedV1Database(db), pattern, output) if conf.useV1Command =>
      ShowTablesCommand(Some(db), pattern, output)

    case ShowTablesExtended(
        ResolvedV1Database(db),
        pattern,
        output) =>
      // With the legacy output-schema flag set, the first output attribute is renamed
      // to "database".
      val newOutput = if (conf.getConf(SQLConf.LEGACY_KEEP_COMMAND_OUTPUT_SCHEMA)) {
        output.head.withName("database") +: output.tail
      } else {
        output
      }
      ShowTablesCommand(Some(db), Some(pattern), newOutput, isExtended = true)

    case ShowTablePartition(
        ResolvedTable(catalog, _, table: V1Table, _),
        partitionSpec,
        output) if supportsV1Command(catalog) =>
      // Same legacy rename of the first output attribute as in ShowTablesExtended above.
      val newOutput = if (conf.getConf(SQLConf.LEGACY_KEEP_COMMAND_OUTPUT_SCHEMA)) {
        output.head.withName("database") +: output.tail
      } else {
        output
      }
      // `Option(...)` turns a null partition spec into None before the cast.
      val tablePartitionSpec = Option(partitionSpec).map(
        _.asInstanceOf[UnresolvedPartitionSpec].spec)
      ShowTablesCommand(table.catalogTable.identifier.database,
        Some(table.catalogTable.identifier.table), newOutput,
        isExtended = true, tablePartitionSpec)

    // ANALYZE TABLE works on permanent views if the views are cached.
    case AnalyzeTable(ResolvedV1TableOrViewIdentifier(ident), partitionSpec, noScan) =>
      if (partitionSpec.isEmpty) {
        AnalyzeTableCommand(ident, noScan)
      } else {
        AnalyzePartitionCommand(ident, partitionSpec, noScan)
      }

    case AnalyzeTables(ResolvedV1Database(db), noScan) =>
      AnalyzeTablesCommand(Some(db), noScan)

    case AnalyzeColumn(ResolvedV1TableOrViewIdentifier(ident), columnNames, allColumns) =>
      AnalyzeColumnCommand(ident, columnNames, allColumns)

    // V2 catalog doesn't support REPAIR TABLE yet, we must use v1 command here.
    case RepairTable(
        ResolvedV1TableIdentifierInSessionCatalog(ident),
        addPartitions,
        dropPartitions) =>
      RepairTableCommand(ident, addPartitions, dropPartitions)

    // V2 catalog doesn't support LOAD DATA yet, we must use v1 command here.
    case LoadData(
        ResolvedV1TableIdentifierInSessionCatalog(ident),
        path,
        isLocal,
        isOverwrite,
        partition) =>
      LoadDataCommand(
        ident,
        path,
        isLocal,
        isOverwrite,
        partition)

    // The SHOW CREATE TABLE cases are tried in order: AS SERDE first, then views, then v1
    // tables when v1 commands are enabled, then Hive tables of catalogs that support v1 commands.
    case ShowCreateTable(ResolvedV1TableOrViewIdentifier(ident), asSerde, output) if asSerde =>
      ShowCreateTableAsSerdeCommand(ident, output)

    // If target is view, force use v1 command
    case ShowCreateTable(ResolvedViewIdentifier(ident), _, output) =>
      ShowCreateTableCommand(ident, output)

    case ShowCreateTable(ResolvedV1TableIdentifier(ident), _, output)
        if conf.useV1Command =>
      ShowCreateTableCommand(ident, output)

    case ShowCreateTable(ResolvedTable(catalog, _, table: V1Table, _), _, output)
        if supportsV1Command(catalog) && DDLUtils.isHiveTable(table.catalogTable) =>
      ShowCreateTableCommand(table.catalogTable.identifier, output)

    case TruncateTable(ResolvedV1TableIdentifier(ident)) =>
      TruncateTableCommand(ident, None)

    case TruncatePartition(ResolvedV1TableIdentifier(ident), partitionSpec) =>
      // The single spec is wrapped in a Seq so `asUnresolvedPartitionSpecs` can be reused.
      TruncateTableCommand(
        ident,
        Seq(partitionSpec).asUnresolvedPartitionSpecs.map(_.spec).headOption)

    case ShowPartitions(
        ResolvedV1TableOrViewIdentifier(ident),
        pattern @ (None | Some(UnresolvedPartitionSpec(_, _))), output) =>
      // The pattern above only admits None or an UnresolvedPartitionSpec, so the cast below
      // cannot fail.
      ShowPartitionsCommand(
        ident,
        output,
        pattern.map(_.asInstanceOf[UnresolvedPartitionSpec].spec))

    case ShowColumns(ResolvedV1TableOrViewIdentifier(ident), ns, output) =>
      val v1TableName = ident
      val resolver = conf.resolver
      // An explicit namespace must agree (per the resolver) with the table's own database,
      // when the table identifier carries one.
      val db = ns match {
        case Some(db) if v1TableName.database.exists(!resolver(_, db.head)) =>
          throw QueryCompilationErrors.showColumnsWithConflictNamespacesError(
            Seq(db.head), Seq(v1TableName.database.get))
        case _ => ns.map(_.head)
      }
      ShowColumnsCommand(db, v1TableName, output)

    // V2 catalog doesn't support RECOVER PARTITIONS yet, we must use v1 command here.
    case RecoverPartitions(ResolvedV1TableIdentifierInSessionCatalog(ident)) =>
      RepairTableCommand(
        ident,
        enableAddPartitions = true,
        enableDropPartitions = false,
        "ALTER TABLE RECOVER PARTITIONS")

    case AddPartitions(ResolvedV1TableIdentifier(ident), partSpecsAndLocs, ifNotExists) =>
      AlterTableAddPartitionCommand(
        ident,
        partSpecsAndLocs.asUnresolvedPartitionSpecs.map(spec => (spec.spec, spec.location)),
        ifNotExists)

    case RenamePartitions(
        ResolvedV1TableIdentifier(ident),
        UnresolvedPartitionSpec(from, _),
        UnresolvedPartitionSpec(to, _)) =>
      AlterTableRenamePartitionCommand(ident, from, to)

    case DropPartitions(
        ResolvedV1TableIdentifier(ident), specs, ifExists, purge) =>
      AlterTableDropPartitionCommand(
        ident,
        specs.asUnresolvedPartitionSpecs.map(_.spec),
        ifExists,
        purge,
        retainData = false)

    // V2 catalog doesn't support setting serde properties yet, we must use v1 command here.
    case SetTableSerDeProperties(
        ResolvedV1TableIdentifierInSessionCatalog(ident),
        serdeClassName,
        serdeProperties,
        partitionSpec) =>
      AlterTableSerDePropertiesCommand(
        ident,
        serdeClassName,
        serdeProperties,
        partitionSpec)

    case SetTableLocation(ResolvedV1TableIdentifier(ident), None, location) =>
      AlterTableSetLocationCommand(ident, None, location)

    // V2 catalog doesn't support setting partition location yet, we must use v1 command here.
    case SetTableLocation(
        ResolvedV1TableIdentifierInSessionCatalog(ident),
        Some(partitionSpec),
        location) =>
      AlterTableSetLocationCommand(ident, Some(partitionSpec), location)

    case AlterViewAs(ResolvedViewIdentifier(ident), originalText, query) =>
      AlterViewAsCommand(ident, originalText, query)

    case AlterViewSchemaBinding(ResolvedViewIdentifier(ident), viewSchemaMode) =>
      AlterViewSchemaBindingCommand(ident, viewSchemaMode)

    case CreateView(ResolvedIdentifierInSessionCatalog(ident), userSpecifiedColumns, comment,
        collation, properties, originalText, child, allowExisting, replace, viewSchemaMode) =>
      CreateViewCommand(
        name = ident,
        userSpecifiedColumns = userSpecifiedColumns,
        comment = comment,
        collation = collation,
        properties = properties,
        originalText = originalText,
        plan = child,
        allowExisting = allowExisting,
        replace = replace,
        viewType = PersistedView,
        viewSchemaMode = viewSchemaMode)

    // Fallback: CREATE VIEW that did not match the session-catalog case above.
    case CreateView(ResolvedIdentifier(catalog, _), _, _, _, _, _, _, _, _, _) =>
      throw QueryCompilationErrors.missingCatalogAbilityError(catalog, "views")

    case ShowViews(ns: ResolvedNamespace, pattern, output) =>
      ns match {
        case ResolvedDatabaseInSessionCatalog(db) => ShowViewsCommand(db, pattern, output)
        case _ =>
          throw QueryCompilationErrors.missingCatalogAbilityError(ns.catalog, "views")
      }

    // If target is view, force use v1 command
    case ShowTableProperties(ResolvedViewIdentifier(ident), propertyKey, output) =>
      ShowTablePropertiesCommand(ident, propertyKey, output)

    case ShowTableProperties(ResolvedV1TableIdentifier(ident), propertyKey, output)
        if conf.useV1Command =>
      ShowTablePropertiesCommand(ident, propertyKey, output)

    case DescribeFunction(ResolvedNonPersistentFunc(_, V1Function(info)), extended) =>
      DescribeFunctionCommand(info, extended)

    case DescribeFunction(ResolvedPersistentFunc(catalog, _, func), extended) =>
      if (isSessionCatalog(catalog)) {
        DescribeFunctionCommand(func.asInstanceOf[V1Function].info, extended)
      } else {
        throw QueryCompilationErrors.missingCatalogAbilityError(catalog, "functions")
      }

    case ShowFunctions(
        ResolvedDatabaseInSessionCatalog(db), userScope, systemScope, pattern, output) =>
      ShowFunctionsCommand(db, pattern, userScope, systemScope, output)

    case DropFunction(ResolvedPersistentFunc(catalog, identifier, _), ifExists) =>
      if (isSessionCatalog(catalog)) {
        val funcIdentifier = catalogManager.v1SessionCatalog.qualifyIdentifier(
          identifier.asFunctionIdentifier)
        DropFunctionCommand(funcIdentifier, ifExists, false)
      } else {
        throw QueryCompilationErrors.missingCatalogAbilityError(catalog, "DROP FUNCTION")
      }

    case RefreshFunction(ResolvedPersistentFunc(catalog, identifier, _)) =>
      if (isSessionCatalog(catalog)) {
        val funcIdentifier = catalogManager.v1SessionCatalog.qualifyIdentifier(
          identifier.asFunctionIdentifier)
        RefreshFunctionCommand(funcIdentifier.database, funcIdentifier.funcName)
      } else {
        throw QueryCompilationErrors.missingCatalogAbilityError(catalog, "REFRESH FUNCTION")
      }

    case CreateFunction(
        ResolvedIdentifierInSessionCatalog(ident), className, resources, ifExists, replace) =>
      CreateFunctionCommand(
        FunctionIdentifier(ident.table, ident.database, ident.catalog),
        className,
        resources,
        false,
        ifExists,
        replace)

    // Fallback: CREATE FUNCTION that did not match the session-catalog case above.
    case CreateFunction(ResolvedIdentifier(catalog, _), _, _, _, _) =>
      throw QueryCompilationErrors.missingCatalogAbilityError(catalog, "CREATE FUNCTION")

    case c @ CreateUserDefinedFunction(
        ResolvedIdentifierInSessionCatalog(ident), _, _, _, _, _, _, _, _, _, _, _) =>
      CreateUserDefinedFunctionCommand(
        FunctionIdentifier(ident.table, ident.database, ident.catalog),
        c.inputParamText,
        c.returnTypeText,
        c.exprText,
        c.queryText,
        c.comment,
        c.isDeterministic,
        c.containsSQL,
        c.language,
        c.isTableFunc,
        isTemp = false,
        c.ignoreIfExists,
        c.replace)

    // Fallback: user-defined function creation that did not match the session-catalog case
    // above.
    case CreateUserDefinedFunction(
        ResolvedIdentifier(catalog, _), _, _, _, _, _, _, _, _, _, _, _) =>
      throw QueryCompilationErrors.missingCatalogAbilityError(catalog, "CREATE FUNCTION")
  }