network/trace.go (242 lines of code) (raw):

package network

import (
	"bufio"
	"bytes"
	"context"
	"errors"
	"fmt"
	"io"
	"sync"
	"time"

	"gitlab.com/gitlab-org/gitlab-ci-multi-runner/common"
	"gitlab.com/gitlab-org/gitlab-ci-multi-runner/helpers"
)

// Intervals are package-level variables (not constants) so they can be
// overridden, e.g. by tests in this package.
var (
	// traceUpdateInterval is the delay between incremental updates in watch.
	traceUpdateInterval = common.UpdateInterval
	// traceForceSendInterval is the age after which an update is sent even if
	// neither the state nor the trace length changed.
	traceForceSendInterval = common.ForceTraceSentInterval
	// traceFinishRetryInterval is the delay between retries of the final
	// upload performed by finish.
	traceFinishRetryInterval = common.UpdateRetryInterval
)

// tracePatch describes the byte range [offset, limit) of a job trace that is
// about to be sent as an incremental patch.
type tracePatch struct {
	trace  bytes.Buffer
	offset int
	limit  int
}

// Patch returns the bytes of the trace between offset and limit.
// It panics if offset > limit; callers are expected to check ValidateRange
// after changing the offset.
func (tp *tracePatch) Patch() []byte {
	return tp.trace.Bytes()[tp.offset:tp.limit]
}

// Offset returns the index of the first byte included in the patch.
func (tp *tracePatch) Offset() int {
	return tp.offset
}

// Limit returns the index one past the last byte included in the patch.
func (tp *tracePatch) Limit() int {
	return tp.limit
}

// SetNewOffset replaces the patch offset. No validation is performed here.
func (tp *tracePatch) SetNewOffset(newOffset int) {
	tp.offset = newOffset
}

// ValidateRange reports whether limit is not smaller than offset.
func (tp *tracePatch) ValidateRange() bool {
	return tp.limit >= tp.offset
}

// newTracePatch builds a patch covering trace from offset up to the current
// length of trace. It returns an error when offset is beyond that length.
func newTracePatch(trace bytes.Buffer, offset int) (*tracePatch, error) {
	patch := &tracePatch{
		trace:  trace,
		offset: offset,
		limit:  trace.Len(),
	}

	if !patch.ValidateRange() {
		return nil, errors.New("Range is invalid, limit can't be less than offset")
	}

	return patch, nil
}

// clientJobTrace collects the output written to its embedded pipe writer into
// an in-memory log and periodically sends that log and the job state through
// client.
type clientJobTrace struct {
	*io.PipeWriter

	client         common.Network
	config         common.RunnerConfig
	jobCredentials *common.JobCredentials
	id             int
	limit          int64 // not referenced by the code in this file
	cancelFunc     context.CancelFunc

	log      bytes.Buffer
	lock     sync.RWMutex // guards log and state
	state    common.JobState
	finished chan bool

	// Bookkeeping of what was last sent successfully.
	sentTrace int
	sentTime  time.Time
	sentState common.JobState
}

// Success marks the job as successful and performs the final upload.
func (c *clientJobTrace) Success() {
	c.Fail(nil)
}

// Fail finishes the trace: a nil err marks the job as successful, a non-nil
// err marks it as failed. Calls made once the job is no longer running are
// ignored.
func (c *clientJobTrace) Fail(err error) {
	c.lock.Lock()
	if c.state != common.Running {
		c.lock.Unlock()
		return
	}
	if err == nil {
		c.state = common.Success
	} else {
		c.state = common.Failed
	}
	// Release the lock before finish: fullUpdate takes the read lock itself.
	c.lock.Unlock()

	c.finish()
}

// SetCancelFunc registers the function that abort calls.
func (c *clientJobTrace) SetCancelFunc(cancelFunc context.CancelFunc) {
	c.cancelFunc = cancelFunc
}

// IsStdout always reports false.
func (c *clientJobTrace) IsStdout() bool {
	return false
}

// start creates the pipe, marks the job as running and launches the goroutines
// that consume the pipe (process) and send periodic updates (watch).
func (c *clientJobTrace) start() {
	reader, writer := io.Pipe()
	c.PipeWriter = writer
	c.finished = make(chan bool)
	c.state = common.Running
	go c.process(reader)
	go c.watch()
}

// finish closes the pipe writer, signals watch to stop and then retries the
// full upload until it returns something other than common.UpdateFailed.
func (c *clientJobTrace) finish() {
	c.Close()
	// finished is unbuffered: this blocks until watch receives the signal.
	c.finished <- true

	// Do final upload of job trace
	for {
		if c.fullUpdate() != common.UpdateFailed {
			return
		}
		time.Sleep(traceFinishRetryInterval)
	}
}

// writeRune appends r to the log. Once the log length reaches limit it also
// appends a "log exceeded limit" notice and returns io.EOF so the caller can
// stop writing.
func (c *clientJobTrace) writeRune(r rune, limit int) (n int, err error) {
	c.lock.Lock()
	defer c.lock.Unlock()

	n, err = c.log.WriteRune(r)
	if c.log.Len() < limit {
		return n, err
	}

	output := fmt.Sprintf("\n%sJob's log exceeded limit of %v bytes.%s\n",
		helpers.ANSI_BOLD_RED,
		limit,
		helpers.ANSI_RESET,
	)
	c.log.WriteString(output)
	return n, io.EOF
}

