in uforwarder/src/main/java/com/uber/data/kafka/consumerproxy/worker/processor/ProcessorImpl.java [481:552]
/**
 * Creates a {@link RetryPolicy} whose event listeners emit a counter and a log line.
 *
 * <p>Only listeners are attached here: retry, retries-exceeded, abort and failed-attempt. Each
 * listener bumps the matching {@link MetricNames} counter on {@code scope} by one and logs the
 * event together with the dispatcher, rpc uri, group, topic and partition.
 *
 * @param scope metrics scope on which the counters are incremented
 * @param dispatcher dispatcher name included in every log line
 * @param rpcUri rpc uri included in every log line
 * @param group kafka consumer group included in every log line
 * @param topic kafka topic included in every log line
 * @param partition kafka partition included in every log line
 * @return a new retry policy with the four instrumented listeners registered
 */
private RetryPolicy<DispatcherResponseAndOffset> instrumentedRetryPolicy(
    Scope scope, String dispatcher, String rpcUri, String group, String topic, int partition) {
  return new RetryPolicy<DispatcherResponseAndOffset>()
      .onRetry(
          attempt -> {
            scope.counter(MetricNames.RETRYER_RETRY).inc(1);
            logRetryWarn(
                MetricNames.RETRYER_RETRY,
                dispatcher,
                rpcUri,
                group,
                topic,
                partition,
                attempt.getLastFailure());
          })
      .onRetriesExceeded(
          completed -> {
            scope.counter(MetricNames.RETRYER_RETRIES_EXCEEDED).inc(1);
            final Throwable failure = completed.getFailure();
            // Append the throwable as the trailing log argument only when one is present.
            if (failure != null) {
              LOGGER.error(
                  MetricNames.RETRYER_RETRIES_EXCEEDED,
                  StructuredLogging.dispatcher(dispatcher),
                  StructuredLogging.rpcRoutingKey(rpcUri),
                  StructuredLogging.kafkaGroup(group),
                  StructuredLogging.kafkaTopic(topic),
                  StructuredLogging.kafkaPartition(partition),
                  failure);
            } else {
              LOGGER.error(
                  MetricNames.RETRYER_RETRIES_EXCEEDED,
                  StructuredLogging.dispatcher(dispatcher),
                  StructuredLogging.rpcRoutingKey(rpcUri),
                  StructuredLogging.kafkaGroup(group),
                  StructuredLogging.kafkaTopic(topic),
                  StructuredLogging.kafkaPartition(partition));
            }
          })
      .onAbort(
          completed -> {
            scope.counter(MetricNames.RETRYER_ABORT).inc(1);
            final Throwable failure = completed.getFailure();
            // Same shape as the retries-exceeded listener, but logged at warn level.
            if (failure != null) {
              LOGGER.warn(
                  MetricNames.RETRYER_ABORT,
                  StructuredLogging.dispatcher(dispatcher),
                  StructuredLogging.rpcRoutingKey(rpcUri),
                  StructuredLogging.kafkaGroup(group),
                  StructuredLogging.kafkaTopic(topic),
                  StructuredLogging.kafkaPartition(partition),
                  failure);
            } else {
              LOGGER.warn(
                  MetricNames.RETRYER_ABORT,
                  StructuredLogging.dispatcher(dispatcher),
                  StructuredLogging.rpcRoutingKey(rpcUri),
                  StructuredLogging.kafkaGroup(group),
                  StructuredLogging.kafkaTopic(topic),
                  StructuredLogging.kafkaPartition(partition));
            }
          })
      .onFailedAttempt(
          attempt -> {
            scope.counter(MetricNames.RETRYER_FAILED_ATTEMPT).inc(1);
            logRetryWarn(
                MetricNames.RETRYER_FAILED_ATTEMPT,
                dispatcher,
                rpcUri,
                group,
                topic,
                partition,
                attempt.getLastFailure());
          });
}