private CompletionStage doDispatch()

in uforwarder/src/main/java/com/uber/data/kafka/consumerproxy/worker/processor/ProcessorImpl.java [323:398]


  private CompletionStage<DispatcherResponseAndOffset> doDispatch(
      ProcessorMessage processorMessage,
      InflightLimiter.Permit permit,
      Job finalJob,
      Histogram responseDistribution) {
    Preconditions.checkNotNull(messageDispatcher, "message dispatcher must not be null");
    try (io.opentracing.Scope ignoredScope =
        infra.tracer().activateSpan(processorMessage.getSpan())) {
      return messageDispatcher
          .submit(
              ItemAndJob.of(
                  processorMessage.getGrpcDispatcherMessage(
                      finalJob.getRpcDispatcherTask().getUri()),
                  finalJob))
          .whenComplete(
              (r, e) -> {
                Scope subScope = infra.scope().tagged(getMetricsTags(finalJob));
                if (r != null) {
                  responseDistribution.recordValue(r.getCode().ordinal());
                  if (r.getCode() == DispatcherResponse.Code.COMMIT) {
                    // measure message end-to-end latency
                    subScope
                        .histogram(MetricNames.MESSAGE_END_TO_END_LATENCY, E2E_DURATION_BUCKETS)
                        .recordDuration(
                            Duration.ofMillis(
                                System.currentTimeMillis()
                                    - processorMessage.getLogicalTimestamp()));
                  }
                }
              })
          .whenComplete((r, e) -> handlePermit(r, e, permit))
          .thenApply(r -> handleTimeout(r, processorMessage, finalJob))
          .thenApply(
              r -> {
                switch (r.getCode()) {
                  case SKIP:
                    // fallthrough
                  case COMMIT:
                    // For SKIP and COMMIT response codes, mark ack in ack
                    // manager and
                    // return
                    // DispatcherResponseAndOffset with COMMIT action.
                    return new DispatcherResponseAndOffset(
                        DispatcherResponse.Code.COMMIT, ackManager.ack(processorMessage));
                  case RETRY:
                  case RESQ:
                    // fallthrough
                  case DLQ:
                    // For RETRY and STASH actions, mark nack in ack manager.
                    // If nack returns false, we can skip producing to the retry
                    // topic or
                    // dlq topic.
                    // As for now, we always produce to the retry topic or dlq
                    // topic, will
                    // fix this
                    // in
                    // another change.
                    ackManager.nack(processorMessage.getPhysicalMetadata());
                    // Increase both attempt and  retry count.
                    processorMessage.increaseAttemptCount();
                    processorMessage.increaseRetryCount();
                    return new DispatcherResponseAndOffset(r.getCode(), -1);
                    // fallthrough: ack manager failed to nack, so we
                    // fallthrough to gRPC
                    // in-memory retry.
                  default:
                    // Increase only attempt count. This will allow the message
                    // to be
                    // retried in-memory
                    ackManager.nack(processorMessage.getPhysicalMetadata());
                    processorMessage.increaseAttemptCount();
                    return new DispatcherResponseAndOffset(DispatcherResponse.Code.INVALID, -1);
                }
              });
    }
  }