producer/log_accumulator.go (106 lines of code) (raw):
package producer
import (
"errors"
"strings"
"sync"
"sync/atomic"
sls "github.com/aliyun/aliyun-log-go-sdk"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
uberatomic "go.uber.org/atomic"
)
type LogAccumulator struct {
lock sync.Mutex
logGroupData map[string]*ProducerBatch
producerConfig *ProducerConfig
ioWorker *IoWorker
shutDownFlag *uberatomic.Bool
logger log.Logger
threadPool *IoThreadPool
producer *Producer
packIdGenrator *PackIdGenerator
}
func initLogAccumulator(config *ProducerConfig, ioWorker *IoWorker, logger log.Logger, threadPool *IoThreadPool, producer *Producer) *LogAccumulator {
return &LogAccumulator{
logGroupData: make(map[string]*ProducerBatch),
producerConfig: config,
ioWorker: ioWorker,
shutDownFlag: uberatomic.NewBool(false),
logger: logger,
threadPool: threadPool,
producer: producer,
packIdGenrator: newPackIdGenerator(),
}
}
func (logAccumulator *LogAccumulator) addLogToProducerBatch(project, logstore, shardHash, logTopic, logSource string,
logData interface{}, callback CallBack) error {
if logAccumulator.shutDownFlag.Load() {
level.Warn(logAccumulator.logger).Log("msg", "Producer has started and shut down and cannot write to new logs")
return errors.New("Producer has started and shut down and cannot write to new logs")
}
if log, ok := logData.(*sls.Log); ok {
logAccumulator.addLog(project, logstore, shardHash, logTopic, logSource, log, callback)
return nil
}
if logList, ok := logData.([]*sls.Log); ok {
logAccumulator.addLogList(project, logstore, shardHash, logTopic, logSource, logList, callback)
return nil
}
level.Error(logAccumulator.logger).Log("msg", "Invalid logType")
return errors.New("invalid logType")
}
func (logAccumulator *LogAccumulator) addLog(project, logstore, shardHash, logTopic, logSource string,
log *sls.Log, callback CallBack) {
key := logAccumulator.getKeyString(project, logstore, logTopic, shardHash, logSource)
logSize := int64(GetLogSizeCalculate(log))
atomic.AddInt64(&logAccumulator.producer.producerLogGroupSize, logSize)
logAccumulator.lock.Lock()
producerBatch := logAccumulator.getOrCreateProducerBatch(key, project, logstore, logTopic, logSource, shardHash)
producerBatch.addLog(log, logSize, callback)
if !producerBatch.meetSendCondition(logAccumulator.producerConfig) {
logAccumulator.lock.Unlock()
return
}
logAccumulator.logGroupData[key] = nil
logAccumulator.lock.Unlock()
logAccumulator.threadPool.addTask(producerBatch)
}
func (logAccumulator *LogAccumulator) addLogList(project, logstore, shardHash, logTopic, logSource string,
logList []*sls.Log, callback CallBack) {
key := logAccumulator.getKeyString(project, logstore, logTopic, shardHash, logSource)
logListSize := int64(GetLogListSize(logList))
atomic.AddInt64(&logAccumulator.producer.producerLogGroupSize, logListSize)
logAccumulator.lock.Lock()
producerBatch := logAccumulator.getOrCreateProducerBatch(key, project, logstore, logTopic, logSource, shardHash)
producerBatch.addLogList(logList, logListSize, callback)
if !producerBatch.meetSendCondition(logAccumulator.producerConfig) {
logAccumulator.lock.Unlock()
return
}
logAccumulator.logGroupData[key] = nil
logAccumulator.lock.Unlock()
logAccumulator.threadPool.addTask(producerBatch)
}
func (logAccumulator *LogAccumulator) getOrCreateProducerBatch(key, project, logstore, logTopic, logSource, shardHash string) *ProducerBatch {
if producerBatch, ok := logAccumulator.logGroupData[key]; ok && producerBatch != nil {
return producerBatch
}
logAccumulator.producer.monitor.incCreateBatch()
batch := newProducerBatch(logAccumulator.packIdGenrator, project, logstore, logTopic, logSource, shardHash, logAccumulator.producerConfig)
logAccumulator.logGroupData[key] = batch
return batch
}
func (logAccumulator *LogAccumulator) getKeyString(project, logstore, logTopic, shardHash, logSource string) string {
var key strings.Builder
key.Grow(len(project) + len(logstore) + len(logTopic) + len(shardHash) + len(logSource) + len(Delimiter)*4)
key.WriteString(project)
key.WriteString(Delimiter)
key.WriteString(logstore)
key.WriteString(Delimiter)
key.WriteString(logTopic)
key.WriteString(Delimiter)
key.WriteString(shardHash)
key.WriteString(Delimiter)
key.WriteString(logSource)
return key.String()
}