func fromProtobuf_MessageView2()

in golang/message.go [197:271]


// fromProtobuf_MessageView2 builds a MessageView from a protobuf message.
//
// message supplies the topic, body, user properties and system properties;
// messageQueue (may be nil) is stored on the view and, when non-nil, also
// provides the broker endpoints; deliveryTimestampFromRemote is stored as-is.
//
// The body digest is verified against the body exactly as carried by the
// message (i.e. before any decompression). A digest mismatch or a failed GZIP
// decode sets the view's corrupted flag; an unsupported digest type or body
// encoding is only logged.
func fromProtobuf_MessageView2(message *v2.Message, messageQueue *v2.MessageQueue, deliveryTimestampFromRemote *timestamppb.Timestamp) *MessageView {
	systemProperties := message.GetSystemProperties()
	body := message.GetBody()
	mv := &MessageView{
		topic:     message.GetTopic().GetName(),
		messageId: systemProperties.GetMessageId(),
		body:      body,
	}

	// Verify the digest over the raw (possibly still compressed) body.
	bodyDigest := systemProperties.GetBodyDigest()
	checksum := bodyDigest.GetChecksum()
	corrupted := false
	switch bodyDigest.GetType() {
	case v2.DigestType_CRC32:
		// Lower-case hex without zero padding, same rendering as before.
		corrupted = strconv.FormatUint(uint64(crc32.ChecksumIEEE(body)), 16) != checksum
	case v2.DigestType_MD5:
		sum := md5.Sum(body)
		corrupted = hex.EncodeToString(sum[:]) != checksum
	case v2.DigestType_SHA1:
		sum := sha1.Sum(body)
		corrupted = hex.EncodeToString(sum[:]) != checksum
	default:
		sugarBaseLogger.Warnf("unsupported message body digest algorithm, digestType=%v, topic=%s, messageId=%s", bodyDigest.GetType(), mv.topic, mv.messageId)
	}

	bodyEncoding := systemProperties.GetBodyEncoding()
	switch bodyEncoding {
	case v2.Encoding_GZIP:
		unCompressBody, err := utils.GZIPDecode(body)
		if err != nil {
			// %v, not %w: the wrapping verb is only understood by fmt.Errorf and
			// would otherwise be rendered as "%!w(...)" in the log line.
			sugarBaseLogger.Errorf("failed to uncompress message body, topic=%s, messageId=%s, err=%v", mv.topic, mv.messageId, err)
			corrupted = true
		} else {
			mv.body = unCompressBody
		}
	case v2.Encoding_IDENTITY:
		// Body is carried verbatim; nothing to decode.
	default:
		sugarBaseLogger.Errorf("unsupported message encoding algorithm, topic=%s, messageId=%s, bodyEncoding=%v", mv.topic, mv.messageId, bodyEncoding)
	}

	// These values are read straight from struct fields rather than through
	// the nil-safe getters, so guard against a message that carries no system
	// properties instead of panicking on a nil dereference.
	if systemProperties != nil {
		mv.tag = systemProperties.Tag
		mv.messageGroup = systemProperties.MessageGroup
		mv.bornHost = &systemProperties.BornHost
		mv.traceContext = systemProperties.TraceContext
	}

	mv.keys = systemProperties.GetKeys()
	mv.deliveryAttempt = systemProperties.GetDeliveryAttempt()
	mv.messageQueue = messageQueue
	if messageQueue != nil {
		mv.endpoints = messageQueue.Broker.GetEndpoints()
	}
	mv.offset = systemProperties.GetQueueOffset()
	mv.properties = message.GetUserProperties()
	mv.ReceiptHandle = systemProperties.GetReceiptHandle()
	mv.corrupted = corrupted

	// Timestamps stay nil on the view when the message does not carry them.
	if ts := systemProperties.GetDeliveryTimestamp(); ts != nil {
		deliveryTimestamp := ts.AsTime()
		mv.deliveryTimestamp = &deliveryTimestamp
	}
	if ts := systemProperties.GetBornTimestamp(); ts != nil {
		bornTimestamp := ts.AsTime()
		mv.bornTimestamp = &bornTimestamp
	}
	mv.deliveryTimestampFromRemote = deliveryTimestampFromRemote
	return mv
}