protected override async Task ProtectedReadAsync()

in sources/Google.Solutions.Iap/Protocol/SshRelayStream.cs [186:297]


        /// <summary>
        /// Reads relay messages from the session's stream until a DATA message
        /// arrives, then returns the data length reported by the DATA decoder.
        /// ACK messages received along the way are validated and used to trim
        /// the queue of unacknowledged messages; messages carrying any other
        /// tag are logged and skipped.
        /// </summary>
        /// <param name="buffer">Destination buffer handed to the DATA decoder.</param>
        /// <param name="offset">Offset into <paramref name="buffer"/> handed to the DATA decoder.</param>
        /// <param name="count">
        /// Capacity available for the read; must be at least <c>MinReadSize</c>.
        /// </param>
        /// <param name="cancellationToken">
        /// Token passed to the stream read, the queue lock and the session I/O call.
        /// </param>
        /// <returns>
        /// The data length of the received DATA message, or 0 if the underlying
        /// stream read returned no bytes.
        /// </returns>
        /// <exception cref="IndexOutOfRangeException">
        /// <paramref name="count"/> is smaller than <c>MinReadSize</c>.
        /// </exception>
        /// <exception cref="SshRelayProtocolViolationException">
        /// The server sent a message shorter than a tag, a zero ACK, or an ACK
        /// exceeding the number of bytes sent so far.
        /// </exception>
        protected override async Task<int> ProtectedReadAsync(
            byte[] buffer,
            int offset,
            int count,
            CancellationToken cancellationToken)
        {
            if (count < MinReadSize)
            {
                throw new IndexOutOfRangeException(
                    $"Read buffer too small ({count}), must be at least {MinReadSize}");
            }

            //
            // Size the receive buffer so that it can hold a DATA message
            // carrying `count` bytes, but never less than the minimum
            // message size.
            //
            var receiveBufferSize = Math.Max(
                SshRelayFormat.MinMessageSize,
                SshRelayFormat.Data.HeaderLength + count);
            var receiveBuffer = new byte[receiveBufferSize];

            var result = await this.session.IoAsync(
                async stream =>
                {
                    //
                    // Keep reading until we either receive a DATA message
                    // or the stream yields no more bytes.
                    //
                    while (true)
                    {
                        var readTask = stream.ReadAsync(
                            receiveBuffer,
                            0,
                            receiveBuffer.Length,
                            cancellationToken);
                        var received = await readTask.ConfigureAwait(false);

                        if (received == 0)
                        {
                            return 0;
                        }

                        if (received < SshRelayFormat.Tag.Length)
                        {
                            throw new SshRelayProtocolViolationException(
                                "The server sent an incomplete message");
                        }

                        SshRelayFormat.Tag.Decode(receiveBuffer, out var tag);

                        switch (tag)
                        {
                            case SshRelayMessageTag.DATA:
                                {
                                    var consumed = SshRelayFormat.Data.Decode(
                                        receiveBuffer,
                                        buffer,
                                        (uint)offset,
                                        (uint)count,
                                        out var payloadLength);

                                    Debug.Assert(payloadLength < consumed);
                                    Debug.Assert(consumed == received);

                                    TraceVerbose($"Received DATA message ({payloadLength} bytes)");

                                    this.session.State.AddBytesReceived(payloadLength);

                                    return payloadLength;
                                }

                            case SshRelayMessageTag.ACK:
                                {
                                    var consumed = SshRelayFormat.Ack.Decode(
                                        receiveBuffer,
                                        out var ackValue);

                                    Debug.Assert(consumed == received);

                                    if (ackValue == 0)
                                    {
                                        throw new SshRelayProtocolViolationException(
                                            "The server sent an invalid zero-ack");
                                    }

                                    if (ackValue > (ulong)this.session.State.BytesSent)
                                    {
                                        throw new SshRelayProtocolViolationException(
                                            "The server sent a mismatched ack");
                                    }

                                    this.session.State.LastAckReceived = ackValue;

                                    using (await this.unacknoledgedQueueLock
                                        .AcquireAsync(cancellationToken)
                                        .ConfigureAwait(false))
                                    {
                                        //
                                        // A single ACK can cover several queued
                                        // messages, so drop every entry whose
                                        // expected ack is at or below it.
                                        //
                                        while (this.unacknoledgedQueue.Count > 0 &&
                                               this.unacknoledgedQueue.Peek().ExpectedAck <= ackValue)
                                        {
                                            this.unacknoledgedQueue.Dequeue();
                                        }
                                    }

                                    TraceVerbose($"Received ACK #{ackValue}");

                                    //
                                    // Not a DATA message, read the next one.
                                    //
                                    break;
                                }

                            case SshRelayMessageTag.LONG_CLOSE:
                            default:
                                //
                                // Not a tag we act on here; skip the message
                                // and read the next one.
                                //
                                TraceVerbose($"Received unknown message: {tag}");

                                break;
                        }
                    }
                },
                ResendUnacknoledgedDataAsync,
                false, // Normal closes are ok.
                cancellationToken);

            return (int)result;
        }