absl::Status SchemaUpdaterImpl::ValidateChangeStreamForClause()

in backend/schema/updater/schema_updater.cc [2127:2221]


absl::Status SchemaUpdaterImpl::ValidateChangeStreamForClause(
    const ddl::ChangeStreamForClause& change_stream_for_clause,
    absl::string_view change_stream_name) {
  if (change_stream_for_clause.has_all()) {
    return absl::OkStatus();
  }
  if (!change_stream_for_clause.has_tracked_tables()) {
    return error::CreateChangeStreamForClauseInvalidOneof(change_stream_name);
  }
  const ddl::ChangeStreamForClause::TrackedTables& tracked_tables =
      change_stream_for_clause.tracked_tables();
  if (tracked_tables.table_entry_size() == 0) {
    return error::CreateChangeStreamForClauseZeroEntriesInTrackedTables(
        change_stream_name);
  }

  absl::flat_hash_set<absl::string_view> tracked_tables_set;
  for (const ddl::ChangeStreamForClause::TrackedTables::Entry& entry :
       tracked_tables.table_entry()) {
    if (!entry.has_table_name()) {
      return error::
          CreateChangeStreamForClauseTrackedTablesEntryMissingTableName(
              change_stream_name);
    }

    const std::string& table_name = entry.table_name();
    // Cannot list the same table more than once.
    if (!tracked_tables_set.insert(table_name).second) {
      return error::ChangeStreamDuplicateTable(change_stream_name, table_name);
    }

    // Tracking a change stream is not supported.
    if (latest_schema_->FindChangeStream(table_name) != nullptr) {
      return error::InvalidTrackedObjectInChangeStream(
          change_stream_name, "Change Stream", table_name);
    }

    // Tracking an index is not supported.
    if (latest_schema_->FindIndex(table_name) != nullptr) {
      return error::InvalidTrackedObjectInChangeStream(change_stream_name,
                                                       "Index", table_name);
    }

    // TODO: Return error if the change stream is tracking a
    // function after function is supported.
    // TODO: The parser would have returned a parser error if the
    // user specified a FOR clause with table names starting with SPANNER_SYS.*
    // or INFORMATION_SCHEMA.*.

    const Table* table = latest_schema_->FindTable(table_name);
    if (table == nullptr) {
      return error::UnsupportedTrackedObjectOrNonExistentTableInChangeStream(
          change_stream_name, table_name);
    }

    // Validate the tracked columns of the change stream
    if (!entry.has_all_columns()) {
      if (!entry.has_tracked_columns()) {
        return error::CreateChangeStreamForClauseTrackedTablesEntryInvalidOneof(
            change_stream_name);
      }

      absl::flat_hash_set<absl::string_view> tracked_columns_set;
      // Note: entry.tracked_columns() can contain zero columns, indicating only
      // the primary key columns of the table are tracked.
      for (const std::string& column_name :
           entry.tracked_columns().column_name()) {
        // Cannot list the same column more than once.
        if (!tracked_columns_set.insert(column_name).second) {
          return error::ChangeStreamDuplicateColumn(change_stream_name,
                                                    column_name, table_name);
        }
        const Column* column = table->FindColumn(column_name);
        // Check that the column exists in the table
        if (column == nullptr) {
          return error::NonexistentTrackedColumnInChangeStream(
              change_stream_name, column_name, table_name);
        }
        const KeyColumn* key_column = table->FindKeyColumn(column_name);
        // Primary key column should not be listed in the FOR clause.
        if (key_column != nullptr) {
          return error::KeyColumnInChangeStreamForClause(
              change_stream_name, column_name, table_name);
        }
        if (column->is_generated()) {
          // Tracking a non-key Stored Generated column is not supported
          return error::InvalidTrackedObjectInChangeStream(
              change_stream_name, "non-key generated column",
              absl::StrCat(table_name, ".", column_name));
        }
      }
    }
  }
  return absl::OkStatus();
}