csharp/rocketmq-client-csharp/PublishingMessage.cs (82 lines of code) (raw):

/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ using System; using System.IO; using Google.Protobuf; using Google.Protobuf.WellKnownTypes; using Proto = Apache.Rocketmq.V2; using Org.Apache.Rocketmq.Error; namespace Org.Apache.Rocketmq { /// <summary> /// Provides a view for message to publish. /// </summary> public class PublishingMessage : Message { public MessageType MessageType { get; } internal string MessageId { get; } public PublishingMessage(Message message, PublishingSettings publishingSettings, bool txEnabled) : base( message) { var maxBodySizeBytes = publishingSettings.GetMaxBodySizeBytes(); if (message.Body.Length > maxBodySizeBytes) { throw new IOException($"Message body size exceed the threshold, max size={maxBodySizeBytes} bytes"); } // Generate message id. MessageId = MessageIdGenerator.GetInstance().Next(); // For NORMAL message. if (string.IsNullOrEmpty(message.MessageGroup) && !message.DeliveryTimestamp.HasValue && !txEnabled) { MessageType = MessageType.Normal; return; } // For FIFO message. if (!string.IsNullOrEmpty(message.MessageGroup) && !txEnabled) { MessageType = MessageType.Fifo; return; } // For DELAY message. if (message.DeliveryTimestamp.HasValue && !txEnabled) { MessageType = MessageType.Delay; return; } // For TRANSACTION message. if (!string.IsNullOrEmpty(message.MessageGroup) || message.DeliveryTimestamp.HasValue || !txEnabled) { throw new InternalErrorException( "Transactional message should not set messageGroup or deliveryTimestamp"); } MessageType = MessageType.Transaction; } public Proto::Message ToProtobuf(int queueId) { var systemProperties = new Proto.SystemProperties { Keys = { Keys }, MessageId = MessageId, BornTimestamp = Timestamp.FromDateTime(DateTime.UtcNow), BornHost = Utilities.GetHostName(), BodyEncoding = EncodingHelper.ToProtobuf(MqEncoding.Identity), QueueId = queueId, MessageType = MessageTypeHelper.ToProtobuf(MessageType) }; if (null != Tag) { systemProperties.Tag = Tag; } if (DeliveryTimestamp.HasValue) { systemProperties.DeliveryTimestamp = Timestamp.FromDateTime(DeliveryTimestamp.Value.ToUniversalTime()); } if (null != MessageGroup) { systemProperties.MessageGroup = MessageGroup; } var topicResource = new Proto.Resource { Name = Topic }; return new Proto.Message { Topic = topicResource, Body = ByteString.CopyFrom(Body), SystemProperties = systemProperties, UserProperties = { Properties } }; } } }