in common/elasticsearch/esql/select.go [30:148]
// convertSelect translates a parsed SQL SELECT statement into an
// Elasticsearch query DSL string.
//
// It returns the assembled DSL JSON, the list of column names used for
// sorting (callers need these to build search_after pagination cursors),
// and an error for unsupported or malformed queries.
//
// Supported clauses: WHERE, FROM (exactly one table), SELECT columns and
// aggregation functions, GROUP BY / HAVING (delegated to convertAgg),
// ORDER BY on plain columns, LIMIT / OFFSET, and search_after values
// passed via the variadic pagination argument.
// SELECT DISTINCT and JOIN are not supported.
func (e *ESql) convertSelect(sel sqlparser.Select, domainID string, pagination ...interface{}) (dsl string, sortField []string, err error) {
	if sel.Distinct != "" {
		err := fmt.Errorf(`esql: SELECT DISTINCT not supported. use GROUP BY instead`)
		return "", nil, err
	}
	var rootParent sqlparser.Expr
	// dslMap collects the top-level components of the query ("query",
	// "aggs", "sort", "size", ...), each already rendered as a JSON fragment.
	dslMap := make(map[string]interface{})
	// handle WHERE keyword
	if sel.Where != nil {
		dslQuery, err := e.convertWhereExpr(sel.Where.Expr, rootParent)
		if err != nil {
			return "", nil, err
		}
		dslMap["query"] = dslQuery
	}
	// cadence special handling: add domain ID query and time query bounds
	if e.cadence {
		e.addCadenceDomainTimeQuery(sel, domainID, dslMap)
	}
	// handle FROM keyword, currently only support 1 target table
	if len(sel.From) != 1 {
		if len(sel.From) == 0 {
			err = fmt.Errorf("esql: invalid from expression: no from expression specified")
		} else {
			err = fmt.Errorf("esql: join not supported")
		}
		return "", nil, err
	}
	// handle SELECT keyword: split selected expressions into plain column
	// names and aggregation function names.
	_, selectedColNameSlice, aggNameSlice, err := e.extractSelectedExpr(sel.SelectExprs)
	if err != nil {
		return "", nil, err
	}
	if len(selectedColNameSlice) > 0 {
		// restrict returned document fields to the selected columns
		colNames := `"` + strings.Join(selectedColNameSlice, `", "`) + `"`
		dslMap["_source"] = fmt.Sprintf(`{"includes": [%v]}`, colNames)
	}
	// handle all aggregations, including GROUP BY, SELECT <agg function>, ORDER BY <agg function>, HAVING
	dslAgg, err := e.convertAgg(sel)
	if err != nil {
		return "", nil, err
	}
	if dslAgg != "" || len(aggNameSlice) > 0 {
		if dslAgg != "" {
			dslMap["aggs"] = dslAgg
		}
		// do not return document contents if this is an aggregation query
		dslMap["size"] = 0
	} else {
		// handle LIMIT and OFFSET keyword, these 2 keywords only work in
		// non-aggregation queries; default page size applies when no LIMIT
		dslMap["size"] = e.pageSize
		if sel.Limit != nil {
			if sel.Limit.Offset != nil {
				dslMap["from"] = sqlparser.String(sel.Limit.Offset)
			}
			dslMap["size"] = sqlparser.String(sel.Limit.Rowcount)
		}
		// handle pagination: render each search_after value as a JSON array
		// element, quoting everything that is not a plain int.
		// NOTE(review): only the exact type int takes the unquoted path;
		// int32/int64/float values would be quoted — confirm callers only
		// pass int for numeric cursors.
		var searchAfterSlice []string
		for _, v := range pagination {
			switch v.(type) {
			case int:
				searchAfterSlice = append(searchAfterSlice, fmt.Sprintf(`%v`, v))
			default:
				searchAfterSlice = append(searchAfterSlice, fmt.Sprintf(`"%v"`, v))
			}
		}
		if len(searchAfterSlice) > 0 {
			searchAfterStr := strings.Join(searchAfterSlice, ",")
			dslMap["search_after"] = fmt.Sprintf(`[%v]`, searchAfterStr)
		}
	}
	// handle ORDER BY <column name>
	// if it is an aggregate query, no point to order
	if _, exist := dslMap["aggs"]; !exist && len(aggNameSlice) == 0 {
		var orderBySlice []string
		for _, orderExpr := range sel.OrderBy {
			var colNameStr string
			if colName, ok := orderExpr.Expr.(*sqlparser.ColName); ok {
				colNameStr, err = e.convertColName(colName)
				if err != nil {
					return "", nil, err
				}
			} else {
				// non-column ORDER BY expressions (i.e. agg functions) are
				// only valid in aggregation queries, which are handled above
				err := fmt.Errorf(`esql: mix order by aggregations and column names`)
				return "", nil, err
			}
			// strip SQL backtick quoting before embedding in JSON
			colNameStr = strings.Trim(colNameStr, "`")
			orderByStr := fmt.Sprintf(`{"%v": "%v"}`, colNameStr, orderExpr.Direction)
			orderBySlice = append(orderBySlice, orderByStr)
			sortField = append(sortField, colNameStr)
		}
		// cadence special handling: add runID as sorting tie breaker
		if e.cadence {
			orderBySlice, sortField, err = e.addCadenceSort(orderBySlice, sortField)
			if err != nil {
				return "", nil, err
			}
		}
		if len(orderBySlice) > 0 {
			dslMap["sort"] = fmt.Sprintf("[%v]", strings.Join(orderBySlice, ","))
		}
	}
	// generate the final json query; map iteration order is random, so the
	// key order of the emitted JSON object varies between calls — ES accepts
	// any top-level key order
	var dslQuerySlice []string
	for tag, content := range dslMap {
		dslQuerySlice = append(dslQuerySlice, fmt.Sprintf(`"%v": %v`, tag, content))
	}
	dsl = "{" + strings.Join(dslQuerySlice, ",") + "}"
	return dsl, sortField, nil
}