private RetryPolicy instrumentedRetryPolicy()

in uforwarder/src/main/java/com/uber/data/kafka/consumerproxy/worker/processor/ProcessorImpl.java [481:552]


  /**
   * Creates a {@link RetryPolicy} whose event listeners report every retry-related event.
   *
   * <p>Four listeners are registered: retry, retries-exceeded, abort and failed-attempt. Each one
   * increments the counter named after the event on {@code scope} and then writes a log entry
   * tagged with the dispatcher, rpc routing key, Kafka group, topic and partition. Retry and
   * failed-attempt events are logged through {@code logRetryWarn}; retries-exceeded is logged at
   * error level and abort at warn level.
   *
   * @param scope metrics scope on which the per-event counters are incremented
   * @param dispatcher dispatcher name attached to each log entry
   * @param rpcUri rpc uri attached to each log entry as the rpc routing key
   * @param group Kafka consumer group attached to each log entry
   * @param topic Kafka topic attached to each log entry
   * @param partition Kafka partition attached to each log entry
   * @return the retry policy with all four listeners installed
   */
  private RetryPolicy<DispatcherResponseAndOffset> instrumentedRetryPolicy(
      Scope scope, String dispatcher, String rpcUri, String group, String topic, int partition) {
    RetryPolicy<DispatcherResponseAndOffset> policy = new RetryPolicy<>();

    // Fired before an execution is retried.
    policy =
        policy.onRetry(
            attempt -> {
              scope.counter(MetricNames.RETRYER_RETRY).inc(1);
              Throwable lastFailure = attempt.getLastFailure();
              logRetryWarn(
                  MetricNames.RETRYER_RETRY, dispatcher, rpcUri, group, topic, partition, lastFailure);
            });

    // Fired when the execution fails and no retries are left.
    policy =
        policy.onRetriesExceeded(
            completed -> {
              scope.counter(MetricNames.RETRYER_RETRIES_EXCEEDED).inc(1);
              Throwable failure = completed.getFailure();
              if (failure != null) {
                // The failure is passed as the trailing argument of the log call.
                LOGGER.error(
                    MetricNames.RETRYER_RETRIES_EXCEEDED,
                    StructuredLogging.dispatcher(dispatcher),
                    StructuredLogging.rpcRoutingKey(rpcUri),
                    StructuredLogging.kafkaGroup(group),
                    StructuredLogging.kafkaTopic(topic),
                    StructuredLogging.kafkaPartition(partition),
                    failure);
              } else {
                // No failure available: log the tags only.
                LOGGER.error(
                    MetricNames.RETRYER_RETRIES_EXCEEDED,
                    StructuredLogging.dispatcher(dispatcher),
                    StructuredLogging.rpcRoutingKey(rpcUri),
                    StructuredLogging.kafkaGroup(group),
                    StructuredLogging.kafkaTopic(topic),
                    StructuredLogging.kafkaPartition(partition));
              }
            });

    // Fired when the execution is aborted.
    policy =
        policy.onAbort(
            completed -> {
              scope.counter(MetricNames.RETRYER_ABORT).inc(1);
              Throwable failure = completed.getFailure();
              if (failure != null) {
                // The failure is passed as the trailing argument of the log call.
                LOGGER.warn(
                    MetricNames.RETRYER_ABORT,
                    StructuredLogging.dispatcher(dispatcher),
                    StructuredLogging.rpcRoutingKey(rpcUri),
                    StructuredLogging.kafkaGroup(group),
                    StructuredLogging.kafkaTopic(topic),
                    StructuredLogging.kafkaPartition(partition),
                    failure);
              } else {
                // No failure available: log the tags only.
                LOGGER.warn(
                    MetricNames.RETRYER_ABORT,
                    StructuredLogging.dispatcher(dispatcher),
                    StructuredLogging.rpcRoutingKey(rpcUri),
                    StructuredLogging.kafkaGroup(group),
                    StructuredLogging.kafkaTopic(topic),
                    StructuredLogging.kafkaPartition(partition));
              }
            });

    // Fired for every failed execution attempt.
    policy =
        policy.onFailedAttempt(
            attempt -> {
              scope.counter(MetricNames.RETRYER_FAILED_ATTEMPT).inc(1);
              Throwable lastFailure = attempt.getLastFailure();
              logRetryWarn(
                  MetricNames.RETRYER_FAILED_ATTEMPT,
                  dispatcher,
                  rpcUri,
                  group,
                  topic,
                  partition,
                  lastFailure);
            });

    return policy;
  }