pkg/data-query/dataQuery.go (401 lines of code) (raw):

package data_query import ( "context" "errors" "fmt" "math" "net/http" "strconv" "strings" "github.com/ClickHouse/ch-go/proto" sqlutil "github.com/JetBrains/ij-perf-report-aggregator/pkg/sql-util" "github.com/JetBrains/ij-perf-report-aggregator/pkg/util" "github.com/sakura-internet/go-rison/v4" "github.com/valyala/bytebufferpool" "github.com/valyala/quicktemplate" ) type Query struct { Database string `json:"db"` Table string `json:"table"` Flat bool `json:"flat"` Fields []QueryDimension `json:"fields,omitempty"` Filters []QueryFilter `json:"filters,omitempty"` Order []string `json:"order,omitempty"` Aggregator string `json:"-"` Dimensions []QueryDimension `json:"-"` TimeDimensionFormat string `json:"-"` } type QueryFilter struct { Field string `json:"f"` Value any `json:"v,omitempty"` Sql string `json:"q,omitempty"` Operator string `json:"o,omitempty"` Split bool `json:"s"` } type QueryDimension struct { Name string `json:"n"` Sql string `json:"sql"` SubName string `json:"subName,omitempty"` metricPath string metricName string metricValueName rune resultPropertyName string arrayJoin string } func ReadQueryV2(request *http.Request) ([]Query, bool, error) { decompressed, err := util.DecodeQuery(request.URL.Path[len("/api/q/"):]) if err != nil { return nil, false, fmt.Errorf("cannot decode query: %w", err) } if len(decompressed) == 0 { rawPath := request.URL.RawPath return nil, false, errors.New("query not found: " + rawPath) } wrappedAsArray := decompressed[0] == '[' parser := queryParsers.Get() defer queryParsers.Put(parser) // fileName := strconv.FormatUint(xxh3.HashString(request.URL.Path), 36) + ".json" // _ = os.WriteFile("/Volumes/data/queries/"+fileName, decompressed, 0644) list, err := readQuery(decompressed) if err != nil { return nil, false, err } return list, wrappedAsArray, nil } func getSplitParameters(query Query) (*SplitParameters, error) { splitParameters := SplitParameters{ numberOfSplits: 1, } if len(query.Filters) == 0 { // Handle case when there are no filters. return &splitParameters, nil } for _, filter := range query.Filters { if !filter.Split { continue } valueSlice, err := assertValueSlice(filter.Value) if err != nil { return nil, err } values, err := convertValuesToMap(valueSlice) if err != nil { return nil, err } splitParameters.numberOfSplits = len(values) splitParameters.splitField = filter.Field splitParameters.values = values } return &splitParameters, nil } // Helper function to assert filter.Value to a slice of empty interfaces. func assertValueSlice(value any) ([]any, error) { valueSlice, ok := value.([]any) if !ok { return nil, fmt.Errorf("invalid filter.Value type %T, expected array", value) } return valueSlice, nil } // Helper function to convert values in the slice to a map with string keys. func convertValuesToMap(valueSlice []any) (map[string]int, error) { values := make(map[string]int) for i, value := range valueSlice { strValue, ok := value.(string) if !ok { return nil, fmt.Errorf("invalid filter.Value type %T, expected string", value) } values[strValue] = i } return values, nil } func ReadQuery(request *http.Request) ([]Query, bool, error) { payload := request.URL.Path // array? arrayStart := strings.IndexRune(payload, '!') objectStart := strings.IndexRune(payload, '(') var index int wrappedAsArray := arrayStart < objectStart if wrappedAsArray { index = arrayStart } else { index = objectStart } if index == -1 { return nil, false, errors.New("query not found") } jsonData, err := rison.ToJSON([]byte(payload[index:]), rison.Rison) if err != nil { return nil, false, fmt.Errorf("cannot decode query: %w", err) } list, err := readQuery(jsonData) if err != nil { return nil, false, err } return list, wrappedAsArray, nil } func SelectRows(ctx context.Context, query Query, table string, dbSupplier DatabaseConnectionSupplier, totalWriter *quicktemplate.QWriter) error { sqlQuery, columnNameToIndex, err := buildSql(query, table) if err != nil { return err } splitParameters, err := getSplitParameters(query) if err != nil { return nil } columnBuffers := make([][]*bytebufferpool.ByteBuffer, splitParameters.numberOfSplits) err = executeQuery(ctx, sqlQuery, query, dbSupplier, func(_ context.Context, block proto.Block, result *proto.Results) error { if block.Rows == 0 { return nil } return writeResult(result, columnNameToIndex, columnBuffers, query, splitParameters) }) if err != nil { return err } writeBuffers(columnBuffers, totalWriter, query.Flat) return nil } func writeBuffers(columnBuffers [][]*bytebufferpool.ByteBuffer, totalWriter *quicktemplate.QWriter, isFlat bool) { if !isFlat && len(columnBuffers) == 1 { totalWriter.S("[") } for splitNumber, splitColumnBuffers := range columnBuffers { if splitNumber != 0 { totalWriter.S(",") } if len(columnBuffers) > 1 { totalWriter.S("[") } for columnIndex, buffer := range splitColumnBuffers { if columnIndex != 0 { totalWriter.S(",") } totalWriter.S("[") if buffer != nil { _, _ = buffer.WriteTo(totalWriter) byteBufferPool.Put(buffer) } totalWriter.S("]") } if len(columnBuffers) > 1 { totalWriter.S("]") } } if !isFlat && len(columnBuffers) == 1 { totalWriter.S("]") } } //gocyclo:ignore func buildSql(query Query, table string) (string, map[string]int, error) { var sb strings.Builder sb.WriteString("select") // the only array join is supported for now arrayJoin := "" for _, dimension := range query.Fields { if dimension.arrayJoin != "" { // the only array join is supported for now arrayJoin = dimension.arrayJoin // for field add distinct to filter duplicates out // sb.WriteString(" distinct ") break } } columnNameToIndex := make(map[string]int, len(query.Dimensions)+len(query.Fields)) columnIndex := 0 dimensionWritten := false for _, dimension := range query.Dimensions { // check that the field with the same name doesn't exists fieldExist := false for _, field := range query.Fields { if field.Name == dimension.Name { fieldExist = true break } } if !fieldExist { if !dimensionWritten { sb.WriteRune(' ') } else { sb.WriteRune(',') } columnNameToIndex[dimension.Name] = columnIndex columnIndex++ writeDimension(dimension, &sb) dimensionWritten = true } } // write extra fields to the end, so, it maybe skipped during serialization for i, field := range query.Fields { if i != 0 || dimensionWritten { sb.WriteRune(',') } sb.WriteRune(' ') if field.Sql != "" { columnNameToIndex[field.Name] = columnIndex columnIndex++ writeDimension(field, &sb) continue } effectiveColumnName := "" if query.Aggregator != "" { sb.WriteString(query.Aggregator) sb.WriteRune('(') } if field.metricPath == "" { sb.WriteString(field.Name) effectiveColumnName = field.Name } else { // select JSONExtractInt(arrayFirst(it -> JSONExtractString(it, 'n') = 'start main frontend', JSONExtractArrayRaw(raw_report, 'prepareAppInitActivities')), 'd') as v // from report; if field.metricValueName == 'e' { // arraySum(it -> it.1 = 's' or it.1 = 'd' ? it.2 : 0, JSONExtractKeysAndValues(arrayFirst(it -> JSONExtractString(it, 'n') = 'render', JSONExtractArrayRaw(raw_report, 'prepareAppInitActivities')), 'Int')) sb.WriteString("arraySum(it -> it.1 = 's' or it.1 = 'd' ? it.2 : 0, JSONExtractKeysAndValues(") writeExtractJsonObject(&sb, field) sb.WriteString(", 'Int'))") } else { sb.WriteString("JSONExtractInt(") writeExtractJsonObject(&sb, field) sb.WriteString(", '") sb.WriteRune(field.metricValueName) sb.WriteString("')") } } if query.Aggregator != "" { sb.WriteRune(')') } if field.resultPropertyName != "" { sb.WriteString(" as ") sb.WriteString(field.resultPropertyName) effectiveColumnName = field.resultPropertyName } else if query.Aggregator != "" { sb.WriteString(" as ") if field.arrayJoin == "" { effectiveColumnName = field.Name } else { // measures.values is not a valid field name effectiveColumnName = "measure_value" } sb.WriteString(effectiveColumnName) } columnNameToIndex[effectiveColumnName] = columnIndex columnIndex++ } sb.WriteString(" from ") sb.WriteString(table) if arrayJoin == "" { for _, dimension := range query.Dimensions { if dimension.arrayJoin != "" { arrayJoin = dimension.arrayJoin break } } } if arrayJoin != "" { sb.WriteString(" array join ") sb.WriteString(arrayJoin) } if len(query.Filters) != 0 { err := writeWhereClause(&sb, query) if err != nil { return "", nil, err } } if len(query.Dimensions) != 0 { sb.WriteString(" group by") for i, dimension := range query.Dimensions { if i != 0 { sb.WriteRune(',') } sb.WriteRune(' ') sb.WriteString(dimension.Name) } } if len(query.Order) != 0 { sb.WriteString(" order by") for i, field := range query.Order { if i != 0 { sb.WriteRune(',') } sb.WriteRune(' ') sb.WriteString(field) } } return sb.String(), columnNameToIndex, nil } func writeExtractJsonObject(sb *strings.Builder, field QueryDimension) { sb.WriteString("arrayFirst(it -> JSONExtractString(it, 'n') = '") sb.WriteString(field.metricName) sb.WriteString("', JSONExtractArrayRaw(raw_report, '") sb.WriteString(field.metricPath) sb.WriteString("'))") } func writeDimension(dimension QueryDimension, sb *strings.Builder) { if dimension.Sql == "" { sb.WriteString(dimension.Name) } else { sb.WriteString(dimension.Sql) sb.WriteString(" as ") // escape - maybe nested name with dot sb.WriteByte('`') sb.WriteString(dimension.Name) sb.WriteByte('`') } } func writeWhereClause(sb *strings.Builder, query Query) error { sb.WriteString(" where") for i, filter := range query.Filters { if i != 0 { sb.WriteString(" and") } sb.WriteString(" (") sb.WriteString(filter.Field) if filter.Sql != "" { if filter.Operator != "" { return errors.New("sql and operator are mutually exclusive") } if filter.Value != nil { return errors.New("sql and value are mutually exclusive") } sb.WriteByte(' ') sb.WriteString(filter.Sql) sb.WriteByte(')') continue } switch v := filter.Value.(type) { case int: sb.WriteString(filter.Operator) sb.WriteString(strconv.Itoa(filter.Value.(int))) //nolint:errcheck // we're sure that filter is int due to switch case on type case float64: sb.WriteString(filter.Operator) if v == math.Trunc(v) { sb.WriteString(strconv.Itoa(int(v))) } else { sb.WriteString(strconv.FormatFloat(v, 'f', -1, 64)) } case bool: sb.WriteString(filter.Operator) sb.WriteString(strconv.FormatBool(v)) case string: sb.WriteString(" ") sb.WriteString(filter.Operator) sb.WriteString(" ") writeString(sb, v) case []string: sb.WriteString(" in (") for j := range v { if j != 0 { sb.WriteByte(',') } writeString(sb, v[j]) } sb.WriteByte(')') case []any: sb.WriteString(" in (") for j := range v { if j != 0 { sb.WriteByte(',') } switch e := v[j].(type) { case string: writeString(sb, e) case bool: sb.WriteString(strconv.FormatBool(e)) default: return fmt.Errorf("filter value type [%T] is not supported", v[j]) } } sb.WriteByte(')') default: return fmt.Errorf("filter value type %T is not supported", v) } sb.WriteByte(')') } return nil } func writeString(sb *strings.Builder, s string) { sb.WriteByte('\'') _, _ = sqlutil.StringEscaper.WriteString(sb, s) sb.WriteByte('\'') }