conversion/get_info.go (175 lines of code) (raw):

// Copyright 2024 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package conversion import ( "context" "database/sql" "fmt" "net" "strings" "cloud.google.com/go/cloudsqlconn" "github.com/GoogleCloudPlatform/spanner-migration-tool/common/constants" "github.com/GoogleCloudPlatform/spanner-migration-tool/common/utils" "github.com/GoogleCloudPlatform/spanner-migration-tool/profiles" "github.com/GoogleCloudPlatform/spanner-migration-tool/sources/common" "github.com/GoogleCloudPlatform/spanner-migration-tool/sources/dynamodb" "github.com/GoogleCloudPlatform/spanner-migration-tool/sources/mysql" "github.com/GoogleCloudPlatform/spanner-migration-tool/sources/oracle" "github.com/GoogleCloudPlatform/spanner-migration-tool/sources/postgres" "github.com/GoogleCloudPlatform/spanner-migration-tool/sources/sqlserver" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/session" dydb "github.com/aws/aws-sdk-go/service/dynamodb" "github.com/aws/aws-sdk-go/service/dynamodbstreams" mysqldriver "github.com/go-sql-driver/mysql" "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/stdlib" ) type GetInfoInterface interface { getInfoSchemaForShard(migrationProjectId string, shardConnInfo profiles.DirectConnectionConfig, driver string, targetProfile profiles.TargetProfile, sourceProfileDialect profiles.SourceProfileDialectInterface, getInfo GetInfoInterface) (common.InfoSchema, error) GetInfoSchemaFromCloudSQL(migrationProjectId string, sourceProfile profiles.SourceProfile, targetProfile profiles.TargetProfile) (common.InfoSchema, error) GetInfoSchema(migrationProjectId string, sourceProfile profiles.SourceProfile, targetProfile profiles.TargetProfile) (common.InfoSchema, error) } type GetInfoImpl struct{} func (gi *GetInfoImpl) getInfoSchemaForShard(migrationProjectId string, shardConnInfo profiles.DirectConnectionConfig, driver string, targetProfile profiles.TargetProfile, sourceProfileDialect profiles.SourceProfileDialectInterface, getInfo GetInfoInterface) (common.InfoSchema, error) { params := make(map[string]string) params["host"] = shardConnInfo.Host params["user"] = shardConnInfo.User params["dbName"] = shardConnInfo.DbName params["port"] = shardConnInfo.Port params["password"] = shardConnInfo.Password //while adding other sources, a switch-case will be added here on the basis of the driver input param passed. //pased on the driver name, profiles.NewSourceProfileConnection<DBName> will need to be called to create //the source profile information. getUtilsInfo := utils.GetUtilInfoImpl{} sourceProfileConnectionMySQL, err := sourceProfileDialect.NewSourceProfileConnectionMySQL(params, &getUtilsInfo) if err != nil { return nil, fmt.Errorf("cannot parse connection configuration for the primary shard") } sourceProfileConnection := profiles.SourceProfileConnection{Mysql: sourceProfileConnectionMySQL, Ty: profiles.SourceProfileConnectionTypeMySQL} //create a source profile which contains the sourceProfileConnection object for the primary shard //this is done because GetSQLConnectionStr() should not be aware of sharding newSourceProfile := profiles.SourceProfile{Conn: sourceProfileConnection, Ty: profiles.SourceProfileTypeConnection} newSourceProfile.Driver = driver infoSchema, err := getInfo.GetInfoSchema(migrationProjectId, newSourceProfile, targetProfile) if err != nil { return nil, err } return infoSchema, nil } func (gi *GetInfoImpl) GetInfoSchemaFromCloudSQL(migrationProjectId string, sourceProfile profiles.SourceProfile, targetProfile profiles.TargetProfile) (common.InfoSchema, error) { driver := sourceProfile.Driver switch driver { case constants.MYSQL: d, err := cloudsqlconn.NewDialer(context.Background(), cloudsqlconn.WithIAMAuthN()) if err != nil { return nil, fmt.Errorf("cloudsqlconn.NewDialer: %w", err) } var opts []cloudsqlconn.DialOption instanceName := fmt.Sprintf("%s:%s:%s", sourceProfile.ConnCloudSQL.Mysql.Project, sourceProfile.ConnCloudSQL.Mysql.Region, sourceProfile.ConnCloudSQL.Mysql.InstanceName) mysqldriver.RegisterDialContext("cloudsqlconn", func(ctx context.Context, addr string) (net.Conn, error) { return d.Dial(ctx, instanceName, opts...) }) dbURI := fmt.Sprintf("%s:empty@cloudsqlconn(localhost:3306)/%s?parseTime=true", sourceProfile.ConnCloudSQL.Mysql.User, sourceProfile.ConnCloudSQL.Mysql.Db) db, err := sql.Open("mysql", dbURI) if err != nil { return nil, fmt.Errorf("sql.Open: %w", err) } return mysql.InfoSchemaImpl{ DbName: sourceProfile.ConnCloudSQL.Mysql.Db, Db: db, MigrationProjectId: migrationProjectId, SourceProfile: sourceProfile, TargetProfile: targetProfile, }, nil case constants.POSTGRES: d, err := cloudsqlconn.NewDialer(context.Background(), cloudsqlconn.WithIAMAuthN()) if err != nil { return nil, fmt.Errorf("cloudsqlconn.NewDialer: %w", err) } var opts []cloudsqlconn.DialOption dsn := fmt.Sprintf("user=%s database=%s", sourceProfile.ConnCloudSQL.Pg.User, sourceProfile.ConnCloudSQL.Pg.Db) config, err := pgx.ParseConfig(dsn) if err != nil { return nil, err } instanceName := fmt.Sprintf("%s:%s:%s", sourceProfile.ConnCloudSQL.Pg.Project, sourceProfile.ConnCloudSQL.Pg.Region, sourceProfile.ConnCloudSQL.Pg.InstanceName) config.DialFunc = func(ctx context.Context, network, instance string) (net.Conn, error) { return d.Dial(ctx, instanceName, opts...) } dbURI := stdlib.RegisterConnConfig(config) db, err := sql.Open("pgx", dbURI) if err != nil { return nil, fmt.Errorf("sql.Open: %w", err) } temp := false return postgres.InfoSchemaImpl{ Db: db, MigrationProjectId: migrationProjectId, SourceProfile: sourceProfile, TargetProfile: targetProfile, IsSchemaUnique: &temp, //this is a workaround to set a bool pointer }, nil default: return nil, fmt.Errorf("driver %s not supported", driver) } } func (gi *GetInfoImpl) GetInfoSchema(migrationProjectId string, sourceProfile profiles.SourceProfile, targetProfile profiles.TargetProfile) (common.InfoSchema, error) { connectionConfig, err := ConnectionConfig(sourceProfile) if err != nil { return nil, err } driver := sourceProfile.Driver switch driver { case constants.MYSQL: db, err := sql.Open(driver, connectionConfig.(string)) dbName := getDbNameFromSQLConnectionStr(driver, connectionConfig.(string)) if err != nil { return nil, err } return mysql.InfoSchemaImpl{ DbName: dbName, Db: db, MigrationProjectId: migrationProjectId, SourceProfile: sourceProfile, TargetProfile: targetProfile, }, nil case constants.POSTGRES: db, err := sql.Open(driver, connectionConfig.(string)) if err != nil { return nil, err } temp := false return postgres.InfoSchemaImpl{ Db: db, MigrationProjectId: migrationProjectId, SourceProfile: sourceProfile, TargetProfile: targetProfile, IsSchemaUnique: &temp, //this is a workaround to set a bool pointer }, nil case constants.DYNAMODB: mySession := session.Must(session.NewSession()) dydbClient := dydb.New(mySession, connectionConfig.(*aws.Config)) var dydbStreamsClient *dynamodbstreams.DynamoDBStreams if sourceProfile.Conn.Streaming { newSession := session.Must(session.NewSession()) dydbStreamsClient = dynamodbstreams.New(newSession, connectionConfig.(*aws.Config)) } return dynamodb.InfoSchemaImpl{ DynamoClient: dydbClient, SampleSize: profiles.GetSchemaSampleSize(sourceProfile), DynamoStreamsClient: dydbStreamsClient, }, nil case constants.SQLSERVER: db, err := sql.Open(driver, connectionConfig.(string)) dbName := getDbNameFromSQLConnectionStr(driver, connectionConfig.(string)) if err != nil { return nil, err } return sqlserver.InfoSchemaImpl{DbName: dbName, Db: db}, nil case constants.ORACLE: db, err := sql.Open(driver, connectionConfig.(string)) dbName := getDbNameFromSQLConnectionStr(driver, connectionConfig.(string)) if err != nil { return nil, err } return oracle.InfoSchemaImpl{DbName: strings.ToUpper(dbName), Db: db, MigrationProjectId: migrationProjectId, SourceProfile: sourceProfile, TargetProfile: targetProfile}, nil default: return nil, fmt.Errorf("driver %s not supported", driver) } }