in common/elasticsearch/esql/aggregation.go [30:150]
// convertAgg converts the aggregation-related parts of sel (GROUP BY, HAVING,
// and aggregate functions referenced from SELECT, ORDER BY or HAVING) into the
// JSON value for an Elasticsearch "aggs" field. It returns "" when sel needs
// neither grouping nor aggregation. HAVING without GROUP BY is rejected.
//
// Users can introduce aggregate functions from SELECT, ORDER BY and HAVING.
// Every distinct one becomes a single entry of the form
//
//	"<tag>": {"<agg function name>": {"field": "<colName>"}}
//
// where, by esql convention, <tag> = <agg function name>_<colName>, so that
// duplicate references collapse onto one tag. <agg function name> can be sum,
// max, min, count or avg, and <colName> is the field the aggregation applies to.
//
// getAggSelect, getAggOrderBy and getAggHaving each return parallel slices:
//   - aggNameXxxSlice holds the function names.
//   - aggTargetXxxSlice holds the column names.
//   - aggTagXxxSlice holds the tags.
//
// Each also returns a tag set (map tag -> offset into those slices) used to
// resolve duplicates.
//
// Aggregate ORDER BY becomes a "bucket_sort" pipeline aggregation capped at
// e.bucketNumber buckets. HAVING becomes a "bucket_selector" pipeline
// aggregation named "having".
//
// Output is deterministic: tags introduced only by ORDER BY or HAVING are
// appended in the order getAggOrderBy / getAggHaving return them.
func (e *ESql) convertAgg(sel sqlparser.Select) (dsl string, err error) {
	if len(sel.GroupBy) == 0 && sel.Having != nil {
		return "", fmt.Errorf(`esql: HAVING used without GROUP BY`)
	}

	colNameSetGroupBy := make(map[string]int)
	var dslGroupBy string
	if len(sel.GroupBy) != 0 {
		dslGroupBy, colNameSetGroupBy, err = e.convertGroupByExpr(sel.GroupBy)
		if err != nil {
			return "", err
		}
	}

	aggFuncExprSlice, colNameSlice, selAggNameSlice, err := e.extractSelectedExpr(sel.SelectExprs)
	if err != nil {
		return "", err
	}
	// Verify that no selected column lies outside the GROUP BY / aggregate names.
	if err = e.checkSelGroupByCompatibility(colNameSlice, colNameSetGroupBy, selAggNameSlice); err != nil {
		return "", err
	}

	// Aggregations introduced from SELECT.
	aggNameSlice, aggTargetSlice, aggTagSlice, aggTagSet, err := e.getAggSelect(aggFuncExprSlice)
	if err != nil {
		return "", err
	}
	// Aggregations introduced from ORDER BY.
	aggNameOrderBySlice, aggTargetOrderBySlice, aggTagOrderBySlice, aggDirOrderBySlice, aggTagOrderBySet, err := e.getAggOrderBy(sel.OrderBy)
	if err != nil {
		return "", err
	}
	// Aggregations introduced from HAVING.
	script, aggNameHavingSlice, aggTargetHavingSlice, aggTagHavingSlice, aggTagHavingSet, err := e.getAggHaving(sel.Having)
	if err != nil {
		return "", err
	}

	// Add the aggregations that ORDER BY and HAVING need but SELECT did not
	// declare. Walk the slices rather than the tag sets: Go randomizes map
	// iteration order, so ranging over the sets would make the generated DSL
	// differ between runs. The set lookup keeps exactly the (tag, offset) pairs
	// each set records, so duplicate tags are still emitted only once.
	for i, tag := range aggTagOrderBySlice {
		if j, ok := aggTagOrderBySet[tag]; !ok || j != i {
			continue
		}
		if _, exist := aggTagSet[tag]; exist {
			continue
		}
		aggTagSet[tag] = len(aggTagSlice)
		aggNameSlice = append(aggNameSlice, aggNameOrderBySlice[i])
		aggTargetSlice = append(aggTargetSlice, aggTargetOrderBySlice[i])
		aggTagSlice = append(aggTagSlice, aggTagOrderBySlice[i])
	}
	for i, tag := range aggTagHavingSlice {
		if j, ok := aggTagHavingSet[tag]; !ok || j != i {
			continue
		}
		if _, exist := aggTagSet[tag]; exist {
			continue
		}
		aggTagSet[tag] = len(aggTagSlice)
		aggNameSlice = append(aggNameSlice, aggNameHavingSlice[i])
		aggTargetSlice = append(aggTargetSlice, aggTargetHavingSlice[i])
		aggTagSlice = append(aggTagSlice, aggTagHavingSlice[i])
	}

	// Build the inner "aggs" object.
	var dslAgg string
	if len(aggTagSlice) > 0 {
		var dslAggSlice []string
		for i, tag := range aggTagSlice {
			// Elasticsearch exposes a bucket's document count under the special
			// buckets path "_count", so it needs no sub-aggregation of its own.
			if tag == "_count" {
				continue
			}
			entry := fmt.Sprintf(`"%v": {"%v": {"field": "%v"}}`, tag, aggNameSlice[i], aggTargetSlice[i])
			dslAggSlice = append(dslAggSlice, entry)
		}
		if len(aggTagOrderBySlice) > 0 {
			dslOrderSlice := make([]string, 0, len(aggTagOrderBySlice))
			for i, tag := range aggTagOrderBySlice {
				dslOrderSlice = append(dslOrderSlice, fmt.Sprintf(`{"%v": {"order": "%v"}}`, tag, aggDirOrderBySlice[i]))
			}
			dslAggOrder := fmt.Sprintf(`"bucket_sort": {"bucket_sort": {"sort": [%v], "size": %v}}`,
				strings.Join(dslOrderSlice, ","), e.bucketNumber)
			dslAggSlice = append(dslAggSlice, dslAggOrder)
		}
		if script != "" {
			// One buckets_path entry per distinct HAVING tag, in deterministic order.
			var bucketPathSlice []string
			for i, tag := range aggTagHavingSlice {
				if j, ok := aggTagHavingSet[tag]; !ok || j != i {
					continue
				}
				bucketPathSlice = append(bucketPathSlice, fmt.Sprintf(`"%v": "%v"`, tag, tag))
			}
			bucketFilterStr := fmt.Sprintf(`"having": {"bucket_selector": {"buckets_path": {%v}, "script": "%v"}}`,
				strings.Join(bucketPathSlice, ","), script)
			dslAggSlice = append(dslAggSlice, bucketFilterStr)
		}
		dslAgg = "{" + strings.Join(dslAggSlice, ",") + "}"
	}

	// Assemble the final value of the "aggs" field. "groupby" is just a tag and
	// could be any unreserved word.
	switch {
	case len(dslGroupBy) == 0 && len(aggTagSlice) == 0:
		return "", nil
	case len(aggTagSlice) == 0:
		return fmt.Sprintf(`{"groupby": {%v}}`, dslGroupBy), nil
	case len(dslGroupBy) == 0:
		return dslAgg, nil
	default:
		return fmt.Sprintf(`{"groupby": {%v, "aggs": %v}}`, dslGroupBy, dslAgg), nil
	}
}