private Runnable updateOffsetTask()

in uReplicator-Controller/src/main/java/com/uber/stream/kafka/mirrormaker/controller/core/OffsetMonitor.java [268:316]


  /**
   * Builds a task that refreshes the offset/lag record of a single topic partition.
   *
   * <p>When run, the task:
   * <ol>
   *   <li>returns immediately if {@code consumerOffsetPath} is empty;</li>
   *   <li>borrows a {@code ZkClient} from {@code zkClientQueue} and reads the committed offset
   *       stored under {@code consumerOffsetPath + topic + "/" + partition} (a {@code null} read
   *       result is recorded as {@code -1});</li>
   *   <li>asks the leader broker for the latest offset (any negative answer is recorded as
   *       {@code -1});</li>
   *   <li>stores the new {@code TopicPartitionLag} in {@code topicPartitionToOffsetMap};</li>
   *   <li>updates {@code noProgressMap}: the partition is tracked there while it has lag and its
   *       committed offset is unchanged since the previous run, and removed otherwise.</li>
   * </ol>
   *
   * <p>Any exception raised while fetching offsets is counted in
   * {@code offsetMonitorFailureCount} and logged; it is not propagated. The borrowed
   * {@code ZkClient} is always returned to the queue.
   *
   * @param leaderBroker the leader broker of {@code tp}, passed to {@code getSimpleConsumer}
   * @param tp the topic partition whose offsets are refreshed
   * @return a runnable performing the refresh described above
   */
  private Runnable updateOffsetTask(final String leaderBroker, final TopicAndPartition tp) {
    return new Runnable() {
      @Override
      public void run() {
        if (StringUtils.isEmpty(consumerOffsetPath)) {
          logger.warn("No consumer group id, skip updateOffsetTask");
          return;
        }
        ZkClient zk = zkClientQueue.poll();
        if (zk == null) {
          // poll() yields null when no client is currently available. Bail out before the
          // try/finally so that we neither dereference null nor hand null back to the queue.
          offsetMonitorFailureCount.getAndAdd(1);
          logger.warn("No ZkClient available, skip updateOffsetTask for TopicPartition=" + tp);
          return;
        }
        try {
          // NOTE(review): the boolean flag presumably makes readData return null for a missing
          // node instead of throwing - confirm against the ZkClient API.
          Object obj = zk.readData(consumerOffsetPath + tp.topic() + "/" + tp.partition(), true);
          long commitOffset = obj == null ? -1 : Long.parseLong(String.valueOf(obj));

          SimpleConsumer consumer = getSimpleConsumer(leaderBroker);
          long latestOffset = getLatestOffset(consumer, tp);
          if (latestOffset < 0) {
            // normalize every negative result to the single sentinel -1
            latestOffset = -1;
          }

          TopicPartitionLag previousOffset = topicPartitionToOffsetMap
              .put(tp,
                  new TopicPartitionLag(tp.topic(), tp.partition(), latestOffset, commitOffset));
          logger
              .debug("Get latest offset={} committed offset={} for {}", latestOffset, commitOffset,
                  tp);
          // Only reason about progress when both offsets are positive.
          if (latestOffset > 0 && commitOffset > 0) {
            if (latestOffset - commitOffset > 0 && previousOffset != null
                && previousOffset.getCommitOffset() == commitOffset) {
              TopicPartitionLag oldLag = noProgressMap.get(tp);
              // keep the oldest record (the time began to have no progress) in
              // order to measure whether the time larger than the threshold,
              // therefore we do not overwrite the old record if the commit
              // offset is the same as current
              if (oldLag == null || oldLag.getCommitOffset() != commitOffset) {
                noProgressMap.put(tp, previousOffset);
              }
            } else {
              // no lag, no previous sample, or the committed offset moved: progress was made
              noProgressMap.remove(tp);
            }
          }
        } catch (Exception e) {
          offsetMonitorFailureCount.getAndAdd(1);
          logger.warn("Got exception to get offset for TopicPartition=" + tp, e);
        } finally {
          // zk is guaranteed non-null here; give the borrowed client back to the pool
          zkClientQueue.add(zk);
        }
      }
    };
  }