public CompletionStage submit()

in uforwarder/src/main/java/com/uber/data/kafka/consumerproxy/worker/dispatcher/grpc/GrpcDispatcher.java [114:186]


  public CompletionStage<GrpcResponse> submit(ItemAndJob<GrpcRequest> item) {
    CompletableFuture<GrpcResponse> completableFuture =
        infra.contextManager().wrap(new CompletableFuture<>());
    if (item == null) {
      completableFuture.completeExceptionally(new NullPointerException());
      return completableFuture;
    }
    final GrpcRequest message = item.getItem();
    MessageStub.Attempt attempt = message.getStub().newAttempt();
    if (attempt.isCanceled()) {
      // already canceled
      completableFuture.complete(GrpcResponse.of(Status.CANCELLED));
      return attempt.complete(completableFuture);
    }

    final Job job = item.getJob();
    MetricsUtils.jobScope(infra.scope(), job, calleeAddress)
        .gauge(MetricNames.CHANNEL_USAGE)
        .update(channel.getMetrics().usage());
    MetricsUtils.jobScope(infra.scope(), job, calleeAddress)
        .gauge(MetricNames.CHANNEL_SIZE)
        .update(channel.getMetrics().size());

    String[] tags = extractDispatchMetricsTags(item);
    Context.CancellableContext context = Context.current().withCancellation();
    attempt.onCancel(() -> context.cancel(null));
    return attempt.complete(
        Instrumentation.instrument
            .withExceptionalCompletion(
                LOGGER,
                infra.scope(),
                wrapInContext(
                    context,
                    () -> {
                      Optional<Status> status = grpcFilter.tryHandleRequest(message, job);
                      if (status.isPresent()) {
                        return CompletableFuture.completedFuture(GrpcResponse.of(status.get()));
                      }

                      long timeoutMs =
                          GrpcUtils.getTimeout(
                              job, configuration, message.getTimeoutCount(), infra.scope());

                      Instrumentation.instrument.withStreamObserver(
                          LOGGER,
                          infra.scope(),
                          ClientCalls::asyncUnaryCall,
                          channelWithMetadata(message, tags)
                              .newCall(methodDescriptor, extractCallOptions(item)),
                          message.payload(),
                          newStreamObserver(completableFuture, timeoutMs, message, tags),
                          MetricNames.CALL,
                          tags);
                      return completableFuture;
                    }),
                MetricNames.DISPATCH,
                tags)
            .whenComplete(
                (r, t) -> {
                  // adding additional metric to include all statuses including filtered ones
                  Map<String, String> map = new HashMap<>();
                  Utils.copyTags(map, tags);
                  map.put(Tags.Key.code, r.status().getCode().name());
                  infra.scope().tagged(map).counter(MetricNames.CALL_WITH_FILTER).inc(1);
                  // close context to avoid memory leak
                  context.close();
                })
            .thenApply(
                response -> {
                  message.getFuture().complete(response);
                  return response;
                }));
  }