profiles/source_profile.go (666 lines of code) (raw):

// Copyright 2020 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package profiles import ( "encoding/json" "fmt" "io/ioutil" "os" "strconv" "strings" "github.com/GoogleCloudPlatform/spanner-migration-tool/common/constants" "github.com/GoogleCloudPlatform/spanner-migration-tool/common/utils" ) type SourceProfileType int const ( SourceProfileTypeUnset = iota SourceProfileTypeFile SourceProfileTypeConnection SourceProfileTypeConfig SourceProfileTypeCsv SourceProfileTypeCloudSQL ) type SourceProfileFile struct { Path string Format string } // Interface to create source profiles for different database dialects type SourceProfileDialectInterface interface { NewSourceProfileConnectionCloudSQLMySQL(params map[string]string, g utils.GetUtilInfoInterface) (SourceProfileConnectionCloudSQLMySQL, error) NewSourceProfileConnectionMySQL(params map[string]string, g utils.GetUtilInfoInterface) (SourceProfileConnectionMySQL, error) NewSourceProfileConnectionCloudSQLPostgreSQL(params map[string]string, g utils.GetUtilInfoInterface) (SourceProfileConnectionCloudSQLPostgreSQL, error) NewSourceProfileConnectionPostgreSQL(params map[string]string, g utils.GetUtilInfoInterface) (SourceProfileConnectionPostgreSQL, error) NewSourceProfileConnectionSqlServer(params map[string]string, g utils.GetUtilInfoInterface) (SourceProfileConnectionSqlServer, error) NewSourceProfileConnectionDynamoDB(params map[string]string, g utils.GetUtilInfoInterface) (SourceProfileConnectionDynamoDB, error) 
NewSourceProfileConnectionOracle(params map[string]string, g utils.GetUtilInfoInterface) (SourceProfileConnectionOracle, error) } type SourceProfileDialectImpl struct{} // Interface to create new source profiles for different input types type NewSourceProfileInterface interface { NewSourceProfileFile(params map[string]string) SourceProfileFile NewSourceProfileConfig(source string, path string) (SourceProfileConfig, error) NewSourceProfileConnectionCloudSQL(source string, params map[string]string, s SourceProfileDialectInterface) (SourceProfileConnectionCloudSQL, error) NewSourceProfileConnection(source string, params map[string]string, s SourceProfileDialectInterface) (SourceProfileConnection, error) } type NewSourceProfileImpl struct{} func (nsp *NewSourceProfileImpl) NewSourceProfileFile(params map[string]string) SourceProfileFile { profile := SourceProfileFile{} if !filePipedToStdin() { profile.Path = params["file"] } if format, ok := params["format"]; ok { profile.Format = format // TODO: Add check that format takes values from ["dump", "csv", "avro", ... 
etc] } else { fmt.Printf("source-profile format defaulting to `dump`\n") profile.Format = "dump" } return profile } type SourceProfileConnectionType int const ( SourceProfileConnectionTypeUnset = iota SourceProfileConnectionTypeMySQL SourceProfileConnectionTypePostgreSQL SourceProfileConnectionTypeDynamoDB SourceProfileConnectionTypeSqlServer SourceProfileConnectionTypeOracle ) type SourceProfileConnectionTypeCloudSQL int const ( SourceProfileConnectionTypeCloudSQLUnset = iota SourceProfileConnectionTypeCloudSQLMySQL SourceProfileConnectionTypeCloudSQLPostgreSQL ) type SourceProfileConnectionCloudSQLMySQL struct { User string Db string InstanceName string Project string Region string } func (spd *SourceProfileDialectImpl) NewSourceProfileConnectionCloudSQLMySQL(params map[string]string, g utils.GetUtilInfoInterface) (SourceProfileConnectionCloudSQLMySQL, error) { mysql := SourceProfileConnectionCloudSQLMySQL{} user, userOk := params["user"] db, dbOk := params["dbName"] instance, instanceOk := params["instance"] project, projectOk := params["project"] var err error if !projectOk { project, err = g.GetProject() if err != nil { return mysql, fmt.Errorf("project for cloudsql instance not specified in source-profile, and unable to fetch from gcloud. 
Please specify project in the source-profile or configure in gcloud") } } region, regionOk := params["region"] if !userOk || !dbOk || !instanceOk || !regionOk { return mysql, fmt.Errorf("please specify user, dbName, instance and region in the source-profile") } mysql.User = user mysql.Db = db mysql.InstanceName = instance mysql.Project = project mysql.Region = region return mysql, nil } type SourceProfileConnectionMySQL struct { Host string // Same as MYSQLHOST environment variable Port string // Same as MYSQLPORT environment variable User string // Same as MYSQLUSER environment variable Db string // Same as MYSQLDATABASE environment variable Pwd string // Same as MYSQLPWD environment variable StreamingConfig string } func (spd *SourceProfileDialectImpl) NewSourceProfileConnectionMySQL(params map[string]string, g utils.GetUtilInfoInterface) (SourceProfileConnectionMySQL, error) { mysql := SourceProfileConnectionMySQL{} host, hostOk := params["host"] user, userOk := params["user"] db, dbOk := params["dbName"] port, portOk := params["port"] pwd, pwdOk := params["password"] streamingConfig, cfgOk := params["streamingCfg"] if cfgOk && streamingConfig == "" { return mysql, fmt.Errorf("specify a non-empty streaming config file path") } mysql.StreamingConfig = streamingConfig // We don't users to mix and match params from source-profile and environment variables. // We either try to get all params from the source-profile and if none are set, we read from the env variables. if !(hostOk || userOk || dbOk || portOk || pwdOk) { // No connection params provided through source-profile. Fetching from env variables. fmt.Printf("Connection parameters not specified in source-profile. 
Reading from " + "environment variables MYSQLHOST, MYSQLUSER, MYSQLDATABASE, MYSQLPORT, MYSQLPWD...\n") mysql.Host = os.Getenv("MYSQLHOST") mysql.User = os.Getenv("MYSQLUSER") mysql.Db = os.Getenv("MYSQLDATABASE") mysql.Port = os.Getenv("MYSQLPORT") mysql.Pwd = os.Getenv("MYSQLPWD") // Throw error if the input entered is empty. if mysql.Host == "" || mysql.User == "" || mysql.Db == "" { return mysql, fmt.Errorf("found empty string for MYSQLHOST/MYSQLUSER/MYSQLDATABASE. Please specify these environment variables with correct values") } } else if hostOk && userOk && dbOk { // If atleast host, username and dbName are provided through source-profile, // go ahead and use source-profile. Port and password handled later even if they are empty. mysql.Host, mysql.User, mysql.Db, mysql.Port, mysql.Pwd = host, user, db, port, pwd // Throw error if the input entered is empty. if mysql.Host == "" || mysql.User == "" || mysql.Db == "" { return mysql, fmt.Errorf("found empty string for host/user/dbName. Please specify host, port, user and dbName in the source-profile") } } else { // Partial params provided through source-profile. Ask user to provide all through the source-profile. return mysql, fmt.Errorf("please specify host, port, user and dbName in the source-profile") } // Throw same error if the input entered is empty. if mysql.Host == "" || mysql.User == "" || mysql.Db == "" { return mysql, fmt.Errorf("found empty string for host/user/db. please specify host, port, user and dbName in the source-profile") } if mysql.Port == "" { // Set default port for mysql, which rarely changes. 
mysql.Port = "3306" } if mysql.Pwd == "" { mysql.Pwd = g.GetPassword() } return mysql, nil } type SourceProfileConnectionCloudSQLPostgreSQL struct { User string Db string InstanceName string Project string Region string } func (spd *SourceProfileDialectImpl) NewSourceProfileConnectionCloudSQLPostgreSQL(params map[string]string, g utils.GetUtilInfoInterface) (SourceProfileConnectionCloudSQLPostgreSQL, error) { postgres := SourceProfileConnectionCloudSQLPostgreSQL{} user, userOk := params["user"] db, dbOk := params["dbName"] instance, instanceOk := params["instance"] project, projectOk := params["project"] var err error if !projectOk { project, err = g.GetProject() if err != nil { return postgres, fmt.Errorf("project for cloudsql instance not specified in source-profile, and unable to fetch from gcloud. Please specify project in the source-profile or configure in gcloud") } } region, regionOk := params["region"] if !userOk || !dbOk || !instanceOk || !regionOk { return postgres, fmt.Errorf("please specify user, dbName, instance and region in the source-profile") } postgres.User = user postgres.Db = db postgres.InstanceName = instance postgres.Project = project postgres.Region = region return postgres, nil } type SourceProfileConnectionPostgreSQL struct { Host string // Same as PGHOST environment variable Port string // Same as PGPORT environment variable User string // Same as PGUSER environment variable Db string // Same as PGDATABASE environment variable Pwd string // Same as PGPASSWORD environment variable StreamingConfig string } func (spd *SourceProfileDialectImpl) NewSourceProfileConnectionPostgreSQL(params map[string]string, g utils.GetUtilInfoInterface) (SourceProfileConnectionPostgreSQL, error) { pg := SourceProfileConnectionPostgreSQL{} host, hostOk := params["host"] user, userOk := params["user"] db, dbOk := params["dbName"] port, portOk := params["port"] pwd, pwdOk := params["password"] streamingConfig, cfgOk := params["streamingCfg"] if cfgOk && 
streamingConfig == "" { return pg, fmt.Errorf("specify a non-empty streaming config file path") } pg.StreamingConfig = streamingConfig // We don't users to mix and match params from source-profile and environment variables. // We either try to get all params from the source-profile and if none are set, we read from the env variables. if !(hostOk || userOk || dbOk || portOk || pwdOk) { // No connection params provided through source-profile. Fetching from env variables. fmt.Printf("Connection parameters not specified in source-profile. Reading from " + "environment variables PGHOST, PGUSER, PGDATABASE, PGPORT, PGPASSWORD...\n") pg.Host = os.Getenv("PGHOST") pg.User = os.Getenv("PGUSER") pg.Db = os.Getenv("PGDATABASE") pg.Port = os.Getenv("PGPORT") pg.Pwd = os.Getenv("PGPASSWORD") // Throw error if the input entered is empty. if pg.Host == "" || pg.User == "" || pg.Db == "" { return pg, fmt.Errorf("found empty string for PGHOST/PGUSER/PGDATABASE. Please specify these environment variables with correct values") } } else if hostOk && userOk && dbOk { // All connection params provided through source-profile. Port and password handled later. pg.Host, pg.User, pg.Db, pg.Port, pg.Pwd = host, user, db, port, pwd // Throw error if the input entered is empty. if pg.Host == "" || pg.User == "" || pg.Db == "" { return pg, fmt.Errorf("found empty string for host/user/dbName. Please specify host, port, user and dbName in the source-profile") } } else { // Partial params provided through source-profile. Ask user to provide all through the source-profile. return pg, fmt.Errorf("please specify host, port, user and dbName in the source-profile") } if pg.Port == "" { // Set default port for postgresql, which rarely changes. 
pg.Port = "5432" } if pg.Pwd == "" { pg.Pwd = g.GetPassword() } return pg, nil } type SourceProfileConnectionSqlServer struct { Host string Port string User string Db string Pwd string } func (spd *SourceProfileDialectImpl) NewSourceProfileConnectionSqlServer(params map[string]string, g utils.GetUtilInfoInterface) (SourceProfileConnectionSqlServer, error) { ss := SourceProfileConnectionSqlServer{} host, hostOk := params["host"] user, userOk := params["user"] db, dbOk := params["dbName"] port, portOk := params["port"] pwd, pwdOk := params["password"] // We don't allow users to mix and match params from source-profile and environment variables. // We either try to get all params from the source-profile and if none are set, we read from the env variables. if !(hostOk || userOk || dbOk || portOk || pwdOk) { // No connection params provided through source-profile. Fetching from env variables. fmt.Printf("Connection parameters not specified in source-profile. Reading from " + "environment variables MSSQL_IP_ADDRESS, MSSQL_USER, MSSQL_DATABASE, MSSQL_TCP_PORT, MSSQL_SA_PASSWORD...\n") ss.Host = os.Getenv("MSSQL_IP_ADDRESS") //For default SQL Server instances. ss.Port = os.Getenv("MSSQL_TCP_PORT") ss.Pwd = os.Getenv("MSSQL_SA_PASSWORD") ss.Db = os.Getenv("MSSQL_DATABASE") //Non standard env variable. Defined for Spanner migration tool. ss.User = os.Getenv("MSSQL_SA_USER") //Non standard env variable. Defined for Spanner migration tool. if ss.User == "" { fmt.Printf("MSSQL_SA_USER environment variable is not set. Default admin user 'SA' will be used for further processing.\n") ss.User = "SA" } // Throw error if the input entered is empty. if ss.Host == "" || ss.Db == "" { return ss, fmt.Errorf("found empty string for MSSQL_IP_ADDRESS/MSSQL_DATABASE. Please specify these environment variables with correct values") } } else if hostOk && userOk && dbOk { // All connection params provided through source-profile. Port and password handled later. 
ss.Host, ss.User, ss.Db, ss.Port, ss.Pwd = host, user, db, port, pwd // Throw error if the input entered is empty. if ss.Host == "" || ss.User == "" || ss.Db == "" { return ss, fmt.Errorf("found empty string for host/user/dbName. Please specify host, port, user and dbName in the source-profile") } } else { // Partial params provided through source-profile. Ask user to provide all through the source-profile. return ss, fmt.Errorf("please specify host, port, user and dbName in the source-profile") } if ss.Port == "" { // Set default port for sql server, which rarely changes. ss.Port = "1433" } // Try to get admin password from env if saPas := os.Getenv("MSSQL_SA_PASSWORD"); saPas != "" { ss.Pwd = saPas } // If source profile and env do not have password then get password via prompt. if ss.Pwd == "" { ss.Pwd = g.GetPassword() } return ss, nil } type SourceProfileConnectionDynamoDB struct { // These connection params are not used currently because the SDK reads directly from the env variables. // These are still kept around as reference when we refactor passing // SourceProfile instead of sqlConnectionStr around. 
AwsAccessKeyID string // Same as AWS_ACCESS_KEY_ID environment variable AwsSecretAccessKey string // Same as AWS_SECRET_ACCESS_KEY environment variable AwsRegion string // Same as AWS_REGION environment variable DydbEndpoint string // Same as DYNAMODB_ENDPOINT_OVERRIDE environment variable SchemaSampleSize int64 // Number of rows to use for inferring schema (default 100,000) enableStreaming string // Used for confirming streaming migration (valid options: `yes`,`no`,`true`,`false`) } func (spd *SourceProfileDialectImpl) NewSourceProfileConnectionDynamoDB(params map[string]string, g utils.GetUtilInfoInterface) (SourceProfileConnectionDynamoDB, error) { dydb := SourceProfileConnectionDynamoDB{} if schemaSampleSize, ok := params["schema-sample-size"]; ok { schemaSampleSizeInt, err := strconv.Atoi(schemaSampleSize) if err != nil { return dydb, fmt.Errorf("could not parse schema-sample-size = %v as a valid int64", schemaSampleSize) } dydb.SchemaSampleSize = int64(schemaSampleSizeInt) } // For DynamoDB, the preferred way to provide connection params is through env variables. // Unlike postgres and mysql, there may not be deprecation of env variables, hence it // is better to override env variables optionally via source profile params. 
var ok bool if dydb.AwsAccessKeyID, ok = params["aws-access-key-id"]; ok { os.Setenv("AWS_ACCESS_KEY_ID", dydb.AwsAccessKeyID) } if dydb.AwsSecretAccessKey, ok = params["aws-secret-access-key"]; ok { os.Setenv("AWS_SECRET_ACCESS_KEY", dydb.AwsSecretAccessKey) } if dydb.AwsRegion, ok = params["aws-region"]; ok { os.Setenv("AWS_REGION", dydb.AwsRegion) } if dydb.DydbEndpoint, ok = params["dydb-endpoint"]; ok { os.Setenv("DYNAMODB_ENDPOINT_OVERRIDE", dydb.DydbEndpoint) } if dydb.enableStreaming, ok = params["enableStreaming"]; ok { switch dydb.enableStreaming { case "yes", "true": dydb.enableStreaming = "yes" case "no", "false": dydb.enableStreaming = "no" default: return dydb, fmt.Errorf("please specify a valid choice for enableStreaming: available choices(yes, no, true, false)") } } return dydb, nil } type SourceProfileConnectionOracle struct { Host string Port string User string Db string Pwd string StreamingConfig string } func (spd *SourceProfileDialectImpl) NewSourceProfileConnectionOracle(params map[string]string, g utils.GetUtilInfoInterface) (SourceProfileConnectionOracle, error) { ss := SourceProfileConnectionOracle{} host, hostOk := params["host"] user, userOk := params["user"] db, dbOk := params["dbName"] port := params["port"] pwd := params["password"] streamingConfig, cfgOk := params["streamingCfg"] if cfgOk && streamingConfig == "" { return ss, fmt.Errorf("specify a non-empty streaming config file path") } ss.StreamingConfig = streamingConfig if hostOk && userOk && dbOk { // All connection params provided through source-profile. Port and password handled later. ss.Host, ss.User, ss.Db, ss.Port, ss.Pwd = host, user, db, port, pwd // Throw error if the input entered is empty. if ss.Host == "" || ss.User == "" || ss.Db == "" { return ss, fmt.Errorf("found empty string for host/user/dbName. Please specify host, port, user and dbName in the source-profile") } } else { // Partial params provided through source-profile. 
Ask user to provide all through the source-profile. return ss, fmt.Errorf("please specify host, port, user and dbName in the source-profile") } if ss.Port == "" { // Set default port for oracle, which rarely changes. ss.Port = "1521" } if ss.Pwd == "" { ss.Pwd = g.GetPassword() } return ss, nil } type SourceProfileConnection struct { Ty SourceProfileConnectionType Streaming bool Mysql SourceProfileConnectionMySQL Pg SourceProfileConnectionPostgreSQL Dydb SourceProfileConnectionDynamoDB SqlServer SourceProfileConnectionSqlServer Oracle SourceProfileConnectionOracle } type SourceProfileConnectionCloudSQL struct { Ty SourceProfileConnectionTypeCloudSQL Mysql SourceProfileConnectionCloudSQLMySQL Pg SourceProfileConnectionCloudSQLPostgreSQL } func (nsp *NewSourceProfileImpl) NewSourceProfileConnection(source string, params map[string]string, s SourceProfileDialectInterface) (SourceProfileConnection, error) { conn := SourceProfileConnection{} var err error switch strings.ToLower(source) { case "mysql": { conn.Ty = SourceProfileConnectionTypeMySQL conn.Mysql, err = s.NewSourceProfileConnectionMySQL(params, &utils.GetUtilInfoImpl{}) if err != nil { return conn, err } if conn.Mysql.StreamingConfig != "" { conn.Streaming = true } } case "postgresql", "postgres", "pg": { conn.Ty = SourceProfileConnectionTypePostgreSQL conn.Pg, err = s.NewSourceProfileConnectionPostgreSQL(params, &utils.GetUtilInfoImpl{}) if err != nil { return conn, err } if conn.Pg.StreamingConfig != "" { conn.Streaming = true } } case "dynamodb": { conn.Ty = SourceProfileConnectionTypeDynamoDB conn.Dydb, err = s.NewSourceProfileConnectionDynamoDB(params, &utils.GetUtilInfoImpl{}) if err != nil { return conn, err } if conn.Dydb.enableStreaming == "yes" { conn.Streaming = true } } case "sqlserver", "mssql": { conn.Ty = SourceProfileConnectionTypeSqlServer conn.SqlServer, err = s.NewSourceProfileConnectionSqlServer(params, &utils.GetUtilInfoImpl{}) if err != nil { return conn, err } } case "oracle": { conn.Ty 
= SourceProfileConnectionTypeOracle conn.Oracle, err = s.NewSourceProfileConnectionOracle(params, &utils.GetUtilInfoImpl{}) if err != nil { return conn, err } if conn.Oracle.StreamingConfig != "" { conn.Streaming = true } } default: return conn, fmt.Errorf("please specify a valid source database using -source flag, received source = %v", source) } return conn, nil } func (nsp *NewSourceProfileImpl) NewSourceProfileConnectionCloudSQL(source string, params map[string]string, s SourceProfileDialectInterface) (SourceProfileConnectionCloudSQL, error) { conn := SourceProfileConnectionCloudSQL{} var err error switch strings.ToLower(source) { case "mysql": { conn.Ty = SourceProfileConnectionTypeCloudSQLMySQL conn.Mysql, err = s.NewSourceProfileConnectionCloudSQLMySQL(params, &utils.GetUtilInfoImpl{}) if err != nil { return conn, err } } case "postgresql", "postgres", "pg": { conn.Ty = SourceProfileConnectionTypeCloudSQLPostgreSQL conn.Pg, err = s.NewSourceProfileConnectionCloudSQLPostgreSQL(params, &utils.GetUtilInfoImpl{}) if err != nil { return conn, err } } } return conn, nil } type DirectConnectionConfig struct { DataShardId string `json:"dataShardId"` Host string `json:"host"` User string `json:"user"` Password string `json:"password"` Port string `json:"port"` DbName string `json:"dbName"` } type DatastreamConnProfileSource struct { Name string `json:"name"` Host string `json:"host"` User string `json:"user"` Port string `json:"port"` Password string `json:"password"` Location string `json:"location"` } type DatastreamConnProfileTarget struct { Name string `json:"name"` Location string `json:"location"` } type DatastreamConfig struct { MaxConcurrentBackfillTasks string `json:"maxConcurrentBackfillTasks"` MaxConcurrentCdcTasks string `json:"maxConcurrentCdcTasks"` } type GcsConfig struct { TtlInDays int64 `json:"ttlInDays,string"` TtlInDaysSet bool `json:"ttlInDaysSet"` } type DataflowConfig struct { ProjectId string `json:"projectId"` Location string 
`json:"location"` Network string `json:"network"` Subnetwork string `json:"subnetwork"` VpcHostProjectId string `json:"hostProjectId"` MaxWorkers string `json:"maxWorkers"` NumWorkers string `json:"numWorkers"` ServiceAccountEmail string `json:"serviceAccountEmail"` JobName string `json:"jobName"` MachineType string `json:"machineType"` AdditionalUserLabels string `json:"additionalUserLabels"` KmsKeyName string `json:"kmsKeyName"` GcsTemplatePath string `json:"gcsTemplatePath"` CustomJarPath string `json:"customJarPath"` CustomClassName string `json:"customClassName"` CustomParameter string `json:"customParameter"` } type DataShard struct { DataShardId string `json:"dataShardId"` SrcConnectionProfile DatastreamConnProfileSource `json:"srcConnectionProfile"` DstConnectionProfile DatastreamConnProfileTarget `json:"dstConnectionProfile"` DatastreamConfig DatastreamConfig `json:"datastreamConfig"` GcsConfig GcsConfig `json:"gcsConfig"` DataflowConfig DataflowConfig `json:"dataflowConfig"` TmpDir string `json:"tmpDir"` StreamLocation string `json:"streamLocation"` LogicalShards []LogicalShard `json:"databases"` } type LogicalShard struct { DbName string `json:"dbName"` LogicalShardId string `json:"databaseId"` RefDataShardId string `json:"refDataShardId"` } type ShardConfigurationDataflow struct { SchemaSource DirectConnectionConfig `json:"schemaSource"` DataShards []*DataShard `json:"dataShards"` DatastreamConfig DatastreamConfig `json:"datastreamConfig"` GcsConfig GcsConfig `json:"gcsConfig"` DataflowConfig DataflowConfig `json:"dataflowConfig"` } type ShardConfigurationBulk struct { SchemaSource DirectConnectionConfig `json:"schemaSource"` DataShards []DirectConnectionConfig `json:"dataShards"` } // TODO: Define the sharding structure for DMS migrations here. 
type ShardConfigurationDMS struct { } type SourceProfileConfig struct { ConfigType string `json:"configType"` ShardConfigurationBulk ShardConfigurationBulk `json:"shardConfigurationBulk"` ShardConfigurationDataflow ShardConfigurationDataflow `json:"shardConfigurationDataflow"` ShardConfigurationDMS ShardConfigurationDMS `json:"shardConfigurationDMS"` } func (nsp *NewSourceProfileImpl) NewSourceProfileConfig(source string, path string) (SourceProfileConfig, error) { //given the source, the fact that this 'config=', determine the appropiate object to marshal into switch source { case constants.MYSQL: //load the JSON configuration into file configFile, err := ioutil.ReadFile(path) if err != nil { return SourceProfileConfig{}, fmt.Errorf("cannot read config file due to: %v", err) } sourceProfileConfig := SourceProfileConfig{} //unmarshal the JSON into object err = json.Unmarshal(configFile, &sourceProfileConfig) return sourceProfileConfig, err default: return SourceProfileConfig{}, fmt.Errorf("sharded migrations are currrently only supported for MySQL databases") } } type SourceProfileCsv struct { Manifest string Delimiter string NullStr string } func NewSourceProfileCsv(params map[string]string) SourceProfileCsv { csvProfile := SourceProfileCsv{} csvProfile.Manifest = params["manifest"] csvProfile.Delimiter = "," csvProfile.NullStr = "" if delimiter, ok := params["delimiter"]; ok { csvProfile.Delimiter = delimiter } if nullStr, ok := params["nullStr"]; ok { csvProfile.NullStr = nullStr } return csvProfile } type SourceProfile struct { Driver string Ty SourceProfileType File SourceProfileFile Conn SourceProfileConnection ConnCloudSQL SourceProfileConnectionCloudSQL Config SourceProfileConfig Csv SourceProfileCsv } // UseTargetSchema returns true if the driver expects an existing schema // to use in the target database. 
func (src SourceProfile) UseTargetSchema() bool {
	return src.Driver == constants.CSV
}

// ToLegacyDriver converts source-profile to equivalent legacy global flags
// e.g., -driver, -dump-file etc since the rest of the codebase still uses the
// same. TODO: Deprecate this function and pass around SourceProfile across the
// codebase wherever information about source connection is required.
//
// source is matched case-insensitively; an error is returned when the source
// database is not supported for the profile's type.
func (src SourceProfile) ToLegacyDriver(source string) (string, error) {
	switch src.Ty {
	case SourceProfileTypeFile:
		switch strings.ToLower(source) {
		case "mysql":
			return constants.MYSQLDUMP, nil
		case "postgresql", "postgres", "pg":
			return constants.PGDUMP, nil
		case "dynamodb":
			return "", fmt.Errorf("dump files are not supported with DynamoDB")
		default:
			return "", fmt.Errorf("please specify a valid source database using -source flag, received source = %v", source)
		}
	// No need to handle unsupported streaming source specified as it is already covered during source profile creation.
	case SourceProfileTypeConnection:
		switch strings.ToLower(source) {
		case "mysql":
			return constants.MYSQL, nil
		case "postgresql", "postgres", "pg":
			return constants.POSTGRES, nil
		case "dynamodb":
			return constants.DYNAMODB, nil
		case "sqlserver", "mssql":
			return constants.SQLSERVER, nil
		case "oracle":
			return constants.ORACLE, nil
		default:
			return "", fmt.Errorf("please specify a valid source database using -source flag, received source = %v", source)
		}
	case SourceProfileTypeCloudSQL:
		switch strings.ToLower(source) {
		case "mysql":
			return constants.MYSQL, nil
		case "postgresql", "postgres", "pg":
			return constants.POSTGRES, nil
		default:
			return "", fmt.Errorf("please specify a valid source database using -source flag, received source = %v", source)
		}
	case SourceProfileTypeConfig:
		switch strings.ToLower(source) {
		case constants.MYSQL:
			return constants.MYSQL, nil
		default:
			return "", fmt.Errorf("specifying source-profile using config for non-mysql databases not implemented")
		}
	case SourceProfileTypeCsv:
		return constants.CSV, nil
	default:
		return "", fmt.Errorf("invalid source-profile, could not infer type")
	}
}

// NewSourceProfile parses the -source-profile flag value s (a list of
// key=value pairs, parsed by ParseMap) for the -source database and returns
// the matching SourceProfile. The profile type is chosen in this order:
//
//  1. CSV: the lower-cased source equals constants.CSV.
//  2. File: the "file" param is present, or data is piped to stdin. File path
//     can be a local file path or a gcs file path. File format can be "dump"
//     e.g., when specifying a mysqldump or pgdump etc., and defaults to "dump".
//     Example: -source-profile="file=/tmp/abc, format=dump"
//     Example: -source-profile="file=gcs://bucket_name/cart.txt, format=dump"
//     Setting "format" without a file is an error.
//  3. Config: the "config" param names a JSON config file describing the
//     source connection profiles (sharded migrations).
//  4. Cloud SQL: the "instance" param is present.
//  5. Connection: otherwise. Connection params may be omitted entirely, in
//     which case the dialect reads them from environment variables.
func NewSourceProfile(s string, source string, n NewSourceProfileInterface) (SourceProfile, error) {
	if source == "" {
		return SourceProfile{}, fmt.Errorf("cannot leave -source flag empty, please specify source databases e.g., -source=postgres etc")
	}
	params, err := ParseMap(s)
	if err != nil {
		return SourceProfile{}, fmt.Errorf("could not parse source-profile, error = %w", err)
	}
	if strings.ToLower(source) == constants.CSV {
		return SourceProfile{Ty: SourceProfileTypeCsv, Csv: NewSourceProfileCsv(params)}, nil
	}
	if _, ok := params["file"]; ok || filePipedToStdin() {
		return SourceProfile{Ty: SourceProfileTypeFile, File: n.NewSourceProfileFile(params)}, nil
	}
	if format, ok := params["format"]; ok {
		// File is not passed in from stdin or specified using "file" flag.
		return SourceProfile{Ty: SourceProfileTypeFile}, fmt.Errorf("file not specified, but format set to %v", format)
	}
	if file, ok := params["config"]; ok {
		config, err := n.NewSourceProfileConfig(strings.ToLower(source), file)
		return SourceProfile{Ty: SourceProfileTypeConfig, Config: config}, err
	}
	if _, ok := params["instance"]; ok {
		conn, err := n.NewSourceProfileConnectionCloudSQL(source, params, &SourceProfileDialectImpl{})
		return SourceProfile{Ty: SourceProfileTypeCloudSQL, ConnCloudSQL: conn}, err
	}
	// Assume connection profile type connection by default, since
	// connection parameters could be specified as part of environment
	// variables.
	conn, err := n.NewSourceProfileConnection(source, params, &SourceProfileDialectImpl{})
	return SourceProfile{Ty: SourceProfileTypeConnection, Conn: conn}, err
}

// filePipedToStdin reports whether stdin is not a terminal (character device),
// i.e. data is being piped or redirected into the process. It is declared as
// a variable, presumably so it can be stubbed (e.g. in tests).
var filePipedToStdin = func() bool {
	stat, err := os.Stdin.Stat()
	if err != nil {
		// stdin cannot be inspected (e.g. it is closed). Stat returns a nil
		// FileInfo in that case, so calling Mode() would panic; report "not piped".
		return false
	}
	return stat.Mode()&os.ModeCharDevice == 0
}