in backend/schema/updater/schema_updater.cc [2127:2221]
// Validates the FOR clause of the change stream named `change_stream_name`.
//
// Returns OkStatus() when the clause is FOR ALL, or when every tracked-table
// entry passes the checks below. Otherwise returns the first error hit, in
// this order per entry:
//   * the entry has no table name;
//   * the table name was already listed in this clause;
//   * the name resolves to a change stream or to an index in the latest
//     schema;
//   * the name does not resolve to a table in the latest schema;
//   * for an explicit column list: a repeated column, a column that does not
//     exist in the table, a key column, or a generated column.
absl::Status SchemaUpdaterImpl::ValidateChangeStreamForClause(
    const ddl::ChangeStreamForClause& change_stream_for_clause,
    absl::string_view change_stream_name) {
  // FOR ALL: there is nothing further to check.
  if (change_stream_for_clause.has_all()) {
    return absl::OkStatus();
  }
  // Neither branch of the oneof is populated.
  if (!change_stream_for_clause.has_tracked_tables()) {
    return error::CreateChangeStreamForClauseInvalidOneof(change_stream_name);
  }

  const auto& tables = change_stream_for_clause.tracked_tables();
  if (tables.table_entry_size() == 0) {
    return error::CreateChangeStreamForClauseZeroEntriesInTrackedTables(
        change_stream_name);
  }

  // Table names seen so far; the views point into strings owned by the
  // clause, which outlives this set.
  absl::flat_hash_set<absl::string_view> seen_tables;
  for (const auto& table_entry : tables.table_entry()) {
    if (!table_entry.has_table_name()) {
      return error::
          CreateChangeStreamForClauseTrackedTablesEntryMissingTableName(
              change_stream_name);
    }
    const std::string& tracked_name = table_entry.table_name();

    // A table may appear at most once in the FOR clause.
    const bool newly_listed = seen_tables.insert(tracked_name).second;
    if (!newly_listed) {
      return error::ChangeStreamDuplicateTable(change_stream_name,
                                               tracked_name);
    }

    // A change stream cannot track another change stream.
    if (latest_schema_->FindChangeStream(tracked_name) != nullptr) {
      return error::InvalidTrackedObjectInChangeStream(
          change_stream_name, "Change Stream", tracked_name);
    }

    // A change stream cannot track an index.
    if (latest_schema_->FindIndex(tracked_name) != nullptr) {
      return error::InvalidTrackedObjectInChangeStream(change_stream_name,
                                                       "Index", tracked_name);
    }

    // TODO: Return error if the change stream is tracking a
    // function after function is supported.
    // TODO: The parser would have returned a parser error if the
    // user specified a FOR clause with table names starting with SPANNER_SYS.*
    // or INFORMATION_SCHEMA.*.
    const Table* tracked_table = latest_schema_->FindTable(tracked_name);
    if (tracked_table == nullptr) {
      return error::UnsupportedTrackedObjectOrNonExistentTableInChangeStream(
          change_stream_name, tracked_name);
    }

    // Entries that track all columns need no per-column validation.
    if (table_entry.has_all_columns()) {
      continue;
    }
    if (!table_entry.has_tracked_columns()) {
      return error::CreateChangeStreamForClauseTrackedTablesEntryInvalidOneof(
          change_stream_name);
    }

    // Note: the explicit column list may be empty, which means only the
    // primary key columns of the table are tracked.
    absl::flat_hash_set<absl::string_view> seen_columns;
    for (const std::string& tracked_column :
         table_entry.tracked_columns().column_name()) {
      // A column may appear at most once per table entry.
      const bool column_newly_listed =
          seen_columns.insert(tracked_column).second;
      if (!column_newly_listed) {
        return error::ChangeStreamDuplicateColumn(
            change_stream_name, tracked_column, tracked_name);
      }

      // The column has to exist in the tracked table.
      const Column* resolved_column = tracked_table->FindColumn(tracked_column);
      if (resolved_column == nullptr) {
        return error::NonexistentTrackedColumnInChangeStream(
            change_stream_name, tracked_column, tracked_name);
      }

      // Primary key columns must not be listed in the FOR clause.
      if (tracked_table->FindKeyColumn(tracked_column) != nullptr) {
        return error::KeyColumnInChangeStreamForClause(
            change_stream_name, tracked_column, tracked_name);
      }

      // Tracking a non-key generated column is not supported.
      if (resolved_column->is_generated()) {
        return error::InvalidTrackedObjectInChangeStream(
            change_stream_name, "non-key generated column",
            absl::StrCat(tracked_name, ".", tracked_column));
      }
    }
  }
  return absl::OkStatus();
}