func()

in cassandra-bigtable-migration-tools/cassandra-bigtable-proxy/translator/translator_insert.go [177:315]


func (t *Translator) TranslateInsertQuerytoBigtable(queryStr string, protocolV primitive.ProtocolVersion, isPreparedQuery bool) (*InsertQueryMapping, error) {
	query := renameLiterals(queryStr)
	lexer := cql.NewCqlLexer(antlr.NewInputStream(query))
	stream := antlr.NewCommonTokenStream(lexer, antlr.TokenDefaultChannel)
	p := cql.NewCqlParser(stream)

	insertObj := p.Insert()
	if insertObj == nil {
		return nil, errors.New("could not parse insert object")
	}
	kwInsertObj := insertObj.KwInsert()
	if kwInsertObj == nil {
		return nil, errors.New("could not parse insert object")
	}
	insertObj.KwInto()
	keyspace := insertObj.Keyspace()
	table := insertObj.Table()

	var columnsResponse ColumnsResponse

	if keyspace == nil {
		return nil, errors.New("invalid input paramaters found for keyspace")
	}
	keyspaceObj := keyspace.OBJECT_NAME()
	if keyspaceObj == nil {
		return nil, errors.New("invalid input paramaters found for keyspace")
	}

	if table == nil {
		return nil, errors.New("invalid input paramaters found for table")
	}
	tableobj := table.OBJECT_NAME()
	if tableobj == nil {
		return nil, errors.New("invalid input paramaters found for table")
	}
	tableName := tableobj.GetText()
	keyspaceName := keyspaceObj.GetText()

	if keyspaceName == "" {
		return nil, errors.New("keyspace is null")
	}
	if !t.SchemaMappingConfig.InstanceExists(keyspaceName) {
		return nil, fmt.Errorf("keyspace %s does not exist", keyspaceName)
	}
	if !t.SchemaMappingConfig.TableExist(keyspaceName, tableName) {
		return nil, fmt.Errorf("table %s does not exist", tableName)
	}

	ifNotExistObj := insertObj.IfNotExist()
	var ifNotExists bool = false
	if ifNotExistObj != nil {
		val := strings.ToLower(ifNotExistObj.GetText())
		if val == "ifnotexists" {
			ifNotExists = true
		}
	}
	resp, err := parseColumnsAndValuesFromInsert(insertObj.InsertColumnSpec(), tableName, t.SchemaMappingConfig, keyspaceName)
	if err != nil {
		return nil, err
	}

	if resp != nil {
		columnsResponse = *resp
	}
	params, values, unencrypted, err := setParamsFromValues(insertObj.InsertValuesSpec(), columnsResponse.Columns, t.SchemaMappingConfig, protocolV, tableName, keyspaceName, isPreparedQuery)
	if err != nil {
		return nil, err
	}

	timestampInfo, err := GetTimestampInfo(queryStr, insertObj, int32(len(columnsResponse.Columns)))
	if err != nil {
		return nil, err
	}
	var delColumnFamily []string
	var rowKey string
	var newValues []interface{} = values
	var newColumns []Column = columnsResponse.Columns
	var rawOutput *ProcessRawCollectionsOutput // Declare rawOutput

	primaryKeys, err := getPrimaryKeys(t.SchemaMappingConfig, tableName, keyspaceName)
	if err != nil {
		return nil, err
	}

	if !ValidateRequiredPrimaryKeys(primaryKeys, columnsResponse.PrimayColumns) {
		missingKey := findFirstMissingKey(primaryKeys, columnsResponse.PrimayColumns)
		missingPkColumnType, err := t.SchemaMappingConfig.GetPkKeyType(tableName, keyspaceName, missingKey)
		if err != nil {
			return nil, err
		}
		return nil, fmt.Errorf("some %s key parts are missing: %s", missingPkColumnType, missingKey)
	}

	if !isPreparedQuery {
		pmks, err := t.SchemaMappingConfig.GetPkByTableNameWithFilter(tableName, keyspaceName, primaryKeys)
		if err != nil {
			return nil, err
		}
		rowKeyBytes, err := createOrderedCodeKey(pmks, unencrypted, t.EncodeIntValuesWithBigEndian)
		if err != nil {
			return nil, err
		}
		rowKey = string(rowKeyBytes)

		// Building new colum family, qualifier and values for collection type of data.
		rawInput := ProcessRawCollectionsInput{
			Columns:    columnsResponse.Columns,
			Values:     values,
			TableName:  tableName,
			Translator: t,
			KeySpace:   keyspaceName,
		}
		rawOutput, err = processCollectionColumnsForRawQueries(rawInput)
		if err != nil {
			return nil, fmt.Errorf("error processing raw collection columns: %w", err)
		}
		newColumns = rawOutput.NewColumns
		newValues = rawOutput.NewValues
		delColumnFamily = rawOutput.DelColumnFamily

	}

	insertQueryData := &InsertQueryMapping{
		Query:                query,
		QueryType:            INSERT,
		Table:                tableName,
		Keyspace:             keyspaceName,
		Columns:              newColumns,
		Values:               newValues,
		Params:               params,
		ParamKeys:            columnsResponse.ParamKeys,
		PrimaryKeys:          resp.PrimayColumns,
		RowKey:               rowKey,
		DeleteColumnFamilies: delColumnFamily,
		TimestampInfo:        timestampInfo,
		IfNotExists:          ifNotExists,
	}
	return insertQueryData, nil
}