func()

in common/elasticsearch/esql/aggregation.go [30:150]


// convertAgg translates the aggregation-related parts of a SELECT statement
// (GROUP BY, aggregation functions in the select list, ORDER BY and HAVING)
// into the JSON text used for the Elasticsearch "aggs" field.
//
// It returns an empty string when the statement has neither a GROUP BY clause
// nor any aggregation function. An error is returned when HAVING is used
// without GROUP BY, or when any of the helper conversions fails.
//
// The generated string is deterministic: sub-aggregations and bucket paths
// are emitted in the order in which their tags were first encountered
// (SELECT first, then ORDER BY, then HAVING).
func (e *ESql) convertAgg(sel sqlparser.Select) (dsl string, err error) {
	if len(sel.GroupBy) == 0 && sel.Having != nil {
		return "", fmt.Errorf(`esql: HAVING used without GROUP BY`)
	}

	colNameSetGroupBy := make(map[string]int)
	var dslGroupBy string
	if len(sel.GroupBy) != 0 {
		dslGroupBy, colNameSetGroupBy, err = e.convertGroupByExpr(sel.GroupBy)
		if err != nil {
			return "", err
		}
	}
	aggFuncExprSlice, colNameSlice, aggNameSlice, err := e.extractSelectedExpr(sel.SelectExprs)
	if err != nil {
		return "", err
	}
	// Verify that the selected column names are compatible with the GROUP BY columns.
	if err = e.checkSelGroupByCompatibility(colNameSlice, colNameSetGroupBy, aggNameSlice); err != nil {
		return "", err
	}

	// Explanation for getAggSelect, getAggOrderBy and getAggHaving:
	// the user can introduce aggregation functions from SELECT, ORDER BY and HAVING. For each
	// distinct aggregation function we need to add a tag in the "aggs" field, which lets ES do
	// the calculation. Each aggregation's query body has the form
	//   "<tag>": {"<agg function name>": {"field": "<colName>"}}
	//
	// <tag> is generated by us; the convention in esql is tag = <agg function name>_<colName>
	// to prevent duplicate tag names.
	// <agg function name> can be sum, max, min, count, avg.
	// <colName> is the field the aggregation applies to.
	//
	// Each source can contain duplicates, and we do not want to emit duplicate tags. Every
	// helper therefore returns a map[string]int that maps the tag to its offset in the
	// corresponding aggXxxSlice, next to the slices themselves:
	// aggNameXxxSlice stores the function names, aggTargetXxxSlice the column names and
	// aggTagXxxSlice the tags. Together they are used to generate the final JSON query.

	// Aggregation functions from the select list.
	aggNameSlice, aggTargetSlice, aggTagSlice, aggTagSet, err := e.getAggSelect(aggFuncExprSlice)
	if err != nil {
		return "", err
	}

	// Aggregation functions from ORDER BY. The returned tag set is not needed here: the tag
	// slice carries the same tags in a stable order.
	aggNameOrderBySlice, aggTargetOrderBySlice, aggTagOrderBySlice, aggDirOrderBySlice, _, err := e.getAggOrderBy(sel.OrderBy)
	if err != nil {
		return "", err
	}

	// Aggregation functions from HAVING, plus the filter script built from the HAVING expression.
	script, aggNameHavingSlice, aggTargetHavingSlice, aggTagHavingSlice, _, err := e.getAggHaving(sel.Having)
	if err != nil {
		return "", err
	}

	// Add the aggregations that only appear in ORDER BY or HAVING.
	// We walk the tag slices rather than ranging over the tag maps: Go randomizes map iteration
	// order, which would make the generated DSL differ between runs for the same query.
	// This relies on the invariant described above (slice index i == offset stored in the map).
	for i, tag := range aggTagOrderBySlice {
		if _, exist := aggTagSet[tag]; exist {
			continue
		}
		aggTagSet[tag] = len(aggTagSet)
		aggNameSlice = append(aggNameSlice, aggNameOrderBySlice[i])
		aggTargetSlice = append(aggTargetSlice, aggTargetOrderBySlice[i])
		aggTagSlice = append(aggTagSlice, tag)
	}
	for i, tag := range aggTagHavingSlice {
		if _, exist := aggTagSet[tag]; exist {
			continue
		}
		aggTagSet[tag] = len(aggTagSet)
		aggNameSlice = append(aggNameSlice, aggNameHavingSlice[i])
		aggTargetSlice = append(aggTargetSlice, aggTargetHavingSlice[i])
		aggTagSlice = append(aggTagSlice, tag)
	}

	// Generate the content of the inner "aggs" field.
	var dslAgg string
	if len(aggTagSlice) > 0 {
		var dslAggSlice []string
		for i, tag := range aggTagSlice {
			// The "_count" tag gets no sub-aggregation body of its own.
			if tag == "_count" {
				continue
			}
			aggBody := fmt.Sprintf(`"%v": {"%v": {"field": "%v"}}`, tag, aggNameSlice[i], aggTargetSlice[i])
			dslAggSlice = append(dslAggSlice, aggBody)
		}
		if len(aggTagOrderBySlice) > 0 {
			dslOrderSlice := make([]string, 0, len(aggTagOrderBySlice))
			for i, tag := range aggTagOrderBySlice {
				dslOrderSlice = append(dslOrderSlice, fmt.Sprintf(`{"%v": {"order": "%v"}}`, tag, aggDirOrderBySlice[i]))
			}
			dslAggOrder := fmt.Sprintf(`"bucket_sort": {"bucket_sort": {"sort": [%v], "size": %v}}`,
				strings.Join(dslOrderSlice, ","), e.bucketNumber)
			dslAggSlice = append(dslAggSlice, dslAggOrder)
		}
		if script != "" {
			// Every aggregation referenced by HAVING is exposed to the script under its own tag.
			bucketPathSlice := make([]string, 0, len(aggTagHavingSlice))
			for _, tag := range aggTagHavingSlice {
				bucketPathSlice = append(bucketPathSlice, fmt.Sprintf(`"%v": "%v"`, tag, tag))
			}
			bucketPathStr := strings.Join(bucketPathSlice, ",")
			bucketFilterStr := fmt.Sprintf(`"having": {"bucket_selector": {"buckets_path": {%v}, "script": "%v"}}`, bucketPathStr, script)
			dslAggSlice = append(dslAggSlice, bucketFilterStr)
		}
		dslAgg = "{" + strings.Join(dslAggSlice, ",") + "}"
	}

	// Generate the final DSL for the "aggs" field.
	// Here "groupby" is just a tag and can be any unreserved word.
	switch {
	case len(dslGroupBy) == 0 && len(aggTagSlice) == 0:
		dsl = ""
	case len(aggTagSlice) == 0:
		dsl = fmt.Sprintf(`{"groupby": {%v}}`, dslGroupBy)
	case len(dslGroupBy) == 0:
		dsl = dslAgg
	default:
		dsl = fmt.Sprintf(`{"groupby": {%v, "aggs": %v}}`, dslGroupBy, dslAgg)
	}
	return dsl, nil
}