in producer/producer.go [509:561]
func (p *defaultProducer) buildSendRequest(mq *primitive.MessageQueue,
msg *primitive.Message) *remote.RemotingCommand {
if !msg.Batch && msg.GetProperty(primitive.PropertyUniqueClientMessageIdKeyIndex) == "" {
msg.WithProperty(primitive.PropertyUniqueClientMessageIdKeyIndex, primitive.CreateUniqID())
}
var (
sysFlag = 0
transferBody = msg.Body
)
if p.tryCompressMsg(msg) {
transferBody = msg.CompressedBody
sysFlag = primitive.SetCompressedFlag(sysFlag)
}
v := msg.GetProperty(primitive.PropertyTransactionPrepared)
if v != "" {
tranMsg, err := strconv.ParseBool(v)
if err == nil && tranMsg {
sysFlag |= primitive.TransactionPreparedType
}
}
req := &internal.SendMessageRequestHeader{
ProducerGroup: p.group,
Topic: mq.Topic,
QueueId: mq.QueueId,
SysFlag: sysFlag,
BornTimestamp: time.Now().UnixNano() / int64(time.Millisecond),
Flag: msg.Flag,
Properties: msg.MarshallProperties(),
ReconsumeTimes: 0,
UnitMode: p.options.UnitMode,
Batch: msg.Batch,
DefaultTopic: p.options.CreateTopicKey,
DefaultTopicQueueNums: p.options.DefaultTopicQueueNums,
BrokerName: mq.BrokerName,
}
msgType := msg.GetProperty(primitive.PropertyMsgType)
if msgType == internal.ReplyMessageFlag {
return remote.NewRemotingCommand(internal.ReqSendReplyMessage, req, msg.Body)
}
cmd := internal.ReqSendMessage
if msg.Batch {
cmd = internal.ReqSendBatchMessage
reqv2 := &internal.SendMessageRequestV2Header{SendMessageRequestHeader: req}
return remote.NewRemotingCommand(cmd, reqv2, transferBody)
}
return remote.NewRemotingCommand(cmd, req, transferBody)
}