in uforwarder/src/main/java/com/uber/data/kafka/consumerproxy/worker/processor/ProcessorImpl.java [407:455]
protected DispatcherResponse handleTimeout(
DispatcherResponse dispatcherResponse, ProcessorMessage processorMessage, Job finalJob) {
TopicPartitionOffset topicPartitionOffset = processorMessage.getPhysicalMetadata();
String topic = topicPartitionOffset.getTopic();
DispatcherResponse result = dispatcherResponse;
switch (dispatcherResponse.getCode()) {
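      // Codes other than SKIP/COMMIT and BACKOFF fall through unchanged (no default case).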
case SKIP:
case COMMIT:
        // Credit a token back to the DLQ dispatch limiter
dlqDispatchManager.credit(
new TopicPartition(
topicPartitionOffset.getTopic(), topicPartitionOffset.getPartition()),
1);
break;
case BACKOFF:
if (RetryUtils.isDLQTopic(topic, finalJob)) {
// Messages in DLQ should not enter other queues
result = new DispatcherResponse(DispatcherResponse.Code.DLQ);
} else if (RetryUtils.isResqTopic(topic, finalJob)) {
          // Messages in the resilience queue should be retried in-memory to avoid leaking into other queues
result = new DispatcherResponse(DispatcherResponse.Code.INVALID);
} else {
          // When maxRpcTimeouts is configured and the timeout count reaches that limit:
          // 1. acquire a token from dlqDispatchManager
          // 2. convert BACKOFF to DLQ if a token was acquired
int maxRpcTimeouts = finalJob.getRpcDispatcherTask().getMaxRpcTimeouts();
if (maxRpcTimeouts > 0
&& processorMessage.getTimeoutCount() >= maxRpcTimeouts
&& dlqDispatchManager.tryAcquire(
new TopicPartition(
topicPartitionOffset.getTopic(), topicPartitionOffset.getPartition()),
1)) {
result = new DispatcherResponseAndOffset(DispatcherResponse.Code.DLQ);
} else {
            // A timeout is a failure in transit and should ideally be handled with
            // in-memory retry plus blocking detection/mitigation. Since the current
            // implementation already handles timeouts via the retry queue, switching
            // to in-memory retry may cause blocking, so temporarily keep the behavior
            // unchanged.
result = new DispatcherResponseAndOffset(DispatcherResponse.Code.RETRY);
}
}
        // Record this timeout so the maxRpcTimeouts check above can eventually trip
processorMessage.increaseTimeoutCount();
break;
}
return result;
}
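
The credit/tryAcquire pair above amounts to a per-partition token budget for DLQ production: completed messages (SKIP/COMMIT) return tokens, and a repeatedly timed-out message may only be converted from BACKOFF to DLQ if a token is available. Below is a minimal standalone sketch of that pattern; `SimpleDlqLimiter`, its constructor cap, and the demo class are hypothetical stand-ins to illustrate the idea, not uforwarder's actual `DlqDispatchManager` API.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical per-partition token limiter modeling the credit/tryAcquire
// pattern used by dlqDispatchManager above. Not the real implementation.
final class SimpleDlqLimiter {
  private final int cap;
  private final Map<String, AtomicInteger> tokens = new ConcurrentHashMap<>();

  SimpleDlqLimiter(int cap) {
    this.cap = cap;
  }

  // SKIP/COMMIT path: return a token to the partition's budget, capped at cap.
  // Unseen partitions start with a full budget, so an early credit is a no-op.
  void credit(String topicPartition, int n) {
    tokens
        .computeIfAbsent(topicPartition, k -> new AtomicInteger(cap))
        .updateAndGet(t -> Math.min(cap, t + n));
  }

  // BACKOFF-with-exceeded-timeouts path: take a token if available;
  // a false return keeps the message on the RETRY path instead of DLQ.
  boolean tryAcquire(String topicPartition, int n) {
    AtomicInteger t = tokens.computeIfAbsent(topicPartition, k -> new AtomicInteger(cap));
    while (true) {
      int cur = t.get();
      if (cur < n) {
        return false;
      }
      if (t.compareAndSet(cur, cur - n)) {
        return true;
      }
    }
  }
}

// Tiny driver: exhaust the budget, then restore it with a credit.
class SimpleDlqLimiterDemo {
  public static void main(String[] args) {
    SimpleDlqLimiter limiter = new SimpleDlqLimiter(2);
    System.out.println(limiter.tryAcquire("topic-0", 1)); // true
    System.out.println(limiter.tryAcquire("topic-0", 1)); // true
    System.out.println(limiter.tryAcquire("topic-0", 1)); // false, budget empty
    limiter.credit("topic-0", 1); // a message completed (SKIP/COMMIT)
    System.out.println(limiter.tryAcquire("topic-0", 1)); // true again
  }
}
```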