golang/message.go (300 lines of code) (raw):

/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package golang import ( "crypto/md5" "crypto/sha1" "encoding/hex" "hash/crc32" "strconv" "time" "github.com/apache/rocketmq-clients/golang/v5/pkg/utils" v2 "github.com/apache/rocketmq-clients/golang/v5/protocol/v2" "google.golang.org/protobuf/types/known/timestamppb" ) type UnifiedMessage struct { msg *Message pubMsg *PublishingMessage } func (uMsg *UnifiedMessage) GetMessage() *Message { if uMsg.pubMsg != nil { return uMsg.pubMsg.msg } return uMsg.msg } type MessageHookPointsStatus int32 const ( MessageHookPointsStatus_UNSET MessageHookPointsStatus = iota MessageHookPointsStatus_OK MessageHookPointsStatus_ERROR ) type MessageHookPoints int32 const ( MessageHookPoints_SEND MessageHookPoints = iota MessageHookPoints_RECEIVE MessageHookPoints_CONSUME MessageHookPoints_ACK MessageHookPoints_CHANGE_INVISIBLE_DURATION MessageHookPoints_COMMIT_TRANSACTION MessageHookPoints_ROLLBACK_TRANSACTION MessageHookPoints_FORWARD_TO_DLQ ) type MessageInterceptor interface { doBefore(messageHookPoints MessageHookPoints, messageCommons []*MessageCommon) error doAfter(messageHookPoints MessageHookPoints, messageCommons []*MessageCommon, duration time.Duration, status MessageHookPointsStatus) error } type Message struct { Topic string Body []byte Tag *string messageGroup *string keys []string properties map[string]string deliveryTimestamp *time.Time parentTraceContext *string } type SendReceipt struct { MessageID string TransactionId string Offset int64 Endpoints *v2.Endpoints } func (msg *Message) SetTag(tag string) { msg.Tag = &tag } func (msg *Message) GetTag() *string { return msg.Tag } func (msg *Message) GetKeys() []string { return msg.keys } func (msg *Message) SetKeys(keys ...string) { msg.keys = keys } func (msg *Message) getOrNewProperties() map[string]string { if msg.properties == nil { msg.properties = make(map[string]string) } return msg.properties } func (msg *Message) GetProperties() map[string]string { return msg.getOrNewProperties() } func (msg *Message) AddProperty(key, value string) { msg.getOrNewProperties()[key] = value } func (msg *Message) SetDelayTimestamp(deliveryTimestamp time.Time) { msg.deliveryTimestamp = &deliveryTimestamp } func (msg *Message) GetDeliveryTimestamp() *time.Time { return msg.deliveryTimestamp } func (msg *Message) SetMessageGroup(messageGroup string) { msg.messageGroup = &messageGroup } func (msg *Message) GetMessageGroup() *string { return msg.messageGroup } func (msg *Message) GetMessageCommon() *MessageCommon { return &MessageCommon{ topic: msg.Topic, body: msg.Body, tag: msg.Tag, messageGroup: msg.messageGroup, deliveryTimestamp: msg.deliveryTimestamp, parentTraceContext: msg.parentTraceContext, keys: msg.keys, properties: msg.properties, } } type MessageCommon struct { messageId *string topic string body []byte properties map[string]string tag *string keys []string messageGroup *string deliveryTimestamp *time.Time bornHost *string parentTraceContext *string traceContext *string bornTimestamp *time.Time deliveryAttempt int32 decodeStopwatch *time.Time deliveryTimestampFromRemote *timestamppb.Timestamp } type MessageView struct { messageId string topic string body []byte properties map[string]string tag *string keys []string messageGroup *string deliveryTimestamp *time.Time bornHost *string traceContext *string bornTimestamp *time.Time messageQueue *v2.MessageQueue endpoints *v2.Endpoints deliveryAttempt int32 decodeStopwatch *time.Time deliveryTimestampFromRemote *timestamppb.Timestamp offset int64 ReceiptHandle string corrupted bool } func fromProtobuf_MessageView0(message *v2.Message) *MessageView { return fromProtobuf_MessageView1(message, nil) } func fromProtobuf_MessageView1(message *v2.Message, messageQueue *v2.MessageQueue) *MessageView { return fromProtobuf_MessageView2(message, messageQueue, nil) } func fromProtobuf_MessageView2(message *v2.Message, messageQueue *v2.MessageQueue, deliveryTimestampFromRemote *timestamppb.Timestamp) *MessageView { systemProperties := message.GetSystemProperties() mv := &MessageView{ topic: message.GetTopic().GetName(), messageId: systemProperties.GetMessageId(), body: message.GetBody(), } bodyDigest := systemProperties.GetBodyDigest() corrupted := false checksum := bodyDigest.GetChecksum() var expectedChecksum string switch bodyDigest.GetType() { case v2.DigestType_CRC32: expectedChecksum = strconv.FormatInt(int64(crc32.ChecksumIEEE(message.GetBody())), 16) if expectedChecksum != checksum { corrupted = true } case v2.DigestType_MD5: c := md5.New() c.Write(message.GetBody()) expectedChecksum = hex.EncodeToString(c.Sum(nil)) if expectedChecksum != checksum { corrupted = true } case v2.DigestType_SHA1: c := sha1.New() c.Write(message.GetBody()) expectedChecksum = hex.EncodeToString(c.Sum(nil)) if expectedChecksum != checksum { corrupted = true } default: sugarBaseLogger.Warnf("unsupported message body digest algorithm, digestType=%v, topic=%s, messageId=%s", bodyDigest.GetType(), mv.topic, mv.messageId) } bodyEncoding := systemProperties.GetBodyEncoding() switch bodyEncoding { case v2.Encoding_GZIP: unCompressBody, err := utils.AutoDecode(message.GetBody()) if err != nil { sugarBaseLogger.Errorf("failed to uncompress message body, topic=%s, messageId=%s, err=%w", mv.topic, mv.messageId, err) corrupted = true } else { mv.body = unCompressBody } case v2.Encoding_IDENTITY: break default: sugarBaseLogger.Errorf("unsupported message encoding algorithm, topic=%s, messageId=%s, bodyEncoding=%v", mv.topic, mv.messageId, bodyEncoding) } mv.tag = systemProperties.Tag mv.messageGroup = systemProperties.MessageGroup mv.keys = systemProperties.GetKeys() mv.bornHost = &systemProperties.BornHost mv.deliveryAttempt = systemProperties.GetDeliveryAttempt() mv.messageQueue = messageQueue if messageQueue != nil { mv.endpoints = messageQueue.Broker.GetEndpoints() } mv.offset = systemProperties.GetQueueOffset() mv.properties = message.GetUserProperties() mv.ReceiptHandle = systemProperties.GetReceiptHandle() mv.traceContext = systemProperties.TraceContext mv.corrupted = corrupted if systemProperties.GetDeliveryTimestamp() != nil { deliveryTimestamp := systemProperties.GetDeliveryTimestamp().AsTime() mv.deliveryTimestamp = &deliveryTimestamp } if systemProperties.GetBornTimestamp() != nil { bornTimestamp := systemProperties.GetBornTimestamp().AsTime() mv.bornTimestamp = &bornTimestamp } mv.deliveryTimestampFromRemote = deliveryTimestampFromRemote return mv } func (mv *MessageView) isCorrupted() bool { return mv.corrupted } func (msg *MessageView) GetMessageCommon() *MessageCommon { return &MessageCommon{ messageId: &msg.messageId, topic: msg.topic, body: msg.body, tag: msg.tag, messageGroup: msg.messageGroup, deliveryTimestamp: msg.deliveryTimestamp, keys: msg.keys, properties: msg.properties, bornHost: msg.bornHost, traceContext: msg.traceContext, bornTimestamp: msg.bornTimestamp, deliveryAttempt: msg.deliveryAttempt, decodeStopwatch: msg.decodeStopwatch, deliveryTimestampFromRemote: msg.deliveryTimestampFromRemote, } } func (msg *MessageView) GetMessageId() string { return msg.messageId } func (msg *MessageView) GetTopic() string { return msg.topic } func (msg *MessageView) GetBody() []byte { return msg.body } func (msg *MessageView) GetProperties() map[string]string { return msg.properties } func (msg *MessageView) GetTag() *string { return msg.tag } func (msg *MessageView) GetKeys() []string { return msg.keys } func (msg *MessageView) GetMessageGroup() *string { return msg.messageGroup } func (msg *MessageView) SetTag(tag string) { msg.tag = &tag } func (msg *MessageView) GetDeliveryTimestamp() *time.Time { return msg.deliveryTimestamp } func (msg *MessageView) GetBornHost() *string { return msg.bornHost } func (msg *MessageView) GetBornTimestamp() *time.Time { return msg.bornTimestamp } func (msg *MessageView) GetDeliveryAttempt() int32 { return msg.deliveryAttempt } func (msg *MessageView) GetTraceContext() *string { return msg.traceContext } func (msg *MessageView) GetReceiptHandle() string { return msg.ReceiptHandle } func (msg *MessageView) GetOffset() int64 { return msg.offset } func (msg *MessageView) SetKeys(keys ...string) { msg.keys = keys } func (msg *MessageView) SetDelayTimeLevel(deliveryTimestamp time.Time) { msg.deliveryTimestamp = &deliveryTimestamp } func (msg *MessageView) SetMessageGroup(messageGroup string) { msg.messageGroup = &messageGroup }