pkg/degradation-detector/fetchMetricValues.go (217 lines of code) (raw):

package degradation_detector

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"log/slog"
	"net/http"
	"strings"
	"sync"
	"time"

	dataQuery "github.com/JetBrains/ij-perf-report-aggregator/pkg/data-query"
	"github.com/JetBrains/ij-perf-report-aggregator/pkg/util"
	"github.com/alitto/pond"
)

// queryResult holds the four columns decoded from one query response, in the
// order extractDataFromRequest reads them.
type queryResult struct {
	timestamps []int64
	values     []int
	builds     []string
	buildTypes []string
}

// QueryResultWithSettings pairs a fetched queryResult with the Settings whose
// query produced it.
type QueryResultWithSettings struct {
	queryResult
	Settings
}

// queryProducer is implemented by settings types that can describe the data
// they need as a dataQuery.Query.
type queryProducer interface {
	query() dataQuery.Query
}

// FetchMetricsFromClickhouse runs the query of every element of settings
// against backendUrl using client and streams the successful results on the
// returned channel.
//
// Queries are executed through a worker pool created with pond.New(5, 1000),
// and each request gets its own 30 second timeout. A query that fails is
// logged and skipped; it produces no channel element. The channel is closed
// once all queries have finished, so callers can simply range over it.
// Results arrive in completion order, not in the order of settings.
func FetchMetricsFromClickhouse(settings []Settings, client *http.Client, backendUrl string) chan QueryResultWithSettings {
	dataChan := make(chan QueryResultWithSettings, 5)
	go func() {
		defer close(dataChan)
		var wg sync.WaitGroup
		pool := pond.New(5, 1000)
		// Release the pool's goroutines when the work is done; without this
		// every call would leave a pool behind. Deferred calls run in LIFO
		// order, so the pool is stopped before dataChan is closed.
		defer pool.StopAndWait()
		for _, setting := range settings {
			// Per-iteration copy: under pre-1.22 loop semantics the closure
			// below would otherwise observe a shared, mutating variable.
			setting := setting
			wg.Add(1)
			pool.Submit(func() {
				defer wg.Done()
				ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
				defer cancel()
				data, err := getDataFromClickhouse(ctx, client, backendUrl, setting.query())
				if err != nil {
					slog.Error("error while getting queryResult from clickhouse", "error", err, "settings", setting)
					return
				}
				slog.Debug("fetched from clickhouse", "settings", setting)
				dataChan <- QueryResultWithSettings{
					queryResult: data,
					Settings:    setting,
				}
			})
		}
		wg.Wait()
	}()
	return dataChan
}

// getDataFromClickhouse sends query to the backend and decodes the response
// body into a queryResult. It returns the first error from either step.
func getDataFromClickhouse(ctx context.Context, client *http.Client, backendUrl string, query dataQuery.Query) (queryResult, error) {
	response, err := getValuesFromServer(ctx, client, backendUrl, query)
	if err != nil {
		return queryResult{}, err
	}
	return extractDataFromRequest(response)
}

// getValuesFromServer issues a GET request to backendURL + "/api/q/" followed
// by the encoded query and returns the raw response body.
//
// The query is wrapped in a one-element slice, marshalled to JSON and encoded
// with util.EncodeQuery before being appended to the URL. Any status other
// than 200 OK is reported as an error.
func getValuesFromServer(ctx context.Context, client *http.Client, backendURL string, query dataQuery.Query) ([]byte, error) {
	endpoint := backendURL + "/api/q/"
	jsonQuery, err := json.Marshal([]dataQuery.Query{query})
	if err != nil {
		return nil, fmt.Errorf("failed to marshal query: %w", err)
	}
	encoded, err := util.EncodeQuery(jsonQuery)
	if err != nil {
		return nil, fmt.Errorf("failed to encode query: %w", err)
	}

	req, err := http.NewRequestWithContext(ctx, http.MethodGet, endpoint+encoded, http.NoBody)
	if err != nil {
		return nil, fmt.Errorf("failed to create request: %w", err)
	}

	resp, err := client.Do(req)
	if err != nil {
		return nil, fmt.Errorf("failed to send GET request: %w", err)
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		return nil, fmt.Errorf("failed to get data: %v", resp.Status)
	}

	body, err := io.ReadAll(resp.Body)
	if err != nil {
		return nil, fmt.Errorf("failed to read response body: %w", err)
	}
	return body, nil
}

// query builds the query for a Fleet startup metric against the "fleet"
// database, restricted to the last 100 days and ordered by timestamp.
//
// Only metrics named "<name>.end" add a value column (measures.start +
// measures.value for the measure called <name>); for any other metric name
// the query selects just the timestamp, build and build type.
func (s FleetStartupSettings) query() dataQuery.Query {
	fields := []dataQuery.QueryDimension{
		{Name: "t", Sql: "toUnixTimestamp(generated_time)*1000"},
	}
	filters := []dataQuery.QueryFilter{
		{Field: "branch", Value: s.Branch},
		{Field: "generated_time", Sql: ">subtractDays(now(),100)"},
		{Field: "project", Value: "fleet"},
		{Field: "machine", Value: s.Machine, Operator: "like"},
		{Field: "triggeredBy", Value: ""},
	}
	if metricName, isEnd := strings.CutSuffix(s.Metric, ".end"); isEnd {
		filters = append(filters, dataQuery.QueryFilter{Field: "measures.name", Value: metricName})
		fields = append(fields, dataQuery.QueryDimension{Name: "measures", SubName: "end", Sql: "(measures.start+measures.value)"})
	}
	fields = append(fields,
		dataQuery.QueryDimension{Name: "Build", Sql: "concat(toString(build_c1),'.',toString(build_c2),'.',toString(build_c3))"},
		dataQuery.QueryDimension{Name: "tc_build_type"},
	)

	return dataQuery.Query{
		Database: "fleet",
		Table:    "report",
		Fields:   fields,
		Filters:  filters,
		Order:    []string{"t"},
	}
}

