in src/directLine.ts [858:910]
/**
 * Builds the polling-based activity stream.
 *
 * For every value emitted by `checkConnection()` a poller is started that
 * repeatedly issues `GET {domain}/conversations/{conversationId}/activities`
 * with the current `watermark`. A request is only sent when
 * `connectionStatus$` is `Online` at the moment the trigger fires. After a
 * successful response the next request is scheduled so that consecutive
 * requests start at least `pollingInterval` milliseconds apart.
 *
 * Error handling per HTTP status:
 * - 403: status is set to `ExpiredToken` and one more trigger is scheduled
 *   after `pollingInterval`.
 * - 404: status is set to `Ended`; nothing further is scheduled.
 * - anything else: the error is raised on the poller, where the `catch`
 *   below turns it into a silent completion of that poller.
 *
 * Unsubscribing from the returned observable stops the polling loop: the
 * pending timer is cleared, the in-flight request is unsubscribed and the
 * trigger subscription is released.
 *
 * @returns the result of flattening each polled `ActivityGroup` through
 *          `observableFromActivityGroup`.
 */
private pollingGetActivity$() {
    const poller$: Observable<AjaxResponse> = Observable.create((subscriber: Subscriber<any>) => {
        // A BehaviorSubject to trigger polling. Since it is a BehaviorSubject
        // the first event is produced immediately.
        const trigger$ = new BehaviorSubject<any>({});

        // State needed to stop the loop when the consumer unsubscribes.
        let stopped = false;
        let pendingTimer: ReturnType<typeof setTimeout> | undefined;
        let inFlightRequest: { unsubscribe(): void } | undefined;

        // Queues the next poll unless the poller has already been torn down
        // (teardown can run synchronously from inside `subscriber.next`).
        const scheduleNextPoll = (delay: number) => {
            if (!stopped) {
                pendingTimer = setTimeout(() => trigger$.next(null), delay);
            }
        };

        // TODO: remove Date.now, use reactive interval to space out every request
        const triggerSubscription = trigger$.subscribe(() => {
            if (this.connectionStatus$.getValue() === ConnectionStatus.Online) {
                const startTimestamp = Date.now();

                inFlightRequest = this.services.ajax({
                    headers: {
                        Accept: 'application/json',
                        ...this.commonHeaders()
                    },
                    method: 'GET',
                    url: `${ this.domain }/conversations/${ this.conversationId }/activities?watermark=${ this.watermark }`,
                    timeout: this.timeout
                }).subscribe(
                    (result: AjaxResponse) => {
                        subscriber.next(result);
                        // Subtract the time the request took so requests start
                        // roughly `pollingInterval` apart; never wait a negative time.
                        scheduleNextPoll(Math.max(0, this.pollingInterval - Date.now() + startTimestamp));
                    },
                    (error: any) => {
                        switch (error.status) {
                            case 403:
                                this.connectionStatus$.next(ConnectionStatus.ExpiredToken);
                                scheduleNextPoll(this.pollingInterval);
                                break;

                            case 404:
                                this.connectionStatus$.next(ConnectionStatus.Ended);
                                break;

                            default:
                                // propagate the error
                                subscriber.error(error);
                                break;
                        }
                    }
                );
            }
        });

        // Teardown: without this the timer/request loop would keep running
        // after the consumer has unsubscribed.
        return () => {
            stopped = true;

            if (pendingTimer !== undefined) {
                clearTimeout(pendingTimer);
            }

            if (inFlightRequest) {
                inFlightRequest.unsubscribe();
            }

            triggerSubscription.unsubscribe();
        };
    });

    return this.checkConnection()
        .flatMap(_ => poller$
            // A poller failure ends that poller quietly instead of erroring the activity stream.
            .catch(() => Observable.empty<AjaxResponse>())
            .map(ajaxResponse => ajaxResponse.response as ActivityGroup)
            .flatMap(activityGroup => this.observableFromActivityGroup(activityGroup)));
}