webv2/web.go (925 lines of code) (raw):

// Copyright 2022 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. // Package web defines web APIs to be used with Spanner migration tool frontend. // Apart from schema conversion, this package involves API to update // converted schema. package webv2 import ( "context" "database/sql" "encoding/json" "fmt" "io/fs" "io/ioutil" "log" "net/http" "os" "path/filepath" "strconv" "strings" "time" instance "cloud.google.com/go/spanner/admin/instance/apiv1" storageclient "github.com/GoogleCloudPlatform/spanner-migration-tool/accessors/clients/storage" storageaccessor "github.com/GoogleCloudPlatform/spanner-migration-tool/accessors/storage" "github.com/GoogleCloudPlatform/spanner-migration-tool/cmd" "github.com/GoogleCloudPlatform/spanner-migration-tool/common/constants" "github.com/GoogleCloudPlatform/spanner-migration-tool/common/utils" "github.com/GoogleCloudPlatform/spanner-migration-tool/conversion" "github.com/GoogleCloudPlatform/spanner-migration-tool/internal" "github.com/GoogleCloudPlatform/spanner-migration-tool/internal/reports" "github.com/GoogleCloudPlatform/spanner-migration-tool/logger" "github.com/GoogleCloudPlatform/spanner-migration-tool/profiles" "github.com/GoogleCloudPlatform/spanner-migration-tool/proto/migration" "github.com/GoogleCloudPlatform/spanner-migration-tool/streaming" "github.com/GoogleCloudPlatform/spanner-migration-tool/webv2/config" helpers "github.com/GoogleCloudPlatform/spanner-migration-tool/webv2/helpers" "github.com/GoogleCloudPlatform/spanner-migration-tool/webv2/types" utilities "github.com/GoogleCloudPlatform/spanner-migration-tool/webv2/utilities" "github.com/pkg/browser" instancepb "google.golang.org/genproto/googleapis/spanner/admin/instance/v1" "github.com/GoogleCloudPlatform/spanner-migration-tool/webv2/session" _ "github.com/go-sql-driver/mysql" "github.com/gorilla/handlers" index "github.com/GoogleCloudPlatform/spanner-migration-tool/webv2/index" primarykey "github.com/GoogleCloudPlatform/spanner-migration-tool/webv2/primarykey" go_ora "github.com/sijms/go-ora/v2" ) // TODO:(searce): // 1) Test cases for APIs // 2) API for saving/updating table-level changes. // 3) API for showing logs // 4) Split all routing to an route.go file // 5) API for downloading the schema file, ddl file and summary report file. // 6) Update schema conv after setting global datatypes and return conv. (setTypeMap) // 7) Add rateConversion() in schema conversion, ddl and report APIs. // 8) Add an overview in summary report API // databaseConnection creates connection with database func databaseConnection(w http.ResponseWriter, r *http.Request) { reqBody, err := ioutil.ReadAll(r.Body) if err != nil { http.Error(w, fmt.Sprintf("Body Read Error : %v", err), http.StatusInternalServerError) return } var config types.DriverConfig err = json.Unmarshal(reqBody, &config) if err != nil { http.Error(w, fmt.Sprintf("Request Body parse error : %v", err), http.StatusBadRequest) return } var dataSourceName string switch config.Driver { case constants.POSTGRES: dataSourceName = fmt.Sprintf("host=%s port=%s user=%s password=%s dbname=%s sslmode=disable", config.Host, config.Port, config.User, config.Password, config.Database) case constants.MYSQL: dataSourceName = fmt.Sprintf("%s:%s@tcp(%s:%s)/%s", config.User, config.Password, config.Host, config.Port, config.Database) case constants.SQLSERVER: dataSourceName = fmt.Sprintf(`sqlserver://%s:%s@%s:%s?database=%s`, config.User, config.Password, config.Host, config.Port, config.Database) case constants.ORACLE: portNumber, _ := strconv.Atoi(config.Port) dataSourceName = go_ora.BuildUrl(config.Host, portNumber, config.Database, config.User, config.Password, nil) default: http.Error(w, fmt.Sprintf("Driver : '%s' is not supported", config.Driver), http.StatusBadRequest) return } sourceDB, err := sql.Open(config.Driver, dataSourceName) if err != nil { http.Error(w, fmt.Sprintf("Database connection error, check connection properties, ERROR: %v", err), http.StatusInternalServerError) return } // Open doesn't open a connection. Validate database connection. err = sourceDB.Ping() if err != nil { http.Error(w, fmt.Sprintf("Database connection error, check connection properties, ERROR: %v", err), http.StatusInternalServerError) return } sessionState := session.GetSessionState() sessionState.SourceDB = sourceDB sessionState.DbName = config.Database // schema and user is same in oracle. if config.Driver == constants.ORACLE { sessionState.DbName = config.User } sessionState.Driver = config.Driver sessionState.SessionFile = "" sessionState.Dialect = config.Dialect sessionState.IsSharded = config.IsSharded sessionState.SourceDBConnDetails = session.SourceDBConnDetails{ Host: config.Host, Port: config.Port, User: config.User, Password: config.Password, ConnectionType: helpers.DIRECT_CONNECT_MODE, } w.WriteHeader(http.StatusOK) } func setSourceDBDetailsForDump(w http.ResponseWriter, r *http.Request) { sessionState := session.GetSessionState() reqBody, err := ioutil.ReadAll(r.Body) if err != nil { http.Error(w, fmt.Sprintf("Body Read Error : %v", err), http.StatusInternalServerError) return } var dc types.DumpConfig err = json.Unmarshal(reqBody, &dc) if err != nil { http.Error(w, fmt.Sprintf("Request Body parse error : %v", err), http.StatusBadRequest) return } dc.FilePath = "upload-file/" + dc.FilePath _, err = os.Open(dc.FilePath) if err != nil { http.Error(w, fmt.Sprintf("Failed to open dump file : %v, no such file or directory", dc.FilePath), http.StatusNotFound) return } sessionState.SourceDBConnDetails = session.SourceDBConnDetails{ Path: dc.FilePath, ConnectionType: helpers.DUMP_MODE, } w.WriteHeader(http.StatusOK) } // getSourceProfileConfig returns the configured source profile by the user func getSourceProfileConfig(w http.ResponseWriter, r *http.Request) { sessionState := session.GetSessionState() sourceProfileConfig := sessionState.SourceProfileConfig if sourceProfileConfig.ConfigType == "dataflow" { for _, dataShard := range sourceProfileConfig.ShardConfigurationDataflow.DataShards { bucket, rootPath, err := conversion.GetBucketFromDatastreamProfile(sessionState.GCPProjectID, sessionState.Region, dataShard.DstConnectionProfile.Name) if err != nil { http.Error(w, fmt.Sprintf("error while getting target bucket: %v", err), http.StatusInternalServerError) return } dataShard.TmpDir = "gs://" + bucket + rootPath } } w.WriteHeader(http.StatusOK) json.NewEncoder(w).Encode(sourceProfileConfig) } func setDatastreamDetailsForShardedMigrations(w http.ResponseWriter, r *http.Request) { sessionState := session.GetSessionState() reqBody, err := ioutil.ReadAll(r.Body) if err != nil { http.Error(w, fmt.Sprintf("Body Read Error : %v", err), http.StatusInternalServerError) return } var datastreamConfig profiles.DatastreamConfig err = json.Unmarshal(reqBody, &datastreamConfig) if err != nil { http.Error(w, fmt.Sprintf("Request Body parse error : %v", err), http.StatusBadRequest) return } sessionState.SourceProfileConfig.ShardConfigurationDataflow.DatastreamConfig = datastreamConfig w.WriteHeader(http.StatusOK) } func setGcsDetailsForShardedMigrations(w http.ResponseWriter, r *http.Request) { sessionState := session.GetSessionState() reqBody, err := ioutil.ReadAll(r.Body) if err != nil { http.Error(w, fmt.Sprintf("Body Read Error : %v", err), http.StatusInternalServerError) return } var gcsConfig profiles.GcsConfig err = json.Unmarshal(reqBody, &gcsConfig) if err != nil { http.Error(w, fmt.Sprintf("Request Body parse error : %v", err), http.StatusBadRequest) return } sessionState.SourceProfileConfig.ShardConfigurationDataflow.GcsConfig = gcsConfig w.WriteHeader(http.StatusOK) } func setDataflowDetailsForShardedMigrations(w http.ResponseWriter, r *http.Request) { sessionState := session.GetSessionState() reqBody, err := ioutil.ReadAll(r.Body) if err != nil { http.Error(w, fmt.Sprintf("Body Read Error : %v", err), http.StatusInternalServerError) return } var dataflowConfig profiles.DataflowConfig err = json.Unmarshal(reqBody, &dataflowConfig) if err != nil { http.Error(w, fmt.Sprintf("Request Body parse error : %v", err), http.StatusBadRequest) return } if dataflowConfig.Location == "" { dataflowConfig.Location = sessionState.Region } sessionState.SourceProfileConfig.ShardConfigurationDataflow.DataflowConfig = dataflowConfig w.WriteHeader(http.StatusOK) } func setShardsSourceDBDetailsForDataflow(w http.ResponseWriter, r *http.Request) { //Take the received object and store it into session state. sessionState := session.GetSessionState() reqBody, err := ioutil.ReadAll(r.Body) if err != nil { http.Error(w, fmt.Sprintf("Body Read Error : %v", err), http.StatusInternalServerError) return } var srcConfig types.ShardedDataflowConfig err = json.Unmarshal(reqBody, &srcConfig) if err != nil { http.Error(w, fmt.Sprintf("Request Body parse error : %v", err), http.StatusBadRequest) return } sessionState.SourceProfileConfig.ConfigType = srcConfig.MigrationProfile.ConfigType sessionState.SourceProfileConfig.ShardConfigurationDataflow.DataShards = srcConfig.MigrationProfile.ShardConfigurationDataflow.DataShards sessionState.SourceProfileConfig.ShardConfigurationDataflow.SchemaSource = srcConfig.MigrationProfile.ShardConfigurationDataflow.SchemaSource if sessionState.SourceProfileConfig.ShardConfigurationDataflow.DataflowConfig.Location == "" { // Create dataflow config with defaults, it gets overridden if DataflowConfig is specified using the form. sessionState.SourceProfileConfig.ShardConfigurationDataflow.DataflowConfig = profiles.DataflowConfig{ Location: sessionState.Region, Network: "", Subnetwork: "", MaxWorkers: "", NumWorkers: "", ServiceAccountEmail: "", VpcHostProjectId: sessionState.GCPProjectID, } } w.WriteHeader(http.StatusOK) } func setShardsSourceDBDetailsForBulk(w http.ResponseWriter, r *http.Request) { sessionState := session.GetSessionState() reqBody, err := ioutil.ReadAll(r.Body) if err != nil { http.Error(w, fmt.Sprintf("Body Read Error : %v", err), http.StatusInternalServerError) return } var shardConfigs types.DriverConfigs err = json.Unmarshal(reqBody, &shardConfigs) if err != nil { http.Error(w, fmt.Sprintf("Request Body parse error : %v", err), http.StatusBadRequest) return } var connDetailsList []profiles.DirectConnectionConfig for i, config := range shardConfigs.DbConfigs { dataSourceName := fmt.Sprintf("%s:%s@tcp(%s:%s)/%s", config.User, config.Password, config.Host, config.Port, config.Database) sourceDB, err := sql.Open(config.Driver, dataSourceName) if err != nil { http.Error(w, "Database connection error, check connection properties.", http.StatusInternalServerError) return } // Open doesn't open a connection. Validate database connection. err = sourceDB.Ping() if err != nil { http.Error(w, "Database connection error, check connection properties.", http.StatusInternalServerError) return } sessionState.DbName = config.Database sessionState.SessionFile = "" connDetail := profiles.DirectConnectionConfig{ Host: config.Host, Port: config.Port, User: config.User, Password: config.Password, DbName: config.Database, DataShardId: config.DataShardId, } connDetailsList = append(connDetailsList, connDetail) //set the first shard as the schema shard when restoring from a session file if shardConfigs.IsRestoredSession == constants.SESSION_FILE { if i == 0 { sessionState.SourceDBConnDetails = session.SourceDBConnDetails{ Host: config.Host, Port: config.Port, User: config.User, Password: config.Password, ConnectionType: helpers.DIRECT_CONNECT_MODE, } } } } sessionState.ShardedDbConnDetails = connDetailsList w.WriteHeader(http.StatusOK) } func setSourceDBDetailsForDirectConnect(w http.ResponseWriter, r *http.Request) { sessionState := session.GetSessionState() reqBody, err := ioutil.ReadAll(r.Body) if err != nil { http.Error(w, fmt.Sprintf("Body Read Error : %v", err), http.StatusInternalServerError) return } var config types.DriverConfig err = json.Unmarshal(reqBody, &config) if err != nil { http.Error(w, fmt.Sprintf("Request Body parse error : %v", err), http.StatusBadRequest) return } var dataSourceName string switch config.Driver { case constants.POSTGRES: dataSourceName = fmt.Sprintf("host=%s port=%s user=%s password=%s dbname=%s sslmode=disable", config.Host, config.Port, config.User, config.Password, config.Database) case constants.MYSQL: dataSourceName = fmt.Sprintf("%s:%s@tcp(%s:%s)/%s", config.User, config.Password, config.Host, config.Port, config.Database) case constants.SQLSERVER: dataSourceName = fmt.Sprintf(`sqlserver://%s:%s@%s:%s?database=%s`, config.User, config.Password, config.Host, config.Port, config.Database) case constants.ORACLE: portNumber, _ := strconv.Atoi(config.Port) dataSourceName = go_ora.BuildUrl(config.Host, portNumber, config.Database, config.User, config.Password, nil) default: http.Error(w, fmt.Sprintf("Driver : '%s' is not supported", config.Driver), http.StatusBadRequest) return } sourceDB, err := sql.Open(config.Driver, dataSourceName) if err != nil { http.Error(w, "Database connection error, check connection properties.", http.StatusInternalServerError) return } // Open doesn't open a connection. Validate database connection. err = sourceDB.Ping() if err != nil { http.Error(w, "Database connection error, check connection properties.", http.StatusInternalServerError) return } sessionState.DbName = config.Database // schema and user is same in oracle. if config.Driver == constants.ORACLE { sessionState.DbName = config.User } sessionState.SessionFile = "" sessionState.SourceDBConnDetails = session.SourceDBConnDetails{ Host: config.Host, Port: config.Port, User: config.User, Password: config.Password, ConnectionType: helpers.DIRECT_CONNECT_MODE, } w.WriteHeader(http.StatusOK) } // loadSession load seesion file to Spanner migration tool. func loadSession(w http.ResponseWriter, r *http.Request) { sessionState := session.GetSessionState() utilities.InitObjectId() reqBody, err := ioutil.ReadAll(r.Body) if err != nil { http.Error(w, fmt.Sprintf("Body Read Error : %v", err), http.StatusInternalServerError) return } var s session.SessionParams err = json.Unmarshal(reqBody, &s) if err != nil { http.Error(w, fmt.Sprintf("Request Body parse error : %v", err), http.StatusBadRequest) return } conv := internal.MakeConv() metadata := session.SessionMetadata{} err = session.ReadSessionFileForSessionMetadata(&metadata, constants.UPLOAD_FILE_DIR+"/"+s.FilePath) if err != nil { switch err.(type) { case *fs.PathError: http.Error(w, fmt.Sprintf("Failed to open session file : %v, no such file or directory", s.FilePath), http.StatusNotFound) default: http.Error(w, fmt.Sprintf("Failed to parse session file : %v", err), http.StatusBadRequest) } return } dbType := metadata.DatabaseType switch dbType { case constants.PGDUMP: dbType = constants.POSTGRES case constants.MYSQLDUMP: dbType = constants.MYSQL } if dbType != s.Driver { http.Error(w, fmt.Sprintf("Not a valid %v session file", dbType), http.StatusBadRequest) return } err = conversion.ReadSessionFile(conv, constants.UPLOAD_FILE_DIR+"/"+s.FilePath) if err != nil { switch err.(type) { case *fs.PathError: http.Error(w, fmt.Sprintf("Failed to open session file : %v, no such file or directory", s.FilePath), http.StatusNotFound) default: http.Error(w, fmt.Sprintf("Failed to parse session file : %v", err), http.StatusBadRequest) } return } sessionMetadata := session.SessionMetadata{ SessionName: "NewSession", DatabaseType: s.Driver, DatabaseName: metadata.DatabaseName, Dialect: conv.SpDialect, } if sessionMetadata.DatabaseName == "" { sessionMetadata.DatabaseName = strings.TrimRight(filepath.Base(s.FilePath), filepath.Ext(s.FilePath)) } sessionState.Conv.ConvLock.Lock() defer sessionState.Conv.ConvLock.Unlock() sessionState.Conv = conv primarykey.DetectHotspot() index.IndexSuggestion() sessionState.Conv.UsedNames = internal.ComputeUsedNames(sessionState.Conv) sessionState.SessionMetadata = sessionMetadata sessionState.Driver = s.Driver sessionState.SessionFile = constants.UPLOAD_FILE_DIR + s.FilePath sessionState.SourceDBConnDetails = session.SourceDBConnDetails{ Path: constants.UPLOAD_FILE_DIR + "/" + s.FilePath, ConnectionType: helpers.SESSION_FILE_MODE, } sessionState.Dialect = conv.SpDialect convm := session.ConvWithMetadata{ SessionMetadata: sessionMetadata, Conv: *conv, } w.WriteHeader(http.StatusOK) json.NewEncoder(w).Encode(convm) } func fetchLastLoadedSessionDetails(w http.ResponseWriter, r *http.Request) { sessionState := session.GetSessionState() convm := session.ConvWithMetadata{ SessionMetadata: sessionState.SessionMetadata, Conv: *sessionState.Conv, } w.WriteHeader(http.StatusOK) json.NewEncoder(w).Encode(convm) } // getSchemaFile generates schema file and returns file path. func getSchemaFile(w http.ResponseWriter, r *http.Request) { ioHelper := &utils.IOStreams{In: os.Stdin, Out: os.Stdout} var err error now := time.Now() filePrefix, err := utilities.GetFilePrefix(now) if err != nil { http.Error(w, fmt.Sprintf("Can not get file prefix : %v", err), http.StatusInternalServerError) } schemaFileName := "frontend/" + filePrefix + "schema.txt" sessionState := session.GetSessionState() sessionState.Conv.ConvLock.RLock() defer sessionState.Conv.ConvLock.RUnlock() conversion.WriteSchemaFile(sessionState.Conv, now, schemaFileName, ioHelper.Out, sessionState.Driver) schemaAbsPath, err := filepath.Abs(schemaFileName) if err != nil { http.Error(w, fmt.Sprintf("Can not create absolute path : %v", err), http.StatusInternalServerError) } w.WriteHeader(http.StatusOK) w.Write([]byte(schemaAbsPath)) } // getIssueDescription maps IssueDB's Category to corresponding CategoryDescription(if present), // or to the Brief if not present and pass the map to frontend to be used in assessment report UI func getIssueDescription(w http.ResponseWriter, r *http.Request) { var issuesMap = make(map[string]string) for _, issue := range reports.IssueDB { if issue.CategoryDescription == "" { issuesMap[issue.Category] = issue.Brief } else { issuesMap[issue.Category] = issue.CategoryDescription } } w.WriteHeader(http.StatusOK) json.NewEncoder(w).Encode(issuesMap) } func getBackendHealth(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) } // ToDo : To Remove once Rules Component updated // addIndexes checks the new names for spanner name validity, ensures the new names are already not used by existing tables // secondary indexes or foreign key constraints. If above checks passed then new indexes are added to the schema else appropriate // error thrown. func getSourceDestinationSummary(w http.ResponseWriter, r *http.Request) { sessionState := session.GetSessionState() sessionState.Conv.ConvLock.RLock() defer sessionState.Conv.ConvLock.RUnlock() // GetSourceDestinationSummary is called when the user enters prepare migration page // Getting and populating SpannerProjectId if it doesn't exist. if sessionState.SpannerProjectId == "" { sessionState.SpannerProjectId = sessionState.GCPProjectID } var sessionSummary types.SessionSummary databaseType, err := helpers.GetSourceDatabaseFromDriver(sessionState.Driver) if err != nil { http.Error(w, fmt.Sprintf("Error while getting source database: %v", err), http.StatusBadRequest) return } sessionSummary.DatabaseType = databaseType sessionSummary.SourceDatabaseName = sessionState.DbName sessionSummary.ConnectionType = sessionState.SourceDBConnDetails.ConnectionType sessionSummary.SourceTableCount = len(sessionState.Conv.SrcSchema) sessionSummary.SpannerTableCount = len(sessionState.Conv.SpSchema) sourceIndexCount, spannerIndexCount := 0, 0 for _, spannerSchema := range sessionState.Conv.SpSchema { spannerIndexCount = spannerIndexCount + len(spannerSchema.Indexes) } for _, sourceSchema := range sessionState.Conv.SrcSchema { sourceIndexCount = sourceIndexCount + len(sourceSchema.Indexes) } sessionSummary.SourceIndexCount = sourceIndexCount sessionSummary.SpannerIndexCount = spannerIndexCount ctx := context.Background() instanceClient, err := instance.NewInstanceAdminClient(ctx) if err != nil { log.Println("instance admin client creation error") http.Error(w, fmt.Sprintf("Error while creating instance admin client : %v", err), http.StatusBadRequest) return } instanceInfo, err := instanceClient.GetInstance(ctx, &instancepb.GetInstanceRequest{Name: fmt.Sprintf("projects/%s/instances/%s", sessionState.SpannerProjectId, sessionState.SpannerInstanceID)}) if err != nil { log.Println("get instance error") http.Error(w, fmt.Sprintf("Error while getting instance information : %v", err), http.StatusBadRequest) return } instanceConfig, err := instanceClient.GetInstanceConfig(ctx, &instancepb.GetInstanceConfigRequest{Name: instanceInfo.Config}) if err != nil { log.Println("get instance config error") http.Error(w, fmt.Sprintf("Error while getting instance config : %v", err), http.StatusBadRequest) return } for _, replica := range instanceConfig.Replicas { if replica.DefaultLeaderLocation { sessionSummary.Region = replica.Location } } sessionState.Region = sessionSummary.Region sessionSummary.NodeCount = int(instanceInfo.NodeCount) sessionSummary.ProcessingUnits = int(instanceInfo.ProcessingUnits) sessionSummary.Instance = sessionState.SpannerInstanceID sessionSummary.Dialect = helpers.GetDialectDisplayStringFromDialect(sessionState.Dialect) sessionSummary.IsSharded = sessionState.Conv.IsSharded w.WriteHeader(http.StatusOK) json.NewEncoder(w).Encode(sessionSummary) } func updateProgress(w http.ResponseWriter, r *http.Request) { var detail types.ProgressDetails sessionState := session.GetSessionState() sessionState.Conv.ConvLock.RLock() defer sessionState.Conv.ConvLock.RUnlock() if sessionState.Error != nil { detail.ErrorMessage = sessionState.Error.Error() } else { detail.ErrorMessage = "" detail.Progress, detail.ProgressStatus = sessionState.Conv.Audit.Progress.ReportProgress() } w.WriteHeader(http.StatusOK) json.NewEncoder(w).Encode(detail) } func migrate(w http.ResponseWriter, r *http.Request) { log.Println("request started", "method", r.Method, "path", r.URL.Path) reqBody, err := ioutil.ReadAll(r.Body) if err != nil { log.Println("request's body Read Error") http.Error(w, fmt.Sprintf("Body Read Error : %v", err), http.StatusInternalServerError) } details := types.MigrationDetails{} err = json.Unmarshal(reqBody, &details) if err != nil { log.Println("request's Body parse error") http.Error(w, fmt.Sprintf("Request Body parse error : %v", err), http.StatusBadRequest) return } sessionState := session.GetSessionState() sessionState.Error = nil ctx := context.Background() sessionState.Conv.Audit.Progress = internal.Progress{} sessionState.Conv.UI = true sourceProfile, targetProfile, ioHelper, dbName, err := getSourceAndTargetProfiles(sessionState, details) // TODO: Fix UX flow of migration project id migrationProjectId := sessionState.GCPProjectID if sessionState.SpannerProjectId == "" { sessionState.SpannerProjectId = sessionState.GCPProjectID } if err != nil { log.Println("can't get source and target profile") http.Error(w, fmt.Sprintf("Can't get source and target profiles: %v", err), http.StatusBadRequest) return } err = writeSessionFile(ctx, sessionState) if err != nil { log.Println("can't write session file") http.Error(w, fmt.Sprintf("Can't write session file to GCS: %v", err), http.StatusBadRequest) return } sessionState.Conv.ResetStats() sessionState.Conv.Audit.Progress = internal.Progress{} // Set env variable SKIP_METRICS_POPULATION to true in case of dev testing sessionState.Conv.Audit.SkipMetricsPopulation = os.Getenv("SKIP_METRICS_POPULATION") == "true" if details.MigrationMode == helpers.SCHEMA_ONLY { log.Println("Starting schema only migration") sessionState.Conv.Audit.MigrationType = migration.MigrationData_SCHEMA_ONLY.Enum() go cmd.MigrateDatabase(ctx, migrationProjectId, targetProfile, sourceProfile, dbName, &ioHelper, &cmd.SchemaCmd{}, sessionState.Conv, &sessionState.Error) } else if details.MigrationMode == helpers.DATA_ONLY { dataCmd := &cmd.DataCmd{ SkipForeignKeys: details.SkipForeignKeys, WriteLimit: cmd.DefaultWritersLimit, } log.Println("Starting data only migration") sessionState.Conv.Audit.MigrationType = migration.MigrationData_DATA_ONLY.Enum() go cmd.MigrateDatabase(ctx, migrationProjectId, targetProfile, sourceProfile, dbName, &ioHelper, dataCmd, sessionState.Conv, &sessionState.Error) } else { schemaAndDataCmd := &cmd.SchemaAndDataCmd{ SkipForeignKeys: details.SkipForeignKeys, WriteLimit: cmd.DefaultWritersLimit, } log.Println("Starting schema and data migration") sessionState.Conv.Audit.MigrationType = migration.MigrationData_SCHEMA_AND_DATA.Enum() go cmd.MigrateDatabase(ctx, migrationProjectId, targetProfile, sourceProfile, dbName, &ioHelper, schemaAndDataCmd, sessionState.Conv, &sessionState.Error) } w.WriteHeader(http.StatusOK) log.Println("migration completed", "method", r.Method, "path", r.URL.Path, "remoteaddr", r.RemoteAddr) } func getGeneratedResources(w http.ResponseWriter, r *http.Request) { var generatedResources types.GeneratedResources sessionState := session.GetSessionState() sessionState.Conv.ConvLock.RLock() defer sessionState.Conv.ConvLock.RUnlock() generatedResources.MigrationJobId = sessionState.Conv.Audit.MigrationRequestId generatedResources.DatabaseName = sessionState.SpannerDatabaseName generatedResources.DatabaseUrl = fmt.Sprintf("https://console.cloud.google.com/spanner/instances/%v/databases/%v/details/tables?project=%v", sessionState.SpannerInstanceID, sessionState.SpannerDatabaseName, sessionState.SpannerProjectId) generatedResources.BucketName = sessionState.Bucket + sessionState.RootPath generatedResources.BucketUrl = fmt.Sprintf("https://console.cloud.google.com/storage/browser/%v", sessionState.Bucket+sessionState.RootPath) generatedResources.ShardToShardResourcesMap = map[string][]types.ResourceDetails{} if sessionState.Conv.Audit.StreamingStats.DatastreamResources.DatastreamName != "" { generatedResources.DataStreamJobName = sessionState.Conv.Audit.StreamingStats.DatastreamResources.DatastreamName generatedResources.DataStreamJobUrl = fmt.Sprintf("https://console.cloud.google.com/datastream/streams/locations/%v/instances/%v?project=%v", sessionState.Region, sessionState.Conv.Audit.StreamingStats.DatastreamResources.DatastreamName, sessionState.GCPProjectID) } if sessionState.Conv.Audit.StreamingStats.DataflowResources.JobId != "" { generatedResources.DataflowJobName = sessionState.Conv.Audit.StreamingStats.DataflowResources.JobId generatedResources.DataflowJobUrl = fmt.Sprintf("https://console.cloud.google.com/dataflow/jobs/%v/%v?project=%v", sessionState.Conv.Audit.StreamingStats.DataflowResources.Region, sessionState.Conv.Audit.StreamingStats.DataflowResources.JobId, sessionState.GCPProjectID) generatedResources.DataflowGcloudCmd = sessionState.Conv.Audit.StreamingStats.DataflowResources.GcloudCmd } if sessionState.Conv.Audit.StreamingStats.PubsubResources.TopicId != "" { generatedResources.PubsubTopicName = sessionState.Conv.Audit.StreamingStats.PubsubResources.TopicId generatedResources.PubsubTopicUrl = fmt.Sprintf("https://console.cloud.google.com/cloudpubsub/topic/detail/%v?project=%v", sessionState.Conv.Audit.StreamingStats.PubsubResources.TopicId, sessionState.GCPProjectID) } if sessionState.Conv.Audit.StreamingStats.DlqPubsubResources.SubscriptionId != "" { generatedResources.DlqPubsubSubscriptionName = sessionState.Conv.Audit.StreamingStats.DlqPubsubResources.SubscriptionId generatedResources.DlqPubsubSubscriptionUrl = fmt.Sprintf("https://console.cloud.google.com/cloudpubsub/subscription/detail/%v?project=%v", sessionState.Conv.Audit.StreamingStats.DlqPubsubResources.SubscriptionId, sessionState.GCPProjectID) } if sessionState.Conv.Audit.StreamingStats.MonitoringResources.DashboardName != "" { generatedResources.MonitoringDashboardName = sessionState.Conv.Audit.StreamingStats.MonitoringResources.DashboardName generatedResources.MonitoringDashboardUrl = fmt.Sprintf("https://console.cloud.google.com/monitoring/dashboards/builder/%v?project=%v", sessionState.Conv.Audit.StreamingStats.MonitoringResources.DashboardName, sessionState.GCPProjectID) } if sessionState.Conv.Audit.StreamingStats.AggMonitoringResources.DashboardName != "" { generatedResources.AggMonitoringDashboardName = sessionState.Conv.Audit.StreamingStats.AggMonitoringResources.DashboardName generatedResources.AggMonitoringDashboardUrl = fmt.Sprintf("https://console.cloud.google.com/monitoring/dashboards/builder/%v?project=%v", sessionState.Conv.Audit.StreamingStats.AggMonitoringResources.DashboardName, sessionState.GCPProjectID) } for shardId, shardResources := range sessionState.Conv.Audit.StreamingStats.ShardToShardResourcesMap { //Datastream url := fmt.Sprintf("https://console.cloud.google.com/datastream/streams/locations/%v/instances/%v?project=%v", sessionState.Region, shardResources.DatastreamResources.DatastreamName, sessionState.GCPProjectID) resourceDetails := types.ResourceDetails{ResourceType: constants.DATASTREAM_RESOURCE, ResourceName: shardResources.DatastreamResources.DatastreamName, ResourceUrl: url} generatedResources.ShardToShardResourcesMap[shardId] = append(generatedResources.ShardToShardResourcesMap[shardId], resourceDetails) //Dataflow dfId := shardResources.DataflowResources.JobId url = fmt.Sprintf("https://console.cloud.google.com/dataflow/jobs/%v/%v?project=%v", sessionState.Conv.Audit.StreamingStats.DataflowResources.Region, dfId, sessionState.GCPProjectID) resourceDetails = types.ResourceDetails{ResourceType: constants.DATAFLOW_RESOURCE, ResourceName: dfId, ResourceUrl: url, GcloudCmd: shardResources.DataflowResources.GcloudCmd} generatedResources.ShardToShardResourcesMap[shardId] = append(generatedResources.ShardToShardResourcesMap[shardId], resourceDetails) //monitoring url = fmt.Sprintf("https://console.cloud.google.com/monitoring/dashboards/builder/%v?project=%v", shardResources.MonitoringResources.DashboardName, sessionState.GCPProjectID) resourceDetails = types.ResourceDetails{ResourceType: constants.MONITORING_RESOURCE, ResourceName: shardResources.MonitoringResources.DashboardName, ResourceUrl: url} generatedResources.ShardToShardResourcesMap[shardId] = append(generatedResources.ShardToShardResourcesMap[shardId], resourceDetails) //gcs url = fmt.Sprintf("https://console.cloud.google.com/storage/browser/%v?project=%v", shardResources.GcsResources.BucketName, sessionState.GCPProjectID) resourceDetails = types.ResourceDetails{ResourceType: constants.GCS_RESOURCE, ResourceName: shardResources.GcsResources.BucketName, ResourceUrl: url} generatedResources.ShardToShardResourcesMap[shardId] = append(generatedResources.ShardToShardResourcesMap[shardId], resourceDetails) //pubsub topicUrl := fmt.Sprintf("https://console.cloud.google.com/cloudpubsub/topic/detail/%v?project=%v", shardResources.PubsubResources.TopicId, sessionState.GCPProjectID) topicResourceDetails := types.ResourceDetails{ResourceType: constants.PUBSUB_TOPIC_RESOURCE, ResourceName: shardResources.PubsubResources.TopicId, ResourceUrl: topicUrl} generatedResources.ShardToShardResourcesMap[shardId] = append(generatedResources.ShardToShardResourcesMap[shardId], topicResourceDetails) subscriptionUrl := fmt.Sprintf("https://console.cloud.google.com/cloudpubsub/subscription/detail/%v?project=%v", shardResources.PubsubResources.SubscriptionId, sessionState.GCPProjectID) subscriptionResourceDetails := types.ResourceDetails{ResourceType: constants.PUBSUB_SUB_RESOURCE, ResourceName: shardResources.PubsubResources.SubscriptionId, ResourceUrl: subscriptionUrl} generatedResources.ShardToShardResourcesMap[shardId] = append(generatedResources.ShardToShardResourcesMap[shardId], subscriptionResourceDetails) //dlq-pubsub dlqTopicUrl := fmt.Sprintf("https://console.cloud.google.com/cloudpubsub/topic/detail/%v?project=%v", shardResources.DlqPubsubResources.TopicId, sessionState.GCPProjectID) dlqTopicResourceDetails := types.ResourceDetails{ResourceType: constants.DLQ_PUBSUB_TOPIC_RESOURCE, ResourceName: shardResources.DlqPubsubResources.TopicId, ResourceUrl: dlqTopicUrl} generatedResources.ShardToShardResourcesMap[shardId] = append(generatedResources.ShardToShardResourcesMap[shardId], dlqTopicResourceDetails) dlqSubscriptionUrl := fmt.Sprintf("https://console.cloud.google.com/cloudpubsub/subscription/detail/%v?project=%v", shardResources.DlqPubsubResources.SubscriptionId, sessionState.GCPProjectID) dlqSubscriptionResourceDetails := types.ResourceDetails{ResourceType: constants.DLQ_PUBSUB_SUB_RESOURCE, ResourceName: shardResources.DlqPubsubResources.SubscriptionId, ResourceUrl: dlqSubscriptionUrl} generatedResources.ShardToShardResourcesMap[shardId] = append(generatedResources.ShardToShardResourcesMap[shardId], dlqSubscriptionResourceDetails) } w.WriteHeader(http.StatusOK) json.NewEncoder(w).Encode(generatedResources) } func getSourceAndTargetProfiles(sessionState *session.SessionState, details types.MigrationDetails) (profiles.SourceProfile, profiles.TargetProfile, utils.IOStreams, string, error) { var ( sourceProfileString string err error ) sourceDBConnectionDetails := sessionState.SourceDBConnDetails if sourceDBConnectionDetails.ConnectionType == helpers.DUMP_MODE { sourceProfileString = fmt.Sprintf("file=%v,format=dump", sourceDBConnectionDetails.Path) } else if details.IsSharded { sourceProfileString, err = getSourceProfileStringForShardedMigrations(sessionState, details) if err != nil { return profiles.SourceProfile{}, profiles.TargetProfile{}, utils.IOStreams{}, "", fmt.Errorf("error while creating config to initiate sharded migration:%v", err) } } else { sourceProfileString = fmt.Sprintf("host=%v,port=%v,user=%v,password=%v,dbName=%v", sourceDBConnectionDetails.Host, sourceDBConnectionDetails.Port, sourceDBConnectionDetails.User, sourceDBConnectionDetails.Password, sessionState.DbName) } sessionState.SpannerDatabaseName = details.TargetDetails.TargetDB targetProfileString := fmt.Sprintf("project=%v,instance=%v,dbName=%v,dialect=%v", sessionState.SpannerProjectId, sessionState.SpannerInstanceID, details.TargetDetails.TargetDB, sessionState.Dialect) if details.MigrationType == helpers.LOW_DOWNTIME_MIGRATION && !details.IsSharded { fileName := sessionState.Conv.Audit.MigrationRequestId + "-streaming.json" sessionState.Bucket, sessionState.RootPath, err = conversion.GetBucketFromDatastreamProfile(sessionState.GCPProjectID, sessionState.Region, details.TargetDetails.TargetConnectionProfileName) if err != nil { return profiles.SourceProfile{}, profiles.TargetProfile{}, utils.IOStreams{}, "", fmt.Errorf("error while getting target bucket: %v", err) } err = createStreamingCfgFile(sessionState, details, fileName) if err != nil { return profiles.SourceProfile{}, profiles.TargetProfile{}, utils.IOStreams{}, "", fmt.Errorf("error while creating streaming config file: %v", err) } sourceProfileString = sourceProfileString + fmt.Sprintf(",streamingCfg=%v", fileName) } else { sessionState.Conv.Audit.MigrationRequestId, _ = utils.GenerateName("smt-job") sessionState.Conv.Audit.MigrationRequestId = strings.Replace(sessionState.Conv.Audit.MigrationRequestId, "_", "-", -1) if details.TargetDetails.GcsMetadataPath.GcsBucketName != "" { if details.TargetDetails.GcsMetadataPath.GcsBucketRootPath == "" { sessionState.Bucket = details.TargetDetails.GcsMetadataPath.GcsBucketName if !strings.HasSuffix(sessionState.Bucket, "/") { sessionState.RootPath = "/" } else { sessionState.RootPath = "" } } else { sessionState.Bucket = details.TargetDetails.GcsMetadataPath.GcsBucketName if !strings.HasSuffix(sessionState.Bucket, "/") { sessionState.Bucket = sessionState.Bucket + "/" } sessionState.RootPath = details.TargetDetails.GcsMetadataPath.GcsBucketRootPath if !strings.HasSuffix(sessionState.RootPath, "/") { sessionState.RootPath = sessionState.RootPath + "/" } } } else { sessionState.Bucket = strings.ToLower(sessionState.Conv.Audit.MigrationRequestId) sessionState.RootPath = "/" } } source, err := helpers.GetSourceDatabaseFromDriver(sessionState.Driver) if err != nil { return profiles.SourceProfile{}, profiles.TargetProfile{}, utils.IOStreams{}, "", fmt.Errorf("error while getting source database: %v", err) } sourceProfile, targetProfile, ioHelper, dbName, err := cmd.PrepareMigrationPrerequisites(sourceProfileString, targetProfileString, source) if err != nil && sourceDBConnectionDetails.ConnectionType != helpers.SESSION_FILE_MODE { return profiles.SourceProfile{}, profiles.TargetProfile{}, utils.IOStreams{}, "", fmt.Errorf("error while preparing prerequisites for migration: %v", err) } sourceProfile.Driver = sessionState.Driver if details.MigrationType == helpers.LOW_DOWNTIME_MIGRATION { sourceProfile.Config.ConfigType = constants.DATAFLOW_MIGRATION } return sourceProfile, targetProfile, ioHelper, dbName, nil } func getSourceProfileStringForShardedMigrations(sessionState *session.SessionState, details types.MigrationDetails) (string, error) { fileName := sessionState.Conv.Audit.MigrationRequestId + "-sharding.cfg" if details.MigrationType != helpers.LOW_DOWNTIME_MIGRATION { err := createConfigFileForShardedBulkMigration(sessionState, details, fileName) if err != nil { return "", err } return fmt.Sprintf("config=%v", fileName), nil } else if details.MigrationType == helpers.LOW_DOWNTIME_MIGRATION { err := createConfigFileForShardedDataflowMigration(sessionState, details, fileName) if err != nil { return "", err } return fmt.Sprintf("config=%v", fileName), nil } else { return "", fmt.Errorf("this migration type is not implemented yet") } } func createConfigFileForShardedDataflowMigration(sessionState *session.SessionState, details types.MigrationDetails, fileName string) error { sourceProfileConfig := sessionState.SourceProfileConfig file, err := json.MarshalIndent(sourceProfileConfig, "", " ") if err != nil { return fmt.Errorf("error while marshalling json: %v", err) } err = ioutil.WriteFile(fileName, file, 0644) if err != nil { return fmt.Errorf("error while writing json to file: %v", err) } return nil } func createConfigFileForShardedBulkMigration(sessionState *session.SessionState, details types.MigrationDetails, fileName string) error { sourceProfileConfig := profiles.SourceProfileConfig{ ConfigType: constants.BULK_MIGRATION, ShardConfigurationBulk: profiles.ShardConfigurationBulk{ SchemaSource: profiles.DirectConnectionConfig{ Host: sessionState.SourceDBConnDetails.Host, User: sessionState.SourceDBConnDetails.User, Password: sessionState.SourceDBConnDetails.Password, Port: sessionState.SourceDBConnDetails.Port, DbName: sessionState.DbName, }, DataShards: sessionState.ShardedDbConnDetails, }, } file, err := json.MarshalIndent(sourceProfileConfig, "", " ") if err != nil { return fmt.Errorf("error while marshalling json: %v", err) } err = ioutil.WriteFile(fileName, file, 0644) if err != nil { return fmt.Errorf("error while writing json to file: %v", err) } return nil } func writeSessionFile(ctx context.Context, sessionState *session.SessionState) error { sc, err := storageclient.NewStorageClientImpl(ctx) if err != nil { return err } sa := storageaccessor.StorageAccessorImpl{} err = sa.CreateGCSBucket(ctx, sc, storageaccessor.StorageBucketMetadata{ BucketName: sessionState.Bucket, ProjectID: sessionState.GCPProjectID, Location: sessionState.Region, Ttl: 0, MatchesPrefix: nil, }) if err != nil { return fmt.Errorf("error while creating bucket: %v", err) } convJSON, err := json.MarshalIndent(sessionState.Conv, "", " ") if err != nil { return fmt.Errorf("can't encode session state to JSON: %v", err) } err = sa.WriteDataToGCS(ctx, sc, "gs://"+sessionState.Bucket+sessionState.RootPath, "session.json", string(convJSON)) if err != nil { return fmt.Errorf("error while writing to GCS: %v", err) } return nil } func createStreamingCfgFile(sessionState *session.SessionState, details types.MigrationDetails, fileName string) error { targetDetails, datastreamConfig, dataflowConfig := details.TargetDetails, details.DatastreamConfig, details.DataflowConfig dfLocation := sessionState.Region if dataflowConfig.Location != "" { dfLocation = dataflowConfig.Location } data := streaming.StreamingCfg{ DatastreamCfg: streaming.DatastreamCfg{ StreamId: "", StreamLocation: sessionState.Region, StreamDisplayName: "", SourceConnectionConfig: streaming.SrcConnCfg{ Name: targetDetails.SourceConnectionProfileName, Location: sessionState.Region, }, DestinationConnectionConfig: streaming.DstConnCfg{ Name: targetDetails.TargetConnectionProfileName, Location: sessionState.Region, }, MaxConcurrentBackfillTasks: datastreamConfig.MaxConcurrentBackfillTasks, MaxConcurrentCdcTasks: datastreamConfig.MaxConcurrentCdcTasks, }, GcsCfg: streaming.GcsCfg{ TtlInDays: details.GcsConfig.TtlInDays, TtlInDaysSet: details.GcsConfig.TtlInDaysSet, }, DataflowCfg: streaming.DataflowCfg{ ProjectId: dataflowConfig.ProjectId, JobName: "", Location: dfLocation, Network: dataflowConfig.Network, Subnetwork: dataflowConfig.Subnetwork, MaxWorkers: dataflowConfig.MaxWorkers, NumWorkers: dataflowConfig.NumWorkers, ServiceAccountEmail: dataflowConfig.ServiceAccountEmail, VpcHostProjectId: dataflowConfig.VpcHostProjectId, MachineType: dataflowConfig.MachineType, AdditionalUserLabels: dataflowConfig.AdditionalUserLabels, KmsKeyName: dataflowConfig.KmsKeyName, GcsTemplatePath: dataflowConfig.GcsTemplatePath, CustomJarPath: dataflowConfig.CustomJarPath, CustomClassName: dataflowConfig.CustomClassName, CustomParameter: dataflowConfig.CustomParameter, }, TmpDir: "gs://" + sessionState.Bucket + sessionState.RootPath, } databaseType, _ := helpers.GetSourceDatabaseFromDriver(sessionState.Driver) if databaseType == constants.POSTGRES { data.DatastreamCfg.Properties = fmt.Sprintf("replicationSlot=%v,publication=%v", targetDetails.ReplicationSlot, targetDetails.Publication) } file, err := json.MarshalIndent(data, "", " ") if err != nil { return fmt.Errorf("error while marshalling json: %v", err) } err = ioutil.WriteFile(fileName, file, 0644) if err != nil { return fmt.Errorf("error while writing json to file: %v", err) } return nil } func uploadFile(w http.ResponseWriter, r *http.Request) { r.ParseMultipartForm(10 << 20) // FormFile returns the first file for the given key `myFile` // it also returns the FileHeader so we can get the Filename, // the Header and the size of the file file, handlers, err := r.FormFile("myFile") if err != nil { http.Error(w, fmt.Sprintf("error retrieving the file"), http.StatusBadRequest) return } defer file.Close() // Remove the existing files err = os.RemoveAll("upload-file/") if err != nil { http.Error(w, fmt.Sprintf("error removing existing files"), http.StatusBadRequest) return } err = os.MkdirAll("upload-file", os.ModePerm) if err != nil { http.Error(w, fmt.Sprintf("error while creating directory"), http.StatusBadRequest) return } f, err := os.Create("upload-file/" + handlers.Filename) if err != nil { http.Error(w, fmt.Sprintf("not able to create file"), http.StatusBadRequest) return } // read all of the contents of our uploaded file into a byte array fileBytes, err := ioutil.ReadAll(file) if err != nil { http.Error(w, fmt.Sprintf("error reading the file"), http.StatusBadRequest) return } if _, err := f.Write(fileBytes); err != nil { http.Error(w, fmt.Sprintf("error writing the file"), http.StatusBadRequest) return } w.WriteHeader(http.StatusOK) json.NewEncoder(w).Encode("file uploaded successfully") } // rollback is used to get previous state of conversion in case // some unexpected error occurs during update operations. func rollback(err error) error { sessionState := session.GetSessionState() if sessionState.SessionFile == "" { return fmt.Errorf("encountered error %w. rollback failed because we don't have a session file", err) } sessionState.Conv = internal.MakeConv() sessionState.Conv.SpDialect = constants.DIALECT_GOOGLESQL err2 := conversion.ReadSessionFile(sessionState.Conv, sessionState.SessionFile) if err2 != nil { return fmt.Errorf("encountered error %w. rollback failed: %v", err, err2) } return err } func init() { sessionState := session.GetSessionState() utilities.InitObjectId() sessionState.Conv = internal.MakeConv() config := config.TryInitializeSpannerConfig() session.SetSessionStorageConnectionState(config.GCPProjectID, config.SpannerProjectID, config.SpannerInstanceID) } // App connects to the web app v2. func App(logLevel string, open bool, port int) error { err := logger.InitializeLogger(logLevel) if err != nil { return fmt.Errorf("error initialising webapp, did you specify a valid log-level? [DEBUG, INFO]") } addr := fmt.Sprintf(":%s", strconv.Itoa(port)) router := getRoutes() fmt.Println("Starting Spanner migration tool UI at:", fmt.Sprintf("http://localhost%s", addr)) fmt.Println("Reverse Replication feature in preview: Please refer to https://github.com/GoogleCloudPlatform/spanner-migration-tool/blob/master/reverse_replication/README.md for detailed instructions.") if open { browser.OpenURL(fmt.Sprintf("http://localhost%s", addr)) } return http.ListenAndServe(addr, handlers.CORS(handlers.AllowedHeaders([]string{"X-Requested-With", "Content-Type", "Authorization"}), handlers.AllowedMethods([]string{"GET", "POST", "PUT", "HEAD", "OPTIONS"}), handlers.AllowedOrigins([]string{"*"}))(router)) }