// query builds the query for an IDE startup metric against the "ijDev"
// database, restricted to the last 100 days and ordered by timestamp.
//
// The value column depends on the shape of s.Metric:
//   - contains "/": filtered by metrics.name, value read from metrics.value;
//   - ends with ".end": filtered by measure.name (suffix stripped), value is
//     measure.start + measure.duration;
//   - neither: s.Metric itself is selected as a column.
//
// The first two cases are independent checks, so a metric matching both
// contributes both filters and both columns.
func (s StartupSettings) query() dataQuery.Query {
	fields := []dataQuery.QueryDimension{
		{Name: "t", Sql: "toUnixTimestamp(generated_time)*1000"},
	}
	filters := []dataQuery.QueryFilter{
		{Field: "branch", Value: s.Branch},
		{Field: "generated_time", Sql: ">subtractDays(now(),100)"},
		{Field: "project", Value: s.Project},
		{Field: "product", Value: s.Product},
		{Field: "machine", Value: s.Machine, Operator: "like"},
		{Field: "triggeredBy", Value: ""},
	}

	hasSlash := strings.Contains(s.Metric, "/")
	metricName, isEnd := strings.CutSuffix(s.Metric, ".end")

	if hasSlash {
		filters = append(filters, dataQuery.QueryFilter{Field: "metrics.name", Value: s.Metric})
		fields = append(fields, dataQuery.QueryDimension{Name: "metrics", SubName: "value"})
	}
	if isEnd {
		filters = append(filters, dataQuery.QueryFilter{Field: "measure.name", Value: metricName})
		fields = append(fields, dataQuery.QueryDimension{Name: "measure", SubName: "end", Sql: "(measure.start+measure.duration)"})
	}
	if !isEnd && !hasSlash {
		fields = append(fields, dataQuery.QueryDimension{Name: s.Metric})
	}
	fields = append(fields,
		dataQuery.QueryDimension{Name: "Build", Sql: "concat(toString(build_c1),'.',toString(build_c2))"},
		dataQuery.QueryDimension{Name: "tc_build_type"},
	)

	return dataQuery.Query{
		Database: "ijDev",
		Table:    "report",
		Fields:   fields,
		Filters:  filters,
		Order:    []string{"t"},
	}
}

// query builds the query for a performance metric against s.Db / s.Table,
// restricted to the last 100 days and ordered by timestamp.
//
// The build column is the "build_c1.build_c2" string for the "perfint"
// database and tc_build_id for every other database.
func (s PerformanceSettings) query() dataQuery.Query {
	fields := []dataQuery.QueryDimension{
		{Name: "t", Sql: "toUnixTimestamp(generated_time)*1000"},
		{Name: "measures", SubName: "value"},
	}
	if s.Db == "perfint" {
		fields = append(fields, dataQuery.QueryDimension{Name: "Build", Sql: "concat(toString(build_c1),'.',toString(build_c2))"})
	} else {
		fields = append(fields, dataQuery.QueryDimension{Name: "tc_build_id"})
	}
	fields = append(fields, dataQuery.QueryDimension{Name: "tc_build_type"})

	return dataQuery.Query{
		Database: s.Db,
		Table:    s.Table,
		Fields:   fields,
		Filters: []dataQuery.QueryFilter{
			{Field: "branch", Value: s.Branch},
			{Field: "mode", Value: s.Mode},
			{Field: "generated_time", Sql: ">subtractDays(now(),100)"},
			{Field: "project", Value: s.Project},
			{Field: "measures.name", Value: s.Metric},
			{Field: "machine", Value: s.Machine, Operator: "like"},
			{Field: "triggeredBy", Value: ""},
		},
		Order: []string{"t"},
	}
}

// extractDataFromRequest decodes a backend response body into a queryResult.
//
// The body is parsed as a JSON array of results, each of which is an array of
// columns. Only the first result is used, and its first four columns are read
// in order as timestamps, values, builds and build types. An error is
// returned if the JSON is invalid, if there is no result, if the first result
// has fewer than four columns, or if a column cannot be converted.
func extractDataFromRequest(response []byte) (queryResult, error) {
	var data [][][]any
	if err := json.Unmarshal(response, &data); err != nil {
		return queryResult{}, fmt.Errorf("failed to decode JSON: %w", err)
	}
	if len(data) == 0 {
		return queryResult{}, errors.New("no data")
	}
	// Four columns are indexed below (0..3), so fewer than four must be
	// rejected here rather than panicking on data[0][3].
	if len(data[0]) < 4 {
		return queryResult{}, errors.New("not enough data")
	}

	columns := data[0]
	timestamps, err := SliceToSliceInt64(columns[0])
	if err != nil {
		return queryResult{}, fmt.Errorf("failed to convert values: %w", err)
	}
	values, err := SliceToSliceOfInt(columns[1])
	if err != nil {
		return queryResult{}, fmt.Errorf("failed to convert values: %w", err)
	}
	builds, err := SliceToSliceOfString(columns[2])
	if err != nil {
		return queryResult{}, fmt.Errorf("failed to convert values: %w", err)
	}
	buildTypes, err := SliceToSliceOfString(columns[3])
	if err != nil {
		return queryResult{}, fmt.Errorf("failed to convert values: %w", err)
	}

	return queryResult{
		timestamps: timestamps,
		values:     values,
		builds:     builds,
		buildTypes: buildTypes,
	}, nil
}