pkg/analyzer/InsertReportManager.go (213 lines of code) (raw):
package analyzer
import (
"context"
"errors"
"fmt"
"log/slog"
"strconv"
"strings"
"time"
"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
"github.com/JetBrains/ij-perf-report-aggregator/pkg/model"
"github.com/JetBrains/ij-perf-report-aggregator/pkg/sql-util"
"github.com/jackc/pgx/v5/pgxpool"
"go.deanishe.net/env"
)
// use `select distinct cast(machine, 'Uint16') as id, machine as name FROM report order by id` to get current enum values
// for now machine enum should be updated manually if a new machine will be added
var ErrMetricsCannotBeComputed = errors.New("metrics cannot be computed")
type RunResult struct {
Product string
Machine string
BuildTime time.Time
GeneratedTime time.Time
TcBuildId int
TcBuildType string
TcInstallerBuildId int
RawReport []byte
// maybe null
Report *model.Report
ReportFileName string
BuildC1 int
BuildC2 int
BuildC3 int
TriggeredBy string
BuildNumber string
ExtraFieldData []any
branch string
}
type InsertReportManager struct {
sql_util.InsertDataManager
context context.Context
MaxGeneratedTime time.Time
IsCheckThatNotAlreadyAddedNeeded bool
config DatabaseConfiguration
nonMetricFieldCount int
insertInstallerManager *InsertInstallerManager
insertMetaManager *InsertMetaManager
TableName string
}
func NewInsertReportManager(ctx context.Context, db driver.Conn, metaDb *pgxpool.Pool, config DatabaseConfiguration, tableName string, insertWorkerCount int) (*InsertReportManager, error) {
var err error
if config.TableName != "" {
tableName = config.TableName
}
metaFields := make([]string, 0, 16)
metaFields = append(metaFields, "machine", "generated_time", "project", "tc_build_id", "branch", "tc_build_type")
if config.HasProductField {
metaFields = append(metaFields, "product")
}
if config.HasInstallerField {
metaFields = append(metaFields, "build_time", "tc_installer_build_id", "build_c1", "build_c2", "build_c3")
}
if config.HasBuildNumber {
metaFields = append(metaFields, "build_number")
}
metaFields = append(metaFields, "triggeredBy")
var sb strings.Builder
sb.WriteString("insert into ")
sb.WriteString(tableName)
sb.WriteString(" (")
for i, field := range metaFields {
if i != 0 {
sb.WriteRune(',')
}
sb.WriteString(field)
}
config.insertStatementWriter(&sb)
sb.WriteString(") values (")
for i, n := 0, len(metaFields)+config.extraFieldCount; i < n; i++ {
if i != 0 {
sb.WriteRune(',')
}
sb.WriteRune('?')
}
sb.WriteRune(')')
effectiveSql := sb.String()
insertManager := sql_util.NewBatchInsertManager(ctx, db, effectiveSql, insertWorkerCount, slog.With("type", "report"))
// large inserts leads to large memory usage, so, allow to override INSERT_BATCH_SIZE via env
insertManager.BatchSize = env.GetInt("INSERT_BATCH_SIZE", 20_000)
var installerManager *InsertInstallerManager
if config.HasInstallerField || config.HasNoInstallerButHasChanges {
installerManager, err = NewInstallerInsertManager(ctx, db)
if err != nil {
return nil, err
}
}
var metaManager *InsertMetaManager
if config.HasMetaDB && metaDb != nil {
metaManager, err = NewInsertMetaManager(ctx, metaDb)
if err != nil {
return nil, err
}
}
manager := &InsertReportManager{
nonMetricFieldCount: len(metaFields),
config: config,
TableName: tableName,
InsertDataManager: sql_util.InsertDataManager{
InsertManager: insertManager,
},
context: ctx,
insertInstallerManager: installerManager,
insertMetaManager: metaManager,
}
if installerManager != nil {
insertManager.AddDependency(installerManager.InsertManager)
}
return manager, nil
}
// Insert checks that entries are not duplicated and warn if metrics cannot be computed
func (t *InsertReportManager) Insert(runResult *RunResult) error {
logger := slog.Default()
if t.config.HasProductField {
logger = logger.With("product", runResult.Product)
}
logger = logger.With(
"db", t.config.DbName,
"table", t.config.TableName,
"file", runResult.ReportFileName,
)
// tc collector uses tc build id to avoid duplicates, so, IsCheckThatNotAlreadyAddedNeeded is set to false by default
if t.IsCheckThatNotAlreadyAddedNeeded && !runResult.GeneratedTime.After(t.MaxGeneratedTime) {
selectStatement := "select 1 from " + t.config.TableName + " where "
if t.config.HasProductField {
selectStatement = "product = '" + sql_util.StringEscaper.Replace(runResult.Product) + "' and "
}
selectStatement += "machine = '" + sql_util.StringEscaper.Replace(runResult.Machine) +
"' and project = '" + sql_util.StringEscaper.Replace(runResult.Report.Project) +
"' and generated_time = " + strconv.FormatInt(runResult.GeneratedTime.Unix(), 10)
exists, err := t.CheckExists(t.InsertManager.Db.QueryRow(t.context, selectStatement))
if err != nil {
return err
}
if exists {
logger.Debug("report already processed")
return nil
}
}
err := t.WriteMetrics(runResult.Product, runResult, runResult.branch, runResult.Report.Project, logger)
if err != nil {
if errors.Is(err, ErrMetricsCannotBeComputed) {
logger.Warn(err.Error())
return nil
}
return err
}
logger.Debug("new report added")
return nil
}
//goland:noinspection SpellCheckingInspection
var projectIdToName = map[string]string{
"Fleet": "fleet",
// IJ simple project - project v3
"Mc92Qmj3NY0xxdIiX9ayVbbEZ7s": "simple for IJ",
// IJ simple project - project v2
"73YWaW9bytiPDGuKvwNIYMK5CKI": "simple for IJ",
// idea project (v2)
"26hfTKDRtXpJ6U7ivgfKthtyU0A": "idea",
// idea project (v3)
"nC4MRRFMVYUSQLNIvPgDt+B3JqA": "idea",
// idea project (v4)
"Xplo4RZSHXIFu5elYBOiDDkwu20": "idea",
// light edit
"6hglkyp/cmAi7ntjrg7dHwd5NG4": "light edit (IJ)",
"1PbxeQ044EEghMOG9hNEFee05kM": "light edit (IJ)",
// restoring editors
"/q9N7EHxr8F1NHjbNQnpqb0Q0fs": "restoring editors",
}
func (t *InsertReportManager) WriteMetrics(product string, row *RunResult, branch string, providedProject string, logger *slog.Logger) error {
batch, err := t.InsertManager.PrepareForAppend()
if err != nil {
return err
}
project := row.Report.Project
if project == "" {
project = providedProject
} else if t.config.HasInstallerField {
customName := projectIdToName[row.Report.Project]
if customName != "" {
project = customName
}
}
args := make([]any, 0, t.nonMetricFieldCount+t.config.extraFieldCount)
args = append(args, row.Machine, row.GeneratedTime, project, uint32(row.TcBuildId), branch, row.TcBuildType)
if t.config.HasProductField {
args = append(args, product)
}
if t.config.HasInstallerField {
buildTimeUnix, err := getBuildTimeFromReport(row.Report)
if err != nil {
return err
}
if buildTimeUnix.IsZero() {
buildTimeUnix = row.BuildTime
}
if strings.HasPrefix(row.Machine, "intellij-linux-hw-compile-hp-blade-") {
return nil
}
args = append(args, buildTimeUnix, uint32(row.TcInstallerBuildId), uint8(row.BuildC1), uint16(row.BuildC2), uint16(row.BuildC3))
}
if t.config.HasBuildNumber {
args = append(args, row.BuildNumber)
}
args = append(args, row.TriggeredBy)
if t.config.DbName == "ij" || t.config.DbName == "ijDev" {
err = ComputeIjMetrics(t.nonMetricFieldCount, row.Report, &args, logger)
if err != nil {
return err
}
}
args = append(args, row.ExtraFieldData...)
err = batch.Append(args...)
if err != nil {
return fmt.Errorf("cannot append: %w", err)
}
return nil
}