in webv2/profile/profile.go [115:231]
// CreateConnectionProfile is an HTTP handler that creates a Datastream
// connection profile from the JSON request body (decoded into
// connectionProfileReqV2) and the current session state.
//
// Behavior:
//   - For sharded sessions, only MySQL sources are accepted; the work is
//     delegated to PrepareMinimalDowntimeResources and the handler returns.
//   - For non-sharded sessions, a GCS bucket named after the migration request
//     id is created when the profile is a target (details.IsSource == false),
//     then the connection profile is created through the Datastream client and
//     the long-running operation is awaited.
//
// On success nothing is written to the response (implicit 200 OK). On failure
// a single http.Error response is written and the handler returns immediately.
// The session's ConvLock is held for the rest of the request once acquired.
func CreateConnectionProfile(w http.ResponseWriter, r *http.Request) {
	log.Println("request started", "method", r.Method, "path", r.URL.Path)

	reqBody, err := ioutil.ReadAll(r.Body)
	if err != nil {
		log.Println("request's body Read Error")
		http.Error(w, fmt.Sprintf("Body Read Error : %v", err), http.StatusInternalServerError)
		// http.Error does not terminate the handler; stop here so the
		// unreadable body is not parsed and a second response written.
		return
	}

	details := connectionProfileReqV2{}
	if err = json.Unmarshal(reqBody, &details); err != nil {
		log.Println("request's Body parse error")
		http.Error(w, fmt.Sprintf("Request Body parse error : %v", err), http.StatusBadRequest)
		return
	}

	ctx := context.Background()
	dsClient, err := datastream.NewClient(ctx)
	if err != nil {
		http.Error(w, fmt.Sprintf("datastream client can not be created: %v", err), http.StatusBadRequest)
		// Without this return the handler would defer Close on (and later
		// call into) a client that was never successfully constructed.
		return
	}
	defer dsClient.Close()

	sessionState := session.GetSessionState()
	sessionState.Conv.ConvLock.Lock()
	defer sessionState.Conv.ConvLock.Unlock()

	databaseType, err := helpers.GetSourceDatabaseFromDriver(sessionState.Driver)
	if err != nil {
		http.Error(w, fmt.Sprintf("Error while getting source database: %v", err), http.StatusBadRequest)
		return
	}

	if sessionState.IsSharded {
		if databaseType != constants.MYSQL {
			// err is nil at this point, so it is deliberately not part of the message.
			http.Error(w, fmt.Sprintf("%v database type is not currently implemented for sharded migrations", databaseType), http.StatusBadRequest)
			return
		}

		// Distinct names avoid shadowing the outer dsClient and the
		// storageclient package.
		shardedDsClient, err := ds.NewDatastreamClientImpl(ctx)
		if err != nil {
			http.Error(w, fmt.Sprintf("datastream client can not be created: %v", err), http.StatusInternalServerError)
			return
		}
		shardedStorageClient, err := storageclient.NewStorageClientImpl(ctx)
		if err != nil {
			http.Error(w, fmt.Sprintf("storage client can not be created: %v", err), http.StatusInternalServerError)
			return
		}

		resGenerator := conversion.NewResourceGenerationImpl(&datastream_accessor.DatastreamAccessorImpl{}, shardedDsClient, &storageaccessor.StorageAccessorImpl{}, shardedStorageClient)
		req := conversion.ConnectionProfileReq{
			ConnectionProfile: conversion.ConnectionProfile{
				ProjectId:    sessionState.GCPProjectID,
				Id:           details.Id,
				ValidateOnly: details.ValidateOnly,
				IsSource:     details.IsSource,
				Host:         details.Host,
				Port:         details.Port,
				Password:     details.Password,
				User:         details.User,
				Region:       sessionState.Region,
			},
			Ctx: ctx,
		}
		mutex := &sync.Mutex{}
		result := resGenerator.PrepareMinimalDowntimeResources(&req, mutex)
		if result.Err != nil {
			// Report the failure carried by the result, not the unrelated
			// (nil) err left over from client construction.
			http.Error(w, fmt.Sprintf("Resource generation failed: %v", result.Err), http.StatusBadRequest)
		}
		return
	}

	req := &datastreampb.CreateConnectionProfileRequest{
		Parent:              fmt.Sprintf("projects/%s/locations/%s", sessionState.GCPProjectID, sessionState.Region),
		ConnectionProfileId: details.Id,
		ConnectionProfile: &datastreampb.ConnectionProfile{
			DisplayName:  details.Id,
			Connectivity: &datastreampb.ConnectionProfile_StaticServiceIpConnectivity{},
		},
		ValidateOnly: details.ValidateOnly,
	}

	sc, err := storageclient.NewStorageClientImpl(ctx)
	if err != nil {
		http.Error(w, fmt.Sprintf("Error while StorageClientImpl: %v", err), http.StatusBadRequest)
		return
	}
	sa := storageaccessor.StorageAccessorImpl{}

	if !details.IsSource {
		// Sharded sessions returned above, so only the non-sharded bucket
		// naming applies here.
		bucketName := strings.ToLower(sessionState.Conv.Audit.MigrationRequestId)
		err = sa.CreateGCSBucket(ctx, sc, storageaccessor.StorageBucketMetadata{
			BucketName:    bucketName,
			ProjectID:     sessionState.GCPProjectID,
			Location:      sessionState.Region,
			Ttl:           0,
			MatchesPrefix: nil,
		})
		if err != nil {
			http.Error(w, fmt.Sprintf("Error while creating bucket: %v", err), http.StatusBadRequest)
			return
		}
	}

	setConnectionProfileFromSessionState(details.IsSource, *sessionState, req, databaseType)

	op, err := dsClient.CreateConnectionProfile(ctx, req)
	if err != nil {
		http.Error(w, fmt.Sprintf("Error while creating connection profile: %v", err), http.StatusBadRequest)
		return
	}
	// Block until the long-running create operation finishes.
	if _, err = op.Wait(ctx); err != nil {
		http.Error(w, fmt.Sprintf("Error while creating connection profile: %v", err), http.StatusBadRequest)
		return
	}
}