producer/producer_batch.go (125 lines of code) (raw):
package producer
import (
"math"
"time"
sls "github.com/aliyun/aliyun-log-go-sdk"
"github.com/gogo/protobuf/proto"
)
var PACK_ID_KEY = "__pack_id__"
type ProducerBatch struct {
// read only fields
maxRetryIntervalInMs int64
baseRetryBackoffMs int64
maxRetryTimes int
createTimeMs int64
project string
logstore string
shardHash *string
maxReservedAttempts int
useMetricStoreUrl bool
// read only after seal
totalDataSize int64
logGroup *sls.LogGroup
callBackList []CallBack
// transient fields, but rw by at most one thread
attemptCount int
nextRetryMs int64
result *Result
}
func newProducerBatch(packIdGenerator *PackIdGenerator, project, logstore, logTopic, logSource, shardHash string, config *ProducerConfig) *ProducerBatch {
logGroup := &sls.LogGroup{
Topic: proto.String(logTopic),
Source: proto.String(logSource),
Logs: make([]*sls.Log, 0, config.MaxBatchCount+4),
}
if config.GeneratePackId {
logGroup.LogTags = append(make([]*sls.LogTag, 0, len(config.LogTags)+1), config.LogTags...)
logGroup.LogTags = append(logGroup.LogTags, &sls.LogTag{
Key: &PACK_ID_KEY,
Value: proto.String(packIdGenerator.GeneratePackId(project, logstore)),
})
} else {
logGroup.LogTags = config.LogTags
}
producerBatch := &ProducerBatch{
logGroup: logGroup,
maxRetryIntervalInMs: config.MaxRetryBackoffMs,
callBackList: []CallBack{},
createTimeMs: time.Now().UnixMilli(),
maxRetryTimes: config.Retries,
baseRetryBackoffMs: config.BaseRetryBackoffMs,
project: project,
logstore: logstore,
result: initResult(),
maxReservedAttempts: config.MaxReservedAttempts,
useMetricStoreUrl: config.UseMetricStoreURL,
}
if shardHash != "" {
producerBatch.shardHash = &shardHash
}
return producerBatch
}
func (producerBatch *ProducerBatch) getProject() string {
return producerBatch.project
}
func (producerBatch *ProducerBatch) getLogstore() string {
return producerBatch.logstore
}
func (producerBatch *ProducerBatch) getShardHash() *string {
return producerBatch.shardHash
}
func (producerBatch *ProducerBatch) isUseMetricStoreUrl() bool {
return producerBatch.useMetricStoreUrl
}
func (producerBatch *ProducerBatch) meetSendCondition(producerConfig *ProducerConfig) bool {
return producerBatch.totalDataSize >= producerConfig.MaxBatchSize || len(producerBatch.logGroup.Logs) >= producerConfig.MaxBatchCount
}
func (producerBatch *ProducerBatch) addLog(log *sls.Log, size int64, callback CallBack) {
producerBatch.logGroup.Logs = append(producerBatch.logGroup.Logs, log)
producerBatch.totalDataSize += size
if callback != nil {
producerBatch.callBackList = append(producerBatch.callBackList, callback)
}
}
func (producerBatch *ProducerBatch) addLogList(logList []*sls.Log, size int64, callback CallBack) {
producerBatch.logGroup.Logs = append(producerBatch.logGroup.Logs, logList...)
producerBatch.totalDataSize += size
if callback != nil {
producerBatch.callBackList = append(producerBatch.callBackList, callback)
}
}
func (producerBatch *ProducerBatch) OnSuccess(begin time.Time) {
producerBatch.addAttempt(nil, begin)
if len(producerBatch.callBackList) > 0 {
for _, callBack := range producerBatch.callBackList {
callBack.Success(producerBatch.result)
}
}
}
func (producerBatch *ProducerBatch) OnFail(err *sls.Error, begin time.Time) {
producerBatch.addAttempt(err, begin)
if len(producerBatch.callBackList) > 0 {
for _, callBack := range producerBatch.callBackList {
callBack.Fail(producerBatch.result)
}
}
}
func (producerBatch *ProducerBatch) addAttempt(err *sls.Error, begin time.Time) {
producerBatch.result.successful = (err == nil)
producerBatch.attemptCount += 1
if producerBatch.attemptCount > producerBatch.maxReservedAttempts {
return
}
now := time.Now()
if err == nil {
attempt := createAttempt(true, "", "", "", now.UnixMilli(), now.Sub(begin).Milliseconds())
producerBatch.result.attemptList = append(producerBatch.result.attemptList, attempt)
return
}
attempt := createAttempt(false, err.RequestID, err.Code, err.Message, now.UnixMilli(), now.Sub(begin).Milliseconds())
producerBatch.result.attemptList = append(producerBatch.result.attemptList, attempt)
}
func (producerBatch *ProducerBatch) getRetryBackoffIntervalMs() int64 {
retryWaitTime := producerBatch.baseRetryBackoffMs * int64(math.Pow(2, float64(producerBatch.attemptCount)-1))
if retryWaitTime < producerBatch.maxRetryIntervalInMs {
return retryWaitTime
}
return producerBatch.maxRetryIntervalInMs
}