in primitive/message.go [278:403]
// DecodeMessage parses data as a sequence of back-to-back serialized messages
// and returns one MessageExt per message, in the order they appear in data.
//
// All integers are read big-endian. Per message the fields are: total size,
// magic code (skipped), body CRC32, queue ID, flag, queue offset, commit log
// offset, sys flag, born timestamp, born host, store timestamp, store host,
// reconsume times, prepared transaction offset, an int32-length-prefixed
// body, a single-byte-length-prefixed topic and an int16-length-prefixed
// properties blob. A host is a 4-byte address (16 bytes when the matching
// V6 bit is set in the sys flag) followed by a 4-byte port.
//
// Decoding stops at the first message that is truncated or carries a
// negative length; that message is dropped and the messages decoded before
// it are returned. The result is never nil.
//
// Uncompressed bodies are sub-slices of data, not copies.
func DecodeMessage(data []byte) []*MessageExt {
	msgs := make([]*MessageExt, 0)
	buf := bytes.NewBuffer(data)

	// intact flips to false on the first short read or invalid length. The
	// helpers below turn into no-ops afterwards, so a whole run of fields can
	// be read and the outcome checked once.
	intact := true

	// readField decodes one fixed-size big-endian value into dst.
	readField := func(dst interface{}) {
		if intact && binary.Read(buf, binary.BigEndian, dst) != nil {
			intact = false
		}
	}

	// take returns the next n bytes of the buffer. A negative n or one that
	// exceeds the remaining data marks the message as malformed instead of
	// letting bytes.Buffer.Next panic or return a short slice.
	take := func(n int) []byte {
		if !intact {
			return nil
		}
		if n < 0 || n > buf.Len() {
			intact = false
			return nil
		}
		return buf.Next(n)
	}

	// hostBytes and port always hold the most recently decoded host; after
	// the store host has been read they feed the offset message ID.
	var (
		port      int32
		hostBytes []byte
	)

	// readHost decodes an address/port pair and renders it as "addr:port".
	readHost := func(isV6 bool) string {
		addrLen := 4
		if isV6 {
			addrLen = 16
		}
		hostBytes = take(addrLen)
		readField(&port)
		if !intact {
			return ""
		}
		return fmt.Sprintf("%s:%d", utils.GetAddressByBytes(hostBytes), port)
	}

	for buf.Len() > 0 {
		msg := &MessageExt{}
		msg.Queue = &MessageQueue{}

		// 1. total size
		readField(&msg.StoreSize)
		// 2. magic code (read past, not stored)
		take(4)
		// 3. body CRC32
		readField(&msg.BodyCRC)
		// 4. queueID
		var queueID int32
		readField(&queueID)
		msg.Queue.QueueId = int(queueID)
		// 5. Flag
		readField(&msg.Flag)
		// 6. QueueOffset
		readField(&msg.QueueOffset)
		// 7. physical offset
		readField(&msg.CommitLogOffset)
		// 8. SysFlag
		readField(&msg.SysFlag)
		// 9. BornTimestamp
		readField(&msg.BornTimestamp)
		// 10. born host
		msg.BornHost = readHost(msg.SysFlag&FlagBornHostV6 == FlagBornHostV6)
		// 11. store timestamp
		readField(&msg.StoreTimestamp)
		// 12. store host
		msg.StoreHost = readHost(msg.SysFlag&FlagStoreHostV6 == FlagStoreHostV6)
		// 13. reconsume times
		readField(&msg.ReconsumeTimes)
		// 14. prepared transaction offset
		readField(&msg.PreparedTransactionOffset)
		// 15. body
		var bodyLen int32
		readField(&bodyLen)
		msg.Body = take(int(bodyLen))
		if !intact {
			// Never hand a partial or garbage body to the decompressor.
			break
		}
		if (msg.SysFlag & FlagCompressed) == FlagCompressed {
			msg.Body = utils.UnCompress(msg.Body)
		}
		// 16. topic
		topicLen, err := buf.ReadByte()
		if err != nil {
			break
		}
		topic := take(int(topicLen))
		// 17. properties
		var propertiesLength int16
		readField(&propertiesLength)
		props := take(int(propertiesLength))
		if !intact {
			break
		}
		msg.Topic = string(topic)
		if propertiesLength > 0 {
			msg.UnmarshalProperties(props)
		}

		// The offset message ID is derived from the store host (the last host
		// decoded above) and the commit log offset.
		msg.OffsetMsgId = CreateMessageId(hostBytes, port, msg.CommitLogOffset)
		if msg.properties == nil {
			msg.properties = make(map[string]string)
		}
		// Prefer the client-assigned unique ID when the producer set one.
		if uniqID := msg.GetProperty(PropertyUniqueClientMessageIdKeyIndex); len(uniqID) > 0 {
			msg.MsgId = uniqID
		} else {
			msg.MsgId = msg.OffsetMsgId
		}
		msgs = append(msgs, msg)
	}
	return msgs
}