in producer/io_worker.go [43:93]
func (ioWorker *IoWorker) sendToServer(producerBatch *ProducerBatch) {
level.Debug(ioWorker.logger).Log("msg", "ioworker send data to server")
sendBegin := time.Now()
var err error
if producerBatch.isUseMetricStoreUrl() {
// not use compress type now
err = ioWorker.client.PutLogsWithMetricStoreURL(producerBatch.getProject(), producerBatch.getLogstore(), producerBatch.logGroup)
} else {
req := &sls.PostLogStoreLogsRequest{
LogGroup: producerBatch.logGroup,
HashKey: producerBatch.getShardHash(),
CompressType: ioWorker.producer.producerConfig.CompressType,
Processor: ioWorker.producer.producerConfig.Processor,
}
err = ioWorker.client.PostLogStoreLogsV2(producerBatch.getProject(), producerBatch.getLogstore(), req)
}
sendEnd := time.Now()
// send ok
if err == nil {
level.Debug(ioWorker.logger).Log("msg", "sendToServer success")
defer ioWorker.producer.monitor.recordSuccess(sendBegin, sendEnd)
producerBatch.OnSuccess(sendBegin)
// After successful delivery, producer removes the batch size sent out
atomic.AddInt64(&ioWorker.producer.producerLogGroupSize, -producerBatch.totalDataSize)
return
}
slsError := parseSlsError(err)
canRetry := ioWorker.canRetry(producerBatch, slsError)
level.Error(ioWorker.logger).Log("msg", "sendToServer failed",
"retryTimes", producerBatch.attemptCount,
"requestId", slsError.RequestID,
"errorCode", slsError.Code,
"errorMessage", slsError.Message,
"logs", len(producerBatch.logGroup.Logs),
"canRetry", canRetry)
if !canRetry {
defer ioWorker.producer.monitor.recordFailure(sendBegin, sendEnd)
producerBatch.OnFail(slsError, sendBegin)
atomic.AddInt64(&ioWorker.producer.producerLogGroupSize, -producerBatch.totalDataSize)
return
}
// do retry
ioWorker.producer.monitor.recordRetry(sendEnd.Sub(sendBegin))
producerBatch.addAttempt(slsError, sendBegin)
producerBatch.nextRetryMs = producerBatch.getRetryBackoffIntervalMs() + time.Now().UnixMilli()
level.Debug(ioWorker.logger).Log("msg", "Submit to the retry queue after meeting the retry criteria。")
ioWorker.retryQueue.sendToRetryQueue(producerBatch, ioWorker.logger)
}