in src/client/src/analytics_client/analytics_client.ts [281:333]
/**
 * Subscribes to `internalEventQueue$` and flushes the events accumulated in it
 * once the client is ready to report them.
 *
 * The client is considered ready when an opt-in config has been emitted AND
 * either a shipper has been registered or that config reports opted-out
 * (in which case there is no point in waiting for a shipper).
 *
 * On flush, the context is re-emitted (only when opted-in), events whose type is
 * not opted-in are discarded, and the remaining ones are grouped by `event_type`
 * and handed to `sendToShipper` in batches of at most 1000 events.
 */
private reportEnqueuedEventsWhenClientIsReady() {
  // Observer that will emit when both events occur: the OptInConfig is set + a shipper has been registered
  const configReceivedAndShipperReceivedObserver$ = combineLatest([
    this.optInConfigWithReplay$,
    // NOTE: merge() takes its sources as separate arguments. Passing a single array makes RxJS
    // treat the array itself as the source and emit the two observables as plain values right away,
    // which would satisfy combineLatest without actually waiting for a shipper.
    merge(
      this.shipperRegistered$,
      // Merging shipperRegistered$ with the optInConfigWithReplay$ when optedIn is false, so that we don't need to wait for the shipper if opted-in === false
      this.optInConfigWithReplay$.pipe(filter((cfg) => cfg?.isOptedIn() === false))
    ),
  ]);

  // Flush the internal queue when we get any optInConfig and, at least, 1 shipper
  this.internalEventQueue$
    .pipe(
      // takeUntil completes the queue subscription the first time the condition above is met
      takeUntil(configReceivedAndShipperReceivedObserver$),
      // Accumulate the events until we can send them
      buffer(configReceivedAndShipperReceivedObserver$),
      // Minimal delay only to make this chain async and let the opt-in operation complete first.
      delay(0),
      // Re-emit the context to make sure all the shippers got it (only if opted-in)
      tap(() => {
        if (this.optInConfig$.value?.isOptedIn()) {
          this.context$.next(this.context$.value);
        }
      }),
      // Minimal delay only to make this chain async and let
      // the context update operation complete first.
      delay(0),
      // Flatten the array of events
      concatMap((events) => from(events)),
      // Discard opted-out events (also drops everything if no opt-in config is available)
      filter((event) => this.optInConfig$.value?.isEventTypeOptedIn(event.event_type) === true),
      // Let's group the requests per eventType for easier batching
      groupBy((event) => event.event_type),
      mergeMap((groupedObservable) =>
        groupedObservable.pipe(
          bufferCount(1000), // Batching up-to 1000 events per event type for backpressure reasons
          map((events) => ({ eventType: groupedObservable.key, events }))
        )
      )
    )
    .subscribe(({ eventType, events }) => {
      this.sendToShipper(eventType, events);
    });
}