in backend/schema/updater/schema_updater.cc [2841:3005]
absl::Status SchemaUpdaterImpl::CreateTable(
const ddl::CreateTable& ddl_table,
const database_api::DatabaseDialect& dialect) {
if (latest_schema_->tables().size() >= limits::kMaxTablesPerDatabase) {
return error::TooManyTablesPerDatabase(ddl_table.table_name(),
limits::kMaxTablesPerDatabase);
}
if (global_names_.HasName(ddl_table.table_name()) &&
ddl_table.existence_modifier() == ddl::IF_NOT_EXISTS) {
return absl::OkStatus();
}
ZETASQL_RETURN_IF_ERROR(global_names_.AddName("Table", ddl_table.table_name()));
Table::Builder builder;
std::optional<uint32_t> oid = pg_oid_assigner_->GetNextPostgresqlOid();
builder.set_postgresql_oid(oid);
if (oid.has_value()) {
ZETASQL_VLOG(2) << "Assigned oid " << oid.value() << " to table "
<< ddl_table.table_name();
}
builder.set_id(table_id_generator_->NextId(ddl_table.table_name()))
.set_name(ddl_table.table_name());
for (const ddl::ColumnDefinition& ddl_column : ddl_table.column()) {
ZETASQL_ASSIGN_OR_RETURN(
const Column* column,
CreateColumn(ddl_column, builder.get(), &ddl_table, dialect));
builder.add_column(column);
}
for (const Column* column : builder.get()->columns()) {
if (column->is_generated()) {
const_cast<Column*>(column)->PopulateDependentColumns();
}
}
// Some constraints have a dependency on the primary key, so create it first.
for (const ddl::KeyPartClause& ddl_key_part : ddl_table.primary_key()) {
ZETASQL_RETURN_IF_ERROR(CreatePrimaryKeyConstraint(ddl_key_part, &builder,
/*with_oid=*/true));
}
// Assign OID for PRIMARY KEY index.
oid = pg_oid_assigner_->GetNextPostgresqlOid();
builder.set_primary_key_index_postgresql_oid(oid);
if (oid.has_value()) {
ZETASQL_VLOG(2) << "Assigned oid " << oid.value()
<< " for PRIMARY KEY index on table " << ddl_table.table_name();
}
for (const ddl::ForeignKey& ddl_foreign_key : ddl_table.foreign_key()) {
ZETASQL_RETURN_IF_ERROR(CreateForeignKeyConstraint(ddl_foreign_key, builder.get()));
}
if (ddl_table.has_interleave_clause()) {
ZETASQL_RETURN_IF_ERROR(
CreateInterleaveConstraint(ddl_table.interleave_clause(), &builder));
if (ddl_table.interleave_clause().type() ==
ddl::InterleaveClause::IN_PARENT) {
oid = pg_oid_assigner_->GetNextPostgresqlOid();
builder.set_interleave_in_parent_postgresql_oid(oid);
if (oid.has_value()) {
ZETASQL_VLOG(2) << "Assigned oid " << oid.value()
<< " for IN_PARENT interleave on table "
<< ddl_table.table_name();
}
}
}
for (const ddl::CheckConstraint& ddl_check_constraint :
ddl_table.check_constraint()) {
const Table* table = builder.get();
if (dialect == database_api::DatabaseDialect::POSTGRESQL) {
ddl::CheckConstraint mutable_check_constraint = ddl_check_constraint;
ZETASQL_RET_CHECK(mutable_check_constraint.has_expression_origin());
ZETASQL_ASSIGN_OR_RETURN(ExpressionTranslateResult result,
TranslatePostgreSqlExpression(
table, &ddl_table,
mutable_check_constraint.expression_origin()
.original_expression()));
mutable_check_constraint.mutable_expression_origin()
->set_original_expression(result.original_postgresql_expression);
mutable_check_constraint.set_expression(
result.translated_googlesql_expression);
ZETASQL_RETURN_IF_ERROR(
CreateCheckConstraint(mutable_check_constraint, table, &ddl_table));
} else {
ZETASQL_RETURN_IF_ERROR(
CreateCheckConstraint(ddl_check_constraint, table, &ddl_table));
}
}
if (ddl_table.has_row_deletion_policy()) {
ZETASQL_RETURN_IF_ERROR(
CreateRowDeletionPolicy(ddl_table.row_deletion_policy(), &builder));
}
if (ddl_table.has_synonym()) {
ZETASQL_RETURN_IF_ERROR(global_names_.AddName("Table", ddl_table.synonym()));
builder.set_synonym(ddl_table.synonym());
}
if (builder.get()->is_trackable_by_change_stream()) {
// If change streams implicitly tracking the entire database, newly added
// tables should be automatically watched by those change streams.
for (const ChangeStream* change_stream : latest_schema_->change_streams()) {
if (change_stream->track_all()) {
std::vector<std::string> columns = builder.get()->trackable_columns();
for (const Column* column : builder.get()->columns()) {
if (column->is_trackable_by_change_stream() &&
!column->table()->FindKeyColumn(column->Name())) {
// Register the trackable columns of the newly added table to the
// list of change streams implicitly tracking the entire database
ZETASQL_RETURN_IF_ERROR(AlterNode<Column>(
column,
[change_stream](Column::Editor* editor) -> absl::Status {
editor->add_change_stream(change_stream);
return absl::OkStatus();
}));
}
}
// Register the newly added table to the list of change streams
// implicitly tracking the entire database
ZETASQL_RETURN_IF_ERROR(AlterNode<ChangeStream>(
change_stream,
[ddl_table, columns](
ChangeStream::Editor* change_stream_editor) -> absl::Status {
change_stream_editor->add_tracked_tables_columns(
ddl_table.table_name(), columns);
return absl::OkStatus();
}));
// Register the change stream tracking the entire database to the newly
// added table
builder.add_change_stream(change_stream);
}
}
}
if (!ddl_table.set_options().empty()) {
ZETASQL_RETURN_IF_ERROR(AlterNode<Table>(
builder.get(),
[this, &ddl_table](Table::Editor* editor) -> absl::Status {
return SetTableOptions(ddl_table.set_options(), editor);
}));
}
if (SDLObjectName::IsFullyQualifiedName(ddl_table.table_name())) {
const absl::string_view schema_name =
SDLObjectName::GetSchemaName(ddl_table.table_name());
const NamedSchema* named_schema =
latest_schema_->FindNamedSchema(std::string(schema_name));
if (named_schema == nullptr) {
return error::NamedSchemaNotFound(schema_name);
}
ZETASQL_RETURN_IF_ERROR(AlterNode<NamedSchema>(
named_schema, [&](NamedSchema::Editor* editor) -> absl::Status {
editor->add_table(builder.get());
if (ddl_table.has_synonym()) {
editor->add_synonym(builder.get());
}
return absl::OkStatus();
}));
}
ZETASQL_RETURN_IF_ERROR(AddNode(builder.build()));
return absl::OkStatus();
}