in golang/message.go [197:271]
// fromProtobuf_MessageView2 converts a protobuf message into a MessageView.
//
// The body digest (CRC32, MD5 or SHA1) carried in the system properties is
// verified against the raw, still-encoded body; a mismatch marks the view as
// corrupted. A GZIP-encoded body is then decompressed; if decompression fails
// the view keeps the raw body and is marked corrupted. An unsupported digest
// type or body encoding is only logged and does not mark the view corrupted.
//
// messageQueue may be nil, in which case the endpoints are left unset.
// deliveryTimestampFromRemote is stored on the view unchanged.
func fromProtobuf_MessageView2(message *v2.Message, messageQueue *v2.MessageQueue, deliveryTimestampFromRemote *timestamppb.Timestamp) *MessageView {
	systemProperties := message.GetSystemProperties()
	rawBody := message.GetBody()
	mv := &MessageView{
		topic:     message.GetTopic().GetName(),
		messageId: systemProperties.GetMessageId(),
		body:      rawBody,
	}

	// Verify the digest over the body exactly as it was transmitted, i.e.
	// before any decompression.
	// NOTE(review): the comparison is case-sensitive and the digests computed
	// here are lowercase hex — confirm the remote side emits lowercase too.
	bodyDigest := systemProperties.GetBodyDigest()
	checksum := bodyDigest.GetChecksum()
	corrupted := false
	switch digestType := bodyDigest.GetType(); digestType {
	case v2.DigestType_CRC32:
		corrupted = strconv.FormatInt(int64(crc32.ChecksumIEEE(rawBody)), 16) != checksum
	case v2.DigestType_MD5:
		sum := md5.Sum(rawBody)
		corrupted = hex.EncodeToString(sum[:]) != checksum
	case v2.DigestType_SHA1:
		sum := sha1.Sum(rawBody)
		corrupted = hex.EncodeToString(sum[:]) != checksum
	default:
		sugarBaseLogger.Warnf("unsupported message body digest algorithm, digestType=%v, topic=%s, messageId=%s", digestType, mv.topic, mv.messageId)
	}

	switch bodyEncoding := systemProperties.GetBodyEncoding(); bodyEncoding {
	case v2.Encoding_GZIP:
		unCompressBody, err := utils.GZIPDecode(rawBody)
		if err != nil {
			// %v rather than %w: %w is only meaningful to fmt.Errorf and
			// renders as "%!w(...)" in any other printf-style call.
			sugarBaseLogger.Errorf("failed to uncompress message body, topic=%s, messageId=%s, err=%v", mv.topic, mv.messageId, err)
			corrupted = true
		} else {
			mv.body = unCompressBody
		}
	case v2.Encoding_IDENTITY:
		// Body is used as transmitted.
	default:
		sugarBaseLogger.Errorf("unsupported message encoding algorithm, topic=%s, messageId=%s, bodyEncoding=%v", mv.topic, mv.messageId, bodyEncoding)
	}

	// These fields are read directly rather than through nil-safe getters,
	// so they must be skipped when the message carries no system properties.
	if systemProperties != nil {
		mv.tag = systemProperties.Tag
		mv.messageGroup = systemProperties.MessageGroup
		mv.bornHost = &systemProperties.BornHost
		mv.traceContext = systemProperties.TraceContext
	}
	mv.keys = systemProperties.GetKeys()
	mv.deliveryAttempt = systemProperties.GetDeliveryAttempt()
	mv.offset = systemProperties.GetQueueOffset()
	mv.ReceiptHandle = systemProperties.GetReceiptHandle()
	mv.properties = message.GetUserProperties()
	mv.corrupted = corrupted

	mv.messageQueue = messageQueue
	if messageQueue != nil {
		mv.endpoints = messageQueue.Broker.GetEndpoints()
	}

	// Timestamps stay nil on the view when absent from the message.
	if ts := systemProperties.GetDeliveryTimestamp(); ts != nil {
		deliveryTimestamp := ts.AsTime()
		mv.deliveryTimestamp = &deliveryTimestamp
	}
	if ts := systemProperties.GetBornTimestamp(); ts != nil {
		bornTimestamp := ts.AsTime()
		mv.bornTimestamp = &bornTimestamp
	}
	mv.deliveryTimestampFromRemote = deliveryTimestampFromRemote
	return mv
}