public async Task ReceiveAsync()

in iothub/service/src/IotHubClientWebSocket.cs [211:389]


        public async Task<int> ReceiveAsync(byte[] buffer, int offset, TimeSpan timeout)
        {
            byte[] header = new byte[2];

            Fx.AssertAndThrow(State == WebSocketState.Open, ClientWebSocketNotInOpenStateDuringReceive);
            TcpClient.ReceiveTimeout = TimeoutHelper.ToMilliseconds(timeout);

            bool succeeded = false;
            try
            {
                byte payloadLength;
                bool pongFrame;

                // TODO: rewrite this section to handle all control frames (including ping)
                int totalBytesRead;
                int bytesRead;
                do
                {
                    // Ignore pong frame and start over
                    totalBytesRead = 0;
                    totalBytesRead = await ReadFromStreamAsync(WebSocketStream, header).ConfigureAwait(false);

                    if (totalBytesRead == 0)
                    {
                        throw new IOException(FramingPrematureEOF, new InvalidDataException("IotHubClientWebSocket was expecting more bytes"));
                    }

                    if (!ParseWebSocketFrameHeader(header, out payloadLength, out pongFrame))
                    {
                        // Encountered a close frame or error in parsing frame from server. Close connection
                        byte[] closeHeader = PrepareWebSocketHeader(0, WebSocketMessageType.Close);

                        await WriteToStreamAsync(WebSocketStream, closeHeader).ConfigureAwait(false);

                        State = WebSocketState.Closed;
                        WebSocketStream.Close();
                        TcpClient.Close();
                        return 0;  // TODO: throw exception?
                    }

                    if (pongFrame && payloadLength > 0)
                    {
                        totalBytesRead = 0;
                        byte[] tempBuffer = new byte[payloadLength];
                        while (totalBytesRead < payloadLength)
                        {
                            bytesRead = await ReadFromStreamAsync(WebSocketStream, tempBuffer, totalBytesRead, payloadLength - totalBytesRead).ConfigureAwait(false);
                            if (bytesRead == 0)
                            {
                                throw new IOException(FramingPrematureEOF, new InvalidDataException("IotHubClientWebSocket was expecting more bytes"));
                            }

                            totalBytesRead += bytesRead;
                        }
                    }
                }
                while (pongFrame);

                totalBytesRead = 0;

                if (buffer.Length < payloadLength)
                {
                    throw Fx.Exception.AsError(new InvalidOperationException(SizeExceedsRemainingBufferSpace));
                }

                if (payloadLength < MediumSizeFrame)
                {
                    while (totalBytesRead < payloadLength)
                    {
                        bytesRead = await ReadFromStreamAsync(WebSocketStream, buffer, offset + totalBytesRead, payloadLength - totalBytesRead).ConfigureAwait(false);

                        if (bytesRead == 0)
                        {
                            throw new IOException(FramingPrematureEOF, new InvalidDataException("IotHubClientWebSocket was expecting more bytes"));
                        }

                        totalBytesRead += bytesRead;
                    }
                }
                else
                {
                    switch (payloadLength)
                    {
                        case MediumSizeFrame:
                            // read payload length (< 64K)
                            do
                            {
                                bytesRead = await ReadFromStreamAsync(WebSocketStream, header, totalBytesRead, header.Length - totalBytesRead).ConfigureAwait(false);

                                if (bytesRead == 0)
                                {
                                    throw new IOException(FramingPrematureEOF, new InvalidDataException("IotHubClientWebSocket was expecting more bytes"));
                                }

                                totalBytesRead += bytesRead;
                            }
                            while (totalBytesRead < header.Length);

                            totalBytesRead = 0;
                            ushort extendedPayloadLength = (ushort)((header[0] << 8) | header[1]);

                            // read payload
                            if (buffer.Length >= extendedPayloadLength)
                            {
                                while (totalBytesRead < extendedPayloadLength)
                                {
                                    bytesRead = await ReadFromStreamAsync(WebSocketStream, buffer, offset + totalBytesRead, extendedPayloadLength - totalBytesRead).ConfigureAwait(false);

                                    if (bytesRead == 0)
                                    {
                                        throw new IOException(FramingPrematureEOF, new InvalidDataException("IotHubClientWebSocket was expecting more bytes"));
                                    }

                                    totalBytesRead += bytesRead;
                                }
                            }
                            else
                            {
                                throw Fx.Exception.AsError(new InvalidOperationException(SizeExceedsRemainingBufferSpace));
                            }

                            break;

                        case LargeSizeFrame:
                            // read payload length (>= 64K)
                            byte[] payloadLengthBuffer = new byte[8];
                            do
                            {
                                bytesRead = await ReadFromStreamAsync(WebSocketStream, payloadLengthBuffer, totalBytesRead, payloadLengthBuffer.Length - totalBytesRead).ConfigureAwait(false);

                                if (bytesRead == 0)
                                {
                                    throw new IOException(FramingPrematureEOF, new InvalidDataException("IotHubClientWebSocket was expecting more bytes"));
                                }

                                totalBytesRead += bytesRead;
                            }
                            while (totalBytesRead < payloadLengthBuffer.Length);

                            totalBytesRead = 0;

                            // ignore bytes 0-3 - length cannot be larger than a 32-bit number
                            uint superExtendedPayloadLength = (uint)((payloadLengthBuffer[4] << 24) | (payloadLengthBuffer[5] << 16) | (payloadLengthBuffer[6] << 8) | payloadLengthBuffer[7]);

                            // read payload
                            if (buffer.Length >= superExtendedPayloadLength)
                            {
                                while (totalBytesRead < superExtendedPayloadLength)
                                {
                                    bytesRead = await ReadFromStreamAsync(WebSocketStream, buffer, offset + totalBytesRead, (int)(superExtendedPayloadLength - totalBytesRead)).ConfigureAwait(false);

                                    if (bytesRead == 0)
                                    {
                                        throw new IOException(FramingPrematureEOF, new InvalidDataException("IotHubClientWebSocket was expecting more bytes"));
                                    }

                                    totalBytesRead += bytesRead;
                                }
                            }
                            else
                            {
                                throw Fx.Exception.AsError(new InvalidOperationException(SizeExceedsRemainingBufferSpace));
                            }

                            break;
                    }
                }

                succeeded = true;
                return totalBytesRead;
            }
            finally
            {
                if (!succeeded)
                {
                    Fault();
                }
            }
        }