in uforwarder/src/main/java/com/uber/data/kafka/consumerproxy/worker/processor/ProcessorImpl.java [635:692]
/**
 * Builds the retry policy applied to responses and failures of the gRPC dispatcher.
 *
 * <p>The policy starts from {@code instrumentedRetryPolicy(...)}, is capped at
 * {@code maxGrpcRetry} retries, backs off exponentially between 1 millisecond and 1 minute, and
 * aborts as soon as this processor is no longer running or a
 * {@link RejectedExecutionException} is raised. Whether a returned result triggers another gRPC
 * attempt depends on its response code and on which queue topics the job has configured.
 *
 * @param job job whose rpc dispatcher task and kafka consumer task supply the uri, consumer
 *     group, topic, partition and queue topic settings
 * @param scope scope that is tagged with the dispatcher type before being handed to the retry
 *     instrumentation
 * @return the configured retry policy
 */
private RetryPolicy<DispatcherResponseAndOffset> getGrpcRetryPolicy(Job job, Scope scope) {
  final String dispatcherType = DispatcherMessage.Type.GRPC.toString();
  final String uri = job.getRpcDispatcherTask().getUri();
  final String consumerGroup = job.getKafkaConsumerTask().getConsumerGroup();
  final String topicName = job.getKafkaConsumerTask().getTopic();
  final int partitionId = job.getKafkaConsumerTask().getPartition();
  final Scope taggedScope =
      scope.tagged(ImmutableMap.of(StructuredLogging.DISPATCHER, dispatcherType));

  return instrumentedRetryPolicy(
          taggedScope, dispatcherType, uri, consumerGroup, topicName, partitionId)
      .withMaxRetries(maxGrpcRetry)
      // Stop retrying once the processor has been stopped.
      .abortIf(ignoredResult -> !isRunning())
      // Raised when the executor was shut down; retrying cannot succeed.
      .abortOn(RejectedExecutionException.class)
      // Exponential backoff from 1 millisecond up to 1 minute. With a very large maxGrpcRetry
      // (presumably Integer.MAX_VALUE - verify against configuration) this is effectively
      // "retry forever".
      .withBackoff(1, 60000, ChronoUnit.MILLIS)
      // Exceptions are handled by default; this predicate only decides which results are
      // retried.
      .handleResultIf(
          response -> {
            // A stopped processor never retries.
            if (!isRunning()) {
              return false;
            }
            switch (response.getCode()) {
              case SKIP:
              case COMMIT:
                // Neither SKIP nor COMMIT is retried to gRPC.
                return proceedToRetry(
                    false,
                    taggedScope,
                    dispatcherType,
                    uri,
                    consumerGroup,
                    topicName,
                    partitionId);
              case DLQ:
                {
                  // A configured DLQ topic takes the message; when there is none, a configured
                  // retry topic does. Only if both are absent is the call retried to gRPC.
                  final boolean dlqTopicConfigured =
                      !job.getRpcDispatcherTask().getDlqTopic().isEmpty();
                  if (dlqTopicConfigured || RetryUtils.hasRetryTopic(job)) {
                    return false;
                  }
                  return proceedToRetry(
                      true,
                      taggedScope,
                      dispatcherType,
                      uri,
                      consumerGroup,
                      topicName,
                      partitionId);
                }
              case RETRY:
                // A configured retry topic takes the message instead of a gRPC retry.
                if (RetryUtils.hasRetryTopic(job)) {
                  return false;
                }
                return proceedToRetry(
                    true,
                    taggedScope,
                    dispatcherType,
                    uri,
                    consumerGroup,
                    topicName,
                    partitionId);
              case RESQ:
                // A configured resilience queue topic takes the message instead of a gRPC retry.
                if (RetryUtils.hasResqTopic(job)) {
                  return false;
                }
                return proceedToRetry(
                    true,
                    taggedScope,
                    dispatcherType,
                    uri,
                    consumerGroup,
                    topicName,
                    partitionId);
              default:
                // Every other response code is retried to gRPC.
                return proceedToRetry(
                    true,
                    taggedScope,
                    dispatcherType,
                    uri,
                    consumerGroup,
                    topicName,
                    partitionId);
            }
          });
}