pkg/analyzer/ReportAnalyzer.go (327 lines of code) (raw):
package analyzer
import (
"context"
"errors"
"fmt"
"log/slog"
"os"
"strconv"
"strings"
"sync"
"time"
"github.com/ClickHouse/clickhouse-go/v2"
"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
"github.com/JetBrains/ij-perf-report-aggregator/pkg/model"
"github.com/jackc/pgx/v5/pgxpool"
"go.deanishe.net/env"
)
type ReportAnalyzer struct {
config DatabaseConfiguration
insertQueue chan *ReportInfo
analyzeContext context.Context
waitGroup sync.WaitGroup
cancel func()
errOnce sync.Once
err error
InsertReportManager *InsertReportManager
logger *slog.Logger
}
func CreateReportAnalyzer(parentContext context.Context, db driver.Conn, metaDb *pgxpool.Pool, config DatabaseConfiguration, logger *slog.Logger) (*ReportAnalyzer, error) {
insertReportManager, err := NewInsertReportManager(parentContext, db, metaDb, config, "report", env.GetInt("INSERT_WORKER_COUNT", -1))
if err != nil {
return nil, err
}
analyzeContext, cancel := context.WithCancel(parentContext)
analyzer := &ReportAnalyzer{
config: config,
insertQueue: make(chan *ReportInfo, 1024),
analyzeContext: analyzeContext,
cancel: cancel,
InsertReportManager: insertReportManager,
logger: logger,
}
go func() {
for {
report, ok := <-analyzer.insertQueue
if !ok {
logger.Debug("analyze stopped; insert queue is closed")
return
}
analyzer.invokeInsert(report, cancel)
}
}()
return analyzer, nil
}
func OpenDb(clickHouseUrl string, config DatabaseConfiguration) (driver.Conn, *pgxpool.Pool, error) {
// well, go-faster/ch is not so easy to use for such a generic case as our code (each column should be created in advance, no API to simply pass slice of any values)
db, err := clickhouse.Open(&clickhouse.Options{
Addr: []string{clickHouseUrl},
Auth: clickhouse.Auth{
Database: config.DbName,
},
DialTimeout: 10 * time.Second,
ConnMaxLifetime: time.Hour,
Settings: map[string]any{
// https://github.com/ClickHouse/ClickHouse/issues/2833
// ZSTD 19+ is used, read/write timeout should be quite large (10 minutes)
"send_timeout": 30_000,
"receive_timeout": 3000,
},
})
if err != nil {
return nil, nil, fmt.Errorf("cannot connect to clickhouse: %w", err)
}
metaDb, err := pgxpool.New(context.Background(), os.Getenv("DATABASE_URL"))
metaDb.Config().MaxConns = 10
if err != nil {
return nil, nil, fmt.Errorf("cannot create pool: %w", err)
}
return db, metaDb, err
}
func (t *ReportAnalyzer) Analyze(data []byte, extraData model.ExtraData) error {
if t.analyzeContext.Err() != nil {
return nil
}
var err error
runResult := &RunResult{
RawReport: data,
TcBuildId: getNullIfEmpty(extraData.TcBuildId),
TcBuildType: extraData.TcBuildType,
TcInstallerBuildId: getNullIfEmpty(extraData.TcInstallerBuildId),
ReportFileName: extraData.ReportFile,
TriggeredBy: extraData.TriggeredBy,
}
if isBisectRun(extraData, t.logger) {
return nil
}
switch t.config.DbName {
case "jbr":
ignore := analyzePerfJbrReport(runResult, extraData)
if ignore {
// ignore empty report
return nil
}
case "qodana":
reportURL := runResult.ReportFileName
fileName := reportURL[strings.LastIndex(reportURL, "/")+1:]
if fileName == "open-telemetry.json" {
ignore := analyzeQodanaReport(runResult, extraData)
if ignore {
return nil
}
}
if fileName == "metrics.json" {
err = ReadReport(runResult, t.config)
}
default:
err = ReadReport(runResult, t.config)
}
projectId := t.config.DbName
if err != nil {
return err
}
if runResult.Report == nil {
// ignore report
return nil
}
if extraData.ProductCode == "" {
extraData.ProductCode = runResult.Report.ProductCode
}
if extraData.BuildNumber == "" {
extraData.BuildNumber = runResult.Report.Build
}
if extraData.Machine == "" {
return errors.New("machine is not specified")
}
runResult.Product = extraData.ProductCode
runResult.Machine = extraData.Machine
if runResult.GeneratedTime.IsZero() {
runResult.GeneratedTime, err = computeGeneratedTime(runResult.Report, extraData)
if err != nil {
if extraData.CurrentBuildTime.IsZero() {
return err
}
runResult.GeneratedTime = extraData.CurrentBuildTime
}
}
if t.config.HasInstallerField {
runResult.BuildTime = extraData.BuildTime
if extraData.BuildNumber == "" {
t.logger.Error("buildNumber is missed")
return nil
}
buildComponents := strings.Split(extraData.BuildNumber, ".")
if len(buildComponents) == 2 {
buildComponents = append(buildComponents, "0")
}
runResult.BuildC1, runResult.BuildC2, runResult.BuildC3, err = splitBuildNumber(buildComponents)
if err != nil {
// we might get 231.snapshot build numbers, that is more or less fine and we need such build anyway
t.logger.Error(err.Error())
}
}
if t.config.HasBuildNumber {
runResult.BuildNumber = extraData.TcBuildNumber
}
if t.analyzeContext.Err() != nil {
return nil
}
if len(extraData.TcBuildProperties) == 0 {
runResult.branch = "master"
} else {
runResult.branch, err = getBranch(runResult, extraData, projectId, t.logger)
if err != nil {
return err
}
}
t.waitGroup.Add(1)
t.insertQueue <- &ReportInfo{
extraData: extraData,
runResult: runResult,
}
return nil
}
func isBisectRun(extraData model.ExtraData, logger *slog.Logger) bool {
parser := parserPool.Get()
defer parserPool.Put(parser)
props, err := parser.ParseBytes(extraData.TcBuildProperties)
if err != nil {
logger.Warn("failed to parse build properties", "error", err)
return false
}
return props.GetBool("env.IS_BISECT_RUN")
}
func getBranch(runResult *RunResult, extraData model.ExtraData, projectId string, logger *slog.Logger) (string, error) {
parser := parserPool.Get()
defer parserPool.Put(parser)
props, err := parser.ParseBytes(extraData.TcBuildProperties)
if err != nil {
return "", fmt.Errorf("failed to parse build properties: %w", err)
}
if projectId == "mlEvaluation" {
branch := props.GetStringBytes("teamcity.build.branch")
if branch != nil {
return string(branch), nil
}
return "master", nil
}
if projectId == "jbr" {
splitId := strings.SplitN(extraData.TcBuildType, "_", 4)
if len(splitId) == 4 {
jbrBranch := strings.ToLower(splitId[1]) + "_" + strings.ToLower(splitId[2])
return jbrBranch, nil
}
logger.Error("format of JBR project is unexpected", "teamcity.project.id", extraData.TcBuildType)
return "", errors.New("cannot infer branch from JBR project id")
}
if projectId == "qodana" {
qodanaImage := string(props.GetStringBytes("image"))
lastSlash := strings.LastIndex(qodanaImage, "/")
if lastSlash >= 0 {
return qodanaImage[lastSlash+1:], nil
}
logger.Warn("No slash found in string")
}
//goland:noinspection SpellCheckingInspection
branch := string(props.GetStringBytes("teamcity.build.branch"))
if branch != "" && branch != "<default>" {
return branch, nil
}
branchInt := props.GetInt("teamcity.build.branch")
if branchInt != 0 {
return strconv.Itoa(branchInt), nil
}
isMaster := props.GetStringBytes("vcsroot.ijplatform_master_IntelliJMonorepo.branch")
if len(isMaster) == 0 && runResult.BuildC1 != 0 {
// we check that the property doesn't exist so it is not a master
if runResult.BuildC3 == 0 {
return strconv.Itoa(runResult.BuildC1), nil
}
// we have EAP branch
return strconv.Itoa(runResult.BuildC1) + "." + strconv.Itoa(runResult.BuildC2), nil
}
return "master", nil
}
type ReportInfo struct {
extraData model.ExtraData
runResult *RunResult
}
func computeGeneratedTime(report *model.Report, extraData model.ExtraData) (time.Time, error) {
if report.Generated == "" {
if extraData.LastGeneratedTime.IsZero() {
return time.Time{}, errors.New("generated time not in report and not provided explicitly")
}
return extraData.LastGeneratedTime, nil
}
parsedTime, err := ParseTime(report.Generated)
if err != nil {
return time.Time{}, err
}
return parsedTime, nil
}
func (t *ReportAnalyzer) WaitAnalyzeAndInsert() error {
t.logger.Debug("wait for analyze")
t.waitGroup.Wait()
t.cancel()
if t.err != nil {
return t.err
}
close(t.insertQueue)
t.logger.Debug("wait for insert")
err := t.InsertReportManager.InsertManager.Close()
if err != nil {
return err
}
return nil
}
func (t *ReportAnalyzer) invokeInsert(report *ReportInfo, cancel context.CancelFunc) {
defer t.waitGroup.Done()
err := t.insert(report)
if err != nil {
t.errOnce.Do(func() {
t.err = err
cancel()
})
}
}
func (t *ReportAnalyzer) insert(report *ReportInfo) error {
runResult := report.runResult
if report.extraData.TcInstallerBuildId > 0 {
err := t.InsertReportManager.insertInstallerManager.Insert(report.extraData.TcInstallerBuildId, report.extraData.Changes)
if err != nil {
return err
}
} else if report.extraData.Changes != nil {
err := t.InsertReportManager.insertInstallerManager.Insert(report.extraData.TcBuildId, report.extraData.Changes)
if err != nil {
return err
}
}
if t.InsertReportManager.insertMetaManager != nil {
r := report.runResult.Report
err := t.InsertReportManager.insertMetaManager.InsertProjectDescription(r.Project, runResult.branch, r.ProjectURL, r.MethodName, r.ProjectDescription)
if err != nil {
t.logger.Warn("cannot insert project description", "error", err)
}
}
err := t.InsertReportManager.Insert(runResult)
if err != nil {
if errors.Is(err, context.Canceled) {
return err
}
return fmt.Errorf("cannot insert report (teamcityBuildId=%d, reportPath=%s): %w", report.extraData.TcBuildId, report.extraData.ReportFile, err)
}
return nil
}
func getNullIfEmpty(v int) int {
if v <= 0 {
return 0
}
return v
}
func splitBuildNumber(buildComponents []string) (int, int, int, error) {
buildC1, err := strconv.Atoi(buildComponents[0])
if err != nil {
return 0, 0, 0, fmt.Errorf("cannot parse build number: %w", err)
}
buildC2, err := strconv.Atoi(buildComponents[1])
if err != nil {
return buildC1, 0, 0, nil
}
buildC3, err := strconv.Atoi(buildComponents[2])
if err != nil {
return buildC1, buildC2, 0, nil
}
return buildC1, buildC2, buildC3, nil
}