func()

in common/elasticsearch/esql/select.go [30:148]


// convertSelect translates a parsed SQL SELECT statement into an Elasticsearch
// DSL query string.
//
// Parameters:
//   - sel: the parsed SELECT statement. DISTINCT and multi-table FROM (joins)
//     are rejected.
//   - domainID: only used when e.cadence is set, in which case it is forwarded
//     to addCadenceDomainTimeQuery.
//   - pagination: optional values emitted as the "search_after" array of a
//     non-aggregation query. Values of type int are written unquoted, every
//     other value is written as a double-quoted string.
//
// Returns the DSL as a JSON object string, the list of column names the result
// is sorted by (in ORDER BY order, possibly extended by addCadenceSort), and an
// error if any part of the statement cannot be converted.
func (e *ESql) convertSelect(sel sqlparser.Select, domainID string, pagination ...interface{}) (dsl string, sortField []string, err error) {
	if sel.Distinct != "" {
		return "", nil, fmt.Errorf(`esql: SELECT DISTINCT not supported. use GROUP BY instead`)
	}

	var rootParent sqlparser.Expr
	// dslMap collects the top-level components of the query, keyed by DSL tag.
	// Values are either pre-rendered JSON fragments or plain numbers.
	dslMap := make(map[string]interface{})

	// handle WHERE keyword
	if sel.Where != nil {
		dslQuery, err := e.convertWhereExpr(sel.Where.Expr, rootParent)
		if err != nil {
			return "", nil, err
		}
		dslMap["query"] = dslQuery
	}
	// cadence special handling: add domain ID query and time query bounds
	if e.cadence {
		e.addCadenceDomainTimeQuery(sel, domainID, dslMap)
	}

	// handle FROM keyword, currently only support 1 target table
	if len(sel.From) == 0 {
		return "", nil, fmt.Errorf("esql: invalid from expression: no from expression specified")
	}
	if len(sel.From) > 1 {
		return "", nil, fmt.Errorf("esql: join not supported")
	}

	// handle SELECT keyword
	_, selectedColNameSlice, aggNameSlice, err := e.extractSelectedExpr(sel.SelectExprs)
	if err != nil {
		return "", nil, err
	}
	if len(selectedColNameSlice) > 0 {
		colNames := `"` + strings.Join(selectedColNameSlice, `", "`) + `"`
		dslMap["_source"] = fmt.Sprintf(`{"includes": [%v]}`, colNames)
	}

	// handle all aggregations, including GROUP BY, SELECT <agg function>, ORDER BY <agg function>, HAVING
	dslAgg, err := e.convertAgg(sel)
	if err != nil {
		return "", nil, err
	}
	if dslAgg != "" || len(aggNameSlice) > 0 {
		if dslAgg != "" {
			dslMap["aggs"] = dslAgg
		}
		// do not return document contents if this is an aggregation query
		dslMap["size"] = 0
	} else {
		// handle LIMIT and OFFSET keyword, these 2 keywords only work in non-aggregation query
		dslMap["size"] = e.pageSize
		if sel.Limit != nil {
			if sel.Limit.Offset != nil {
				dslMap["from"] = sqlparser.String(sel.Limit.Offset)
			}
			dslMap["size"] = sqlparser.String(sel.Limit.Rowcount)
		}
		// handle pagination
		// NOTE(review): only int is emitted as a JSON number; other numeric
		// types (int64, float64, ...) are quoted, and string values are not
		// escaped. Left as is because callers may rely on it - confirm.
		searchAfterSlice := make([]string, 0, len(pagination))
		for _, v := range pagination {
			switch v.(type) {
			case int:
				searchAfterSlice = append(searchAfterSlice, fmt.Sprintf(`%v`, v))
			default:
				searchAfterSlice = append(searchAfterSlice, fmt.Sprintf(`"%v"`, v))
			}
		}
		if len(searchAfterSlice) > 0 {
			dslMap["search_after"] = fmt.Sprintf(`[%v]`, strings.Join(searchAfterSlice, ","))
		}
	}

	// handle ORDER BY <column name>
	// if it is an aggregate query, no point to order
	if _, exist := dslMap["aggs"]; !exist && len(aggNameSlice) == 0 {
		var orderBySlice []string
		for _, orderExpr := range sel.OrderBy {
			colName, ok := orderExpr.Expr.(*sqlparser.ColName)
			if !ok {
				return "", nil, fmt.Errorf(`esql: mix order by aggregations and column names`)
			}
			colNameStr, err := e.convertColName(colName)
			if err != nil {
				return "", nil, err
			}
			// strip the backticks used to quote identifiers in SQL
			colNameStr = strings.Trim(colNameStr, "`")
			orderBySlice = append(orderBySlice, fmt.Sprintf(`{"%v": "%v"}`, colNameStr, orderExpr.Direction))
			sortField = append(sortField, colNameStr)
		}
		// cadence special handling: add runID as sorting tie breaker
		if e.cadence {
			orderBySlice, sortField, err = e.addCadenceSort(orderBySlice, sortField)
			if err != nil {
				return "", nil, err
			}
		}
		if len(orderBySlice) > 0 {
			dslMap["sort"] = fmt.Sprintf("[%v]", strings.Join(orderBySlice, ","))
		}
	}

	// generate the final json query
	// Go map iteration order is unspecified, so the tags this function sets are
	// emitted in a fixed order to make the generated DSL stable across calls.
	emitOrder := [...]string{"query", "_source", "aggs", "from", "size", "search_after", "sort"}
	dslQuerySlice := make([]string, 0, len(dslMap))
	for _, tag := range emitOrder {
		content, ok := dslMap[tag]
		if !ok {
			continue
		}
		dslQuerySlice = append(dslQuerySlice, fmt.Sprintf(`"%v": %v`, tag, content))
		delete(dslMap, tag)
	}
	// Any tag not listed above (e.g. one added by a helper) is still emitted so
	// that nothing is silently dropped.
	for tag, content := range dslMap {
		dslQuerySlice = append(dslQuerySlice, fmt.Sprintf(`"%v": %v`, tag, content))
	}
	dsl = "{" + strings.Join(dslQuerySlice, ",") + "}"
	return dsl, sortField, nil
}