cassandra-bigtable-migration-tools/cassandra-bigtable-proxy/translator/translator_update.go (365 lines of code) (raw):
/*
* Copyright (C) 2025 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package translator
import (
"errors"
"fmt"
"slices"
"strconv"
"strings"
schemaMapping "github.com/GoogleCloudPlatform/cloud-bigtable-ecosystem/cassandra-bigtable-migration-tools/cassandra-bigtable-proxy/schema-mapping"
cql "github.com/GoogleCloudPlatform/cloud-bigtable-ecosystem/cassandra-bigtable-migration-tools/cassandra-bigtable-proxy/third_party/cqlparser"
"github.com/GoogleCloudPlatform/cloud-bigtable-ecosystem/cassandra-bigtable-migration-tools/cassandra-bigtable-proxy/utilities"
"github.com/antlr4-go/antlr/v4"
"github.com/datastax/go-cassandra-native-protocol/primitive"
)
// parseAssignments() processes a list of assignment elements from a CQL update statement,
// generating a structured response that includes the assignments' details and their corresponding
// placeholder values for parameterized queries.
//
// Parameters:
// - assignments: A slice of IAssignmentElementContext, each representing an assignment in the CQL query.
// - tableName: The name of the table being updated.
// - schemaMapping: A pointer to SchemaMappingConfig holding schema information for the table.
// - keyspace: The name of the keyspace containing the table.
//
// Returns:
// - A pointer to an UpdateSetResponse which contains structured information about the assignments,
// including updated values, parameter keys, and parameters map.
// - An error if invalid input is detected, such as an empty assignment list or issues with assignment syntax,
// or if a column type cannot be retrieved from the schema mapping table, or if an attempt is made to assign
// a value to a primary key.
func parseAssignments(assignments []cql.IAssignmentElementContext, tableName string, schemaMapping *schemaMapping.SchemaMappingConfig, keyspace string, prependColumn *[]string) (*UpdateSetResponse, error) {
if len(assignments) == 0 {
return nil, errors.New("invalid input")
}
var setResp []UpdateSetValue
var paramKeys []string
params := make(map[string]interface{})
for i, setVal := range assignments {
assignStr := setVal.GetText()
strSplit := strings.Split(assignStr, "=")
s := strconv.Itoa(i + 1)
placeholder := "set" + s
colObj := setVal.OBJECT_NAME(0)
if colObj == nil {
return nil, errors.New("error parsing column for assignments")
}
columnName := colObj.GetText()
if columnName == "" {
return nil, errors.New("no columnName found for assignments")
}
columnName = strings.ReplaceAll(columnName, literalPlaceholder, "")
var value string
if strings.Contains(assignStr, "=?") {
value = questionMark
} else {
valConst := setVal.Constant()
if valConst == nil {
value = strSplit[1]
if value == "" {
return nil, errors.New("error parsing value for assignments")
}
} else {
value = valConst.GetText()
}
}
value = strings.ReplaceAll(value, "'", "")
var val interface{} //encryted val
columnType, err := schemaMapping.GetColumnType(keyspace, tableName, columnName)
if err != nil {
return nil, fmt.Errorf("undefined column name %s in table %s.%s", columnName, keyspace, tableName)
}
if columnType.IsPrimaryKey {
return nil, fmt.Errorf("primary key not allowed to assignments")
}
if value != questionMark {
if columnType.IsCollection {
val = value
} else {
val, err = formatValues(value, columnType.CQLType, 4)
if err != nil {
return nil, err
}
}
params[placeholder] = val
}
paramKeys = append(paramKeys, placeholder)
if (strings.Contains(columnType.CQLType, "map") || strings.Contains(columnType.CQLType, "list")) && IsMapKey(strSplit[0]) {
_, extractkey := ExtractMapKey(assignStr)
if extractkey == "" {
return nil, fmt.Errorf("invalid format: missing map key")
}
newVal := fmt.Sprintf("%s+{%s:%s}", columnName, extractkey, value)
val = newVal
} else if strings.Contains(columnType.CQLType, "list") && strings.Contains(value, fmt.Sprintf("+%s", columnName)) {
listAssignment := strSplit[1]
valSplit := strings.Split(listAssignment, "+")
newVal := fmt.Sprintf("%s+%s", valSplit[1], valSplit[0])
val = newVal
*prependColumn = append(*prependColumn, columnName)
}
setResp = append(setResp, UpdateSetValue{
Column: columnName,
Value: "@" + placeholder,
Encrypted: val,
CQLType: columnType.CQLType,
})
}
return &UpdateSetResponse{
UpdateSetValues: setResp,
ParamKeys: paramKeys,
Params: params,
}, nil
}
// TranslateUpdateQuerytoBigtable() frames UpdateQueryMapping which translates the update to bigtable mutation
//
// Parameters:
// - query: CQL Update query
//
// Returns: UpdateQueryMapping struct and error if any
func (t *Translator) TranslateUpdateQuerytoBigtable(queryStr string, isPreparedQuery bool) (*UpdateQueryMapping, error) {
lowerQuery := strings.ToLower(queryStr)
query := renameLiterals(queryStr)
query, PrependColumns := TransformPrepareQueryForPrependList(query)
lexer := cql.NewCqlLexer(antlr.NewInputStream(query))
stream := antlr.NewCommonTokenStream(lexer, antlr.TokenDefaultChannel)
p := cql.NewCqlParser(stream)
updateObj := p.Update()
if updateObj == nil {
return nil, errors.New("error parsing the update object")
}
kwUpdateObj := updateObj.KwUpdate()
if kwUpdateObj == nil {
return nil, errors.New("error parsing the update object")
}
keyspace := updateObj.Keyspace()
updateObj.DOT()
table := updateObj.Table()
if table == nil || keyspace == nil {
return nil, errors.New("invalid input paramaters found for table or keyspace")
}
keyspaceObj := keyspace.OBJECT_NAME()
if keyspaceObj == nil {
return nil, errors.New("invalid input paramaters found for keyspace")
}
keyspaceName := keyspaceObj.GetText()
if !t.SchemaMappingConfig.InstanceExists(keyspaceName) {
return nil, fmt.Errorf("keyspace %s does not exist", keyspaceName)
}
if !t.SchemaMappingConfig.TableExist(keyspaceName, table.GetText()) {
return nil, fmt.Errorf("table %s does not exist", table.GetText())
}
tableobj := table.OBJECT_NAME()
if tableobj == nil {
return nil, errors.New("invalid input paramaters found for table")
}
tableName := tableobj.GetText()
var err error
timestampInfo, err := GetTimestampInfoByUpdate(queryStr, updateObj)
if err != nil {
return nil, err
}
updateObj.KwSet()
assignmentObj := updateObj.Assignments()
allAssignmentObj := assignmentObj.AllAssignmentElement()
if allAssignmentObj == nil {
return nil, errors.New("error parsing all the assignment object")
}
setValues, err := parseAssignments(allAssignmentObj, tableName, t.SchemaMappingConfig, keyspaceName, &PrependColumns)
if err != nil {
return nil, err
}
var QueryClauses QueryClauses
if hasWhere(lowerQuery) {
resp, err := parseWhereByClause(updateObj.WhereSpec(), tableName, t.SchemaMappingConfig, keyspaceName)
if err != nil {
return nil, err
}
QueryClauses = *resp
for k, v := range QueryClauses.Params {
setValues.Params[k] = v
}
setValues.ParamKeys = append(setValues.ParamKeys, QueryClauses.ParamKeys...)
}
ifExistObj := updateObj.IfExist()
var ifExist bool = false
if ifExistObj != nil {
val := strings.ToLower(ifExistObj.GetText())
if val == ifExists {
ifExist = true
}
}
primaryKeys, err := getPrimaryKeys(t.SchemaMappingConfig, tableName, keyspaceName)
var actualPrimaryKeys []string
for _, val := range QueryClauses.Clauses {
actualPrimaryKeys = append(actualPrimaryKeys, val.Column)
}
if !ValidateRequiredPrimaryKeys(primaryKeys, actualPrimaryKeys) {
missingPrime := findFirstMissingKey(primaryKeys, actualPrimaryKeys)
missingPkColumnType, err := t.SchemaMappingConfig.GetPkKeyType(tableName, keyspaceName, missingPrime)
if err != nil {
return nil, err
}
return nil, fmt.Errorf("some %s key parts are missing: %s", missingPkColumnType, missingPrime)
}
if err != nil {
return nil, err
}
var primkeyvalues []string
var rowKey string
var values []interface{}
var columns []Column
for _, val := range setValues.UpdateSetValues {
values = append(values, val.Encrypted)
columns = append(columns, Column{Name: val.Column, ColumnFamily: t.SchemaMappingConfig.SystemColumnFamily, CQLType: val.CQLType})
}
var newValues []interface{} = values
var newColumns []Column = columns
var delColumns []Column
var delColumnFamily []string
var complexMeta map[string]*ComplexOperation
var rawOutput *ProcessRawCollectionsOutput // Declare rawOutput here
if !isPreparedQuery {
pkValues := make(map[string]interface{})
for _, key := range primaryKeys {
for _, clause := range QueryClauses.Clauses {
if !clause.IsPrimaryKey {
return nil, errors.New("any value other then primary key is not accepted in where clause")
}
if clause.IsPrimaryKey && clause.Operator == "=" && key == clause.Column {
pv := clause.Value
if strings.HasPrefix(clause.Value, "@") {
pv = clause.Value[1:]
}
pkValues[clause.Column] = QueryClauses.Params[pv]
primkeyvalues = append(primkeyvalues, fmt.Sprintf("%v", QueryClauses.Params[pv]))
}
}
}
if len(primkeyvalues) != len(primaryKeys) {
return nil, errors.New("missing primary key values in where clause")
}
pmks, err := t.SchemaMappingConfig.GetPkByTableNameWithFilter(tableName, keyspaceName, primaryKeys)
if err != nil {
return nil, err
}
rowKeyBytes, err := createOrderedCodeKey(pmks, pkValues, t.EncodeIntValuesWithBigEndian)
if err != nil {
return nil, err
}
rowKey = string(rowKeyBytes)
// Building new colum family, qualifier and values for collection type of data.
rawInput := ProcessRawCollectionsInput{
Columns: columns,
Values: values,
TableName: tableName,
Translator: t,
KeySpace: keyspaceName,
PrependColumns: PrependColumns,
}
rawOutput, err = processCollectionColumnsForRawQueries(rawInput)
if err != nil {
return nil, fmt.Errorf("error processing raw collection columns: %w", err)
}
newColumns = rawOutput.NewColumns
newValues = rawOutput.NewValues
delColumnFamily = rawOutput.DelColumnFamily
delColumns = rawOutput.DelColumns
complexMeta = rawOutput.ComplexMeta // Assign complexMeta from output
for _, val := range QueryClauses.Clauses {
var column Column
if columns, exists := t.SchemaMappingConfig.TablesMetaData[keyspaceName][tableName][val.Column]; exists {
column = Column{Name: columns.ColumnName, ColumnFamily: t.SchemaMappingConfig.SystemColumnFamily, CQLType: columns.ColumnType}
}
newColumns = append(newColumns, column)
pv := val.Value
if strings.HasPrefix(val.Value, "@") {
pv = val.Value[1:]
}
value := fmt.Sprintf("%v", QueryClauses.Params[pv])
encryVal, err := formatValues(value, column.CQLType, 4)
if err != nil {
return nil, err
}
newValues = append(newValues, encryVal)
}
} else {
complexMeta, err = t.ProcessComplexUpdate(columns, values, tableName, keyspaceName, PrependColumns)
if err != nil {
return nil, err
}
}
updateQueryData := &UpdateQueryMapping{
Query: query,
QueryType: UPDATE,
Table: tableName,
RowKey: rowKey,
Columns: newColumns,
Values: newValues,
DeleteColumnFamilies: delColumnFamily,
DeleteColumQualifires: delColumns,
IfExists: ifExist,
Keyspace: keyspaceName,
Clauses: QueryClauses.Clauses,
Params: setValues.Params,
ParamKeys: setValues.ParamKeys,
UpdateSetValues: setValues.UpdateSetValues,
PrimaryKeys: primaryKeys,
TimestampInfo: timestampInfo,
ComplexOperation: complexMeta,
}
return updateQueryData, nil
}
// BuildUpdatePrepareQuery() constructs an UpdateQueryMapping for an update operation, preparing the necessary
// components such as columns, values, row keys, and primary keys. It also processes any collection types
// and manages timestamp information for the update query.
//
// Parameters:
// - columnsResponse: A slice of Column structs representing metadata of columns involved in the update.
// - values: A slice of pointers to primitive.Value, representing the values for the update operation.
// - st: A pointer to an UpdateQueryMapping that contains existing query data and metadata.
// - protocolV: The Cassandra protocol version used for encoding and decoding operations.
//
// Returns:
// - A pointer to an UpdateQueryMapping populated with the new query components required for the update.
// - An error if any issues arise during processing, such as failure to fetch primary keys or errors
// handling column data or timestamp.
func (t *Translator) BuildUpdatePrepareQuery(columnsResponse []Column, values []*primitive.Value, st *UpdateQueryMapping, protocolV primitive.ProtocolVersion) (*UpdateQueryMapping, error) {
var newColumns []Column
var newValues []interface{}
var primaryKeys []string = st.PrimaryKeys
var err error
var unencrypted map[string]interface{}
var delColumnFamily []string
var delColumns []Column // Added missing declaration
var prepareOutput *ProcessPrepareCollectionsOutput // Declare prepareOutput
if len(primaryKeys) == 0 {
primaryKeys, err = getPrimaryKeys(t.SchemaMappingConfig, st.Table, st.Keyspace)
if err != nil {
fmt.Println("Not able to fetch primary keys:", err)
return nil, err
}
}
timestampInfo, values, err := ProcessTimestampByUpdate(st, values)
if err != nil {
return nil, err
}
prepareInput := ProcessPrepareCollectionsInput{
ColumnsResponse: columnsResponse,
Values: values,
TableName: st.Table,
ProtocolV: protocolV,
PrimaryKeys: primaryKeys,
Translator: t,
KeySpace: st.Keyspace,
ComplexMeta: st.ComplexOperation,
}
prepareOutput, err = processCollectionColumnsForPrepareQueries(prepareInput)
if err != nil {
return nil, err
}
newColumns = prepareOutput.NewColumns
newValues = prepareOutput.NewValues
unencrypted = prepareOutput.Unencrypted
indexEnd := prepareOutput.IndexEnd
delColumnFamily = prepareOutput.DelColumnFamily
delColumns = prepareOutput.DelColumns
for i, clause := range st.Clauses {
var column Column
if columns, exists := t.SchemaMappingConfig.TablesMetaData[st.Keyspace][st.Table][clause.Column]; exists {
column = Column{Name: columns.ColumnName}
}
if slices.Contains(primaryKeys, column.Name) {
val, _ := utilities.DecodeBytesToCassandraColumnType(values[i+indexEnd+1].Contents, st.VariableMetadata[i+indexEnd+1].Type, protocolV)
unencrypted[column.Name] = val
}
}
pmks, err := t.SchemaMappingConfig.GetPkByTableNameWithFilter(st.Table, st.Keyspace, primaryKeys)
if err != nil {
return nil, err
}
rowKeyBytes, err := createOrderedCodeKey(pmks, unencrypted, t.EncodeIntValuesWithBigEndian)
if err != nil {
return nil, err
}
rowKey := string(rowKeyBytes)
UpdateQueryData := &UpdateQueryMapping{
Query: st.Query,
QueryType: st.QueryType,
Keyspace: st.Keyspace,
Columns: newColumns,
Values: newValues,
PrimaryKeys: primaryKeys,
RowKey: rowKey,
Table: st.Table,
DeleteColumnFamilies: delColumnFamily,
DeleteColumQualifires: delColumns,
Clauses: st.Clauses,
TimestampInfo: timestampInfo,
IfExists: st.IfExists,
ComplexOperation: st.ComplexOperation,
}
return UpdateQueryData, nil
}