pkg/server/clickhouse.go (350 lines of code) (raw):
package server
import (
"encoding/json"
"fmt"
"math"
"net/http"
"slices"
"strings"
"sync"
"github.com/AndreyAkinshin/pragmastat/go/v3"
"github.com/ClickHouse/clickhouse-go/v2"
"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
degradation_detector "github.com/JetBrains/ij-perf-report-aggregator/pkg/degradation-detector"
"github.com/JetBrains/ij-perf-report-aggregator/pkg/degradation-detector/statistic"
"github.com/JetBrains/ij-perf-report-aggregator/pkg/outlier-detection"
"github.com/JetBrains/ij-perf-report-aggregator/pkg/util"
"github.com/valyala/bytebufferpool"
)
func (t *StatsServer) openDatabaseConnection() (driver.Conn, error) {
return clickhouse.Open(&clickhouse.Options{
Addr: []string{t.dbUrl},
Auth: clickhouse.Auth{
Database: "ij",
},
Settings: map[string]any{
"readonly": 1,
"max_query_size": 1000000,
"max_memory_usage": 3221225472,
},
})
}
func toJSONBuffer(data any) (*bytebufferpool.ByteBuffer, error) {
jsonData, err := json.Marshal(data)
if err != nil {
return nil, err
}
buffer := bytebufferpool.Get()
_, err = buffer.Write(jsonData)
if err != nil {
return nil, err
}
return buffer, nil
}
type responseItem struct {
Project string
MeasureName string
Median float64
}
func (t *StatsServer) getBranchComparison(request *http.Request) (*bytebufferpool.ByteBuffer, bool, error) {
type requestParams struct {
Table string `json:"table"`
MeasureNames []string `json:"measure_names"`
Branch string `json:"branch"`
Machine string `json:"machine"`
Mode string `json:"mode"`
}
var params requestParams
data, err := util.DecodeQuery(request.URL.Path[len("/api/compareBranches/"):])
if err != nil {
return nil, false, err
}
err = json.Unmarshal(data, ¶ms)
if err != nil {
return nil, false, err
}
quotedMeasureNames := make([]string, len(params.MeasureNames))
for i, name := range params.MeasureNames {
quotedMeasureNames[i] = "'" + name + "'"
}
measureNamesString := strings.Join(quotedMeasureNames, ",")
mode := params.Mode
if params.Mode == "default" {
mode = ""
}
sql := fmt.Sprintf("SELECT project as Project, measure_name as MeasureName, arraySlice(groupArray(measure_value), 1, 50) AS MeasureValues FROM (SELECT project, measures.name as measure_name, measures.value as measure_value FROM %s ARRAY JOIN measures WHERE branch = '%s' AND measure_name in (%s) AND machine like '%s' and mode = '%s' ORDER BY generated_time DESC)GROUP BY project, measure_name;", params.Table, params.Branch, measureNamesString, params.Machine, mode)
db, err := t.openDatabaseConnection()
defer func(db driver.Conn) {
_ = db.Close()
}(db)
if err != nil {
return nil, false, err
}
var queryResults []struct {
Project string
MeasureName string
MeasureValues []int
}
err = db.Select(request.Context(), &queryResults, sql)
if err != nil {
return nil, false, err
}
response := getMedianValues(queryResults)
buffer, err := toJSONBuffer(response)
return buffer, true, err
}
func (t *StatsServer) getModeComparison(request *http.Request) (*bytebufferpool.ByteBuffer, bool, error) {
type requestParams struct {
Table string `json:"table"`
MeasureNames []string `json:"measure_names"`
Branch string `json:"branch"`
Machine string `json:"machine"`
Mode string `json:"mode"`
}
var params requestParams
data, err := util.DecodeQuery(request.URL.Path[len("/api/compareModes/"):])
if err != nil {
return nil, false, err
}
err = json.Unmarshal(data, ¶ms)
if err != nil {
return nil, false, err
}
quotedMeasureNames := make([]string, len(params.MeasureNames))
for i, name := range params.MeasureNames {
quotedMeasureNames[i] = "'" + name + "'"
}
measureNamesString := strings.Join(quotedMeasureNames, ",")
sql := fmt.Sprintf("SELECT project as Project, measure_name as MeasureName, arraySlice(groupArray(measure_value), 1, 50) AS MeasureValues FROM (SELECT project, measures.name as measure_name, measures.value as measure_value FROM %s ARRAY JOIN measures WHERE mode = '%s' AND branch = '%s' AND measure_name in (%s) AND machine like '%s' AND generated_time >subtractMonths(now(),1) ORDER BY generated_time DESC)GROUP BY project, measure_name;", params.Table, params.Mode, params.Branch, measureNamesString, params.Machine)
db, err := t.openDatabaseConnection()
defer func(db driver.Conn) {
_ = db.Close()
}(db)
if err != nil {
return nil, false, err
}
var queryResults []struct {
Project string
MeasureName string
MeasureValues []int
}
err = db.Select(request.Context(), &queryResults, sql)
if err != nil {
return nil, false, err
}
response := getMedianValues(queryResults)
buffer, err := toJSONBuffer(response)
return buffer, true, err
}
func getMedianValues(queryResults []struct {
Project string
MeasureName string
MeasureValues []int
},
) []responseItem {
responseChan := make(chan responseItem, len(queryResults))
var wg sync.WaitGroup
for _, result := range queryResults {
wg.Go(func() {
values := result.MeasureValues
slices.Reverse(values)
indexes := statistic.GetChangePointIndexes(values, 1)
var valuesAfterLastChangePoint []int
if len(indexes) == 0 {
valuesAfterLastChangePoint = values
} else {
lastIndex := indexes[len(indexes)-1]
valuesAfterLastChangePoint = values[lastIndex:]
}
median := statistic.Median(valuesAfterLastChangePoint)
responseChan <- responseItem{
Project: result.Project,
MeasureName: result.MeasureName,
Median: median,
}
})
}
go func() {
wg.Wait()
close(responseChan)
}()
response := make([]responseItem, 0, len(queryResults))
for item := range responseChan {
response = append(response, item)
}
return response
}
func (t *StatsServer) getDistinctHighlightingPasses(request *http.Request) (*bytebufferpool.ByteBuffer, bool, error) {
db, err := t.openDatabaseConnection()
if err != nil {
return nil, false, err
}
defer func(db driver.Conn) {
_ = db.Close()
}(db)
var queryResult []struct {
PassName string
}
sql := "SELECT DISTINCT arrayJoin((arrayFilter(x-> x LIKE 'highlighting/%', `metrics.name`))) as PassName from report where generated_time >subtractMonths(now(),12)"
err = db.Select(request.Context(), &queryResult, sql)
if err != nil {
return nil, false, err
}
passes := make([]string, len(queryResult))
for i, v := range queryResult {
passes[i] = v.PassName
}
buffer, err := toJSONBuffer(passes)
return buffer, true, err
}
func removeLastPart(s string) string {
awsIndex := strings.LastIndex(s, "aws")
if awsIndex != -1 {
return s[:awsIndex+3]
}
lastIndex := strings.LastIndex(s, "-")
if lastIndex == -1 {
return s
}
return s[:lastIndex]
}
func (t *StatsServer) CreateProcessMetricDataHandler() http.HandlerFunc {
return func(w http.ResponseWriter, request *http.Request) {
type requestParams struct {
TestName string `json:"testName"`
Branch string `json:"branch"`
Machine string `json:"machine"`
Product string `json:"product"`
MetricName string `json:"metricName"`
Mode string `json:"mode"`
}
var params requestParams
decoder := json.NewDecoder(request.Body)
defer request.Body.Close()
if err := decoder.Decode(¶ms); err != nil {
http.Error(w, "Invalid request body: "+err.Error(), http.StatusBadRequest)
return
}
// Replace all matches with %
machine := removeLastPart(params.Machine) + "%"
// Map product to table - this is a simplified mapping, adjust as needed
table := mapProductToTable(params.Product)
if table == "" {
http.Error(w, "Unknown product: "+params.Product, http.StatusBadRequest)
return
}
// Query the database for metric values
sql := fmt.Sprintf(`
SELECT groupArray(metric_value) AS MetricValues
FROM (
SELECT measures.value as metric_value
FROM perfintDev.%s ARRAY JOIN measures
WHERE branch = '%s'
AND measures.name = '%s'
AND machine LIKE '%s'
AND project = '%s'
AND mode = '%s'
AND generated_time >= now() - INTERVAL 1 MONTH
ORDER BY generated_time
)
`, table, params.Branch, params.MetricName, machine, params.TestName, params.Mode)
db, err := t.openDatabaseConnection()
if err != nil {
http.Error(w, "Failed to open database: "+err.Error(), http.StatusInternalServerError)
return
}
defer func(db driver.Conn) {
_ = db.Close()
}(db)
var queryResult struct {
MetricValues []int
}
err = db.QueryRow(request.Context(), sql).Scan(&queryResult.MetricValues)
if err != nil {
http.Error(w, "Database query failed: "+err.Error(), http.StatusInternalServerError)
return
}
if len(queryResult.MetricValues) == 0 {
http.Error(w, "No data found for the specified parameters", http.StatusNotFound)
return
}
// Run Change Point algorithm
changePoints := statistic.GetChangePointIndexes(queryResult.MetricValues, 3)
// Median difference and effect size thresholds (same as degradation detector)
medianDifferenceThreshold := 10.0
effectSizeThreshold := 2.0
// Filter change points based on median difference and effect size
validChangePoints := filterValidChangePoints(queryResult.MetricValues, changePoints, medianDifferenceThreshold, effectSizeThreshold)
// Get the segment to analyze for max value
// Strategy: use data after the last significant behavior change
var segmentForAnalysis []int
if len(validChangePoints) == 0 {
// No valid change points - use all data
segmentForAnalysis = queryResult.MetricValues
} else {
// Use data after the last valid change point
lastChangePoint := validChangePoints[len(validChangePoints)-1]
segmentForAnalysis = queryResult.MetricValues[lastChangePoint:]
}
// Remove outliers using MAD-based detection (windowSize=5, threshold=3)
processedData := outlier_detection.RemoveOutliers(segmentForAnalysis, 5, 3.0)
type processMetricResponse struct {
MaxValue int `json:"maxValue"`
}
response := processMetricResponse{
MaxValue: slices.Max(processedData),
}
jsonData, err := json.Marshal(response)
if err != nil {
http.Error(w, "Failed to marshal response: "+err.Error(), http.StatusInternalServerError)
return
}
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
_, _ = w.Write(jsonData)
}
}
// mapProductToTable maps product names to database table names
func mapProductToTable(product string) string {
switch product {
case "IU":
return "idea"
case "GO":
return "goland"
case "RM":
return "ruby"
case "PS":
return "phpstorm"
case "PY":
return "pycharm"
case "WS":
return "webstorm"
default:
return ""
}
}
// filterValidChangePoints filters change points based on median difference and effect size thresholds
// This logic is similar to the degradation detector's approach
func filterValidChangePoints(values []int, changePoints []int, medianDifferenceThreshold float64, effectSizeThreshold float64) []int {
if len(changePoints) == 0 {
return changePoints
}
// Split values into segments
segments := degradation_detector.GetSegmentsBetweenChangePoints(changePoints, values)
if len(segments) < 2 {
return []int{}
}
validChangePoints := make([]int, 0)
// Iterate through segments and validate change points
for i := 1; i < len(segments); i++ {
prevSegment := segments[i-1]
currentSegment := segments[i]
ratio, err := pragmastat.Ratio(currentSegment, prevSegment)
if err != nil {
continue
}
currentCenter, err := pragmastat.Center(currentSegment)
if err != nil {
continue
}
previousCenter, err := pragmastat.Center(prevSegment)
if err != nil {
continue
}
// Convert ratio to percentage change: (ratio - 1) * 100
percentageChange := math.Abs((ratio - 1) * 100)
absoluteChange := math.Abs(currentCenter - previousCenter)
// Check if change is significant enough
if absoluteChange < 10 || percentageChange < medianDifferenceThreshold {
continue
}
// Calculate effect size
effectSize := statistic.EffectSize(currentSegment, prevSegment)
if effectSize < effectSizeThreshold {
continue
}
// This is a valid change point
validChangePoints = append(validChangePoints, changePoints[i-1])
}
return validChangePoints
}