in rocketmq-spark/src/main/scala/org/apache/spark/streaming/MQPullInputDStream.scala [319:356]
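/**
 * Fetches the max offset of every subscribed message queue from the brokers.
 * Queues seen for the first time are seeded into `currentOffsets` with their
 * initial consumer offset; queues that were present in the previous fetch but
 * are now missing are reported as possible data loss.
 */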
protected def latestOffsets(): Map[TopicQueueId, Map[String, Long]] = {
  val c = consumer
  val messageQueues = fetchSubscribeMessageQueues(topics)
  var maxOffsets = Map[TopicQueueId, Map[String, Long]]()
  val lastTopicQueues = currentOffsets.keySet
  val fetchTopicQueues = mutable.Set[TopicQueueId]()
  val iter = messageQueues.iterator
  while (iter.hasNext) {
    val messageQueue = iter.next()
    logDebug(s"${messageQueue.toString} min: ${c.minOffset(messageQueue)} max: ${c.maxOffset(messageQueue)}")
    val topicQueueId = new TopicQueueId(messageQueue.getTopic, messageQueue.getQueueId)
    fetchTopicQueues.add(topicQueueId)
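    // Seed currentOffsets with the initial consumer offset the first time this
    // (topic, queueId) or this broker within it is seen.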
    if (!currentOffsets.contains(topicQueueId)) {
      currentOffsets += topicQueueId -> Map(messageQueue.getBrokerName -> firstConsumerOffset(messageQueue))
    } else if (!currentOffsets(topicQueueId).contains(messageQueue.getBrokerName)) {
      currentOffsets(topicQueueId) += messageQueue.getBrokerName -> firstConsumerOffset(messageQueue)
    }
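    // Record the broker's current max offset; it is the upper bound for this batch.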
    if (!maxOffsets.contains(topicQueueId)) {
      maxOffsets += topicQueueId -> Map(messageQueue.getBrokerName -> c.maxOffset(messageQueue))
    } else if (!maxOffsets(topicQueueId).contains(messageQueue.getBrokerName)) {
      val tempMap = maxOffsets(topicQueueId) + (messageQueue.getBrokerName -> c.maxOffset(messageQueue))
      maxOffsets += topicQueueId -> tempMap
    }
  }
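  // Any queue present in the previous fetch but absent from this one has been
  // deleted or migrated, so its offsets can no longer be resolved.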
  val deletedPartitions = lastTopicQueues.diff(fetchTopicQueues)
  if (deletedPartitions.nonEmpty) {
    reportDataLoss(
      s"Cannot find offsets of $deletedPartitions. Some data may have been missed")
  }
  maxOffsets
}