cmd/tc-collector/reportLoader.go (259 lines of code) (raw):
package main
import (
"bytes"
"context"
"encoding/base64"
"encoding/hex"
"fmt"
"io"
"log/slog"
"net/url"
"strconv"
"strings"
"time"
"github.com/JetBrains/ij-perf-report-aggregator/pkg/analyzer"
"github.com/JetBrains/ij-perf-report-aggregator/pkg/model"
"github.com/json-iterator/go"
"golang.org/x/sync/errgroup"
)
func (t *Collector) loadReports(builds []*Build, reportExistenceChecker *ReportExistenceChecker, reportAnalyzer *analyzer.ReportAnalyzer) error {
networkRequestCount := 1
t.logger.Info("Network request count", "count", networkRequestCount)
for index, build := range builds {
if reportExistenceChecker.has(build.Id) {
t.logger.Info("build already processed", "id", build.Id, "finishDate", build.FinishDate)
builds[index] = nil
}
}
if t.config.HasInstallerField {
err := t.loadInstallerInfo(builds, networkRequestCount)
if err != nil {
return err
}
}
if t.config.HasNoInstallerButHasChanges {
err := t.loadChanges(builds, networkRequestCount)
if err != nil {
return err
}
}
duration := time.Duration(len(builds)*300) * time.Second
t.logger.Debug("load", "timeout", duration.Seconds())
taskContextWithTimeout, cancel := context.WithTimeout(t.taskContext, duration)
defer cancel()
errGroup, loadContext := errgroup.WithContext(taskContextWithTimeout)
errGroup.SetLimit(networkRequestCount)
for _, build := range builds {
if build == nil || build.Agent.Name == "Dead agent" {
continue
}
errGroup.Go(func() error {
t.logger.Info("processing build", "id", build.Id)
if t.config.HasInstallerField && build.installerInfo == nil {
// or already processed or cannot compute installer info
return nil
}
artifacts, err := t.downloadReports(loadContext, *build)
if err != nil {
slog.Error("failed to download reports", "error", err)
return nil
}
if len(artifacts) == 0 {
t.logger.Error("cannot find any performance report", "id", build.Id, "status", build.Status)
return nil
}
tcBuildProperties, err := t.downloadBuildProperties(loadContext, *build)
if err != nil {
slog.Error("failed to download build properties, skipping build", "buildId", build.Id, "error", err)
return nil
}
if tcBuildProperties == nil {
return nil
}
for _, artifact := range artifacts {
if loadContext.Err() != nil {
return nil
}
data := model.ExtraData{
Machine: build.Agent.Name,
TcBuildId: build.Id,
TcBuildType: build.Type,
TcBuildProperties: tcBuildProperties,
ReportFile: artifact.path,
}
currentBuildTime, err := analyzer.ParseTime(build.FinishDate)
if err == nil {
data.CurrentBuildTime = currentBuildTime
}
if build.Private && build.TriggeredBy.User != nil {
data.TriggeredBy = build.TriggeredBy.User.Email
}
if t.config.HasInstallerField {
installerInfo := build.installerInfo
data.BuildTime = installerInfo.buildTime
data.Changes = installerInfo.changes
data.TcInstallerBuildId = installerInfo.id
}
if t.config.HasNoInstallerButHasChanges {
data.Changes = build.buildInfo.changes
}
if t.config.HasBuildNumber {
data.TcBuildNumber = build.BuildNumber
}
err = reportAnalyzer.Analyze(artifact.data, data)
if err != nil {
if build.Status == "FAILURE" {
t.logger.Warn("cannot parse performance report in the failed build", "buildId", build.Id, "error", err)
} else {
return err
}
}
}
return nil
})
}
return errGroup.Wait()
}
func (t *Collector) loadChanges(builds []*Build, networkRequestCount int) error {
var notLoadedBuildIds []*BuildInfo
for _, build := range builds {
if build == nil {
continue
}
id := build.Id
buildInfo := t.buildIdToInfo[id]
if buildInfo == nil {
buildInfo = &BuildInfo{
id: id,
}
notLoadedBuildIds = append(notLoadedBuildIds, buildInfo)
t.buildIdToInfo[id] = buildInfo
}
build.buildInfo = buildInfo
}
if len(notLoadedBuildIds) == 0 {
return nil
}
notLoadedIds := make([]int, 0, len(notLoadedBuildIds))
for _, installerInfo := range notLoadedBuildIds {
notLoadedIds = append(notLoadedIds, installerInfo.id)
}
t.logger.Debug("load build info", "count", len(notLoadedBuildIds), "ids", notLoadedIds)
errGroup, loadContext := errgroup.WithContext(t.taskContext)
errGroup.SetLimit(networkRequestCount)
for _, buildInfo := range notLoadedBuildIds {
if buildInfo.id == -1 {
continue
}
errGroup.Go(func() error {
var err error
buildInfo.changes, err = t.loadBuildChanges(loadContext, buildInfo.id)
if err != nil {
return fmt.Errorf("failed to load changes: %w", err)
}
return nil
})
}
return errGroup.Wait()
}
func (t *Collector) loadInstallerInfo(builds []*Build, networkRequestCount int) error {
var notLoadedInstallerBuildIds []*InstallerInfo
for _, build := range builds {
if build == nil {
continue
}
id, buildTime, err := computeBuildDate(build)
if err != nil {
return err
}
if id == -1 {
if t.config.HasInstallerField {
t.logger.Error("cannot find installer build", "buildId", build.Id)
}
continue
}
installerInfo := t.installerBuildIdToInfo[id]
if installerInfo == nil {
installerInfo = &InstallerInfo{
id: id,
buildTime: buildTime,
}
notLoadedInstallerBuildIds = append(notLoadedInstallerBuildIds, installerInfo)
t.installerBuildIdToInfo[id] = installerInfo
}
build.installerInfo = installerInfo
}
if len(notLoadedInstallerBuildIds) == 0 {
return nil
}
notLoadedIds := make([]int, 0, len(notLoadedInstallerBuildIds))
for _, installerInfo := range notLoadedInstallerBuildIds {
notLoadedIds = append(notLoadedIds, installerInfo.id)
}
t.logger.Debug("load installer info", "count", len(notLoadedInstallerBuildIds), "ids", notLoadedIds)
errGroup, loadContext := errgroup.WithContext(t.taskContext)
errGroup.SetLimit(networkRequestCount)
for _, installerInfo := range notLoadedInstallerBuildIds {
if installerInfo.id == -1 {
continue
}
errGroup.Go(func() error {
var err error
installerInfo.changes, err = t.loadBuildChanges(loadContext, installerInfo.id)
if err != nil {
return fmt.Errorf("failed to load changes: %w", err)
}
return nil
})
}
return errGroup.Wait()
}
func computeBuildDate(build *Build) (int, time.Time, error) {
for _, dependencyBuild := range build.ArtifactDependencies.Builds {
if strings.Contains(dependencyBuild.BuildTypeId, "Installer") || strings.Contains(dependencyBuild.BuildTypeId, "Distribution") {
result, err := time.Parse(tcTimeFormat, dependencyBuild.FinishDate)
if err != nil {
return -1, time.Time{}, err
}
return dependencyBuild.Id, result, nil
}
}
return -1, time.Time{}, nil
}
func (t *Collector) loadBuildChanges(ctx context.Context, buildId int) ([]string, error) {
artifactUrl, err := url.Parse(t.serverUrl + "/changes?locator=build:(id:" + strconv.Itoa(buildId) + ")&fields=change(version)&count=10000")
if err != nil {
return nil, err
}
response, err := t.get(ctx, artifactUrl.String())
if err != nil {
return nil, err
}
defer response.Body.Close()
if response.StatusCode > 300 {
responseBody, _ := io.ReadAll(response.Body)
return nil, fmt.Errorf("invalid response (%s): %s", response.Status, responseBody)
}
t.storeSessionIdCookie(response)
var changeList ChangeList
err = jsoniter.ConfigFastest.NewDecoder(response.Body).Decode(&changeList)
if err != nil {
return nil, fmt.Errorf("failed to parse response: %w", err)
}
encoding := base64.RawStdEncoding
var b bytes.Buffer
result := make([]string, len(changeList.List))
for index, change := range changeList.List {
if strings.Contains(change.Version, " ") {
// private build with custom change, format: 13 04 2022 12:14
continue
}
data, err := hex.DecodeString(change.Version)
if err != nil {
return nil, fmt.Errorf("failed to decode change version: %w", err)
}
b.Reset()
buf := make([]byte, encoding.EncodedLen(len(data)))
encoding.Encode(buf, data)
result[index] = string(buf)
}
return result, nil
}