producer/io_worker.go (107 lines of code) (raw):

package producer

import (
	"errors"
	"sync"
	"sync/atomic"
	"time"

	sls "github.com/aliyun/aliyun-log-go-sdk"
	"github.com/go-kit/kit/log"
	"github.com/go-kit/kit/log/level"
	uberatomic "go.uber.org/atomic"
)

// CallBack is the notification interface for the outcome of a send.
// Both methods receive the Result describing the attempt(s).
// NOTE(review): the call sites are not in this file; presumably they are
// invoked from ProducerBatch.OnSuccess / OnFail — confirm there.
type CallBack interface {
	Success(result *Result)
	Fail(result *Result)
}

// IoWorker pushes producer batches to the log service and decides whether a
// failed batch is retried or reported as failed.
type IoWorker struct {
	// taskCount is updated with sync/atomic in startSendTask/closeSendTask.
	// It is the first field of the struct, which keeps it 64-bit aligned for
	// atomic access on 32-bit platforms; keep it first.
	taskCount int64
	// client performs the actual network calls.
	client sls.ClientInterface
	// retryQueue receives batches that failed but may be retried.
	retryQueue *RetryQueue
	// retryQueueShutDownFlag, once true, makes canRetry reject every batch.
	retryQueueShutDownFlag *uberatomic.Bool
	logger                 log.Logger
	// maxIoWorker is a buffered channel used as a counting semaphore:
	// startSendTask sends into it and closeSendTask receives from it.
	maxIoWorker chan int64
	// noRetryStatusCodeMap holds HTTP status codes that must never be retried;
	// only key presence is consulted in this file.
	noRetryStatusCodeMap map[int]*string
	producer             *Producer
}

// initIoWorker builds an IoWorker. maxIoWorkerCount is the capacity of the
// semaphore channel, and errorStatusMap is stored as the no-retry status code
// set. The task counter starts at zero and the shutdown flag at false.
func initIoWorker(client sls.ClientInterface, retryQueue *RetryQueue, logger log.Logger, maxIoWorkerCount int64, errorStatusMap map[int]*string, producer *Producer) *IoWorker {
	return &IoWorker{
		client:                 client,
		retryQueue:             retryQueue,
		taskCount:              0,
		retryQueueShutDownFlag: uberatomic.NewBool(false),
		logger:                 logger,
		maxIoWorker:            make(chan int64, maxIoWorkerCount),
		noRetryStatusCodeMap:   errorStatusMap,
		producer:               producer,
	}
}

