private Fallback getKafkaProducerAsyncFallbackPolicy()

in uforwarder/src/main/java/com/uber/data/kafka/consumerproxy/worker/processor/ProcessorImpl.java [722:874]


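  /**
   * Builds the Failsafe fallback policy that runs after a failed or rejected gRPC dispatch: it
   * inspects the last dispatcher response code (or cancellation) and, when required, produces the
   * message to the job's retry, DLQ, or resilience-queue topic via the Kafka producer, which is
   * guarded by its own retry policy.
   */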
  private Fallback<DispatcherResponseAndOffset> getKafkaProducerAsyncFallbackPolicy(
      ItemAndJob<ProcessorMessage> record, Scope processingScope) {
    final Scope dispatchingScope =
        processingScope.tagged(
            ImmutableMap.of(StructuredLogging.DISPATCHER, DispatcherMessage.Type.KAFKA.toString()));
    final ProcessorMessage processorMessage = record.getItem();
    final String dispatcher = DispatcherMessage.Type.KAFKA.toString();
    final Job job = record.getJob();
    final String rpcUri = job.getRpcDispatcherTask().getUri();
    final String group = job.getKafkaConsumerTask().getConsumerGroup();
    final String topic = job.getKafkaConsumerTask().getTopic();
    final int partition = job.getKafkaConsumerTask().getPartition();
    return Fallback.<DispatcherResponseAndOffset>ofStage(
            executionAttemptedEvent -> {
              DispatcherResponse.Code code = null;
              if (executionAttemptedEvent.getLastResult() == null) {
                Throwable lastFailure = executionAttemptedEvent.getLastFailure();
                if (lastFailure instanceof CancellationException) {
                  // retry policy canceled
                  code = processorMessage.getStub().cancelCode().get();
                }
              } else {
                code = executionAttemptedEvent.getLastResult().getCode();
              }

              // The gRPC dispatcher failed; propagate the error up.
              // TODO(T4772337): the fetcher needs to handle exceptions correctly.
              //  currently, the processor will eventually get stuck
              if (code == null) {
                CompletableFuture<DispatcherResponseAndOffset> completableFuture =
                    new CompletableFuture<>();
                completableFuture.completeExceptionally(executionAttemptedEvent.getLastFailure());
                return completableFuture;
              }

              // pick a topic to produce to based on code in DispatcherResponseAndOffset
              String topicToProduce = "";
              switch (code) {
                case DLQ:
                  topicToProduce = job.getRpcDispatcherTask().getDlqTopic();

                  if (!topicToProduce.isEmpty()) {
                    LOGGER.debug(
                        MetricNames.RETRYER_KAFKA_ACCEPT_RESULT,
                        StructuredLogging.dispatcher(dispatcher),
                        StructuredLogging.rpcRoutingKey(rpcUri),
                        StructuredLogging.kafkaGroup(group),
                        StructuredLogging.kafkaTopic(topic),
                        StructuredLogging.action(code.toString()),
                        StructuredLogging.kafkaPartition(partition));
                    break;
                  }
                  dispatchingScope.counter(MetricNames.MISSING_DLQ).inc(1);
                  // fallthrough to use retry topic
                case RETRY:

                  // If retry queues are configured and enabled, find the one that matches the
                  // retry count. If the message has exhausted all of its retries, send it to the
                  // DLQ; if no DLQ is configured, send it to the last retry queue.
                  topicToProduce =
                      RetryUtils.getKafkaDestinationRetryTopic(
                          job, processorMessage.getRetryCount());

                  // topicToProduce can be empty when:
                  //   1. Retry queues exist in the new tiered-retry format
                  //   2. No DLQ is configured
                  //   3. The consumer returned the message with a retriable error
                  // Since the DLQ is the fallback queue for all retry queues,
                  // getKafkaDestinationRetryTopic above returns the DLQ, which is an empty string.
                  // To avoid producing to an empty topic, we fall back to the same behavior as the
                  // default case, which is in-memory retry.
                  if (!topicToProduce.isEmpty()) {
                    LOGGER.debug(
                        MetricNames.RETRYER_KAFKA_ACCEPT_RESULT,
                        StructuredLogging.dispatcher(dispatcher),
                        StructuredLogging.rpcRoutingKey(rpcUri),
                        StructuredLogging.kafkaGroup(group),
                        StructuredLogging.kafkaTopic(topic),
                        StructuredLogging.action(code.toString()),
                        StructuredLogging.kafkaPartition(partition));
                    break;
                  }
                  dispatchingScope.counter(MetricNames.MISSING_DLQ_AND_RETRY_QUEUE).inc(1);
                  // Neither a retry topic nor a DLQ is available, so do not send to the Kafka
                  // producer; propagate the result up.
                  return CompletableFuture.completedFuture(executionAttemptedEvent.getLastResult());
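                // Route to the job's resilience-queue topic when one is configured.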
                case RESQ:
                  if (RetryUtils.hasResqTopic(job)) {
                    topicToProduce = job.getResqConfig().getResqTopic();
                  }

                  if (!topicToProduce.isEmpty()) {
                    LOGGER.debug(
                        MetricNames.RETRYER_KAFKA_ACCEPT_RESULT,
                        StructuredLogging.dispatcher(dispatcher),
                        StructuredLogging.rpcRoutingKey(rpcUri),
                        StructuredLogging.kafkaGroup(group),
                        StructuredLogging.kafkaTopic(topic),
                        StructuredLogging.action(code.toString()),
                        StructuredLogging.kafkaPartition(partition));
                    break;
                  }
                  dispatchingScope.counter(MetricNames.MISSING_RESQ).inc(1);
                  // fallthrough to default
                default:
                  // Either no resilience queue is configured or the code does not require a Kafka
                  // produce, so do not send to the Kafka producer; propagate the result up.
                  return CompletableFuture.completedFuture(executionAttemptedEvent.getLastResult());
              }
              // Otherwise, produce the message to the Kafka broker.
              final Scope overallKafkaProducerScope =
                  processingScope.tagged(
                      ImmutableMap.of(StructuredLogging.DISPATCHER, MetricNames.OVERALL_KAFKA));
              final Stopwatch overallProduceLatency =
                  overallKafkaProducerScope.timer(MetricNames.DISPATCH_LATENCY).start();
              // Failsafe invokes policies in reverse. The callstack looks like:
              // 1. Kafka Producer
              // 2. Kafka Retry
              return Failsafe.with(getKafkaRetryPolicy(job, processingScope))
                  .with(Scheduler.of(executor))
                  .getStageAsync(getKafkaProducerSupplier(record, topicToProduce, processingScope))
                  .whenComplete(
                      instrumentDispatchCompletion(
                          overallKafkaProducerScope,
                          overallProduceLatency,
                          MetricNames.OVERALL_DISPATCHER,
                          rpcUri,
                          group,
                          topic,
                          partition));
            })
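        // handleResultIf marks which dispatcher results count as failures, i.e. which ones should
        // trigger the Kafka-producing fallback above; all other results pass through unchanged.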
        .handleResultIf(
            result -> {
              // treat null as failure
              if (result == null) {
                return true;
              }
              switch (result.getCode()) {
                case RESQ:
                  // fallthrough
                case RETRY:
                  // fallthrough
                case DLQ:
                  // Treat RESQ, RETRY, and DLQ as failures so the fallback produces to Kafka.
                  return true;
                  // TODO(T4772403): what should the fetcher do if we propagate the result up?
                  //  currently, the processor will eventually get stuck
                default:
                  return false;
              }
            });
  }
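
For reference, below is a minimal, self-contained sketch of the Failsafe pattern this method relies on: a Fallback.ofStage policy whose handleResultIf predicate decides which results trigger the asynchronous fallback stage. It assumes Failsafe 2.x (net.jodah.failsafe) and uses hypothetical placeholders (dispatch, produceToKafka, string result codes); it is not the actual Uforwarder wiring.

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import net.jodah.failsafe.Failsafe;
import net.jodah.failsafe.Fallback;

class FallbackPolicySketch {
  // Placeholder for the gRPC dispatch attempt; pretend the consumer asked for a retry.
  static CompletionStage<String> dispatch() {
    return CompletableFuture.completedFuture("RETRY");
  }

  // Placeholder for the Kafka produce the real fallback performs (guarded by its own retry policy).
  static CompletionStage<String> produceToKafka(String topic) {
    return CompletableFuture.completedFuture("COMMITTED");
  }

  public static void main(String[] args) {
    Fallback<String> fallback =
        Fallback.<String>ofStage(event -> produceToKafka("group__topic__retry"))
            // Results matching this predicate are treated as failures, so the fallback stage
            // runs; anything else is returned to the caller unchanged.
            .handleResultIf(result -> result == null || result.equals("RETRY"));

    String outcome =
        Failsafe.with(fallback).getStageAsync(FallbackPolicySketch::dispatch).join();
    System.out.println(outcome); // prints COMMITTED because the dispatch result was RETRY
  }
}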