in statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/handler/ConcurrentRequestReplyHandler.java [72:100]
/**
 * Dispatches a single {@link ToFunction} request to the function it targets.
 *
 * <p>A request that carries no invocation is answered immediately with the default {@link
 * FromFunction} instance. Otherwise the target address is taken from the invocation batch, the
 * matching {@link StatefulFunctionSpec} is looked up by the address type, and a function instance
 * is obtained from the spec's supplier. The state values sent with the batch are then resolved
 * against the values declared by the spec: if any are missing, the response produced by {@code
 * buildIncompleteInvocationResponse} is returned without running the batch; otherwise the batch is
 * handed to {@code executeBatch} together with a storage built from the resolved values.
 *
 * @param request the incoming request.
 * @return a future holding the response for this request.
 * @throws IllegalStateException if no function spec is registered for the target's type.
 * @throws NullPointerException if the spec has no supplier, or the supplier returned {@code null}.
 */
CompletableFuture<FromFunction> handleInternally(ToFunction request) {
  if (!request.hasInvocation()) {
    // no invocation in this request, reply with the default response.
    return CompletableFuture.completedFuture(FromFunction.getDefaultInstance());
  }
  final ToFunction.InvocationBatchRequest batch = request.getInvocation();
  final Address target = sdkAddressFromProto(batch.getTarget());

  // locate the spec registered for the type of the addressed function.
  final StatefulFunctionSpec spec = functionSpecs.get(target.type());
  if (spec == null) {
    throw new IllegalStateException("Unknown target type " + target);
  }

  // obtain the function instance that will process this batch.
  final Supplier<? extends StatefulFunction> functionSupplier = spec.supplier();
  if (functionSupplier == null) {
    throw new NullPointerException("missing function supplier for " + target);
  }
  final StatefulFunction instance = functionSupplier.get();
  if (instance == null) {
    throw new NullPointerException("supplier for " + target + " supplied NULL function.");
  }

  // match the state values sent with the batch against the values declared by the spec.
  final StateValueContexts.ResolutionResult resolution =
      StateValueContexts.resolve(spec.knownValues(), batch.getStateList());
  if (!resolution.hasMissingValues()) {
    return executeBatch(
        batch, target, new ConcurrentAddressScopedStorage(resolution.resolved()), instance);
  }
  // not enough information to compute this batch, report the missing values instead.
  return CompletableFuture.completedFuture(
      buildIncompleteInvocationResponse(resolution.missingValues()));
}