in cassandra-bigtable-migration-tools/cassandra-bigtable-proxy/translator/translator_insert.go [177:315]
// TranslateInsertQuerytoBigtable parses a CQL INSERT statement and converts it
// into an InsertQueryMapping.
//
// The statement is parsed with the ANTLR CQL grammar, after which the method:
//   - validates that the keyspace and table are present in the query and known
//     to t.SchemaMappingConfig;
//   - extracts the column list, the bound/literal values and timestamp info;
//   - verifies that every primary key column of the table is supplied;
//   - for non-prepared queries only, builds the row key and processes the
//     collection-typed columns via processCollectionColumnsForRawQueries.
//
// Parameters:
//   - queryStr: the raw CQL INSERT statement.
//   - protocolV: the protocol version forwarded to setParamsFromValues.
//   - isPreparedQuery: when true, the row key and collection processing are
//     skipped and RowKey is left empty in the result.
//
// It returns the populated mapping, or an error if parsing, validation or any
// of the helper steps fails.
func (t *Translator) TranslateInsertQuerytoBigtable(queryStr string, protocolV primitive.ProtocolVersion, isPreparedQuery bool) (*InsertQueryMapping, error) {
	query := renameLiterals(queryStr)
	lexer := cql.NewCqlLexer(antlr.NewInputStream(query))
	stream := antlr.NewCommonTokenStream(lexer, antlr.TokenDefaultChannel)
	p := cql.NewCqlParser(stream)

	insertObj := p.Insert()
	if insertObj == nil {
		return nil, errors.New("could not parse insert object")
	}
	if insertObj.KwInsert() == nil {
		return nil, errors.New("could not parse insert object")
	}

	// Resolve keyspace and table identifiers. The keyspace is validated
	// before the table so the reported error order stays stable.
	keyspace := insertObj.Keyspace()
	table := insertObj.Table()
	if keyspace == nil {
		return nil, errors.New("invalid input paramaters found for keyspace")
	}
	keyspaceObj := keyspace.OBJECT_NAME()
	if keyspaceObj == nil {
		return nil, errors.New("invalid input paramaters found for keyspace")
	}
	if table == nil {
		return nil, errors.New("invalid input paramaters found for table")
	}
	tableObj := table.OBJECT_NAME()
	if tableObj == nil {
		return nil, errors.New("invalid input paramaters found for table")
	}
	tableName := tableObj.GetText()
	keyspaceName := keyspaceObj.GetText()
	if keyspaceName == "" {
		return nil, errors.New("keyspace is null")
	}
	if !t.SchemaMappingConfig.InstanceExists(keyspaceName) {
		return nil, fmt.Errorf("keyspace %s does not exist", keyspaceName)
	}
	if !t.SchemaMappingConfig.TableExist(keyspaceName, tableName) {
		return nil, fmt.Errorf("table %s does not exist", tableName)
	}

	// The clause text is compared without separators ("ifnotexists").
	ifNotExistObj := insertObj.IfNotExist()
	ifNotExists := ifNotExistObj != nil && strings.ToLower(ifNotExistObj.GetText()) == "ifnotexists"

	// columnsResponse is a nil-safe copy of the parsed column spec; every
	// later access must go through it rather than through resp, which may
	// be nil.
	var columnsResponse ColumnsResponse
	resp, err := parseColumnsAndValuesFromInsert(insertObj.InsertColumnSpec(), tableName, t.SchemaMappingConfig, keyspaceName)
	if err != nil {
		return nil, err
	}
	if resp != nil {
		columnsResponse = *resp
	}

	params, values, unencrypted, err := setParamsFromValues(insertObj.InsertValuesSpec(), columnsResponse.Columns, t.SchemaMappingConfig, protocolV, tableName, keyspaceName, isPreparedQuery)
	if err != nil {
		return nil, err
	}

	timestampInfo, err := GetTimestampInfo(queryStr, insertObj, int32(len(columnsResponse.Columns)))
	if err != nil {
		return nil, err
	}

	primaryKeys, err := getPrimaryKeys(t.SchemaMappingConfig, tableName, keyspaceName)
	if err != nil {
		return nil, err
	}
	if !ValidateRequiredPrimaryKeys(primaryKeys, columnsResponse.PrimayColumns) {
		missingKey := findFirstMissingKey(primaryKeys, columnsResponse.PrimayColumns)
		missingPkColumnType, err := t.SchemaMappingConfig.GetPkKeyType(tableName, keyspaceName, missingKey)
		if err != nil {
			return nil, err
		}
		return nil, fmt.Errorf("some %s key parts are missing: %s", missingPkColumnType, missingKey)
	}

	// Defaults used for prepared queries; raw queries override them below.
	var (
		delColumnFamily []string
		rowKey          string
		newValues       []interface{} = values
		newColumns      []Column      = columnsResponse.Columns
	)

	if !isPreparedQuery {
		pmks, err := t.SchemaMappingConfig.GetPkByTableNameWithFilter(tableName, keyspaceName, primaryKeys)
		if err != nil {
			return nil, err
		}
		rowKeyBytes, err := createOrderedCodeKey(pmks, unencrypted, t.EncodeIntValuesWithBigEndian)
		if err != nil {
			return nil, err
		}
		rowKey = string(rowKeyBytes)

		// Build the new column family, qualifier and values for
		// collection-typed columns.
		rawOutput, err := processCollectionColumnsForRawQueries(ProcessRawCollectionsInput{
			Columns:    columnsResponse.Columns,
			Values:     values,
			TableName:  tableName,
			Translator: t,
			KeySpace:   keyspaceName,
		})
		if err != nil {
			return nil, fmt.Errorf("error processing raw collection columns: %w", err)
		}
		newColumns = rawOutput.NewColumns
		newValues = rawOutput.NewValues
		delColumnFamily = rawOutput.DelColumnFamily
	}

	return &InsertQueryMapping{
		Query:     query,
		QueryType: INSERT,
		Table:     tableName,
		Keyspace:  keyspaceName,
		Columns:   newColumns,
		Values:    newValues,
		Params:    params,
		ParamKeys: columnsResponse.ParamKeys,
		// Read from the nil-safe copy: dereferencing resp here would panic
		// when parseColumnsAndValuesFromInsert returns a nil response.
		PrimaryKeys:          columnsResponse.PrimayColumns,
		RowKey:               rowKey,
		DeleteColumnFamilies: delColumnFamily,
		TimestampInfo:        timestampInfo,
		IfNotExists:          ifNotExists,
	}, nil
}