protected DispatcherResponse handleTimeout()

in uforwarder/src/main/java/com/uber/data/kafka/consumerproxy/worker/processor/ProcessorImpl.java [407:455]


  protected DispatcherResponse handleTimeout(
      DispatcherResponse dispatcherResponse, ProcessorMessage processorMessage, Job finalJob) {
    TopicPartitionOffset topicPartitionOffset = processorMessage.getPhysicalMetadata();
    String topic = topicPartitionOffset.getTopic();
    DispatcherResponse result = dispatcherResponse;
    switch (dispatcherResponse.getCode()) {
      case SKIP:
      case COMMIT:
        // credit token to DLQ limiter
        dlqDispatchManager.credit(
            new TopicPartition(
                topicPartitionOffset.getTopic(), topicPartitionOffset.getPartition()),
            1);
        break;
      case BACKOFF:
        if (RetryUtils.isDLQTopic(topic, finalJob)) {
          // Messages in DLQ should not enter other queues
          result = new DispatcherResponse(DispatcherResponse.Code.DLQ);
        } else if (RetryUtils.isResqTopic(topic, finalJob)) {
          // messages in resilience queue should do in-memory retry to avoid leak to other queues
          result = new DispatcherResponse(DispatcherResponse.Code.INVALID);
        } else {
          // when maxRpcTimeouts is configured and timeout exceeds
          // limit
          // 1. acquire token from dlqDispatchManager
          // 2. convert BACKOFF to DLQ if token fetched
          int maxRpcTimeouts = finalJob.getRpcDispatcherTask().getMaxRpcTimeouts();
          if (maxRpcTimeouts > 0
              && processorMessage.getTimeoutCount() >= maxRpcTimeouts
              && dlqDispatchManager.tryAcquire(
                  new TopicPartition(
                      topicPartitionOffset.getTopic(), topicPartitionOffset.getPartition()),
                  1)) {
            result = new DispatcherResponseAndOffset(DispatcherResponse.Code.DLQ);
          } else {
            // timeout as a case of failure in transit should be handled with in-memory retry and
            // Blocking detection/mitigation.
            // As current implementation already handle timeout with retry queue, change to
            // in-memory retry may cause blocking,
            // so temporary keep the behavior unchanged.
            result = new DispatcherResponseAndOffset(DispatcherResponse.Code.RETRY);
          }
        }
        // increase timeout count
        processorMessage.increaseTimeoutCount();
        break;
    }
    return result;
  }