in java/showcase/src/main/java/org/apache/flink/statefun/playground/java/showcase/part6/serving/UserFn.java [64:92]
public CompletableFuture<Void> apply(Context context, Message message) {
  // Entry point for one incoming message. Only USER_LOGIN_JSON_TYPE messages are
  // accepted; any other type is rejected with an IllegalArgumentException.
  //
  // For a login message this:
  //   1. increments the counter stored under SEEN_COUNT (an unset counter counts as 0),
  //   2. stores the current wall-clock time under SEEN_TIMESTAMP_MS,
  //   3. sends a UserProfile to GreetingsFn, addressed by the login's user id.
  //
  // Returns whatever context.done() returns.
  if (!message.is(USER_LOGIN_JSON_TYPE)) {
    throw new IllegalArgumentException("Unexpected message type: " + message.valueTypeName());
  }

  final UserLogin userLogin = message.as(USER_LOGIN_JSON_TYPE);

  final int updatedSeenCount = context.storage().get(SEEN_COUNT).orElse(0) + 1;

  final long currentMs = System.currentTimeMillis();
  // With no stored timestamp the fallback is "now", which makes the delta below 0.
  final long previousSeenMs = context.storage().get(SEEN_TIMESTAMP_MS).orElse(currentMs);
  final long sinceLastSeenMs = currentMs - previousSeenMs;

  // Persist the new state only after the previous timestamp has been read.
  context.storage().set(SEEN_COUNT, updatedSeenCount);
  context.storage().set(SEEN_TIMESTAMP_MS, currentMs);

  final UserProfile userProfile =
      UserProfile.newBuilder()
          .setName(userLogin.getUserName())
          .setLoginLocation(userLogin.getLoginType().name())
          .setSeenCount(updatedSeenCount)
          .setLastSeenDeltaMs(sinceLastSeenMs)
          .build();

  final Message greetingRequest =
      MessageBuilder.forAddress(GreetingsFn.TYPENAME, userLogin.getUserId())
          .withCustomType(USER_PROFILE_PROTOBUF_TYPE, userProfile)
          .build();
  context.send(greetingRequest);

  return context.done();
}