public CompletionStage submit()

in uforwarder/src/main/java/com/uber/data/kafka/consumerproxy/worker/dispatcher/grpc/GrpcDispatcher.java [112:170]


  public CompletionStage<GrpcResponse> submit(ItemAndJob<GrpcRequest> item) {
    CompletableFuture<GrpcResponse> completableFuture =
        infra.contextManager().wrap(new CompletableFuture<>());
    if (item == null) {
      completableFuture.completeExceptionally(new NullPointerException());
      return completableFuture;
    }
    final GrpcRequest message = item.getItem();
    MessageStub.Attempt attempt = message.getStub().newAttempt();
    if (attempt.isCanceled()) {
      // already canceled
      completableFuture.complete(GrpcResponse.of(Status.CANCELLED));
      return attempt.complete(completableFuture);
    }

    final Job job = item.getJob();
    MetricsUtils.jobScope(infra.scope(), job, calleeAddress)
        .gauge(MetricNames.CHANNEL_USAGE)
        .update(channel.getMetrics().usage());

    String[] tags = extractDispatchMetricsTags(item);
    return attempt.complete(
        Instrumentation.instrument
            .withExceptionalCompletion(
                LOGGER,
                infra.scope(),
                () -> {
                  Optional<Status> status = grpcFilter.tryHandleRequest(message, job);
                  if (status.isPresent()) {
                    return CompletableFuture.completedFuture(GrpcResponse.of(status.get()));
                  }

                  long timeoutMs =
                      GrpcUtils.getTimeout(
                          job, configuration, message.getTimeoutCount(), infra.scope());
                  ClientCall clientCall =
                      channelWithMetadata(message)
                          .newCall(methodDescriptor, extractCallOptions(item));

                  attempt.onCancel(() -> clientCall.cancel(ERROR_MESSAGE_CANCEL, null));
                  Instrumentation.instrument.withStreamObserver(
                      LOGGER,
                      infra.scope(),
                      ClientCalls::asyncUnaryCall,
                      clientCall,
                      message.payload(),
                      newStreamObserver(completableFuture, timeoutMs, message, tags),
                      MetricNames.CALL,
                      tags);
                  return completableFuture;
                },
                MetricNames.DISPATCH,
                tags)
            .thenApply(
                response -> {
                  message.getFuture().complete(response);
                  return response;
                }));
  }