in internal/client.go [728:772]
func (c *rmqClient) ProcessSendResponse(brokerName string, cmd *remote.RemotingCommand, resp *primitive.SendResult, msgs ...*primitive.Message) error {
var status primitive.SendStatus
switch cmd.Code {
case ResFlushDiskTimeout:
status = primitive.SendFlushDiskTimeout
case ResFlushSlaveTimeout:
status = primitive.SendFlushSlaveTimeout
case ResSlaveNotAvailable:
status = primitive.SendSlaveNotAvailable
case ResSuccess:
status = primitive.SendOK
default:
return fmt.Errorf("CODE: %d, DESC: %s", cmd.Code, cmd.Remark)
}
msgIDs := make([]string, 0)
for i := 0; i < len(msgs); i++ {
msgIDs = append(msgIDs, msgs[i].GetProperty(primitive.PropertyUniqueClientMessageIdKeyIndex))
}
uniqueMsgId := strings.Join(msgIDs, ",")
regionId := cmd.ExtFields[primitive.PropertyMsgRegion]
trace := cmd.ExtFields[primitive.PropertyTraceSwitch]
if regionId == "" {
regionId = defaultTraceRegionID
}
qId, _ := strconv.Atoi(cmd.ExtFields["queueId"])
off, _ := strconv.ParseInt(cmd.ExtFields["queueOffset"], 10, 64)
resp.Status = status
resp.MsgID = uniqueMsgId
resp.OffsetMsgID = cmd.ExtFields["msgId"]
resp.MessageQueue = &primitive.MessageQueue{
Topic: msgs[0].Topic,
BrokerName: brokerName,
QueueId: qId,
}
resp.QueueOffset = off
resp.TransactionID = cmd.ExtFields["transactionId"]
resp.RegionID = regionId
resp.TraceOn = trace != "" && trace != _TraceOff
return nil
}