in rocketmq-spark/src/main/scala/org/apache/spark/streaming/MQPullInputDStream.scala [86:120]
/**
 * Returns the driver-side pull consumer held in `kc`, creating it on first use.
 *
 * The whole body runs inside `this.synchronized`. When `kc` is still null the
 * method:
 *  1. builds the consumer through `RocketMqUtils.mkPullConsumerInstance` with the
 *     "driver" instance tag;
 *  2. walks every message queue returned by `fetchSubscribeMessageQueues(topics)`
 *     and records a starting offset (from `computePullFromWhere`) in
 *     `currentOffsets` for each (topic, queueId) / broker pair that has no entry
 *     yet -- entries that already exist are left untouched;
 *  3. schedules a fixed-rate task that persists the offsets of all subscribed
 *     queues, first after 10 seconds and then every 5 seconds.
 *
 * Later calls skip all of the above and simply return the existing `kc`.
 *
 * @return the value of `kc` after any first-use initialisation
 */
private def consumer() = this.synchronized {
  if (kc == null) {
    kc = RocketMqUtils.mkPullConsumerInstance(groupId, optionParams, "driver")

    // Seed currentOffsets with a starting offset for every queue not yet tracked.
    val queueIterator = fetchSubscribeMessageQueues(topics).iterator
    while (queueIterator.hasNext) {
      val mq = queueIterator.next
      // The offset is computed for every queue, including ones already tracked,
      // exactly as before.
      val startOffset = computePullFromWhere(mq)
      val queueKey = new TopicQueueId(mq.getTopic, mq.getQueueId)
      val brokerName = mq.getBrokerName
      if (!currentOffsets.contains(queueKey)) {
        // First time this (topic, queueId) is seen: create its per-broker map.
        currentOffsets += queueKey -> Map(brokerName -> startOffset)
      } else if (!currentOffsets(queueKey).contains(brokerName)) {
        // Known (topic, queueId) but new broker: add the broker's offset.
        currentOffsets(queueKey) += brokerName -> startOffset
      }
    }

    // TODO should not persist here if autoCommit is off
    // Periodic offset persistence. Exceptions are logged and swallowed inside the task.
    val persistOffsetsTask = new Runnable {
      override def run(): Unit = {
        try {
          kc.getOffsetStore.persistAll(fetchSubscribeMessageQueues(topics))
        } catch {
          case e: Exception =>
            log.error("ScheduledTask persistAllConsumerOffset exception", e)
        }
      }
    }
    val initialDelayMillis: Long = 1000 * 10
    val periodMillis: Long = 1000 * 5
    this.scheduledExecutorService.scheduleAtFixedRate(
      persistOffsetsTask, initialDelayMillis, periodMillis, TimeUnit.MILLISECONDS)
  }
  kc
}