in uforwarder/src/main/java/com/uber/data/kafka/consumerproxy/worker/processor/ProcessorImpl.java [722:874]
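/**
 * Builds the Failsafe fallback policy for the gRPC dispatch: when the dispatch result (or its
 * cancellation) resolves to a DLQ, RETRY, or RESQ code, the message is produced to the
 * corresponding Kafka topic; otherwise the result or error is propagated up unchanged.
 */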
private Fallback<DispatcherResponseAndOffset> getKafkaProducerAsyncFallbackPolicy(
ItemAndJob<ProcessorMessage> record, Scope processingScope) {
final Scope dispatchingScope =
processingScope.tagged(
ImmutableMap.of(StructuredLogging.DISPATCHER, DispatcherMessage.Type.KAFKA.toString()));
final ProcessorMessage processorMessage = record.getItem();
final String dispatcher = DispatcherMessage.Type.KAFKA.toString();
final Job job = record.getJob();
final String rpcUri = job.getRpcDispatcherTask().getUri();
final String group = job.getKafkaConsumerTask().getConsumerGroup();
final String topic = job.getKafkaConsumerTask().getTopic();
final int partition = job.getKafkaConsumerTask().getPartition();
return Fallback.<DispatcherResponseAndOffset>ofStage(
executionAttemptedEvent -> {
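// Derive the response code from the last dispatch attempt: a cancelled attempt maps to
// the stub's cancel code; otherwise use the code from the last result.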
DispatcherResponse.Code code = null;
if (executionAttemptedEvent.getLastResult() == null) {
Throwable lastFailure = executionAttemptedEvent.getLastFailure();
if (lastFailure instanceof CancellationException) {
// retry policy canceled
code = processorMessage.getStub().cancelCode().get();
}
} else {
code = executionAttemptedEvent.getLastResult().getCode();
}
// The gRPC dispatch failed without a response code, so propagate the error up.
// TODO(T4772337): the fetcher needs to handle exceptions correctly.
// currently, the processor will eventually get stuck
if (code == null) {
CompletableFuture<DispatcherResponseAndOffset> completableFuture =
new CompletableFuture<>();
completableFuture.completeExceptionally(executionAttemptedEvent.getLastFailure());
return completableFuture;
}
// pick a topic to produce to based on code in DispatcherResponseAndOffset
String topicToProduce = "";
switch (code) {
case DLQ:
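// Produce to the job's configured DLQ topic; if no DLQ is configured, fall through
// and try a retry topic instead.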
topicToProduce = job.getRpcDispatcherTask().getDlqTopic();
if (!topicToProduce.isEmpty()) {
LOGGER.debug(
MetricNames.RETRYER_KAFKA_ACCEPT_RESULT,
StructuredLogging.dispatcher(dispatcher),
StructuredLogging.rpcRoutingKey(rpcUri),
StructuredLogging.kafkaGroup(group),
StructuredLogging.kafkaTopic(topic),
StructuredLogging.action(code.toString()),
StructuredLogging.kafkaPartition(partition));
break;
}
dispatchingScope.counter(MetricNames.MISSING_DLQ).inc(1);
// fallthrough to use retry topic
case RETRY:
// If retry queues exist and are enabled, find the one that matches the retry count.
// If the message has already exhausted all its retries, send it to the DLQ; if the
// DLQ is empty, send it to the last retry queue (RQ).
topicToProduce =
RetryUtils.getKafkaDestinationRetryTopic(
job, processorMessage.getRetryCount());
// topicToProduce can be empty when:
// 1. there is a retry queue in the new tiered retry format,
// 2. no DLQ is configured, and
// 3. the message is returned from the consumer with a retriable error.
// Since the DLQ is the fallback queue for all retry queues,
// getKafkaDestinationRetryTopic above returns the DLQ, which is an empty string here.
// To avoid producing to an empty topic, we fall back to the default behavior,
// which is in-memory retry.
if (!topicToProduce.isEmpty()) {
LOGGER.debug(
MetricNames.RETRYER_KAFKA_ACCEPT_RESULT,
StructuredLogging.dispatcher(dispatcher),
StructuredLogging.rpcRoutingKey(rpcUri),
StructuredLogging.kafkaGroup(group),
StructuredLogging.kafkaTopic(topic),
StructuredLogging.action(code.toString()),
StructuredLogging.kafkaPartition(partition));
break;
}
dispatchingScope.counter(MetricNames.MISSING_DLQ_AND_RETRY_QUEUE).inc(1);
// Neither a retry topic nor a DLQ is configured, so do not send to the Kafka
// producer; propagate the result up.
return CompletableFuture.completedFuture(executionAttemptedEvent.getLastResult());
case RESQ:
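// Produce to the resilience queue topic when one is configured; otherwise fall
// through to the default case and propagate the result up.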
if (RetryUtils.hasResqTopic(job)) {
topicToProduce = job.getResqConfig().getResqTopic();
}
if (!topicToProduce.isEmpty()) {
LOGGER.debug(
MetricNames.RETRYER_KAFKA_ACCEPT_RESULT,
StructuredLogging.dispatcher(dispatcher),
StructuredLogging.rpcRoutingKey(rpcUri),
StructuredLogging.kafkaGroup(group),
StructuredLogging.kafkaTopic(topic),
StructuredLogging.action(code.toString()),
StructuredLogging.kafkaPartition(partition));
break;
}
dispatchingScope.counter(MetricNames.MISSING_RESQ).inc(1);
// fallthrough to default
default:
// No Kafka topic to produce to (e.g. no resilience queue configured), so do not
// send to the Kafka producer; propagate the result up.
return CompletableFuture.completedFuture(executionAttemptedEvent.getLastResult());
}
// Otherwise, produce the message to the Kafka broker.
final Scope overallKafkaProducerScope =
processingScope.tagged(
ImmutableMap.of(StructuredLogging.DISPATCHER, MetricNames.OVERALL_KAFKA));
final Stopwatch overallProduceLatency =
overallKafkaProducerScope.timer(MetricNames.DISPATCH_LATENCY).start();
// Failsafe invokes policies in reverse. The callstack looks like:
// 1. Kafka Producer
// 2. Kafka Retry
return Failsafe.with(getKafkaRetryPolicy(job, processingScope))
.with(Scheduler.of(executor))
.getStageAsync(getKafkaProducerSupplier(record, topicToProduce, processingScope))
.whenComplete(
instrumentDispatchCompletion(
overallKafkaProducerScope,
overallProduceLatency,
MetricNames.OVERALL_DISPATCHER,
rpcUri,
group,
topic,
partition));
})
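// The fallback above only runs when the predicate below marks the gRPC dispatch
// result as a failure.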
.handleResultIf(
result -> {
// treat null as failure
if (result == null) {
return true;
}
switch (result.getCode()) {
case RESQ:
// fallthrough
case RETRY:
// fallthrough
case DLQ:
// Treat RESQ, RETRY, and DLQ as failures so the fallback produces to Kafka.
return true;
// TODO(T4772403): what should the fetcher do if we propagate the result up?
// currently, the processor will eventually get stuck
default:
return false;
}
});
}
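// A minimal composition sketch (not part of this class): "grpcRetryPolicy" and
// "grpcDispatchSupplier" below are hypothetical stand-ins for the gRPC-side policy and
// supplier that this processor builds elsewhere.
//
//   CompletionStage<DispatcherResponseAndOffset> stage =
//       Failsafe.with(
//               getKafkaProducerAsyncFallbackPolicy(record, processingScope), grpcRetryPolicy)
//           .with(Scheduler.of(executor))
//           .getStageAsync(grpcDispatchSupplier);
//
// Failsafe composes policies outermost-first, so the fallback wraps the retry policy and only
// runs once the dispatch attempt completes with a result or failure that its handleResultIf
// predicate flags for Kafka production.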