in firebase-inappmessaging/src/main/java/com/google/firebase/inappmessaging/internal/InAppMessageStreamManager.java [168:298]
/**
 * Builds the stream of in-app messages to show in response to app events.
 *
 * <p>App-foreground, analytics and programmatic trigger events are merged into a single stream
 * and handled one at a time (via {@code concatMap}) on the IO scheduler. For every event the
 * eligible campaigns are taken from the campaign cache or, when the cache yields nothing or
 * {@code shouldIgnoreCache(event)} is true, fetched from the backend. The response is then passed
 * to {@code getTriggeredInAppMessageMaybe} together with the impression, rate-limit and
 * displayability filters, which produces at most one message per event.
 *
 * <p>Cache, impression-store and backend-fetch failures are logged and absorbed inside this
 * method instead of being propagated downstream.
 *
 * @return a flowable that emits the selected messages on the main-thread scheduler
 */
public Flowable<TriggeredInAppMessage> createFirebaseInAppMessageStream() {
  return Flowable.merge(
          appForegroundEventFlowable,
          analyticsEventsManager.getAnalyticsEventsFlowable(),
          programmaticTriggerEventFlowable)
      .doOnNext(e -> Logging.logd("Event Triggered: " + e))
      .observeOn(schedulers.io())
      .concatMap(
          event -> {
            // A failed cache read is treated like an empty cache so the service fetch is tried.
            Maybe<FetchEligibleCampaignsResponse> cacheRead =
                campaignCacheClient
                    .get()
                    .doOnSuccess(r -> Logging.logd("Fetched from cache"))
                    .doOnError(e -> Logging.logw("Cache read error: " + e.getMessage()))
                    .onErrorResumeNext(Maybe.empty()); // Absorb cache read failures

            // Fire-and-forget cache write; errors are absorbed before subscribing.
            Consumer<FetchEligibleCampaignsResponse> cacheWrite =
                response ->
                    campaignCacheClient
                        .put(response)
                        .doOnComplete(() -> Logging.logd("Wrote to cache"))
                        .doOnError(e -> Logging.logw("Cache write error: " + e.getMessage()))
                        .onErrorResumeNext(
                            ignored -> Completable.complete()) // Absorb cache write fails
                        .subscribe();

            // Test campaigns bypass the impression check; an impression read error is treated
            // as "not impressed" so the content is kept.
            Function<ThickContent, Maybe<ThickContent>> filterAlreadyImpressed =
                content ->
                    content.getIsTestCampaign()
                        ? Maybe.just(content)
                        : impressionStorageClient
                            .isImpressed(content)
                            .doOnError(
                                e ->
                                    Logging.logw("Impression store read fail: " + e.getMessage()))
                            .onErrorResumeNext(
                                Single.just(false)) // Absorb impression read errors
                            .doOnSuccess(isImpressed -> logImpressionStatus(content, isImpressed))
                            .filter(isImpressed -> !isImpressed)
                            .map(isImpressed -> content);

            Function<ThickContent, Maybe<ThickContent>> appForegroundRateLimitFilter =
                content -> getContentIfNotRateLimited(event, content);

            // Only the listed message types pass; everything else is dropped.
            Function<ThickContent, Maybe<ThickContent>> filterDisplayable =
                thickContent -> {
                  switch (thickContent.getContent().getMessageDetailsCase()) {
                    case BANNER:
                    case IMAGE_ONLY:
                    case MODAL:
                    case CARD:
                      return Maybe.just(thickContent);
                    default:
                      Logging.logd("Filtering non-displayable message");
                      return Maybe.empty();
                  }
                };

            Function<FetchEligibleCampaignsResponse, Maybe<TriggeredInAppMessage>>
                selectThickContent =
                    response ->
                        getTriggeredInAppMessageMaybe(
                            event,
                            filterAlreadyImpressed,
                            appForegroundRateLimitFilter,
                            filterDisplayable,
                            response);

            // Falls back to the default (empty) impression list both when the store is empty
            // and when reading it fails.
            Maybe<CampaignImpressionList> alreadySeenCampaigns =
                impressionStorageClient
                    .getAllImpressions()
                    .doOnError(
                        e -> Logging.logw("Impressions store read fail: " + e.getMessage()))
                    .defaultIfEmpty(CampaignImpressionList.getDefaultInstance())
                    .onErrorResumeNext(Maybe.just(CampaignImpressionList.getDefaultInstance()));

            // NOTE(review): getId()/getToken(false) are evaluated here for every event, even
            // when the cache later satisfies the request -- confirm this is intended.
            Maybe<InstallationIdResult> getIID =
                Maybe.zip(
                        taskToMaybe(firebaseInstallations.getId()),
                        taskToMaybe(firebaseInstallations.getToken(false)),
                        InstallationIdResult::create)
                    .observeOn(schedulers.io());

            Function<CampaignImpressionList, Maybe<FetchEligibleCampaignsResponse>> serviceFetch =
                campaignImpressionList -> {
                  if (!dataCollectionHelper.isAutomaticDataCollectionEnabled()) {
                    Logging.logi(
                        "Automatic data collection is disabled, not attempting campaign fetch from service.");
                    return Maybe.just(cacheExpiringResponse());
                  }
                  // getIID is observed on the IO scheduler, so getFiams is invoked there.
                  return getIID
                      .filter(InAppMessageStreamManager::validIID)
                      .map(iid -> apiClient.getFiams(iid, campaignImpressionList))
                      .switchIfEmpty(Maybe.just(cacheExpiringResponse()))
                      .doOnSuccess(
                          resp ->
                              Logging.logi(
                                  String.format(
                                      Locale.US,
                                      "Successfully fetched %d messages from backend",
                                      resp.getMessagesList().size())))
                      .doOnSuccess(
                          resp ->
                              // Separate fire-and-forget subscription: its errors would not
                              // reach the doOnError/onErrorResumeNext below, so absorb them
                              // here, mirroring cacheWrite.
                              impressionStorageClient
                                  .clearImpressions(resp)
                                  .doOnError(
                                      e ->
                                          Logging.logw(
                                              "Impressions store clear fail: " + e.getMessage()))
                                  .onErrorResumeNext(
                                      ignored ->
                                          Completable.complete()) // Absorb impression clear fails
                                  .subscribe())
                      .doOnSuccess(analyticsEventsManager::updateContextualTriggers)
                      .doOnSuccess(testDeviceHelper::processCampaignFetch)
                      .doOnError(e -> Logging.logw("Service fetch error: " + e.getMessage()))
                      .onErrorResumeNext(Maybe.empty()); // Absorb service failures
                };

            if (shouldIgnoreCache(event)) {
              Logging.logi(
                  String.format(
                      Locale.US,
                      "Forcing fetch from service rather than cache. "
                          + "Test Device: %s | App Fresh Install: %s",
                      testDeviceHelper.isDeviceInTestMode(),
                      testDeviceHelper.isAppInstallFresh()));
              // The cache is neither read nor written on this path.
              return alreadySeenCampaigns
                  .flatMap(serviceFetch)
                  .flatMap(selectThickContent)
                  .toFlowable();
            }

            Logging.logd("Attempting to fetch campaigns using cache");
            // Only a response obtained from the service is written back to the cache.
            return cacheRead
                .switchIfEmpty(alreadySeenCampaigns.flatMap(serviceFetch).doOnSuccess(cacheWrite))
                .flatMap(selectThickContent)
                .toFlowable();
          })
      .observeOn(schedulers.mainThread()); // Updates are delivered on the main thread
}