in cmd/tc-collector/tcCollector.go [110:231]
// collectBuildConfiguration loads the finished builds of the build configuration buildTypeId
// from the server and feeds their reports to a report analyzer, page by page.
//
// If userSpecifiedSince is zero, the starting point is read from the collector_state table;
// if no row exists there, no finish-date filter is applied. When a starting point is known,
// only builds finished after it are requested.
//
// All pages are fetched first. They are then processed from the last fetched page to the first,
// and after each page is analyzed and inserted, the last collect time is updated to one second
// after the finish date of the build with the greatest id on that page.
//
// A failure to load the first page is logged as a warning and nil is returned; every other
// failure is returned to the caller.
func collectBuildConfiguration(taskContext context.Context, httpClient *http.Client, db driver.Conn, metaDb *pgxpool.Pool, config analyzer.DatabaseConfiguration, buildTypeId string, serverUrl string, serverHost string, userSpecifiedSince time.Time, logger *slog.Logger) error {
	serverBuildUrl, err := url.Parse(serverUrl + "/builds/")
	if err != nil {
		return err
	}

	locator := "buildType:(id:" + buildTypeId + "),defaultFilter:false,failedToStart:false,state:finished,canceled:false,count:50"

	since := userSpecifiedSince
	if since.IsZero() {
		//goland:noinspection SqlResolve
		query := "select last_time from collector_state where build_type_id = '" + sqlutil.StringEscaper.Replace(buildTypeId) + "' order by last_time desc limit 1"
		// a missing row is not an error: since stays zero and no date filter is added
		if err := db.QueryRow(taskContext, query).Scan(&since); err != nil && !errors.Is(err, sql.ErrNoRows) {
			return fmt.Errorf("cannot query last collect time: %w", err)
		}
	}
	if !since.IsZero() {
		locator += ",finishDate:(date:" + since.Format(tcTimeFormat) + ",condition:after)"
	}

	q := serverBuildUrl.Query()
	q.Set("locator", locator)
	q.Set("fields", buildTeamCityQuery())
	serverBuildUrl.RawQuery = q.Encode()

	logger.Info("collect", "since", since)

	reportExistenceChecker := &ReportExistenceChecker{}
	err = reportExistenceChecker.reset(taskContext, config.DbName, config.TableName, db, since, buildTypeId)
	if err != nil {
		return err
	}

	collector := &Collector{
		serverUrl:              serverUrl,
		httpClient:             httpClient,
		taskContext:            taskContext,
		config:                 config,
		installerBuildIdToInfo: make(map[int]*InstallerInfo),
		buildIdToInfo:          make(map[int]*BuildInfo),
		logger:                 logger,
	}

	// NOTE(review): a failure of the first page is logged and swallowed, while a failure of
	// any following page is returned — confirm this asymmetry is intentional.
	buildList, err := collector.loadBuilds(serverBuildUrl.String())
	if err != nil {
		logger.Warn(err.Error())
		return nil
	}

	// TC returns from newest to oldest, but we need
	// 1) to insert in the opposite order (less merge work for ClickHouse)
	// 2) to set the last collect state once the oldest chunk is committed, which is possible only
	//    if the oldest chunk is inserted before the newest (as we ask TC to return builds since some date)
	// So all pages are gathered first and walked backwards afterwards.
	var buildsToLoad [][]*Build
	buildsToLoad = append(buildsToLoad, buildList.Builds)
	totalCount := len(buildList.Builds)

	for buildList.NextHref != "" {
		if ctxErr := taskContext.Err(); ctxErr != nil {
			return fmt.Errorf("error in context: %w", ctxErr)
		}

		// NextHref is appended to serverHost, not to serverUrl
		buildList, err = collector.loadBuilds(serverHost + buildList.NextHref)
		if err != nil {
			return err
		}

		buildsToLoad = append(buildsToLoad, buildList.Builds)
		totalCount += len(buildList.Builds)
	}

	logger.Info("load reports", "buildCount", totalCount, "buildTypeId", buildTypeId, "since", since)

	for i := len(buildsToLoad) - 1; i >= 0; i-- {
		builds := buildsToLoad[i]
		if len(builds) == 0 {
			continue
		}

		// ascending by build id within the chunk
		sort.Slice(builds, func(a, b int) bool {
			return builds[a].Id < builds[b].Id
		})

		logger.Debug("load reports", "chunk", i)

		// parsed before any work is done, so a malformed date fails the chunk early
		lastBuildFinishDate, err := time.Parse(tcTimeFormat, builds[len(builds)-1].FinishDate)
		if err != nil {
			return fmt.Errorf("cannot parse last build finish date: %w", err)
		}

		reportAnalyzer, err := analyzer.CreateReportAnalyzer(taskContext, db, metaDb, config, logger)
		if err != nil {
			return err
		}

		err = collector.loadReports(builds, reportExistenceChecker, reportAnalyzer)
		if err != nil {
			return err
		}

		logger.Debug("wait for analyze and insert", "chunk", i)
		err = reportAnalyzer.WaitAnalyzeAndInsert()
		if err != nil {
			return err
		}

		// engine ReplacingMergeTree(last_time) is used, no need to delete old entry
		// set last collect time to 1 second after the last build (greatest id) in the chunk
		err = updateLastCollectTime(taskContext, buildTypeId, lastBuildFinishDate.Add(1*time.Second), db)
		if err != nil {
			return err
		}
	}
	return nil
}