private setInternalSubscriber()

in src/shippers/elastic_v3/server/src/server_shipper.ts [240:284]


  /**
   * Builds and subscribes the internal pipeline that decides when queued events are sent.
   *
   * Three triggers are merged, each tagged with whether the queue should be flushed:
   *  - an `interval(SECOND)` tick that is only let through once at least
   *    `MIN_TIME_SINCE_LAST_SEND` has elapsed since `lastBatchSent`; it stops when
   *    `shutdown$` emits (`shouldFlush: false`);
   *  - every emission of `flush$` (`shouldFlush: true`);
   *  - every emission of `shutdown$` (`shouldFlush: true`).
   *
   * A trigger is acted upon only while opted-in, `firstTimeOffline` is `null`, and the
   * internal queue is not empty.
   */
  private setInternalSubscriber() {
    // Periodic trigger: throttled by the time elapsed since the last batch, and torn down on shutdown.
    const periodicTrigger$ = interval(SECOND).pipe(
      filter(() => {
        const elapsedSinceLastBatch = Date.now() - this.lastBatchSent;
        return elapsedSinceLastBatch >= MIN_TIME_SINCE_LAST_SEND;
      }),
      takeUntil(this.shutdown$),
      map(() => ({ shouldFlush: false }))
    );

    // Explicit `flush` requests
    const flushTrigger$ = this.flush$.pipe(map(() => ({ shouldFlush: true })));

    // One last attempt on shutdown, flushing the queue
    const shutdownTrigger$ = this.shutdown$.pipe(map(() => ({ shouldFlush: true })));

    // Gate: opted-in, online, and with something in the queue.
    const hasSomethingToSend = (): boolean => {
      const canSend =
        this.isOptedIn$.value === true &&
        this.firstTimeOffline === null &&
        this.internalQueue.length > 0;

      if (canSend) {
        return true;
      }

      // Nothing will be sent: re-emit the current in-flight count, just in case it's already 0
      this.inFlightRequests$.next(this.inFlightRequests$.value);
      return false;
    };

    merge(periodicTrigger$, flushTrigger$, shutdownTrigger$)
      .pipe(
        filter(hasSomethingToSend),
        // Stamp `lastBatchSent` and pull the events out of the queue in the same synchronous step
        // to avoid race conditions between triggers.
        map((trigger) => {
          this.lastBatchSent = Date.now();
          return this.getEventsToSend(trigger.shouldFlush);
        }),
        // Defensive: never send an empty batch
        filter((batch) => batch.length > 0),
        // `mergeMap` on purpose: every trigger sends right away.
        //   Emissions must not be dropped (exhaustMap) nor queued behind a previous send (concatMap).
        mergeMap((batch) => this.sendEvents(batch))
      )
      .subscribe();
  }