in cassandra-bigtable-migration-tools/cassandra-bigtable-proxy/translator/translator_delete.go [315:425]
// TranslateDeleteQuerytoBigtable parses a CQL DELETE statement and converts it
// into a DeleteQueryMapping.
//
// The statement is passed through renameLiterals and parsed with the ANTLR CQL
// grammar. The target keyspace and table are validated against
// t.SchemaMappingConfig, and the WHERE clause (detected via hasWhere on the
// lower-cased query) is checked so that it references primary key columns only
// and carries an "=" condition for every primary key column. When the query has
// no WHERE clauses the completeness check is skipped.
//
// Parameters:
//   - queryStr: the raw CQL DELETE statement.
//   - isPreparedQuery: when false, the row key is encoded from the values found
//     in the WHERE clause; when true, RowKey is left empty.
//
// It returns the populated mapping (whose Query field holds the statement as
// rewritten by renameLiterals), or an error when parsing fails, the keyspace or
// table is unknown, the WHERE clause is invalid, primary key parts are missing,
// or the row key cannot be encoded.
func (t *Translator) TranslateDeleteQuerytoBigtable(queryStr string, isPreparedQuery bool) (*DeleteQueryMapping, error) {
	lowerQuery := strings.ToLower(queryStr)
	query := renameLiterals(queryStr)

	lexer := cql.NewCqlLexer(antlr.NewInputStream(query))
	stream := antlr.NewCommonTokenStream(lexer, antlr.TokenDefaultChannel)
	p := cql.NewCqlParser(stream)

	deleteObj := p.Delete_()
	if deleteObj == nil || deleteObj.KwDelete() == nil {
		return nil, errors.New("error while parsing delete object")
	}

	tableSpec, err := parseTableFromDelete(deleteObj.FromSpec())
	if err != nil {
		return nil, err
	}
	if tableSpec.TableName == "" || tableSpec.KeyspaceName == "" {
		return nil, errors.New("TranslateDeletetQuerytoBigtable: No table or keyspace name found in the query")
	}
	keyspace, table := tableSpec.KeyspaceName, tableSpec.TableName

	if !t.SchemaMappingConfig.InstanceExists(keyspace) {
		return nil, fmt.Errorf("keyspace %s does not exist", keyspace)
	}
	if !t.SchemaMappingConfig.TableExist(keyspace, table) {
		return nil, fmt.Errorf("table %s does not exist", table)
	}

	selectedColumns, err := parseDeleteColumns(deleteObj.DeleteColumnList(), table, t.SchemaMappingConfig, keyspace)
	if err != nil {
		return nil, err
	}

	timestampInfo, err := GetTimestampInfoForRawDelete(lowerQuery, deleteObj)
	if err != nil {
		return nil, err
	}

	ifExistObj := deleteObj.IfExist()
	ifExist := ifExistObj != nil && strings.ToLower(ifExistObj.GetText()) == ifExists

	// Named whereClauses (not QueryClauses) so the local does not shadow the type.
	var whereClauses QueryClauses
	if hasWhere(lowerQuery) {
		resp, err := parseClauseFromDelete(deleteObj.WhereSpec(), table, t.SchemaMappingConfig, keyspace)
		if err != nil {
			// Keep the established message as a prefix but preserve the root
			// cause so callers can see why the WHERE clause was rejected.
			return nil, fmt.Errorf("TranslateDeletetQuerytoBigtable: Invalid Where clause condition: %w", err)
		}
		whereClauses = *resp
	}

	primaryKeys, err := getPrimaryKeys(t.SchemaMappingConfig, table, keyspace)
	if err != nil {
		return nil, err
	}

	// Only primary key columns may appear in the WHERE clause. This is checked
	// once up front instead of once per primary key.
	for _, clause := range whereClauses.Clauses {
		if !clause.IsPrimaryKey {
			return nil, fmt.Errorf("non PRIMARY KEY columns found in where clause: %s", clause.Column)
		}
	}

	// Collect, in primary key order, every key column that has an equality
	// condition. primaryKeysFound holds column names (not values) because it is
	// compared against primaryKeys, which holds column names.
	// NOTE(review): findFirstMissingKey is defined elsewhere; this assumes it
	// matches entries of its second argument against the names in its first —
	// confirm.
	var primaryKeysFound []string
	pkValues := make(map[string]interface{})
	for _, key := range primaryKeys {
		for _, clause := range whereClauses.Clauses {
			if clause.Operator == "=" && key == clause.Column {
				pkValues[clause.Column] = clause.Value
				primaryKeysFound = append(primaryKeysFound, clause.Column)
			}
		}
	}

	// When WHERE clauses are present, every primary key column must be matched
	// by exactly one equality condition.
	if len(primaryKeysFound) != len(primaryKeys) && len(whereClauses.Clauses) > 0 {
		missingPrime := findFirstMissingKey(primaryKeys, primaryKeysFound)
		missingPkColumnType, err := t.SchemaMappingConfig.GetPkKeyType(table, keyspace, missingPrime)
		if err != nil {
			return nil, err
		}
		return nil, fmt.Errorf("some %s key parts are missing: %s", missingPkColumnType, missingPrime)
	}

	// The row key is only encoded here for non-prepared queries; for prepared
	// queries it stays empty.
	var rowKey string
	if !isPreparedQuery {
		pmks, err := t.SchemaMappingConfig.GetPkByTableNameWithFilter(table, keyspace, primaryKeys)
		if err != nil {
			return nil, err
		}
		rowKeyBytes, err := createOrderedCodeKey(pmks, pkValues, t.EncodeIntValuesWithBigEndian)
		if err != nil {
			return nil, fmt.Errorf("key encoding failed. %w", err)
		}
		rowKey = string(rowKeyBytes)
	}

	return &DeleteQueryMapping{
		Query:           query,
		QueryType:       DELETE,
		Table:           table,
		Keyspace:        keyspace,
		Clauses:         whereClauses.Clauses,
		Params:          whereClauses.Params,
		ParamKeys:       whereClauses.ParamKeys,
		PrimaryKeys:     primaryKeys,
		RowKey:          rowKey,
		TimestampInfo:   timestampInfo,
		IfExists:        ifExist,
		SelectedColumns: selectedColumns,
	}, nil
}