func()

in producer/io_worker.go [43:93]


// sendToServer pushes one batch to the backend and then handles the outcome.
//
// The batch is sent with PutLogsWithMetricStoreURL when
// producerBatch.isUseMetricStoreUrl() reports true, and otherwise with
// PostLogStoreLogsV2 using the batch's shard hash plus the compress type and
// processor taken from the producer config.
//
// Outcome handling:
//   - no error: OnSuccess is invoked, the batch's totalDataSize is subtracted
//     from the producer's producerLogGroupSize, and the monitor records a
//     success;
//   - error that canRetry accepts: the monitor records a retry, the attempt is
//     added to the batch, nextRetryMs is set to "now + backoff interval", and
//     the batch is handed to the retry queue (its size is NOT released);
//   - error that canRetry rejects: OnFail is invoked, the batch's
//     totalDataSize is subtracted from producerLogGroupSize, and the monitor
//     records a failure.
func (ioWorker *IoWorker) sendToServer(producerBatch *ProducerBatch) {
	level.Debug(ioWorker.logger).Log("msg", "ioworker send data to server")

	startedAt := time.Now()
	var sendErr error
	if !producerBatch.isUseMetricStoreUrl() {
		request := &sls.PostLogStoreLogsRequest{
			LogGroup:     producerBatch.logGroup,
			HashKey:      producerBatch.getShardHash(),
			CompressType: ioWorker.producer.producerConfig.CompressType,
			Processor:    ioWorker.producer.producerConfig.Processor,
		}
		sendErr = ioWorker.client.PostLogStoreLogsV2(
			producerBatch.getProject(),
			producerBatch.getLogstore(),
			request,
		)
	} else {
		// Metric store URL path: no compress type is passed here for now.
		sendErr = ioWorker.client.PutLogsWithMetricStoreURL(
			producerBatch.getProject(),
			producerBatch.getLogstore(),
			producerBatch.logGroup,
		)
	}
	finishedAt := time.Now()

	// releaseBatchSize subtracts this batch's size from the producer-wide
	// counter; it is used once the batch will not be sent again.
	releaseBatchSize := func() {
		atomic.AddInt64(&ioWorker.producer.producerLogGroupSize, -producerBatch.totalDataSize)
	}

	// Delivered successfully.
	if sendErr == nil {
		level.Debug(ioWorker.logger).Log("msg", "sendToServer success")
		// Deferred so the monitor is updated after the callback and the
		// size accounting below have run.
		defer ioWorker.producer.monitor.recordSuccess(startedAt, finishedAt)
		producerBatch.OnSuccess(startedAt)
		releaseBatchSize()
		return
	}

	parsedErr := parseSlsError(sendErr)
	retryable := ioWorker.canRetry(producerBatch, parsedErr)
	level.Error(ioWorker.logger).Log("msg", "sendToServer failed",
		"retryTimes", producerBatch.attemptCount,
		"requestId", parsedErr.RequestID,
		"errorCode", parsedErr.Code,
		"errorMessage", parsedErr.Message,
		"logs", len(producerBatch.logGroup.Logs),
		"canRetry", retryable)

	// Retryable: record the attempt, schedule the next try and requeue.
	if retryable {
		ioWorker.producer.monitor.recordRetry(finishedAt.Sub(startedAt))
		producerBatch.addAttempt(parsedErr, startedAt)
		backoffMs := producerBatch.getRetryBackoffIntervalMs()
		producerBatch.nextRetryMs = backoffMs + time.Now().UnixMilli()
		level.Debug(ioWorker.logger).Log("msg", "Submit to the retry queue after meeting the retry criteria。")
		ioWorker.retryQueue.sendToRetryQueue(producerBatch, ioWorker.logger)
		return
	}

	// Terminal failure: notify the batch and release its size. The monitor
	// update is deferred so it happens after those steps.
	defer ioWorker.producer.monitor.recordFailure(startedAt, finishedAt)
	producerBatch.OnFail(parsedErr, startedAt)
	releaseBatchSize()
}