in iothub/device/src/net451/IotHubClientWebSocket.cs [220:408]
public async Task<int> ReceiveAsync(byte[] buffer, int offset, TimeSpan timeout)
{
if (Logging.IsEnabled)
Logging.Enter(this, timeout, $"{nameof(IotHubClientWebSocket)}.{nameof(ReceiveAsync)}");
byte[] header = new byte[2];
Fx.AssertAndThrow(State == WebSocketState.Open, ClientWebSocketNotInOpenStateDuringReceive);
_tcpClient.ReceiveTimeout = TimeoutHelper.ToMilliseconds(timeout);
bool succeeded = false;
try
{
byte payloadLength;
bool pongFrame;
// TODO: rewrite this section to handle all control frames (including ping)
int totalBytesRead;
int bytesRead;
do
{
// Ignore pong frame and start over
totalBytesRead = 0;
do
{
bytesRead = await _webSocketStream.ReadAsync(header, totalBytesRead, header.Length - totalBytesRead).ConfigureAwait(false);
if (bytesRead == 0)
{
throw new IOException(FramingPrematureEOF, new InvalidDataException("IotHubClientWebSocket was expecting more bytes"));
}
totalBytesRead += bytesRead;
}
while (totalBytesRead < header.Length);
if (!ParseWebSocketFrameHeader(header, out payloadLength, out pongFrame))
{
// Encountered a close frame or error in parsing frame from server. Close connection
byte[] closeHeader = PrepareWebSocketHeader(0, WebSocketMessageType.Close);
await _webSocketStream.WriteAsync(closeHeader, 0, closeHeader.Length).ConfigureAwait(false);
State = WebSocketState.Closed;
_webSocketStream?.Close();
_tcpClient?.Close();
return 0; // TODO: throw exception?
}
if (pongFrame && payloadLength > 0)
{
totalBytesRead = 0;
byte[] tempBuffer = new byte[payloadLength];
while (totalBytesRead < payloadLength)
{
bytesRead = await _webSocketStream.ReadAsync(tempBuffer, totalBytesRead, payloadLength - totalBytesRead).ConfigureAwait(false);
if (bytesRead == 0)
{
throw new IOException(FramingPrematureEOF, new InvalidDataException("IotHubClientWebSocket was expecting more bytes"));
}
totalBytesRead += bytesRead;
}
}
} while (pongFrame);
totalBytesRead = 0;
if (buffer.Length < payloadLength)
{
throw Fx.Exception.AsError(new InvalidOperationException(Resources.SizeExceedsRemainingBufferSpace));
}
if (payloadLength < MediumSizeFrame)
{
while (totalBytesRead < payloadLength)
{
bytesRead = await _webSocketStream.ReadAsync(buffer, offset + totalBytesRead, payloadLength - totalBytesRead).ConfigureAwait(false);
if (bytesRead == 0)
{
throw new IOException(FramingPrematureEOF, new InvalidDataException("IotHubClientWebSocket was expecting more bytes"));
}
totalBytesRead += bytesRead;
}
}
else
{
switch (payloadLength)
{
case MediumSizeFrame:
// read payload length (< 64K)
do
{
bytesRead = await _webSocketStream.ReadAsync(header, totalBytesRead, header.Length - totalBytesRead).ConfigureAwait(false);
if (bytesRead == 0)
{
throw new IOException(FramingPrematureEOF, new InvalidDataException("IotHubClientWebSocket was expecting more bytes"));
}
totalBytesRead += bytesRead;
}
while (totalBytesRead < header.Length);
totalBytesRead = 0;
ushort extendedPayloadLength = (ushort)((header[0] << 8) | header[1]);
// read payload
if (buffer.Length >= extendedPayloadLength)
{
while (totalBytesRead < extendedPayloadLength)
{
bytesRead = await _webSocketStream.ReadAsync(buffer, offset + totalBytesRead, extendedPayloadLength - totalBytesRead).ConfigureAwait(false);
if (bytesRead == 0)
{
throw new IOException(FramingPrematureEOF, new InvalidDataException("IotHubClientWebSocket was expecting more bytes"));
}
totalBytesRead += bytesRead;
}
}
else
{
throw Fx.Exception.AsError(new InvalidOperationException(Resources.SizeExceedsRemainingBufferSpace));
}
break;
case LargeSizeFrame:
// read payload length (>= 64K)
byte[] payloadLengthBuffer = new byte[8];
do
{
bytesRead = await _webSocketStream.ReadAsync(payloadLengthBuffer, totalBytesRead, payloadLengthBuffer.Length - totalBytesRead).ConfigureAwait(false);
if (bytesRead == 0)
{
throw new IOException(FramingPrematureEOF, new InvalidDataException("IotHubClientWebSocket was expecting more bytes"));
}
totalBytesRead += bytesRead;
}
while (totalBytesRead < payloadLengthBuffer.Length);
totalBytesRead = 0;
// ignore bytes 0-3 - length cannot be larger than a 32-bit number
uint superExtendedPayloadLength = (uint)((payloadLengthBuffer[4] << 24) | (payloadLengthBuffer[5] << 16) | (payloadLengthBuffer[6] << 8) | payloadLengthBuffer[7]);
// read payload
if (buffer.Length >= superExtendedPayloadLength)
{
while (totalBytesRead < superExtendedPayloadLength)
{
bytesRead = await _webSocketStream.ReadAsync(buffer, offset + totalBytesRead, (int)(superExtendedPayloadLength - totalBytesRead)).ConfigureAwait(false);
if (bytesRead == 0)
{
throw new IOException(FramingPrematureEOF, new InvalidDataException("IotHubClientWebSocket was expecting more bytes"));
}
totalBytesRead += bytesRead;
}
}
else
{
throw Fx.Exception.AsError(new InvalidOperationException(Resources.SizeExceedsRemainingBufferSpace));
}
break;
}
}
succeeded = true;
return totalBytesRead;
}
finally
{
if (!succeeded)
{
Fault();
}
if (Logging.IsEnabled)
Logging.Exit(this, timeout, $"{nameof(IotHubClientWebSocket)}.{nameof(ReceiveAsync)}");
}
}