cmd/tc-collector/tcCollector.go (351 lines of code) (raw):
package main
import (
"context"
"database/sql"
"encoding/json"
"errors"
"fmt"
"io"
"log/slog"
"net/http"
"net/url"
"os"
"runtime"
"sort"
"strings"
"time"
"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
"github.com/JetBrains/ij-perf-report-aggregator/pkg/analyzer"
sqlutil "github.com/JetBrains/ij-perf-report-aggregator/pkg/sql-util"
"github.com/JetBrains/ij-perf-report-aggregator/pkg/util"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/nats-io/nats.go"
"go.uber.org/atomic"
"golang.org/x/sync/errgroup"
)
// tcTimeFormat is the Go time layout of TeamCity REST timestamps. It is used both to parse a build's
// finishDate and to format the finishDate condition of the builds locator.
const tcTimeFormat = "20060102T150405-0700"

// Collector holds the state shared by the TeamCity REST requests issued while collecting one
// build configuration (or while resolving build configuration ids).
type Collector struct {
	// serverUrl is the TeamCity REST root; request paths such as "/builds/" or "/buildTypes/..." are
	// appended to it.
	serverUrl  string
	httpClient *http.Client
	// taskContext is the context the collector was created with. It is not read by the methods in
	// this view — presumably used by loadBuilds/loadReports; TODO confirm.
	// NOTE(review): Go convention discourages storing a context in a struct.
	taskContext context.Context
	config      analyzer.DatabaseConfiguration
	logger      *slog.Logger
	// tcSessionId is the TCSESSIONID cookie value captured by storeSessionIdCookie and re-sent by
	// createRequest. It is stored atomically, presumably because requests may run concurrently.
	tcSessionId atomic.String
	// installerBuildIdToInfo and buildIdToInfo are maps keyed by an int id, presumably TeamCity build
	// ids used as caches by code outside this view — TODO confirm.
	installerBuildIdToInfo map[int]*InstallerInfo
	buildIdToInfo          map[int]*BuildInfo
}
// doNotifyServer publishes two NATS messages: "server.clearCache", asking the report aggregator
// server to clear its cache, and "db.backup", asking for a database backup.
//
// natsUrl is expected without a scheme, because "nats://" is prepended. The connection is flushed
// so that the messages are sent before the (soon exiting) process goes away, and it is always
// closed before returning.
func doNotifyServer(natsUrl string) error {
	slog.Info("ask report aggregator server to clear cache")
	nc, err := nats.Connect("nats://" + natsUrl)
	if err != nil {
		return fmt.Errorf("cannot connect to nats: %w", err)
	}
	// release the connection on every return path, including the publish/flush error paths
	defer nc.Close()

	err = nc.Publish("server.clearCache", []byte("tcCollector"))
	if err != nil {
		return fmt.Errorf("cannot publish to server.clearCache: %w", err)
	}
	slog.Info("ask to backup db")
	err = nc.Publish("db.backup", []byte("tcCollector"))
	if err != nil {
		return fmt.Errorf("cannot publish to db.backup: %w", err)
	}
	// ensure that message is delivered, because app will be exited very soon
	err = nc.Flush()
	if err != nil {
		return fmt.Errorf("cannot flush: %w", err)
	}
	return nil
}
// collectFromTeamCity collects reports for every build configuration in buildConfigurationIds.
// Up to GOMAXPROCS configurations are processed concurrently.
//
// tcUrl is the TeamCity server URL; the REST root is tcUrl + "/app/rest". The first error returned
// by a configuration cancels the others and is returned. If taskContext is cancelled, no further
// configurations are scheduled, and the context error is returned once the running ones finish.
func collectFromTeamCity(taskContext context.Context, clickHouseUrl string, tcUrl string, projectId string, buildConfigurationIds []string, userSpecifiedSince time.Time, httpClient *http.Client) error {
	serverUrl := tcUrl + "/app/rest"
	config := analyzer.GetAnalyzer(projectId)
	db, metaDb, err := analyzer.OpenDb(clickHouseUrl, config)
	if err != nil {
		return fmt.Errorf("cannot open db: %w", err)
	}
	defer util.Close(db)
	defer metaDb.Close()

	errGroup, loadContext := errgroup.WithContext(taskContext)
	errGroup.SetLimit(runtime.GOMAXPROCS(-1))
	for _, buildTypeId := range buildConfigurationIds {
		// Stop scheduling after cancellation or after a failed configuration (loadContext covers
		// both). Never return from here directly: the deferred db/metaDb Close calls must not run
		// while already started goroutines still use the connections.
		if loadContext.Err() != nil {
			break
		}
		errGroup.Go(func() error {
			return collectBuildConfiguration(
				loadContext,
				httpClient,
				db,
				metaDb,
				config,
				buildTypeId,
				serverUrl,
				tcUrl,
				userSpecifiedSince,
				slog.With("buildTypeId", buildTypeId),
			)
		})
	}
	if err := errGroup.Wait(); err != nil {
		return err
	}
	if taskContext.Err() != nil {
		return fmt.Errorf("error in context: %w", taskContext.Err())
	}
	return nil
}
// collectBuildConfiguration loads finished builds of one TeamCity build configuration and stores
// their reports in ClickHouse.
//
// Builds finished after userSpecifiedSince are requested. When userSpecifiedSince is zero, the
// latest last_time stored in collector_state for buildTypeId is used instead. serverUrl is the
// TeamCity REST root; serverHost is the TeamCity URL that TeamCity's nextHref is appended to.
//
// A failure to load the first page is only logged, and nil is returned. Any later error, including
// context cancellation while paging, aborts collection of this configuration and is returned.
func collectBuildConfiguration(taskContext context.Context, httpClient *http.Client, db driver.Conn, metaDb *pgxpool.Pool, config analyzer.DatabaseConfiguration, buildTypeId string, serverUrl string, serverHost string, userSpecifiedSince time.Time, logger *slog.Logger) error {
	serverBuildUrl, err := url.Parse(serverUrl + "/builds/")
	if err != nil {
		return err
	}

	// Without an explicit start date, continue from the state stored by the previous run.
	since := userSpecifiedSince
	if since.IsZero() {
		//goland:noinspection SqlResolve
		query := "select last_time from collector_state where build_type_id = '" + sqlutil.StringEscaper.Replace(buildTypeId) + "' order by last_time desc limit 1"
		if err := db.QueryRow(taskContext, query).Scan(&since); err != nil && !errors.Is(err, sql.ErrNoRows) {
			return fmt.Errorf("cannot query last collect time: %w", err)
		}
	}

	locator := "buildType:(id:" + buildTypeId + "),defaultFilter:false,failedToStart:false,state:finished,canceled:false,count:50"
	if !since.IsZero() {
		locator += ",finishDate:(date:" + since.Format(tcTimeFormat) + ",condition:after)"
	}
	q := serverBuildUrl.Query()
	q.Set("locator", locator)
	q.Set("fields", buildTeamCityQuery())
	serverBuildUrl.RawQuery = q.Encode()

	logger.Info("collect", "since", since)
	reportExistenceChecker := &ReportExistenceChecker{}
	err = reportExistenceChecker.reset(taskContext, config.DbName, config.TableName, db, since, buildTypeId)
	if err != nil {
		return err
	}

	collector := &Collector{
		serverUrl:              serverUrl,
		httpClient:             httpClient,
		taskContext:            taskContext,
		config:                 config,
		installerBuildIdToInfo: make(map[int]*InstallerInfo),
		buildIdToInfo:          make(map[int]*BuildInfo),
		logger:                 logger,
	}

	// TC returns from newest to oldest, but we need
	// 1) to insert in opposite order (less merge work for ClickHouse)
	// 2) set last collect state once the oldest chunk is committed, but it is possible only if the oldest will be inserted before newest (as we ask TC to returns since some date)
	var buildsToLoad [][]*Build
	buildList, err := collector.loadBuilds(serverBuildUrl.String())
	if err != nil {
		// best effort: a configuration whose first page cannot be loaded is skipped, not failed
		logger.Warn(err.Error())
		return nil
	}
	buildsToLoad = append(buildsToLoad, buildList.Builds)
	totalCount := len(buildList.Builds)
	for buildList.NextHref != "" {
		if taskContext.Err() != nil {
			return fmt.Errorf("error in context: %w", taskContext.Err())
		}
		// nextHref is appended to the server host, not to the REST root
		buildList, err = collector.loadBuilds(serverHost + buildList.NextHref)
		if err != nil {
			return err
		}
		buildsToLoad = append(buildsToLoad, buildList.Builds)
		totalCount += len(buildList.Builds)
	}
	logger.Info("load reports", "buildCount", totalCount, "buildTypeId", buildTypeId, "since", since)

	// oldest page first (pages were fetched newest-first)
	for chunkIndex := len(buildsToLoad) - 1; chunkIndex >= 0; chunkIndex-- {
		builds := buildsToLoad[chunkIndex]
		if len(builds) == 0 {
			continue
		}
		sort.Slice(builds, func(a, b int) bool {
			return builds[a].Id < builds[b].Id
		})
		logger.Debug("load reports", "chunk", chunkIndex)
		lastBuild := builds[len(builds)-1]
		lastBuildFinishDate, err := time.Parse(tcTimeFormat, lastBuild.FinishDate)
		if err != nil {
			return fmt.Errorf("cannot parse last build finish date %q: %w", lastBuild.FinishDate, err)
		}
		reportAnalyzer, err := analyzer.CreateReportAnalyzer(taskContext, db, metaDb, config, logger)
		if err != nil {
			return err
		}
		err = collector.loadReports(builds, reportExistenceChecker, reportAnalyzer)
		if err != nil {
			return err
		}
		logger.Debug("wait for analyze and insert", "chunk", chunkIndex)
		err = reportAnalyzer.WaitAnalyzeAndInsert()
		if err != nil {
			return err
		}
		// engine ReplacingMergeTree(last_time) is used, no need to delete old entry
		// set last collect time to 1 second after last build in chunk
		err = updateLastCollectTime(taskContext, buildTypeId, lastBuildFinishDate.Add(1*time.Second), db)
		if err != nil {
			return err
		}
	}
	return nil
}
// buildTeamCityQuery returns the value of the TeamCity REST "fields" parameter used when listing
// builds. It selects the paging attributes (count, href, nextHref) and, for each build, the
// attributes the collector reads: identity, finish date, status, agent, artifact files, artifact
// dependencies, personal flag and triggering user.
func buildTeamCityQuery() string {
	const perBuildFields = "id,buildTypeId,number,finishDate,status,agent(name)," +
		"artifacts($locator(recursive:true,directory:false),file(href))," +
		"artifact-dependencies(build(id,buildTypeId,finishDate))," +
		"personal,triggered(user(email))"
	return "count,href,nextHref,build(" + perBuildFields + ")"
}
// updateLastCollectTime inserts the (buildTypeId, lastCollectTimeToSet) row into collector_state
// using a single-row batch. The batch is closed when the function returns; errors from preparing,
// appending or sending it are wrapped and returned.
func updateLastCollectTime(ctx context.Context, buildTypeId string, lastCollectTimeToSet time.Time, db driver.Conn) error {
	//goland:noinspection SqlResolve
	const insertStatement = "insert into collector_state (build_type_id, last_time)"
	stateBatch, prepareErr := db.PrepareBatch(ctx, insertStatement, driver.WithReleaseConnection())
	if prepareErr != nil {
		return fmt.Errorf("cannot prepare batch: %w", prepareErr)
	}
	defer stateBatch.Close()

	if appendErr := stateBatch.Append(buildTypeId, lastCollectTimeToSet); appendErr != nil {
		return fmt.Errorf("cannot append to batch: %w", appendErr)
	}
	if sendErr := stateBatch.Send(); sendErr != nil {
		return fmt.Errorf("cannot send batch: %w", sendErr)
	}
	return nil
}
// getTcSessionIdCookie returns the value of the first cookie named exactly "TCSESSIONID", or ""
// when there is none. The first match wins even if its value is empty.
func getTcSessionIdCookie(cookies []*http.Cookie) string {
	const sessionCookieName = "TCSESSIONID"
	for idx := range cookies {
		if c := cookies[idx]; c.Name == sessionCookieName {
			return c.Value
		}
	}
	return ""
}
// storeSessionIdCookie remembers the TCSESSIONID cookie sent in response, so createRequest can
// reuse the session. A response without the cookie leaves the stored id unchanged.
func (t *Collector) storeSessionIdCookie(response *http.Response) {
	// TC doesn't set cookie if it was already set for request
	if sessionId := getTcSessionIdCookie(response.Cookies()); sessionId != "" {
		t.tcSessionId.Store(sessionId)
	}
}
// get performs an authenticated GET of targetUrl (see createRequest) and returns the raw response.
// The status code is not checked, and the caller must close the response body. A cancelled
// context is returned unwrapped; other transport errors are wrapped with the URL.
func (t *Collector) get(ctx context.Context, targetUrl string) (*http.Response, error) {
	req, err := t.createRequest(ctx, targetUrl)
	if err != nil {
		return nil, err
	}

	resp, doErr := t.httpClient.Do(req)
	switch {
	case doErr == nil:
		return resp, nil
	case errors.Is(doErr, context.Canceled):
		return nil, doErr
	default:
		return nil, fmt.Errorf("cannot get %s: %w", targetUrl, doErr)
	}
}
// getBuildTypesFromComposite expands configuration into build configuration ids. A non-composite
// configuration is returned as the only element. A composite one is resolved by walking its
// snapshot dependencies with getSnapshotsRecursive.
func (t *Collector) getBuildTypesFromComposite(ctx context.Context, configuration string) ([]string, error) {
	composite, err := t.isComposite(ctx, configuration)
	switch {
	case err != nil:
		return nil, err
	case !composite:
		return []string{configuration}, nil
	}

	resolved := make([]string, 0)
	err = t.getSnapshotsRecursive(ctx, configuration, &resolved)
	return resolved, err
}
// getSnapshots resolves configuration into the build configuration ids to collect. It tries, in
// order:
//   - the build types of a project with that id;
//   - the expansion of a composite configuration;
//   - the configuration itself, when both of the above yield nothing.
func (t *Collector) getSnapshots(ctx context.Context, configuration string) ([]string, error) {
	projectBuildTypes, err := t.getBuildTypesFromProject(ctx, configuration)
	if err != nil {
		return nil, err
	}
	if len(projectBuildTypes) > 0 {
		return projectBuildTypes, nil
	}

	// not a project
	compositeBuildTypes, err := t.getBuildTypesFromComposite(ctx, configuration)
	if err != nil {
		return nil, err
	}
	if len(compositeBuildTypes) == 0 {
		// not composite
		return []string{configuration}, nil
	}
	return compositeBuildTypes, nil
}
// getBuildTypesFromProject returns the ids of the build types TeamCity lists for the locator
// "project:<configuration>". An empty, error-free result means no build types were listed.
// On a JSON decoding error the ids decoded so far are returned together with the error.
func (t *Collector) getBuildTypesFromProject(ctx context.Context, configuration string) ([]string, error) {
	configurations := make([]string, 0, 10)
	response, err := t.get(ctx, t.serverUrl+"/buildTypes?locator=project:"+configuration)
	if err != nil {
		return configurations, err
	}
	defer response.Body.Close()

	// report read failures instead of letting a truncated body surface as a confusing JSON error
	responseBody, err := io.ReadAll(response.Body)
	if err != nil {
		return configurations, fmt.Errorf("cannot read build types of project %s: %w", configuration, err)
	}
	// every non-2xx status, 300 included, is a failure
	if response.StatusCode < 200 || response.StatusCode >= 300 {
		return configurations, fmt.Errorf("invalid response (%s): %s", response.Status, responseBody)
	}

	type BuildType struct {
		Id string
	}
	type Project struct {
		BuildType []BuildType
	}
	var project Project
	err = json.Unmarshal(responseBody, &project)
	for _, buildType := range project.BuildType {
		configurations = append(configurations, buildType.Id)
	}
	return configurations, err
}
// getSnapshotsRecursive appends to configurations the ids of all non-composite build
// configurations reachable from configuration through snapshot dependencies. configuration itself
// is appended when it is not composite. Configurations whose id contains "Installers" are skipped,
// together with everything below them.
//
// The walk is best effort. A configuration whose type cannot be determined is logged and skipped,
// and errors from dependencies are logged so that the remaining dependencies are still visited.
// Errors loading or decoding this configuration's own dependency list are returned.
//
// NOTE(review): a configuration reachable through several composites is appended once per path;
// callers may need to deduplicate.
func (t *Collector) getSnapshotsRecursive(ctx context.Context, configuration string, configurations *[]string) error {
	// checked before any request, so installer configurations cost no HTTP call
	if strings.Contains(configuration, "Installers") {
		return nil
	}
	isComposite, err := t.isComposite(ctx, configuration)
	if err != nil {
		// skip the configuration, but do not swallow the reason silently
		t.logger.Warn("cannot determine build configuration type, skipping", "buildTypeId", configuration, "error", err)
		return nil
	}
	if !isComposite {
		*configurations = append(*configurations, configuration)
		return nil
	}

	response, err := t.get(ctx, t.serverUrl+"/buildTypes/"+configuration+"/snapshot-dependencies")
	if err != nil {
		return err
	}
	defer response.Body.Close()
	responseBody, err := io.ReadAll(response.Body)
	if err != nil {
		return fmt.Errorf("cannot read snapshot dependencies of %s: %w", configuration, err)
	}
	// every non-2xx status, 300 included, is a failure
	if response.StatusCode < 200 || response.StatusCode >= 300 {
		return fmt.Errorf("invalid response (%s): %s", response.Status, responseBody)
	}

	type Dependency struct {
		Id string
	}
	type AllDependencies struct {
		Dependencies []Dependency `json:"snapshot-dependency"`
	}
	var allDependencies AllDependencies
	if err := json.Unmarshal(responseBody, &allDependencies); err != nil {
		return err
	}
	for _, dependency := range allDependencies.Dependencies {
		if err := t.getSnapshotsRecursive(ctx, dependency.Id, configurations); err != nil {
			t.logger.Warn(err.Error())
		}
	}
	return nil
}
// isComposite reports whether the buildConfigurationType setting of the given TeamCity build
// configuration is "COMPOSITE". On any error the result is false.
func (t *Collector) isComposite(ctx context.Context, configuration string) (bool, error) {
	response, err := t.get(ctx, t.serverUrl+"/buildTypes/"+configuration+"/settings/buildConfigurationType")
	if err != nil {
		return false, err
	}
	defer response.Body.Close()

	responseBody, err := io.ReadAll(response.Body)
	if err != nil {
		return false, fmt.Errorf("cannot read type of build configuration %s: %w", configuration, err)
	}
	// every non-2xx status, 300 included, is a failure
	if response.StatusCode < 200 || response.StatusCode >= 300 {
		return false, fmt.Errorf("invalid response (%s): %s", response.Status, responseBody)
	}

	type BuildType struct {
		Name  string
		Value string
	}
	var buildType BuildType
	if err := json.Unmarshal(responseBody, &buildType); err != nil {
		return false, err
	}
	return buildType.Value == "COMPOSITE", nil
}
// createRequest builds a GET request for requestURL bound to ctx. The request has no body and
// sets these headers:
//   - Accept: application/json;
//   - Authorization: a bearer token read from the TC_TOKEN environment variable;
//   - the TCSESSIONID cookie, when a session id has been stored by storeSessionIdCookie.
func (t *Collector) createRequest(ctx context.Context, requestURL string) (*http.Request, error) {
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, requestURL, http.NoBody)
	if err != nil {
		return nil, fmt.Errorf("cannot create request: %w", err)
	}

	if storedId := t.tcSessionId.Load(); storedId != "" {
		req.AddCookie(&http.Cookie{Name: "TCSESSIONID", Value: storedId})
	}
	token := os.Getenv("TC_TOKEN")
	req.Header.Add("Authorization", "Bearer "+token)
	req.Header.Add("Accept", "application/json")
	return req, nil
}