webv2/session/session_handler.go (199 lines of code) (raw):
// Copyright 2022 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package session
import (
"context"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"time"
"cloud.google.com/go/spanner"
"github.com/GoogleCloudPlatform/spanner-migration-tool/common/constants"
"github.com/GoogleCloudPlatform/spanner-migration-tool/internal"
helpers "github.com/GoogleCloudPlatform/spanner-migration-tool/webv2/helpers"
"github.com/google/uuid"
"github.com/gorilla/mux"
)
// session contains the metadata for a session file.
// A session file is a snapshot of an ongoing Spanner migration tool conversion session,
// and consists of an internal.Conv struct in JSON format.
type SessionParams struct {
Driver string `json:"driver"`
FilePath string `json:"filePath"`
DBName string `json:"dbName"`
CreatedAt string `json:"createdAt"`
}
func IsOfflineSession(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
json.NewEncoder(w).Encode(GetSessionState().IsOffline)
}
func GetSessions(w http.ResponseWriter, r *http.Request) {
var sessions []SchemaConversionSession
var err error
if GetSessionState().IsOffline {
sessions, err = getLocalSessions()
} else {
sessions, err = getRemoteSessions()
}
if err != nil {
http.Error(w, fmt.Sprintf("%v", err), http.StatusInternalServerError)
return
}
w.WriteHeader(http.StatusOK)
json.NewEncoder(w).Encode(sessions)
}
func GetConv(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
vid, ok := vars["versionId"]
if !ok {
http.Error(w, "VersionId not supplied", http.StatusBadRequest)
return
}
var convm ConvWithMetadata
var err error
if GetSessionState().IsOffline {
convm, err = getLocalConv(vid)
} else {
convm, err = getRemoteConv(vid)
}
if err != nil {
http.Error(w, fmt.Sprintf("%v", err), http.StatusInternalServerError)
return
}
w.WriteHeader(http.StatusOK)
json.NewEncoder(w).Encode(convm)
}
func ResumeSession(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
vid, ok := vars["versionId"]
if !ok {
http.Error(w, "VersionId not supplied", http.StatusBadRequest)
return
}
var convm ConvWithMetadata
var err error
if GetSessionState().IsOffline {
convm, err = getLocalConv(vid)
} else {
convm, err = getRemoteConv(vid)
}
if err != nil {
http.Error(w, fmt.Sprintf("%v", err), http.StatusInternalServerError)
return
}
sessionState := GetSessionState()
sessionState.Conv = &convm.Conv
sessionState.Driver = convm.DatabaseType
sessionState.DbName = convm.DatabaseName
sessionState.SourceDBConnDetails = SourceDBConnDetails{
ConnectionType: helpers.SESSION_FILE_MODE,
}
sessionState.Conv.UsedNames = internal.ComputeUsedNames(sessionState.Conv)
w.WriteHeader(http.StatusOK)
json.NewEncoder(w).Encode(convm)
}
func SaveRemoteSession(w http.ResponseWriter, r *http.Request) {
reqBody, err := ioutil.ReadAll(r.Body)
if err != nil {
http.Error(w, fmt.Sprintf("Body Read Error : %v", err), http.StatusInternalServerError)
return
}
var sm SessionMetadata
err = json.Unmarshal(reqBody, &sm)
if err != nil {
http.Error(w, fmt.Sprintf("Request Body parse error : %v", err), http.StatusBadRequest)
return
}
ctx := context.Background()
spannerClient, err := spanner.NewClient(ctx, getMetadataDbUri())
if err != nil {
http.Error(w, fmt.Sprintf("Spanner Client error : %v", err), http.StatusInternalServerError)
return
}
defer spannerClient.Close()
sessionState := GetSessionState()
ssvc := NewSessionService(ctx, NewRemoteSessionStore(spannerClient))
conv, err := json.Marshal(sessionState.Conv)
if err != nil {
http.Error(w, fmt.Sprintf("Conv object error : %v", err), http.StatusInternalServerError)
return
}
// TODO: To compute few metadata fields if empty
t := time.Now()
switch sessionState.Driver {
case constants.MYSQLDUMP:
sm.DatabaseType = constants.MYSQL
case constants.PGDUMP:
sm.DatabaseType = constants.POSTGRES
default:
sm.DatabaseType = sessionState.Driver
}
sm.Dialect = helpers.GetDialectDisplayStringFromDialect(sessionState.Dialect)
scs := SchemaConversionSession{
VersionId: uuid.New().String(),
PreviousVersionId: []string{},
SchemaConversionObject: string(conv),
CreateTimestamp: t,
SessionMetadata: sm,
}
err = ssvc.SaveSession(scs)
if err != nil {
http.Error(w, fmt.Sprintf("Spanner Transaction error : %v", err), http.StatusInternalServerError)
return
}
sessionMetaData := GetSessionState().SessionMetadata
sessionMetaData.DatabaseName = sm.DatabaseName
sessionMetaData.DatabaseType = sm.DatabaseType
sessionMetaData.SessionName = sm.SessionName
sessionMetaData.Dialect = sm.Dialect
GetSessionState().SessionMetadata = sessionMetaData
w.WriteHeader(http.StatusOK)
json.NewEncoder(w).Encode("Save successful, VersionId : " + scs.VersionId)
}
//Helpers
func getRemoteSessions() ([]SchemaConversionSession, error) {
ctx := context.Background()
spannerClient, err := spanner.NewClient(ctx, getMetadataDbUri())
if err != nil {
return nil, fmt.Errorf("Spanner Client error : %v", err)
}
defer spannerClient.Close()
svc := NewSessionService(ctx, NewRemoteSessionStore(spannerClient))
result, err := svc.GetSessionsMetadata()
if err != nil {
return nil, fmt.Errorf("Spanner Transaction error : %v", err)
}
return result, nil
}
func getLocalSessions() ([]SchemaConversionSession, error) {
svc := NewSessionService(context.Background(), NewLocalSessionStore())
result, err := svc.GetSessionsMetadata()
if err != nil {
return nil, fmt.Errorf("Local session store error : %v", err)
}
return result, nil
}
func getRemoteConv(versionId string) (ConvWithMetadata, error) {
var convm ConvWithMetadata
ctx := context.Background()
spannerClient, err := spanner.NewClient(ctx, getMetadataDbUri())
if err != nil {
return convm, err
}
defer spannerClient.Close()
ssvc := NewSessionService(ctx, NewRemoteSessionStore(spannerClient))
convm, err = ssvc.GetConvWithMetadata(versionId)
if err != nil {
return convm, err
}
return convm, nil
}
func getLocalConv(versionId string) (ConvWithMetadata, error) {
svc := NewSessionService(context.Background(), NewLocalSessionStore())
result, err := svc.GetConvWithMetadata(versionId)
if err != nil {
return result, fmt.Errorf("Local session store error : %v", err)
}
return result, nil
}
func getMetadataDbUri() string {
sessionState := GetSessionState()
if sessionState.SpannerProjectId == "" || sessionState.SpannerInstanceID == "" {
return ""
}
return helpers.GetSpannerUri(sessionState.SpannerProjectId, sessionState.SpannerInstanceID)
}