cassandra-bigtable-migration-tools/cassandra-bigtable-proxy/translator/translator_insert.go (303 lines of code) (raw):
/*
* Copyright (C) 2025 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package translator
import (
"errors"
"fmt"
"strings"
schemaMapping "github.com/GoogleCloudPlatform/cloud-bigtable-ecosystem/cassandra-bigtable-migration-tools/cassandra-bigtable-proxy/schema-mapping"
cql "github.com/GoogleCloudPlatform/cloud-bigtable-ecosystem/cassandra-bigtable-migration-tools/cassandra-bigtable-proxy/third_party/cqlparser"
"github.com/antlr4-go/antlr/v4"
"github.com/datastax/go-cassandra-native-protocol/primitive"
)
// IsCollection() Function to determine if provided colunm if type of collection or not
// It returns true if given column is of collection type or else return false
func (t *Translator) IsCollection(keySpace, tableName, columnName string) bool {
if colType, err := t.SchemaMappingConfig.GetColumnType(keySpace, tableName, columnName); err == nil {
return colType.IsCollection
}
return false
}
// parseColumnsAndValuesFromInsert() parses columns and values from the Insert query
//
// Parameters:
// - input: Insert Column Spec Context from antlr parser.
// - tableName: Table Name
// - schemaMapping: JSON Config which maintains column and its datatypes info.
//
// Returns: ColumnsResponse struct and error if any
func parseColumnsAndValuesFromInsert(input cql.IInsertColumnSpecContext, tableName string, schemaMapping *schemaMapping.SchemaMappingConfig, keyspace string) (*ColumnsResponse, error) {
if input == nil {
return nil, errors.New("parseColumnsAndValuesFromInsert: No Input paramaters found for columns")
}
columnListObj := input.ColumnList()
if columnListObj == nil {
return nil, errors.New("parseColumnsAndValuesFromInsert: error while parsing columns")
}
columns := columnListObj.AllColumn()
if columns == nil {
return nil, errors.New("parseColumnsAndValuesFromInsert: error while parsing columns")
}
if len(columns) <= 0 {
return nil, errors.New("parseColumnsAndValuesFromInsert: No Columns found in the Insert Query")
}
var columnArr []Column
var paramKeys []string
var primaryColumns []string
for _, val := range columns {
columnName := val.GetText()
if columnName == "" {
return nil, errors.New("parseColumnsAndValuesFromInsert: No Columns found in the Insert Query")
}
columnName = strings.ReplaceAll(columnName, literalPlaceholder, "")
columnType, err := schemaMapping.GetColumnType(keyspace, tableName, columnName)
if err != nil {
return nil, fmt.Errorf("undefined column name %s in table %s.%s", columnName, keyspace, tableName)
}
isPrimaryKey := false
if columnType.IsPrimaryKey {
isPrimaryKey = true
}
column := Column{
Name: columnName,
ColumnFamily: schemaMapping.SystemColumnFamily,
CQLType: columnType.CQLType,
IsPrimaryKey: isPrimaryKey,
}
if columnType.IsPrimaryKey {
primaryColumns = append(primaryColumns, columnName)
}
columnArr = append(columnArr, column)
paramKeys = append(paramKeys, columnName)
}
response := &ColumnsResponse{
Columns: columnArr,
ParamKeys: paramKeys,
PrimayColumns: primaryColumns,
}
return response, nil
}
// setParamsFromValues() parses Values from the Insert Query
//
// Parameters:
// - input: Insert Valye Spec Context from antlr parser.
// - columns: Array of Column Names
//
// Returns: Map Interface for param name as key and its value and error if any
func setParamsFromValues(input cql.IInsertValuesSpecContext, columns []Column, schemaMapping *schemaMapping.SchemaMappingConfig, protocolV primitive.ProtocolVersion, tableName string, keySpace string, isPreparedQuery bool) (map[string]interface{}, []interface{}, map[string]interface{}, error) {
if input != nil {
valuesExpressionList := input.ExpressionList()
if valuesExpressionList == nil {
return nil, nil, nil, errors.New("setParamsFromValues: error while parsing values")
}
values := valuesExpressionList.AllExpression()
if values == nil {
return nil, nil, nil, errors.New("setParamsFromValues: error while parsing values")
}
var valuesArr []string
var respValue []interface{}
for _, val := range values {
valueName := val.GetText()
if valueName != "" {
valuesArr = append(valuesArr, valueName)
} else {
return nil, nil, nil, errors.New("setParamsFromValues: Invalid Value paramaters")
}
}
response := make(map[string]interface{})
unencrypted := make(map[string]interface{})
for i, col := range columns {
value := strings.ReplaceAll(valuesArr[i], "'", "")
colName := col.Name
if !isPreparedQuery {
var val interface{}
var unenVal interface{}
var err error
columnType, er := schemaMapping.GetColumnType(keySpace, tableName, col.Name)
if er != nil {
return nil, nil, nil, er
}
if columnType.IsCollection {
val = value
unenVal = value
} else {
unenVal = value
val, err = formatValues(value, col.CQLType, protocolV)
if err != nil {
return nil, nil, nil, err
}
}
response[colName] = val
unencrypted[colName] = unenVal
respValue = append(respValue, val)
}
}
return response, respValue, unencrypted, nil
}
return nil, nil, nil, errors.New("setParamsFromValues: No Value paramaters found")
}
// TranslateInsertQuerytoBigtable() parses Values from the Insert Query
//
// Parameters:
// - queryStr: Read the query, parse its columns, values, table name, type of query and keyspaces etc.
// - protocolV: Array of Column Names
//
// Returns: InsertQueryMapping, build the InsertQueryMapping and return it with nil value of error. In case of error
// InsertQueryMapping will return as nil and error will contains the error object
func (t *Translator) TranslateInsertQuerytoBigtable(queryStr string, protocolV primitive.ProtocolVersion, isPreparedQuery bool) (*InsertQueryMapping, error) {
query := renameLiterals(queryStr)
lexer := cql.NewCqlLexer(antlr.NewInputStream(query))
stream := antlr.NewCommonTokenStream(lexer, antlr.TokenDefaultChannel)
p := cql.NewCqlParser(stream)
insertObj := p.Insert()
if insertObj == nil {
return nil, errors.New("could not parse insert object")
}
kwInsertObj := insertObj.KwInsert()
if kwInsertObj == nil {
return nil, errors.New("could not parse insert object")
}
insertObj.KwInto()
keyspace := insertObj.Keyspace()
table := insertObj.Table()
var columnsResponse ColumnsResponse
if keyspace == nil {
return nil, errors.New("invalid input paramaters found for keyspace")
}
keyspaceObj := keyspace.OBJECT_NAME()
if keyspaceObj == nil {
return nil, errors.New("invalid input paramaters found for keyspace")
}
if table == nil {
return nil, errors.New("invalid input paramaters found for table")
}
tableobj := table.OBJECT_NAME()
if tableobj == nil {
return nil, errors.New("invalid input paramaters found for table")
}
tableName := tableobj.GetText()
keyspaceName := keyspaceObj.GetText()
if keyspaceName == "" {
return nil, errors.New("keyspace is null")
}
if !t.SchemaMappingConfig.InstanceExists(keyspaceName) {
return nil, fmt.Errorf("keyspace %s does not exist", keyspaceName)
}
if !t.SchemaMappingConfig.TableExist(keyspaceName, tableName) {
return nil, fmt.Errorf("table %s does not exist", tableName)
}
ifNotExistObj := insertObj.IfNotExist()
var ifNotExists bool = false
if ifNotExistObj != nil {
val := strings.ToLower(ifNotExistObj.GetText())
if val == "ifnotexists" {
ifNotExists = true
}
}
resp, err := parseColumnsAndValuesFromInsert(insertObj.InsertColumnSpec(), tableName, t.SchemaMappingConfig, keyspaceName)
if err != nil {
return nil, err
}
if resp != nil {
columnsResponse = *resp
}
params, values, unencrypted, err := setParamsFromValues(insertObj.InsertValuesSpec(), columnsResponse.Columns, t.SchemaMappingConfig, protocolV, tableName, keyspaceName, isPreparedQuery)
if err != nil {
return nil, err
}
timestampInfo, err := GetTimestampInfo(queryStr, insertObj, int32(len(columnsResponse.Columns)))
if err != nil {
return nil, err
}
var delColumnFamily []string
var rowKey string
var newValues []interface{} = values
var newColumns []Column = columnsResponse.Columns
var rawOutput *ProcessRawCollectionsOutput // Declare rawOutput
primaryKeys, err := getPrimaryKeys(t.SchemaMappingConfig, tableName, keyspaceName)
if err != nil {
return nil, err
}
if !ValidateRequiredPrimaryKeys(primaryKeys, columnsResponse.PrimayColumns) {
missingKey := findFirstMissingKey(primaryKeys, columnsResponse.PrimayColumns)
missingPkColumnType, err := t.SchemaMappingConfig.GetPkKeyType(tableName, keyspaceName, missingKey)
if err != nil {
return nil, err
}
return nil, fmt.Errorf("some %s key parts are missing: %s", missingPkColumnType, missingKey)
}
if !isPreparedQuery {
pmks, err := t.SchemaMappingConfig.GetPkByTableNameWithFilter(tableName, keyspaceName, primaryKeys)
if err != nil {
return nil, err
}
rowKeyBytes, err := createOrderedCodeKey(pmks, unencrypted, t.EncodeIntValuesWithBigEndian)
if err != nil {
return nil, err
}
rowKey = string(rowKeyBytes)
// Building new colum family, qualifier and values for collection type of data.
rawInput := ProcessRawCollectionsInput{
Columns: columnsResponse.Columns,
Values: values,
TableName: tableName,
Translator: t,
KeySpace: keyspaceName,
}
rawOutput, err = processCollectionColumnsForRawQueries(rawInput)
if err != nil {
return nil, fmt.Errorf("error processing raw collection columns: %w", err)
}
newColumns = rawOutput.NewColumns
newValues = rawOutput.NewValues
delColumnFamily = rawOutput.DelColumnFamily
}
insertQueryData := &InsertQueryMapping{
Query: query,
QueryType: INSERT,
Table: tableName,
Keyspace: keyspaceName,
Columns: newColumns,
Values: newValues,
Params: params,
ParamKeys: columnsResponse.ParamKeys,
PrimaryKeys: resp.PrimayColumns,
RowKey: rowKey,
DeleteColumnFamilies: delColumnFamily,
TimestampInfo: timestampInfo,
IfNotExists: ifNotExists,
}
return insertQueryData, nil
}
// BuildInsertPrepareQuery() reads the insert query from cache, forms the columns and its value for collection type of data.
//
// Parameters:
// - columnsResponse: List of all the columns
// - values: Array of values for all columns
// - tableName: tablename on which operation has to be performed
// - protocolV: cassandra protocol version
//
// Returns: InsertQueryMapping, build the InsertQueryMapping and return it with nil value of error. In case of error
// InsertQueryMapping will return as nil and error will contains the error object
func (t *Translator) BuildInsertPrepareQuery(columnsResponse []Column, values []*primitive.Value, st *InsertQueryMapping, protocolV primitive.ProtocolVersion) (*InsertQueryMapping, error) {
var newColumns []Column
var newValues []interface{}
var primaryKeys []string = st.PrimaryKeys
var delColumnFamily []string
var err error
var unencrypted map[string]interface{}
var prepareOutput *ProcessPrepareCollectionsOutput // Declare prepareOutput
if len(primaryKeys) == 0 {
primaryKeys, _ = getPrimaryKeys(t.SchemaMappingConfig, st.Table, st.Keyspace)
}
timestampInfo, err := ProcessTimestamp(st, values)
if err != nil {
return nil, err
}
// Building new colum family, qualifier and values for collection type of data.
prepareInput := ProcessPrepareCollectionsInput{
ColumnsResponse: columnsResponse,
Values: values,
TableName: st.Table,
ProtocolV: protocolV,
PrimaryKeys: primaryKeys,
Translator: t,
KeySpace: st.Keyspace,
ComplexMeta: nil, // Assuming nil for insert
}
prepareOutput, err = processCollectionColumnsForPrepareQueries(prepareInput)
if err != nil {
fmt.Println("Error processing prepared collection columns:", err)
return nil, err
}
newColumns = prepareOutput.NewColumns
newValues = prepareOutput.NewValues
unencrypted = prepareOutput.Unencrypted
delColumnFamily = prepareOutput.DelColumnFamily
pmks, err := t.SchemaMappingConfig.GetPkByTableNameWithFilter(st.Table, st.Keyspace, primaryKeys)
if err != nil {
return nil, err
}
rowKeyBytes, err := createOrderedCodeKey(pmks, unencrypted, t.EncodeIntValuesWithBigEndian)
if err != nil {
return nil, err
}
rowKey := string(rowKeyBytes)
insertQueryData := &InsertQueryMapping{
Columns: newColumns,
Values: newValues,
PrimaryKeys: primaryKeys,
RowKey: rowKey,
DeleteColumnFamilies: delColumnFamily,
TimestampInfo: timestampInfo,
Query: st.Query,
QueryType: st.QueryType,
Table: st.Table,
Keyspace: st.Keyspace,
Params: st.Params,
ParamKeys: st.PrimaryKeys,
IfNotExists: st.IfNotExists,
}
return insertQueryData, nil
}