webv2/api/rules.go (361 lines of code) (raw):
package api
import (
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"strconv"
"strings"
"github.com/GoogleCloudPlatform/spanner-migration-tool/common/constants"
"github.com/GoogleCloudPlatform/spanner-migration-tool/internal"
"github.com/GoogleCloudPlatform/spanner-migration-tool/sources/common"
"github.com/GoogleCloudPlatform/spanner-migration-tool/spanner/ddl"
"github.com/GoogleCloudPlatform/spanner-migration-tool/webv2/index"
"github.com/GoogleCloudPlatform/spanner-migration-tool/webv2/primarykey"
"github.com/GoogleCloudPlatform/spanner-migration-tool/webv2/session"
"github.com/GoogleCloudPlatform/spanner-migration-tool/webv2/table"
"github.com/GoogleCloudPlatform/spanner-migration-tool/webv2/types"
"github.com/GoogleCloudPlatform/spanner-migration-tool/webv2/utilities"
)
// ApplyRule is the HTTP handler that applies a schema-changing rule to the
// conversion held in the current session and appends it to the session's rules.
//
// The request body must be a JSON-encoded internal.Rule. Four rule types are
// handled: constants.GlobalDataTypeChange, constants.AddIndex,
// constants.EditColumnMaxLength and constants.AddShardIdPrimaryKey.
//
// Responses:
//   - 500 when the body cannot be read, when the rule data cannot be decoded
//     into the shape its type requires, when the rule type is not one of the
//     four above, or when addIndex rejects the index;
//   - 400 when the body is not a valid JSON rule, or when a shard-id rule is
//     requested while checkInterleaving reports an interleaved table;
//   - 200 with a JSON-encoded session.ConvWithMetadata on success.
func ApplyRule(w http.ResponseWriter, r *http.Request) {
	body, err := ioutil.ReadAll(r.Body)
	if err != nil {
		http.Error(w, fmt.Sprintf("Body Read Error : %v", err), http.StatusInternalServerError)
		return
	}
	var rule internal.Rule
	if err = json.Unmarshal(body, &rule); err != nil {
		http.Error(w, fmt.Sprintf("Request Body parse error : %v", err), http.StatusBadRequest)
		return
	}

	state := session.GetSessionState()
	state.Conv.ConvLock.Lock()
	defer state.Conv.ConvLock.Unlock()

	// decodeData round-trips rule.Data through JSON into dst. On failure it
	// writes the error response itself and reports false.
	decodeData := func(dst interface{}) bool {
		raw, decodeErr := json.Marshal(rule.Data)
		if decodeErr == nil {
			decodeErr = json.Unmarshal(raw, dst)
		}
		if decodeErr != nil {
			http.Error(w, "Invalid rule data", http.StatusInternalServerError)
			return false
		}
		return true
	}

	switch rule.Type {
	case constants.GlobalDataTypeChange:
		typeMap := map[string]string{}
		if !decodeData(&typeMap) {
			return
		}
		setGlobalDataType(typeMap)

	case constants.AddIndex:
		newIdx := ddl.CreateIndex{}
		if !decodeData(&newIdx) {
			return
		}
		addedIndex, err := addIndex(newIdx)
		if err != nil {
			http.Error(w, err.Error(), http.StatusInternalServerError)
			return
		}
		// Store the index as returned by addIndex (it carries the generated id)
		// so that the rule can later be used to drop exactly this index.
		rule.Data = addedIndex

	case constants.EditColumnMaxLength:
		var colMaxLength types.ColMaxLength
		if !decodeData(&colMaxLength) {
			return
		}
		setSpColMaxLength(colMaxLength, rule.AssociatedObjects)

	case constants.AddShardIdPrimaryKey:
		var shardIdPrimaryKey types.ShardIdPrimaryKey
		if !decodeData(&shardIdPrimaryKey) {
			return
		}
		if tableName := checkInterleaving(); tableName != "" {
			http.Error(w, fmt.Sprintf("Rule cannot be added because some tables, eg: %v are interleaved. Please remove interleaving and try again.", tableName), http.StatusBadRequest)
			return
		}
		setShardIdColumnAsPrimaryKey(shardIdPrimaryKey.AddedAtTheStart)
		addShardIdColumnToForeignKeys(shardIdPrimaryKey.AddedAtTheStart)

	default:
		http.Error(w, "Invalid rule type", http.StatusInternalServerError)
		return
	}

	rule.Id = internal.GenerateRuleId()
	state.Conv.Rules = append(state.Conv.Rules, rule)
	session.UpdateSessionFile()

	w.WriteHeader(http.StatusOK)
	json.NewEncoder(w).Encode(session.ConvWithMetadata{
		SessionMetadata: state.SessionMetadata,
		Conv:            *state.Conv,
	})
}
// DropRule is the HTTP handler that removes a previously applied rule from the
// current session, undoing the rule's schema change first.
//
// The rule is identified by the "id" form value. The undo step depends on the
// rule type:
//   - constants.AddIndex: the index is dropped via dropSecondaryIndexHelper,
//     but only when the rule is enabled;
//   - constants.GlobalDataTypeChange: revertGlobalDataType;
//   - constants.EditColumnMaxLength: revertSpColMaxLength;
//   - constants.AddShardIdPrimaryKey: revertShardIdColumnAsPrimaryKey and
//     removeShardIdColumnFromForeignKeys, refused while checkInterleaving
//     reports an interleaved table.
//
// Responses:
//   - 400 when the id is empty, when no rule has that id, when the index cannot
//     be dropped, or when interleaving blocks a shard-id rule;
//   - 500 when the stored rule data cannot be decoded or the stored rule type is
//     not one of the four above;
//   - 200 with a JSON-encoded session.ConvWithMetadata on success.
func DropRule(w http.ResponseWriter, r *http.Request) {
	ruleId := r.FormValue("id")
	if ruleId == "" {
		http.Error(w, "Rule id is empty", http.StatusBadRequest)
		return
	}

	state := session.GetSessionState()
	state.Conv.ConvLock.Lock()
	defer state.Conv.ConvLock.Unlock()
	conv := state.Conv

	// Locate the first rule carrying the requested id.
	var rule internal.Rule
	position := -1
	for i := 0; i < len(conv.Rules); i++ {
		if conv.Rules[i].Id == ruleId {
			rule, position = conv.Rules[i], i
			break
		}
	}
	if position < 0 {
		http.Error(w, "Rule to be deleted not found", http.StatusBadRequest)
		return
	}

	// decodeData round-trips rule.Data through JSON into dst. On failure it
	// writes the error response itself and reports false.
	decodeData := func(dst interface{}) bool {
		raw, decodeErr := json.Marshal(rule.Data)
		if decodeErr == nil {
			decodeErr = json.Unmarshal(raw, dst)
		}
		if decodeErr != nil {
			http.Error(w, "Invalid rule data", http.StatusInternalServerError)
			return false
		}
		return true
	}

	switch rule.Type {
	case constants.AddIndex:
		// A disabled index rule is simply removed from the list; no index is dropped.
		if rule.Enabled {
			var idx ddl.CreateIndex
			if !decodeData(&idx) {
				return
			}
			if err := dropSecondaryIndexHelper(idx.TableId, idx.Id); err != nil {
				http.Error(w, fmt.Sprintf("%v", err), http.StatusBadRequest)
				return
			}
		}

	case constants.GlobalDataTypeChange:
		typeMap := map[string]string{}
		if !decodeData(&typeMap) {
			return
		}
		revertGlobalDataType(typeMap)

	case constants.EditColumnMaxLength:
		var colMaxLength types.ColMaxLength
		if !decodeData(&colMaxLength) {
			return
		}
		revertSpColMaxLength(colMaxLength, rule.AssociatedObjects)

	case constants.AddShardIdPrimaryKey:
		var shardIdPrimaryKey types.ShardIdPrimaryKey
		if !decodeData(&shardIdPrimaryKey) {
			return
		}
		if tableName := checkInterleaving(); tableName != "" {
			http.Error(w, fmt.Sprintf("Rule cannot be deleted because some tables, eg: %v are interleaved. Please remove interleaving and try again.", tableName), http.StatusBadRequest)
			return
		}
		revertShardIdColumnAsPrimaryKey(shardIdPrimaryKey.AddedAtTheStart)
		removeShardIdColumnFromForeignKeys(shardIdPrimaryKey.AddedAtTheStart)

	default:
		http.Error(w, "Invalid rule type", http.StatusInternalServerError)
		return
	}

	// Cut the rule out of the list; an emptied list is stored as nil.
	state.Conv.Rules = append(conv.Rules[:position], conv.Rules[position+1:]...)
	if len(state.Conv.Rules) == 0 {
		state.Conv.Rules = nil
	}
	session.UpdateSessionFile()

	w.WriteHeader(http.StatusOK)
	json.NewEncoder(w).Encode(session.ConvWithMetadata{
		SessionMetadata: state.SessionMetadata,
		Conv:            *state.Conv,
	})
}
// setGlobalDataType applies a global source-type to Spanner-type mapping.
//
// typeMap is keyed by source type name and holds the Spanner type to use. The
// walk is driven by the Spanner schema rather than by re-running schema
// conversion, so that other customizations already made to the Spanner schema
// (dropped or renamed columns, etc.) are preserved. For each Spanner column
// whose source column type appears in typeMap, utilities.UpdateDataType is
// called with the mapped type; columns whose source type is not in the map are
// left alone. The mapping in typeMap always takes precedence, so per-column
// type overrides may be lost. After each table is processed its non-key column
// size is recomputed.
func setGlobalDataType(typeMap map[string]string) {
	conv := session.GetSessionState().Conv
	for tableId, spTable := range conv.SpSchema {
		for colId := range spTable.ColDefs {
			srcType := conv.SrcSchema[tableId].ColDefs[colId].Type.Name
			spType, mapped := typeMap[srcType]
			if !mapped {
				continue
			}
			utilities.UpdateDataType(conv, spType, tableId, colId)
		}
		common.ComputeNonKeyColumnSize(conv, tableId)
	}
}
// addIndex validates newIndex and adds it to the Spanner schema of the table
// identified by newIndex.TableId.
//
// The index name must pass utilities.CheckSpannerNamesValidity and
// utilities.CanRename (i.e. it must be a valid Spanner identifier that is not
// already taken); otherwise an empty ddl.CreateIndex and an error are returned
// and the schema is left untouched. On success the index is run through
// index.CheckIndexSuggestion, given a freshly generated id, its lower-cased
// name is marked as used, and the stored index (including its id) is returned.
func addIndex(newIndex ddl.CreateIndex) (ddl.CreateIndex, error) {
	candidateNames := []string{newIndex.Name}

	// The name has to be a valid Spanner identifier.
	valid, rejected := utilities.CheckSpannerNamesValidity(candidateNames)
	if !valid {
		return ddl.CreateIndex{}, fmt.Errorf("following names are not valid Spanner identifiers: %s", strings.Join(rejected, ","))
	}

	// The name must not collide with an existing table, secondary index or
	// foreign key constraint.
	if available, err := utilities.CanRename(candidateNames, newIndex.TableId); !available {
		return ddl.CreateIndex{}, err
	}

	conv := session.GetSessionState().Conv
	spTable := conv.SpSchema[newIndex.TableId]

	pending := []ddl.CreateIndex{newIndex}
	index.CheckIndexSuggestion(pending, spTable)
	for i := range pending {
		pending[i].Id = internal.GenerateIndexesId()
	}

	conv.UsedNames[strings.ToLower(newIndex.Name)] = true
	spTable.Indexes = append(spTable.Indexes, pending...)
	conv.SpSchema[newIndex.TableId] = spTable
	return pending[0], nil
}
// setSpColMaxLength applies an EditColumnMaxLength rule: every Spanner column
// whose type name equals spColMaxLength.SpDataType and whose length is
// currently ddl.MaxLength is given the length in spColMaxLength.SpColMaxLength.
// Columns that already have an explicit length are left unchanged.
//
// associatedObjects is either the "all tables" sentinel or the id of a single
// table. Both spellings of the sentinel, "All table" and "All tables", are
// accepted: this function historically matched only the former while
// revertSpColMaxLength matched only the latter, so a rule applied to all
// tables could not be applied and reverted with the same value.
//
// The non-key column size of every visited table is recomputed afterwards.
func setSpColMaxLength(spColMaxLength types.ColMaxLength, associatedObjects string) {
	conv := session.GetSessionState().Conv

	if associatedObjects == "All table" || associatedObjects == "All tables" {
		for tId := range conv.SpSchema {
			for _, colDef := range conv.SpSchema[tId].ColDefs {
				if colDef.T.Name != spColMaxLength.SpDataType {
					continue
				}
				spColDef := colDef
				if spColDef.T.Len == ddl.MaxLength {
					// The parse error is deliberately ignored: on failure the column
					// gets whatever value ParseInt returns with the error (0 for
					// malformed input), which is the same value
					// revertSpColMaxLength derives from the same string.
					spColDef.T.Len, _ = strconv.ParseInt(spColMaxLength.SpColMaxLength, 10, 64)
				}
				conv.SpSchema[tId].ColDefs[colDef.Id] = spColDef
			}
			common.ComputeNonKeyColumnSize(conv, tId)
		}
		return
	}

	// Single-table case: associatedObjects is used as the table id.
	for _, colDef := range conv.SpSchema[associatedObjects].ColDefs {
		if colDef.T.Name == spColMaxLength.SpDataType && colDef.T.Len == ddl.MaxLength {
			table.UpdateColumnSize(spColMaxLength.SpColMaxLength, associatedObjects, colDef.Id, conv)
		}
	}
	common.ComputeNonKeyColumnSize(conv, associatedObjects)
}
// revertSpColMaxLength undoes an EditColumnMaxLength rule. For every Spanner
// column whose type name equals spColMaxLength.SpDataType it calls
// utilities.UpdateMaxColumnLen with the rule's length, and then recomputes the
// non-key column size of each visited table.
//
// associatedObjects is either the "all tables" sentinel or the id of a single
// table. Both spellings of the sentinel, "All table" and "All tables", are
// accepted: this function historically matched only the latter while
// setSpColMaxLength matched only the former, so a rule applied to all tables
// could not be applied and reverted with the same value.
func revertSpColMaxLength(spColMaxLength types.ColMaxLength, associatedObjects string) {
	conv := session.GetSessionState().Conv

	// The parse error is deliberately ignored: on failure spColLen is whatever
	// ParseInt returns with the error (0 for malformed input), which is the same
	// value setSpColMaxLength stores for the same string.
	spColLen, _ := strconv.ParseInt(spColMaxLength.SpColMaxLength, 10, 64)

	if associatedObjects == "All table" || associatedObjects == "All tables" {
		for tId := range conv.SpSchema {
			for colId, colDef := range conv.SpSchema[tId].ColDefs {
				if colDef.T.Name == spColMaxLength.SpDataType {
					utilities.UpdateMaxColumnLen(conv, spColMaxLength.SpDataType, tId, colId, spColLen)
				}
			}
			common.ComputeNonKeyColumnSize(conv, tId)
		}
		return
	}

	// Single-table case: associatedObjects is used as the table id.
	for colId, colDef := range conv.SpSchema[associatedObjects].ColDefs {
		if colDef.T.Name == spColMaxLength.SpDataType {
			utilities.UpdateMaxColumnLen(conv, spColMaxLength.SpDataType, associatedObjects, colId, spColLen)
		}
	}
	common.ComputeNonKeyColumnSize(conv, associatedObjects)
}
// revertGlobalDataType undoes a global data-type rule when that rule is deleted.
//
// typeMap is the rule's mapping from source type name to Spanner type name. A
// Spanner column is touched only when all of the following hold: it has a
// matching source column, that source column's type is a key of typeMap, and
// the column's current Spanner type is still the one the rule mapped it to.
// Such columns are passed to utilities.UpdateDataType with an empty target
// type (presumably selecting the default mapping — confirm against
// UpdateDataType). After each table is processed its non-key column size is
// recomputed.
func revertGlobalDataType(typeMap map[string]string) {
	conv := session.GetSessionState().Conv
	for tableId, spTable := range conv.SpSchema {
		for colId, spCol := range spTable.ColDefs {
			srcCol, hasSrc := conv.SrcSchema[tableId].ColDefs[colId]
			if !hasSrc {
				continue
			}
			if ruleType, mapped := typeMap[srcCol.Type.Name]; mapped && spCol.T.Name == ruleType {
				utilities.UpdateDataType(conv, "", tableId, colId)
			}
		}
		common.ComputeNonKeyColumnSize(conv, tableId)
	}
}
// removeShardIdColumnFromForeignKeys strips one column from both the
// referencing (ColIds) and referenced (ReferColumnIds) column lists of every
// foreign key in the Spanner schema. When isAddedAtFirst is true the first
// entry of each list is removed, otherwise the last one.
//
// The entry is removed purely by position; the function does not verify that
// it is the shard id column. Foreign keys whose ColIds or ReferColumnIds list
// is empty are skipped, because slicing an empty list would panic part-way
// through the loop and leave the schema only partially reverted.
func removeShardIdColumnFromForeignKeys(isAddedAtFirst bool) {
	conv := session.GetSessionState().Conv
	for tableId, spTable := range conv.SpSchema {
		for i, fk := range spTable.ForeignKeys {
			if len(fk.ColIds) == 0 || len(fk.ReferColumnIds) == 0 {
				continue
			}
			if isAddedAtFirst {
				fk.ColIds = fk.ColIds[1:]
				fk.ReferColumnIds = fk.ReferColumnIds[1:]
			} else {
				fk.ColIds = fk.ColIds[:len(fk.ColIds)-1]
				fk.ReferColumnIds = fk.ReferColumnIds[:len(fk.ReferColumnIds)-1]
			}
			// fk is a copy of the slice element, so write it back explicitly.
			conv.SpSchema[tableId].ForeignKeys[i] = fk
		}
	}
}
// revertShardIdColumnAsPrimaryKey rebuilds the primary key of every table in
// the Spanner schema without the table's shard id column.
//
// For each table a primarykey.PrimaryKeyRequest is assembled from all primary
// key columns other than table.ShardIdColumn and handed to
// primarykey.UpdatePrimaryKey. When isAddedAtFirst is true the Order of each
// remaining column is reduced by one; otherwise the orders are kept as they
// are.
func revertShardIdColumnAsPrimaryKey(isAddedAtFirst bool) {
	// Amount by which the remaining key columns move up; identical for all tables.
	orderShift := 0
	if isAddedAtFirst {
		orderShift = 1
	}

	for _, spTable := range session.GetSessionState().Conv.SpSchema {
		request := primarykey.PrimaryKeyRequest{
			TableId: spTable.Id,
			Columns: []ddl.IndexKey{},
		}
		for _, key := range spTable.PrimaryKeys {
			if key.ColId == spTable.ShardIdColumn {
				continue
			}
			request.Columns = append(request.Columns, ddl.IndexKey{
				ColId: key.ColId,
				Order: key.Order - orderShift,
				Desc:  key.Desc,
			})
		}
		primarykey.UpdatePrimaryKey(request)
	}
}
// checkInterleaving reports whether any table in the Spanner schema has a
// parent table set. It returns the name of one such table, or the empty string
// when there is none. The schema is iterated with range, so when several
// tables qualify, which name is returned is not fixed.
func checkInterleaving() string {
	for _, spTable := range session.GetSessionState().Conv.SpSchema {
		if spTable.ParentTable.Id == "" {
			continue
		}
		return spTable.Name
	}
	return ""
}