in cassandra-bigtable-migration-tools/cassandra-bigtable-proxy/translator/translator_update.go [142:359]
// TranslateUpdateQuerytoBigtable parses a CQL UPDATE statement and translates
// it into an UpdateQueryMapping.
//
// Parameters:
//   - queryStr: the raw CQL UPDATE statement.
//   - isPreparedQuery: when false, values are resolved immediately: the row key
//     is built from the primary key values found in the WHERE clause and the
//     SET columns are expanded through processCollectionColumnsForRawQueries.
//     When true, only ProcessComplexUpdate is run and RowKey is left empty.
//
// An error is returned when the statement cannot be parsed, when the keyspace
// or table is not present in the schema mapping, when a primary key column is
// missing from the WHERE clause, or when any helper used along the way fails.
func (t *Translator) TranslateUpdateQuerytoBigtable(queryStr string, isPreparedQuery bool) (*UpdateQueryMapping, error) {
	lowerQuery := strings.ToLower(queryStr)
	query, prependColumns := TransformPrepareQueryForPrependList(renameLiterals(queryStr))

	lexer := cql.NewCqlLexer(antlr.NewInputStream(query))
	stream := antlr.NewCommonTokenStream(lexer, antlr.TokenDefaultChannel)
	updateObj := cql.NewCqlParser(stream).Update()
	if updateObj == nil || updateObj.KwUpdate() == nil {
		return nil, errors.New("error parsing the update object")
	}

	// Resolve and validate the target keyspace and table.
	keyspace := updateObj.Keyspace()
	table := updateObj.Table()
	if table == nil || keyspace == nil {
		return nil, errors.New("invalid input paramaters found for table or keyspace")
	}
	keyspaceObj := keyspace.OBJECT_NAME()
	if keyspaceObj == nil {
		return nil, errors.New("invalid input paramaters found for keyspace")
	}
	keyspaceName := keyspaceObj.GetText()
	if !t.SchemaMappingConfig.InstanceExists(keyspaceName) {
		return nil, fmt.Errorf("keyspace %s does not exist", keyspaceName)
	}
	if !t.SchemaMappingConfig.TableExist(keyspaceName, table.GetText()) {
		return nil, fmt.Errorf("table %s does not exist", table.GetText())
	}
	tableObj := table.OBJECT_NAME()
	if tableObj == nil {
		return nil, errors.New("invalid input paramaters found for table")
	}
	tableName := tableObj.GetText()

	timestampInfo, err := GetTimestampInfoByUpdate(queryStr, updateObj)
	if err != nil {
		return nil, err
	}

	// SET clause. The assignments node is absent when the statement is
	// malformed, so guard it before dereferencing to avoid a nil panic.
	assignmentObj := updateObj.Assignments()
	if assignmentObj == nil {
		return nil, errors.New("error parsing all the assignment object")
	}
	allAssignmentObj := assignmentObj.AllAssignmentElement()
	if allAssignmentObj == nil {
		return nil, errors.New("error parsing all the assignment object")
	}
	setValues, err := parseAssignments(allAssignmentObj, tableName, t.SchemaMappingConfig, keyspaceName, &prependColumns)
	if err != nil {
		return nil, err
	}

	// WHERE clause: its params are merged into the SET params so that the
	// returned mapping carries a single parameter set.
	var whereClauses QueryClauses
	if hasWhere(lowerQuery) {
		resp, err := parseWhereByClause(updateObj.WhereSpec(), tableName, t.SchemaMappingConfig, keyspaceName)
		if err != nil {
			return nil, err
		}
		whereClauses = *resp
		for k, v := range whereClauses.Params {
			setValues.Params[k] = v
		}
		setValues.ParamKeys = append(setValues.ParamKeys, whereClauses.ParamKeys...)
	}

	ifExistObj := updateObj.IfExist()
	ifExist := ifExistObj != nil && strings.ToLower(ifExistObj.GetText()) == ifExists

	// Check the lookup error before primaryKeys is used for validation.
	primaryKeys, err := getPrimaryKeys(t.SchemaMappingConfig, tableName, keyspaceName)
	if err != nil {
		return nil, err
	}
	var actualPrimaryKeys []string
	for _, clause := range whereClauses.Clauses {
		actualPrimaryKeys = append(actualPrimaryKeys, clause.Column)
	}
	if !ValidateRequiredPrimaryKeys(primaryKeys, actualPrimaryKeys) {
		missingKey := findFirstMissingKey(primaryKeys, actualPrimaryKeys)
		missingKeyType, err := t.SchemaMappingConfig.GetPkKeyType(tableName, keyspaceName, missingKey)
		if err != nil {
			return nil, err
		}
		return nil, fmt.Errorf("some %s key parts are missing: %s", missingKeyType, missingKey)
	}

	// Columns and values taken directly from the SET clause.
	var values []interface{}
	var columns []Column
	for _, val := range setValues.UpdateSetValues {
		values = append(values, val.Encrypted)
		columns = append(columns, Column{Name: val.Column, ColumnFamily: t.SchemaMappingConfig.SystemColumnFamily, CQLType: val.CQLType})
	}

	newValues := values
	newColumns := columns
	var (
		rowKey          string
		delColumns      []Column
		delColumnFamily []string
		complexMeta     map[string]*ComplexOperation
	)

	if !isPreparedQuery {
		// Collect the primary key values from the WHERE clause, walking the
		// keys in the order returned by getPrimaryKeys.
		var primKeyValues []string
		pkValues := make(map[string]interface{})
		for _, key := range primaryKeys {
			for _, clause := range whereClauses.Clauses {
				if !clause.IsPrimaryKey {
					return nil, errors.New("any value other then primary key is not accepted in where clause")
				}
				if clause.Operator == "=" && key == clause.Column {
					// Clause values may carry a leading "@" that is not part of the Params key.
					paramKey := strings.TrimPrefix(clause.Value, "@")
					pkValues[clause.Column] = whereClauses.Params[paramKey]
					primKeyValues = append(primKeyValues, fmt.Sprintf("%v", whereClauses.Params[paramKey]))
				}
			}
		}
		if len(primKeyValues) != len(primaryKeys) {
			return nil, errors.New("missing primary key values in where clause")
		}

		pmks, err := t.SchemaMappingConfig.GetPkByTableNameWithFilter(tableName, keyspaceName, primaryKeys)
		if err != nil {
			return nil, err
		}
		rowKeyBytes, err := createOrderedCodeKey(pmks, pkValues, t.EncodeIntValuesWithBigEndian)
		if err != nil {
			return nil, err
		}
		rowKey = string(rowKeyBytes)

		// Build the column families, qualifiers and values for collection-typed columns.
		rawOutput, err := processCollectionColumnsForRawQueries(ProcessRawCollectionsInput{
			Columns:        columns,
			Values:         values,
			TableName:      tableName,
			Translator:     t,
			KeySpace:       keyspaceName,
			PrependColumns: prependColumns,
		})
		if err != nil {
			return nil, fmt.Errorf("error processing raw collection columns: %w", err)
		}
		newColumns = rawOutput.NewColumns
		newValues = rawOutput.NewValues
		delColumnFamily = rawOutput.DelColumnFamily
		delColumns = rawOutput.DelColumns
		complexMeta = rawOutput.ComplexMeta

		// Append every WHERE-clause column and its formatted value to the output.
		for _, clause := range whereClauses.Clauses {
			// Left as the zero Column when the schema has no entry for this name.
			var column Column
			if colMeta, exists := t.SchemaMappingConfig.TablesMetaData[keyspaceName][tableName][clause.Column]; exists {
				column = Column{Name: colMeta.ColumnName, ColumnFamily: t.SchemaMappingConfig.SystemColumnFamily, CQLType: colMeta.ColumnType}
			}
			newColumns = append(newColumns, column)

			paramKey := strings.TrimPrefix(clause.Value, "@")
			value := fmt.Sprintf("%v", whereClauses.Params[paramKey])
			// NOTE(review): the literal 4 is presumably the protocol version expected by formatValues — confirm.
			formatted, err := formatValues(value, column.CQLType, 4)
			if err != nil {
				return nil, err
			}
			newValues = append(newValues, formatted)
		}
	} else {
		complexMeta, err = t.ProcessComplexUpdate(columns, values, tableName, keyspaceName, prependColumns)
		if err != nil {
			return nil, err
		}
	}

	return &UpdateQueryMapping{
		Query:                 query,
		QueryType:             UPDATE,
		Table:                 tableName,
		RowKey:                rowKey,
		Columns:               newColumns,
		Values:                newValues,
		DeleteColumnFamilies:  delColumnFamily,
		DeleteColumQualifires: delColumns,
		IfExists:              ifExist,
		Keyspace:              keyspaceName,
		Clauses:               whereClauses.Clauses,
		Params:                setValues.Params,
		ParamKeys:             setValues.ParamKeys,
		UpdateSetValues:       setValues.UpdateSetValues,
		PrimaryKeys:           primaryKeys,
		TimestampInfo:         timestampInfo,
		ComplexOperation:      complexMeta,
	}, nil
}