iothub/device/src/Message.cs (248 lines of code) (raw):

// Copyright (c) Microsoft. All rights reserved. // Licensed under the MIT license. See LICENSE file in the project root for full license information. using System; using System.Collections.Generic; using System.Diagnostics.CodeAnalysis; using System.IO; using System.Threading; using Microsoft.Azure.Devices.Client.Common.Api; using Microsoft.Azure.Devices.Shared; namespace Microsoft.Azure.Devices.Client { /// <summary> /// The data structure represent the message that is used for interacting with IotHub. /// </summary> public sealed class Message : IReadOnlyIndicator, IDisposable { private volatile Stream _bodyStream; private bool _disposed; private StreamDisposalResponsibility _streamDisposalResponsibility; private const long StreamCannotSeek = -1; private long _originalStreamPosition = StreamCannotSeek; private int _getBodyCalled; private long _sizeInBytesCalled; /// <summary> /// Default constructor with no body data /// </summary> public Message() { Properties = new ReadOnlyDictionary45<string, string>(new Dictionary<string, string>(StringComparer.OrdinalIgnoreCase), this); SystemProperties = new ReadOnlyDictionary45<string, object>(new Dictionary<string, object>(StringComparer.OrdinalIgnoreCase), this); InitializeWithStream(Stream.Null, StreamDisposalResponsibility.Sdk); } /// <summary> /// Constructor which uses the argument stream as the body stream. /// </summary> /// <remarks>User is expected to own the disposing of the stream when using this constructor.</remarks> /// <param name="stream">A stream which will be used as body stream.</param> // UWP cannot expose a method with System.IO.Stream in signature. TODO: consider adding an IRandomAccessStream overload public Message(Stream stream) : this() { if (stream != null) { InitializeWithStream(stream, StreamDisposalResponsibility.App); } } /// <summary> /// Constructor which uses the input byte array as the body. /// </summary> /// <remarks>User should treat the input byte array as immutable when sending the message.</remarks> /// <param name="byteArray">A byte array which will be used to form the body stream.</param> public Message(byte[] byteArray) : this(new MemoryStream(byteArray)) { // Reset the owning of the stream _streamDisposalResponsibility = StreamDisposalResponsibility.Sdk; } /// <summary> /// This constructor is only used on the Gateway HTTP path so that we can clean up the stream. /// </summary> /// <param name="stream">A stream which will be used as body stream.</param> /// <param name="streamDisposalResponsibility">Indicates if the stream passed in should be disposed by the /// client library, or by the calling application.</param> internal Message(Stream stream, StreamDisposalResponsibility streamDisposalResponsibility) : this(stream) { _streamDisposalResponsibility = streamDisposalResponsibility; } /// <summary> /// [Required for two way requests] Used to correlate two-way communication. /// Format: A case-sensitive string ( up to 128 char long) of ASCII 7-bit alphanumeric chars /// + {'-', ':', '/', '\', '.', '+', '%', '_', '#', '*', '?', '!', '(', ')', ',', '=', '@', ';', '$', '''}. /// Non-alphanumeric characters are from URN RFC. /// </summary> public string MessageId { get => GetSystemProperty<string>(MessageSystemPropertyNames.MessageId); set => SystemProperties[MessageSystemPropertyNames.MessageId] = value; } /// <summary> /// [Required] Destination of the message /// </summary> public string To { get => GetSystemProperty<string>(MessageSystemPropertyNames.To); set => SystemProperties[MessageSystemPropertyNames.To] = value; } /// <summary> /// [Optional] The time when this message is considered expired /// </summary> public DateTime ExpiryTimeUtc { get => GetSystemProperty<DateTime>(MessageSystemPropertyNames.ExpiryTimeUtc); internal set => SystemProperties[MessageSystemPropertyNames.ExpiryTimeUtc] = value; } /// <summary> /// Used in message responses and feedback /// </summary> public string CorrelationId { get => GetSystemProperty<string>(MessageSystemPropertyNames.CorrelationId); set => SystemProperties[MessageSystemPropertyNames.CorrelationId] = value; } /// <summary> /// [Required] SequenceNumber of the received message /// </summary> public ulong SequenceNumber { get => GetSystemProperty<ulong>(MessageSystemPropertyNames.SequenceNumber); internal set => SystemProperties[MessageSystemPropertyNames.SequenceNumber] = value; } /// <summary> /// [Required] LockToken of the received message /// </summary> public string LockToken { get => GetSystemProperty<string>(MessageSystemPropertyNames.LockToken); internal set => SystemProperties[MessageSystemPropertyNames.LockToken] = value; } /// <summary> /// Date and time when the device-to-cloud message was received by the server. /// </summary> public DateTime EnqueuedTimeUtc { get => GetSystemProperty<DateTime>(MessageSystemPropertyNames.EnqueuedTime); internal set => SystemProperties[MessageSystemPropertyNames.EnqueuedTime] = value; } /// <summary> /// Number of times the message has been previously delivered /// </summary> public uint DeliveryCount { get => GetSystemProperty<byte>(MessageSystemPropertyNames.DeliveryCount); internal set => SystemProperties[MessageSystemPropertyNames.DeliveryCount] = (byte)value; } /// <summary> /// [Required in feedback messages] Used to specify the origin of messages generated by device hub. /// Possible value: “{hub name}/” /// </summary> public string UserId { get => GetSystemProperty<string>(MessageSystemPropertyNames.UserId); set => SystemProperties[MessageSystemPropertyNames.UserId] = value; } /// <summary> /// For outgoing messages, contains the Mqtt topic that the message is being sent to /// For incoming messages, contains the Mqtt topic that the message arrived on /// </summary> internal string MqttTopicName { get; set; } /// <summary> /// Used to specify the schema of the message content. /// </summary> public string MessageSchema { get => GetSystemProperty<string>(MessageSystemPropertyNames.MessageSchema); set => SystemProperties[MessageSystemPropertyNames.MessageSchema] = value; } /// <summary> /// Custom date property set by the originator of the message. /// </summary> public DateTime CreationTimeUtc { get => GetSystemProperty<DateTime>(MessageSystemPropertyNames.CreationTimeUtc); set => SystemProperties[MessageSystemPropertyNames.CreationTimeUtc] = value; } /// <summary> /// True if the message is set as a security message /// </summary> public bool IsSecurityMessage => CommonConstants.SecurityMessageInterfaceId.Equals( GetSystemProperty<string>(MessageSystemPropertyNames.InterfaceId), StringComparison.Ordinal); /// <summary> /// Used to specify the content type of the message. /// </summary> public string ContentType { get => GetSystemProperty<string>(MessageSystemPropertyNames.ContentType); set => SystemProperties[MessageSystemPropertyNames.ContentType] = value; } /// <summary> /// Specifies the input name on which the message was sent, if there was one. /// </summary> public string InputName { get => GetSystemProperty<string>(MessageSystemPropertyNames.InputName); internal set => SystemProperties[MessageSystemPropertyNames.InputName] = value; } /// <summary> /// Specifies the device Id from which this message was sent, if there is one. /// </summary> public string ConnectionDeviceId { get => GetSystemProperty<string>(MessageSystemPropertyNames.ConnectionDeviceId); internal set => SystemProperties[MessageSystemPropertyNames.ConnectionDeviceId] = value; } /// <summary> /// Specifies the module Id from which this message was sent, if there is one. /// </summary> public string ConnectionModuleId { get => GetSystemProperty<string>(MessageSystemPropertyNames.ConnectionModuleId); internal set => SystemProperties[MessageSystemPropertyNames.ConnectionModuleId] = value; } /// <summary> /// Used to specify the content encoding type of the message. /// </summary> public string ContentEncoding { get => GetSystemProperty<string>(MessageSystemPropertyNames.ContentEncoding); set => SystemProperties[MessageSystemPropertyNames.ContentEncoding] = value; } /// <summary> /// The DTDL component name from where the telemetry message has originated. /// This is relevant only for plug and play certified devices. /// </summary> public string ComponentName { get => GetSystemProperty<string>(MessageSystemPropertyNames.ComponentName); set => SystemProperties[MessageSystemPropertyNames.ComponentName] = value; } /// <summary> /// Gets the dictionary of user properties which are set when user send the data. /// </summary> public IDictionary<string, string> Properties { get; private set; } /// <summary> /// Gets the dictionary of system properties which are managed internally. /// </summary> internal IDictionary<string, object> SystemProperties { get; private set; } bool IReadOnlyIndicator.IsReadOnly => Interlocked.Read(ref _sizeInBytesCalled) == 1; /// <summary> /// The body stream of the current event data instance /// </summary> [SuppressMessage( "Naming", "CA1721:Property names should not match get methods", Justification = "Cannot remove public property on a public facing type")] public Stream BodyStream => _bodyStream; /// <summary> /// Gets or sets the deliveryTag which is used for server side check-pointing. /// </summary> internal ArraySegment<byte> DeliveryTag { get; set; } /// <summary> /// Dispose the current event data instance /// </summary> public void Dispose() { Dispose(true); } internal bool HasBodyStream() { return _bodyStream != null; } /// <summary> /// Return the body stream of the current event data instance /// </summary> /// <returns></returns> /// <exception cref="InvalidOperationException">throws if the method has been called.</exception> /// <exception cref="ObjectDisposedException">throws if the event data has already been disposed.</exception> /// <remarks>This method can only be called once and afterwards method will throw <see cref="InvalidOperationException"/>.</remarks> public Stream GetBodyStream() { ThrowIfDisposed(); SetGetBodyCalled(); return _bodyStream ?? Stream.Null; } /// <summary> /// This methods return the body stream as a byte array /// </summary> /// <returns></returns> /// <exception cref="ObjectDisposedException">throws if the event data has already been disposed.</exception> public byte[] GetBytes() { ThrowIfDisposed(); SetGetBodyCalled(); if (_bodyStream == null) { #if NET451 return new byte[] { }; #else return Array.Empty<byte>(); #endif } return ReadFullStream(_bodyStream); } /// <summary> /// Clones an existing <see cref="Message"/> instance and sets content body defined by <paramref name="byteArray"/> on it. /// </summary> /// <remarks> /// The cloned message has the message <see cref="MessageId" /> as the original message. /// User should treat the input byte array as immutable when sending the message. /// </remarks> /// <param name="byteArray">Message content to be set after clone.</param> /// <returns>A new instance of <see cref="Message"/> with body content defined by <paramref name="byteArray"/>, /// and user/system properties of the cloned <see cref="Message"/> instance. /// </returns> public Message CloneWithBody(in byte[] byteArray) { var result = new Message(byteArray); foreach (string key in Properties.Keys) { result.Properties.Add(key, Properties[key]); } foreach (string key in SystemProperties.Keys) { result.SystemProperties.Add(key, SystemProperties[key]); } return result; } internal void ResetBody() { if (_originalStreamPosition == StreamCannotSeek) { throw new IOException("Stream cannot seek."); } _bodyStream.Seek(_originalStreamPosition, SeekOrigin.Begin); Interlocked.Exchange(ref _getBodyCalled, 0); } internal bool IsBodyCalled => Volatile.Read(ref _getBodyCalled) == 1; private void SetGetBodyCalled() { if (1 == Interlocked.Exchange(ref _getBodyCalled, 1)) { throw Fx.Exception.AsError(new InvalidOperationException(ApiResources.MessageBodyConsumed)); } } /// <summary> /// Sets the message as an security message /// </summary> public void SetAsSecurityMessage() { SystemProperties[MessageSystemPropertyNames.InterfaceId] = CommonConstants.SecurityMessageInterfaceId; } private void InitializeWithStream(Stream stream, StreamDisposalResponsibility streamDisposalResponsibility) { // This method should only be used in constructor because // this has no locking on the bodyStream. _bodyStream = stream; _streamDisposalResponsibility = streamDisposalResponsibility; if (_bodyStream.CanSeek) { _originalStreamPosition = _bodyStream.Position; } } private static byte[] ReadFullStream(Stream inputStream) { using var ms = new MemoryStream(); inputStream.CopyTo(ms); return ms.ToArray(); } private T GetSystemProperty<T>(string key) { return SystemProperties.ContainsKey(key) ? (T)SystemProperties[key] : default; } internal void ThrowIfDisposed() { if (_disposed) { throw Fx.Exception.ObjectDisposed(ApiResources.MessageDisposed); } } private void Dispose(bool disposing) { if (!_disposed) { if (disposing) { if (_bodyStream != null && _streamDisposalResponsibility == StreamDisposalResponsibility.Sdk) { _bodyStream.Dispose(); _bodyStream = null; } } } _disposed = true; } } }