cassandra-bigtable-migration-tools/cassandra-bigtable-proxy/translator/translator_delete.go (328 lines of code) (raw):
/*
* Copyright (C) 2025 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package translator
import (
"errors"
"fmt"
"strconv"
"strings"
schemaMapping "github.com/GoogleCloudPlatform/cloud-bigtable-ecosystem/cassandra-bigtable-migration-tools/cassandra-bigtable-proxy/schema-mapping"
cql "github.com/GoogleCloudPlatform/cloud-bigtable-ecosystem/cassandra-bigtable-migration-tools/cassandra-bigtable-proxy/third_party/cqlparser"
"github.com/GoogleCloudPlatform/cloud-bigtable-ecosystem/cassandra-bigtable-migration-tools/cassandra-bigtable-proxy/utilities"
"github.com/antlr4-go/antlr/v4"
"github.com/datastax/go-cassandra-native-protocol/message"
"github.com/datastax/go-cassandra-native-protocol/primitive"
)
// parseTableFromDelete() extracts the table and keyspace information from the input context of a DELETE query.
//
// Parameters:
// - input: A context interface representing the FROM specification of a CQL DELETE query.
//
// Returns:
// - A pointer to a TableObj containing the extracted table and keyspace names.
// - An error if the input is nil, or if there are issues in parsing the table or keyspace names.
func parseTableFromDelete(input cql.IFromSpecContext) (*TableObj, error) {
if input == nil {
return nil, errors.New("no input parameters found for table and keyspace")
}
fromSpec, err := getFromSpecElement(input)
if err != nil {
return nil, err
}
allObj, err := getAllObjectNames(fromSpec)
if err != nil {
return nil, err
}
tableObj, keyspaceObj, err := getTableAndKeyspaceObjects(allObj)
if err != nil {
return nil, err
}
response := TableObj{
TableName: tableObj.GetText(),
KeyspaceName: keyspaceObj.GetText(),
}
return &response, nil
}
// getFromSpecElement() retrieves the first FROM specification element from the given context.
//
// Parameters:
// - input: A context interface of a FROM specification in a CQL query.
//
// Returns:
// - An element context containing FROM specification details.
// - An error if the element context is not present.
func getFromSpecElement(input cql.IFromSpecContext) (cql.IFromSpecElementContext, error) {
fromSpec := input.FromSpecElement()
if fromSpec == nil {
return nil, errors.New("error while parsing fromSpec")
}
return fromSpec, nil
}
// getAllObjectNames() retrieves all object names (such as tables and keyspaces) from the FROM specification element.
//
// Parameters:
// - fromSpec: A context interface representing an element of a FROM specification in a CQL query.
//
// Returns:
// - A slice of TerminalNode containing all OBJECT_NAME entries.
// - An error if no object names are found or if parsing fails.
func getAllObjectNames(fromSpec cql.IFromSpecElementContext) ([]antlr.TerminalNode, error) {
allObj := fromSpec.AllOBJECT_NAME()
if allObj == nil {
return nil, errors.New("error while parsing all objects from the fromSpec")
}
if len(allObj) == 0 {
return nil, errors.New("could not find table and keyspace name")
}
return allObj, nil
}
// getTableAndKeyspaceObjects() extracts the table and keyspace tokens from a list of object names.
//
// Parameters:
// - allObj: A slice of TerminalNode representing object names parsed from a FROM specification.
//
// Returns:
// - tableObj: The TerminalNode representing the table name.
// - keyspaceObj: The TerminalNode representing the keyspace name.
// - An error if the expected two tokens (table and keyspace) are not found.
func getTableAndKeyspaceObjects(allObj []antlr.TerminalNode) (tableObj, keyspaceObj antlr.TerminalNode, err error) {
if len(allObj) == 2 {
return allObj[1], allObj[0], nil
}
return nil, nil, errors.New("could not find table or keyspace name")
}
// parseClauseFromDelete() parse Clauses from the Delete Query
//
// Parameters:
// - input: The Where Spec context from the antlr Parser.
// - tableName - Table Name
// - schemaMapping - JSON Config which maintains column and its datatypes info.
//
// Returns: QueryClauses and an error if any.
func parseClauseFromDelete(input cql.IWhereSpecContext, tableName string, schemaMapping *schemaMapping.SchemaMappingConfig, keyspace string) (*QueryClauses, error) {
if input == nil {
return nil, errors.New("no input parameters found for clauses")
}
elements, err := getRelationElements(input)
if err != nil {
return nil, err
}
if len(elements) == 0 {
return &QueryClauses{}, nil
}
clauses, params, paramKeys, err := processElements(elements, tableName, schemaMapping, keyspace)
if err != nil {
return nil, err
}
return &QueryClauses{
Clauses: clauses,
Params: params,
ParamKeys: paramKeys,
}, nil
}
// getRelationElements() retrieves all relation elements from the WHERE clause of a CQL query.
//
// Parameters:
// - input: A context interface representing the WHERE specification of a CQL query.
//
// Returns:
// - A slice of IRelationElementContext, each representing a clause in the WHERE condition.
// - An error if no relation elements are found in the input.
func getRelationElements(input cql.IWhereSpecContext) ([]cql.IRelationElementContext, error) {
elements := input.RelationElements().AllRelationElement()
if elements == nil {
return nil, errors.New("no input parameters found for clauses")
}
return elements, nil
}
// processElements() processes a list of relation elements and generates corresponding clauses,
// parameters, and parameter keys. It retrieves column types to handle values appropriately.
//
// Parameters:
// - elements: A slice of IRelationElementContext representing WHERE clause elements.
// - tableName: The name of the table involved in the query.
// - schemaMapping: A pointer to SchemaMappingConfig for retrieving schema information.
// - keyspace: The name of the keyspace containing the table.
//
// Returns:
// - A slice of Clause structs each representing a WHERE condition.
// - A map of parameters to use for prepared statements.
// - A slice of strings representing parameter keys.
// - An error if parsing column names, values, or column types fails.
func processElements(elements []cql.IRelationElementContext, tableName string, schemaMapping *schemaMapping.SchemaMappingConfig, keyspace string) ([]Clause, map[string]interface{}, []string, error) {
var clauses []Clause
params := make(map[string]interface{})
var paramKeys []string
for i, val := range elements {
if val == nil {
return nil, nil, nil, errors.New("could not parse column object")
}
placeholder := "value" + strconv.Itoa(i+1)
paramKeys = append(paramKeys, placeholder)
colName, operator, err := parseColumnAndOperator(val)
if err != nil {
return nil, nil, nil, err
}
columnType, err := schemaMapping.GetColumnType(keyspace, tableName, colName)
if err != nil {
return nil, nil, nil, err
}
acctualVal, err := handleColumnType(val, columnType, placeholder, params)
if err != nil {
return nil, nil, nil, err
}
clause := Clause{
Column: colName,
Operator: operator,
Value: acctualVal,
IsPrimaryKey: columnType.IsPrimaryKey,
}
clauses = append(clauses, clause)
}
return clauses, params, paramKeys, nil
}
// parseColumnAndOperator() extracts the column name and operator from a relation element.
//
// Parameters:
// - val: A relation element context from which the column and operator are parsed.
//
// Returns:
// - A string representing the column name.
// - A string representing the operator used in the relation.
// - An error if parsing the column object or operator fails.
func parseColumnAndOperator(val cql.IRelationElementContext) (string, string, error) {
colObj := val.OBJECT_NAME(0)
if colObj == nil {
return "", "", errors.New("could not parse column object")
}
operator, err := getOperator(val)
if err != nil {
return "", "", err
}
colName := strings.ReplaceAll(colObj.GetText(), literalPlaceholder, "")
if colName == "" {
return "", "", errors.New("could not parse column name")
}
return colName, operator, nil
}
// getOperator() determines the operator used in a relation element.
//
// Parameters:
// - val: A relation element context from which the operator is extracted.
//
// Returns:
// - A string representing the operator.
// - An error if no supported operator is found.
func getOperator(val cql.IRelationElementContext) (string, error) {
switch {
case val.OPERATOR_EQ() != nil:
return val.OPERATOR_EQ().GetText(), nil
//
default:
return "", errors.New("no supported operator found")
}
}
// handleColumnType() processes the value associated with a column, formats it if necessary,
// and updates the parameters map with the formatted value.
//
// Parameters:
// - val: The relation element context containing the value to process.
// - columnType: A pointer to ColumnType providing type information for the column.
// - placeholder: A string used as the key in the parameters map.
// - params: A map for storing formatted parameter values.
//
// Returns:
// - A string representing the actual value.
// - An error if parsing or formatting the value fails.
func handleColumnType(val cql.IRelationElementContext, columnType *schemaMapping.Column, placeholder string, params map[string]interface{}) (string, error) {
if columnType == nil || columnType.CQLType == "" {
return "", nil
}
valConst := val.Constant()
if valConst == nil {
return "", errors.New("could not parse value from query for one of the clauses")
}
value := strings.ReplaceAll(valConst.GetText(), "'", "")
if value == "" {
return "", errors.New("could not parse value from query for one of the clauses")
}
acctualVal := value
if value != "?" {
formattedVal, err := formatValues(value, columnType.CQLType, 4)
if err != nil {
return "", err
}
params[placeholder] = formattedVal
}
return acctualVal, nil
}
// TranslateDeleteQuerytoBigtable() translate the CQL Delete Query into bigtable mutation api equivalent.
//
// Parameters:
// - queryStr: CQL delete query with condition
//
// Returns: QueryClauses and an error if any.
func (t *Translator) TranslateDeleteQuerytoBigtable(queryStr string, isPreparedQuery bool) (*DeleteQueryMapping, error) {
lowerQuery := strings.ToLower(queryStr)
query := renameLiterals(queryStr)
lexer := cql.NewCqlLexer(antlr.NewInputStream(query))
stream := antlr.NewCommonTokenStream(lexer, antlr.TokenDefaultChannel)
p := cql.NewCqlParser(stream)
deleteObj := p.Delete_()
if deleteObj == nil {
return nil, errors.New("error while parsing delete object")
}
kwDeleteObj := deleteObj.KwDelete()
if kwDeleteObj == nil {
return nil, errors.New("error while parsing delete object")
}
tableSpec, err := parseTableFromDelete(deleteObj.FromSpec())
if err != nil {
return nil, err
} else if tableSpec.TableName == "" || tableSpec.KeyspaceName == "" {
return nil, errors.New("TranslateDeletetQuerytoBigtable: No table or keyspace name found in the query")
}
if !t.SchemaMappingConfig.InstanceExists(tableSpec.KeyspaceName) {
return nil, fmt.Errorf("keyspace %s does not exist", tableSpec.KeyspaceName)
}
if !t.SchemaMappingConfig.TableExist(tableSpec.KeyspaceName, tableSpec.TableName) {
return nil, fmt.Errorf("table %s does not exist", tableSpec.TableName)
}
selectedColumns, err := parseDeleteColumns(deleteObj.DeleteColumnList(), tableSpec.TableName, t.SchemaMappingConfig, tableSpec.KeyspaceName)
if err != nil {
return nil, err
}
timestampInfo, err := GetTimestampInfoForRawDelete(lowerQuery, deleteObj)
if err != nil {
return nil, err
}
ifExistObj := deleteObj.IfExist()
var ifExist bool = false
if ifExistObj != nil {
val := strings.ToLower(ifExistObj.GetText())
if val == ifExists {
ifExist = true
}
}
var QueryClauses QueryClauses
if hasWhere(lowerQuery) {
resp, err := parseClauseFromDelete(deleteObj.WhereSpec(), tableSpec.TableName, t.SchemaMappingConfig, tableSpec.KeyspaceName)
if err != nil {
return nil, errors.New("TranslateDeletetQuerytoBigtable: Invalid Where clause condition")
}
QueryClauses = *resp
}
primaryKeys, err := getPrimaryKeys(t.SchemaMappingConfig, tableSpec.TableName, tableSpec.KeyspaceName)
if err != nil {
return nil, err
}
var primaryKeysFound []string
pkValues := make(map[string]interface{})
for _, key := range primaryKeys {
for _, clause := range QueryClauses.Clauses {
if !clause.IsPrimaryKey {
return nil, fmt.Errorf("non PRIMARY KEY columns found in where clause: %s", clause.Column)
}
if clause.IsPrimaryKey && clause.Operator == "=" && key == clause.Column {
pkValues[clause.Column] = clause.Value
primaryKeysFound = append(primaryKeysFound, fmt.Sprintf("%v", clause.Value))
}
}
}
// The below code checking the reuired primary keys and actual primary keys when we are having clause statements
if len(primaryKeysFound) != len(primaryKeys) && len(QueryClauses.Clauses) > 0 {
missingPrime := findFirstMissingKey(primaryKeys, primaryKeysFound)
missingPkColumnType, err := t.SchemaMappingConfig.GetPkKeyType(tableSpec.TableName, tableSpec.KeyspaceName, missingPrime)
if err != nil {
return nil, err
}
return nil, fmt.Errorf("some %s key parts are missing: %s", missingPkColumnType, missingPrime)
}
var rowKey string
if !isPreparedQuery {
pmks, err := t.SchemaMappingConfig.GetPkByTableNameWithFilter(tableSpec.TableName, tableSpec.KeyspaceName, primaryKeys)
if err != nil {
return nil, err
}
rowKeyBytes, err := createOrderedCodeKey(pmks, pkValues, t.EncodeIntValuesWithBigEndian)
if err != nil {
return nil, fmt.Errorf("key encoding failed. %w", err)
}
rowKey = string(rowKeyBytes)
}
deleteQueryData := &DeleteQueryMapping{
Query: query,
QueryType: DELETE,
Table: tableSpec.TableName,
Keyspace: tableSpec.KeyspaceName,
Clauses: QueryClauses.Clauses,
Params: QueryClauses.Params,
ParamKeys: QueryClauses.ParamKeys,
PrimaryKeys: primaryKeys,
RowKey: rowKey,
TimestampInfo: timestampInfo,
IfExists: ifExist,
SelectedColumns: selectedColumns,
}
return deleteQueryData, nil
}
// BuildDeletePrepareQuery() Function to accept the values clause columns and form the rowKey and return the same
func (t *Translator) BuildDeletePrepareQuery(values []*primitive.Value, st *DeleteQueryMapping, variableColumnMetadata []*message.ColumnMetadata, protocolV primitive.ProtocolVersion) (string, TimestampInfo, error) {
timestamp, values, err := ProcessTimestampByDelete(st, values)
if err != nil {
return "", TimestampInfo{}, fmt.Errorf("error while getting timestamp value")
}
valueMap := make(map[string]interface{})
for i, col := range variableColumnMetadata {
val, _ := utilities.DecodeBytesToCassandraColumnType(values[i].Contents, variableColumnMetadata[i].Type, protocolV)
valueMap[col.Name] = val
}
pmks, err := t.SchemaMappingConfig.GetPkByTableName(st.Table, st.Keyspace)
if err != nil {
return "", TimestampInfo{}, err
}
rowKeyBytes, err := createOrderedCodeKey(pmks, valueMap, t.EncodeIntValuesWithBigEndian)
if err != nil {
return "", timestamp, fmt.Errorf("key encoding failed. %w", err)
}
rowKey := string(rowKeyBytes)
return rowKey, timestamp, nil
}
// Parses the delete columns from a CQL DELETE statement and returns the selected columns with their associated map keys or list indices.
func parseDeleteColumns(deleteColumns cql.IDeleteColumnListContext, tableName string, tableConf *schemaMapping.SchemaMappingConfig, keySpace string) ([]schemaMapping.SelectedColumns, error) {
if deleteColumns == nil {
return nil, nil
}
cols := deleteColumns.AllDeleteColumnItem()
var Columns []schemaMapping.SelectedColumns
var decimalLiteral, stringLiteral string
for _, v := range cols {
var Column schemaMapping.SelectedColumns
Column.Name = v.OBJECT_NAME().GetText()
if v.LS_BRACKET() != nil {
if v.DecimalLiteral() != nil { // for list index
decimalLiteral = v.DecimalLiteral().GetText()
Column.ListIndex = decimalLiteral
}
if v.StringLiteral() != nil { //for map Key
stringLiteral = v.StringLiteral().GetText()
stringLiteral = strings.Trim(stringLiteral, "'")
Column.MapKey = stringLiteral
}
}
_, err := tableConf.GetColumnType(keySpace, tableName, Column.Name)
if err != nil {
return nil, fmt.Errorf("undefined column name %s in table %s.%s", Column.Name, keySpace, tableName)
}
Columns = append(Columns, Column)
}
return Columns, nil
}
// findFirstMissingKey() finds the first primary key that's missing from primaryKeysFound
func findFirstMissingKey(primaryKeys []string, primaryKeysFound []string) string {
foundMap := make(map[string]bool)
for _, key := range primaryKeysFound {
foundMap[key] = true
}
for _, key := range primaryKeys {
if !foundMap[key] {
return key
}
}
return ""
}