absl::Status SchemaUpdaterImpl::CreateTable()

in backend/schema/updater/schema_updater.cc [2841:3005]


absl::Status SchemaUpdaterImpl::CreateTable(
    const ddl::CreateTable& ddl_table,
    const database_api::DatabaseDialect& dialect) {
  if (latest_schema_->tables().size() >= limits::kMaxTablesPerDatabase) {
    return error::TooManyTablesPerDatabase(ddl_table.table_name(),
                                           limits::kMaxTablesPerDatabase);
  }

  if (global_names_.HasName(ddl_table.table_name()) &&
      ddl_table.existence_modifier() == ddl::IF_NOT_EXISTS) {
    return absl::OkStatus();
  }

  ZETASQL_RETURN_IF_ERROR(global_names_.AddName("Table", ddl_table.table_name()));

  Table::Builder builder;
  std::optional<uint32_t> oid = pg_oid_assigner_->GetNextPostgresqlOid();
  builder.set_postgresql_oid(oid);
  if (oid.has_value()) {
    ZETASQL_VLOG(2) << "Assigned oid " << oid.value() << " to table "
            << ddl_table.table_name();
  }
  builder.set_id(table_id_generator_->NextId(ddl_table.table_name()))
      .set_name(ddl_table.table_name());

  for (const ddl::ColumnDefinition& ddl_column : ddl_table.column()) {
    ZETASQL_ASSIGN_OR_RETURN(
        const Column* column,
        CreateColumn(ddl_column, builder.get(), &ddl_table, dialect));
    builder.add_column(column);
  }

  for (const Column* column : builder.get()->columns()) {
    if (column->is_generated()) {
      const_cast<Column*>(column)->PopulateDependentColumns();
    }
  }

  // Some constraints have a dependency on the primary key, so create it first.
  for (const ddl::KeyPartClause& ddl_key_part : ddl_table.primary_key()) {
    ZETASQL_RETURN_IF_ERROR(CreatePrimaryKeyConstraint(ddl_key_part, &builder,
                                               /*with_oid=*/true));
  }
  // Assign OID for PRIMARY KEY index.
  oid = pg_oid_assigner_->GetNextPostgresqlOid();
  builder.set_primary_key_index_postgresql_oid(oid);
  if (oid.has_value()) {
    ZETASQL_VLOG(2) << "Assigned oid " << oid.value()
            << " for PRIMARY KEY index on table " << ddl_table.table_name();
  }

  for (const ddl::ForeignKey& ddl_foreign_key : ddl_table.foreign_key()) {
    ZETASQL_RETURN_IF_ERROR(CreateForeignKeyConstraint(ddl_foreign_key, builder.get()));
  }
  if (ddl_table.has_interleave_clause()) {
    ZETASQL_RETURN_IF_ERROR(
        CreateInterleaveConstraint(ddl_table.interleave_clause(), &builder));
    if (ddl_table.interleave_clause().type() ==
        ddl::InterleaveClause::IN_PARENT) {
      oid = pg_oid_assigner_->GetNextPostgresqlOid();
      builder.set_interleave_in_parent_postgresql_oid(oid);
      if (oid.has_value()) {
        ZETASQL_VLOG(2) << "Assigned oid " << oid.value()
                << " for IN_PARENT interleave on table "
                << ddl_table.table_name();
      }
    }
  }
  for (const ddl::CheckConstraint& ddl_check_constraint :
       ddl_table.check_constraint()) {
    const Table* table = builder.get();
    if (dialect == database_api::DatabaseDialect::POSTGRESQL) {
      ddl::CheckConstraint mutable_check_constraint = ddl_check_constraint;
      ZETASQL_RET_CHECK(mutable_check_constraint.has_expression_origin());
      ZETASQL_ASSIGN_OR_RETURN(ExpressionTranslateResult result,
                       TranslatePostgreSqlExpression(
                           table, &ddl_table,
                           mutable_check_constraint.expression_origin()
                               .original_expression()));
      mutable_check_constraint.mutable_expression_origin()
          ->set_original_expression(result.original_postgresql_expression);
      mutable_check_constraint.set_expression(
          result.translated_googlesql_expression);
      ZETASQL_RETURN_IF_ERROR(
          CreateCheckConstraint(mutable_check_constraint, table, &ddl_table));
    } else {
      ZETASQL_RETURN_IF_ERROR(
          CreateCheckConstraint(ddl_check_constraint, table, &ddl_table));
    }
  }

  if (ddl_table.has_row_deletion_policy()) {
    ZETASQL_RETURN_IF_ERROR(
        CreateRowDeletionPolicy(ddl_table.row_deletion_policy(), &builder));
  }
  if (ddl_table.has_synonym()) {
    ZETASQL_RETURN_IF_ERROR(global_names_.AddName("Table", ddl_table.synonym()));
    builder.set_synonym(ddl_table.synonym());
  }
  if (builder.get()->is_trackable_by_change_stream()) {
    // If change streams implicitly tracking the entire database, newly added
    // tables should be automatically watched by those change streams.
    for (const ChangeStream* change_stream : latest_schema_->change_streams()) {
      if (change_stream->track_all()) {
        std::vector<std::string> columns = builder.get()->trackable_columns();
        for (const Column* column : builder.get()->columns()) {
          if (column->is_trackable_by_change_stream() &&
              !column->table()->FindKeyColumn(column->Name())) {
            // Register the trackable columns of the newly added table to the
            // list of change streams implicitly tracking the entire database
            ZETASQL_RETURN_IF_ERROR(AlterNode<Column>(
                column,
                [change_stream](Column::Editor* editor) -> absl::Status {
                  editor->add_change_stream(change_stream);
                  return absl::OkStatus();
                }));
          }
        }
        // Register the newly added table to the list of change streams
        // implicitly tracking the entire database
        ZETASQL_RETURN_IF_ERROR(AlterNode<ChangeStream>(
            change_stream,
            [ddl_table, columns](
                ChangeStream::Editor* change_stream_editor) -> absl::Status {
              change_stream_editor->add_tracked_tables_columns(
                  ddl_table.table_name(), columns);
              return absl::OkStatus();
            }));
        // Register the change stream tracking the entire database to the newly
        // added table
        builder.add_change_stream(change_stream);
      }
    }
  }

  if (!ddl_table.set_options().empty()) {
    ZETASQL_RETURN_IF_ERROR(AlterNode<Table>(
        builder.get(),
        [this, &ddl_table](Table::Editor* editor) -> absl::Status {
          return SetTableOptions(ddl_table.set_options(), editor);
        }));
  }

  if (SDLObjectName::IsFullyQualifiedName(ddl_table.table_name())) {
    const absl::string_view schema_name =
        SDLObjectName::GetSchemaName(ddl_table.table_name());
    const NamedSchema* named_schema =
        latest_schema_->FindNamedSchema(std::string(schema_name));
    if (named_schema == nullptr) {
      return error::NamedSchemaNotFound(schema_name);
    }

    ZETASQL_RETURN_IF_ERROR(AlterNode<NamedSchema>(
        named_schema, [&](NamedSchema::Editor* editor) -> absl::Status {
          editor->add_table(builder.get());
          if (ddl_table.has_synonym()) {
            editor->add_synonym(builder.get());
          }
          return absl::OkStatus();
        }));
  }

  ZETASQL_RETURN_IF_ERROR(AddNode(builder.build()));
  return absl::OkStatus();
}