in uforwarder/src/main/java/com/uber/data/kafka/consumerproxy/worker/processor/ProcessorImpl.java [857:909]
/**
 * Builds the retry policy used for the KAFKA dispatcher.
 *
 * <p>The policy is created by {@code instrumentedRetryPolicy} using a scope tagged with the
 * dispatcher type and is then configured to:
 *
 * <ul>
 *   <li>retry at most {@code maxKafkaRetry} times;
 *   <li>abort when {@code isRunning()} is false or a {@link RejectedExecutionException} is
 *       thrown;
 *   <li>back off between attempts from 1 ms up to 60,000 ms;
 *   <li>treat every response code other than {@code SKIP} and {@code COMMIT} as a failure that
 *       must be retried (only while {@code isRunning()} is true).
 * </ul>
 *
 * <p>Each evaluated result also increments an accept/reject counter on the tagged scope and
 * emits a log line (debug for accepted results, warn for rejected ones).
 *
 * @param job supplies the RPC dispatcher URI and the Kafka consumer group, topic and partition
 *     that are passed to the instrumented policy and attached to log lines
 * @param scope metrics scope; a sub-scope tagged with the dispatcher type is derived from it
 * @return the configured retry policy
 */
private RetryPolicy<DispatcherResponseAndOffset> getKafkaRetryPolicy(Job job, Scope scope) {
  final String dispatcherName = DispatcherMessage.Type.KAFKA.toString();
  final String routingUri = job.getRpcDispatcherTask().getUri();
  final String consumerGroup = job.getKafkaConsumerTask().getConsumerGroup();
  final String topicName = job.getKafkaConsumerTask().getTopic();
  final int partitionId = job.getKafkaConsumerTask().getPartition();
  final Scope taggedScope =
      scope.tagged(ImmutableMap.of(StructuredLogging.DISPATCHER, dispatcherName));

  return instrumentedRetryPolicy(
          taggedScope, dispatcherName, routingUri, consumerGroup, topicName, partitionId)
      // Upper bound on retry attempts comes from the maxKafkaRetry field.
      // NOTE(review): an earlier comment here assumed Integer.MAX_VALUE retries ("practically
      // forever"); confirm the configured value of maxKafkaRetry still matches that intent.
      .withMaxRetries(maxKafkaRetry)
      // Stop retrying as soon as the processor is no longer running ...
      .abortIf(ignoredResult -> !isRunning())
      // ... or when the executor has been shut down and rejects the task.
      .abortOn(RejectedExecutionException.class)
      // Backoff between attempts: starts at 1 ms and is capped at 60,000 ms (1 minute).
      .withBackoff(1, 60000, ChronoUnit.MILLIS)
      // Exceptions are handled by the policy's defaults; the predicate below additionally
      // decides which non-exceptional results count as failures.
      .handleResultIf(
          response -> {
            // A stopped processor never retries, regardless of the response code.
            if (!isRunning()) {
              return false;
            }

            // SKIP and COMMIT are considered success for Kafka producing; any other response
            // code should be retried to Kafka.
            final boolean accepted;
            switch (response.getCode()) {
              case SKIP:
              case COMMIT:
                accepted = true;
                break;
              default:
                accepted = false;
                break;
            }

            if (accepted) {
              taggedScope.counter(MetricNames.RETRYER_ACCEPT_RESULT).inc(1);
              LOGGER.debug(
                  MetricNames.RETRYER_ACCEPT_RESULT,
                  StructuredLogging.dispatcher(dispatcherName),
                  StructuredLogging.rpcRoutingKey(routingUri),
                  StructuredLogging.kafkaGroup(consumerGroup),
                  StructuredLogging.kafkaTopic(topicName),
                  StructuredLogging.kafkaPartition(partitionId));
              return false;
            }

            taggedScope.counter(MetricNames.RETRYER_REJECT_RESULT).inc(1);
            LOGGER.warn(
                MetricNames.RETRYER_REJECT_RESULT,
                StructuredLogging.dispatcher(dispatcherName),
                StructuredLogging.rpcRoutingKey(routingUri),
                StructuredLogging.kafkaGroup(consumerGroup),
                StructuredLogging.kafkaTopic(topicName),
                StructuredLogging.kafkaPartition(partitionId));
            return true;
          });
}