in golang/producer.go [183:273]
func (p *defaultProducer) send1(ctx context.Context, topic string, messageType v2.MessageType,
candidates []*v2.MessageQueue, pubMessages []*PublishingMessage, attempt int) ([]*SendReceipt, error) {
ctx = p.cli.Sign(ctx)
idx := utils.Mod(int32(attempt)-1, len(candidates))
selectMessageQueue := candidates[idx]
if p.pSetting.IsValidateMessageType() && !utils.MatchMessageType(selectMessageQueue, messageType) {
return nil, fmt.Errorf("current message type not match with topic accept message types")
}
endpoints := selectMessageQueue.GetBroker().GetEndpoints()
sendReq, err := p.wrapSendMessageRequest(pubMessages)
if err != nil {
return nil, err
}
messageCommons := make([]*MessageCommon, 0)
for _, pubMessage := range pubMessages {
messageCommons = append(messageCommons, pubMessage.msg.GetMessageCommon())
}
p.cli.doBefore(MessageHookPoints_SEND, messageCommons)
watchTime := time.Now()
resp, err := p.cli.clientManager.SendMessage(ctx, endpoints, sendReq, p.pSetting.GetRequestTimeout())
duration := time.Since(watchTime)
messageHookPointsStatus := MessageHookPointsStatus_OK
// processSendResponse
tooManyRequests := false
if err == nil && resp.GetStatus().GetCode() != v2.Code_OK {
tooManyRequests = resp.GetStatus().GetCode() == v2.Code_TOO_MANY_REQUESTS
err = &ErrRpcStatus{
Code: int32(resp.Status.GetCode()),
Message: resp.GetStatus().GetMessage(),
}
}
if err != nil {
messageHookPointsStatus = MessageHookPointsStatus_ERROR
}
p.cli.doAfter(MessageHookPoints_SEND, messageCommons, duration, messageHookPointsStatus)
maxAttempts := p.getRetryMaxAttempts()
if err != nil {
messageIds := make([]string, 0)
for _, pubMessage := range pubMessages {
messageIds = append(messageIds, pubMessage.messageId)
}
// retry
for _, address := range endpoints.GetAddresses() {
p.isolated.Store(utils.ParseAddress(address), true)
}
if attempt >= maxAttempts {
p.cli.log.Errorf("failed to send message(s) finally, run out of attempt times, topic=%s, messageId(s)=%v, maxAttempts=%d, attempt=%d, endpoints=%v, requestId=%s",
topic, messageIds, maxAttempts, attempt, endpoints, utils.GetRequestID(ctx))
return nil, err
}
// No need more attempts for transactional message.
if messageType == v2.MessageType_TRANSACTION {
p.cli.log.Errorf("failed to send transactional message finally, topic=%s, messageId(s)=%v, maxAttempts=%d, attempt=%d, endpoints=%s, requestId=%s",
topic, messageIds, maxAttempts, attempt, endpoints, utils.GetRequestID(ctx))
return nil, err
}
// Try to do more attempts.
nextAttempt := attempt + 1
// Retry immediately if the request is not throttled.
if tooManyRequests {
waitTime := p.getNextAttemptDelay(nextAttempt)
p.cli.log.Warnf("failed to send message due to too many requests, would attempt to resend after %v, topic=%s, messageId(s)=%v, maxAttempts=%d, attempt=%d, endpoints=%v, requestId=%s",
waitTime, topic, messageIds, maxAttempts, attempt, endpoints, utils.GetRequestID(ctx))
time.Sleep(waitTime)
} else {
p.cli.log.Warnf("failed to send message, would attempt to resend right now, topic=%s, messageId(s)=%v, maxAttempts=%d, attempt=%d, endpoints=%v, requestId=%s",
topic, messageIds, maxAttempts, attempt, endpoints, utils.GetRequestID(ctx))
}
return p.send1(ctx, topic, messageType, candidates, pubMessages, nextAttempt)
}
var res []*SendReceipt
for i := 0; i < len(resp.GetEntries()); i++ {
res = append(res, &SendReceipt{
MessageID: resp.GetEntries()[i].GetMessageId(),
TransactionId: resp.GetEntries()[i].GetTransactionId(),
Offset: resp.GetEntries()[i].GetOffset(),
Endpoints: endpoints,
})
}
if attempt > 1 {
p.cli.log.Infof("resend message successfully, topic=%s, maxAttempts=%d, attempt=%d, endpoints=%s",
topic, maxAttempts, attempt, endpoints.String())
}
return res, nil
}