in uforwarder-core/src/main/java/com/uber/data/kafka/datatransfer/worker/fetchers/kafka/AbstractKafkaFetcherThread.java [414:453]
void commitOffsets(Map<TopicPartition, Job> allTopicPartitionJobMap) {
  Preconditions.checkNotNull(pipelineStateManager, "pipeline state manager required");
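  // nothing to commit when this fetcher currently owns no partitions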
  if (allTopicPartitionJobMap.isEmpty()) {
    return;
  }
  long currentTimestampMs = System.currentTimeMillis();
  // lastCommitOffsetsCheckTimeMs is deliberately not thread-safe: an approximate value is
  // good enough for throttling how often we check whether to commit
  if (currentTimestampMs - lastCommitOffsetsCheckTimeMs > config.getOffsetCommitIntervalMs()) {
    lastCommitOffsetsCheckTimeMs = currentTimestampMs;
    try {
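      // gather, for each owned partition, the offset that is ready to be committed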
      Map<TopicPartition, OffsetAndMetadata> tpCommitInfoMap = new HashMap<>();
      allTopicPartitionJobMap.forEach(
          (tp, job) -> {
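            // only commit forward progress: presumably eligibleToCommit verifies that
            // offsetToCommit has advanced past the last committed offset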
            long offsetToCommit = checkpointManager.getOffsetToCommit(job);
            long committedOffset = checkpointManager.getCommittedOffset(job);
            if (eligibleToCommit(offsetToCommit, committedOffset)) {
              tpCommitInfoMap.put(tp, new OffsetAndMetadata(offsetToCommit));
            }
          });
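      // nothing eligible to commit this cycle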
      if (tpCommitInfoMap.isEmpty()) {
        return;
      }
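      // all jobs in a pipeline share the job template's consumer group and topic; these are
      // passed down, presumably for logging and metric tags on the commit path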
      String consumerGroup =
          pipelineStateManager.getJobTemplate().getKafkaConsumerTask().getConsumerGroup();
      String topic = pipelineStateManager.getJobTemplate().getKafkaConsumerTask().getTopic();
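      // asyncCommitOffset chooses between the non-blocking and blocking commit paths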
      if (asyncCommitOffset) {
        commitAsync(tpCommitInfoMap, allTopicPartitionJobMap, consumerGroup, topic);
      } else {
        commitSync(tpCommitInfoMap, allTopicPartitionJobMap, consumerGroup, topic);
      }
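      // mark the time of the last commit attempt; on the async path this is when the
      // request was issued, not when it was acknowledged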
      lastCommitTimestampMs = currentTimestampMs;
    } catch (Throwable throwable) {
      // swallow the error so the fetcher thread keeps running; the next cycle retries the commit
      LOGGER.error("failed to commit offsets to kafka servers", throwable);
      scope.counter(MetricNames.OFFSET_COMMIT_EXCEPTION).inc(1);
    }
  }
}