in uforwarder/src/main/java/com/uber/data/kafka/consumerproxy/worker/dispatcher/DispatcherImpl.java [103:169]
public CompletableFuture<DispatcherResponse> submit(ItemAndJob<DispatcherMessage> item) {
  // Routes one message to the dispatcher matching its type (gRPC or Kafka) and adapts the
  // outcome into a DispatcherResponse. An unsupported message type, or a resilience queue
  // destination without a configured producer, is reported through an exceptionally
  // completed future.
  final Job job = item.getJob();
  final DispatcherMessage message = item.getItem();

  // Job-level attributes shared by the debug log and the metric tags below.
  final String routingKey = job.getRpcDispatcherTask().getUri();
  final String consumerGroup = job.getKafkaConsumerTask().getConsumerGroup();
  final String sourceTopic = job.getKafkaConsumerTask().getTopic();
  final int partitionId = job.getKafkaConsumerTask().getPartition();
  final String mode = message.getType().toString();

  LOGGER.debug(
      MetricNames.DISPATCH,
      StructuredLogging.dispatcher(mode),
      StructuredLogging.rpcRoutingKey(routingKey),
      StructuredLogging.kafkaGroup(consumerGroup),
      StructuredLogging.kafkaTopic(sourceTopic),
      StructuredLogging.kafkaPartition(partitionId),
      StructuredLogging.destination(message.getDestination()));

  // Only a dispatch-by-type counter is emitted here: GrpcDispatcher and KafkaDispatcher already
  // emit fine grained success/failure/latency metrics. Note that the destination tag carries
  // the RPC URI, whereas the debug log above records the message destination.
  infra
      .scope()
      .tagged(
          StructuredTags.builder()
              .setMode(mode)
              .setDestination(routingKey)
              .setKafkaGroup(consumerGroup)
              .setKafkaTopic(sourceTopic)
              .setKafkaPartition(partitionId)
              .build())
      .counter(MetricNames.DISPATCH)
      .inc(1);

  switch (message.getType()) {
    case GRPC:
      {
        // The gRPC result is converted into a DispatcherResponse by
        // dispatcherResponseFromGrpcStatus. Exceptional completions are treated as in-memory
        // retry by the processor, which resends the message to the gRPC endpoint.
        return grpcDispatcher
            .submit(ItemAndJob.of(message.getGrpcMessage(), job))
            .thenApply(DispatcherImpl::dispatcherResponseFromGrpcStatus)
            .toCompletableFuture();
      }
    case KAFKA:
      {
        // Resilience queue destinations go to the optional resq producer; every other
        // destination goes to the DLQ producer.
        final boolean resqDestination = RetryUtils.isResqTopic(message.getDestination(), job);
        if (resqDestination && !resqProducer.isPresent()) {
          CompletableFuture<DispatcherResponse> missingProducer = new CompletableFuture<>();
          missingProducer.completeExceptionally(
              new IllegalStateException("resilience queue producer is not present"));
          return missingProducer;
        }
        final KafkaDispatcher<byte[], byte[]> producer =
            resqDestination ? resqProducer.get() : dlqProducer;

        // The produce result itself is ignored: a normal completion always maps to COMMIT.
        // Exceptional completions are treated as in-memory retry by the processor, which
        // resends the message to the Kafka Producer endpoint.
        return producer
            .submit(ItemAndJob.of(message.getProducerRecord(), job))
            .thenApply(ignored -> new DispatcherResponse(DispatcherResponse.Code.COMMIT))
            .toCompletableFuture();
      }
    default:
      {
        // No dispatcher is registered for this message type.
        CompletableFuture<DispatcherResponse> unsupported = new CompletableFuture<>();
        unsupported.completeExceptionally(
            new IllegalArgumentException(
                String.format("unsupported message type %s", message.getType())));
        return unsupported;
      }
  }
}