cmd/transform/transform.go (200 lines of code) (raw):
package main
import (
"context"
"fmt"
"log/slog"
"os"
"strings"
"time"
"github.com/ClickHouse/clickhouse-go/v2"
"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
"github.com/JetBrains/ij-perf-report-aggregator/pkg/analyzer"
"github.com/JetBrains/ij-perf-report-aggregator/pkg/model"
"github.com/JetBrains/ij-perf-report-aggregator/pkg/util"
"go.deanishe.net/env"
)
/*
Usage:
 1. Run the restore-backup RC.
 2. Adjust `migrate/report.sql` as needed and execute it.
*/
func main() {
	db := env.Get("DB")
	table := env.Get("TABLE")
	// A DB value of the form "<db>_<table>" also selects the table,
	// overriding TABLE.
	split := strings.Split(db, "_")
	if len(split) > 1 {
		table = split[1]
	}
	if db == "" || table == "" {
		slog.Error("Missing db or table, don't forget to set env variables DB and/or TABLE")
		os.Exit(1)
	}
	if err := transform("localhost:9000", db, table); err != nil {
		slog.Error("transform failed", "error", err)
		// Exit non-zero so callers (scripts, CI) see the failure instead of
		// treating the run as successful.
		os.Exit(1)
	}
}
// ReportRow holds one row read from the source report table.
// Columns are bound to fields through the `ch` struct tags (see
// rows.ScanStruct in process), so the field order here is not significant.
// Fields for columns that a given query does not select keep their zero value.
type ReportRow struct {
	// What was measured and where.
	Product string `ch:"product"`
	Machine string `ch:"machine"`
	Branch  string `ch:"branch"`
	Project string `ch:"project"`
	Mode    string `ch:"mode"`

	// Timestamps.
	GeneratedTime time.Time `ch:"generated_time"`
	BuildTime     time.Time `ch:"build_time"`

	// TeamCity build metadata.
	TcBuildId          uint32 `ch:"tc_build_id"`
	TcInstallerBuildId uint32 `ch:"tc_installer_build_id"`
	TcBuildType        string `ch:"tc_build_type"`
	TriggeredBy        string `ch:"triggeredBy"`

	// Build number components.
	BuildC1 uint8  `ch:"build_c1"`
	BuildC2 uint16 `ch:"build_c2"`
	BuildC3 uint16 `ch:"build_c3"`

	// Report payload: the raw report and the measures.* array columns.
	RawReport     string   `ch:"raw_report"`
	MeasuresName  []string `ch:"measures.name"`
	MeasuresValue []int32  `ch:"measures.value"`
	MeasuresType  []string `ch:"measures.type"`
}
// insertWorkerCount is the worker count passed to
// analyzer.NewInsertReportManager by transform.
// Set it to 1 if there is not enough memory.
const insertWorkerCount = 4
// transform reads every row of tableName and re-inserts it into
// tableName+"2" through an analyzer.InsertReportManager. It uses the analyzer
// configuration registered for idName.
//
// idName may have the form "<db>_<suffix>". Only the part before the first
// underscore is used as the ClickHouse database name, while the full idName
// selects the analyzer configuration.
//
// Rows are processed one calendar month at a time, because each month's select
// result is held in memory.
func transform(clickHouseUrl string, idName string, tableName string) error {
	slog.Info("start transforming", "db", idName)

	dbName := idName
	if parts := strings.Split(idName, "_"); len(parts) > 1 {
		dbName = parts[0]
	}

	db, err := clickhouse.Open(&clickhouse.Options{
		Addr: []string{clickHouseUrl},
		Auth: clickhouse.Auth{
			Database: dbName,
		},
		DialTimeout:     time.Second,
		ConnMaxLifetime: time.Hour,
		Settings: map[string]any{
			// https://github.com/ClickHouse/ClickHouse/issues/2833
			// ZSTD 19+ is used, so the read/write timeouts must be generous.
			// Both timeout settings are expressed in seconds.
			"send_timeout":     30_000,
			"receive_timeout":  3000,
			"max_memory_usage": 100000000000,
		},
	})
	if err != nil {
		return fmt.Errorf("cannot connect to clickhouse: %w", err)
	}
	defer util.Close(db)

	taskContext, cancel := util.CreateCommandContext()
	defer cancel()

	targetTable := tableName + "2"
	config := analyzer.GetAnalyzer(idName)
	config.TableName = targetTable
	insertReportManager, err := analyzer.NewInsertReportManager(taskContext, db, nil, config, targetTable, insertWorkerCount)
	if err != nil {
		return err
	}

	insertManager := insertReportManager.InsertManager
	// A large automatic batch size is used here. The loop below schedules a
	// send explicitly after each processed month once enough items are queued.
	insertManager.BatchSize = 50_000

	// Use something like (now() - toIntervalMonth(1)) in this query to try the
	// transformer on fresh data only.
	var minTime, maxTime time.Time
	err = db.QueryRow(taskContext, "select min(generated_time) as min, max(generated_time) as max from "+tableName).Scan(&minTime, &maxTime)
	if err != nil {
		return fmt.Errorf("cannot query min/max: %w", err)
	}
	slog.Info("time range", "start", minTime, "end", maxTime)

	// Round minTime down to the first day of its month. Round maxTime up to the
	// first day of the following month. time.Date normalizes month 13 to
	// January of the next year, so December needs no special case.
	minTime = time.Date(minTime.Year(), minTime.Month(), 1, 0, 0, 0, 0, minTime.Location())
	maxTime = time.Date(maxTime.Year(), maxTime.Month()+1, 1, 0, 0, 0, 0, maxTime.Location())

	for current := minTime; current.Before(maxTime); {
		next := current.AddDate(0, 1, 0)
		if err := process(taskContext, db, config, current, next, insertReportManager, tableName); err != nil {
			return fmt.Errorf("cannot process range [%s, %s): %w", current.Format(time.DateOnly), next.Format(time.DateOnly), err)
		}
		current = next
		if insertManager.GetQueuedItemCount() > 10_000 {
			insertManager.ScheduleSendBatch()
		}
	}

	if err := insertManager.Close(); err != nil {
		return fmt.Errorf("cannot close insert manager: %w", err)
	}
	slog.Info("transforming finished")
	return nil
}
// process reads the rows of tableName whose generated_time lies in
// [startTime, endTime), converts each one into an analyzer.RunResult and passes
// it to insertReportManager.WriteMetrics.
//
// If config.HasRawReport is set, analyzer.ReadReport is applied first. Rows for
// which it produces no report are skipped. Rows are selected in an explicit
// order so that data is inserted in the expected order. Update the order by
// clauses if the target table's ordering differs.
func process(taskContext context.Context, db driver.Conn, config analyzer.DatabaseConfiguration, startTime time.Time, endTime time.Time, insertReportManager *analyzer.InsertReportManager, tableName string) error {
	slog.Info("process", "start", startTime, "end", endTime)

	var query string
	if config.HasProductField {
		query = `
      select product, machine, branch,
             generated_time, build_time, raw_report,
             tc_build_id, tc_installer_build_id,
             build_c1, build_c2, build_c3, project
      from ` + tableName + `
      where generated_time >= $1 and generated_time < $2
      order by product, machine, branch, project, build_c1, build_c2, build_c3, build_time, generated_time
    `
	} else {
		// Installer and build-number columns exist only when the database has
		// the installer field. Both fragments end with a trailing comma, or are
		// empty, so they can be spliced into the column lists below.
		buildFields := ""
		installerFields := ""
		if config.HasInstallerField {
			buildFields = "build_c1, build_c2, build_c3,"
			installerFields = "tc_installer_build_id, " + buildFields
		}
		query = `
      select machine, branch,
             generated_time, build_time, tc_build_id,` + installerFields + ` project, measures.name, measures.value, measures.type, triggeredBy
      from ` + tableName + `
      where generated_time >= $1 and generated_time < $2
      order by machine, branch, project, ` + buildFields + ` build_time, generated_time
    `
	}

	rows, err := db.Query(taskContext, query, startTime, endTime)
	if err != nil {
		return fmt.Errorf("cannot query: %w", err)
	}
	defer util.Close(rows)

	for rows.Next() {
		// Use a fresh row per iteration. runResult.ExtraFieldData keeps
		// references to this row's slices, so reusing one struct across rows
		// could alias data that may still be waiting to be inserted.
		var row ReportRow
		if err := rows.ScanStruct(&row); err != nil {
			return fmt.Errorf("cannot scan: %w", err)
		}

		// NOTE(review): row.RawReport is selected (product-field query) but is
		// never copied into runResult before analyzer.ReadReport is called.
		// Confirm that ReadReport does not need the raw report text.
		runResult := &analyzer.RunResult{
			Machine:       row.Machine,
			GeneratedTime: row.GeneratedTime,
			BuildTime:     row.BuildTime,
			TcBuildId:     int(row.TcBuildId),
		}
		if config.HasInstallerField {
			runResult.TcInstallerBuildId = int(row.TcInstallerBuildId)
			runResult.BuildC1 = int(row.BuildC1)
			runResult.BuildC2 = int(row.BuildC2)
			runResult.BuildC3 = int(row.BuildC3)
		}

		if config.HasRawReport {
			if err := analyzer.ReadReport(runResult, config); err != nil {
				return err
			}
			if runResult.Report == nil {
				// No report was produced for this row; skip it.
				continue
			}
		}

		if config.DbName == "perfint" || config.DbName == "perfintDev" {
			runResult.Report = &model.Report{
				Project:   row.Project,
				BuildDate: row.BuildTime.Format("20060102T150405+0000"),
				Generated: row.GeneratedTime.Format("20060102T150405+0000"),
			}
			runResult.ExtraFieldData = []any{row.MeasuresName, row.MeasuresValue, row.MeasuresType, row.Mode}
			runResult.TriggeredBy = row.TriggeredBy
			runResult.TcBuildType = row.TcBuildType
		}

		if err := insertReportManager.WriteMetrics(row.Product, runResult, row.Branch, row.Project, slog.Default()); err != nil {
			return err
		}
	}

	if err := rows.Err(); err != nil {
		return fmt.Errorf("cannot iterate rows: %w", err)
	}
	return nil
}