in src/directLine.ts [938:990]
/**
 * Builds an observable that wraps a WebSocket connection to `this.streamUrl`.
 *
 * A new socket is created (via `this.services.WebSocket`) each time the
 * observable is subscribed to. While the subscription is alive:
 *  - every message frame with a truthy `data` payload is `JSON.parse`d and
 *    emitted through `next`;
 *  - once the socket opens, an empty string is sent on an interval of
 *    `this.timeout` as a keep-alive ping;
 *  - a socket `close` or `error` event is forwarded as an observable error,
 *    at most once, so the caller can react (e.g. by reconnecting).
 *
 * Unsubscribing stops the ping timer, suppresses any later error signal and
 * closes the socket if it is still connecting or open.
 *
 * @returns An observable of the parsed messages, typed as `Observable<T>`.
 */
private observableWebSocket<T>() {
  return Observable.create((subscriber: Subscriber<T>) => {
    konsole.log("creating WebSocket", this.streamUrl);
    const ws = new this.services.WebSocket(this.streamUrl);
    let sub: Subscription | undefined;
    // Becomes true once the stream has been terminated, either because an error
    // was delivered to the subscriber or because the subscriber unsubscribed.
    let closed = false;

    // Cancels the keep-alive ping, if it was ever started.
    const stopPing = () => {
      if (sub) sub.unsubscribe();
    };

    // Shared handling for socket "close" and "error" events.
    // RxJS.retryWhen has a bug that would cause "error" signal to be sent after the observable is completed/errored.
    // We need to guard against extraneous "error" signal to workaround the bug.
    // The flag is flipped BEFORE notifying the subscriber so that a socket event
    // raised re-entrantly during error delivery cannot emit a second error.
    const fail = (reason: unknown) => {
      stopPing();
      if (closed) return;
      closed = true;
      subscriber.error(reason);
    };

    ws.onopen = open => {
      konsole.log("WebSocket open", open);
      // Chrome is pretty bad at noticing when a WebSocket connection is broken.
      // If we periodically ping the server with empty messages, it helps Chrome
      // realize when connection breaks, and close the socket. We then throw an
      // error, and that give us the opportunity to attempt to reconnect.
      sub = Observable.interval(this.timeout, this.services.scheduler).subscribe(() => {
        try {
          ws.send("");
        } catch (e) {
          // Best effort: a failed ping is only logged; the socket's own
          // close/error events are what terminate the stream.
          konsole.log("Ping error", e);
        }
      });
    };

    ws.onclose = close => {
      konsole.log("WebSocket close", close);
      fail(close);
    };

    ws.onerror = error => {
      konsole.log("WebSocket error", error);
      fail(error);
    };

    // Frames without a payload (e.g. empty keep-alive frames) are ignored.
    // NOTE(review): JSON.parse throws on a malformed frame and that exception is
    // not routed to the subscriber; confirm the service only ever sends JSON.
    ws.onmessage = message => message.data && subscriber.next(JSON.parse(message.data));

    // This is the 'unsubscribe' method, which is called when this observable is disposed.
    // When the WebSocket closes itself, we throw an error, and this function is eventually called.
    // When the observable is closed first (e.g. when tearing down a WebChat instance) then
    // we need to manually close the WebSocket.
    return () => {
      // Nothing may reach the subscriber after disposal: mark the stream as
      // terminated so the "close" event caused by ws.close() below is not
      // reported as an error, and stop pinging right away instead of waiting
      // for that event to arrive.
      closed = true;
      stopPing();
      // readyState 0 = CONNECTING, 1 = OPEN; a socket that is already closing
      // or closed needs no further action.
      if (ws.readyState === 0 || ws.readyState === 1) ws.close();
    };
  }) as Observable<T>;
}