func()

in cassandra-bigtable-migration-tools/cassandra-bigtable-proxy/translator/translator_delete.go [315:425]


func (t *Translator) TranslateDeleteQuerytoBigtable(queryStr string, isPreparedQuery bool) (*DeleteQueryMapping, error) {
	lowerQuery := strings.ToLower(queryStr)
	query := renameLiterals(queryStr)
	lexer := cql.NewCqlLexer(antlr.NewInputStream(query))
	stream := antlr.NewCommonTokenStream(lexer, antlr.TokenDefaultChannel)
	p := cql.NewCqlParser(stream)

	deleteObj := p.Delete_()
	if deleteObj == nil {
		return nil, errors.New("error while parsing delete object")
	}
	kwDeleteObj := deleteObj.KwDelete()
	if kwDeleteObj == nil {
		return nil, errors.New("error while parsing delete object")
	}
	tableSpec, err := parseTableFromDelete(deleteObj.FromSpec())
	if err != nil {
		return nil, err
	} else if tableSpec.TableName == "" || tableSpec.KeyspaceName == "" {
		return nil, errors.New("TranslateDeletetQuerytoBigtable: No table or keyspace name found in the query")
	}
	if !t.SchemaMappingConfig.InstanceExists(tableSpec.KeyspaceName) {
		return nil, fmt.Errorf("keyspace %s does not exist", tableSpec.KeyspaceName)
	}
	if !t.SchemaMappingConfig.TableExist(tableSpec.KeyspaceName, tableSpec.TableName) {
		return nil, fmt.Errorf("table %s does not exist", tableSpec.TableName)
	}
	selectedColumns, err := parseDeleteColumns(deleteObj.DeleteColumnList(), tableSpec.TableName, t.SchemaMappingConfig, tableSpec.KeyspaceName)
	if err != nil {
		return nil, err
	}
	timestampInfo, err := GetTimestampInfoForRawDelete(lowerQuery, deleteObj)
	if err != nil {
		return nil, err
	}
	ifExistObj := deleteObj.IfExist()
	var ifExist bool = false
	if ifExistObj != nil {
		val := strings.ToLower(ifExistObj.GetText())
		if val == ifExists {
			ifExist = true
		}
	}

	var QueryClauses QueryClauses

	if hasWhere(lowerQuery) {
		resp, err := parseClauseFromDelete(deleteObj.WhereSpec(), tableSpec.TableName, t.SchemaMappingConfig, tableSpec.KeyspaceName)
		if err != nil {
			return nil, errors.New("TranslateDeletetQuerytoBigtable: Invalid Where clause condition")
		}
		QueryClauses = *resp
	}

	primaryKeys, err := getPrimaryKeys(t.SchemaMappingConfig, tableSpec.TableName, tableSpec.KeyspaceName)
	if err != nil {
		return nil, err
	}
	var primaryKeysFound []string
	pkValues := make(map[string]interface{})

	for _, key := range primaryKeys {
		for _, clause := range QueryClauses.Clauses {
			if !clause.IsPrimaryKey {
				return nil, fmt.Errorf("non PRIMARY KEY columns found in where clause: %s", clause.Column)
			}
			if clause.IsPrimaryKey && clause.Operator == "=" && key == clause.Column {
				pkValues[clause.Column] = clause.Value
				primaryKeysFound = append(primaryKeysFound, fmt.Sprintf("%v", clause.Value))
			}
		}
	}
	// The below code checking the reuired primary keys and actual primary keys when we are having clause statements
	if len(primaryKeysFound) != len(primaryKeys) && len(QueryClauses.Clauses) > 0 {
		missingPrime := findFirstMissingKey(primaryKeys, primaryKeysFound)
		missingPkColumnType, err := t.SchemaMappingConfig.GetPkKeyType(tableSpec.TableName, tableSpec.KeyspaceName, missingPrime)
		if err != nil {
			return nil, err
		}
		return nil, fmt.Errorf("some %s key parts are missing: %s", missingPkColumnType, missingPrime)
	}

	var rowKey string
	if !isPreparedQuery {
		pmks, err := t.SchemaMappingConfig.GetPkByTableNameWithFilter(tableSpec.TableName, tableSpec.KeyspaceName, primaryKeys)
		if err != nil {
			return nil, err
		}
		rowKeyBytes, err := createOrderedCodeKey(pmks, pkValues, t.EncodeIntValuesWithBigEndian)
		if err != nil {
			return nil, fmt.Errorf("key encoding failed. %w", err)
		}
		rowKey = string(rowKeyBytes)
	}

	deleteQueryData := &DeleteQueryMapping{
		Query:           query,
		QueryType:       DELETE,
		Table:           tableSpec.TableName,
		Keyspace:        tableSpec.KeyspaceName,
		Clauses:         QueryClauses.Clauses,
		Params:          QueryClauses.Params,
		ParamKeys:       QueryClauses.ParamKeys,
		PrimaryKeys:     primaryKeys,
		RowKey:          rowKey,
		TimestampInfo:   timestampInfo,
		IfExists:        ifExist,
		SelectedColumns: selectedColumns,
	}
	return deleteQueryData, nil
}