internal/satellite/module/buffer/buffer.go (48 lines of code) (raw):

// Licensed to Apache Software Foundation (ASF) under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Apache Software Foundation (ASF) licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package buffer import ( "github.com/apache/skywalking-satellite/internal/pkg/log" "github.com/apache/skywalking-satellite/internal/satellite/event" ) // BatchBuffer is a buffer to cache the input data in Sender. type BatchBuffer struct { buf []*event.OutputEventContext // cache first *event.Offset // the first OutputEventContext offset last *event.Offset // the last OutputEventContext offset size int // usage size cap int // the max capacity } // NewBatchBuffer creates a new BatchBuffer according to the capacity param. func NewBatchBuffer(capacity int) *BatchBuffer { return &BatchBuffer{ buf: make([]*event.OutputEventContext, capacity), first: nil, last: nil, size: 0, cap: capacity, } } // Buf returns the cached data in BatchBuffer. func (b *BatchBuffer) Buf() []*event.OutputEventContext { return b.buf } // First returns the first OutputEventContext offset. func (b *BatchBuffer) First() *event.Offset { return b.first } // Last returns the last OutputEventContext offset. func (b *BatchBuffer) Last() *event.Offset { return b.last } // Len returns the usage size. func (b *BatchBuffer) Len() int { return b.size } // Add adds a new data input buffer. func (b *BatchBuffer) Add(data *event.OutputEventContext) { if b.size == b.cap { log.Logger.Errorf("cannot add one item to the fulling BatchBuffer, the capacity is %d", b.cap) return } else if data.Offset == nil { log.Logger.Errorf("cannot add one item to BatchBuffer because the input data is illegal, the offset is empty") return } if b.size == 0 { b.first = data.Offset } b.last = data.Offset b.buf[b.size] = data b.size++ }