protected def latestOffsets()

in rocketmq-spark/src/main/scala/org/apache/spark/streaming/MQPullInputDStream.scala [319:356]


  protected def latestOffsets(): Map[TopicQueueId, Map[String, Long]] = {
    val c = consumer

    val messageQueues = fetchSubscribeMessageQueues(topics)

    var maxOffsets = Map[TopicQueueId, Map[String, Long]]()

    val lastTopicQueues = currentOffsets.keySet
    val fetchTopicQueues = mutable.Set[TopicQueueId]()
    val iter = messageQueues.iterator
    while (iter.hasNext) {
      val messageQueue = iter.next
      logDebug(s"${messageQueue.toString} min: ${c.minOffset(messageQueue)}  max: ${c.maxOffset(messageQueue)}")
      val topicQueueId = new TopicQueueId(messageQueue.getTopic, messageQueue.getQueueId)
      fetchTopicQueues.add(topicQueueId)
      if (!currentOffsets.contains(topicQueueId)){
        currentOffsets += topicQueueId -> Map(messageQueue.getBrokerName -> firstConsumerOffset(messageQueue))
      }else{
        if (!currentOffsets(topicQueueId).contains(messageQueue.getBrokerName))
          currentOffsets(topicQueueId) += messageQueue.getBrokerName -> firstConsumerOffset(messageQueue)
      }
      if (!maxOffsets.contains(topicQueueId)) {
        maxOffsets += topicQueueId -> Map(messageQueue.getBrokerName -> c.maxOffset(messageQueue))
      }else{
        if (!maxOffsets(topicQueueId).contains(messageQueue.getBrokerName)) {
          val tempMap = maxOffsets(topicQueueId) + (messageQueue.getBrokerName -> c.maxOffset(messageQueue))
          maxOffsets += topicQueueId -> tempMap
        }
      }
    }

    val deletedPartitions = lastTopicQueues.diff(fetchTopicQueues)
    if (deletedPartitions.size > 0){
      reportDataLoss(
        s"Cannot find offsets of ${deletedPartitions}. Some data may have been missed")
    }
    maxOffsets
  }