in uforwarder/src/main/java/com/uber/data/kafka/consumerproxy/worker/processor/ProcessorImpl.java [323:398]
/**
 * Submits one message to the message dispatcher and translates the dispatcher's response into a
 * {@link DispatcherResponseAndOffset}.
 *
 * <p>The message's tracing span is activated only while the asynchronous pipeline is assembled.
 * The pipeline stages run in this order once the dispatcher completes:
 *
 * <ol>
 *   <li>record the response code in {@code responseDistribution} and, for {@code COMMIT}, the
 *       end-to-end latency;
 *   <li>hand the outcome and the permit to {@code handlePermit};
 *   <li>pass the response through {@code handleTimeout};
 *   <li>ack or nack the message in the ack manager and build the result.
 * </ol>
 *
 * <p>NOTE(review): with standard {@code CompletionStage} semantics the two {@code thenApply}
 * stages are skipped when the dispatcher completes exceptionally, so no ack/nack happens in this
 * method in that case - confirm the caller deals with it.
 *
 * @param processorMessage the message to dispatch; its attempt/retry counters are updated here
 * @param permit the inflight permit forwarded to {@code handlePermit} after the dispatch completes
 * @param finalJob the job that supplies the RPC dispatcher URI and the metric tags
 * @param responseDistribution histogram that receives the ordinal of every response code
 * @return a stage that yields the response code to act on together with an offset ({@code -1}
 *     whenever the message was not acked)
 */
private CompletionStage<DispatcherResponseAndOffset> doDispatch(
    ProcessorMessage processorMessage,
    InflightLimiter.Permit permit,
    Job finalJob,
    Histogram responseDistribution) {
  Preconditions.checkNotNull(messageDispatcher, "message dispatcher must not be null");
  try (io.opentracing.Scope ignoredScope =
      infra.tracer().activateSpan(processorMessage.getSpan())) {
    return messageDispatcher
        .submit(
            ItemAndJob.of(
                processorMessage.getGrpcDispatcherMessage(
                    finalJob.getRpcDispatcherTask().getUri()),
                finalJob))
        .whenComplete(
            (response, error) -> {
              // The tagged scope is resolved on every completion, even when there is no
              // response to report.
              Scope taggedScope = infra.scope().tagged(getMetricsTags(finalJob));
              if (response == null) {
                return;
              }
              responseDistribution.recordValue(response.getCode().ordinal());
              if (response.getCode() != DispatcherResponse.Code.COMMIT) {
                return;
              }
              // Only committed messages contribute to the end-to-end latency: wall-clock now
              // minus the logical timestamp of the message.
              Histogram endToEndLatency =
                  taggedScope.histogram(
                      MetricNames.MESSAGE_END_TO_END_LATENCY, E2E_DURATION_BUCKETS);
              long elapsedMillis =
                  System.currentTimeMillis() - processorMessage.getLogicalTimestamp();
              endToEndLatency.recordDuration(Duration.ofMillis(elapsedMillis));
            })
        .whenComplete((response, error) -> handlePermit(response, error, permit))
        .thenApply(response -> handleTimeout(response, processorMessage, finalJob))
        .thenApply(
            response -> {
              switch (response.getCode()) {
                case SKIP:
                case COMMIT:
                  // SKIP and COMMIT are both reported as COMMIT; the value returned by the
                  // ack manager is carried as the offset of the result.
                  return new DispatcherResponseAndOffset(
                      DispatcherResponse.Code.COMMIT, ackManager.ack(processorMessage));
                case RETRY:
                case RESQ:
                case DLQ:
                  // RETRY, RESQ and DLQ: nack the message, then count this as both an attempt
                  // and a retry. The outcome of nack is not inspected, so the response code is
                  // always passed on unchanged (the original note says a failed nack could
                  // allow skipping the produce to the retry/dlq topic; that is left for a
                  // later change).
                  ackManager.nack(processorMessage.getPhysicalMetadata());
                  processorMessage.increaseAttemptCount();
                  processorMessage.increaseRetryCount();
                  return new DispatcherResponseAndOffset(response.getCode(), -1);
                default:
                  // Every other code: nack, count an attempt only (not a retry) and report
                  // INVALID. Per the original note this lets the message be retried in-memory.
                  ackManager.nack(processorMessage.getPhysicalMetadata());
                  processorMessage.increaseAttemptCount();
                  return new DispatcherResponseAndOffset(DispatcherResponse.Code.INVALID, -1);
              }
            });
  }
}