in src/shippers/elastic_v3/server/src/server_shipper.ts [240:284]
/**
 * Sets up the internal subscription that decides when the queued events are sent.
 *
 * Three triggers are merged into a single stream of `{ shouldFlush }` requests:
 * - a timer ticking every `SECOND` that only lets a tick through once at least
 *   `MIN_TIME_SINCE_LAST_SEND` has elapsed since `lastBatchSent` (no flush; stops on shutdown),
 * - every emission of `flush$` (flush),
 * - every emission of `shutdown$` (flush, as a last attempt to send).
 *
 * A request only goes ahead when the shipper is opted-in, is not marked as offline
 * and has events in the internal queue.
 */
private setInternalSubscriber() {
  // Periodic trigger: emits only when enough time has passed since the last batch was sent.
  // It is the only trigger that is torn down when `shutdown$` emits.
  const periodicSend$ = interval(SECOND).pipe(
    filter(() => Date.now() - this.lastBatchSent >= MIN_TIME_SINCE_LAST_SEND),
    takeUntil(this.shutdown$),
    map(() => ({ shouldFlush: false }))
  );

  // Explicit `flush` requests.
  const flushRequested$ = this.flush$.pipe(map(() => ({ shouldFlush: true })));

  // One final attempt on shutdown, flushing the queue.
  const shuttingDown$ = this.shutdown$.pipe(map(() => ({ shouldFlush: true })));

  // NOTE: the order of the sources is kept as-is, since `merge` subscribes to them in this order.
  merge(periodicSend$, flushRequested$, shuttingDown$)
    .pipe(
      // Gate: proceed only when opted-in, online and with something queued.
      filter(() => {
        if (
          this.isOptedIn$.value === true &&
          this.firstTimeOffline === null &&
          this.internalQueue.length > 0
        ) {
          return true;
        }

        // Nothing will be sent: re-emit the current in-flight count so that anyone
        // observing it gets notified, in case it is already 0.
        this.inFlightRequests$.next(this.inFlightRequests$.value);
        return false;
      }),

      // Step 1: update `lastBatchSent` and pull the events out of the queue within the same
      // synchronous callback, so that no other emission can interleave between both actions.
      map((trigger) => {
        this.lastBatchSent = Date.now();
        return this.getEventsToSend(trigger.shouldFlush);
      }),

      // Step 2: defensive check, drop empty batches.
      filter((batch) => batch.length > 0),

      // Step 3: send the batch.
      // `mergeMap` is used on purpose: every trigger must result in a send right away,
      // without dropping emissions (`exhaustMap`) nor queueing them (`concatMap`).
      mergeMap((batch) => this.sendEvents(batch))
    )
    .subscribe();
}