async connect()

in lib/eventstream_rpc.ts [384:477]


    async connect(options?: RpcClientConnectOptions) : Promise<SuccessfulConnectionResult> {
        return new Promise<SuccessfulConnectionResult>(async (resolve, reject) => {
            if (this.state != ClientState.None) {
                reject(createRpcError(RpcErrorType.ClientStateError, "RpcClient.connect() can only be called once"));
                return;
            }

            let onDisconnectWhileConnecting : eventstream.DisconnectionListener = (eventData: eventstream.DisconnectionEvent) => {
                if (this.state == ClientState.Connecting) {
                    this.state = ClientState.Finished;
                    reject(createRpcError(RpcErrorType.NetworkError, "RpcClient.connect() failed - connection closed"));
                    setImmediate(() => { this.close(); });
                }
            };

            this.connection.on('disconnection', onDisconnectWhileConnecting);

            this.state = ClientState.Connecting;
            let connack = undefined;

            try {
                await this.connection.connect({
                    cancelController: options?.cancelController
                });

                // create, transform, and send the connect
                let connectMessage: eventstream.Message = {
                    type: eventstream.MessageType.Connect
                };

                if (this.config.connectTransform) {
                    connectMessage = await this.config.connectTransform({
                        message: connectMessage,
                        cancelController: options?.cancelController
                    });
                }

                this._applyEventstreamRpcHeadersToConnect(connectMessage);

                let connackPromise : Promise<eventstream.Message> = cancel.newCancellablePromiseFromNextEvent({
                    cancelController: options?.cancelController,
                    emitter : this.connection,
                    eventName : eventstream.ClientConnection.PROTOCOL_MESSAGE,
                    eventDataTransformer: (eventData: any) => { return (eventData as eventstream.MessageEvent).message; },
                    cancelMessage: "Eventstream connect() cancelled by user request"
                });

                await this.connection.sendProtocolMessage({
                    message: connectMessage,
                    cancelController: options?.cancelController
                });

                // wait for the conn ack or cancel
                connack = await connackPromise;
            } catch (err) {
                if (this.state == ClientState.Connecting) {
                    this.state = ClientState.Finished;
                    setImmediate(() => { this.close(); });
                }

                reject(createRpcError(RpcErrorType.InternalError, "Failed to establish eventstream RPC connection", err as CrtError));
                return;
            }

            if (this.state != ClientState.Connecting) {
                reject(createRpcError(RpcErrorType.InternalError, "Eventstream RPC connection attempt interrupted"));
                return;
            }

            if (!connack || !RpcClient.isValidConnack(connack)) {
                this.state = ClientState.Finished;
                reject(createRpcError(RpcErrorType.HandshakeError, "Failed to establish eventstream RPC connection - invalid connack"));
                setImmediate(() => { this.close(); });
                return;
            }

            /*
             * Remove the promise-rejecting disconnect listener and replace it with a regular old listener that
             * doesn't reject the connect() promise since we're going to resolve it now.
             */
            this.connection.removeListener('disconnection', onDisconnectWhileConnecting);
            this.connection.on('disconnection', (eventData: eventstream.DisconnectionEvent) => {
                if (eventData.errorCode != 0) {
                    this.disconnectionReason = new CrtError(eventData.errorCode);
                }
                setImmediate(() => { this.close(); });
            });

            /* Per the client contract, we only emit disconnect after a successful connection establishment */
            this.emitDisconnectOnClose = true;
            this.state = ClientState.Connected;
            resolve({});
        });
    }