in uforwarder-core/src/main/java/com/uber/data/kafka/datatransfer/worker/fetchers/kafka/AbstractKafkaFetcherThread.java [551:602]
/**
 * Handles the outcome of an offset commit for a set of topic partitions.
 *
 * <p>When {@code exception} is present, an error is logged and the {@code
 * OFFSET_COMMIT_FAILURE} counter is incremented once per partition in {@code offsets}.
 *
 * <p>When {@code exception} is absent, each partition that has a job in {@code
 * allTopicPartitionJobMap} gets its committed offset recorded in the checkpoint manager,
 * a debug log line, an {@code OFFSET_COMMIT_SUCCESS} counter increment and an {@code OFFSET}
 * gauge update. Partitions without a job in the map are skipped without logs or metrics.
 *
 * @param offsets the offsets that were part of the commit, keyed by topic partition
 * @param exception the commit failure, or empty when the commit succeeded
 * @param allTopicPartitionJobMap lookup from topic partition to the job that owns it
 * @param consumerGroup consumer group name used for log fields and metric tags
 */
private void onCommitCompletion(
    Map<TopicPartition, OffsetAndMetadata> offsets,
    Optional<Exception> exception,
    Map<TopicPartition, Job> allTopicPartitionJobMap,
    String consumerGroup) {
  // Failure path first: report every partition of the failed commit, then bail out.
  if (exception.isPresent()) {
    final Exception commitFailure = exception.get();
    for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsets.entrySet()) {
      final TopicPartition topicPartition = entry.getKey();
      LOGGER.error(
          "failed to commit offset",
          StructuredLogging.kafkaGroup(consumerGroup),
          StructuredLogging.kafkaTopic(topicPartition.topic()),
          StructuredLogging.kafkaPartition(topicPartition.partition()),
          StructuredLogging.kafkaOffset(entry.getValue().offset()),
          commitFailure);
      scope
          .tagged(
              StructuredTags.builder()
                  .setKafkaGroup(consumerGroup)
                  .setKafkaTopic(topicPartition.topic())
                  .setKafkaPartition(topicPartition.partition())
                  .build())
          .counter(MetricNames.OFFSET_COMMIT_FAILURE)
          .inc(1);
    }
    return;
  }

  LOGGER.debug("committed offsets", StructuredLogging.count(offsets.size()));
  for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsets.entrySet()) {
    final TopicPartition topicPartition = entry.getKey();
    final Job owningJob = allTopicPartitionJobMap.get(topicPartition);
    if (owningJob == null) {
      // No job is mapped to this partition, so there is nothing to checkpoint or report.
      continue;
    }

    // Read the offset only after the job check so skipped partitions are never dereferenced.
    final long committedOffset = entry.getValue().offset();
    checkpointManager.setCommittedOffset(owningJob, committedOffset);
    LOGGER.debug(
        "successfully committed offset",
        StructuredLogging.kafkaGroup(consumerGroup),
        StructuredLogging.kafkaTopic(topicPartition.topic()),
        StructuredLogging.kafkaPartition(topicPartition.partition()),
        StructuredLogging.kafkaOffset(committedOffset));

    final Scope partitionScope =
        scope.tagged(
            StructuredTags.builder()
                .setKafkaGroup(consumerGroup)
                .setKafkaTopic(topicPartition.topic())
                .setKafkaPartition(topicPartition.partition())
                .build());
    partitionScope.counter(MetricNames.OFFSET_COMMIT_SUCCESS).inc(1);
    partitionScope.gauge(MetricNames.OFFSET).update(committedOffset);
  }
}