in cassandra-bigtable-migration-tools/cassandra-bigtable-proxy/translator/translator_select.go [588:665]
func getBigtableSelectQuery(t *Translator, data *SelectQueryMap) (string, error) {
column := ""
var columns []string
var err error
var aliasMap map[string]AsKeywordMeta
if data.ColumnMeta.Star {
column = STAR
} else {
isGroupBy := false
if len(data.GroupByColumns) > 0 {
isGroupBy = true
}
aliasMap, columns, err = processStrings(t, data.ColumnMeta.Column, data.Table, data.Keyspace, isGroupBy)
if err != nil {
return "nil", err
}
data.AliasMap = aliasMap
column = strings.Join(columns, ",")
}
if column == "" && data.Table == "" {
return "", errors.New("could not prepare the select query due to incomplete information")
}
btQuery := fmt.Sprintf("SELECT %s FROM %s", column, data.Table)
whereCondition, err := buildWhereClause(data.Clauses, t, data.Table, data.Keyspace)
if err != nil {
return "nil", err
}
if whereCondition != "" {
btQuery += whereCondition
}
if data.OrderBy.IsOrderBy {
if colMeta, ok := t.SchemaMappingConfig.TablesMetaData[data.Keyspace][data.Table][data.OrderBy.Column]; ok {
if colMeta.IsPrimaryKey {
btQuery = btQuery + " ORDER BY " + data.OrderBy.Column + " " + string(data.OrderBy.Operation)
} else if !colMeta.IsCollection {
orderByKey := t.SchemaMappingConfig.SystemColumnFamily + "['" + data.OrderBy.Column + "']"
btQuery = btQuery + " ORDER BY " + orderByKey + " " + string(data.OrderBy.Operation)
} else {
return "", errors.New("order by on collection data type is not supported")
}
} else {
return "", errors.New("Undefined column name " + data.OrderBy.Column + " in table " + data.Keyspace + "." + data.Table)
}
}
if data.Limit.IsLimit {
val := data.Limit.Count
if val == questionMark || strings.Contains(val, questionMark) {
val = "@" + limitPlaceholder
}
btQuery = btQuery + " " + "LIMIT" + " " + val
}
if len(data.GroupByColumns) > 0 {
btQuery = btQuery + " GROUP BY "
groupBykeys := []string{}
for _, col := range data.GroupByColumns {
if colMeta, ok := t.SchemaMappingConfig.TablesMetaData[data.Keyspace][data.Table][col]; ok {
if !colMeta.IsCollection {
col, err := castColumns(colMeta, t.SchemaMappingConfig.SystemColumnFamily)
if err != nil {
return "", err
}
groupBykeys = append(groupBykeys, col)
} else {
return "", errors.New("group by on collection data type is not supported")
}
}
}
btQuery = btQuery + strings.Join(groupBykeys, ",")
}
btQuery += ";"
return btQuery, nil
}