func()

in cassandra-bigtable-migration-tools/cassandra-bigtable-proxy/translator/translator_select.go [673:791]


// TranslateSelectQuerytoBigtable parses a CQL SELECT statement and assembles the
// SelectQueryMap that describes it, including the translated Bigtable query.
//
// The statement is parsed from a copy of originalQuery whose literals have been
// rewritten by renameLiterals; that copy is also what is stored in the result's
// Query field. Detection of the WHERE, GROUP BY, ORDER BY and LIMIT clauses is
// done against a lower-cased copy of originalQuery.
//
// A nil result and an error are returned when:
//   - the parser cannot be created or yields no select object,
//   - the select elements or the FROM spec fail to parse,
//   - the keyspace or the table is not known to the schema mapping,
//   - the WHERE, ORDER BY or LIMIT clause fails to parse,
//   - a LIMIT was parsed, no ORDER BY was parsed, and the text "ORDER BY"
//     appears after "LIMIT" in the query,
//   - the primary keys of the table cannot be looked up,
//   - building the Bigtable query fails.
func (t *Translator) TranslateSelectQuerytoBigtable(originalQuery string) (*SelectQueryMap, error) {
	lowered := strings.ToLower(originalQuery)

	// Parse a copy of the Cassandra query in which literals carry a suffix.
	query := renameLiterals(originalQuery)

	parser, err := NewCqlParser(query, false)
	if err != nil {
		return nil, err
	}

	stmt := parser.Select_()
	if stmt == nil {
		// Todo - Code is not reachable
		return nil, errors.New("ToBigtableSelect: Could not parse select object")
	}

	keyword := stmt.KwSelect()
	if keyword == nil {
		// Todo - Code is not reachable
		return nil, errors.New("ToBigtableSelect: Could not parse select object")
	}
	queryType := keyword.GetText()

	columns, err := parseColumnsFromSelect(stmt.SelectElements())
	if err != nil {
		return nil, err
	}

	target, err := parseTableFromSelect(stmt.FromSpec())
	if err != nil {
		return nil, err
	}

	// Reject queries that address a keyspace or table missing from the schema mapping.
	schema := t.SchemaMappingConfig
	if !schema.InstanceExists(target.KeyspaceName) {
		return nil, fmt.Errorf("keyspace %s does not exist", target.KeyspaceName)
	}
	if !schema.TableExist(target.KeyspaceName, target.TableName) {
		return nil, fmt.Errorf("table %s does not exist", target.TableName)
	}

	// Without a WHERE clause the zero-value clauses are used as-is.
	var where QueryClauses
	if hasWhere(lowered) {
		parsed, err := parseWhereByClause(stmt.WhereSpec(), target.TableName, t.SchemaMappingConfig, target.KeyspaceName)
		if err != nil {
			return nil, err
		}
		where = *parsed
	}

	var groupColumns []string
	if hasGroupBy(lowered) {
		groupColumns = parseGroupByColumn(lowered)
	}

	// The zero value already reports IsOrderBy == false when no ORDER BY is present.
	var ordering OrderBy
	if hasOrderBy(lowered) {
		// Return the parser's own error so the root cause is preserved.
		if ordering, err = parseOrderByFromSelect(stmt.OrderSpec()); err != nil {
			return nil, err
		}
	}

	// The zero value already reports IsLimit == false when no LIMIT is present.
	var rowLimit Limit
	if hasLimit(lowered) {
		if rowLimit, err = parseLimitFromSelect(stmt.LimitSpec()); err != nil {
			return nil, err
		}
		// A placeholder in the limit count adds the limit placeholder key to the parameter keys.
		if strings.Contains(rowLimit.Count, questionMark) {
			where.ParamKeys = append(where.ParamKeys, limitPlaceholder)
		}
	}

	// Todo:- Add this logic in the parser
	if rowLimit.IsLimit && !ordering.IsOrderBy {
		upper := strings.ToUpper(query)
		if strings.Index(upper, "ORDER BY") > strings.Index(upper, "LIMIT") {
			return nil, errors.New("mismatched input 'Order' expecting EOF (...age = ? LIMIT ? [Order]...)")
		}
	}

	primaryKeys, err := t.SchemaMappingConfig.GetPkByTableName(target.TableName, target.KeyspaceName)
	if err != nil {
		return nil, err
	}

	// Collect only the column names; the slice stays nil when there are no keys.
	var keyNames []string
	for _, key := range primaryKeys {
		keyNames = append(keyNames, key.ColumnName)
	}

	result := &SelectQueryMap{
		Query:           query,
		TranslatedQuery: "",
		QueryType:       queryType,
		Table:           target.TableName,
		Keyspace:        target.KeyspaceName,
		ColumnMeta:      columns,
		Clauses:         where.Clauses,
		PrimaryKeys:     keyNames,
		Limit:           rowLimit,
		OrderBy:         ordering,
		GroupByColumns:  groupColumns,
		Params:          where.Params,
		ParamKeys:       where.ParamKeys,
	}

	// The translated text is produced from the assembled metadata and then stored back on it.
	translated, err := getBigtableSelectQuery(t, result)
	if err != nil {
		return nil, err
	}
	result.TranslatedQuery = translated

	return result, nil
}