// process reads runes from pipe and appends them to the log until the pipe
// yields no more data. The output limit comes from the runner configuration
// (falling back to common.DefaultOutputLimit) and is multiplied by 1024.
func (c *clientJobTrace) process(pipe *io.PipeReader) {
	defer pipe.Close()

	stopped := false
	limit := c.config.OutputLimit
	if limit == 0 {
		limit = common.DefaultOutputLimit
	}
	limit *= 1024

	reader := bufio.NewReader(pipe)
	for {
		r, size, err := reader.ReadRune()
		if size <= 0 {
			break
		}
		// Keep reading after the limit was hit so the pipe keeps being
		// consumed; the data itself is dropped. Runes that came with a read
		// error are dropped as well.
		if stopped || err != nil {
			continue
		}
		if _, err = c.writeRune(r, limit); err == io.EOF {
			stopped = true
		}
	}
}

// incrementalUpdate sends the job state (when it changed) and the part of the
// trace that has not been sent yet. It returns common.UpdateSucceeded without
// sending anything when nothing changed and the last successful send is more
// recent than traceForceSendInterval.
func (c *clientJobTrace) incrementalUpdate() common.UpdateState {
	c.lock.RLock()
	state := c.state
	trace := c.log
	c.lock.RUnlock()

	if c.sentState == state &&
		c.sentTrace == trace.Len() &&
		time.Since(c.sentTime) < traceForceSendInterval {
		return common.UpdateSucceeded
	}

	if c.sentState != state {
		// NOTE(review): the result of this state update is not inspected.
		c.client.UpdateJob(c.config, c.jobCredentials, c.id, state, nil)
		c.sentState = state
	}

	patch, err := newTracePatch(trace, c.sentTrace)
	if err != nil {
		c.config.Log().Errorln("Error while creating a tracePatch", err.Error())
		// patch is nil here; passing it on would end in a nil dereference.
		return common.UpdateFailed
	}

	update := c.client.PatchTrace(c.config, c.jobCredentials, patch)
	if update == common.UpdateNotFound {
		return update
	}

	if update == common.UpdateRangeMismatch {
		update = c.resendPatch(c.jobCredentials.ID, c.config, c.jobCredentials, patch)
	}

	if update == common.UpdateSucceeded {
		c.sentTrace = patch.Limit()
		c.sentTime = time.Now()
	}

	return update
}

// resendPatch is the recovery path after a range mismatch. When the patch
// range is no longer valid it sends the complete trace through UpdateJob;
// otherwise it sends the patch once more and returns common.UpdateFailed if
// the range is rejected again.
func (c *clientJobTrace) resendPatch(id int, config common.RunnerConfig, jobCredentials *common.JobCredentials, tracePatch common.JobTracePatch) (update common.UpdateState) {
	if !tracePatch.ValidateRange() {
		config.Log().Warningln(id, "Full job update is needed")

		// Snapshot log and state under the lock, as the other readers do;
		// process may still be appending to the log.
		c.lock.RLock()
		fullTrace := c.log.String()
		state := c.state
		c.lock.RUnlock()

		return c.client.UpdateJob(c.config, jobCredentials, c.id, state, &fullTrace)
	}

	config.Log().Warningln(id, "Resending trace patch due to range mismatch")

	update = c.client.PatchTrace(config, jobCredentials, tracePatch)
	if update == common.UpdateRangeMismatch {
		config.Log().Errorln(id, "Appending trace to coordinator...", "failed due to range mismatch")
		return common.UpdateFailed
	}

	return update
}

// fullUpdate sends the job state together with the complete trace. It returns
// common.UpdateSucceeded without sending anything when nothing changed and the
// last successful send is more recent than traceForceSendInterval.
func (c *clientJobTrace) fullUpdate() common.UpdateState {
	c.lock.RLock()
	state := c.state
	trace := c.log.String()
	c.lock.RUnlock()

	if c.sentState == state &&
		c.sentTrace == len(trace) &&
		time.Since(c.sentTime) < traceForceSendInterval {
		return common.UpdateSucceeded
	}

	upload := c.client.UpdateJob(c.config, c.jobCredentials, c.id, state, &trace)
	if upload == common.UpdateSucceeded {
		c.sentTrace = len(trace)
		c.sentState = state
		c.sentTime = time.Now()
	}

	return upload
}

// abort calls the registered cancel function, if any, and clears it so it is
// called at most once. It reports whether a cancel function was called.
func (c *clientJobTrace) abort() bool {
	if c.cancelFunc != nil {
		c.cancelFunc()
		c.cancelFunc = nil
		return true
	}
	return false
}

// watch sends an incremental update every traceUpdateInterval (measured from
// the end of the previous update) until finished is signalled. When an update
// returns common.UpdateAbort and a cancel function was called, it waits for
// the finished signal and returns.
func (c *clientJobTrace) watch() {
	timer := time.NewTimer(traceUpdateInterval)
	defer timer.Stop()

	for {
		select {
		case <-timer.C:
			if c.incrementalUpdate() == common.UpdateAbort && c.abort() {
				<-c.finished
				return
			}
			// The timer has fired and its channel was drained above, so it
			// can be re-armed safely.
			timer.Reset(traceUpdateInterval)

		case <-c.finished:
			return
		}
	}
}

// newJobTrace returns a trace for the job identified by jobCredentials. The
// trace does nothing until start is called. jobCredentials must not be nil.
func newJobTrace(client common.Network, config common.RunnerConfig, jobCredentials *common.JobCredentials) *clientJobTrace {
	return &clientJobTrace{
		client:         client,
		config:         config,
		jobCredentials: jobCredentials,
		id:             jobCredentials.ID,
	}
}