in uReplicator-Controller/src/main/java/com/uber/stream/kafka/mirrormaker/controller/core/OffsetMonitor.java [268:316]
private Runnable updateOffsetTask(final String leaderBroker, final TopicAndPartition tp) {
return new Runnable() {
@Override
public void run() {
if (StringUtils.isEmpty(consumerOffsetPath)) {
logger.warn("No consumer group id, skip updateOffsetTask");
return;
}
ZkClient zk = zkClientQueue.poll();
try {
Object obj = zk.readData(consumerOffsetPath + tp.topic() + "/" + tp.partition(), true);
long commitOffset = obj == null ? -1 : Long.valueOf(String.valueOf(obj));
SimpleConsumer consumer = getSimpleConsumer(leaderBroker);
long latestOffset = getLatestOffset(consumer, tp);
if (latestOffset < 0) {
latestOffset = -1;
}
TopicPartitionLag previousOffset = topicPartitionToOffsetMap
.put(tp,
new TopicPartitionLag(tp.topic(), tp.partition(), latestOffset, commitOffset));
logger
.debug("Get latest offset={} committed offset={} for {}", latestOffset, commitOffset,
tp);
if (latestOffset > 0 && commitOffset > 0) {
if (latestOffset - commitOffset > 0 && previousOffset != null
&& previousOffset.getCommitOffset() == commitOffset) {
TopicPartitionLag oldLag = noProgressMap.get(tp);
// keep the oldest record (the time began to have no progress) in
// order to measure whether the time larger than the threshold,
// therefore we do not overwrite the old record if the commit
// offset is the same as current
if (oldLag == null || oldLag.getCommitOffset() != commitOffset) {
noProgressMap.put(tp, previousOffset);
}
} else {
noProgressMap.remove(tp);
}
}
} catch (Exception e) {
offsetMonitorFailureCount.getAndAdd(1);
logger.warn("Got exception to get offset for TopicPartition=" + tp, e);
} finally {
zkClientQueue.add(zk);
}
}
};
}