private def consumer()

in rocketmq-spark/src/main/scala/org/apache/spark/streaming/MQPullInputDStream.scala [86:120]


  private def consumer() = this.synchronized {
    if (null == kc) {
      kc = RocketMqUtils.mkPullConsumerInstance(groupId, optionParams, "driver")
      val messageQueues = fetchSubscribeMessageQueues(topics)
      val iter = messageQueues.iterator
      while (iter.hasNext){
        val messageQueue = iter.next
        val offset = computePullFromWhere(messageQueue)
        val topicQueueId = new TopicQueueId(messageQueue.getTopic, messageQueue.getQueueId)
        if (!currentOffsets.contains(topicQueueId)) {
          currentOffsets += topicQueueId -> Map(messageQueue.getBrokerName -> offset)
        } else {
          if (!currentOffsets(topicQueueId).contains(messageQueue.getBrokerName)){
            currentOffsets(topicQueueId) += messageQueue.getBrokerName -> offset
          }
        }
      }

      // TODO should not persist here if autoCommit is off
      // timer persist
      this.scheduledExecutorService.scheduleAtFixedRate(
        new Runnable() {
          def run() {
            try {
              kc.getOffsetStore.persistAll(fetchSubscribeMessageQueues(topics))
            } catch {
              case e: Exception => {
                log.error("ScheduledTask persistAllConsumerOffset exception", e)
              }
            }
          }
        }, 1000 * 10, 1000 * 5, TimeUnit.MILLISECONDS)
    }
    kc
  }