in uforwarder/src/main/java/com/uber/data/kafka/consumerproxy/worker/processor/ProcessorImpl.java [1003:1166]
public CompletionStage<Long> submit(ItemAndJob<ConsumerRecord<byte[], byte[]>> request) {
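    // Pipeline overview: build log/metric tags, create the processor message and register it
    // with the ack manager (prefetch phase), optionally filter it, then dispatch it
    // asynchronously via gRPC with retry and a Kafka-producer fallback, and finally complete
    // with the offset to commit.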
final Job job = request.getJob();
final ConsumerRecord<byte[], byte[]> consumerRecord = request.getItem();
final StructuredArgument[] logTags =
new StructuredArgument[] {
StructuredArguments.keyValue(StructuredFields.URI, addressFromUri),
StructuredArguments.keyValue(
StructuredFields.KAFKA_GROUP, job.getKafkaConsumerTask().getConsumerGroup()),
StructuredArguments.keyValue(
StructuredFields.KAFKA_CLUSTER, job.getKafkaConsumerTask().getCluster()),
StructuredArguments.keyValue(
StructuredFields.KAFKA_TOPIC, job.getKafkaConsumerTask().getTopic()),
StructuredArguments.keyValue(
StructuredFields.KAFKA_PARTITION,
Integer.toString(job.getKafkaConsumerTask().getPartition()))
};
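    // Flatten the per-job metrics tags into alternating key/value pairs for the
    // Instrumentation helpers used below.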
Map<String, String> metricsTags = getMetricsTags(job);
final String[] tags =
metricsTags
.entrySet()
.stream()
.flatMap(e -> Stream.of(e.getKey(), e.getValue()))
.toArray(String[]::new);
infra.contextManager().createRootContext();
// TODO (gteo): cleanup / deprecate processing scope
Scope processingScope = infra.scope().tagged(metricsTags);
// use completable future chaining so that any exception thrown by an earlier stage
// will automatically terminate the chain.
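    // The prefetch stopwatch covers the prefetch stages up to the hand-off to the
    // dispatch executor; it is stopped in the whenComplete stage below.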
final Stopwatch prefetchLatency =
processingScope.timer("processor.prefetch.overall.latency").start();
return Instrumentation.instrument
.withExceptionalCompletion(
LOGGER,
infra.scope(),
// Create a new processor message
// DirectSupplier creates it on this thread.
() ->
DirectSupplier.supply(
() -> {
ProcessorMessage processorMessage =
ProcessorMessage.of(
consumerRecord,
job,
infra,
new MessageStub(logTags, processingScope));
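                      // Reserve per-job quota before the message enters the pipeline.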
acquireQuota(job, processorMessage);
return processorMessage;
}),
"processor.prefetch.new-message",
tags)
.thenApply(
pm ->
Instrumentation.instrument.withRuntimeException(
LOGGER,
infra.scope(),
                    // Enqueue this to the ack manager, which acts as a prefetch buffer and
                    // out-of-order commit tracker.
() -> {
ackManager.receive(pm);
return pm;
},
"processor.prefetch.ack-manager.receive",
tags))
.thenApply(
pm ->
Instrumentation.instrument.withRuntimeException(
LOGGER,
infra.scope(),
() -> {
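                      // Records rejected by the message filter are acked immediately so their
                      // offsets can still be committed, and are marked to skip dispatch.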
if (!messageFilter.shouldProcess(ItemAndJob.of(pm, job))) {
pm.setOffsetToCommit(ackManager.ack(pm));
pm.setShouldDispatch(false);
processingScope
.counter("processor.prefetch.filter.cluster.filtered")
.inc(1);
LOGGER.debug("processor.prefetch.filter.cluster.filtered", logTags);
}
return pm;
},
"processor.prefetch.filter.cluster",
tags))
.thenApplyAsync(
            // From here onwards, we enter the dispatch phase, where we send messages via gRPC
            // or to the DLQ.
            // The dispatch phase uses a separate executor pool so that we can send multiple
            // messages concurrently.
pm -> pm,
executor)
.whenComplete(
(r, t) -> {
              // Record the latency and outcome of the overall prefetch stage.
prefetchLatency.stop();
if (t != null) {
processingScope
.tagged(ImmutableMap.of(Tags.Key.code, t.getClass().getSimpleName()))
.counter("processor.prefetch.overall")
.inc(1);
} else {
processingScope
.tagged(ImmutableMap.of(Tags.Key.code, "ok"))
.counter("processor.prefetch.overall")
.inc(1);
}
})
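        // Re-create the root context, this time on the dispatch executor thread (the first
        // root context was created on the calling thread at the top of this method).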
.thenCompose(
pm ->
infra
.contextManager()
.runAsync(infra.contextManager()::createRootContext, executor)
.thenApply(ignoredVoid -> pm))
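        // Dispatch only if the message is still marked dispatchable (see
        // supplyAsyncIfDispatchable); filtered messages skip this stage.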
.thenCompose(
pm ->
supplyAsyncIfDispatchable(
pm,
() ->
Instrumentation.instrument.withExceptionalCompletion(
LOGGER,
infra.scope(),
() -> {
                            // Forward the message to the dispatchers.
                            // This handles the actual transmission to the callee via gRPC
                            // and any follow-up transmission to Kafka for the DLQ.
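                            // Nested Failsafe policies: the inner execution wraps the gRPC
                            // dispatch with the job's retry policy; the outer execution wraps
                            // that with the Kafka producer fallback policy.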
return Failsafe.with(
getKafkaProducerAsyncFallbackPolicy(
ItemAndJob.of(pm, job), processingScope))
.with(Scheduler.of(executor))
.getStageAsync(
() ->
pm.getStub()
.withRetryFuture(
Failsafe.with(
getGrpcRetryPolicy(job, processingScope))
.with(Scheduler.of(executor))
.getStageAsync(
infra
.contextManager()
.wrap(
dispatchToGrpcOutbound(
ItemAndJob.of(pm, job),
tags)))))
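                            // Close the processor message once dispatch settles, whether it
                            // succeeded or failed.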
.whenComplete(
(r, t) -> {
try {
pm.close(r, t);
} catch (Exception e) {
LOGGER.error("failed to close processor message", e);
}
})
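                            // Record the offset reported by the dispatch result so the final
                            // stage can surface it as the offset to commit.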
.thenApply(
r -> {
pm.setOffsetToCommit(r.getOffset());
return pm;
});
},
"processor.dispatch",
tags)))
.thenApply(
            // Convert the processor message into the offset to commit.
r -> r.getOffsetToCommit());
}
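
For orientation, a minimal caller sketch against the signature above; this is a hypothetical wrapper, not the actual fetcher wiring (it assumes the same imports as this file, and submitOne and the commit hand-off are illustrative names):

CompletionStage<Void> submitOne(
    ProcessorImpl processor, Job job, ConsumerRecord<byte[], byte[]> record) {
  return processor
      .submit(ItemAndJob.of(record, job))
      // The completed value is the offset to commit, as produced by
      // getOffsetToCommit() at the end of the chain above.
      .thenAccept(
          offsetToCommit -> {
            // Hypothetical: hand offsetToCommit back to the Kafka consumer's commit path.
          });
}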