override def compute()

in rocketmq-spark/src/main/scala/org/apache/spark/streaming/MQPullInputDStream.scala [371:425]


  override def compute(validTime: Time): Option[RocketMqRDD] = {

    val untilOffsets = clamp(latestOffsets())

    val offsetRangeRdd: ju.Map[TopicQueueId, Array[OffsetRange]] = new ju.HashMap()

    untilOffsets.foreach { case (tp, uo) =>
      val values = uo.map { case (name, until) =>
        val fo = currentOffsets(tp)(name)
        OffsetRange(tp.topic, tp.queueId, name, fo, until)
      }.filter(item => {
        item.count() > 0
      }).toArray

      if (values != null && values.length > 0) {
        offsetRangeRdd.put(tp, values)
      }
    }

    val rdd = new RocketMqRDD(
      context.sparkContext, groupId, optionParams, offsetRangeRdd, getPreferredHosts, true)

    // Report the record number and metadata of this batch interval to InputInfoTracker.
    val description = offsetRangeRdd.asScala.flatMap { case (tp, arrayRange) =>
      // Don't display empty ranges.
      arrayRange
    }.filter { offsetRange =>
      offsetRange.fromOffset != offsetRange.untilOffset
    }.map { offsetRange =>
      s"topic: ${offsetRange.topic}\tqueueId: ${offsetRange.queueId}\t" +
        s"brokerName: ${offsetRange.brokerName}\t" +
        s"offsets: ${offsetRange.fromOffset} to ${offsetRange.untilOffset}"
    }.mkString("\n")
    // Copy offsetRanges to immutable.List to prevent from being modified by the user
    val metadata = Map(
      "offsets" -> offsetRangeRdd,
      StreamInputInfo.METADATA_KEY_DESCRIPTION -> description)
    val inputInfo = StreamInputInfo(id, rdd.count, metadata)
    ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)

    currentOffsets = collection.mutable.Map() ++ untilOffsets

    if (autoCommit) {
      currentOffsets.foreach { case (tp, uo) =>
        uo.map { case (name, until) =>
          val offset = currentOffsets(tp)(name) - 1
          val mq = new MessageQueue(tp.topic, name, tp.queueId)
          kc.commitConsumeOffset(mq, offset)
        }
      }
    } else {
      commitAll()
    }
    Some(rdd)
  }