in uReplicator-Controller/src/main/java/com/uber/stream/kafka/mirrormaker/controller/core/HelixMirrorMakerManager.java [398:420]
/**
 * Estimates how far behind the given topic partition is, expressed as a time.
 *
 * <p>The offset lag (latest offset minus commit offset) is divided by the topic's
 * per-partition message rate obtained from the workload retriever; rates below 1 are
 * treated as 1. When the estimate exceeds {@code _minLagTimeSec}, its rounded value is
 * stored on the offset record via {@code setLagTime} and that same record is returned.
 *
 * @param tp the topic partition to evaluate
 * @return the offset record from the offset monitor with its lag time set, or
 *         {@code null} when any of the following holds: the monitor has no record for
 *         {@code tp}; the latest or commit offset is not positive; the record's
 *         timestamp is older than {@code _offsetMaxValidTimeMillis}; the offset lag is
 *         not greater than {@code _minLagOffset}; or the estimated lag time is not
 *         greater than {@code _minLagTimeSec}
 */
public TopicPartitionLag calculateLagTime(TopicPartition tp) {
  TopicPartitionLag offsetInfo = getOffsetMonitor().getTopicPartitionOffset(tp);
  if (offsetInfo == null) {
    return null;
  }
  // Both offsets must be positive for the record to be usable.
  if (offsetInfo.getLatestOffset() <= 0 || offsetInfo.getCommitOffset() <= 0) {
    return null;
  }
  // Ignore records whose timestamp is too old.
  if (System.currentTimeMillis() - offsetInfo.getTimeStamp() > _offsetMaxValidTimeMillis) {
    return null;
  }

  long offsetLag = offsetInfo.getLatestOffset() - offsetInfo.getCommitOffset();
  if (offsetLag <= _minLagOffset) {
    return null;
  }

  double reportedRate =
      getWorkloadInfoRetriever().topicWorkload(tp.getTopic()).getMsgsPerSecondPerPartition();
  // Floor the rate at 1 so the division below never uses a value smaller than 1.
  double effectiveRate = reportedRate < 1 ? 1 : reportedRate;

  double estimatedLagTime = offsetLag / effectiveRate;
  // Written as a negated '>' so a non-comparable (NaN) estimate still yields null.
  if (!(estimatedLagTime > _minLagTimeSec)) {
    return null;
  }
  offsetInfo.setLagTime(Math.round(estimatedLagTime));
  return offsetInfo;
}