in lib/eventstream_rpc.ts [384:477]
/**
 * Opens the underlying eventstream connection and performs the Connect / connack handshake on it.
 *
 * Sequence, as implemented below:
 *  1. Fail immediately unless the client is still in ClientState.None.
 *  2. Install a temporary 'disconnection' listener that fails the attempt if the connection drops while
 *     the client is in ClientState.Connecting.
 *  3. Connect the transport, build the Connect message (optionally passed through
 *     `config.connectTransform`), add the RPC headers, send it, and wait for the next protocol message.
 *  4. Validate that message with `RpcClient.isValidConnack`; on success swap the temporary listener for a
 *     long-lived one, move to ClientState.Connected and resolve.
 *
 * Every failure path after step 1 that finds the client in ClientState.Connecting moves it to
 * ClientState.Finished and schedules `this.close()` through setImmediate instead of calling it inline.
 *
 * @param options optional settings; only `cancelController` is read here, and it is forwarded to every
 *   asynchronous step (transport connect, connect transform, protocol message send, connack wait).
 *
 * @returns a promise that resolves with an empty result object once the client is in
 *   ClientState.Connected. It rejects with an error built by `createRpcError`:
 *   - RpcErrorType.ClientStateError - the client was not in ClientState.None when called
 *   - RpcErrorType.NetworkError - a 'disconnection' event arrived while still connecting
 *   - RpcErrorType.InternalError - any step of the handshake threw/rejected, or the client had left
 *     ClientState.Connecting by the time the handshake steps completed
 *   - RpcErrorType.HandshakeError - the response was missing or failed `RpcClient.isValidConnack`
 *   A promise settles only once, so when more than one of these paths runs, the first reject() wins.
 *
 * NOTE(review): the Promise executor is an async function. An exception thrown outside the try block
 * below (for example from `this.connection.on` or `RpcClient.isValidConnack`) would reject the
 * executor's own promise rather than the promise returned to the caller - confirm these cannot throw.
 */
async connect(options?: RpcClientConnectOptions) : Promise<SuccessfulConnectionResult> {
return new Promise<SuccessfulConnectionResult>(async (resolve, reject) => {
// connect() is single-use: any state other than None means it has already been called
if (this.state != ClientState.None) {
reject(createRpcError(RpcErrorType.ClientStateError, "RpcClient.connect() can only be called once"));
return;
}
/*
 * Temporary listener covering the window in which the handshake is in flight. The state guard makes it
 * inert once the client has left Connecting, which matters because the failure paths below never
 * remove it.
 */
let onDisconnectWhileConnecting : eventstream.DisconnectionListener = (eventData: eventstream.DisconnectionEvent) => {
if (this.state == ClientState.Connecting) {
this.state = ClientState.Finished;
reject(createRpcError(RpcErrorType.NetworkError, "RpcClient.connect() failed - connection closed"));
// close on a later turn of the event loop rather than from inside the event callback
setImmediate(() => { this.close(); });
}
};
// the listener is registered before the state flips to Connecting and before any await
this.connection.on('disconnection', onDisconnectWhileConnecting);
this.state = ClientState.Connecting;
// stays undefined unless the awaited connack promise below resolves with a message
let connack = undefined;
try {
// step 1: establish the underlying eventstream connection
await this.connection.connect({
cancelController: options?.cancelController
});
// step 2: create, optionally transform, and send the Connect protocol message
let connectMessage: eventstream.Message = {
type: eventstream.MessageType.Connect
};
if (this.config.connectTransform) {
// the configured transform receives the message and returns the message that will be sent
connectMessage = await this.config.connectTransform({
message: connectMessage,
cancelController: options?.cancelController
});
}
this._applyEventstreamRpcHeadersToConnect(connectMessage);
/*
 * The wait for the next PROTOCOL_MESSAGE event is created before the Connect message is sent -
 * presumably so that the response cannot arrive before anything is listening for it.
 *
 * NOTE(review): if sendProtocolMessage() below rejects, this promise is never awaited; confirm that a
 * later rejection of it (e.g. on cancellation) cannot surface as an unhandled rejection.
 */
let connackPromise : Promise<eventstream.Message> = cancel.newCancellablePromiseFromNextEvent({
cancelController: options?.cancelController,
emitter : this.connection,
eventName : eventstream.ClientConnection.PROTOCOL_MESSAGE,
eventDataTransformer: (eventData: any) => { return (eventData as eventstream.MessageEvent).message; },
cancelMessage: "Eventstream connect() cancelled by user request"
});
await this.connection.sendProtocolMessage({
message: connectMessage,
cancelController: options?.cancelController
});
// step 3: wait for the connack, or for the wait to be cancelled
connack = await connackPromise;
} catch (err) {
/*
 * Any step above threw or rejected. Shut the client down only if it is still in Connecting; the
 * caught error is handed to createRpcError as its third argument.
 */
if (this.state == ClientState.Connecting) {
this.state = ClientState.Finished;
setImmediate(() => { this.close(); });
}
reject(createRpcError(RpcErrorType.InternalError, "Failed to establish eventstream RPC connection", err as CrtError));
return;
}
/*
 * The handshake steps completed, but the state changed while they were being awaited. If that was the
 * disconnect listener above, the promise has already been rejected and this reject() has no effect.
 */
if (this.state != ClientState.Connecting) {
reject(createRpcError(RpcErrorType.InternalError, "Eventstream RPC connection attempt interrupted"));
return;
}
// a message arrived, but it has to pass the connack validation before the client counts as connected
if (!connack || !RpcClient.isValidConnack(connack)) {
this.state = ClientState.Finished;
reject(createRpcError(RpcErrorType.HandshakeError, "Failed to establish eventstream RPC connection - invalid connack"));
setImmediate(() => { this.close(); });
return;
}
/*
 * Remove the promise-rejecting disconnect listener and replace it with a regular listener that
 * doesn't reject the connect() promise, since we're about to resolve it. The replacement records a
 * non-zero error code as the disconnection reason and then schedules close().
 */
this.connection.removeListener('disconnection', onDisconnectWhileConnecting);
this.connection.on('disconnection', (eventData: eventstream.DisconnectionEvent) => {
if (eventData.errorCode != 0) {
this.disconnectionReason = new CrtError(eventData.errorCode);
}
setImmediate(() => { this.close(); });
});
/* Per the client contract, we only emit disconnect after a successful connection establishment */
this.emitDisconnectOnClose = true;
this.state = ClientState.Connected;
resolve({});
});
}