cmd/utils.go (231 lines of code) (raw):
// Copyright 2022 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package cmd
import (
"context"
"encoding/base64"
"fmt"
"time"
sp "cloud.google.com/go/spanner"
database "cloud.google.com/go/spanner/admin/database/apiv1"
datastreamclient "github.com/GoogleCloudPlatform/spanner-migration-tool/accessors/clients/datastream"
storageclient "github.com/GoogleCloudPlatform/spanner-migration-tool/accessors/clients/storage"
datastream_accessor "github.com/GoogleCloudPlatform/spanner-migration-tool/accessors/datastream"
spanneraccessor "github.com/GoogleCloudPlatform/spanner-migration-tool/accessors/spanner"
storageaccessor "github.com/GoogleCloudPlatform/spanner-migration-tool/accessors/storage"
"github.com/GoogleCloudPlatform/spanner-migration-tool/common/constants"
"github.com/GoogleCloudPlatform/spanner-migration-tool/common/metrics"
"github.com/GoogleCloudPlatform/spanner-migration-tool/common/parse"
"github.com/GoogleCloudPlatform/spanner-migration-tool/common/utils"
"github.com/GoogleCloudPlatform/spanner-migration-tool/conversion"
"github.com/GoogleCloudPlatform/spanner-migration-tool/internal"
"github.com/GoogleCloudPlatform/spanner-migration-tool/profiles"
"github.com/GoogleCloudPlatform/spanner-migration-tool/spanner/writer"
"github.com/GoogleCloudPlatform/spanner-migration-tool/webv2/helpers"
"google.golang.org/grpc/metadata"
"google.golang.org/protobuf/proto"
)
var (
badDataFile = ".dropped.txt"
schemaFile = ".schema.txt"
sessionFile = ".session.json"
)
const (
DefaultWritersLimit = 40
completionPercentage = 100
)
func metricsPopulation(ctx context.Context, driver string, conv *internal.Conv) {
if !conv.Audit.SkipMetricsPopulation {
// Adding migration metadata to the outgoing context.
migrationData := metrics.GetMigrationData(conv, driver, constants.SchemaConv)
serializedMigrationData, _ := proto.Marshal(migrationData)
migrationMetadataValue := base64.StdEncoding.EncodeToString(serializedMigrationData)
ctx = metadata.AppendToOutgoingContext(ctx, constants.MigrationMetadataKey, migrationMetadataValue)
}
}
// CreateDatabaseClient creates new database client and admin client.
func CreateDatabaseClient(ctx context.Context, targetProfile profiles.TargetProfile, driver, dbName string, ioHelper utils.IOStreams) (*database.DatabaseAdminClient, *sp.Client, string, error) {
if targetProfile.Conn.Sp.Dbname == "" {
targetProfile.Conn.Sp.Dbname = dbName
}
project, instance, dbName, err := targetProfile.GetResourceIds(ctx, time.Now(), driver, ioHelper.Out, &utils.GetUtilInfoImpl{})
if err != nil {
return nil, nil, "", err
}
fmt.Println("Using Google Cloud project:", project)
fmt.Println("Using Cloud Spanner instance:", instance)
utils.PrintPermissionsWarning(driver, ioHelper.Out)
dbURI := fmt.Sprintf("projects/%s/instances/%s/databases/%s", project, instance, dbName)
adminClient, err := utils.NewDatabaseAdminClient(ctx)
if err != nil {
err = fmt.Errorf("can't create admin client: %v", parse.AnalyzeError(err, dbURI))
return nil, nil, dbURI, err
}
client, err := utils.GetClient(ctx, dbURI)
if err != nil {
err = fmt.Errorf("can't create client for db %s: %v", dbURI, err)
return adminClient, nil, dbURI, err
}
return adminClient, client, dbURI, nil
}
// PrepareMigrationPrerequisites creates source and target profiles, opens a new IOStream and generates the database name.
func PrepareMigrationPrerequisites(sourceProfileString, targetProfileString, source string) (profiles.SourceProfile, profiles.TargetProfile, utils.IOStreams, string, error) {
targetProfile, err := profiles.NewTargetProfile(targetProfileString)
if err != nil {
return profiles.SourceProfile{}, profiles.TargetProfile{}, utils.IOStreams{}, "", err
}
n := profiles.NewSourceProfileImpl{}
sourceProfile, err := profiles.NewSourceProfile(sourceProfileString, source, &n)
if err != nil {
return profiles.SourceProfile{}, targetProfile, utils.IOStreams{}, "", err
}
sourceProfile.Driver, err = sourceProfile.ToLegacyDriver(source)
if err != nil {
return profiles.SourceProfile{}, targetProfile, utils.IOStreams{}, "", err
}
dumpFilePath := ""
if sourceProfile.Ty == profiles.SourceProfileTypeFile && (sourceProfile.File.Format == "" || sourceProfile.File.Format == "dump") {
dumpFilePath = sourceProfile.File.Path
}
ioHelper := utils.NewIOStreams(sourceProfile.Driver, dumpFilePath)
if ioHelper.SeekableIn != nil {
defer ioHelper.In.Close()
}
getInfo := utils.GetUtilInfoImpl{}
dbName, err := getInfo.GetDatabaseName(sourceProfile.Driver, time.Now())
if err != nil {
err = fmt.Errorf("can't generate database name for prefix: %v", err)
return sourceProfile, targetProfile, ioHelper, "", err
}
// check or create the internal metadata database for all flows.
helpers.CheckOrCreateMetadataDb(targetProfile.Conn.Sp.Project, targetProfile.Conn.Sp.Instance)
return sourceProfile, targetProfile, ioHelper, dbName, nil
}
// MigrateData creates database and populates data in it.
func MigrateDatabase(ctx context.Context, migrationProjectId string, targetProfile profiles.TargetProfile, sourceProfile profiles.SourceProfile, dbName string, ioHelper *utils.IOStreams, cmd interface{}, conv *internal.Conv, migrationError *error) (*writer.BatchWriter, error) {
var (
bw *writer.BatchWriter
err error
)
defer func() {
if err != nil && migrationError != nil {
*migrationError = err
}
}()
adminClient, client, dbURI, err := CreateDatabaseClient(ctx, targetProfile, sourceProfile.Driver, dbName, *ioHelper)
if err != nil {
err = fmt.Errorf("can't create database client: %v", err)
return nil, err
}
defer adminClient.Close()
defer client.Close()
switch v := cmd.(type) {
case *SchemaCmd:
err = migrateSchema(ctx, targetProfile, sourceProfile, ioHelper, conv, dbURI, adminClient)
case *DataCmd:
bw, err = migrateData(ctx, migrationProjectId, targetProfile, sourceProfile, ioHelper, conv, dbURI, adminClient, client, v)
case *SchemaAndDataCmd:
bw, err = migrateSchemaAndData(ctx, migrationProjectId, targetProfile, sourceProfile, ioHelper, conv, dbURI, adminClient, client, v)
}
if err != nil {
err = fmt.Errorf("can't migrate database: %v", err)
return nil, err
}
return bw, nil
}
func migrateSchema(ctx context.Context, targetProfile profiles.TargetProfile, sourceProfile profiles.SourceProfile,
ioHelper *utils.IOStreams, conv *internal.Conv, dbURI string, adminClient *database.DatabaseAdminClient) error {
spA, err := spanneraccessor.NewSpannerAccessorClientImpl(ctx)
if err != nil {
return err
}
err = spA.CreateOrUpdateDatabase(ctx, dbURI, sourceProfile.Driver, conv, sourceProfile.Config.ConfigType)
if err != nil {
err = fmt.Errorf("can't create/update database: %v", err)
return err
}
metricsPopulation(ctx, sourceProfile.Driver, conv)
conv.Audit.Progress.UpdateProgress("Schema migration complete.", completionPercentage, internal.SchemaMigrationComplete)
return nil
}
func migrateData(ctx context.Context, migrationProjectId string, targetProfile profiles.TargetProfile, sourceProfile profiles.SourceProfile,
ioHelper *utils.IOStreams, conv *internal.Conv, dbURI string, adminClient *database.DatabaseAdminClient, client *sp.Client, cmd *DataCmd) (*writer.BatchWriter, error) {
var (
bw *writer.BatchWriter
err error
)
if !sourceProfile.UseTargetSchema() {
err = validateExistingDb(ctx, conv.SpDialect, dbURI, adminClient, client, conv)
if err != nil {
err = fmt.Errorf("error while validating existing database: %v", err)
return nil, err
}
fmt.Printf("Schema validated successfully for data migration for db %s\n", dbURI)
}
// If migration type is Minimal Downtime, validate if required resources can be generated
if !conv.UI && sourceProfile.Driver == constants.MYSQL && sourceProfile.Ty == profiles.SourceProfileTypeConfig && sourceProfile.Config.ConfigType == constants.DATAFLOW_MIGRATION {
err := ValidateResourceGenerationHelper(ctx, migrationProjectId, targetProfile.Conn.Sp.Instance, sourceProfile, conv)
if err != nil {
return nil, err
}
}
c := &conversion.ConvImpl{}
bw, err = c.DataConv(ctx, migrationProjectId, sourceProfile, targetProfile, ioHelper, client, conv, true, cmd.WriteLimit, &conversion.DataFromSourceImpl{})
if err != nil {
err = fmt.Errorf("can't finish data conversion for db %s: %v", dbURI, err)
return nil, err
}
conv.Audit.Progress.UpdateProgress("Data migration complete.", completionPercentage, internal.DataMigrationComplete)
if !cmd.SkipForeignKeys {
spA, err := spanneraccessor.NewSpannerAccessorClientImpl(ctx)
if err != nil {
return bw, err
}
spA.UpdateDDLForeignKeys(ctx, dbURI, conv, sourceProfile.Driver, sourceProfile.Config.ConfigType)
}
return bw, nil
}
func migrateSchemaAndData(ctx context.Context, migrationProjectId string, targetProfile profiles.TargetProfile, sourceProfile profiles.SourceProfile,
ioHelper *utils.IOStreams, conv *internal.Conv, dbURI string, adminClient *database.DatabaseAdminClient, client *sp.Client, cmd *SchemaAndDataCmd) (*writer.BatchWriter, error) {
spA, err := spanneraccessor.NewSpannerAccessorClientImpl(ctx)
if err != nil {
return nil, err
}
err = spA.CreateOrUpdateDatabase(ctx, dbURI, sourceProfile.Driver, conv, sourceProfile.Config.ConfigType)
if err != nil {
err = fmt.Errorf("can't create/update database: %v", err)
return nil, err
}
metricsPopulation(ctx, sourceProfile.Driver, conv)
conv.Audit.Progress.UpdateProgress("Schema migration complete.", completionPercentage, internal.SchemaMigrationComplete)
// If migration type is Minimal Downtime, validate if required resources can be generated
if !conv.UI && sourceProfile.Driver == constants.MYSQL && sourceProfile.Ty == profiles.SourceProfileTypeConfig && sourceProfile.Config.ConfigType == constants.DATAFLOW_MIGRATION {
err := ValidateResourceGenerationHelper(ctx, migrationProjectId, targetProfile.Conn.Sp.Instance, sourceProfile, conv)
if err != nil {
return nil, err
}
}
convImpl := &conversion.ConvImpl{}
bw, err := convImpl.DataConv(ctx, migrationProjectId, sourceProfile, targetProfile, ioHelper, client, conv, true, cmd.WriteLimit, &conversion.DataFromSourceImpl{})
if err != nil {
err = fmt.Errorf("can't finish data conversion for db %s: %v", dbURI, err)
return nil, err
}
conv.Audit.Progress.UpdateProgress("Data migration complete.", completionPercentage, internal.DataMigrationComplete)
if !cmd.SkipForeignKeys {
spA.UpdateDDLForeignKeys(ctx, dbURI, conv, sourceProfile.Driver, sourceProfile.Config.ConfigType)
}
return bw, nil
}
func ValidateResourceGenerationHelper(ctx context.Context, migrationProjectId string, instanceId string, sourceProfile profiles.SourceProfile, conv *internal.Conv) error {
spanneraccessor, err := spanneraccessor.NewSpannerAccessorClientImpl(ctx)
if err != nil {
return err
}
dsClient, err := datastreamclient.NewDatastreamClientImpl(ctx)
if err != nil {
return err
}
storageclient, err := storageclient.NewStorageClientImpl(ctx)
if err != nil {
return err
}
validateResource := conversion.NewValidateResourcesImpl(spanneraccessor, &datastream_accessor.DatastreamAccessorImpl{},
dsClient, &storageaccessor.StorageAccessorImpl{}, storageclient)
err = validateResource.ValidateResourceGeneration(ctx, migrationProjectId, instanceId, sourceProfile, conv)
if err != nil {
return err
}
return nil
}