func validateOrCreateChangeStream()

in reverse_replication/reverse-replication-runner.go [417:486]


// validateOrCreateChangeStream makes sure a change stream named
// changeStreamName is usable in the database reachable through spClient.
//
// It reads every row of information_schema.change_streams. When no row carries
// changeStreamName, the stream is created with createChangeStream (using
// adminClient and dbUri) and the function returns. When the stream already
// exists, each value_capture_type option row recorded for it must equal
// "NEW_ROW"; any other value is reported as an error. A warning is printed if
// the boolean scanned from the matching change_streams row is false.
//
// Every returned error that has an underlying cause wraps it with %w, so
// callers can inspect the chain with errors.Is / errors.As.
func validateOrCreateChangeStream(ctx context.Context, adminClient *database.DatabaseAdminClient, spClient *spanner.Client, dbUri string) error {
	streamIter := spClient.Single().Query(ctx, spanner.Statement{
		SQL: `SELECT * FROM information_schema.change_streams`,
	})
	defer streamIter.Stop()

	// NOTE(review): the query uses SELECT * but the scan below supplies exactly
	// four destinations, so it depends on the table exposing exactly four
	// columns in this order — confirm against the Spanner information schema.
	var csCatalog, csSchema, csName string
	var coversAll bool
	csExists := false
	for {
		row, err := streamIter.Next()
		if err == iterator.Done {
			break
		}
		if err != nil {
			return fmt.Errorf("couldn't read row from change_streams table: %w", err)
		}
		if err := row.Columns(&csCatalog, &csSchema, &csName, &coversAll); err != nil {
			return fmt.Errorf("can't scan row from change_streams table: %w", err)
		}
		if csName == changeStreamName {
			// Stop at the matching row so coversAll keeps that row's value
			// for the warning emitted further down.
			csExists = true
			fmt.Printf("Found changestream %s\n", changeStreamName)
			break
		}
	}

	if !csExists {
		fmt.Printf("changestream %s not found\n", changeStreamName)
		if err := createChangeStream(ctx, adminClient, dbUri); err != nil {
			return fmt.Errorf("could not create changestream: %w", err)
		}
		return nil
	}

	optionIter := spClient.Single().Query(ctx, spanner.Statement{
		SQL: `SELECT option_value FROM information_schema.change_stream_options WHERE change_stream_name = @p1 AND option_name = 'value_capture_type'`,
		Params: map[string]interface{}{
			"p1": changeStreamName,
		},
	})
	defer optionIter.Stop()

	// NOTE(review): if the query yields no rows, the loop body never runs and
	// the existing stream is accepted without a value_capture_type check —
	// confirm that this is the intended behaviour.
	var optionValue string
	for {
		row, err := optionIter.Next()
		if err == iterator.Done {
			break
		}
		if err != nil {
			return fmt.Errorf("couldn't read row from change_stream_options table: %w", err)
		}
		if err := row.Columns(&optionValue); err != nil {
			return fmt.Errorf("can't scan row from change_stream_options table: %w", err)
		}
		if optionValue != "NEW_ROW" {
			return fmt.Errorf("VALUE_CAPTURE_TYPE for changestream %s is not NEW_ROW. Please update the changestream option or create a new one", changeStreamName)
		}
	}

	if !coversAll {
		fmt.Printf("\nWARNING: watching definition for the existing changestream %s is not 'ALL'."+
			" This means only specific tables and columns are tracked."+
			" Only the tables and columns watched by this changestream will get reverse replicated.\n\n", changeStreamName)
	}
	fmt.Println("Skipping changestream creation ...")
	return nil
}