// sendToServer sends one batch and handles the three possible outcomes:
//
//   - success: OnSuccess is called and the batch size is subtracted from the
//     producer's in-flight size counter;
//   - non-retryable failure: OnFail is called and the size is subtracted too;
//   - retryable failure: the attempt is recorded, the next retry time is
//     computed, and the batch is handed to the retry queue (the size counter
//     is left untouched because the batch is still pending).
func (ioWorker *IoWorker) sendToServer(producerBatch *ProducerBatch) {
	level.Debug(ioWorker.logger).Log("msg", "ioworker send data to server")
	sendBegin := time.Now()
	var err error
	if producerBatch.isUseMetricStoreUrl() {
		// not use compress type now
		err = ioWorker.client.PutLogsWithMetricStoreURL(producerBatch.getProject(), producerBatch.getLogstore(), producerBatch.logGroup)
	} else {
		req := &sls.PostLogStoreLogsRequest{
			LogGroup:     producerBatch.logGroup,
			HashKey:      producerBatch.getShardHash(),
			CompressType: ioWorker.producer.producerConfig.CompressType,
			Processor:    ioWorker.producer.producerConfig.Processor,
		}
		err = ioWorker.client.PostLogStoreLogsV2(producerBatch.getProject(), producerBatch.getLogstore(), req)
	}
	sendEnd := time.Now()

	// send ok
	if err == nil {
		level.Debug(ioWorker.logger).Log("msg", "sendToServer success")
		// Deferred so the metric is recorded after OnSuccess and the size
		// bookkeeping below have run.
		defer ioWorker.producer.monitor.recordSuccess(sendBegin, sendEnd)
		producerBatch.OnSuccess(sendBegin)
		// After successful delivery, producer removes the batch size sent out
		atomic.AddInt64(&ioWorker.producer.producerLogGroupSize, -producerBatch.totalDataSize)
		return
	}

	slsError := parseSlsError(err)
	canRetry := ioWorker.canRetry(producerBatch, slsError)
	level.Error(ioWorker.logger).Log("msg", "sendToServer failed",
		"retryTimes", producerBatch.attemptCount,
		"requestId", slsError.RequestID,
		"errorCode", slsError.Code,
		"errorMessage", slsError.Message,
		"logs", len(producerBatch.logGroup.Logs),
		"canRetry", canRetry)
	if !canRetry {
		defer ioWorker.producer.monitor.recordFailure(sendBegin, sendEnd)
		producerBatch.OnFail(slsError, sendBegin)
		// The batch is finished for good, so release its size as well.
		atomic.AddInt64(&ioWorker.producer.producerLogGroupSize, -producerBatch.totalDataSize)
		return
	}

	// do retry
	ioWorker.producer.monitor.recordRetry(sendEnd.Sub(sendBegin))
	producerBatch.addAttempt(slsError, sendBegin)
	// Next retry time is an absolute Unix timestamp in milliseconds.
	producerBatch.nextRetryMs = producerBatch.getRetryBackoffIntervalMs() + time.Now().UnixMilli()
	level.Debug(ioWorker.logger).Log("msg", "Submit to the retry queue after meeting the retry criteria。")
	ioWorker.retryQueue.sendToRetryQueue(producerBatch, ioWorker.logger)
}

// parseSlsError converts err into a *sls.Error. If err is, or wraps, a
// *sls.Error, that value is returned so its HTTPCode, Code and RequestID are
// preserved; otherwise a new *sls.Error carrying only err's message is
// returned. err must be non-nil.
func parseSlsError(err error) *sls.Error {
	// errors.As also matches a *sls.Error wrapped by another error, which a
	// plain type assertion would miss (losing the HTTP status that canRetry
	// relies on).
	var slsError *sls.Error
	if errors.As(err, &slsError) {
		return slsError
	}
	return &sls.Error{
		Message: err.Error(),
	}
}

// canRetry reports whether a failed batch may be sent again. It returns false
// when the retry queue has been shut down, when the error's HTTP status code
// is in the no-retry set, or when the batch has used up its retry budget.
func (ioWorker *IoWorker) canRetry(producerBatch *ProducerBatch, err *sls.Error) bool {
	if ioWorker.retryQueueShutDownFlag.Load() {
		return false
	}
	if _, ok := ioWorker.noRetryStatusCodeMap[int(err.HTTPCode)]; ok {
		return false
	}
	return producerBatch.attemptCount < producerBatch.maxRetryTimes
}

// closeSendTask undoes startSendTask: it frees one semaphore slot, decrements
// the task counter and marks the task done on ioWorkerWaitGroup.
func (ioWorker *IoWorker) closeSendTask(ioWorkerWaitGroup *sync.WaitGroup) {
	<-ioWorker.maxIoWorker
	atomic.AddInt64(&ioWorker.taskCount, -1)
	ioWorkerWaitGroup.Done()
}

// startSendTask registers a new send task: it increments the task counter,
// acquires a semaphore slot (blocking while maxIoWorker is full) and adds one
// to ioWorkerWaitGroup. Each call must be paired with closeSendTask.
func (ioWorker *IoWorker) startSendTask(ioWorkerWaitGroup *sync.WaitGroup) {
	atomic.AddInt64(&ioWorker.taskCount, 1)
	ioWorker.maxIoWorker <- 1
	ioWorkerWaitGroup.Add(1)
}