in uforwarder/src/main/java/com/uber/data/kafka/consumerproxy/worker/processor/ProcessorImpl.java [263:351]
/**
 * Submits one message to the message dispatcher and translates the dispatcher's response into a
 * {@link DispatcherResponseAndOffset}.
 *
 * <p>The span carried by {@code processorMessage} is activated on the tracer only while the
 * message is being submitted and the stage chain is being assembled; the scope is closed when this
 * method returns, not when the returned stage completes.
 *
 * <p>The returned stage goes through three steps once the dispatcher answers:
 *
 * <ol>
 *   <li>the inflight {@code permit} is completed (and, on a normal response, the response code's
 *       ordinal is recorded into {@code responseDistribution});
 *   <li>the response is passed through {@code handleTimeout};
 *   <li>the resulting code is mapped to an ack or a nack on the ack manager, and the attempt/retry
 *       counters of {@code processorMessage} are bumped accordingly.
 * </ol>
 *
 * <p>If the dispatcher stage completes exceptionally, only step 1 runs (the permit is completed
 * with {@code Unknown}) and the exception propagates through the returned stage.
 *
 * @param processorMessage the message to dispatch; its attempt/retry counters may be increased
 * @param permit the inflight limiter permit to complete once the dispatcher has answered
 * @param finalJob the job whose RPC dispatcher task URI is used to build the outgoing message
 * @param responseDistribution histogram that receives the ordinal of each response code
 * @return a stage yielding the action to take together with the offset returned by the ack
 *     manager, or {@code -1} when the message was nacked
 */
private CompletionStage<DispatcherResponseAndOffset> doDispatch(
    ProcessorMessage processorMessage,
    InflightLimiter.Permit permit,
    Job finalJob,
    Histogram responseDistribution) {
  Preconditions.checkNotNull(messageDispatcher, "message dispatcher must not be null");
  try (io.opentracing.Scope ignored = infra.tracer().activateSpan(processorMessage.getSpan())) {
    return messageDispatcher
        .submit(
            ItemAndJob.of(
                processorMessage.getGrpcDispatcherMessage(
                    finalJob.getRpcDispatcherTask().getUri()),
                finalJob))
        .whenComplete(
            (response, error) -> {
              // The permit outcome is used to measure downstream service availability, so a
              // response that was handled (or rejected for a non-retryable reason) counts as a
              // success, and anything unrecognized counts as a failure.
              if (error != null) {
                // Any exception (e.g. CancellationException) still has to release the permit,
                // otherwise the inflight count leaks; the real outcome is unknown.
                permit.complete(InflightLimiter.Result.Unknown);
                return;
              }
              responseDistribution.recordValue(response.getCode().ordinal());
              switch (response.getCode()) {
                case RESQ:
                  // Released without reporting a result.
                  permit.complete();
                  return;
                case SKIP:
                case COMMIT:
                case RETRY: // may be a message issue or a service issue
                case DLQ: // a message issue
                  permit.complete(InflightLimiter.Result.Succeed);
                  return;
                default:
                  permit.complete(InflightLimiter.Result.Failed);
              }
            })
        .thenApply(dispatched -> handleTimeout(dispatched, processorMessage, finalJob))
        .thenApply(
            outcome -> {
              switch (outcome.getCode()) {
                case COMMIT:
                case SKIP:
                  // Both codes are acked and reported to the caller as COMMIT, carrying the
                  // value returned by the ack manager.
                  return new DispatcherResponseAndOffset(
                      DispatcherResponse.Code.COMMIT, ackManager.ack(processorMessage));
                case DLQ:
                case RESQ:
                case RETRY:
                  // These codes are nacked and handed back unchanged. The result of nack() is
                  // ignored for now, so the caller always proceeds with the returned code even
                  // when the nack itself could have been skipped.
                  ackManager.nack(processorMessage.getPhysicalMetadata());
                  // Count this as both an attempt and a retry.
                  processorMessage.increaseAttemptCount();
                  processorMessage.increaseRetryCount();
                  return new DispatcherResponseAndOffset(outcome.getCode(), -1);
                default:
                  // Every other code is nacked and reported as INVALID. Only the attempt count
                  // is increased, which lets the message be retried in-memory.
                  ackManager.nack(processorMessage.getPhysicalMetadata());
                  processorMessage.increaseAttemptCount();
                  return new DispatcherResponseAndOffset(DispatcherResponse.Code.INVALID, -1);
              }
            });
  }
}