in sources/Google.Solutions.Iap/Protocol/SshRelaySession.cs [81:276]
/// <summary>
/// Return the current connection if there is one. Otherwise, establish
/// a connection: perform an initial connect if no ACK has been received
/// yet, or a reconnect if one has.
/// </summary>
/// <remarks>
/// In both cases, the method waits for the server's success message
/// (CONNECT_SUCCESS_SID or RECONNECT_SUCCESS_ACK) and then invokes
/// <paramref name="resendUnacknoledgedDataAction"/> before publishing
/// the connection. Messages with any other tag are traced and skipped.
/// </remarks>
/// <param name="resendUnacknoledgedDataAction">
/// Callback that is invoked with the new (not yet published) connection
/// so that the caller can resend data that hasn't been acknowledged.
/// </param>
/// <param name="cancellationToken">
/// Token passed to the lock acquisition, the connect/reconnect call,
/// all reads, and the resend callback.
/// </param>
/// <returns>The existing or newly established connection.</returns>
/// <exception cref="WebSocketStreamClosedByServerException">
/// The server closed the connection during the handshake.
/// </exception>
/// <exception cref="SshRelayProtocolViolationException">
/// The server sent an incomplete message, or sent a CONNECT_SUCCESS_SID
/// in response to a reconnect.
/// </exception>
private async Task<INetworkStream> GetConnectionAsync(
    Func<INetworkStream, CancellationToken, Task> resendUnacknoledgedDataAction,
    CancellationToken cancellationToken)
{
    //
    // This method might be called concurrently by a
    // writer and a reader, so we have to synchronize.
    //
    using (await this.connectLock
        .AcquireAsync(cancellationToken)
        .ConfigureAwait(false))
    {
        if (this.connection != null)
        {
            //
            // We're still connected.
            //
            return this.connection;
        }
        else if (this.State.LastAckReceived == 0)
        {
            //
            // Initial connect.
            //
            TraceVerbose("Establishing new connection");

            var connection = await this.Endpoint
                .ConnectAsync(cancellationToken)
                .ConfigureAwait(false);

            try
            {
                //
                // To complete connection establishment, we have to receive
                // a CONNECT_SUCCESS_SID message.
                //
                var message = new byte[SshRelayFormat.MaxMessageSize];
                while (true)
                {
                    var bytesRead = await ReadHandshakeMessageAsync(
                            connection,
                            message,
                            cancellationToken)
                        .ConfigureAwait(false);

                    //
                    // Only decode the tag once we know that the buffer
                    // contains at least a complete tag from this read.
                    //
                    SshRelayFormat.Tag.Decode(message, out var tag);

                    switch (tag)
                    {
                        case SshRelayMessageTag.CONNECT_SUCCESS_SID:
                            {
                                var bytesDecoded = SshRelayFormat.ConnectSuccessSid.Decode(
                                    message,
                                    out var sid);

                                Debug.Assert(bytesDecoded == bytesRead);

                                //
                                // If the previous connection broke before we received
                                // the first ACK, then there might be data to be resent.
                                //
                                await resendUnacknoledgedDataAction(
                                        connection,
                                        cancellationToken)
                                    .ConfigureAwait(false);

                                //
                                // Publish the SID and connection only after the
                                // resend succeeded.
                                //
                                this.Sid = sid;
                                this.connection = connection;

                                TraceVerbose("Received CONNECT_SUCCESS_SID, connected");
                                return connection;
                            }

                        case SshRelayMessageTag.LONG_CLOSE:
                        default:
                            //
                            // Unknown tag, ignore.
                            //
                            TraceVerbose($"Received unknown message: {tag}");
                            break;
                    }
                }
            }
            catch
            {
                //
                // The handshake failed and the connection was never
                // published, so nobody else can release it.
                //
                // NOTE(review): INetworkStream's disposal contract isn't
                // visible here, so only dispose if it is IDisposable.
                //
                (connection as IDisposable)?.Dispose();
                throw;
            }
        }
        else
        {
            //
            // Reconnect + sync ack's + resend data.
            //
            Debug.Assert(this.Sid != null);
            TraceVerbose($"Attempting reconnect with ack={this.State.LastAckReceived}");

            var connection = await this.Endpoint
                .ReconnectAsync(
                    this.Sid!,
                    this.State.BytesReceived,
                    cancellationToken)
                .ConfigureAwait(false);

            try
            {
                //
                // To complete connection establishment, we have to receive
                // a RECONNECT_SUCCESS_ACK message.
                //
                var message = new byte[SshRelayFormat.MaxMessageSize];
                while (true)
                {
                    var bytesRead = await ReadHandshakeMessageAsync(
                            connection,
                            message,
                            cancellationToken)
                        .ConfigureAwait(false);

                    //
                    // Only decode the tag once we know that the buffer
                    // contains at least a complete tag from this read.
                    //
                    SshRelayFormat.Tag.Decode(message, out var tag);

                    switch (tag)
                    {
                        case SshRelayMessageTag.RECONNECT_SUCCESS_ACK:
                            {
                                var bytesDecoded = SshRelayFormat.ReconnectAck.Decode(
                                    message,
                                    out var ack);

                                this.State.LastAckReceived = ack;

                                Debug.Assert(bytesDecoded == bytesRead);

                                //
                                // Resend all data since the ACK that we just received.
                                //
                                await resendUnacknoledgedDataAction(
                                        connection,
                                        cancellationToken)
                                    .ConfigureAwait(false);

                                this.connection = connection;

                                TraceVerbose("Received RECONNECT_SUCCESS_ACK, reconnected");
                                return connection;
                            }

                        case SshRelayMessageTag.CONNECT_SUCCESS_SID:
                            {
                                //
                                // We shouldn't be receiving this message after
                                // a reconnect.
                                //
                                throw new SshRelayProtocolViolationException(
                                    "The server sent an unexpected CONNECT_SUCCESS_SID " +
                                    "message in response to a reconnect");
                            }

                        case SshRelayMessageTag.LONG_CLOSE:
                        default:
                            //
                            // Unknown tag, ignore.
                            //
                            TraceVerbose($"Received unknown message: {tag}");
                            break;
                    }
                }
            }
            catch
            {
                //
                // The handshake failed and the connection was never
                // published, so nobody else can release it.
                //
                // NOTE(review): INetworkStream's disposal contract isn't
                // visible here, so only dispose if it is IDisposable.
                //
                (connection as IDisposable)?.Dispose();
                throw;
            }
        }
    }
}

/// <summary>
/// Read the next message of a connection handshake into
/// <paramref name="message"/> and verify that it is long enough
/// to contain a message tag.
/// </summary>
/// <returns>Number of bytes read.</returns>
/// <exception cref="WebSocketStreamClosedByServerException">
/// The read returned zero bytes.
/// </exception>
/// <exception cref="SshRelayProtocolViolationException">
/// The read returned fewer bytes than the length of a tag.
/// </exception>
private static async Task<int> ReadHandshakeMessageAsync(
    INetworkStream connection,
    byte[] message,
    CancellationToken cancellationToken)
{
    var bytesRead = await connection
        .ReadAsync(
            message,
            0,
            message.Length,
            cancellationToken)
        .ConfigureAwait(false);

    if (bytesRead == 0)
    {
        throw new WebSocketStreamClosedByServerException(
            WebSocketCloseStatus.NormalClosure,
            "The connection was closed by the server");
    }
    else if (bytesRead < SshRelayFormat.Tag.Length)
    {
        throw new SshRelayProtocolViolationException(
            "The server sent an incomplete message");
    }

    return bytesRead;
}