accessors/spanner/spanner_accessor.go (398 lines of code) (raw):
// Copyright 2024 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package spanneraccessor
import (
"context"
"fmt"
"strings"
"sync"
"time"
"cloud.google.com/go/spanner"
"cloud.google.com/go/spanner/admin/database/apiv1/databasepb"
"cloud.google.com/go/spanner/admin/instance/apiv1/instancepb"
spanneradmin "github.com/GoogleCloudPlatform/spanner-migration-tool/accessors/clients/spanner/admin"
spannerclient "github.com/GoogleCloudPlatform/spanner-migration-tool/accessors/clients/spanner/client"
spinstanceadmin "github.com/GoogleCloudPlatform/spanner-migration-tool/accessors/clients/spanner/instanceadmin"
"github.com/GoogleCloudPlatform/spanner-migration-tool/common/constants"
"github.com/GoogleCloudPlatform/spanner-migration-tool/common/parse"
"github.com/GoogleCloudPlatform/spanner-migration-tool/internal"
"github.com/GoogleCloudPlatform/spanner-migration-tool/logger"
"github.com/GoogleCloudPlatform/spanner-migration-tool/spanner/ddl"
"go.uber.org/zap"
"google.golang.org/api/iterator"
adminpb "google.golang.org/genproto/googleapis/spanner/admin/database/v1"
)
var (
// Set the maximum number of concurrent workers during foreign key creation.
// This number should not be too high so as to not hit the AdminQuota limit.
// AdminQuota limits are mentioned here: https://cloud.google.com/spanner/quotas#administrative_limits
// If facing a quota limit error, consider reducing this value.
MaxWorkers = 50
)
// The SpannerAccessor provides methods that internally use a spanner client (can be adminClient/databaseclient/instanceclient etc).
// Methods should only contain generic logic here that can be used by multiple workflows.
type SpannerAccessor interface {
// Fetch the dialect of the spanner database.
GetDatabaseDialect(ctx context.Context, dbURI string) (string, error)
// CheckExistingDb checks whether the database with dbURI exists or not.
// If API call doesn't respond then user is informed after every 5 minutes on command line.
CheckExistingDb(ctx context.Context, dbURI string) (bool, error)
// Create a database with no schema.
CreateEmptyDatabase(ctx context.Context, dbURI string) error
// Fetch the leader of the Spanner instance.
GetSpannerLeaderLocation(ctx context.Context, instanceURI string) (string, error)
// Check if a change stream already exists.
CheckIfChangeStreamExists(ctx context.Context, changeStreamName, dbURI string) (bool, error)
// Validate that change stream option 'VALUE_CAPTURE_TYPE' is 'NEW_ROW'.
ValidateChangeStreamOptions(ctx context.Context, changeStreamName, dbURI string) error
// Create a change stream with default options.
CreateChangeStream(ctx context.Context, changeStreamName, dbURI string) error
// Create new Database using conv
CreateDatabase(ctx context.Context, dbURI string, conv *internal.Conv, driver string, migrationType string) error
// Update Database using conv
UpdateDatabase(ctx context.Context, dbURI string, conv *internal.Conv, driver string) error
// Updates an existing Spanner database or create a new one if one does not exist using Conv
CreateOrUpdateDatabase(ctx context.Context, dbURI, driver string, conv *internal.Conv, migrationType string) error
// Check whether the db exists and if it does, verify if the schema is what we currently support.
VerifyDb(ctx context.Context, dbURI string) (dbExists bool, err error)
// Verify if an existing DB's ddl follows what is supported by Spanner migration tool. Currently, we only support empty schema when db already exists.
ValidateDDL(ctx context.Context, dbURI string) error
// UpdateDDLForeignKeys updates the Spanner database with foreign key constraints using ALTER TABLE statements.
UpdateDDLForeignKeys(ctx context.Context, dbURI string, conv *internal.Conv, driver string, migrationType string)
// Deletes a database.
DropDatabase(ctx context.Context, dbURI string) error
//Runs a query against the provided spanner database and returns if the executed DML is validate or not
ValidateDML(ctx context.Context, query string) (bool, error)
TableExists(ctx context.Context, tableName string) (bool, error)
}
// This implements the SpannerAccessor interface. This is the primary implementation that should be used in all places other than tests.
type SpannerAccessorImpl struct {
InstanceClient spinstanceadmin.InstanceAdminClient
AdminClient spanneradmin.AdminClient
SpannerClient spannerclient.SpannerClient
}
func NewSpannerAccessorClientImpl(ctx context.Context) (*SpannerAccessorImpl, error) {
instanceClient, err := spinstanceadmin.NewInstanceAdminClientImpl(ctx)
if err != nil {
return nil, err
}
adminClient, err := spanneradmin.NewAdminClientImpl(ctx)
if err != nil {
return nil, err
}
return &SpannerAccessorImpl{InstanceClient: instanceClient, AdminClient: adminClient}, nil
}
func NewSpannerAccessorClientImplWithSpannerClient(ctx context.Context, dbURI string) (*SpannerAccessorImpl, error) {
instanceClient, err := spinstanceadmin.NewInstanceAdminClientImpl(ctx)
if err != nil {
return nil, err
}
adminClient, err := spanneradmin.NewAdminClientImpl(ctx)
if err != nil {
return nil, err
}
spannerClient, err := spannerclient.NewSpannerClientImpl(ctx, dbURI)
if err != nil {
return nil, err
}
return &SpannerAccessorImpl{InstanceClient: instanceClient, AdminClient: adminClient, SpannerClient: spannerClient}, nil
}
func (sp *SpannerAccessorImpl) GetDatabaseDialect(ctx context.Context, dbURI string) (string, error) {
result, err := sp.AdminClient.GetDatabase(ctx, &databasepb.GetDatabaseRequest{Name: dbURI})
if err != nil {
return "", fmt.Errorf("cannot connect to database: %v", err)
}
return strings.ToLower(result.DatabaseDialect.String()), nil
}
func (sp *SpannerAccessorImpl) CheckExistingDb(ctx context.Context, dbURI string) (bool, error) {
gotResponse := make(chan bool)
var err error
go func() {
_, err = sp.AdminClient.GetDatabase(ctx, &databasepb.GetDatabaseRequest{Name: dbURI})
gotResponse <- true
}()
for {
select {
case <-time.After(5 * time.Minute):
logger.Log.Debug("WARNING! API call not responding: make sure that spanner api endpoint is configured properly")
case <-gotResponse:
if err != nil {
if parse.ContainsAny(strings.ToLower(err.Error()), []string{"database not found"}) {
return false, nil
}
return false, fmt.Errorf("can't get database info: %s", err)
}
return true, nil
}
}
}
func (sp *SpannerAccessorImpl) CreateEmptyDatabase(ctx context.Context, dbURI string) error {
project, instance, dbName := parse.ParseDbURI(dbURI)
req := &databasepb.CreateDatabaseRequest{
Parent: fmt.Sprintf("projects/%s/instances/%s", project, instance),
CreateStatement: "CREATE DATABASE `" + dbName + "`",
}
op, err := sp.AdminClient.CreateDatabase(ctx, req)
if err != nil {
return fmt.Errorf("can't build CreateDatabaseRequest: %w", parse.AnalyzeError(err, dbURI))
}
if _, err := op.Wait(ctx); err != nil {
return fmt.Errorf("createDatabase call failed: %w", parse.AnalyzeError(err, dbURI))
}
return nil
}
func (sp *SpannerAccessorImpl) GetSpannerLeaderLocation(ctx context.Context, instanceURI string) (string, error) {
instanceInfo, err := sp.InstanceClient.GetInstance(ctx, &instancepb.GetInstanceRequest{Name: instanceURI})
if err != nil {
return "", err
}
instanceConfig, err := sp.InstanceClient.GetInstanceConfig(ctx, &instancepb.GetInstanceConfigRequest{Name: instanceInfo.Config})
if err != nil {
return "", err
}
for _, replica := range instanceConfig.Replicas {
if replica.DefaultLeaderLocation {
return replica.Location, nil
}
}
return "", fmt.Errorf("no leader found for spanner instance %s while trying fetch location", instanceURI)
}
// Consider using a CreateChangestream operation and check for alreadyExists error. That uses adminClient which can be unit tested.
func (sp *SpannerAccessorImpl) CheckIfChangeStreamExists(ctx context.Context, changeStreamName, dbURI string) (bool, error) {
spClient, err := spannerclient.GetOrCreateClient(ctx, dbURI)
if err != nil {
return false, err
}
stmt := spanner.Statement{
SQL: `SELECT CHANGE_STREAM_NAME FROM information_schema.change_streams`,
}
iter := spClient.Single().Query(ctx, stmt)
defer iter.Stop()
var cs_name string
csExists := false
for {
row, err := iter.Next()
if err == iterator.Done {
break
}
if err != nil {
return false, fmt.Errorf("couldn't read row from change_streams table: %w", err)
}
err = row.Columns(&cs_name)
if err != nil {
return false, fmt.Errorf("can't scan row from change_streams table: %v", err)
}
if cs_name == changeStreamName {
csExists = true
break
}
}
return csExists, nil
}
func (sp *SpannerAccessorImpl) ValidateChangeStreamOptions(ctx context.Context, changeStreamName, dbURI string) error {
spClient, err := spannerclient.GetOrCreateClient(ctx, dbURI)
if err != nil {
return err
}
// Validate if change stream options are set correctly.
stmt := spanner.Statement{
SQL: `SELECT option_value FROM information_schema.change_stream_options
WHERE change_stream_name = @p1 AND option_name = 'value_capture_type'`,
Params: map[string]interface{}{
"p1": changeStreamName,
},
}
iter := spClient.Single().Query(ctx, stmt)
defer iter.Stop()
var option_value string
for {
row, err := iter.Next()
if err == iterator.Done {
break
}
if err != nil {
return fmt.Errorf("couldn't read row from change_stream_options table: %w", err)
}
err = row.Columns(&option_value)
if err != nil {
return fmt.Errorf("can't scan row from change_stream_options table: %v", err)
}
if option_value != "NEW_ROW" {
return fmt.Errorf("VALUE_CAPTURE_TYPE for changestream %s is not NEW_ROW. Please update the changestream option or create a new one", changeStreamName)
}
}
return nil
}
func (sp *SpannerAccessorImpl) CreateChangeStream(ctx context.Context, changeStreamName, dbURI string) error {
op, err := sp.AdminClient.UpdateDatabaseDdl(ctx, &databasepb.UpdateDatabaseDdlRequest{
Database: dbURI,
// TODO: create change stream for only the tables present in Spanner.
Statements: []string{fmt.Sprintf("CREATE CHANGE STREAM %s FOR ALL OPTIONS (value_capture_type = 'NEW_ROW', retention_period = '7d')", changeStreamName)},
})
if err != nil {
return fmt.Errorf("cannot submit request create change stream request: %v", err)
}
if err := op.Wait(ctx); err != nil {
return fmt.Errorf("could not update database ddl: %v", err)
} else {
logger.Log.Debug("Successfully created changestream", zap.String("changeStreamName", changeStreamName))
}
return nil
}
// CreateDatabase returns a newly create Spanner DB.
// It automatically determines an appropriate project, selects a
// Spanner instance to use, generates a new Spanner DB name,
// and call into the Spanner admin interface to create the new DB.
func (sp *SpannerAccessorImpl) CreateDatabase(ctx context.Context, dbURI string, conv *internal.Conv, driver string, migrationType string) error {
project, instance, dbName := parse.ParseDbURI(dbURI)
// The schema we send to Spanner excludes comments (since Cloud
// Spanner DDL doesn't accept them), and protects table and col names
// using backticks (to avoid any issues with Spanner reserved words).
// Foreign Keys are set to false since we create them post data migration.
req := &adminpb.CreateDatabaseRequest{
Parent: fmt.Sprintf("projects/%s/instances/%s", project, instance),
}
if conv.SpDialect == constants.DIALECT_POSTGRESQL {
// PostgreSQL dialect doesn't support:
// a) backticks around the database name, and
// b) DDL statements as part of a CreateDatabase operation (so schema
// must be set using a separate UpdateDatabase operation).
req.CreateStatement = "CREATE DATABASE \"" + dbName + "\""
req.DatabaseDialect = adminpb.DatabaseDialect_POSTGRESQL
} else {
req.CreateStatement = "CREATE DATABASE `" + dbName + "`"
if migrationType == constants.DATAFLOW_MIGRATION {
req.ExtraStatements = ddl.GetDDL(ddl.Config{Comments: false, ProtectIds: true, Tables: true, ForeignKeys: true, SpDialect: conv.SpDialect, Source: driver}, conv.SpSchema, conv.SpSequences)
} else {
req.ExtraStatements = ddl.GetDDL(ddl.Config{Comments: false, ProtectIds: true, Tables: true, ForeignKeys: false, SpDialect: conv.SpDialect, Source: driver}, conv.SpSchema, conv.SpSequences)
}
}
op, err := sp.AdminClient.CreateDatabase(ctx, req)
if err != nil {
return fmt.Errorf("can't build CreateDatabaseRequest: %w", parse.AnalyzeError(err, dbURI))
}
if _, err := op.Wait(ctx); err != nil {
return fmt.Errorf("createDatabase call failed: %w", parse.AnalyzeError(err, dbURI))
}
if conv.SpDialect == constants.DIALECT_POSTGRESQL {
// Update schema separately for PG databases.
return sp.UpdateDatabase(ctx, dbURI, conv, driver)
}
return nil
}
func (sp *SpannerAccessorImpl) TableExists(ctx context.Context, tableName string) (bool, error) {
query := fmt.Sprintf("SELECT TABLE_NAME FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_NAME = '%s'", tableName)
iter := sp.SpannerClient.Single().Query(ctx, spanner.Statement{SQL: query})
defer iter.Stop()
_, err := iter.Next()
if err == iterator.Done {
return false, nil // Table does not exist
}
if err != nil {
return false, fmt.Errorf("error checking table existence: %v", err)
}
return true, nil // Table exists
}
// UpdateDatabase updates an existing spanner database.
func (sp *SpannerAccessorImpl) UpdateDatabase(ctx context.Context, dbURI string, conv *internal.Conv, driver string) error {
// The schema we send to Spanner excludes comments (since Cloud
// Spanner DDL doesn't accept them), and protects table and col names
// using backticks (to avoid any issues with Spanner reserved words).
// Foreign Keys are set to false since we create them post data migration.
schema := ddl.GetDDL(ddl.Config{Comments: false, ProtectIds: true, Tables: true, ForeignKeys: false, SpDialect: conv.SpDialect, Source: driver}, conv.SpSchema, conv.SpSequences)
req := &adminpb.UpdateDatabaseDdlRequest{
Database: dbURI,
Statements: schema,
}
// Update queries for postgres as target db return response after more
// than 1 min for large schemas, therefore, timeout is specified as 5 minutes
ctx, cancel := context.WithTimeout(ctx, 5*time.Minute)
defer cancel()
op, err := sp.AdminClient.UpdateDatabaseDdl(ctx, req)
if err != nil {
return fmt.Errorf("can't build UpdateDatabaseDdlRequest: %w", parse.AnalyzeError(err, dbURI))
}
if err := op.Wait(ctx); err != nil {
return fmt.Errorf("UpdateDatabaseDdl call failed: %w", parse.AnalyzeError(err, dbURI))
}
return nil
}
// CreatesOrUpdatesDatabase updates an existing Spanner database or creates a new one if one does not exist.
func (sp *SpannerAccessorImpl) CreateOrUpdateDatabase(ctx context.Context, dbURI, driver string, conv *internal.Conv, migrationType string) error {
dbExists, err := sp.VerifyDb(ctx, dbURI)
if err != nil {
return err
}
if dbExists {
if conv.SpDialect != constants.DIALECT_POSTGRESQL && migrationType == constants.DATAFLOW_MIGRATION {
return fmt.Errorf("spanner migration tool does not support minimal downtime schema/schema-and-data migrations to an existing database")
}
err := sp.UpdateDatabase(ctx, dbURI, conv, driver)
if err != nil {
return fmt.Errorf("can't update database schema: %v", err)
}
} else {
err := sp.CreateDatabase(ctx, dbURI, conv, driver, migrationType)
if err != nil {
return fmt.Errorf("can't create database: %v", err)
}
}
return nil
}
// VerifyDb checks whether the db exists and if it does, verifies if the schema is what we currently support.
func (sp *SpannerAccessorImpl) VerifyDb(ctx context.Context, dbURI string) (dbExists bool, err error) {
dbExists, err = sp.CheckExistingDb(ctx, dbURI)
if err != nil {
return dbExists, err
}
if dbExists {
err = sp.ValidateDDL(ctx, dbURI)
}
return dbExists, err
}
// ValidateDDL verifies if an existing DB's ddl follows what is supported by Spanner migration tool. Currently,
// we only support empty schema when db already exists.
func (sp *SpannerAccessorImpl) ValidateDDL(ctx context.Context, dbURI string) error {
dbDdl, err := sp.AdminClient.GetDatabaseDdl(ctx, &adminpb.GetDatabaseDdlRequest{Database: dbURI})
if err != nil {
return fmt.Errorf("can't fetch database ddl: %v", err)
}
if len(dbDdl.Statements) != 0 {
return fmt.Errorf("spanner migration tool supports writing to existing databases only if they have an empty schema")
}
return nil
}
// UpdateDDLForeignKeys updates the Spanner database with foreign key
// constraints using ALTER TABLE statements.
func (sp *SpannerAccessorImpl) UpdateDDLForeignKeys(ctx context.Context, dbURI string, conv *internal.Conv, driver string, migrationType string) {
if conv.SpDialect != constants.DIALECT_POSTGRESQL && migrationType == constants.DATAFLOW_MIGRATION {
//foreign keys were applied as part of CreateDatabase
return
}
// The schema we send to Spanner excludes comments (since Cloud
// Spanner DDL doesn't accept them), and protects table and col names
// using backticks (to avoid any issues with Spanner reserved words).
// Sequences will not be passed as they have already been created.
fkStmts := ddl.GetDDL(ddl.Config{Comments: false, ProtectIds: true, Tables: false, ForeignKeys: true, SpDialect: conv.SpDialect, Source: driver}, conv.SpSchema, make(map[string]ddl.Sequence))
if len(fkStmts) == 0 {
return
}
if len(fkStmts) > 50 {
logger.Log.Warn(`
Warning: Large number of foreign keys detected. Spanner can take a long amount of
time to create foreign keys (over 5 mins per batch of Foreign Keys even with no data).
Spanner migration tool does not have control over a single foreign key creation time. The number
of concurrent Foreign Key Creation Requests sent to spanner can be increased by
tweaking the MaxWorkers variable (https://github.com/GoogleCloudPlatform/spanner-migration-tool/blob/master/conversion/conversion.go#L89).
However, setting it to a very high value might lead to exceeding the admin quota limit. Spanner migration tool tries to stay under the
admin quota limit by spreading the FK creation requests over time.`)
}
msg := fmt.Sprintf("Updating schema of database %s with foreign key constraints ...", dbURI)
conv.Audit.Progress = *internal.NewProgress(int64(len(fkStmts)), msg, internal.Verbose(), true, int(internal.ForeignKeyUpdateInProgress))
workers := make(chan int, MaxWorkers)
for i := 1; i <= MaxWorkers; i++ {
workers <- i
}
var progressMutex sync.Mutex
progress := int64(0)
// We dispatch parallel foreign key create requests to ensure the backfill runs in parallel to reduce overall time.
// This cuts down the time taken to a third (approx) compared to Serial and Batched creation. We also do not want to create
// too many requests and get throttled due to network or hitting catalog memory limits.
// Ensure atmost `MaxWorkers` go routines run in parallel that each update the ddl with one foreign key statement.
for _, fkStmt := range fkStmts {
workerID := <-workers
go func(fkStmt string, workerID int) {
defer func() {
// Locking the progress reporting otherwise progress results displayed could be in random order.
progressMutex.Lock()
progress++
conv.Audit.Progress.MaybeReport(progress)
progressMutex.Unlock()
workers <- workerID
}()
internal.VerbosePrintf("Submitting new FK create request: %s\n", fkStmt)
logger.Log.Debug("Submitting new FK create request", zap.String("fkStmt", fkStmt))
op, err := sp.AdminClient.UpdateDatabaseDdl(ctx, &adminpb.UpdateDatabaseDdlRequest{
Database: dbURI,
Statements: []string{fkStmt},
})
if err != nil {
logger.Log.Debug("Can't add foreign key with statement:" + fkStmt + "\n due to error:" + err.Error() + " Skipping this foreign key...\n")
conv.Unexpected(fmt.Sprintf("Can't add foreign key with statement %s: %s", fkStmt, err))
return
}
if err := op.Wait(ctx); err != nil {
logger.Log.Debug("Can't add foreign key with statement:" + fkStmt + "\n due to error:" + err.Error() + " Skipping this foreign key...\n")
conv.Unexpected(fmt.Sprintf("Can't add foreign key with statement %s: %s", fkStmt, err))
return
}
internal.VerbosePrintln("Updated schema with statement: " + fkStmt)
logger.Log.Debug("Updated schema with statement", zap.String("fkStmt", fkStmt))
}(fkStmt, workerID)
// Send out an FK creation request every second, with total of maxWorkers request being present in a batch.
time.Sleep(time.Second)
}
// Wait for all the goroutines to finish.
for i := 1; i <= MaxWorkers; i++ {
<-workers
}
conv.Audit.Progress.UpdateProgress("Foreign key update complete.", 100, internal.ForeignKeyUpdateComplete)
conv.Audit.Progress.Done()
}
func (sp *SpannerAccessorImpl) DropDatabase(ctx context.Context, dbURI string) error {
err := sp.AdminClient.DropDatabase(ctx, &adminpb.DropDatabaseRequest{Database: dbURI})
if err != nil {
return fmt.Errorf("can't build DropDatabaseRequest: %w", parse.AnalyzeError(err, dbURI))
}
return nil
}
func (sp *SpannerAccessorImpl) ValidateDML(ctx context.Context, query string) (bool, error) {
stmt := spanner.Statement{
SQL: query,
}
iter := sp.SpannerClient.Single().Query(ctx, stmt)
defer iter.Stop()
_, err := iter.Next()
// there is an error but the error does not indicate no more rows, means a syntax error.
if err != iterator.Done && err != nil {
return false, err
} else {
return true, nil
}
}
func (sp *SpannerAccessorImpl) Refresh(ctx context.Context, dbURI string) {
sp.SpannerClient.Refresh(ctx, dbURI)
}