in sources/Google.Solutions.Iap/Protocol/SshRelayStream.cs [186:297]
/// <summary>
/// Reads relay messages from the session's stream until a DATA message
/// arrives or the stream read yields zero bytes. ACK messages encountered
/// along the way are validated and applied to the unacknowledged queue;
/// messages carrying any other tag are traced and skipped.
/// </summary>
/// <param name="buffer">Buffer handed to the DATA decoder to receive the payload.</param>
/// <param name="offset">Offset into <paramref name="buffer"/> handed to the DATA decoder.</param>
/// <param name="count">
/// Capacity handed to the DATA decoder; must be at least <c>MinReadSize</c>.
/// </param>
/// <param name="cancellationToken">Token passed on to all asynchronous calls.</param>
/// <returns>
/// The payload length of the decoded DATA message, or 0 if the underlying
/// read returned zero bytes.
/// </returns>
/// <exception cref="IndexOutOfRangeException">
/// <paramref name="count"/> is smaller than <c>MinReadSize</c>.
/// </exception>
/// <exception cref="SshRelayProtocolViolationException">
/// The server sent a message shorter than a tag, a zero ack, or an ack
/// that exceeds the number of bytes sent.
/// </exception>
protected override async Task<int> ProtectedReadAsync(
    byte[] buffer,
    int offset,
    int count,
    CancellationToken cancellationToken)
{
    if (count < MinReadSize)
    {
        throw new IndexOutOfRangeException(
            $"Read buffer too small ({count}), must be at least {MinReadSize}");
    }

    //
    // The receive buffer must be able to hold a DATA header plus
    // `count` bytes, but is never smaller than the minimum message size.
    //
    var receiveBufferSize = Math.Max(
        SshRelayFormat.MinMessageSize,
        SshRelayFormat.Data.HeaderLength + count);
    var receiveBuffer = new byte[receiveBufferSize];

    var result = await this.session.IoAsync(
        async stream =>
        {
            while (true)
            {
                var received = await stream
                    .ReadAsync(
                        receiveBuffer,
                        0,
                        receiveBuffer.Length,
                        cancellationToken)
                    .ConfigureAwait(false);

                if (received == 0)
                {
                    //
                    // Nothing was read, report that to the caller.
                    //
                    return 0;
                }

                if (received < SshRelayFormat.Tag.Length)
                {
                    throw new SshRelayProtocolViolationException(
                        "The server sent an incomplete message");
                }

                SshRelayFormat.Tag.Decode(receiveBuffer, out var tag);

                if (tag == SshRelayMessageTag.DATA)
                {
                    //
                    // Copy the payload into the caller's buffer and
                    // finish the read.
                    //
                    var decoded = SshRelayFormat.Data.Decode(
                        receiveBuffer,
                        buffer,
                        (uint)offset,
                        (uint)count,
                        out var payloadLength);

                    Debug.Assert(payloadLength < decoded);
                    Debug.Assert(decoded == received);

                    TraceVerbose($"Received DATA message ({payloadLength} bytes)");
                    this.session.State.AddBytesReceived(payloadLength);

                    return payloadLength;
                }

                if (tag != SshRelayMessageTag.ACK)
                {
                    //
                    // LONG_CLOSE or a tag we do not know: ignore the
                    // message and keep reading.
                    //
                    TraceVerbose($"Received unknown message: {tag}");
                    continue;
                }

                //
                // ACK message: validate it, record it, then drop all
                // queued entries that it covers.
                //
                var ackDecoded = SshRelayFormat.Ack.Decode(
                    receiveBuffer,
                    out var ackValue);
                Debug.Assert(ackDecoded == received);

                if (ackValue == 0)
                {
                    throw new SshRelayProtocolViolationException(
                        "The server sent an invalid zero-ack");
                }

                if (ackValue > (ulong)this.session.State.BytesSent)
                {
                    throw new SshRelayProtocolViolationException(
                        "The server sent a mismatched ack");
                }

                this.session.State.LastAckReceived = ackValue;

                using (await this.unacknoledgedQueueLock
                    .AcquireAsync(cancellationToken)
                    .ConfigureAwait(false))
                {
                    //
                    // A single ACK may acknowledge several messages at
                    // once, so keep removing entries from the head of
                    // the queue until one is not covered yet.
                    //
                    while (this.unacknoledgedQueue.Count > 0)
                    {
                        if (this.unacknoledgedQueue.Peek().ExpectedAck <= ackValue)
                        {
                            this.unacknoledgedQueue.Dequeue();
                        }
                        else
                        {
                            break;
                        }
                    }
                }

                TraceVerbose($"Received ACK #{ackValue}");
            }
        },
        ResendUnacknoledgedDataAsync,
        false, // Normal closes are ok.
        cancellationToken);

    return (int)result;
}