network/trace.go (359 lines of code) (raw):
package network
import (
"context"
"errors"
"sync"
"time"
"gitlab.com/gitlab-org/gitlab-runner/common"
"gitlab.com/gitlab-org/gitlab-runner/helpers/featureflags"
"gitlab.com/gitlab-org/gitlab-runner/helpers/retry"
"gitlab.com/gitlab-org/gitlab-runner/helpers/trace"
)
var (
	// ErrInvalidPatchTraceResponse is returned when the trace patch endpoint
	// responds in a way the runner cannot act on.
	ErrInvalidPatchTraceResponse = errors.New("received invalid patch trace response")
	// ErrInvalidUpdateJobResponse is returned when the job update endpoint
	// responds in a way the runner cannot act on.
	ErrInvalidUpdateJobResponse = errors.New("received invalid job update response")
)
// clientJobTrace streams a job's log ("trace") to GitLab and keeps the job's
// remote state updated. Writes go into an in-memory buffer; a background
// goroutine (watch) periodically patches new content and touches the job.
type clientJobTrace struct {
	client         common.Network
	config         common.RunnerConfig
	jobCredentials *common.JobCredentials
	id             int64
	// cancelFunc cancels non-essential stages; consumed by Cancel().
	cancelFunc context.CancelFunc
	// abortFunc aborts all stages; consumed by Abort().
	abortFunc        context.CancelFunc
	debugModeEnabled bool

	// buffer accumulates trace output until it is patched to the server.
	buffer *trace.Buffer

	// lock guards the mutable fields below (state, offsets, intervals, funcs).
	lock                         sync.RWMutex
	state                        common.JobState
	finished                     chan bool
	failureReason                common.JobFailureReason
	supportedFailureReasonMapper common.SupportedFailureReasonMapper

	// sentTrace is the byte offset of trace content confirmed by the server;
	// sentTime is when the last successful patch/update happened.
	sentTrace int
	sentTime  time.Time

	// updateInterval is how often watch() wakes up; forceSendInterval is the
	// maximum silence before the job is touched anyway.
	updateInterval        time.Duration
	forceSendInterval     time.Duration
	finalUpdateBackoffMax time.Duration
	maxTracePatchSize     int
	failuresCollector     common.FailuresCollector
	exitCode              int
	finalUpdateRetryLimit int
}
// Success marks the job as finished successfully and performs the final
// trace flush and job update.
func (c *clientJobTrace) Success() error {
	return c.complete(nil, common.JobFailureData{})
}
// complete transitions the trace out of the running state (to Success when
// err is nil, otherwise to Failed with failureData) and runs the final
// flush/update. Completing an already-completed trace is a no-op.
func (c *clientJobTrace) complete(err error, failureData common.JobFailureData) error {
	c.lock.Lock()

	if c.state != common.Running {
		c.lock.Unlock()
		return nil
	}

	if err != nil {
		c.setFailure(failureData)
	} else {
		c.state = common.Success
	}

	// finish() blocks on network retries, so release the lock first.
	c.lock.Unlock()
	return c.finish()
}
// Fail marks the job as failed with the given error and failure data, then
// performs the final trace flush and job update.
func (c *clientJobTrace) Fail(err error, failureData common.JobFailureData) error {
	return c.complete(err, failureData)
}
// Write appends data to the in-memory trace buffer; the background watcher
// ships it to the server asynchronously.
func (c *clientJobTrace) Write(data []byte) (n int, err error) {
	return c.buffer.Write(data)
}
// checksum returns the checksum of the buffered trace content.
func (c *clientJobTrace) checksum() string {
	return c.buffer.Checksum()
}
// bytesize returns the current size of the buffered trace in bytes.
func (c *clientJobTrace) bytesize() int {
	return c.buffer.Size()
}
// SetCancelFunc sets the function to be called by Cancel(). The function
// provided here should cancel the execution of any stages that are not
// absolutely required, whilst allowing for stages such as `after_script` to
// proceed.
func (c *clientJobTrace) SetCancelFunc(fn context.CancelFunc) {
	c.lock.Lock()
	c.cancelFunc = fn
	c.lock.Unlock()
}
// Cancel consumes the function set by SetCancelFunc and invokes it.
// It returns true if a cancel function was present, false otherwise.
//
// The function is atomically swapped out under the write lock so that
// concurrent Cancel() calls cannot both observe a non-nil function and
// invoke it twice (the previous read-then-clear sequence raced).
func (c *clientJobTrace) Cancel() bool {
	c.lock.Lock()
	cancelFunc := c.cancelFunc
	c.cancelFunc = nil
	c.lock.Unlock()

	if cancelFunc == nil {
		return false
	}

	cancelFunc()
	return true
}
// SetAbortFunc sets the function to be called by Abort(). The function
// provided here should abort the execution of all stages.
func (c *clientJobTrace) SetAbortFunc(fn context.CancelFunc) {
	c.lock.Lock()
	c.abortFunc = fn
	c.lock.Unlock()
}
// Abort consumes the function set by SetAbortFunc and invokes it, returning
// true if one was present. Abort always takes precedence over Cancel: since
// abort interrupts execution entirely, the cancel function is cleared at the
// same time so Cancel() can never fire after an Abort().
//
// Both functions are cleared atomically under the write lock so that
// concurrent Abort() calls cannot both observe a non-nil function and
// invoke it twice (the previous read-then-clear sequence raced).
func (c *clientJobTrace) Abort() bool {
	c.lock.Lock()
	abortFunc := c.abortFunc
	if abortFunc != nil {
		c.abortFunc = nil
		c.cancelFunc = nil
	}
	c.lock.Unlock()

	if abortFunc == nil {
		return false
	}

	abortFunc()
	return true
}
// SetFailuresCollector registers the collector that records job failure
// reasons for metrics purposes.
// NOTE(review): unlike the other setters this writes without taking the
// lock — presumably only called before the job starts; confirm at call sites.
func (c *clientJobTrace) SetFailuresCollector(fc common.FailuresCollector) {
	c.failuresCollector = fc
}
// SetSupportedFailureReasonMapper registers the mapper used to translate
// failure reasons into ones the server supports.
func (c *clientJobTrace) SetSupportedFailureReasonMapper(f common.SupportedFailureReasonMapper) {
	c.supportedFailureReasonMapper = f
}
// IsStdout reports whether this trace writes to stdout; the client trace
// always buffers and uploads instead.
func (c *clientJobTrace) IsStdout() bool {
	return false
}
// setFailure records the failed state, exit code, and (server-supported)
// failure reason, and reports the failure to the collector when one is set.
// Callers must hold c.lock.
func (c *clientJobTrace) setFailure(data common.JobFailureData) {
	c.state = common.Failed
	c.exitCode = data.ExitCode
	c.failureReason = c.ensureSupportedFailureReason(data.Reason)

	if fc := c.failuresCollector; fc != nil {
		fc.RecordFailure(c.ensureNonEmptyFailureReason(data.Reason), c.config.ShortDescription())
	}
}
// ensureSupportedFailureReason maps reason through the configured mapper, if
// any; without a mapper the reason is passed through unchanged.
func (c *clientJobTrace) ensureSupportedFailureReason(reason common.JobFailureReason) common.JobFailureReason {
	if mapper := c.supportedFailureReasonMapper; mapper != nil {
		return mapper.Map(reason)
	}
	return reason
}
// ensureNonEmptyFailureReason substitutes ScriptFailure for an empty reason:
// no specific reason means it's a script failure (or the runner doesn't yet
// detect that it's something else).
func (c *clientJobTrace) ensureNonEmptyFailureReason(reason common.JobFailureReason) common.JobFailureReason {
	if len(reason) == 0 {
		return common.ScriptFailure
	}
	return reason
}
// start marks the trace as running and launches the background watch()
// goroutine that periodically ships trace content to the server.
// NOTE(review): fields are written without the lock here — presumably
// start() runs before any concurrent access; confirm at call sites.
func (c *clientJobTrace) start() {
	// Unbuffered: finish() and watch() synchronize over this channel.
	c.finished = make(chan bool)
	c.state = common.Running
	c.setupLogLimit()
	go c.watch()
}
// ensureAllTraceSent patches trace content in a loop until the server has
// confirmed everything buffered so far. It returns nil when the trace is
// fully sent or the job no longer accepts it (abort / not found), and
// ErrInvalidPatchTraceResponse on a failed patch.
func (c *clientJobTrace) ensureAllTraceSent() error {
	for c.anyTraceToSend() {
		result := c.sendPatch()

		switch result.State {
		case common.PatchSucceeded:
			// Keep looping until every byte is confirmed.
		case common.PatchAbort, common.PatchNotFound:
			// The job is gone or refuses further trace; nothing left to do.
			return nil
		case common.PatchRangeMismatch:
			// sendPatch already rewound to the server's offset; back off
			// before retrying.
			time.Sleep(c.getUpdateInterval())
		case common.PatchFailed:
			time.Sleep(c.getUpdateInterval())
			return ErrInvalidPatchTraceResponse
		}
	}
	return nil
}
// finalUpdate flushes any remaining trace and sends the terminal job state,
// retrying transient outcomes until the server accepts, rejects, or loses
// the job. It returns ErrInvalidUpdateJobResponse on a failed update.
func (c *clientJobTrace) finalUpdate() error {
	// On final-update we want the Runner to fall back to the default
	// interval and let Rails override it again.
	c.setUpdateInterval(common.DefaultUpdateInterval)

	for {
		// Flush the trace before updating: sendUpdate() can force the
		// runner to rewind the trace, which the next iteration re-sends.
		if err := c.ensureAllTraceSent(); err != nil {
			return err
		}

		switch c.sendUpdate() {
		case common.UpdateSucceeded, common.UpdateAbort, common.UpdateNotFound:
			return nil
		case common.UpdateAcceptedButNotCompleted, common.UpdateTraceValidationFailed:
			// Transient: wait and try the whole flush+update again.
			time.Sleep(c.getUpdateInterval())
		case common.UpdateFailed:
			time.Sleep(c.getUpdateInterval())
			return ErrInvalidUpdateJobResponse
		}
	}
}
// finish seals the trace, stops the background watcher, and retries the
// final job update with backoff until it succeeds or the retry budget is
// exhausted. It returns the final update's error, if any.
func (c *clientJobTrace) finish() error {
	// No further writes are accepted by the buffer after this.
	c.buffer.Finish()
	// Hand off to watch(), which receives on c.finished and returns.
	c.finished <- true
	// Retry the final update up to finalUpdateRetryLimit times, backing
	// off from one second up to finalUpdateBackoffMax.
	err := retry.NewNoValue(
		retry.New().
			WithMaxTries(c.finalUpdateRetryLimit).
			WithBackoff(time.Second, c.finalUpdateBackoffMax),
		c.finalUpdate,
	).Run()
	c.buffer.Close()
	return err
}
// incrementalUpdate patches new trace content and, on success, touches the
// job to keep it alive. It returns true while the job should keep running,
// and false once the server asked for it to be aborted.
func (c *clientJobTrace) incrementalUpdate() bool {
	patch := c.sendPatch()
	if patch.CancelRequested {
		c.Cancel()
	}

	if patch.State == common.PatchAbort {
		c.Abort()
		return false
	}
	if patch.State != common.PatchSucceeded {
		return true
	}

	// Additionally touch the job: when no content was sent for a longer
	// period of time this is how we discover it should be aborted.
	touch := c.touchJob()
	if touch.CancelRequested {
		c.Cancel()
	}
	if touch.State == common.UpdateAbort {
		c.Abort()
		return false
	}

	return true
}
// anyTraceToSend reports whether the buffer holds content beyond the offset
// already confirmed by the server.
func (c *clientJobTrace) anyTraceToSend() bool {
	c.lock.RLock()
	defer c.lock.RUnlock()

	return c.sentTrace != c.buffer.Size()
}
// sendPatch sends the next chunk of unsent trace content (bounded by
// maxTracePatchSize) to the server and records the offset the server
// confirmed. An empty chunk is reported as an immediate success.
func (c *clientJobTrace) sendPatch() common.PatchTraceResult {
	c.lock.RLock()
	offset := c.sentTrace
	content, err := c.buffer.Bytes(offset, c.maxTracePatchSize)
	c.lock.RUnlock()

	if err != nil {
		return common.PatchTraceResult{State: common.PatchFailed}
	}
	if len(content) == 0 {
		return common.PatchTraceResult{State: common.PatchSucceeded}
	}

	result := c.client.PatchTrace(c.config, c.jobCredentials, content, offset, c.debugModeEnabled)
	c.setUpdateInterval(result.NewUpdateInterval)

	// On success — or on a range mismatch, where the server reports the
	// offset it actually holds — record how far the trace got.
	switch result.State {
	case common.PatchSucceeded, common.PatchRangeMismatch:
		c.lock.Lock()
		c.sentTime = time.Now()
		c.sentTrace = result.SentOffset
		c.lock.Unlock()
	}

	return result
}
// setUpdateInterval applies a server-suggested update interval (non-positive
// values are ignored), capping it at MaxUpdateInterval, and — when the
// dynamic force-send feature flag is on — derives the force-send interval
// from it, clamped to [MinTraceForceSendInterval, MaxTraceForceSendInterval].
func (c *clientJobTrace) setUpdateInterval(newUpdateInterval time.Duration) {
	if newUpdateInterval <= 0 {
		return
	}

	c.lock.Lock()
	defer c.lock.Unlock()

	// Cap the interval so a misbehaving server can't stall updates for
	// arbitrarily long.
	interval := newUpdateInterval
	if interval > common.MaxUpdateInterval {
		interval = common.MaxUpdateInterval
	}
	c.updateInterval = interval

	if !c.config.IsFeatureFlagOn(featureflags.UseDynamicTraceForceSendInterval) {
		return
	}

	force := interval * common.TraceForceSendUpdateIntervalMultiplier
	if force < common.MinTraceForceSendInterval {
		force = common.MinTraceForceSendInterval
	}
	if force > common.MaxTraceForceSendInterval {
		force = common.MaxTraceForceSendInterval
	}
	c.forceSendInterval = force
}
// touchJob updates the Coordinator that the job is still running, but only
// when nothing was sent for longer than forceSendInterval; otherwise it
// reports an immediate success without a network round-trip.
func (c *clientJobTrace) touchJob() common.UpdateJobResult {
	c.lock.RLock()
	lastSent := c.sentTime
	interval := c.forceSendInterval
	c.lock.RUnlock()

	if time.Since(lastSent) <= interval {
		return common.UpdateJobResult{State: common.UpdateSucceeded}
	}

	jobInfo := common.UpdateJobInfo{
		ID:    c.id,
		State: common.Running,
		Output: common.JobTraceOutput{
			Checksum: c.checksum(),
			Bytesize: c.bytesize(),
		},
	}

	result := c.client.UpdateJob(c.config, c.jobCredentials, jobInfo)
	c.setUpdateInterval(result.NewUpdateInterval)

	if result.State == common.UpdateSucceeded {
		c.lock.Lock()
		c.sentTime = time.Now()
		c.lock.Unlock()
	}

	return result
}
// sendUpdate reports the job's current state (plus failure reason, exit
// code, and trace checksum/size) to the server and returns the resulting
// update state.
func (c *clientJobTrace) sendUpdate() common.UpdateState {
	c.lock.RLock()
	currentState := c.state
	c.lock.RUnlock()

	jobInfo := common.UpdateJobInfo{
		ID:            c.id,
		State:         currentState,
		FailureReason: c.failureReason,
		Output: common.JobTraceOutput{
			Checksum: c.checksum(),
			Bytesize: c.bytesize(),
		},
		ExitCode: c.exitCode,
	}

	result := c.client.UpdateJob(c.config, c.jobCredentials, jobInfo)
	c.setUpdateInterval(result.NewUpdateInterval)

	switch result.State {
	case common.UpdateSucceeded:
		c.lock.Lock()
		c.sentTime = time.Now()
		c.lock.Unlock()
	case common.UpdateTraceValidationFailed:
		// The server rejected the trace: rewind so the next flush
		// re-sends the whole trace from the beginning.
		c.lock.Lock()
		c.sentTime = time.Now()
		c.sentTrace = 0
		c.lock.Unlock()
	}

	return result.State
}
// watch is the background loop that periodically runs incrementalUpdate()
// until either the job is aborted (then it waits for finish's signal) or
// finish() signals completion directly via c.finished.
func (c *clientJobTrace) watch() {
	for {
		timer := time.NewTimer(c.getUpdateInterval())

		select {
		case <-c.finished:
			timer.Stop()
			return
		case <-timer.C:
			if c.incrementalUpdate() {
				continue
			}
			// Job is no longer running; block until finish() signals.
			<-c.finished
			return
		}
	}
}
// getUpdateInterval returns the current update interval under the read lock.
func (c *clientJobTrace) getUpdateInterval() time.Duration {
	c.lock.RLock()
	interval := c.updateInterval
	c.lock.RUnlock()

	return interval
}
// setupLogLimit applies the configured output limit to the trace buffer,
// falling back to the default when no limit is configured.
func (c *clientJobTrace) setupLogLimit() {
	// OutputLimit is configured in kilobytes; the buffer works in bytes.
	limit := c.config.OutputLimit * 1024
	if limit == 0 {
		limit = common.DefaultTraceOutputLimit
	}
	c.buffer.SetLimit(limit)
}
// SetDebugModeEnabled toggles the debug-mode flag forwarded with every
// trace patch request.
func (c *clientJobTrace) SetDebugModeEnabled(isEnabled bool) {
	c.debugModeEnabled = isEnabled
}
// newJobTrace builds a clientJobTrace for the given job with a fresh trace
// buffer and default patch/update intervals. It returns an error only when
// the buffer cannot be created.
func newJobTrace(
	client common.Network,
	config common.RunnerConfig,
	jobCredentials *common.JobCredentials,
) (*clientJobTrace, error) {
	buf, err := trace.New()
	if err != nil {
		return nil, err
	}

	jobTrace := &clientJobTrace{
		client:                client,
		config:                config,
		buffer:                buf,
		jobCredentials:        jobCredentials,
		id:                    jobCredentials.ID,
		maxTracePatchSize:     common.DefaultTracePatchLimit,
		updateInterval:        common.DefaultUpdateInterval,
		forceSendInterval:     common.MinTraceForceSendInterval,
		finalUpdateBackoffMax: common.DefaultfinalUpdateBackoffMax,
		finalUpdateRetryLimit: config.GetJobStatusFinalUpdateRetryLimit(),
	}

	return jobTrace, nil
}