in reverse_replication/reverse-replication-runner.go [417:486]
// validateOrCreateChangeStream makes sure a change stream named
// changeStreamName is available for reverse replication.
//
// It lists information_schema.change_streams through spClient. When no stream
// with that name is found, it creates one via createChangeStream (using
// adminClient and dbUri) and returns. When the stream already exists, it
// verifies that its value_capture_type option is NEW_ROW and prints a warning
// if the stream does not watch ALL tables.
//
// It returns an error if either query fails, if a row cannot be scanned, if
// the change stream cannot be created, or if the existing stream's
// value_capture_type is missing or different from NEW_ROW.
func validateOrCreateChangeStream(ctx context.Context, adminClient *database.DatabaseAdminClient, spClient *spanner.Client, dbUri string) error {
	// NOTE(review): the positional scan below relies on SELECT * returning
	// exactly four columns (catalog, schema, name, ALL flag) in this order;
	// confirm against the Spanner information schema if this ever fails.
	streamIter := spClient.Single().Query(ctx, spanner.Statement{
		SQL: `SELECT * FROM information_schema.change_streams`,
	})
	defer streamIter.Stop()

	var csCatalog, csSchema, csName string
	var coversAll bool
	csExists := false
	for {
		row, err := streamIter.Next()
		if err == iterator.Done {
			break
		}
		if err != nil {
			return fmt.Errorf("couldn't read row from change_streams table: %w", err)
		}
		if err := row.Columns(&csCatalog, &csSchema, &csName, &coversAll); err != nil {
			return fmt.Errorf("can't scan row from change_streams table: %w", err)
		}
		if csName == changeStreamName {
			// Stop here so coversAll keeps the value of the matching row.
			csExists = true
			fmt.Printf("Found changestream %s\n", changeStreamName)
			break
		}
	}

	if !csExists {
		fmt.Printf("changestream %s not found\n", changeStreamName)
		if err := createChangeStream(ctx, adminClient, dbUri); err != nil {
			return fmt.Errorf("could not create changestream: %w", err)
		}
		return nil
	}

	optionIter := spClient.Single().Query(ctx, spanner.Statement{
		SQL: `SELECT option_value FROM information_schema.change_stream_options WHERE change_stream_name = @p1 AND option_name = 'value_capture_type'`,
		Params: map[string]interface{}{
			"p1": changeStreamName,
		},
	})
	defer optionIter.Stop()

	var optionValue string
	captureTypeSet := false
	for {
		row, err := optionIter.Next()
		if err == iterator.Done {
			break
		}
		if err != nil {
			return fmt.Errorf("couldn't read row from change_stream_options table: %w", err)
		}
		if err := row.Columns(&optionValue); err != nil {
			return fmt.Errorf("can't scan row from change_stream_options table: %w", err)
		}
		if optionValue != "NEW_ROW" {
			return fmt.Errorf("VALUE_CAPTURE_TYPE for changestream %s is not NEW_ROW. Please update the changestream option or create a new one", changeStreamName)
		}
		captureTypeSet = true
	}
	// No option row means value_capture_type was never set explicitly. Per the
	// Spanner change stream documentation the default is OLD_AND_NEW_VALUES,
	// which is not NEW_ROW, so the stream must be rejected rather than
	// silently accepted.
	if !captureTypeSet {
		return fmt.Errorf("VALUE_CAPTURE_TYPE for changestream %s is not set, so it is not NEW_ROW. Please update the changestream option or create a new one", changeStreamName)
	}

	if !coversAll {
		fmt.Printf("\nWARNING: watching definition for the existing changestream %s is not 'ALL'."+
			" This means only specific tables and columns are tracked."+
			" Only the tables and columns watched by this changestream will get reverse replicated.\n\n", changeStreamName)
	}
	fmt.Println("Skipping changestream creation ...")
	return nil
}