func()

in golang/producer.go [183:273]


func (p *defaultProducer) send1(ctx context.Context, topic string, messageType v2.MessageType,
	candidates []*v2.MessageQueue, pubMessages []*PublishingMessage, attempt int) ([]*SendReceipt, error) {

	ctx = p.cli.Sign(ctx)

	idx := utils.Mod(int32(attempt)-1, len(candidates))
	selectMessageQueue := candidates[idx]

	if p.pSetting.IsValidateMessageType() && !utils.MatchMessageType(selectMessageQueue, messageType) {
		return nil, fmt.Errorf("current message type not match with topic accept message types")
	}

	endpoints := selectMessageQueue.GetBroker().GetEndpoints()

	sendReq, err := p.wrapSendMessageRequest(pubMessages)
	if err != nil {
		return nil, err
	}
	messageCommons := make([]*MessageCommon, 0)
	for _, pubMessage := range pubMessages {
		messageCommons = append(messageCommons, pubMessage.msg.GetMessageCommon())
	}
	p.cli.doBefore(MessageHookPoints_SEND, messageCommons)
	watchTime := time.Now()
	resp, err := p.cli.clientManager.SendMessage(ctx, endpoints, sendReq, p.pSetting.GetRequestTimeout())
	duration := time.Since(watchTime)
	messageHookPointsStatus := MessageHookPointsStatus_OK
	// processSendResponse
	tooManyRequests := false
	if err == nil && resp.GetStatus().GetCode() != v2.Code_OK {
		tooManyRequests = resp.GetStatus().GetCode() == v2.Code_TOO_MANY_REQUESTS
		err = &ErrRpcStatus{
			Code:    int32(resp.Status.GetCode()),
			Message: resp.GetStatus().GetMessage(),
		}
	}
	if err != nil {
		messageHookPointsStatus = MessageHookPointsStatus_ERROR
	}
	p.cli.doAfter(MessageHookPoints_SEND, messageCommons, duration, messageHookPointsStatus)
	maxAttempts := p.getRetryMaxAttempts()
	if err != nil {
		messageIds := make([]string, 0)
		for _, pubMessage := range pubMessages {
			messageIds = append(messageIds, pubMessage.messageId)
		}
		// retry
		for _, address := range endpoints.GetAddresses() {
			p.isolated.Store(utils.ParseAddress(address), true)
		}
		if attempt >= maxAttempts {
			p.cli.log.Errorf("failed to send message(s) finally, run out of attempt times, topic=%s, messageId(s)=%v, maxAttempts=%d, attempt=%d, endpoints=%v, requestId=%s",
				topic, messageIds, maxAttempts, attempt, endpoints, utils.GetRequestID(ctx))
			return nil, err
		}
		// No need more attempts for transactional message.
		if messageType == v2.MessageType_TRANSACTION {
			p.cli.log.Errorf("failed to send transactional message finally, topic=%s, messageId(s)=%v,  maxAttempts=%d, attempt=%d, endpoints=%s, requestId=%s",
				topic, messageIds, maxAttempts, attempt, endpoints, utils.GetRequestID(ctx))
			return nil, err
		}
		// Try to do more attempts.
		nextAttempt := attempt + 1
		// Retry immediately if the request is not throttled.
		if tooManyRequests {
			waitTime := p.getNextAttemptDelay(nextAttempt)
			p.cli.log.Warnf("failed to send message due to too many requests, would attempt to resend after %v, topic=%s, messageId(s)=%v, maxAttempts=%d, attempt=%d, endpoints=%v, requestId=%s",
				waitTime, topic, messageIds, maxAttempts, attempt, endpoints, utils.GetRequestID(ctx))
			time.Sleep(waitTime)
		} else {
			p.cli.log.Warnf("failed to send message, would attempt to resend right now, topic=%s, messageId(s)=%v, maxAttempts=%d, attempt=%d, endpoints=%v, requestId=%s",
				topic, messageIds, maxAttempts, attempt, endpoints, utils.GetRequestID(ctx))
		}
		return p.send1(ctx, topic, messageType, candidates, pubMessages, nextAttempt)
	}

	var res []*SendReceipt
	for i := 0; i < len(resp.GetEntries()); i++ {
		res = append(res, &SendReceipt{
			MessageID:     resp.GetEntries()[i].GetMessageId(),
			TransactionId: resp.GetEntries()[i].GetTransactionId(),
			Offset:        resp.GetEntries()[i].GetOffset(),
			Endpoints:     endpoints,
		})
	}
	if attempt > 1 {
		p.cli.log.Infof("resend message successfully, topic=%s, maxAttempts=%d, attempt=%d, endpoints=%s",
			topic, maxAttempts, attempt, endpoints.String())
	}
	return res, nil
}