pkg/tools/buffer/buffer.go (719 lines of code) (raw):
// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package buffer
import (
"container/list"
"errors"
"fmt"
"io"
"sync"
"time"
"github.com/sirupsen/logrus"
"github.com/apache/skywalking-rover/pkg/logger"
"github.com/apache/skywalking-rover/pkg/tools/enums"
"github.com/apache/skywalking-rover/pkg/tools/host"
)
var (
ErrNotComplete = errors.New("socket: not complete event")
emptyList = list.New()
log = logger.GetLogger("tools", "buffer")
PooledBuffer = sync.Pool{
New: func() any {
return &[2048]byte{}
},
}
)
func BorrowNewBuffer() *[2048]byte {
return PooledBuffer.Get().(*[2048]byte)
}
type SocketDataBuffer interface {
// Protocol of the buffer
Protocol() enums.ConnectionProtocol
// GenerateConnectionID for identity the buffer belong which connection
GenerateConnectionID() string
// BufferData of the buffer
BufferData() []byte
// TotalSize of socket data, the data may exceed the size of the BufferData()
TotalSize() uint64
// Direction of the data, send or receive
Direction() enums.SocketDataDirection
// BufferStartPosition the buffer start index
BufferStartPosition() int
// BufferLen the buffer data length
BufferLen() int
// DataID data id of the buffer
DataID() uint64
// PrevDataID the previous data id of the buffer
PrevDataID() uint64
// DataSequence the data sequence under same data id
DataSequence() int
// IsStart this buffer is start of the same data id
IsStart() bool
// IsFinished this buffer is finish of the same data id
IsFinished() bool
// HaveReduceDataAfterChunk check have reduced data after current buffer
HaveReduceDataAfterChunk() bool
// StartTime the data start timestamp
StartTime() uint64
// EndTime the data end timestamp
EndTime() uint64
ReleaseBuffer() *[2048]byte
}
type SocketDataDetail interface {
// DataID data id of the buffer
DataID() uint64
// Time (BPF) of the detail event
Time() uint64
}
type DataIDRange struct {
From uint64
To uint64
IsToBufferReadFinished bool
}
func (i *DataIDRange) String() string {
return fmt.Sprintf("from: %d, to: %d, isToBufferReadFinished: %t", i.From, i.To, i.IsToBufferReadFinished)
}
type Buffer struct {
dataEvents *list.List
detailEvents *list.List
validated bool // the events list is validated or not
// shouldResetPosition means the buffer have new buffer appended into the buffer
// so the position should reset to the head, and re-read the buffer
shouldResetPosition bool
eventLocker sync.RWMutex
head *Position
current *Position
// record the latest expired data id in connection for expire the older socket detail
// because the older socket detail may not be received in buffer
latestExpiredDataID uint64
// originalBuffer record this buffer is from Buffer.Slice or CombineSlices
// if it's not empty, then when getting the details should query from originalBuffer
// Because the BPF Queue is unsorted and can be delayed, so the details should query by real buffer
originalBuffer *Buffer
// endPosition record the end position of the originalBuffer
endPosition *Position
}
type SocketDataEventLimited struct {
SocketDataBuffer
From int
Size int
}
func (s *SocketDataEventLimited) Protocol() enums.ConnectionProtocol {
return s.SocketDataBuffer.Protocol()
}
func (s *SocketDataEventLimited) BufferData() []byte {
return s.SocketDataBuffer.BufferData()[s.From:s.Size]
}
func (s *SocketDataEventLimited) BufferLen() int {
return s.Size - s.From
}
func (s *SocketDataEventLimited) BufferStartPosition() int {
return s.From
}
func (i *DataIDRange) IsIncludeAllDetails(l *list.List) bool {
if l.Len() == 0 {
return false
}
for e := l.Front(); e != nil; e = e.Next() {
if e.Value.(SocketDataDetail).DataID() < i.From || e.Value.(SocketDataDetail).DataID() > i.To {
return false
}
}
return true
}
func (i *DataIDRange) Append(other *DataIDRange) *DataIDRange {
if other.From < i.From {
i.From = other.From
}
if other.To > i.To {
i.To = other.To
}
i.IsToBufferReadFinished = other.IsToBufferReadFinished
return i
}
func (i *DataIDRange) DeleteDetails(buf *Buffer) {
if buf.originalBuffer != nil {
i.DeleteDetails(buf.originalBuffer)
}
for e := buf.detailEvents.Front(); e != nil; {
next := e.Next()
dataID := e.Value.(SocketDataDetail).DataID()
if dataID >= i.From && dataID <= i.To {
if !i.IsToBufferReadFinished && dataID == i.To {
break
}
buf.detailEvents.Remove(e)
log.Debugf("delete detail event from buffer, data id: %d, ref: %p, range: %d-%d(%t)",
dataID, buf, i.From, i.To, i.IsToBufferReadFinished)
}
e = next
}
}
type Position struct {
// element of the event list
element *list.Element
// bufIndex the buffer index of the element
bufIndex int
}
func (p *Position) Clone() *Position {
return &Position{element: p.element, bufIndex: p.bufIndex}
}
func (p *Position) DataID() uint64 {
return p.element.Value.(SocketDataBuffer).DataID()
}
func (p *Position) PrevDataID() uint64 {
return p.element.Value.(SocketDataBuffer).PrevDataID()
}
func (p *Position) Seq() int {
return p.element.Value.(SocketDataBuffer).DataSequence()
}
func NewBuffer() *Buffer {
return &Buffer{
dataEvents: list.New(),
detailEvents: list.New(),
validated: false,
}
}
func (r *Buffer) FindFirstDataBuffer(dataID uint64) SocketDataBuffer {
for e := r.dataEvents.Front(); e != nil; e = e.Next() {
cur := e.Value.(SocketDataBuffer)
if cur.DataID() == dataID {
return cur
}
}
return nil
}
func (r *Buffer) BuildTotalDataIDRange() *DataIDRange {
if r.dataEvents.Len() == 0 {
return nil
}
var toIndex uint64
var isToBufferReadFinished bool
if r.endPosition != nil {
toIndex = r.endPosition.DataID()
isToBufferReadFinished = r.endPosition.bufIndex == r.endPosition.element.Value.(SocketDataBuffer).BufferLen()
} else {
toIndex = r.current.DataID()
isToBufferReadFinished = r.current.bufIndex == r.current.element.Value.(SocketDataBuffer).BufferLen()
}
return &DataIDRange{
From: r.head.DataID(),
To: toIndex,
IsToBufferReadFinished: isToBufferReadFinished,
}
}
func (r *Buffer) Position() *Position {
return r.current.Clone()
}
func (r *Buffer) Clean() {
r.eventLocker.Lock()
defer r.eventLocker.Unlock()
r.dataEvents = list.New()
r.detailEvents = list.New()
r.head = nil
r.current = nil
r.endPosition = nil
}
// nolint
func (r *Buffer) Slice(validated bool, start, end *Position) *Buffer {
dataEvents := list.New()
for nextElement := start.element; nextElement != end.element; nextElement = nextElement.Next() {
if nextElement == nil || nextElement.Value == nil {
break
}
currentBuffer := nextElement.Value.(SocketDataBuffer)
dataEvents.PushBack(currentBuffer)
}
lastBuffer := end.element.Value.(SocketDataBuffer)
dataEvents.PushBack(&SocketDataEventLimited{SocketDataBuffer: lastBuffer, Size: end.bufIndex})
return &Buffer{
dataEvents: dataEvents,
detailEvents: emptyList,
validated: validated,
head: &Position{element: dataEvents.Front(), bufIndex: start.bufIndex},
current: &Position{element: dataEvents.Front(), bufIndex: start.bufIndex},
originalBuffer: r,
endPosition: end,
}
}
func (r *Buffer) Len() int {
if r == nil || r.head == nil {
return 0
}
var result int
var startIndex = r.head.bufIndex
for e := r.head.element; e != nil; e = e.Next() {
result += r.head.element.Value.(SocketDataBuffer).BufferLen() - startIndex
startIndex = 0
}
return result
}
func (r *Buffer) BuildDetails() *list.List {
// if the original buffer is not empty, then query the details from original buffer
if r.originalBuffer != nil {
events := list.New()
fromDataID := r.head.DataID()
var endDataID uint64
if r.endPosition != nil {
endDataID = r.endPosition.DataID()
} else {
endDataID = r.current.DataID()
}
for e := r.originalBuffer.detailEvents.Front(); e != nil; e = e.Next() {
if e.Value == nil {
continue
}
if e.Value.(SocketDataDetail).DataID() >= fromDataID && e.Value.(SocketDataDetail).DataID() <= endDataID {
events.PushBack(e.Value)
}
}
if events.Len() == 0 && log.Enable(logrus.DebugLevel) {
dataIDList := make([]uint64, 0)
for e := r.originalBuffer.detailEvents.Front(); e != nil; e = e.Next() {
if e.Value != nil {
dataIDList = append(dataIDList, e.Value.(SocketDataDetail).DataID())
}
}
log.Infof("cannot found details from original buffer, from data id: %d, end data id: %d, "+
"ref: %p, existing details data id list: %v", fromDataID, endDataID, r.originalBuffer, dataIDList)
}
return events
}
return r.detailEvents
}
func (r *Buffer) DataSize() int64 {
if r == nil {
return 0
}
var result int64
var headPosition = r.head
if headPosition == nil {
if !r.PrepareForReading() {
return 0
}
defer func() {
r.ResetForLoopReading()
}()
headPosition = r.head
}
isFirst := true
for e := headPosition.element; e != nil; e = e.Next() {
if isFirst {
result += int64(e.Value.(SocketDataBuffer).BufferLen() - headPosition.bufIndex)
isFirst = false
} else {
result += int64(e.Value.(SocketDataBuffer).BufferLen())
}
}
return result
}
func (r *Buffer) FirstSocketBuffer() SocketDataBuffer {
if r == nil || r.dataEvents == nil || r.dataEvents.Len() == 0 {
return nil
}
return r.dataEvents.Front().Value.(SocketDataBuffer)
}
func (r *Buffer) LastSocketBuffer() SocketDataBuffer {
if r == nil || r.dataEvents == nil || r.dataEvents.Len() == 0 {
return nil
}
return r.dataEvents.Back().Value.(SocketDataBuffer)
}
func (r *Buffer) TotalBuffer() []SocketDataBuffer {
if r == nil || r.dataEvents == nil || r.dataEvents.Len() == 0 {
return nil
}
result := make([]SocketDataBuffer, 0, r.dataEvents.Len())
for e := r.dataEvents.Front(); e != nil; e = e.Next() {
if e.Value == nil {
continue
}
result = append(result, e.Value.(SocketDataBuffer))
}
return result
}
// DetectNotSendingLastPosition detect the buffer contains not sending data: the BPF limited socket data count
func (r *Buffer) DetectNotSendingLastPosition() *Position {
if r == nil || r.dataEvents.Len() == 0 {
return nil
}
for e := r.dataEvents.Front(); e != nil; e = e.Next() {
buf := e.Value.(SocketDataBuffer)
// the buffer is sent finished but still have reduced data not send
if buf.IsFinished() && buf.HaveReduceDataAfterChunk() {
return &Position{element: e, bufIndex: buf.BufferLen()}
}
}
return nil
}
func CombineSlices(validated bool, originalBuffer *Buffer, buffers ...*Buffer) *Buffer {
if len(buffers) == 0 {
return nil
}
if len(buffers) == 1 {
return buffers[0]
}
dataEvents := list.New()
for _, b := range buffers {
if b == nil || b.head == nil {
continue
}
if b.head.bufIndex > 0 {
headBuffer := b.dataEvents.Front().Value.(SocketDataBuffer)
dataEvents.PushBack(&SocketDataEventLimited{SocketDataBuffer: headBuffer,
From: b.head.bufIndex, Size: headBuffer.BufferLen()})
for next := b.dataEvents.Front().Next(); next != nil; next = next.Next() {
dataEvents.PushBack(next.Value)
}
} else {
dataEvents.PushBackList(b.dataEvents)
}
}
var endPosition *Position
for i := len(buffers) - 1; i >= 0; i-- {
if buffers[i] == nil {
continue
}
if buffers[i].endPosition != nil {
endPosition = buffers[i].endPosition
break
} else if buffers[i].Position() != nil {
endPosition = buffers[i].Position()
break
}
}
return &Buffer{
dataEvents: dataEvents,
detailEvents: emptyList, // for the combined buffer, the details list should be queried from original buffer
validated: validated,
head: &Position{element: dataEvents.Front(), bufIndex: 0},
current: &Position{element: dataEvents.Front(), bufIndex: 0},
originalBuffer: originalBuffer,
endPosition: endPosition,
}
}
func (r *Buffer) Peek(p []byte) (n int, err error) {
// save the index temporary
tmpPosition := r.current.Clone()
// restore the index
defer func() {
r.current = tmpPosition
}()
readIndex := 0
for readIndex < len(p) {
count, err := r.Read(p[readIndex:])
if err != nil {
return 0, err
}
readIndex += count
}
return readIndex, nil
}
func (r *Buffer) OffsetPosition(offset int) *Position {
var nextElement func(e *list.Element) *list.Element
if offset == 0 {
return r.current.Clone()
} else if offset > 0 {
nextElement = func(e *list.Element) *list.Element {
return e.Next()
}
} else {
nextElement = func(e *list.Element) *list.Element {
return e.Prev()
}
}
var curEle = r.current.element
var curIndex = r.current.bufIndex
for ; curEle != nil; curEle = nextElement(curEle) {
nextOffset := curIndex + offset
bufferLen := curEle.Value.(SocketDataBuffer).BufferLen()
if nextOffset >= 0 && nextOffset < bufferLen {
curIndex += offset
break
}
if offset > 0 {
offset -= bufferLen - curIndex
curIndex = 0
} else {
offset += curIndex
next := nextElement(curEle)
if next == nil {
curEle = next
break
}
curIndex = curEle.Value.(SocketDataBuffer).BufferLen()
}
}
if curEle == nil {
return nil
}
return &Position{element: curEle, bufIndex: curIndex}
}
func (r *Buffer) Read(p []byte) (n int, err error) {
if len(p) == 0 {
return 0, nil
}
if r.current == nil || r.current.element == nil {
return 0, io.EOF
}
element, n := r.ReadFromCurrent(p)
if n > 0 {
return n, nil
}
curEvent := element.Value.(SocketDataBuffer)
next := r.nextElement(element)
if next == nil {
return 0, io.EOF
}
nextEvent := next.Value.(SocketDataBuffer)
var shouldRead = false
if r.validated {
shouldRead = true
// same data id and sequence orders
} else if (curEvent.DataID() == nextEvent.DataID() && curEvent.DataSequence()+1 == nextEvent.DataSequence()) ||
// cur event is finished and next event is start
(nextEvent.IsStart() && curEvent.IsFinished()) ||
// same data id and sequence but have difference buffer index
(curEvent.DataID() == nextEvent.DataID() && curEvent.DataSequence() == nextEvent.DataSequence() &&
r.current.bufIndex <= nextEvent.BufferStartPosition()) {
shouldRead = true
}
if !shouldRead {
return 0, ErrNotComplete
}
return r.read0(next, nextEvent, p)
}
func (r *Buffer) ReadFromCurrent(p []byte) (element *list.Element, n int) {
element = r.current.element
curEvent := element.Value.(SocketDataBuffer)
residueSize := curEvent.BufferLen() - r.current.bufIndex
if residueSize > 0 {
readLen := len(p)
if residueSize < readLen {
readLen = residueSize
}
n = copy(p, curEvent.BufferData()[r.current.bufIndex:r.current.bufIndex+readLen])
r.current.bufIndex += n
return element, n
}
return element, 0
}
func (r *Buffer) ReadUntilBufferFull(data []byte) error {
reduceLen := len(data)
currentIndex := 0
for reduceLen > 0 {
readCount, err := r.Read(data[:reduceLen])
if err != nil {
return err
}
reduceLen -= readCount
currentIndex += readCount
}
return nil
}
func (r *Buffer) Merge(other *Buffer) {
if other == nil {
return
}
for e := other.dataEvents.Front(); e != nil; e = e.Next() {
if v, ok := e.Value.(SocketDataBuffer); ok && v != nil {
r.AppendDataEvent(v)
}
}
for e := other.detailEvents.Front(); e != nil; e = e.Next() {
if v, ok := e.Value.(SocketDataDetail); ok && v != nil {
r.AppendDetailEvent(v)
}
}
}
func (r *Buffer) read0(currentElement *list.Element, currentBuffer SocketDataBuffer, p []byte) (n int, err error) {
readLen := len(p)
if currentBuffer.BufferLen() < readLen {
readLen = currentBuffer.BufferLen()
}
copy(p, currentBuffer.BufferData()[:readLen])
r.current.element = currentElement
r.current.bufIndex = readLen
return readLen, nil
}
// IsCurrentPacketReadFinished means to validate the current reading package is reading finished
func (r *Buffer) IsCurrentPacketReadFinished() bool {
return r.current.bufIndex == r.current.element.Value.(SocketDataBuffer).BufferLen()
}
func (r *Buffer) ResetForLoopReading() {
r.head = nil
r.current = nil
}
func (r *Buffer) PrepareForReading() bool {
if r.dataEvents.Len() == 0 {
return false
}
// if the buffer should reset, then reset the position and cannot be read from current position
if r.shouldResetPosition {
r.ResetForLoopReading()
r.shouldResetPosition = false
return r.PrepareForReading()
}
if r.head == nil || r.head.element == nil {
// read in the first element
r.eventLocker.RLock()
defer r.eventLocker.RUnlock()
r.head = &Position{element: r.dataEvents.Front(), bufIndex: 0}
r.current = r.head.Clone()
} else {
// make sure we can read from head
r.current = r.head.Clone()
}
return true
}
// nolint
func (r *Buffer) RemoveReadElements(includeDetails bool) bool {
r.eventLocker.Lock()
defer r.eventLocker.Unlock()
// delete until the last data id
if includeDetails && r.head.element != nil && r.current.element != nil {
firstDataID := r.head.element.Value.(SocketDataBuffer).DataID()
currentBuffer := r.current.element.Value.(SocketDataBuffer)
lastDataID := currentBuffer.DataID()
startDelete := false
for e := r.detailEvents.Front(); e != nil; {
event := e.Value.(SocketDataDetail)
if firstDataID == lastDataID && currentBuffer.BufferLen() != r.current.bufIndex {
// current buffer is not finished, so the detail cannot be deleted
break
}
if !startDelete && event.DataID() >= firstDataID && event.DataID() <= lastDataID {
startDelete = true
} else if startDelete && event.DataID() > lastDataID {
// out of the data id, just break
break
}
if startDelete {
tmp := e.Next()
r.detailEvents.Remove(e)
log.Debugf("delete detail event from readed buffer, data id: %d, ref: %p", event.DataID(), r)
e = tmp
} else {
e = e.Next()
}
}
}
// delete until to current position
next := r.head.element
stillCurrent := true
for ; next != nil && next != r.current.element; next = r.removeElement0(next) {
stillCurrent = false
}
// not enough data, then return
if !stillCurrent && next == nil {
return true
}
if next != nil && next.Value.(SocketDataBuffer).BufferLen() == r.current.bufIndex { // all buffer finished, then delete it
// the last event already read finished, then delete it
r.head.element = r.removeElement0(next)
r.head.bufIndex = 0
} else {
if !stillCurrent {
r.head.element = next
}
// still have reduced buffer, then keep reading from current index in next loop
r.head.bufIndex = r.current.bufIndex
}
return false
}
// SkipCurrentElement skip current element in reader, if return true means have read finished
func (r *Buffer) SkipCurrentElement() bool {
r.head.element = r.nextElement(r.current.element)
r.current.bufIndex = 0
return r.head.element == nil
}
func (r *Buffer) removeElement0(element *list.Element) *list.Element {
if element == nil {
return nil
}
result := element.Next()
r.dataEvents.Remove(element)
if element.Value != nil {
if b, ok := element.Value.(SocketDataBuffer); ok && b != nil {
PooledBuffer.Put(b.ReleaseBuffer())
}
}
return result
}
func (r *Buffer) AppendDetailEvent(event SocketDataDetail) {
r.eventLocker.Lock()
defer r.eventLocker.Unlock()
if r.detailEvents.Len() == 0 {
r.detailEvents.PushFront(event)
return
}
if r.detailEvents.Back().Value == nil {
r.detailEvents.PushFront(event)
return
}
if r.detailEvents.Back().Value.(SocketDataDetail).DataID() < event.DataID() {
r.detailEvents.PushBack(event)
return
}
beenAdded := false
for element := r.detailEvents.Front(); element != nil; element = element.Next() {
existEvent, ok := element.Value.(SocketDataDetail)
if !ok {
continue
}
if existEvent.DataID() > event.DataID() {
// data id needs order
beenAdded = true
}
if beenAdded {
r.detailEvents.InsertBefore(event, element)
break
}
}
if !beenAdded {
r.detailEvents.PushBack(event)
}
}
// AppendDataEvent insert the event to the event list following the order
func (r *Buffer) AppendDataEvent(event SocketDataBuffer) {
r.eventLocker.Lock()
defer r.eventLocker.Unlock()
defer func() {
// if the current position is not nil and the current reading data id is bigger than the event data id
if r.current != nil && r.current.DataID() > event.DataID() {
r.shouldResetPosition = true
}
}()
if r.dataEvents.Len() == 0 {
r.dataEvents.PushFront(event)
r.shouldResetPosition = true
return
}
if r.dataEvents.Back().Value.(SocketDataBuffer).DataID() < event.DataID() {
r.dataEvents.PushBack(event)
return
}
beenAdded := false
for element := r.dataEvents.Front(); element != nil; element = element.Next() {
existEvent := element.Value.(SocketDataBuffer)
if existEvent.DataID() > event.DataID() {
// data id needs order
beenAdded = true
} else if existEvent.DataID() == event.DataID() && existEvent.DataSequence() > event.DataSequence() {
// following the sequence order
beenAdded = true
}
if beenAdded {
r.dataEvents.InsertBefore(event, element)
break
}
}
if !beenAdded {
r.dataEvents.PushBack(event)
}
}
func (r *Buffer) DeleteExpireEvents(expireDuration time.Duration) int {
r.eventLocker.Lock()
defer r.eventLocker.Unlock()
expireTime := time.Now().Add(-expireDuration)
// data event queue
count := r.deleteEventsWithJudgement(r.dataEvents, func(element *list.Element) bool {
if element.Value == nil {
return true
}
buffer := element.Value.(SocketDataBuffer)
startTime := host.Time(buffer.StartTime())
if expireTime.After(startTime) {
r.latestExpiredDataID = buffer.DataID()
PooledBuffer.Put(buffer.ReleaseBuffer())
return true
}
return false
})
// detail event queue
count += r.deleteEventsWithJudgement(r.detailEvents, func(element *list.Element) bool {
detail, ok := element.Value.(SocketDataDetail)
if !ok {
return true
}
isDelete := r.latestExpiredDataID > 0 && detail.DataID() <= r.latestExpiredDataID ||
(detail.Time() > 0 && expireTime.After(host.Time(detail.Time())))
if isDelete {
log.Debugf("delete expired detail event, data id: %d, buf: %p", detail.DataID(), r)
}
return isDelete
})
return count
}
func (r *Buffer) DataLength() int {
r.eventLocker.RLock()
defer r.eventLocker.RUnlock()
if r.dataEvents == nil {
return 0
}
return r.dataEvents.Len()
}
func (r *Buffer) deleteEventsWithJudgement(l *list.List, checker func(element *list.Element) bool) int {
count := 0
for e := l.Front(); e != nil; {
if checker(e) {
count++
cur := e
e = e.Next()
l.Remove(cur)
} else {
break
}
}
return count
}
func (r *Buffer) nextElement(e *list.Element) *list.Element {
if e == nil {
return nil
}
r.eventLocker.RLock()
defer r.eventLocker.RUnlock()
return e.Next()
}