private CompletionStage doDispatch()

in uforwarder/src/main/java/com/uber/data/kafka/consumerproxy/worker/processor/ProcessorImpl.java [263:351]


  /**
   * Submits one message to {@code messageDispatcher} and converts the dispatcher's response into
   * a {@code DispatcherResponseAndOffset}.
   *
   * <p>The returned stage is assembled in three steps:
   *
   * <ol>
   *   <li>When the submission completes, {@code permit} is completed; on a normal completion the
   *       ordinal of the response code is first recorded in {@code responseDistribution}.
   *   <li>The response is passed through {@code handleTimeout}.
   *   <li>The resulting code decides whether the message is acked or nacked in {@code
   *       ackManager}, which counters of the message are increased, and which result is produced.
   * </ol>
   *
   * <p>The precondition check fails if {@code messageDispatcher} is null.
   *
   * @param processorMessage message to dispatch; its span is activated while the submission is
   *     issued
   * @param permit inflight-limiter permit that is completed once the submission finishes
   * @param finalJob job that supplies the RPC dispatcher URI and is paired with the message
   * @param responseDistribution histogram that receives the ordinal of each response code
   * @return a stage yielding {@code COMMIT} with the value of {@code ackManager.ack} for {@code
   *     SKIP}/{@code COMMIT}; the unchanged code with offset {@code -1} for {@code RETRY}/{@code
   *     RESQ}/{@code DLQ}; and {@code INVALID} with offset {@code -1} for any other code
   */
  private CompletionStage<DispatcherResponseAndOffset> doDispatch(
      ProcessorMessage processorMessage,
      InflightLimiter.Permit permit,
      Job finalJob,
      Histogram responseDistribution) {
    Preconditions.checkNotNull(messageDispatcher, "message dispatcher must not be null");
    // The scope is held only so that it gets closed when this method returns; it is never read.
    try (io.opentracing.Scope ignoredScope =
        infra.tracer().activateSpan(processorMessage.getSpan())) {
      return messageDispatcher
          .submit(
              ItemAndJob.of(
                  processorMessage.getGrpcDispatcherMessage(
                      finalJob.getRpcDispatcherTask().getUri()),
                  finalJob))
          .whenComplete(
              (response, error) -> {
                // The permit outcome is used to measure downstream availability.
                if (error != null) {
                  // Any exception (for example CancellationException) still has to close the
                  // permit, otherwise its count would leak. The outcome is reported as unknown.
                  permit.complete(InflightLimiter.Result.Unknown);
                  return;
                }
                responseDistribution.recordValue(response.getCode().ordinal());
                switch (response.getCode()) {
                  case RESQ:
                    // RESQ completes the permit without an explicit result.
                    permit.complete();
                    return;
                  case SKIP:
                  case COMMIT:
                  case RETRY: // could be a message issue or a service issue
                  case DLQ: // a message issue
                    // Handled successfully, or failed for a reason attributed to the message.
                    permit.complete(InflightLimiter.Result.Succeed);
                    return;
                  default:
                    // Every remaining code is counted as a failure.
                    permit.complete(InflightLimiter.Result.Failed);
                }
              })
          .thenApply(response -> handleTimeout(response, processorMessage, finalJob))
          .thenApply(
              handled -> {
                switch (handled.getCode()) {
                  case RETRY:
                  case RESQ:
                  case DLQ:
                    // Mark the message as nacked. The return value of nack is ignored for now,
                    // so the caller always goes on to produce to the retry/dlq topic; skipping
                    // that when nack returns false is left for another change.
                    ackManager.nack(processorMessage.getPhysicalMetadata());
                    // Both the attempt count and the retry count go up.
                    processorMessage.increaseAttemptCount();
                    processorMessage.increaseRetryCount();
                    // This branch always returns; it never falls through to the default branch.
                    return new DispatcherResponseAndOffset(handled.getCode(), -1);
                  case SKIP:
                  case COMMIT:
                    // Both codes ack the message and are reported to the caller as COMMIT.
                    return new DispatcherResponseAndOffset(
                        DispatcherResponse.Code.COMMIT, ackManager.ack(processorMessage));
                  default:
                    // Only the attempt count goes up, which lets the message be retried
                    // in-memory.
                    // NOTE(review): this branch also nacks the message although it is described
                    // as the in-memory retry path - confirm that the nack is intended here.
                    ackManager.nack(processorMessage.getPhysicalMetadata());
                    processorMessage.increaseAttemptCount();
                    return new DispatcherResponseAndOffset(DispatcherResponse.Code.INVALID, -1);
                }
              });
    }
  }