in uforwarder-core/src/main/java/com/uber/data/kafka/datatransfer/worker/fetchers/kafka/AbstractKafkaFetcherThread.java [387:397]
private boolean eligibleToCommit(long offsetToCommit, long committedOffset) {
if (commitOnIdleFetcher
&& offsetToCommit == committedOffset
&& (System.currentTimeMillis() - lastCommitTimestampMs > ACTIVE_COMMIT_INTERVAL_IN_MS)) {
return true;
} else if (offsetToCommit != committedOffset
&& offsetToCommit > KafkaUtils.MAX_INVALID_OFFSET_TO_COMMIT) {
return true;
}
return false;
}