in uReplicator-Controller/src/main/java/com/uber/stream/kafka/mirrormaker/controller/core/OffsetMonitor.java [458:474]
/**
 * Collects the topic partitions tracked in {@code noProgressMap} whose commit offset
 * has not changed between the recorded entry and the current entry in
 * {@code topicPartitionToOffsetMap}.
 *
 * <p>A partition is reported only when all of the following hold:
 * <ul>
 *   <li>a current lag entry exists for it in {@code topicPartitionToOffsetMap};</li>
 *   <li>the current commit offset and latest offset are both strictly positive;</li>
 *   <li>the current latest offset is strictly greater than the current commit offset;</li>
 *   <li>the current timestamp exceeds the recorded timestamp by more than
 *       {@code MIN_NO_PROGRESS_TIME_MS};</li>
 *   <li>the current commit offset equals the recorded commit offset.</li>
 * </ul>
 *
 * <p>The method name keeps its existing spelling ("Progess") so callers are unaffected.
 *
 * @return a newly created list of the matching partitions; empty when none match
 */
public List<TopicAndPartition> getNoProgessTopicPartitions() {
  List<TopicAndPartition> stalled = new ArrayList<>();
  for (Map.Entry<TopicAndPartition, TopicPartitionLag> candidate : noProgressMap.entrySet()) {
    TopicAndPartition partition = candidate.getKey();
    TopicPartitionLag current = topicPartitionToOffsetMap.get(partition);

    // Only partitions with a current entry, positive offsets, and a latest offset
    // ahead of the commit offset are eligible.
    boolean eligible = current != null
        && current.getCommitOffset() > 0
        && current.getLatestOffset() > 0
        && current.getLatestOffset() > current.getCommitOffset();

    if (eligible) {
      TopicPartitionLag recorded = candidate.getValue();
      boolean observedLongEnough =
          current.getTimeStamp() - recorded.getTimeStamp() > MIN_NO_PROGRESS_TIME_MS;
      // NOTE(review): offsets are compared with ==; this presumes the getters return
      // primitive values rather than boxed ones -- confirm against TopicPartitionLag.
      if (observedLongEnough && current.getCommitOffset() == recorded.getCommitOffset()) {
        stalled.add(partition);
      }
    }
  }
  return stalled;
}