iothub/device/src/net451/Common/LegacyClientWebSocketTransport.cs (337 lines of code) (raw):
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
using System;
using System.IO;
using System.Net;
using System.Net.WebSockets;
using System.Threading.Tasks;
using Microsoft.Azure.Amqp;
using Microsoft.Azure.Amqp.Transport;
using Microsoft.Azure.Devices.Shared;
namespace Microsoft.Azure.Devices.Client
{
internal sealed class LegacyClientWebSocketTransport : TransportBase
{
private const string ClientWebSocketTransportReadBufferTooSmall = "LegacyClientWebSocketTransport Read Buffer too small.";
private const int MaxReadBufferSize = 256 * 1024; // Max Read buffer size is hard-coded to 256k
private static readonly AsyncCallback s_onWriteComplete = OnWriteComplete;
private readonly IotHubClientWebSocket _webSocket;
private readonly EndPoint _localEndPoint;
private readonly EndPoint _remoteEndPoint;
private readonly TimeSpan _operationTimeout;
private readonly int _asyncReadBufferSize;
private readonly byte[] _asyncReadBuffer;
private bool _isDisposed;
private int _asyncReadBufferOffset;
private int _remainingBytes;
public LegacyClientWebSocketTransport(
IotHubClientWebSocket webSocket,
TimeSpan operationTimeout,
EndPoint localEndpoint,
EndPoint remoteEndpoint)
: base("legacyclientwebsocket")
{
_webSocket = webSocket;
_operationTimeout = operationTimeout;
_localEndPoint = localEndpoint;
_remoteEndPoint = remoteEndpoint;
_asyncReadBufferSize = MaxReadBufferSize; // TODO: read from Config Settings
_asyncReadBuffer = new byte[_asyncReadBufferSize];
}
public override string LocalEndPoint => _localEndPoint.ToString();
public override string RemoteEndPoint => _remoteEndPoint.ToString();
public override bool RequiresCompleteFrames => true;
public override bool IsSecure => true;
public override void SetMonitor(ITransportMonitor usageMeter)
{
// Do Nothing
}
public override bool WriteAsync(TransportAsyncCallbackArgs args)
{
if (Logging.IsEnabled)
Logging.Enter(this, args.Transport, $"{nameof(LegacyClientWebSocketTransport)}.{nameof(WriteAsync)}");
ThrowIfNotOpen();
Fx.AssertAndThrow(args.Buffer != null || args.ByteBufferList != null, "must have a buffer to write");
Fx.AssertAndThrow(args.CompletedCallback != null, "must have a valid callback");
args.Exception = null; // null out any exceptions
Task taskResult = WriteCoreAsync(args);
if (WriteTaskDone(taskResult, args))
{
return false;
}
taskResult.ToAsyncResult(s_onWriteComplete, args);
if (Logging.IsEnabled)
Logging.Exit(this, args.Transport, $"{nameof(LegacyClientWebSocketTransport)}.{nameof(WriteAsync)}");
return true;
}
public override bool ReadAsync(TransportAsyncCallbackArgs args)
{
if (Logging.IsEnabled)
Logging.Enter(this, args.Transport, $"{nameof(LegacyClientWebSocketTransport)}.{nameof(ReadAsync)}");
ThrowIfNotOpen();
// Read with buffer list not supported
Fx.AssertAndThrow(args.Buffer != null, "must have buffer to read");
Fx.AssertAndThrow(args.CompletedCallback != null, "must have a valid callback");
// TODO: Is this assert valid at all? It should be ok for caller to ask for more bytes than we can give...
Fx.AssertAndThrow(args.Count <= _asyncReadBufferSize, ClientWebSocketTransportReadBufferTooSmall);
Utils.ValidateBufferBounds(args.Buffer, args.Offset, args.Count);
args.Exception = null;
if (_asyncReadBufferOffset > 0)
{
Fx.AssertAndThrow(_remainingBytes > 0, "Must have data in buffer to transfer");
// Data left over from previous read
TransferData(_remainingBytes, args);
return false;
}
args.Exception = null; // null out any exceptions
Task<int> taskResult = ReadCoreAsync();
if (ReadTaskDone(taskResult, args))
{
return false;
}
taskResult.ToAsyncResult(OnReadComplete, args);
if (Logging.IsEnabled)
Logging.Exit(this, args.Transport, $"{nameof(LegacyClientWebSocketTransport)}.{nameof(ReadAsync)}");
return true;
}
private async Task WriteCoreAsync(TransportAsyncCallbackArgs args)
{
if (Logging.IsEnabled)
Logging.Enter(this, args.Transport, $"{nameof(LegacyClientWebSocketTransport)}.{nameof(WriteCoreAsync)}");
bool succeeded = false;
try
{
if (args.Buffer != null)
{
await _webSocket
.SendAsync(args.Buffer, args.Offset, args.Count, IotHubClientWebSocket.WebSocketMessageType.Binary, _operationTimeout)
.ConfigureAwait(false);
}
else
{
foreach (ByteBuffer byteBuffer in args.ByteBufferList)
{
await _webSocket
.SendAsync(
byteBuffer.Buffer,
byteBuffer.Offset,
byteBuffer.Length,
IotHubClientWebSocket.WebSocketMessageType.Binary,
_operationTimeout)
.ConfigureAwait(false);
}
}
succeeded = true;
}
catch (WebSocketException webSocketException)
{
throw new IOException(webSocketException.Message, webSocketException);
}
catch (HttpListenerException httpListenerException)
{
throw new IOException(httpListenerException.Message, httpListenerException);
}
catch (TaskCanceledException taskCanceledException)
{
throw new TimeoutException(taskCanceledException.Message, taskCanceledException);
}
finally
{
if (!succeeded)
{
Abort();
}
if (Logging.IsEnabled)
Logging.Exit(this, args.Transport, $"{nameof(LegacyClientWebSocketTransport)}.{nameof(WriteCoreAsync)}");
}
}
protected override bool OpenInternal()
{
ThrowIfNotOpen();
return true;
}
protected override bool CloseInternal()
{
try
{
if (Logging.IsEnabled)
Logging.Enter(this, $"{nameof(LegacyClientWebSocketTransport)}.{nameof(CloseInternal)}");
IotHubClientWebSocket.WebSocketState webSocketState = _webSocket.State;
if (webSocketState != IotHubClientWebSocket.WebSocketState.Closed
&& webSocketState != IotHubClientWebSocket.WebSocketState.Aborted)
{
CloseInternalAsync().GetAwaiter().GetResult();
}
return true;
}
finally
{
if (Logging.IsEnabled)
Logging.Exit(this, $"{nameof(LegacyClientWebSocketTransport)}.{nameof(CloseInternal)}");
}
}
protected override void AbortInternal()
{
try
{
if (Logging.IsEnabled)
Logging.Enter(this, $"{nameof(LegacyClientWebSocketTransport)}.{nameof(AbortInternal)}");
if (!_isDisposed
&& _webSocket.State != IotHubClientWebSocket.WebSocketState.Aborted)
{
_isDisposed = true;
_webSocket.Abort();
}
}
finally
{
if (Logging.IsEnabled)
Logging.Exit(this, $"{nameof(LegacyClientWebSocketTransport)}.{nameof(AbortInternal)}");
}
}
private async Task<int> ReadCoreAsync()
{
if (Logging.IsEnabled)
Logging.Enter(this, $"{nameof(LegacyClientWebSocketTransport)}.{nameof(ReadCoreAsync)}");
bool succeeded = false;
try
{
int numBytes = await _webSocket.ReceiveAsync(_asyncReadBuffer, _asyncReadBufferOffset, _operationTimeout).ConfigureAwait(false);
succeeded = true;
return numBytes;
}
catch (WebSocketException webSocketException)
{
throw new IOException(webSocketException.Message, webSocketException);
}
catch (HttpListenerException httpListenerException)
{
throw new IOException(httpListenerException.Message, httpListenerException);
}
catch (TaskCanceledException taskCanceledException)
{
throw new TimeoutException(taskCanceledException.Message, taskCanceledException);
}
finally
{
if (!succeeded)
{
Abort();
}
if (Logging.IsEnabled)
{
Logging.Exit(this, $"{nameof(LegacyClientWebSocketTransport)}.{nameof(ReadCoreAsync)}");
}
}
}
private async Task CloseInternalAsync()
{
try
{
if (Logging.IsEnabled)
Logging.Enter(this, $"{nameof(LegacyClientWebSocketTransport)}.{nameof(CloseInternalAsync)}");
await _webSocket.CloseAsync().ConfigureAwait(false);
}
catch (Exception e) when (!Fx.IsFatal(e))
{
// do nothing
}
finally
{
if (Logging.IsEnabled)
Logging.Exit(this, $"{nameof(LegacyClientWebSocketTransport)}.{nameof(CloseInternalAsync)}");
}
}
private void OnReadComplete(IAsyncResult result)
{
if (result.CompletedSynchronously)
{
return;
}
var taskResult = (Task<int>)result;
var args = (TransportAsyncCallbackArgs)taskResult.AsyncState;
ReadTaskDone(taskResult, args);
args.CompletedCallback(args);
}
private bool ReadTaskDone(Task<int> taskResult, TransportAsyncCallbackArgs args)
{
IAsyncResult result = taskResult;
args.BytesTransfered = 0; // reset bytes transferred
if (taskResult.IsFaulted)
{
args.Exception = taskResult.Exception;
return true;
}
if (taskResult.IsCompleted)
{
TransferData(taskResult.Result, args);
args.CompletedSynchronously = result.CompletedSynchronously;
return true;
}
// This should not be canceled since TaskCanceledException is handled in ReadCoreAsync.
return taskResult.IsCanceled;
}
private void TransferData(int bytesRead, TransportAsyncCallbackArgs args)
{
if (bytesRead <= args.Count)
{
Buffer.BlockCopy(_asyncReadBuffer, _asyncReadBufferOffset, args.Buffer, args.Offset, bytesRead);
_asyncReadBufferOffset = 0;
_remainingBytes = 0;
args.BytesTransfered = bytesRead;
}
else
{
Buffer.BlockCopy(_asyncReadBuffer, _asyncReadBufferOffset, args.Buffer, args.Offset, args.Count);
// read only part of the data
_asyncReadBufferOffset += args.Count;
_remainingBytes = bytesRead - args.Count;
args.BytesTransfered = args.Count;
}
}
private static void OnWriteComplete(IAsyncResult result)
{
if (result.CompletedSynchronously)
{
return;
}
var taskResult = (Task)result;
var args = (TransportAsyncCallbackArgs)taskResult.AsyncState;
WriteTaskDone(taskResult, args);
args.CompletedCallback(args);
}
private static bool WriteTaskDone(Task taskResult, TransportAsyncCallbackArgs args)
{
IAsyncResult result = taskResult;
args.BytesTransfered = 0; // reset bytes transferred
if (taskResult.IsFaulted)
{
args.Exception = taskResult.Exception;
return true;
}
if (taskResult.IsCompleted)
{
args.BytesTransfered = args.Count;
args.CompletedSynchronously = result.CompletedSynchronously;
return true;
}
// This should not be true since TaskCanceledException is handled in WriteCoreAsync.
return taskResult.IsCanceled;
}
private void ThrowIfNotOpen()
{
try
{
if (Logging.IsEnabled)
Logging.Enter(this, $"{nameof(LegacyClientWebSocketTransport)}.{nameof(ThrowIfNotOpen)}");
IotHubClientWebSocket.WebSocketState webSocketState = _webSocket.State;
if (webSocketState == IotHubClientWebSocket.WebSocketState.Open)
{
return;
}
if (webSocketState == IotHubClientWebSocket.WebSocketState.Aborted
|| webSocketState == IotHubClientWebSocket.WebSocketState.Closed)
{
throw new ObjectDisposedException(GetType().Name);
}
throw new AmqpException(AmqpErrorCode.IllegalState, null);
}
finally
{
if (Logging.IsEnabled)
Logging.Exit(this, $"{nameof(LegacyClientWebSocketTransport)}.{nameof(ThrowIfNotOpen)}");
}
}
}
}