webv2/profile/profile.go (313 lines of code) (raw):
package profile
import (
"context"
"encoding/json"
"fmt"
"io/ioutil"
"log"
"net/http"
"strconv"
"strings"
"sync"
datastream "cloud.google.com/go/datastream/apiv1"
ds "github.com/GoogleCloudPlatform/spanner-migration-tool/accessors/clients/datastream"
storageclient "github.com/GoogleCloudPlatform/spanner-migration-tool/accessors/clients/storage"
datastream_accessor "github.com/GoogleCloudPlatform/spanner-migration-tool/accessors/datastream"
storageaccessor "github.com/GoogleCloudPlatform/spanner-migration-tool/accessors/storage"
"github.com/GoogleCloudPlatform/spanner-migration-tool/common/constants"
"github.com/GoogleCloudPlatform/spanner-migration-tool/common/utils"
"github.com/GoogleCloudPlatform/spanner-migration-tool/conversion"
"github.com/GoogleCloudPlatform/spanner-migration-tool/profiles"
"github.com/GoogleCloudPlatform/spanner-migration-tool/streaming"
"github.com/GoogleCloudPlatform/spanner-migration-tool/webv2/helpers"
"github.com/GoogleCloudPlatform/spanner-migration-tool/webv2/session"
"google.golang.org/api/iterator"
datastreampb "google.golang.org/genproto/googleapis/cloud/datastream/v1"
)
type shardedDataflowConfig struct {
MigrationProfile profiles.SourceProfileConfig
}
type ProfileAPIHandler struct {
ValidateResources conversion.ValidateResourcesInterface
}
func ListConnectionProfiles(w http.ResponseWriter, r *http.Request) {
ctx := context.Background()
dsClient, err := datastream.NewClient(ctx)
if err != nil {
http.Error(w, fmt.Sprintf("datastream client can not be created: %v", err), http.StatusBadRequest)
}
defer dsClient.Close()
sessionState := session.GetSessionState()
sessionState.Conv.ConvLock.Lock()
defer sessionState.Conv.ConvLock.Unlock()
source := r.FormValue("source") == "true"
if !source {
sessionState.Conv.Audit.MigrationRequestId, _ = utils.GenerateName("smt-job")
sessionState.Conv.Audit.MigrationRequestId = strings.Replace(sessionState.Conv.Audit.MigrationRequestId, "_", "-", -1)
sessionState.Bucket = strings.ToLower(sessionState.Conv.Audit.MigrationRequestId) + "/"
}
databaseType, err := helpers.GetSourceDatabaseFromDriver(sessionState.Driver)
if err != nil {
http.Error(w, fmt.Sprintf("Error while getting source database: %v", err), http.StatusBadRequest)
return
}
var connectionProfileList []connectionProfile
req := &datastreampb.ListConnectionProfilesRequest{
Parent: fmt.Sprintf("projects/%s/locations/%s", sessionState.GCPProjectID, sessionState.Region),
}
it := dsClient.ListConnectionProfiles(ctx, req)
for {
resp, err := it.Next()
if err == iterator.Done {
break
}
if err != nil {
http.Error(w, fmt.Sprintf("Error while getting list of connection profiles: %v", err), http.StatusBadRequest)
return
}
if source && databaseType == constants.MYSQL && resp.GetMysqlProfile().GetHostname() != "" {
connectionProfileList = append(connectionProfileList, connectionProfile{Name: resp.GetName(), DisplayName: resp.GetDisplayName()})
} else if source && databaseType == constants.ORACLE && resp.GetOracleProfile().GetHostname() != "" {
connectionProfileList = append(connectionProfileList, connectionProfile{Name: resp.GetName(), DisplayName: resp.GetDisplayName()})
} else if source && databaseType == constants.POSTGRES && resp.GetPostgresqlProfile().GetHostname() != "" {
connectionProfileList = append(connectionProfileList, connectionProfile{Name: resp.GetName(), DisplayName: resp.GetDisplayName()})
} else if !source && resp.GetGcsProfile().GetBucket() != "" {
connectionProfileList = append(connectionProfileList, connectionProfile{Name: resp.GetName(), DisplayName: resp.GetDisplayName()})
}
}
w.WriteHeader(http.StatusOK)
json.NewEncoder(w).Encode(connectionProfileList)
}
func GetStaticIps(w http.ResponseWriter, r *http.Request) {
ctx := context.Background()
dsClient, err := datastream.NewClient(ctx)
if err != nil {
http.Error(w, fmt.Sprintf("datastream client can not be created: %v", err), http.StatusBadRequest)
}
defer dsClient.Close()
sessionState := session.GetSessionState()
req := &datastreampb.FetchStaticIpsRequest{
Name: fmt.Sprintf("projects/%s/locations/%s", sessionState.GCPProjectID, sessionState.Region),
}
it := dsClient.FetchStaticIps(ctx, req)
var staticIpList []string
for {
resp, err := it.Next()
if err == iterator.Done {
break
}
if err != nil {
http.Error(w, fmt.Sprintf("Error while fetching static Ips: %v", err), http.StatusBadRequest)
return
}
staticIpList = append(staticIpList, resp)
}
w.WriteHeader(http.StatusOK)
json.NewEncoder(w).Encode(staticIpList)
}
func CreateConnectionProfile(w http.ResponseWriter, r *http.Request) {
log.Println("request started", "method", r.Method, "path", r.URL.Path)
reqBody, err := ioutil.ReadAll(r.Body)
if err != nil {
log.Println("request's body Read Error")
http.Error(w, fmt.Sprintf("Body Read Error : %v", err), http.StatusInternalServerError)
}
details := connectionProfileReqV2{}
err = json.Unmarshal(reqBody, &details)
if err != nil {
log.Println("request's Body parse error")
http.Error(w, fmt.Sprintf("Request Body parse error : %v", err), http.StatusBadRequest)
return
}
ctx := context.Background()
dsClient, err := datastream.NewClient(ctx)
if err != nil {
http.Error(w, fmt.Sprintf("datastream client can not be created: %v", err), http.StatusBadRequest)
}
defer dsClient.Close()
sessionState := session.GetSessionState()
sessionState.Conv.ConvLock.Lock()
defer sessionState.Conv.ConvLock.Unlock()
databaseType, err := helpers.GetSourceDatabaseFromDriver(sessionState.Driver)
if err != nil {
http.Error(w, fmt.Sprintf("Error while getting source database: %v", err), http.StatusBadRequest)
return
}
if sessionState.IsSharded {
if databaseType != constants.MYSQL {
http.Error(w, fmt.Sprintf("%v database type is not currently implemented for sharded migrations: %v", databaseType, err), http.StatusBadRequest)
return
}
dsClient, err := ds.NewDatastreamClientImpl(ctx)
if err != nil {
http.Error(w, fmt.Sprintf("Body Read Error : %v", err), http.StatusInternalServerError)
return
}
storageclient, err := storageclient.NewStorageClientImpl(ctx)
if err != nil {
http.Error(w, fmt.Sprintf("Body Read Error : %v", err), http.StatusInternalServerError)
return
}
resGenerator := conversion.NewResourceGenerationImpl(&datastream_accessor.DatastreamAccessorImpl{}, dsClient, &storageaccessor.StorageAccessorImpl{}, storageclient)
req := conversion.ConnectionProfileReq{
ConnectionProfile: conversion.ConnectionProfile{
ProjectId: sessionState.GCPProjectID,
Id: details.Id,
ValidateOnly: details.ValidateOnly,
IsSource: details.IsSource,
Host: details.Host,
Port: details.Port,
Password: details.Password,
User: details.User,
Region: sessionState.Region,
},
Ctx: ctx,
}
mutex := &sync.Mutex{}
result := resGenerator.PrepareMinimalDowntimeResources(&req, mutex)
if result.Err != nil {
http.Error(w, fmt.Sprintf("Resource generation failed: %v", err), http.StatusBadRequest)
return
}
return
}
req := &datastreampb.CreateConnectionProfileRequest{
Parent: fmt.Sprintf("projects/%s/locations/%s", sessionState.GCPProjectID, sessionState.Region),
ConnectionProfileId: details.Id,
ConnectionProfile: &datastreampb.ConnectionProfile{
DisplayName: details.Id,
Connectivity: &datastreampb.ConnectionProfile_StaticServiceIpConnectivity{},
},
ValidateOnly: details.ValidateOnly,
}
var bucketName string
sc, err := storageclient.NewStorageClientImpl(ctx)
if err != nil {
http.Error(w, fmt.Sprintf("Error while StorageClientImpl: %v", err), http.StatusBadRequest)
return
}
sa := storageaccessor.StorageAccessorImpl{}
if !details.IsSource {
if sessionState.IsSharded {
bucketName = strings.ToLower(sessionState.Conv.Audit.MigrationRequestId + "-" + details.Id)
} else {
bucketName = strings.ToLower(sessionState.Conv.Audit.MigrationRequestId)
}
err = sa.CreateGCSBucket(ctx, sc, storageaccessor.StorageBucketMetadata{
BucketName: bucketName,
ProjectID: sessionState.GCPProjectID,
Location: sessionState.Region,
Ttl: 0,
MatchesPrefix: nil,
})
if err != nil {
http.Error(w, fmt.Sprintf("Error while creating bucket: %v", err), http.StatusBadRequest)
return
}
}
setConnectionProfileFromSessionState(details.IsSource, *sessionState, req, databaseType)
op, err := dsClient.CreateConnectionProfile(ctx, req)
if err != nil {
http.Error(w, fmt.Sprintf("Error while creating connection profile: %v", err), http.StatusBadRequest)
return
}
_, err = op.Wait(ctx)
if err != nil {
http.Error(w, fmt.Sprintf("Error while creating connection profile: %v", err), http.StatusBadRequest)
return
}
}
func (profileHandler *ProfileAPIHandler) VerifyJsonConfiguration(w http.ResponseWriter, r *http.Request) {
reqBody, err := ioutil.ReadAll(r.Body)
if err != nil {
http.Error(w, fmt.Sprintf("Body Read Error : %v", err), http.StatusInternalServerError)
return
}
var srcConfig shardedDataflowConfig
err = json.Unmarshal(reqBody, &srcConfig)
if err != nil {
http.Error(w, fmt.Sprintf("Body Read Error : %v", err), http.StatusInternalServerError)
return
}
ctx := context.Background()
sessionState := session.GetSessionState()
sourceProfileConfig := srcConfig.MigrationProfile
sourceProfile := profiles.SourceProfile{Ty: profiles.SourceProfileTypeConfig, Config: sourceProfileConfig}
err = profileHandler.ValidateResources.ValidateResourceGeneration(ctx, sessionState.GCPProjectID, sessionState.SpannerInstanceID, sourceProfile, sessionState.Conv)
if err != nil {
http.Error(w, fmt.Sprintf("Body Read Error : %v", err), http.StatusInternalServerError)
return
}
}
func setConnectionProfileFromSessionState(isSource bool, sessionState session.SessionState, req *datastreampb.CreateConnectionProfileRequest, databaseType string) {
if isSource {
port, _ := strconv.ParseInt((sessionState.SourceDBConnDetails.Port), 10, 32)
if databaseType == constants.MYSQL {
req.ConnectionProfile.Profile = &datastreampb.ConnectionProfile_MysqlProfile{
MysqlProfile: &datastreampb.MysqlProfile{
Hostname: sessionState.SourceDBConnDetails.Host,
Port: int32(port),
Username: sessionState.SourceDBConnDetails.User,
Password: sessionState.SourceDBConnDetails.Password,
},
}
} else if databaseType == constants.ORACLE {
req.ConnectionProfile.Profile = &datastreampb.ConnectionProfile_OracleProfile{
OracleProfile: &datastreampb.OracleProfile{
Hostname: sessionState.SourceDBConnDetails.Host,
Port: int32(port),
Username: sessionState.SourceDBConnDetails.User,
Password: sessionState.SourceDBConnDetails.Password,
},
}
} else if databaseType == constants.POSTGRES {
req.ConnectionProfile.Profile = &datastreampb.ConnectionProfile_PostgresqlProfile{
PostgresqlProfile: &datastreampb.PostgresqlProfile{
Hostname: sessionState.SourceDBConnDetails.Host,
Port: int32(port),
Username: sessionState.SourceDBConnDetails.User,
Password: sessionState.SourceDBConnDetails.Password,
Database: sessionState.DbName,
},
}
}
} else {
req.ConnectionProfile.Profile = &datastreampb.ConnectionProfile_GcsProfile{
GcsProfile: &datastreampb.GcsProfile{
Bucket: strings.ToLower(sessionState.Conv.Audit.MigrationRequestId),
RootPath: "/",
},
}
}
}
// Cleanup streaming jobs API assumes defaults while performing cleanup.
// The underlying backend library exposes more hooks which can are not yet implemented on the UI, and are only available via the CLI.
func CleanUpStreamingJobs(w http.ResponseWriter, r *http.Request) {
ctx := context.Background()
sessionState := session.GetSessionState()
sessionState.Conv.ConvLock.Lock()
defer sessionState.Conv.ConvLock.Unlock()
jobCleanupOptions := streaming.JobCleanupOptions{
Datastream: true,
Dataflow: true,
Pubsub: true,
Monitoring: true,
}
streaming.InitiateJobCleanup(ctx, sessionState.Conv.Audit.MigrationRequestId, nil, jobCleanupOptions, sessionState.GCPProjectID, sessionState.SpannerProjectId, sessionState.SpannerInstanceID)
}
type connectionProfileReq struct {
Id string
ValidateOnly bool
IsSource bool
}
type connectionProfileReqV2 struct {
Id string
ValidateOnly bool
IsSource bool
Host string
Port string
Password string
User string
}
type connectionProfile struct {
Name string
DisplayName string
}