commands/builds_helper.go
package commands

import (
	"fmt"
	"math"
	"net/http"
	"strings"
	"sync"
	"time"

	"gitlab.com/gitlab-org/gitlab-runner/common"
	"gitlab.com/gitlab-org/gitlab-runner/helpers"
	"gitlab.com/gitlab-org/gitlab-runner/helpers/featureflags"
	"gitlab.com/gitlab-org/gitlab-runner/session"

	"github.com/prometheus/client_golang/prometheus"
)

const (
	concurrencyIncreaseFactor = 1.1  // +10%
	concurrencyDecreaseFactor = 0.95 // -5%
)

var numBuildsDesc = prometheus.NewDesc(
	"gitlab_runner_jobs",
	"The current number of running builds.",
	[]string{"runner", "system_id", "state", "stage", "executor_stage"},
	nil,
)

var requestConcurrencyDesc = prometheus.NewDesc(
	"gitlab_runner_request_concurrency",
	"The current number of concurrent requests for a new job",
	[]string{"runner", "system_id"},
	nil,
)

var requestConcurrencyExceededDesc = prometheus.NewDesc(
	"gitlab_runner_request_concurrency_exceeded_total",
	"Count of excess requests above the configured request_concurrency limit",
	[]string{"runner", "system_id"},
	nil,
)

var requestConcurrencyHardLimitDesc = prometheus.NewDesc(
	"gitlab_runner_request_concurrency_hard_limit",
	"Configured request_concurrency limit",
	[]string{"runner", "system_id"},
	nil,
)

var requestConcurrencyAdaptiveLimitDesc = prometheus.NewDesc(
	"gitlab_runner_request_concurrency_adaptive_limit",
	"Computed adaptive request concurrency limit",
	[]string{"runner", "system_id"},
	nil,
)

var requestConcurrencyUsedLimitDesc = prometheus.NewDesc(
	"gitlab_runner_request_concurrency_used_limit",
	"Used request concurrency limit",
	[]string{"runner", "system_id"},
	nil,
)

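// statePermutation identifies a unique combination of runner, system ID,
// build state, build stage and executor stage; it is used as the label set
// for the gitlab_runner_jobs gauge.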
type statePermutation struct {
	runner        string
	systemID      string
	buildState    common.BuildRuntimeState
	buildStage    common.BuildStage
	executorStage common.ExecutorStage
}

func newStatePermutationFromBuild(build *common.Build) statePermutation {
	return statePermutation{
		runner:        build.Runner.ShortDescription(),
		systemID:      build.Runner.SystemIDState.GetSystemID(),
		buildState:    build.CurrentState(),
		buildStage:    build.CurrentStage(),
		executorStage: build.CurrentExecutorStage(),
	}
}

type runnerCounter struct {
	systemID                   string
	builds                     int
	requests                   int
	hardConcurrencyLimit       int
	adaptiveConcurrencyLimit   float64
	usedConcurrencyLimit       int
	requestConcurrencyExceeded int
}

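// buildsHelper tracks the builds currently handled by this runner process and
// the per-runner build/request counters, and exposes both as Prometheus
// metrics (it implements prometheus.Collector).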
type buildsHelper struct {
	counters              map[string]*runnerCounter
	buildStagesStartTimes map[*common.Build]map[common.BuildStage]time.Time
	builds                []*common.Build
	lock                  sync.Mutex

	jobsTotal                            *prometheus.CounterVec
	jobDurationHistogram                 *prometheus.HistogramVec
	jobQueueDurationHistogram            *prometheus.HistogramVec
	jobStagesDurationHistogram           *prometheus.HistogramVec
	acceptableJobQueuingDurationExceeded *prometheus.CounterVec
}

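// getRunnerCounter returns the counter for the given runner, keyed by its
// token, creating it on first use. Callers must hold b.lock.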
func (b *buildsHelper) getRunnerCounter(runner *common.RunnerConfig) *runnerCounter {
	if b.counters == nil {
		b.counters = make(map[string]*runnerCounter)
	}

	counter := b.counters[runner.Token]
	if counter == nil {
		counter = &runnerCounter{systemID: runner.SystemIDState.GetSystemID()}
		b.counters[runner.Token] = counter
	}

	return counter
}

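// findSessionByURL returns the session of the build whose session endpoint is
// a prefix of the given URL, or an error when no active build matches.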
func (b *buildsHelper) findSessionByURL(url string) (*session.Session, error) {
	if url == "" {
		return nil, fmt.Errorf("empty URL provided")
	}

	b.lock.Lock()
	defer b.lock.Unlock()

	if len(b.builds) == 0 {
		return nil, fmt.Errorf("no active builds found")
	}

	for _, build := range b.builds {
		if build.Session == nil {
			continue
		}
		if build.Session.Endpoint == "" {
			continue
		}
		if strings.HasPrefix(url, build.Session.Endpoint+"/") {
			return build.Session, nil
		}
	}

	return nil, fmt.Errorf("no session found matching URL: %s", url)
}

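// acquireBuild reserves a build slot for the runner. It returns false when the
// runner's `limit` setting is already reached and no further build may start.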
func (b *buildsHelper) acquireBuild(runner *common.RunnerConfig) bool {
	b.lock.Lock()
	defer b.lock.Unlock()

	counter := b.getRunnerCounter(runner)

	if runner.Limit > 0 && counter.builds >= runner.Limit {
		// Too many builds
		return false
	}

	counter.builds++
	return true
}

func (b *buildsHelper) releaseBuild(runner *common.RunnerConfig) bool {
	b.lock.Lock()
	defer b.lock.Unlock()

	counter := b.getRunnerCounter(runner)

	if counter.builds > 0 {
		counter.builds--
		return true
	}
	return false
}

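// acquireRequest reserves a slot for a "request new job" call. With the
// featureflags.UseAdaptiveRequestConcurrency feature flag enabled, the
// effective limit is the adaptive value rounded up and clamped to
// [1, request_concurrency]; otherwise the configured request_concurrency is
// used directly. Requests above the limit are rejected and counted in
// requestConcurrencyExceeded.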
func (b *buildsHelper) acquireRequest(runner *common.RunnerConfig) bool {
	b.lock.Lock()
	defer b.lock.Unlock()

	counter := b.getRunnerCounter(runner)

	concurrency := runner.GetRequestConcurrency()
	counter.hardConcurrencyLimit = concurrency

	if runner.IsFeatureFlagOn(featureflags.UseAdaptiveRequestConcurrency) {
		// concurrency is the adaptive concurrency value rounded up, between 1 and the max request concurrency
		concurrency = min(max(1, int(math.Ceil(counter.adaptiveConcurrencyLimit))), runner.GetRequestConcurrency())
	}
	counter.usedConcurrencyLimit = concurrency

	if counter.requests >= concurrency {
		counter.requestConcurrencyExceeded++
		return false
	}

	counter.requests++
	return true
}

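// releaseRequest frees a job-request slot. With adaptive request concurrency
// enabled it also adjusts the adaptive limit: +10% after a request that
// returned a job, -5% otherwise, clamped to [1, request_concurrency].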
func (b *buildsHelper) releaseRequest(runner *common.RunnerConfig, hasJob bool) bool {
	b.lock.Lock()
	defer b.lock.Unlock()

	counter := b.getRunnerCounter(runner)

	if runner.IsFeatureFlagOn(featureflags.UseAdaptiveRequestConcurrency) {
		// if the request returned a job, increase the concurrency by 10%, if not, decrease by 5%
		if hasJob {
			counter.adaptiveConcurrencyLimit *= concurrencyIncreaseFactor
		} else {
			counter.adaptiveConcurrencyLimit *= concurrencyDecreaseFactor
		}

		// adjust adaptive concurrency between 1 and max request concurrency
		counter.adaptiveConcurrencyLimit =
			min(max(1, counter.adaptiveConcurrencyLimit), float64(runner.GetRequestConcurrency()))
	}

	if counter.requests > 0 {
		counter.requests--
		return true
	}
	return false
}

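// addBuild registers a running build, assigning it the lowest free RunnerID
// (unique among builds of the same runner token) and ProjectRunnerID (unique
// among builds of the same runner token and project), and records the
// jobs-total and queue-duration metrics for it.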
func (b *buildsHelper) addBuild(build *common.Build) {
	if build == nil {
		return
	}

	b.lock.Lock()
	defer b.lock.Unlock()

	runners := make(map[int]bool)
	projectRunners := make(map[int]bool)

	for _, otherBuild := range b.builds {
		if otherBuild.Runner.Token != build.Runner.Token {
			continue
		}
		runners[otherBuild.RunnerID] = true

		if otherBuild.JobInfo.ProjectID != build.JobInfo.ProjectID {
			continue
		}
		projectRunners[otherBuild.ProjectRunnerID] = true
	}

	for {
		if !runners[build.RunnerID] {
			break
		}
		build.RunnerID++
	}

	for {
		if !projectRunners[build.ProjectRunnerID] {
			break
		}
		build.ProjectRunnerID++
	}

	b.builds = append(b.builds, build)

	b.jobsTotal.WithLabelValues(build.Runner.ShortDescription(), build.Runner.SystemIDState.GetSystemID()).Inc()

	b.jobQueueDurationHistogram.
		WithLabelValues(
			build.Runner.ShortDescription(),
			build.Runner.SystemIDState.GetSystemID(),
			build.JobInfo.ProjectJobsRunningOnInstanceRunnersCount,
		).
		Observe(build.JobInfo.TimeInQueueSeconds)

	b.evaluateJobQueuingDuration(build.Runner, build.JobInfo)

	b.initializeBuildStageMetrics(build)
}

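// evaluateJobQueuingDuration increments the "acceptable job queuing duration
// exceeded" counter when the job waited in the queue longer than the threshold
// from the runner's Monitoring.JobQueuingDurations configuration and the
// project-jobs-running matcher applies to this job.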
func (b *buildsHelper) evaluateJobQueuingDuration(runner *common.RunnerConfig, jobInfo common.JobInfo) {
	counterForRunner := b.acceptableJobQueuingDurationExceeded.
		WithLabelValues(
			runner.ShortDescription(),
			runner.SystemIDState.GetSystemID(),
		)

	// This .Add(0) doesn't change the value of the metric when the threshold
	// was not exceeded, but makes sure that the metric for each runner is
	// always exported
	counterForRunner.Add(0)

	// If the configuration is not present, we don't care about the metric
	if runner.Monitoring == nil || len(runner.Monitoring.JobQueuingDurations) < 1 {
		return
	}

	jobQueueDurationCfg := runner.Monitoring.JobQueuingDurations.GetActiveConfiguration()
	// If no configuration matches the current time, we don't care about the metric
	if jobQueueDurationCfg == nil {
		return
	}

	threshold := jobQueueDurationCfg.Threshold.Seconds()
	// A threshold that is not configured, zeroed or invalid (negative) means
	// we're not interested in this feature
	if threshold <= 0 {
		return
	}

	// If the threshold is not exceeded, all is good and no other checks are needed
	if jobInfo.TimeInQueueSeconds <= threshold {
		return
	}

	// If ProjectJobsRunningOnInstanceRunnersCount doesn't match the configured
	// definition, the exceeded threshold is considered acceptable in this case.
	// If the definition was not configured (or the regular expression in the
	// config.toml file was invalid and couldn't be compiled), we treat that as
	// "matched" and count the case
	if !jobQueueDurationCfg.JobsRunningForProjectMatched(jobInfo.ProjectJobsRunningOnInstanceRunnersCount) {
		return
	}

	// The timing expectation was not met in this case, so increase the counter
	counterForRunner.Inc()
}

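// removeBuild records the final job duration and removes the build (together
// with its stage start times) from the helper. It returns false when the
// build is not tracked.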
func (b *buildsHelper) removeBuild(deleteBuild *common.Build) bool {
	b.lock.Lock()
	defer b.lock.Unlock()

	b.jobDurationHistogram.
		WithLabelValues(deleteBuild.Runner.ShortDescription(), deleteBuild.Runner.SystemIDState.GetSystemID()).
		Observe(deleteBuild.FinalDuration().Seconds())

	for idx, build := range b.builds {
		if build == deleteBuild {
			b.builds = append(b.builds[0:idx], b.builds[idx+1:]...)
			delete(b.buildStagesStartTimes, deleteBuild)
			return true
		}
	}

	return false
}

func (b *buildsHelper) buildsCount() int {
	b.lock.Lock()
	defer b.lock.Unlock()

	return len(b.builds)
}

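// statesAndStages returns the number of builds per (runner, state, stage,
// executor stage) permutation, seeding an "idle" entry for every known runner
// so the gauge is exported even when no builds are running.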
func (b *buildsHelper) statesAndStages() map[statePermutation]int {
	b.lock.Lock()
	defer b.lock.Unlock()

	data := make(map[statePermutation]int)

	for token, counter := range b.counters {
		// 'idle' state will ensure the metric is always present, even if no
		// builds are being processed at the moment
		idleState := statePermutation{
			runner:        helpers.ShortenToken(token),
			systemID:      counter.systemID,
			buildState:    "idle",
			buildStage:    "idle",
			executorStage: "idle",
		}
		data[idleState] = 0
	}

	for _, build := range b.builds {
		state := newStatePermutationFromBuild(build)
		data[state]++
	}

	return data
}

func (b *buildsHelper) runnersCounters() map[string]*runnerCounter {
	b.lock.Lock()
	defer b.lock.Unlock()

	data := make(map[string]*runnerCounter)
	for token, counter := range b.counters {
		data[helpers.ShortenToken(token)] = counter
	}

	return data
}

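// initializeBuildStageMetrics hooks per-stage duration tracking into the build
// when the ExportHighCardinalityMetrics feature flag is enabled. It is called
// from addBuild with b.lock already held.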
func (b *buildsHelper) initializeBuildStageMetrics(build *common.Build) {
	if !build.IsFeatureFlagOn(featureflags.ExportHighCardinalityMetrics) {
		return
	}

	// the receiver lock is held at this point
	if b.buildStagesStartTimes == nil {
		b.buildStagesStartTimes = make(map[*common.Build]map[common.BuildStage]time.Time)
	}
	if b.buildStagesStartTimes[build] == nil {
		b.buildStagesStartTimes[build] = make(map[common.BuildStage]time.Time)
	}

	build.OnBuildStageStartFn = func(stage common.BuildStage) {
		b.handleOnBuildStageStart(build, stage)
	}
	build.OnBuildStageEndFn = func(stage common.BuildStage) {
		b.handleOnBuildStageEnd(build, stage)
	}
}

func (b *buildsHelper) handleOnBuildStageStart(build *common.Build, stage common.BuildStage) {
	b.lock.Lock()
	b.buildStagesStartTimes[build][stage] = time.Now()
	b.lock.Unlock()
}

func (b *buildsHelper) handleOnBuildStageEnd(build *common.Build, stage common.BuildStage) {
	b.lock.Lock()
	duration := time.Since(b.buildStagesStartTimes[build][stage])
	b.lock.Unlock()

	b.jobStagesDurationHistogram.
		With(prometheus.Labels{
			"runner":    build.Runner.ShortDescription(),
			"system_id": build.Runner.GetSystemID(),
			"stage":     string(stage),
		}).
		Observe(duration.Seconds())
}

// Describe implements prometheus.Collector.
func (b *buildsHelper) Describe(ch chan<- *prometheus.Desc) {
	ch <- numBuildsDesc
	ch <- requestConcurrencyDesc
	ch <- requestConcurrencyExceededDesc
	ch <- requestConcurrencyHardLimitDesc
	ch <- requestConcurrencyAdaptiveLimitDesc
	ch <- requestConcurrencyUsedLimitDesc

	b.jobsTotal.Describe(ch)
	b.jobDurationHistogram.Describe(ch)
	b.jobQueueDurationHistogram.Describe(ch)
	b.acceptableJobQueuingDurationExceeded.Describe(ch)
	b.jobStagesDurationHistogram.Describe(ch)
}

// Collect implements prometheus.Collector.
func (b *buildsHelper) Collect(ch chan<- prometheus.Metric) {
	builds := b.statesAndStages()
	for state, count := range builds {
		ch <- prometheus.MustNewConstMetric(
			numBuildsDesc,
			prometheus.GaugeValue,
			float64(count),
			state.runner,
			state.systemID,
			string(state.buildState),
			string(state.buildStage),
			string(state.executorStage),
		)
	}

	counters := b.runnersCounters()
	for runner, counter := range counters {
		ch <- prometheus.MustNewConstMetric(
			requestConcurrencyDesc,
			prometheus.GaugeValue,
			float64(counter.requests),
			runner,
			counter.systemID,
		)
		ch <- prometheus.MustNewConstMetric(
			requestConcurrencyExceededDesc,
			prometheus.CounterValue,
			float64(counter.requestConcurrencyExceeded),
			runner,
			counter.systemID,
		)
		ch <- prometheus.MustNewConstMetric(
			requestConcurrencyHardLimitDesc,
			prometheus.GaugeValue,
			float64(counter.hardConcurrencyLimit),
			runner,
			counter.systemID,
		)
		ch <- prometheus.MustNewConstMetric(
			requestConcurrencyAdaptiveLimitDesc,
			prometheus.GaugeValue,
			counter.adaptiveConcurrencyLimit,
			runner,
			counter.systemID,
		)
		ch <- prometheus.MustNewConstMetric(
			requestConcurrencyUsedLimitDesc,
			prometheus.GaugeValue,
			float64(counter.usedConcurrencyLimit),
			runner,
			counter.systemID,
		)
	}

	b.jobsTotal.Collect(ch)
	b.jobDurationHistogram.Collect(ch)
	b.jobQueueDurationHistogram.Collect(ch)
	b.acceptableJobQueuingDurationExceeded.Collect(ch)
	b.jobStagesDurationHistogram.Collect(ch)
}

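// ListJobsHandler writes a plain-text list of the currently running jobs, one
// line per job with its URL, state, stage, executor stage and duration. The
// X-List-Version header identifies the version of the output format.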
func (b *buildsHelper) ListJobsHandler(w http.ResponseWriter, r *http.Request) {
	w.Header().Add("X-List-Version", "2")
	w.Header().Add(common.ContentType, "text/plain")
	w.WriteHeader(http.StatusOK)

	b.lock.Lock()
	defer b.lock.Unlock()

	for _, job := range b.builds {
		_, _ = fmt.Fprintf(
			w,
			"url=%s state=%s stage=%s executor_stage=%s duration=%s\n",
			job.JobURL(),
			job.CurrentState(),
			job.CurrentStage(),
			job.CurrentExecutorStage(),
			job.CurrentDuration(),
		)
	}
}

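// newBuildsHelper creates a buildsHelper with its Prometheus counter and
// histogram vectors preconfigured.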
func newBuildsHelper() buildsHelper {
	return buildsHelper{
		jobsTotal: prometheus.NewCounterVec(
			prometheus.CounterOpts{
				Name: "gitlab_runner_jobs_total",
				Help: "Total number of handled jobs",
			},
			[]string{"runner", "system_id"},
		),
		jobDurationHistogram: prometheus.NewHistogramVec(
			prometheus.HistogramOpts{
				Name:    "gitlab_runner_job_duration_seconds",
				Help:    "Histogram of job durations",
				Buckets: []float64{30, 60, 300, 600, 1800, 3600, 7200, 10800, 18000, 36000},
			},
			[]string{"runner", "system_id"},
		),
		jobQueueDurationHistogram: prometheus.NewHistogramVec(
			prometheus.HistogramOpts{
				Name:    "gitlab_runner_job_queue_duration_seconds",
				Help:    "A histogram representing job queue duration.",
				Buckets: []float64{1, 3, 10, 30, 60, 120, 300, 900, 1800, 3600},
			},
			[]string{"runner", "system_id", "project_jobs_running"},
		),
		acceptableJobQueuingDurationExceeded: prometheus.NewCounterVec(
			prometheus.CounterOpts{
				Name: "gitlab_runner_acceptable_job_queuing_duration_exceeded_total",
				Help: "Counts how often jobs exceed the configured queuing time threshold",
			},
			[]string{"runner", "system_id"},
		),
		jobStagesDurationHistogram: prometheus.NewHistogramVec(
			prometheus.HistogramOpts{
				Name:    "gitlab_runner_job_stage_duration_seconds",
				Help:    "Histogram of each job stage duration",
				Buckets: []float64{1, 3, 10, 30, 60, 120, 300, 900, 1800, 3600},
			},
			[]string{"runner", "system_id", "stage"},
		),
	}
}