in golang/producer.go [274:335]
// send0 validates a batch of messages and forwards it to send1.
//
// Every message in msgs must target the same topic and resolve to the same
// message type; FIFO messages must additionally share the same message group.
// A message that already carries a PublishingMessage (uMsg.pubMsg) reuses it,
// otherwise one is built with NewPublishingMessage from the producer settings
// and txEnabled.
//
// An error is returned when msgs is empty, when any of the checks above fails,
// when building a PublishingMessage fails, when the topic route cannot be
// resolved, or when no candidate message queue is available.
func (p *defaultProducer) send0(ctx context.Context, msgs []*UnifiedMessage, txEnabled bool) ([]*SendReceipt, error) {
	// Guard against an empty batch: msgs[0] below would panic otherwise.
	if len(msgs) == 0 {
		return nil, fmt.Errorf("no messages to send")
	}

	// check topic Name
	topicName := msgs[0].GetMessage().Topic
	for _, msg := range msgs {
		if msg.GetMessage().Topic != topicName {
			return nil, fmt.Errorf("messages to send have different topics")
		}
	}

	pubMessages := make([]*PublishingMessage, len(msgs))
	for idx, uMsg := range msgs {
		pubMessage := uMsg.pubMsg
		if pubMessage == nil {
			var err error
			pubMessage, err = NewPublishingMessage(uMsg.GetMessage(), p.pSetting, txEnabled)
			if err != nil {
				return nil, err
			}
		}
		pubMessages[idx] = pubMessage
	}

	// check message Type
	messageType := pubMessages[0].messageType
	for _, pubMessage := range pubMessages {
		if pubMessage.messageType != messageType {
			return nil, fmt.Errorf("messages to send have different types, please check")
		}
	}

	var messageGroup *string
	// Message group must be same if message type is FIFO, or no need to proceed.
	if messageType == v2.MessageType_FIFO {
		messageGroup = pubMessages[0].msg.GetMessageGroup()
		for _, pubMessage := range pubMessages {
			group := pubMessage.msg.GetMessageGroup()
			// Compare the group values, not the pointers: two messages may
			// hold equal group names in separately allocated strings.
			if (group == nil) != (messageGroup == nil) ||
				(group != nil && *group != *messageGroup) {
				return nil, fmt.Errorf("fifo messages to send have different message groups")
			}
		}
	}

	// Record the topic in the producer settings if it is not there yet.
	if _, ok := p.pSetting.topics.Load(topicName); !ok {
		p.pSetting.topics.Store(topicName, &v2.Resource{
			Name: topicName,
		})
	}

	pubLoadBalancer, err := p.getPublishingTopicRouteResult(ctx, topicName)
	if err != nil {
		return nil, err
	}

	// messageGroup is only set for FIFO messages; those pick their queues by
	// group, everything else uses the producer's regular queue selection.
	var candidates []*v2.MessageQueue
	if messageGroup == nil {
		candidates, err = p.takeMessageQueues(pubLoadBalancer)
	} else {
		candidates, err = pubLoadBalancer.TakeMessageQueueByMessageGroup(messageGroup)
	}
	if err != nil {
		// Keep the original message as prefix but preserve the cause.
		return nil, fmt.Errorf("no broker available to sendMessage: %w", err)
	}
	if len(candidates) == 0 {
		return nil, fmt.Errorf("no broker available to sendMessage")
	}

	return p.send1(ctx, topicName, messageType, candidates, pubMessages, 1)
}