func DecodeMessage()

in primitive/message.go [284:409]


func DecodeMessage(data []byte) []*MessageExt {
	msgs := make([]*MessageExt, 0)
	buf := bytes.NewBuffer(data)
	count := 0
	for count < len(data) {
		msg := &MessageExt{}
		msg.Queue = &MessageQueue{}

		// 1. total size
		binary.Read(buf, binary.BigEndian, &msg.StoreSize)
		count += 4

		// 2. magic code
		buf.Next(4)
		count += 4

		// 3. body CRC32
		binary.Read(buf, binary.BigEndian, &msg.BodyCRC)
		count += 4

		// 4. queueID
		var qId int32
		binary.Read(buf, binary.BigEndian, &qId)
		msg.Queue.QueueId = int(qId)
		count += 4

		// 5. Flag
		binary.Read(buf, binary.BigEndian, &msg.Flag)
		count += 4

		// 6. QueueOffset
		binary.Read(buf, binary.BigEndian, &msg.QueueOffset)
		count += 8

		// 7. physical offset
		binary.Read(buf, binary.BigEndian, &msg.CommitLogOffset)
		count += 8

		// 8. SysFlag
		binary.Read(buf, binary.BigEndian, &msg.SysFlag)
		count += 4

		// 9. BornTimestamp
		binary.Read(buf, binary.BigEndian, &msg.BornTimestamp)
		count += 8

		var (
			port      int32
			hostBytes []byte
		)
		// 10. born host
		if msg.SysFlag&FlagBornHostV6 == FlagBornHostV6 {
			hostBytes = buf.Next(16)
			binary.Read(buf, binary.BigEndian, &port)
			msg.BornHost = fmt.Sprintf("%s:%d", utils.GetAddressByBytes(hostBytes), port)
			count += 20
		} else {
			hostBytes = buf.Next(4)
			binary.Read(buf, binary.BigEndian, &port)
			msg.BornHost = fmt.Sprintf("%s:%d", utils.GetAddressByBytes(hostBytes), port)
			count += 8
		}

		// 11. store timestamp
		binary.Read(buf, binary.BigEndian, &msg.StoreTimestamp)
		count += 8

		// 12. store host
		if msg.SysFlag&FlagStoreHostV6 == FlagStoreHostV6 {
			hostBytes = buf.Next(16)
			binary.Read(buf, binary.BigEndian, &port)
			msg.StoreHost = fmt.Sprintf("%s:%d", utils.GetAddressByBytes(hostBytes), port)
			count += 20
		} else {
			hostBytes = buf.Next(4)
			binary.Read(buf, binary.BigEndian, &port)
			msg.StoreHost = fmt.Sprintf("%s:%d", utils.GetAddressByBytes(hostBytes), port)
			count += 8
		}

		// 13. reconsume times
		binary.Read(buf, binary.BigEndian, &msg.ReconsumeTimes)
		count += 4

		// 14. prepared transaction offset
		binary.Read(buf, binary.BigEndian, &msg.PreparedTransactionOffset)
		count += 8

		// 15. body
		var length int32
		binary.Read(buf, binary.BigEndian, &length)
		msg.Body = buf.Next(int(length))
		if (msg.SysFlag & FlagCompressed) == FlagCompressed {
			msg.Body = utils.UnCompress(msg.Body)
		}
		count += 4 + int(length)

		// 16. topic
		_byte, _ := buf.ReadByte()
		msg.Topic = string(buf.Next(int(_byte)))
		count += 1 + int(_byte)

		// 17. properties
		var propertiesLength int16
		binary.Read(buf, binary.BigEndian, &propertiesLength)
		if propertiesLength > 0 {
			msg.UnmarshalProperties(buf.Next(int(propertiesLength)))
		}
		count += 2 + int(propertiesLength)

		msg.OffsetMsgId = CreateMessageId(hostBytes, port, msg.CommitLogOffset)
		//count += 16
		if msg.properties == nil {
			msg.properties = make(map[string]string, 0)
		}
		msgID := msg.GetProperty(PropertyUniqueClientMessageIdKeyIndex)
		if len(msgID) == 0 {
			msg.MsgId = msg.OffsetMsgId
		} else {
			msg.MsgId = msgID
		}
		msgs = append(msgs, msg)
	}

	return msgs
}