in java/showcase/src/main/java/org/apache/flink/statefun/playground/java/showcase/part5/asyncops/AsyncOpsShowcaseFn.java [55:79]
/**
 * Populates the stored user profile the first time this function instance is invoked.
 *
 * <p>If a value is already present under {@code USER_PROFILE_VALUE}, the invocation finishes
 * immediately via {@code context.done()}. Otherwise two asynchronous lookups keyed by this
 * function's id are started (both are kicked off before either result is awaited), and once both
 * have succeeded their results are combined into a {@code UserProfile} that is written to
 * storage.
 *
 * @param context invocation context providing the function's own address and its storage
 * @param message the incoming message; its content is not inspected here
 * @return a future that completes once the profile has been written (or immediately if one was
 *     already stored); it completes exceptionally, without writing anything, if either lookup
 *     fails
 */
public CompletableFuture<Void> apply(Context context, Message message) {
  if (context.storage().get(USER_PROFILE_VALUE).isPresent()) {
    // A profile was stored by an earlier invocation; there is nothing left to enrich.
    return context.done();
  }

  final String username = context.self().id();
  final CompletableFuture<Integer> ageResult = getAgeFromRemoteDatabase(username);
  final CompletableFuture<List<String>> friendsResult =
      getFriendsListFromAnotherRemoteDatabase(username);

  // thenAcceptBoth only invokes the callback when BOTH lookups completed normally, handing us the
  // resolved values directly. A failed lookup propagates to the returned future and the callback
  // is skipped, so a partial profile can never be persisted and no join() is needed.
  return ageResult.thenAcceptBoth(
      friendsResult,
      (age, friends) -> {
        final UserProfile profile =
            UserProfile.newBuilder().setAge(age).addAllFriend(friends).build();
        context.storage().set(USER_PROFILE_VALUE, profile);
      });
}