private async Task GetConnectionAsync()

in sources/Google.Solutions.Iap/Protocol/SshRelaySession.cs [81:276]


        private async Task<INetworkStream> GetConnectionAsync(
            Func<INetworkStream, CancellationToken, Task> resendUnacknoledgedDataAction,
            CancellationToken cancellationToken)
        {
            //
            // This method might be called concurrently by a
            // writer and a reader, so we have to synchronize.
            //
            using (await this.connectLock
                .AcquireAsync(cancellationToken)
                .ConfigureAwait(false))
            {
                if (this.connection != null)
                {
                    //
                    // We're still connected.
                    //
                    return this.connection;
                }
                else if (this.State.LastAckReceived == 0)
                {
                    //
                    // Initial connect.
                    //
                    TraceVerbose($"Establishing new connection");

                    var connection = await this.Endpoint
                        .ConnectAsync(cancellationToken)
                        .ConfigureAwait(false);

                    //
                    // To complete connection establishment, we have to receive
                    // a CONNECT_SUCCESS_SID message.
                    //
                    var message = new byte[SshRelayFormat.MaxMessageSize];
                    string? connectionSid = null;

                    while (true)
                    {
                        var bytesRead = await connection
                            .ReadAsync(
                                message,
                                0,
                                message.Length,
                                cancellationToken)
                            .ConfigureAwait(false);

                        SshRelayFormat.Tag.Decode(message, out var tag);

                        if (bytesRead == 0)
                        {
                            throw new WebSocketStreamClosedByServerException(
                                WebSocketCloseStatus.NormalClosure,
                                "The connection was closed by the server");
                        }
                        else if (bytesRead < SshRelayFormat.Tag.Length)
                        {
                            throw new SshRelayProtocolViolationException(
                                "The server sent an incomplete message");
                        }

                        switch (tag)
                        {
                            case SshRelayMessageTag.CONNECT_SUCCESS_SID:
                                {
                                    var bytesDecoded = SshRelayFormat.ConnectSuccessSid.Decode(
                                        message,
                                        out var sid);
                                    connectionSid = sid;

                                    Debug.Assert(bytesDecoded == bytesRead);

                                    //
                                    // If the previous connection broke before we received
                                    // the first ACK, then there might be data to be resend.
                                    //

                                    await resendUnacknoledgedDataAction(
                                            connection,
                                            cancellationToken)
                                        .ConfigureAwait(false);

                                    this.Sid = connectionSid;
                                    this.connection = connection;

                                    TraceVerbose($"Received CONNECT_SUCCESS_SID, connected");

                                    return connection;
                                }

                            case SshRelayMessageTag.LONG_CLOSE:
                            default:
                                //
                                // Unknown tag, ignore.
                                //

                                TraceVerbose($"Received unknown message: {tag}");

                                break;
                        }
                    }
                }
                else
                {
                    //
                    // Reconnect + sync ack's + resend data.
                    //
                    Debug.Assert(this.Sid != null);
                    TraceVerbose($"Attempting reconnect with ack={this.State.LastAckReceived}");

                    var connection = await this.Endpoint
                        .ReconnectAsync(
                            this.Sid!,
                            this.State.BytesReceived,
                            cancellationToken)
                        .ConfigureAwait(false);

                    //
                    // To complete connection establishment, we have to receive
                    // a RECONNECT_SUCCESS_ACK message.
                    //
                    var message = new byte[SshRelayFormat.MaxMessageSize];
                    while (true)
                    {
                        var bytesRead = await connection
                            .ReadAsync(
                                message,
                                0,
                                message.Length,
                                cancellationToken)
                            .ConfigureAwait(false);

                        SshRelayFormat.Tag.Decode(message, out var tag);

                        if (bytesRead == 0)
                        {
                            throw new WebSocketStreamClosedByServerException(
                                WebSocketCloseStatus.NormalClosure,
                                "The connection was closed by the server");
                        }
                        else if (bytesRead < SshRelayFormat.Tag.Length)
                        {
                            throw new SshRelayProtocolViolationException(
                                "The server sent an incomplete message");
                        }

                        switch (tag)
                        {
                            case SshRelayMessageTag.RECONNECT_SUCCESS_ACK:
                                {
                                    var bytesDecoded = SshRelayFormat.ReconnectAck.Decode(
                                        message,
                                        out var ack);
                                    this.State.LastAckReceived = ack;

                                    Debug.Assert(bytesDecoded == bytesRead);

                                    //
                                    // Resend all data since the ACK that we just received.
                                    //

                                    await resendUnacknoledgedDataAction(
                                            connection,
                                            cancellationToken)
                                        .ConfigureAwait(false);

                                    this.connection = connection;

                                    TraceVerbose("Received RECONNECT_SUCCESS_ACK, reconnected");

                                    return connection;
                                }
                            case SshRelayMessageTag.CONNECT_SUCCESS_SID:
                                {
                                    //
                                    // We shouldn't be receiving this message after
                                    // a reconnect.
                                    //
                                    throw new SshRelayProtocolViolationException(
                                        "The server sent an unexpected CONNECT_SUCCESS_SID " +
                                        "message in response to a reconnect");
                                }

                            case SshRelayMessageTag.LONG_CLOSE:
                            default:
                                //
                                // Unknown tag, ignore.
                                //
                                TraceVerbose($"Received unknown message: {tag}");

                                break;
                        }
                    }
                }
            }
        }