public void start()

in uReplicator-Controller/src/main/java/com/uber/stream/kafka/mirrormaker/controller/core/OffsetMonitor.java [117:170]


  public void start() {
    if (refreshIntervalInSec > 0) {
      // delay for 1-5 minutes
      int delaySec = 60 + new Random().nextInt(240);
      logger.info("OffsetMonitor starts updating offsets every {} seconds with delay {} seconds",
          refreshIntervalInSec,
          delaySec);

      Meter offsetMonitorMeter = new Meter();
      KafkaUReplicatorMetricsReporter.get()
          .registerMetric("offsetMonitor.executed", offsetMonitorMeter);

      refreshExecutor.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
          logger.info("TopicList starts updating");
          if (zkClientQueue == null) {

            zkClientQueue = new LinkedBlockingQueue<>(numOffsetThread);
            for (int i = 0; i < numOffsetThread; i++) {
              ZkClient zkClient = new ZkClient(offsetZkString, 30000, 30000,
                  ZKStringSerializer$.MODULE$);
              zkClientQueue.add(zkClient);
            }

            ZkClient zkClient = new ZkClient(srcZkString, 30000, 30000,
                ZKStringSerializer$.MODULE$);
            List<String> brokerIdList = zkClient.getChildren("/brokers/ids");

            for (String id : brokerIdList) {
              try {
                JSONObject json = JSONObject
                    .parseObject(zkClient.readData("/brokers/ids/" + id).toString());
                srcBrokerList
                    .add(String.valueOf(json.get("host")) + ":" + String.valueOf(json.get("port")));
              } catch (Exception e) {
                logger.warn("Failed to get broker", e);
              }
            }
            logger.info("OffsetMonitor starts with brokerList=" + srcBrokerList);
          }
          updateTopicList();
          updateOffset();
          updateOffsetMetrics();
          offsetMonitorMeter.mark();
          lastSucceedOffsetCheck = new Date().getTime();
        }
      }, delaySec, refreshIntervalInSec, TimeUnit.SECONDS);
      registerNoProgressMetric();
      registerUpdateOffsetStatusMetric();
    } else {
      logger.info("OffsetMonitor is disabled");
    }
  }