private RetryPolicy getGrpcRetryPolicy()

in uforwarder/src/main/java/com/uber/data/kafka/consumerproxy/worker/processor/ProcessorImpl.java [635:692]


  /**
   * Builds the retry policy used when dispatching messages of the given job over gRPC.
   *
   * <p>The policy is obtained from {@code instrumentedRetryPolicy(...)} and then configured to:
   *
   * <ul>
   *   <li>allow at most {@code maxGrpcRetry} retries;
   *   <li>abort when {@code isRunning()} is false or a {@code RejectedExecutionException} occurs;
   *   <li>back off exponentially from 1 ms up to 60000 ms between attempts;
   *   <li>decide, per response code, whether the result should be retried to gRPC.
   * </ul>
   *
   * <p>Result handling by response code (evaluated only while {@code isRunning()} is true):
   *
   * <ul>
   *   <li>{@code SKIP}, {@code COMMIT}: returns {@code proceedToRetry(false, ...)}.
   *   <li>{@code DLQ}: not retried if the job's DLQ topic is non-empty or the job has a retry
   *       topic; otherwise returns {@code proceedToRetry(true, ...)}.
   *   <li>{@code RETRY}: not retried if the job has a retry topic; otherwise returns {@code
   *       proceedToRetry(true, ...)}.
   *   <li>{@code RESQ}: not retried if the job has a resilience queue topic; otherwise returns
   *       {@code proceedToRetry(true, ...)}.
   *   <li>any other code: returns {@code proceedToRetry(true, ...)}.
   * </ul>
   *
   * @param job job whose dispatcher URI, consumer group, topic, partition and queue topics drive
   *     the policy
   * @param scope metrics scope; it is tagged with the gRPC dispatcher type before use
   * @return the configured retry policy
   */
  private RetryPolicy<DispatcherResponseAndOffset> getGrpcRetryPolicy(Job job, Scope scope) {
    final String dispatcherType = DispatcherMessage.Type.GRPC.toString();
    final String uri = job.getRpcDispatcherTask().getUri();
    final String consumerGroup = job.getKafkaConsumerTask().getConsumerGroup();
    final String topicName = job.getKafkaConsumerTask().getTopic();
    final int partitionId = job.getKafkaConsumerTask().getPartition();
    final Scope taggedScope =
        scope.tagged(ImmutableMap.of(StructuredLogging.DISPATCHER, dispatcherType));
    return instrumentedRetryPolicy(
            taggedScope, dispatcherType, uri, consumerGroup, topicName, partitionId)
        .withMaxRetries(maxGrpcRetry)
        // stop retrying as soon as the processor is no longer running
        .abortIf(unused -> !isRunning())
        // thrown when the executor was shut down
        .abortOn(RejectedExecutionException.class)
        // exponential backoff starting at 1 millisecond and capped at 1 minute.
        // NOTE(review): the previous comment assumed maxGrpcRetry is Integer.MAX_VALUE, i.e. the
        // max retries are practically never exceeded - confirm against the configured value.
        .withBackoff(1, 60000, ChronoUnit.MILLIS)
        // By default all exceptions are handled; this predicate only classifies results.
        .handleResultIf(
            response -> {
              // never retry once the processor has stopped
              if (!isRunning()) {
                return false;
              }
              final boolean retryToGrpc;
              switch (response.getCode()) {
                case SKIP:
                case COMMIT:
                  // SKIP and COMMIT are terminal: do not retry to gRPC
                  retryToGrpc = false;
                  break;
                case DLQ:
                  // a configured DLQ topic takes the message; failing that, a configured
                  // retry queue topic does. Either way there is no gRPC retry.
                  if (!job.getRpcDispatcherTask().getDlqTopic().isEmpty()
                      || RetryUtils.hasRetryTopic(job)) {
                    return false;
                  }
                  // neither DLQ nor retry queue available: retry to gRPC
                  retryToGrpc = true;
                  break;
                case RETRY:
                  // a configured retry queue topic takes the message instead of gRPC
                  if (RetryUtils.hasRetryTopic(job)) {
                    return false;
                  }
                  retryToGrpc = true;
                  break;
                case RESQ:
                  // a configured resilience queue topic takes the message instead of gRPC
                  if (RetryUtils.hasResqTopic(job)) {
                    return false;
                  }
                  retryToGrpc = true;
                  break;
                default:
                  // every other response code is retried to gRPC
                  retryToGrpc = true;
                  break;
              }
              return proceedToRetry(
                  retryToGrpc,
                  taggedScope,
                  dispatcherType,
                  uri,
                  consumerGroup,
                  topicName,
                  partitionId);
            });
  }