in java/showcase/src/main/java/org/apache/flink/statefun/playground/java/showcase/part6/serving/GreetingsFn.java [52:67]
public CompletableFuture<Void> apply(Context context, Message message) {
if (message.is(USER_PROFILE_PROTOBUF_TYPE)) {
final UserProfile profile = message.as(USER_PROFILE_PROTOBUF_TYPE);
final String greetings = createGreetingsMessage(profile);
final String userId = context.self().id();
System.out.println("GreetingsFn (instance id: " + userId + "):\t" + greetings);
context.send(
KafkaEgressMessage.forEgress(KAFKA_EGRESS)
.withTopic("greetings")
.withUtf8Key(userId)
.withUtf8Value(greetings)
.build());
}
return context.done();
}