public CompletableFuture apply()

in java/showcase/src/main/java/org/apache/flink/statefun/playground/java/showcase/part6/serving/UserFn.java [64:92]


  public CompletableFuture<Void> apply(Context context, Message message) {
    if (message.is(USER_LOGIN_JSON_TYPE)) {
      final UserLogin login = message.as(USER_LOGIN_JSON_TYPE);

      int seenCount = context.storage().get(SEEN_COUNT).orElse(0);
      seenCount++;

      final long nowMs = System.currentTimeMillis();
      final long lastSeenTimestampMs = context.storage().get(SEEN_TIMESTAMP_MS).orElse(nowMs);

      context.storage().set(SEEN_COUNT, seenCount);
      context.storage().set(SEEN_TIMESTAMP_MS, nowMs);

      final UserProfile profile =
          UserProfile.newBuilder()
              .setName(login.getUserName())
              .setLoginLocation(login.getLoginType().name())
              .setSeenCount(seenCount)
              .setLastSeenDeltaMs(nowMs - lastSeenTimestampMs)
              .build();
      context.send(
          MessageBuilder.forAddress(GreetingsFn.TYPENAME, login.getUserId())
              .withCustomType(USER_PROFILE_PROTOBUF_TYPE, profile)
              .build());
    } else {
      throw new IllegalArgumentException("Unexpected message type: " + message.valueTypeName());
    }
    return context.done();
  }