agent/taskengine/outputbuffer/legacybuffer.go (133 lines of code) (raw):

package outputbuffer

import (
	"bytes"
	"io"
	"sync"
)

// transformFunc post-processes a chunk of output before it is returned to the
// caller of ReadPre or ReadAll.
type transformFunc func([]byte) []byte

// LegacyOutputBuffer accumulates written output and hands it back in two
// portions: a leading portion limited by preQuota (ReadPre / ReadAll) and a
// trailing portion limited by postQuota (ReadAll only).
type LegacyOutputBuffer struct {
	buffer    bytes.Buffer // unread output
	mutex     sync.Mutex   // guards every field above transformer
	preQuota  int          // max bytes served from the head of the output
	postQuota int          // max bytes served from the tail of the output
	hasRead   int          // bytes already returned by readPre/readAll
	hasWrote  int64        // total bytes accepted by Write
	inited    bool         // set by Init, cleared by Uninit

	transformer     func([]byte) []byte // optional; nil means "return raw bytes"
	transformerLock sync.RWMutex        // guards transformer
}

// Init records the quotas, zeroes the read/write counters and returns the
// buffer itself as the writer. Calling Init on an already initialized buffer
// changes nothing and still returns the buffer. The returned error is always
// nil.
func (b *LegacyOutputBuffer) Init(preQuota, postQuota int) (io.Writer, error) {
	b.mutex.Lock()
	defer b.mutex.Unlock()

	if !b.inited {
		b.inited = true
		b.preQuota = preQuota
		b.postQuota = postQuota
		b.hasWrote = 0
		b.hasRead = 0
	}
	return b, nil
}

// Uninit discards any buffered output and zeroes the counters. It returns
// ErrUninited when the buffer has not been initialized.
func (b *LegacyOutputBuffer) Uninit() error {
	b.mutex.Lock()
	defer b.mutex.Unlock()

	if !b.inited {
		return ErrUninited
	}
	b.inited = false
	b.buffer.Reset()
	b.hasWrote = 0
	b.hasRead = 0
	return nil
}

// SetTransformer installs f as the function applied to data returned by
// ReadPre and ReadAll. Passing nil removes the transformation.
func (b *LegacyOutputBuffer) SetTransformer(f transformFunc) {
	b.transformerLock.Lock()
	defer b.transformerLock.Unlock()
	b.transformer = f
}

// getTransformer returns the currently installed transformer, or nil.
func (b *LegacyOutputBuffer) getTransformer() transformFunc {
	b.transformerLock.RLock()
	defer b.transformerLock.RUnlock()
	return b.transformer
}

// Write appends p to the buffer and adds the number of accepted bytes to the
// written-bytes counter used by Dropped.
func (b *LegacyOutputBuffer) Write(p []byte) (n int, err error) {
	b.mutex.Lock()
	defer b.mutex.Unlock()

	n, err = b.buffer.Write(p)
	b.hasWrote += int64(n)
	return n, err
}

// ReadPre returns the next chunk of the leading portion of the output, passed
// through the transformer when one is set. It returns nil once preQuota bytes
// have been handed out.
func (b *LegacyOutputBuffer) ReadPre() []byte {
	raw := b.readPre()
	if raw == nil {
		return nil
	}
	if f := b.getTransformer(); f != nil {
		return f(raw)
	}
	return raw
}

// readPre consumes up to the remaining pre-quota from the front of the buffer.
// It returns nil when the pre-quota is exhausted and an empty, non-nil slice
// when the quota is not exhausted but nothing is buffered.
func (b *LegacyOutputBuffer) readPre() []byte {
	b.mutex.Lock()
	defer b.mutex.Unlock()

	if b.hasRead >= b.preQuota {
		return nil
	}

	// Size the result by what is actually buffered, so a poll on a nearly
	// empty buffer neither allocates the whole remaining quota nor returns a
	// small slice that pins a large backing array.
	size := b.preQuota - b.hasRead
	if buffered := b.buffer.Len(); size > buffered {
		size = buffered
	}
	chunk := make([]byte, size)
	// The only error bytes.Buffer.Read reports is io.EOF on an empty buffer;
	// n is 0 in that case, which is all that matters here.
	n, _ := b.buffer.Read(chunk)
	b.hasRead += n
	return chunk[:n]
}

// ReadAll returns the remaining leading portion of the output followed by the
// trailing portion, passed through the transformer when one is set. All other
// buffered data is discarded.
func (b *LegacyOutputBuffer) ReadAll() []byte {
	raw := b.readAll()
	if raw == nil {
		return nil
	}
	if f := b.getTransformer(); f != nil {
		return f(raw)
	}
	return raw
}

// readAll drains the buffer. The result is the head of the buffered data (as
// much as the pre-quota still allows) followed by the tail of what is left (as
// much as the combined quota still allows); bytes in between are dropped. The
// returned slice is never nil.
func (b *LegacyOutputBuffer) readAll() []byte {
	b.mutex.Lock()
	defer b.mutex.Unlock()

	buffered := b.buffer.Len()

	// Head: the unused part of the pre-quota, bounded by what is buffered.
	headLen := b.preQuota - b.hasRead
	if headLen > buffered {
		headLen = buffered
	}
	if headLen < 0 {
		headLen = 0
	}

	// Tail: the unused part of the combined quota once the head is accounted
	// for, bounded by what remains after the head. The lower bound keeps a
	// negative quota from reaching make() and panicking.
	tailLen := b.postQuota + b.preQuota - (b.hasRead + headLen)
	if rest := buffered - headLen; tailLen > rest {
		tailLen = rest
	}
	if tailLen < 0 {
		tailLen = 0
	}

	// Copy both pieces out before Reset invalidates the view from Bytes.
	data := b.buffer.Bytes()
	out := make([]byte, headLen+tailLen)
	copy(out, data[:headLen])
	copy(out[headLen:], data[buffered-tailLen:])

	b.buffer.Reset()
	b.hasRead += headLen + tailLen
	return out
}

// Dropped returns by how many bytes the total written so far exceeds the
// combined pre and post quota, or 0 when it does not exceed it.
func (b *LegacyOutputBuffer) Dropped() int {
	b.mutex.Lock()
	defer b.mutex.Unlock()

	quota := int64(b.preQuota) + int64(b.postQuota)
	if b.hasWrote <= quota {
		return 0
	}
	return int(b.hasWrote - quota)
}