public TopicPartitionLag calculateLagTime()

in uReplicator-Controller/src/main/java/com/uber/stream/kafka/mirrormaker/controller/core/HelixMirrorMakerManager.java [398:420]


  /**
   * Estimates how far behind the committed offset of a topic partition is, expressed as time.
   *
   * <p>The offset lag (latest offset minus committed offset) is divided by the topic's
   * per-partition message rate, which is floored at 1 so that a very low or missing rate cannot
   * inflate the estimate. When the estimate exceeds {@code _minLagTimeSec}, its rounded value is
   * stored on the offset record via {@code setLagTime} and that same record is returned.
   *
   * @param tp the topic partition to evaluate
   * @return the offset record for {@code tp} with its lag time set, or {@code null} when no
   *     offset record exists, either offset is non-positive, the record's timestamp is older
   *     than {@code _offsetMaxValidTimeMillis}, the offset lag does not exceed
   *     {@code _minLagOffset}, or the estimated lag time does not exceed {@code _minLagTimeSec}
   */
  public TopicPartitionLag calculateLagTime(TopicPartition tp) {
    TopicPartitionLag offsetInfo = getOffsetMonitor().getTopicPartitionOffset(tp);

    // No offset record for this partition.
    if (offsetInfo == null) {
      return null;
    }
    // Either offset is non-positive, so the record cannot be used.
    if (offsetInfo.getLatestOffset() <= 0) {
      return null;
    }
    if (offsetInfo.getCommitOffset() <= 0) {
      return null;
    }
    // The record is too old to be trusted.
    if (System.currentTimeMillis() - offsetInfo.getTimeStamp() > _offsetMaxValidTimeMillis) {
      return null;
    }

    long offsetLag = offsetInfo.getLatestOffset() - offsetInfo.getCommitOffset();
    if (offsetLag <= _minLagOffset) {
      return null;
    }

    double reportedRate = getWorkloadInfoRetriever().topicWorkload(tp.getTopic())
        .getMsgsPerSecondPerPartition();
    // Floor the rate at one message per second before dividing.
    double effectiveRate = Math.max(reportedRate, 1.0);

    double estimatedLagTime = offsetLag / effectiveRate;
    if (estimatedLagTime > _minLagTimeSec) {
      offsetInfo.setLagTime(Math.round(estimatedLagTime));
      return offsetInfo;
    }
    return null;
  }