in uforwarder-core/src/main/java/com/uber/data/kafka/datatransfer/worker/dispatchers/kafka/KafkaDispatcher.java [76:125]
public CompletionStage<Void> submit(ItemAndJob<ProducerRecord<K, V>> request) {
  final Job job = request.getJob();
  final ProducerRecord<K, V> record = request.getItem();
  String topic = record.topic();
  String cluster = JobUtils.getKafkaProducerCluster(job);
  Scope requestScope =
      infra
          .scope()
          .tagged(StructuredTags.builder().setKafkaTopic(topic).setKafkaCluster(cluster).build());
  CompletableFuture<Void> future = new CompletableFuture<>();
  Stopwatch stopwatch = requestScope.timer(MetricNames.SEND_LATENCY).start();
  kafkaProducer.send(
      record,
      (recordMetadata, e) -> {
        stopwatch.stop();
        if (e != null) {
          if (e instanceof RecordTooLargeException) {
            // Oversized records for this topic prefix are counted and skipped instead of
            // failing the future, so the pipeline is not blocked by messages that can
            // never be produced.
            String topicPrefix =
                "hp-dbevents-schemaless-jobstore-orders-ENTITY_FARES".toLowerCase();
            if (topic.toLowerCase().startsWith(topicPrefix)) {
              requestScope.counter(MetricNames.SKIPPED_LARGE_MESSAGE).inc(1);
              future.complete(null);
              return;
            }
          }
          // Tag the send counter with the exception class and surface the error to the caller.
          requestScope
              .tagged(ImmutableMap.of(Tags.Key.code, e.getClass().getSimpleName()))
              .counter(MetricNames.SEND)
              .inc(1);
          LOGGER.error(
              "dispatcher.kafka.send.failure",
              StructuredLogging.kafkaTopic(topic),
              StructuredLogging.kafkaCluster(cluster),
              e);
          future.completeExceptionally(e);
        } else {
          // Successful send: record the success metric and complete the future normally.
          requestScope
              .tagged(ImmutableMap.of(Tags.Key.code, "ok"))
              .counter(MetricNames.SEND)
              .inc(1);
          LOGGER.debug(
              "dispatcher.kafka.send.success",
              StructuredLogging.kafkaTopic(topic),
              StructuredLogging.kafkaCluster(cluster));
          future.complete(null);
        }
      });
  return future;
}
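
A minimal caller-side sketch of how this asynchronous contract might be used, assuming a constructed dispatcher instance and a Job are already in scope, and assuming ItemAndJob exposes a factory such as ItemAndJob.of(item, job) (an assumption; the actual construction in the codebase may differ). The returned CompletionStage completes normally when the producer callback reports success or the record was intentionally skipped as oversized, and completes exceptionally on any other producer error, so callers typically attach a continuation rather than block.

// Illustrative usage sketch; `dispatcher`, `job`, and ItemAndJob.of(...) are assumptions,
// not taken from the excerpt above.
ProducerRecord<byte[], byte[]> record =
    new ProducerRecord<>("example-topic", "key".getBytes(), "value".getBytes());
dispatcher
    .submit(ItemAndJob.of(record, job))
    .whenComplete(
        (ignored, error) -> {
          if (error != null) {
            // The send failed and was not a skippable oversized record:
            // decide whether to retry or propagate the failure upstream.
          } else {
            // The send succeeded, or the record was skipped as too large.
          }
        });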