func()

in producer/producer.go [509:561]


func (p *defaultProducer) buildSendRequest(mq *primitive.MessageQueue,
	msg *primitive.Message) *remote.RemotingCommand {
	if !msg.Batch && msg.GetProperty(primitive.PropertyUniqueClientMessageIdKeyIndex) == "" {
		msg.WithProperty(primitive.PropertyUniqueClientMessageIdKeyIndex, primitive.CreateUniqID())
	}

	var (
		sysFlag      = 0
		transferBody = msg.Body
	)

	if p.tryCompressMsg(msg) {
		transferBody = msg.CompressedBody
		sysFlag = primitive.SetCompressedFlag(sysFlag)
	}
	v := msg.GetProperty(primitive.PropertyTransactionPrepared)
	if v != "" {
		tranMsg, err := strconv.ParseBool(v)
		if err == nil && tranMsg {
			sysFlag |= primitive.TransactionPreparedType
		}
	}

	req := &internal.SendMessageRequestHeader{
		ProducerGroup:         p.group,
		Topic:                 mq.Topic,
		QueueId:               mq.QueueId,
		SysFlag:               sysFlag,
		BornTimestamp:         time.Now().UnixNano() / int64(time.Millisecond),
		Flag:                  msg.Flag,
		Properties:            msg.MarshallProperties(),
		ReconsumeTimes:        0,
		UnitMode:              p.options.UnitMode,
		Batch:                 msg.Batch,
		DefaultTopic:          p.options.CreateTopicKey,
		DefaultTopicQueueNums: p.options.DefaultTopicQueueNums,
		BrokerName:            mq.BrokerName,
	}

	msgType := msg.GetProperty(primitive.PropertyMsgType)
	if msgType == internal.ReplyMessageFlag {
		return remote.NewRemotingCommand(internal.ReqSendReplyMessage, req, msg.Body)
	}

	cmd := internal.ReqSendMessage
	if msg.Batch {
		cmd = internal.ReqSendBatchMessage
		reqv2 := &internal.SendMessageRequestV2Header{SendMessageRequestHeader: req}
		return remote.NewRemotingCommand(cmd, reqv2, transferBody)
	}

	return remote.NewRemotingCommand(cmd, req, transferBody)
}