func getBigtableSelectQuery()

in cassandra-bigtable-migration-tools/cassandra-bigtable-proxy/translator/translator_select.go [588:665]


// getBigtableSelectQuery renders the parsed SELECT statement held in data as a
// Bigtable SQL query string terminated by ';'.
//
// The projection is either STAR (when data.ColumnMeta.Star is set) or the
// comma-joined expressions returned by processStrings; in the latter case the
// alias map returned by processStrings is stored on data.AliasMap as a side
// effect. The text returned by buildWhereClause is appended verbatim, followed
// by the optional clauses in the order GROUP BY, ORDER BY, LIMIT.
//
// An error is returned when:
//   - processStrings, buildWhereClause or castColumns fails,
//   - the projection or the table name is empty,
//   - a GROUP BY or (non primary key) ORDER BY column is a collection,
//   - the ORDER BY column is not present in the table's schema metadata.
//
// On error the returned query is always the empty string.
func getBigtableSelectQuery(t *Translator, data *SelectQueryMap) (string, error) {
	column := STAR
	if !data.ColumnMeta.Star {
		isGroupBy := len(data.GroupByColumns) > 0
		aliasMap, columns, err := processStrings(t, data.ColumnMeta.Column, data.Table, data.Keyspace, isGroupBy)
		if err != nil {
			return "", err
		}
		data.AliasMap = aliasMap
		column = strings.Join(columns, ",")
	}

	// Either piece missing would yield a malformed "SELECT  FROM t" or
	// "SELECT c FROM " statement, so reject both cases up front.
	if column == "" || data.Table == "" {
		return "", errors.New("could not prepare the select query due to incomplete information")
	}

	btQuery := fmt.Sprintf("SELECT %s FROM %s", column, data.Table)

	whereCondition, err := buildWhereClause(data.Clauses, t, data.Table, data.Keyspace)
	if err != nil {
		return "", err
	}
	btQuery += whereCondition

	// Schema metadata for the target table; indexing a missing keyspace or
	// table yields a nil map, on which the lookups below simply report !ok.
	tableMeta := t.SchemaMappingConfig.TablesMetaData[data.Keyspace][data.Table]
	systemFamily := t.SchemaMappingConfig.SystemColumnFamily

	// GROUP BY has to precede ORDER BY and LIMIT in the generated statement.
	if len(data.GroupByColumns) > 0 {
		groupByKeys := make([]string, 0, len(data.GroupByColumns))
		for _, name := range data.GroupByColumns {
			colMeta, ok := tableMeta[name]
			if !ok {
				// NOTE(review): columns missing from the schema metadata are
				// skipped silently, as before; confirm whether this should be
				// an error instead.
				continue
			}
			if colMeta.IsCollection {
				return "", errors.New("group by on collection data type is not supported")
			}
			key, castErr := castColumns(colMeta, systemFamily)
			if castErr != nil {
				return "", castErr
			}
			groupByKeys = append(groupByKeys, key)
		}
		btQuery += " GROUP BY " + strings.Join(groupByKeys, ",")
	}

	if data.OrderBy.IsOrderBy {
		colMeta, ok := tableMeta[data.OrderBy.Column]
		if !ok {
			return "", errors.New("Undefined column name " + data.OrderBy.Column + " in table " + data.Keyspace + "." + data.Table)
		}
		// Primary key columns are referenced by bare name; every other scalar
		// column is addressed through the system column family.
		orderByKey := data.OrderBy.Column
		if !colMeta.IsPrimaryKey {
			if colMeta.IsCollection {
				return "", errors.New("order by on collection data type is not supported")
			}
			orderByKey = systemFamily + "['" + data.OrderBy.Column + "']"
		}
		btQuery += " ORDER BY " + orderByKey + " " + string(data.OrderBy.Operation)
	}

	if data.Limit.IsLimit {
		limit := data.Limit.Count
		// A bind marker in the CQL limit becomes a named query parameter.
		if strings.Contains(limit, questionMark) {
			limit = "@" + limitPlaceholder
		}
		btQuery += " LIMIT " + limit
	}

	return btQuery + ";", nil
}