func()

in golang/producer.go [274:335]


// send0 validates a batch of messages and hands it to send1 for delivery.
//
// All messages in msgs must share the same topic and the same message type;
// FIFO messages must additionally share the same message group (compared by
// value). For each message, an already-built publishing message (uMsg.pubMsg)
// is reused when present; otherwise one is created via NewPublishingMessage
// using the producer settings and txEnabled.
//
// The topic is recorded in p.pSetting.topics if it is not there yet, the
// publishing route for the topic is resolved, and candidate message queues are
// selected (by message group for FIFO batches that carry one, otherwise via
// takeMessageQueues).
//
// It returns an error when msgs is empty, when the batch is inconsistent, when
// a publishing message cannot be built, when the route cannot be resolved, or
// when no candidate queue is available.
func (p *defaultProducer) send0(ctx context.Context, msgs []*UnifiedMessage, txEnabled bool) ([]*SendReceipt, error) {
	// Guard against an empty batch: everything below indexes element 0.
	if len(msgs) == 0 {
		return nil, fmt.Errorf("no messages to send")
	}

	// check topic Name
	topicName := msgs[0].GetMessage().Topic
	for _, msg := range msgs[1:] {
		if msg.GetMessage().Topic != topicName {
			return nil, fmt.Errorf("messages to send have different topics")
		}
	}

	pubMessages := make([]*PublishingMessage, len(msgs))
	for idx, uMsg := range msgs {
		pubMessage := uMsg.pubMsg
		if pubMessage == nil {
			var err error
			pubMessage, err = NewPublishingMessage(uMsg.GetMessage(), p.pSetting, txEnabled)
			if err != nil {
				return nil, err
			}
		}
		pubMessages[idx] = pubMessage
	}

	// check message Type
	messageType := pubMessages[0].messageType
	for _, pubMessage := range pubMessages[1:] {
		if pubMessage.messageType != messageType {
			return nil, fmt.Errorf("messages to send have different types, please check")
		}
	}

	var messageGroup *string
	// Message group must be same if message type is FIFO, or no need to proceed.
	if messageType == v2.MessageType_FIFO {
		messageGroup = pubMessages[0].msg.GetMessageGroup()
		for _, pubMessage := range pubMessages[1:] {
			group := pubMessage.msg.GetMessageGroup()
			// Compare the group values, not the pointers: two messages may
			// carry equal group names stored in distinct string variables.
			if (group == nil) != (messageGroup == nil) ||
				(group != nil && *group != *messageGroup) {
				return nil, fmt.Errorf("fifo messages to send have different message groups")
			}
		}
	}

	// Record the topic in the producer settings the first time it is used.
	if _, ok := p.pSetting.topics.Load(topicName); !ok {
		p.pSetting.topics.Store(topicName, &v2.Resource{
			Name: topicName,
		})
	}

	pubLoadBalancer, err := p.getPublishingTopicRouteResult(ctx, topicName)
	if err != nil {
		return nil, err
	}

	var candidates []*v2.MessageQueue
	if messageGroup == nil {
		candidates, err = p.takeMessageQueues(pubLoadBalancer)
	} else {
		candidates, err = pubLoadBalancer.TakeMessageQueueByMessageGroup(messageGroup)
	}
	if err != nil {
		// Keep the original message as prefix but preserve the cause.
		return nil, fmt.Errorf("no broker available to sendMessage: %w", err)
	}
	if len(candidates) == 0 {
		return nil, fmt.Errorf("no broker available to sendMessage")
	}

	// NOTE(review): the trailing 1 is presumably the initial attempt number — confirm against send1.
	return p.send1(ctx, topicName, messageType, candidates, pubMessages, 1)
}