private observableWebSocket()

in src/directLine.ts [938:990]


    private observableWebSocket<T>() {
        return Observable.create((subscriber: Subscriber<T>) => {
            konsole.log("creating WebSocket", this.streamUrl);
            const ws = new this.services.WebSocket(this.streamUrl);
            let sub: Subscription;
            let closed: boolean;

            ws.onopen = open => {
                konsole.log("WebSocket open", open);
                // Chrome is pretty bad at noticing when a WebSocket connection is broken.
                // If we periodically ping the server with empty messages, it helps Chrome
                // realize when connection breaks, and close the socket. We then throw an
                // error, and that give us the opportunity to attempt to reconnect.
                sub = Observable.interval(this.timeout, this.services.scheduler).subscribe(_ => {
                    try {
                        ws.send("")
                    } catch(e) {
                        konsole.log("Ping error", e);
                    }
                });
            }

            ws.onclose = close => {
                konsole.log("WebSocket close", close);
                if (sub) sub.unsubscribe();

                // RxJS.retryWhen has a bug that would cause "error" signal to be sent after the observable is completed/errored.
                // We need to guard against extraneous "error" signal to workaround the bug.
                closed || subscriber.error(close);
                closed = true;
            }

            ws.onerror = error => {
                konsole.log("WebSocket error", error);
                if (sub) sub.unsubscribe();

                // RxJS.retryWhen has a bug that would cause "error" signal to be sent after the observable is completed/errored.
                // We need to guard against extraneous "error" signal to workaround the bug.
                closed || subscriber.error(error);
                closed = true;
            }

            ws.onmessage = message => message.data && subscriber.next(JSON.parse(message.data));

            // This is the 'unsubscribe' method, which is called when this observable is disposed.
            // When the WebSocket closes itself, we throw an error, and this function is eventually called.
            // When the observable is closed first (e.g. when tearing down a WebChat instance) then
            // we need to manually close the WebSocket.
            return () => {
                if (ws.readyState === 0 || ws.readyState === 1) ws.close();
            }
        }) as Observable<T>
    }