in uReplicator-Controller/src/main/java/com/uber/stream/kafka/mirrormaker/controller/core/OffsetMonitor.java [117:170]
public void start() {
if (refreshIntervalInSec > 0) {
// delay for 1-5 minutes
int delaySec = 60 + new Random().nextInt(240);
logger.info("OffsetMonitor starts updating offsets every {} seconds with delay {} seconds",
refreshIntervalInSec,
delaySec);
Meter offsetMonitorMeter = new Meter();
KafkaUReplicatorMetricsReporter.get()
.registerMetric("offsetMonitor.executed", offsetMonitorMeter);
refreshExecutor.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
logger.info("TopicList starts updating");
if (zkClientQueue == null) {
zkClientQueue = new LinkedBlockingQueue<>(numOffsetThread);
for (int i = 0; i < numOffsetThread; i++) {
ZkClient zkClient = new ZkClient(offsetZkString, 30000, 30000,
ZKStringSerializer$.MODULE$);
zkClientQueue.add(zkClient);
}
ZkClient zkClient = new ZkClient(srcZkString, 30000, 30000,
ZKStringSerializer$.MODULE$);
List<String> brokerIdList = zkClient.getChildren("/brokers/ids");
for (String id : brokerIdList) {
try {
JSONObject json = JSONObject
.parseObject(zkClient.readData("/brokers/ids/" + id).toString());
srcBrokerList
.add(String.valueOf(json.get("host")) + ":" + String.valueOf(json.get("port")));
} catch (Exception e) {
logger.warn("Failed to get broker", e);
}
}
logger.info("OffsetMonitor starts with brokerList=" + srcBrokerList);
}
updateTopicList();
updateOffset();
updateOffsetMetrics();
offsetMonitorMeter.mark();
lastSucceedOffsetCheck = new Date().getTime();
}
}, delaySec, refreshIntervalInSec, TimeUnit.SECONDS);
registerNoProgressMetric();
registerUpdateOffsetStatusMetric();
} else {
logger.info("OffsetMonitor is disabled");
}
}