CompletableFuture handleInternally()

in statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/handler/ConcurrentRequestReplyHandler.java [72:100]


  CompletableFuture<FromFunction> handleInternally(ToFunction request) {
    if (!request.hasInvocation()) {
      return CompletableFuture.completedFuture(FromFunction.getDefaultInstance());
    }
    ToFunction.InvocationBatchRequest batchRequest = request.getInvocation();
    Address self = sdkAddressFromProto(batchRequest.getTarget());
    StatefulFunctionSpec targetSpec = functionSpecs.get(self.type());
    if (targetSpec == null) {
      throw new IllegalStateException("Unknown target type " + self);
    }
    Supplier<? extends StatefulFunction> supplier = targetSpec.supplier();
    if (supplier == null) {
      throw new NullPointerException("missing function supplier for " + self);
    }
    StatefulFunction function = supplier.get();
    if (function == null) {
      throw new NullPointerException("supplier for " + self + " supplied NULL function.");
    }
    StateValueContexts.ResolutionResult stateResolution =
        StateValueContexts.resolve(targetSpec.knownValues(), batchRequest.getStateList());
    if (stateResolution.hasMissingValues()) {
      // not enough information to compute this batch.
      FromFunction res = buildIncompleteInvocationResponse(stateResolution.missingValues());
      return CompletableFuture.completedFuture(res);
    }
    final ConcurrentAddressScopedStorage storage =
        new ConcurrentAddressScopedStorage(stateResolution.resolved());
    return executeBatch(batchRequest, self, storage, function);
  }