public Flowable createFirebaseInAppMessageStream()

in firebase-inappmessaging/src/main/java/com/google/firebase/inappmessaging/internal/InAppMessageStreamManager.java [168:298]


  /**
   * Builds the stream of in-app messages that have been triggered by an event.
   *
   * <p>App-foreground, analytics and programmatic trigger events are merged into a single stream
   * and moved onto {@code schedulers.io()}. Events are then processed in order ({@code
   * concatMap}): the eligible campaigns are read from the cache or, when the cache yields nothing
   * (empty or unreadable) or is deliberately bypassed, fetched from the service. The response is
   * handed to {@code getTriggeredInAppMessageMaybe} together with the impression, rate-limit and
   * displayability filters defined below.
   *
   * <p>Failures of the cache, the impression store and the service fetch are logged and absorbed
   * inside this method, so such a failure produces no message for the event being handled.
   *
   * @return a {@link Flowable} emitting at most one {@link TriggeredInAppMessage} per trigger
   *     event, observed on {@code schedulers.mainThread()}
   */
  public Flowable<TriggeredInAppMessage> createFirebaseInAppMessageStream() {
    return Flowable.merge(
            appForegroundEventFlowable,
            analyticsEventsManager.getAnalyticsEventsFlowable(),
            programmaticTriggerEventFlowable)
        .doOnNext(e -> Logging.logd("Event Triggered: " + e))
        .observeOn(schedulers.io())
        .concatMap(
            event -> {
              // A failed cache read is turned into an empty Maybe so that the switchIfEmpty
              // below falls through to the service fetch.
              Maybe<FetchEligibleCampaignsResponse> cacheRead =
                  campaignCacheClient
                      .get()
                      .doOnSuccess(r -> Logging.logd("Fetched from cache"))
                      .doOnError(e -> Logging.logw("Cache read error: " + e.getMessage()))
                      .onErrorResumeNext(Maybe.empty()); // Absorb cache read failures

              // Fire-and-forget cache write; the outer chain does not wait for it to finish.
              Consumer<FetchEligibleCampaignsResponse> cacheWrite =
                  response ->
                      campaignCacheClient
                          .put(response)
                          .doOnComplete(() -> Logging.logd("Wrote to cache"))
                          .doOnError(e -> Logging.logw("Cache write error: " + e.getMessage()))
                          .onErrorResumeNext(
                              ignored -> Completable.complete()) // Absorb cache write fails
                          .subscribe();

              // Test campaigns skip the impression check. If the impression store cannot be read,
              // the content is treated as not yet impressed and is let through.
              Function<ThickContent, Maybe<ThickContent>> filterAlreadyImpressed =
                  content ->
                      content.getIsTestCampaign()
                          ? Maybe.just(content)
                          : impressionStorageClient
                              .isImpressed(content)
                              .doOnError(
                                  e ->
                                      Logging.logw("Impression store read fail: " + e.getMessage()))
                              .onErrorResumeNext(
                                  Single.just(false)) // Absorb impression read errors
                              .doOnSuccess(isImpressed -> logImpressionStatus(content, isImpressed))
                              .filter(isImpressed -> !isImpressed)
                              .map(isImpressed -> content);

              Function<ThickContent, Maybe<ThickContent>> appForegroundRateLimitFilter =
                  content -> getContentIfNotRateLimited(event, content);

              // Only the four message types listed here are passed on; anything else is dropped.
              Function<ThickContent, Maybe<ThickContent>> filterDisplayable =
                  thickContent -> {
                    switch (thickContent.getContent().getMessageDetailsCase()) {
                      case BANNER:
                      case IMAGE_ONLY:
                      case MODAL:
                      case CARD:
                        return Maybe.just(thickContent);
                      default:
                        Logging.logd("Filtering non-displayable message");
                        return Maybe.empty();
                    }
                  };

              Function<FetchEligibleCampaignsResponse, Maybe<TriggeredInAppMessage>>
                  selectThickContent =
                      response ->
                          getTriggeredInAppMessageMaybe(
                              event,
                              filterAlreadyImpressed,
                              appForegroundRateLimitFilter,
                              filterDisplayable,
                              response);

              // Falls back to the default (empty) impression list both when nothing is stored
              // and when the store cannot be read.
              Maybe<CampaignImpressionList> alreadySeenCampaigns =
                  impressionStorageClient
                      .getAllImpressions()
                      .doOnError(
                          e -> Logging.logw("Impressions store read fail: " + e.getMessage()))
                      .defaultIfEmpty(CampaignImpressionList.getDefaultInstance())
                      .onErrorResumeNext(Maybe.just(CampaignImpressionList.getDefaultInstance()));

              // Combines the installation id and token tasks; downstream operators run on the io
              // scheduler rather than on whichever thread completes the tasks.
              Maybe<InstallationIdResult> getIID =
                  Maybe.zip(
                          taskToMaybe(firebaseInstallations.getId()),
                          taskToMaybe(firebaseInstallations.getToken(false)),
                          InstallationIdResult::create)
                      .observeOn(schedulers.io());

              Function<CampaignImpressionList, Maybe<FetchEligibleCampaignsResponse>> serviceFetch =
                  campaignImpressionList -> {
                    if (!dataCollectionHelper.isAutomaticDataCollectionEnabled()) {
                      Logging.logi(
                          "Automatic data collection is disabled, not attempting campaign fetch from service.");
                      return Maybe.just(cacheExpiringResponse());
                    }

                    // An installation id that fails validIID also results in
                    // cacheExpiringResponse() instead of a backend call.
                    // NOTE(review): getFiams is invoked inside map(), so it appears to be a
                    // synchronous call made on the io scheduler - confirm.
                    return getIID
                        .filter(InAppMessageStreamManager::validIID)
                        .map(iid -> apiClient.getFiams(iid, campaignImpressionList))
                        .switchIfEmpty(Maybe.just(cacheExpiringResponse()))
                        .doOnSuccess(
                            resp ->
                                Logging.logi(
                                    String.format(
                                        Locale.US,
                                        "Successfully fetched %d messages from backend",
                                        resp.getMessagesList().size())))
                        .doOnSuccess(
                            resp ->
                                impressionStorageClient
                                    .clearImpressions(resp)
                                    .doOnError(
                                        e ->
                                            Logging.logw(
                                                "Impressions store clear fail: " + e.getMessage()))
                                    // Absorb clear failures: a bare subscribe() has no error
                                    // consumer, so an error here would otherwise be routed to the
                                    // global RxJavaPlugins error handler.
                                    .onErrorResumeNext(ignored -> Completable.complete())
                                    .subscribe())
                        .doOnSuccess(analyticsEventsManager::updateContextualTriggers)
                        .doOnSuccess(testDeviceHelper::processCampaignFetch)
                        .doOnError(e -> Logging.logw("Service fetch error: " + e.getMessage()))
                        .onErrorResumeNext(Maybe.empty()); // Absorb service failures
                  };

              // Bypass path: the cache is neither read nor written.
              if (shouldIgnoreCache(event)) {
                Logging.logi(
                    String.format(
                        Locale.US,
                        "Forcing fetch from service rather than cache. "
                            + "Test Device: %s | App Fresh Install: %s",
                        testDeviceHelper.isDeviceInTestMode(),
                        testDeviceHelper.isAppInstallFresh()));
                return alreadySeenCampaigns
                    .flatMap(serviceFetch)
                    .flatMap(selectThickContent)
                    .toFlowable();
              }

              // Default path: use the cached response if present, otherwise fetch from the
              // service and store the result in the cache.
              Logging.logd("Attempting to fetch campaigns using cache");
              return cacheRead
                  .switchIfEmpty(alreadySeenCampaigns.flatMap(serviceFetch).doOnSuccess(cacheWrite))
                  .flatMap(selectThickContent)
                  .toFlowable();
            })
        .observeOn(schedulers.mainThread()); // Updates are delivered on the main thread
  }