iothub/device/src/DirectMethod/MethodRequestInternal.cs (114 lines of code) (raw):
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
using System;
using System.IO;
using System.Threading;
namespace Microsoft.Azure.Devices.Client
{
/// <summary>
/// The data structure represent the method request coming from the IoT hub.
/// </summary>
public sealed class MethodRequestInternal : IDisposable
{
private volatile Stream _bodyStream;
private bool _disposed;
private bool _ownsBodyStream;
private int _getBodyCalled;
/// <summary>
/// Default constructor with no body data.
/// </summary>
internal MethodRequestInternal(CancellationToken cancellationToken)
{
InitializeWithStream(Stream.Null, true);
CancellationToken = cancellationToken;
}
/// <summary>
/// This constructor is only used in the receive path from Amqp path, or in cloning from a message
/// that has serialized.
/// </summary>
internal MethodRequestInternal(string name, string requestId, Stream bodyStream, CancellationToken cancellationToken)
: this(cancellationToken)
{
Name = name;
RequestId = requestId;
Stream stream = bodyStream;
InitializeWithStream(stream, false);
}
internal CancellationToken CancellationToken { get; private set; }
/// <summary>
/// Property indicating the method name for this instance.
/// </summary>
internal string Name { get; private set; }
/// <summary>
/// the request Id for the transport layer.
/// </summary>
internal string RequestId { get; private set; }
internal Stream BodyStream => _bodyStream;
/// <summary>
/// Dispose the current method data instance.
/// </summary>
public void Dispose()
{
Dispose(true);
}
/// <summary>
/// Return the body stream of the current method data instance.
/// </summary>
/// <returns></returns>
/// <exception cref="InvalidOperationException">throws if the method has been called.</exception>
/// <exception cref="ObjectDisposedException">throws if the method data has already been disposed.</exception>
/// <remarks>This method can only be called once and afterwards method will throw <see cref="InvalidOperationException"/>.</remarks>
internal Stream GetBodyStream()
{
ThrowIfDisposed();
SetGetBodyCalled();
return _bodyStream ?? Stream.Null;
}
/// <summary>
/// This methods return the body stream as a byte array.
/// </summary>
/// <returns></returns>
/// <exception cref="InvalidOperationException">throws if the method has been called.</exception>
/// <exception cref="ObjectDisposedException">throws if the method data has already been disposed.</exception>
internal byte[] GetBytes()
{
ThrowIfDisposed();
SetGetBodyCalled();
if (_bodyStream == null)
{
#if NET451
return new byte[0];
#else
return Array.Empty<byte>();
#endif
}
// This is just fail safe code in case we are not using the AMQP protocol.
return ReadFullStream(_bodyStream);
}
// Test hook only
internal void ResetGetBodyCalled()
{
Interlocked.Exchange(ref _getBodyCalled, 0);
if (_bodyStream != null && _bodyStream.CanSeek)
{
_bodyStream.Seek(0, SeekOrigin.Begin);
}
}
internal bool TryResetBody(long position)
{
if (_bodyStream != null && _bodyStream.CanSeek)
{
_bodyStream.Seek(position, SeekOrigin.Begin);
Interlocked.Exchange(ref _getBodyCalled, 0);
return true;
}
return false;
}
internal bool IsBodyCalled => Volatile.Read(ref _getBodyCalled) == 1;
private void SetGetBodyCalled()
{
if (1 == Interlocked.Exchange(ref _getBodyCalled, 1))
{
throw Fx.Exception.AsError(new InvalidOperationException(Common.Api.ApiResources.MessageBodyConsumed));
}
}
private void InitializeWithStream(Stream stream, bool ownsStream)
{
// This method should only be used in constructor because
// this has no locking on the bodyStream.
_bodyStream = stream;
_ownsBodyStream = ownsStream;
}
private static byte[] ReadFullStream(Stream inputStream)
{
using var ms = new MemoryStream();
inputStream.CopyTo(ms);
return ms.ToArray();
}
private void ThrowIfDisposed()
{
if (_disposed)
{
throw Fx.Exception.ObjectDisposed(Common.Api.ApiResources.MessageDisposed);
}
}
private void Dispose(bool disposing)
{
if (!_disposed)
{
if (disposing)
{
if (_bodyStream != null && _ownsBodyStream)
{
_bodyStream.Dispose();
_bodyStream = null;
}
}
_disposed = true;
}
}
}
}