private RetryPolicy getKafkaRetryPolicy()

in uforwarder/src/main/java/com/uber/data/kafka/consumerproxy/worker/processor/ProcessorImpl.java [857:909]


  /**
   * Builds the retry policy applied when a message is dispatched to Kafka.
   *
   * <p>The policy is derived from {@code instrumentedRetryPolicy(...)} and then configured to:
   *
   * <ul>
   *   <li>retry at most {@code maxKafkaRetry} times,
   *   <li>abort as soon as this processor is no longer running or the executor rejects work,
   *   <li>back off between attempts, configured with a 1 ms delay and a 60000 ms maximum delay,
   *   <li>treat every response code other than {@code SKIP} and {@code COMMIT} as a failure that
   *       must be retried, emitting a counter and a log line for each evaluated result.
   * </ul>
   *
   * @param job job supplying the RPC dispatcher URI and the Kafka consumer group, topic and
   *     partition that are attached to logs and passed to the instrumented policy
   * @param scope metrics scope; a sub-scope tagged with the dispatcher name is used for counters
   * @return the configured retry policy for Kafka dispatch results
   */
  private RetryPolicy<DispatcherResponseAndOffset> getKafkaRetryPolicy(Job job, Scope scope) {
    final String kafkaDispatcher = DispatcherMessage.Type.KAFKA.toString();
    final String routingKey = job.getRpcDispatcherTask().getUri();
    final String consumerGroup = job.getKafkaConsumerTask().getConsumerGroup();
    final String topicName = job.getKafkaConsumerTask().getTopic();
    final int partitionId = job.getKafkaConsumerTask().getPartition();
    final Scope taggedScope =
        scope.tagged(ImmutableMap.of(StructuredLogging.DISPATCHER, kafkaDispatcher));
    return instrumentedRetryPolicy(
            taggedScope, kafkaDispatcher, routingKey, consumerGroup, topicName, partitionId)
        // An explicit cap is required: per the original author's note, Failsafe does not
        // support unlimited retries.
        .withMaxRetries(maxKafkaRetry)
        // Stop retrying once the processor has been stopped ...
        .abortIf(ignoredResult -> !isRunning())
        // ... or once the executor has been shut down and rejects the task.
        .abortOn(RejectedExecutionException.class)
        // Exponential backoff from 1 millisecond up to 1 minute. With a very large
        // maxKafkaRetry (e.g. Integer.MAX_VALUE) that amounts to decades of retries,
        // i.e. practically forever.
        .withBackoff(1, 60000, ChronoUnit.MILLIS)
        // Exceptions are handled by default (original author's note); this predicate
        // additionally decides which *results* count as failures.
        .handleResultIf(
            response -> {
              // Nothing should be retried after the processor has stopped running.
              if (!isRunning()) {
                return false;
              }
              // NOTE(review): response is dereferenced without a null check — confirm that a
              // null result can never reach this predicate.
              final boolean accepted;
              switch (response.getCode()) {
                case SKIP:
                case COMMIT:
                  // SKIP and COMMIT both count as a successful Kafka produce.
                  accepted = true;
                  break;
                default:
                  // Every other response code has to be retried to Kafka.
                  accepted = false;
                  break;
              }
              if (accepted) {
                taggedScope.counter(MetricNames.RETRYER_ACCEPT_RESULT).inc(1);
                LOGGER.debug(
                    MetricNames.RETRYER_ACCEPT_RESULT,
                    StructuredLogging.dispatcher(kafkaDispatcher),
                    StructuredLogging.rpcRoutingKey(routingKey),
                    StructuredLogging.kafkaGroup(consumerGroup),
                    StructuredLogging.kafkaTopic(topicName),
                    StructuredLogging.kafkaPartition(partitionId));
                return false;
              }
              taggedScope.counter(MetricNames.RETRYER_REJECT_RESULT).inc(1);
              LOGGER.warn(
                  MetricNames.RETRYER_REJECT_RESULT,
                  StructuredLogging.dispatcher(kafkaDispatcher),
                  StructuredLogging.rpcRoutingKey(routingKey),
                  StructuredLogging.kafkaGroup(consumerGroup),
                  StructuredLogging.kafkaTopic(topicName),
                  StructuredLogging.kafkaPartition(partitionId));
              return true;
            });
  }