absl::Status SchemaUpdaterImpl::AlterTable()

in backend/schema/updater/schema_updater.cc [4845:5092]


absl::Status SchemaUpdaterImpl::AlterTable(
    const ddl::AlterTable& alter_table,
    const database_api::DatabaseDialect& dialect) {
  const Table* table =
      latest_schema_->FindTableCaseSensitive(alter_table.table_name());
  if (table == nullptr) {
    return error::TableNotFound(alter_table.table_name());
  }

  switch (alter_table.alter_type_case()) {
    case ddl::AlterTable::kSetInterleaveClause: {
      return AlterInterleaveAction(
          alter_table.set_interleave_clause().interleave_clause().on_delete(),
          table);
    }
    case ddl::AlterTable::kSetOnDelete: {
      return AlterInterleaveAction(alter_table.set_on_delete().action(), table);
    }
    case ddl::AlterTable::kAddCheckConstraint: {
      if (dialect == database_api::DatabaseDialect::POSTGRESQL) {
        ddl::CheckConstraint mutable_check_constraint =
            alter_table.add_check_constraint().check_constraint();
        ZETASQL_RET_CHECK(mutable_check_constraint.has_expression_origin());
        ZETASQL_ASSIGN_OR_RETURN(ExpressionTranslateResult result,
                         TranslatePostgreSqlExpression(
                             table, /*ddl_create_table=*/nullptr,
                             mutable_check_constraint.expression_origin()
                                 .original_expression()));
        mutable_check_constraint.mutable_expression_origin()
            ->set_original_expression(result.original_postgresql_expression);
        mutable_check_constraint.set_expression(
            result.translated_googlesql_expression);
        return AddCheckConstraint(mutable_check_constraint, table);
      }
      return AddCheckConstraint(
          alter_table.add_check_constraint().check_constraint(), table);
    }
    case ddl::AlterTable::kAddForeignKey: {
      return AddForeignKey(alter_table.add_foreign_key().foreign_key(), table);
    }
    case ddl::AlterTable::kDropConstraint: {
      return DropConstraint(alter_table.drop_constraint().name(), table);
    }
    case ddl::AlterTable::kAddColumn: {
      const auto& column_def = alter_table.add_column().column();
      // If the column exists but IF_NOT_EXISTS is set then we're fine.
      if (table->FindColumn(column_def.column_name()) != nullptr &&
          alter_table.add_column().existence_modifier() == ddl::IF_NOT_EXISTS) {
        return absl::OkStatus();
      }
      ZETASQL_ASSIGN_OR_RETURN(const Column* new_column,
                       CreateColumn(column_def, table, /*ddl_table=*/
                                    nullptr, dialect));
      if (new_column->is_generated()) {
        const_cast<Column*>(new_column)->PopulateDependentColumns();
      }

      if (new_column->is_trackable_by_change_stream()) {
        // Add the newly added column to tracking objects map for each change
        // stream implicitly/explicitly tracking the entire table this column
        // belongs to
        for (const ChangeStream* change_stream : table->change_streams()) {
          ZETASQL_RETURN_IF_ERROR(AlterNode<ChangeStream>(
              change_stream,
              [table, new_column](
                  ChangeStream::Editor* change_stream_editor) -> absl::Status {
                change_stream_editor->add_tracked_table_column(
                    table->Name(), new_column->Name());
                return absl::OkStatus();
              }));
        }
        // Populate the list of change streams tracking this column
        ZETASQL_RETURN_IF_ERROR(AlterNode<Column>(
            new_column, [table](Column::Editor* column_editor) -> absl::Status {
              for (const ChangeStream* change_stream :
                   table->change_streams()) {
                column_editor->add_change_stream(change_stream);
              }
              return absl::OkStatus();
            }));
      }

      ZETASQL_RETURN_IF_ERROR(AlterNode<Table>(
          table, [new_column](Table::Editor* editor) -> absl::Status {
            editor->add_column(new_column);
            return absl::OkStatus();
          }));
      return absl::OkStatus();
    }
    case ddl::AlterTable::kAlterColumn: {
      const std::string& column_name =
          alter_table.alter_column().column().column_name();
      const Column* column = table->FindColumn(column_name);
      if (column == nullptr) {
        return error::ColumnNotFound(table->Name(), column_name);
      }
      const auto& alter_column = alter_table.alter_column();
      if (alter_column.has_operation()) {
        if (alter_column.operation() ==
            ddl::AlterTable::AlterColumn::ALTER_IDENTITY) {
          if (!column->is_identity_column()) {
            return error::ColumnIsNotIdentityColumn(table->Name(), column_name);
          }
          ZETASQL_RET_CHECK(column->sequences_used().size() == 1);
          const Sequence* sequence =
              static_cast<const Sequence*>(column->sequences_used().at(0));
          const auto& column_def = alter_column.column();
          ddl::AlterSequence alter_sequence;
          alter_sequence.set_sequence_name(sequence->Name());
          if (alter_column.identity_alter_start_with_counter()) {
            ddl::SetOption* start_with_counter =
                alter_sequence.mutable_set_options()->add_options();
            start_with_counter->set_option_name("start_with_counter");
            start_with_counter->set_int64_value(
                column_def.identity_column().start_with_counter());
          }
          if (alter_column.identity_alter_skip_range()) {
            ddl::SetOption* skip_range_min =
                alter_sequence.mutable_set_options()->add_options();
            ddl::SetOption* skip_range_max =
                alter_sequence.mutable_set_options()->add_options();
            skip_range_min->set_option_name("skip_range_min");
            skip_range_max->set_option_name("skip_range_max");
            if (column_def.identity_column().has_skip_range_min()) {
              skip_range_min->set_int64_value(
                  column_def.identity_column().skip_range_min());
              skip_range_max->set_int64_value(
                  column_def.identity_column().skip_range_max());
            } else {
              skip_range_min->set_null_value(true);
              skip_range_max->set_null_value(true);
            }
          }
          ZETASQL_RETURN_IF_ERROR(AlterSequence(alter_sequence, sequence));
        } else {
          ZETASQL_RETURN_IF_ERROR(AlterNode<Column>(
              column,
              [this, &alter_column, &column, &table,
               &dialect](Column::Editor* editor) -> absl::Status {
                return AlterColumnSetDropDefault(alter_column, table, column,
                                                 dialect, editor);
              }));
        }
      } else {
        const auto& column_def = alter_column.column();
        ZETASQL_RETURN_IF_ERROR(AlterNode<Column>(
            column,
            [this, &column_def, &table,
             dialect](Column::Editor* editor) -> absl::Status {
              return AlterColumnDefinition(column_def, table, dialect, editor);
            }));
      }
      return absl::OkStatus();
    }
    case ddl::AlterTable::kDropColumn: {
      const Column* column = table->FindColumn(alter_table.drop_column());
      if (column == nullptr) {
        return error::ColumnNotFound(table->Name(), alter_table.drop_column());
      }
      if (!column->change_streams_explicitly_tracking_column().empty()) {
        std::vector<std::string> change_stream_names_list;
        for (const ChangeStream* const change_stream :
             column->change_streams()) {
          change_stream_names_list.push_back(change_stream->Name());
        }
        std::string change_stream_names =
            absl::StrJoin(change_stream_names_list, ",");
        return error::DropColumnWithChangeStream(
            table->Name(), column->Name(), change_stream_names_list.size(),
            change_stream_names);
      }
      ZETASQL_RETURN_IF_ERROR(DropNode(column));
      if (column->is_identity_column()) {
        ZETASQL_RET_CHECK(column->sequences_used().size() == 1);
        const Sequence* sequence =
            static_cast<const Sequence*>(column->sequences_used().at(0));
        ZETASQL_RETURN_IF_ERROR(DropSequence(sequence));
      }

      if (column->locality_group() != nullptr) {
        ZETASQL_RETURN_IF_ERROR(AlterNode<LocalityGroup>(
            column->locality_group(),
            [](LocalityGroup::Editor* editor) -> absl::Status {
              editor->decrement_use_count();
              return absl::OkStatus();
            }));
      }

      return absl::OkStatus();
    }
    case ddl::AlterTable::kAddRowDeletionPolicy: {
      const auto& policy = alter_table.add_row_deletion_policy();
      if (!table->row_deletion_policy().has_value()) {
        return AlterRowDeletionPolicy(policy, table);
      } else {
        return error::RowDeletionPolicyAlreadyExists(policy.column_name(),
                                                     table->Name());
      }
    }
    case ddl::AlterTable::kAlterRowDeletionPolicy: {
      const auto& policy = alter_table.alter_row_deletion_policy();
      if (table->row_deletion_policy().has_value()) {
        return AlterRowDeletionPolicy(policy, table);
      } else {
        return error::RowDeletionPolicyDoesNotExist(table->Name());
      }
    }
    case ddl::AlterTable::kDropRowDeletionPolicy: {
      if (table->row_deletion_policy().has_value()) {
        return AlterRowDeletionPolicy(std::nullopt, table);
      } else {
        return error::RowDeletionPolicyDoesNotExist(table->Name());
      }
    }
    case ddl::AlterTable::kRenameTo: {
      const auto& rename_to = alter_table.rename_to();
      return RenameTo(rename_to, table);
    }
    case ddl::AlterTable::kAddSynonym: {
      const auto& add_synonym = alter_table.add_synonym();
      if (table->synonym().empty()) {
        return AddSynonym(add_synonym.synonym(), table);
      } else {
        return error::SynonymAlreadyExists(table->synonym(), table->Name());
      }
    }
    case ddl::AlterTable::kDropSynonym: {
      const auto& drop_synonym = alter_table.drop_synonym();
      if (drop_synonym.synonym() == table->synonym()) {
        return DropSynonym(drop_synonym, table);
      } else {
        return error::SynonymDoesNotExist(drop_synonym.synonym(),
                                          table->Name());
      }
    }
    case ddl::AlterTable::kSetOptions: {
      ZETASQL_RETURN_IF_ERROR(AlterNode<Table>(
          table, [this, &alter_table](Table::Editor* editor) -> absl::Status {
            return SetTableOptions(alter_table.set_options().options(), editor);
          }));
      return absl::OkStatus();
    }
    default:
      return error::Internal(absl::StrCat("Unsupported alter table type: ",
                                          alter_table.DebugString()));
  }
  return absl::OkStatus();
}