in rocketmq-spark/src/main/scala/org/apache/spark/streaming/MQPullInputDStream.scala [371:425]
override def compute(validTime: Time): Option[RocketMqRDD] = {
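  // Compute this batch's target ("until") offsets: latestOffsets() asks the brokers for their
  // newest offsets, and clamp() caps them so a single batch cannot read unboundedly far ahead (rate limiting).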
  val untilOffsets = clamp(latestOffsets())
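  // Build one OffsetRange per broker queue, from the last consumed offset (currentOffsets) up to the clamped target.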
  val offsetRangeRdd: ju.Map[TopicQueueId, Array[OffsetRange]] = new ju.HashMap()
  untilOffsets.foreach { case (tp, uo) =>
    val values = uo.map { case (name, until) =>
      val fo = currentOffsets(tp)(name)
      OffsetRange(tp.topic, tp.queueId, name, fo, until)
    }.filter(_.count() > 0).toArray
    if (values.nonEmpty) {
      offsetRangeRdd.put(tp, values)
    }
  }
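  // Wrap the computed ranges in the batch RDD; messages are pulled from RocketMQ only when
  // the RDD is actually computed on the executors.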
  val rdd = new RocketMqRDD(
    context.sparkContext, groupId, optionParams, offsetRangeRdd, getPreferredHosts, true)
  // Report the record number and metadata of this batch interval to InputInfoTracker.
  val description = offsetRangeRdd.asScala.flatMap { case (_, arrayRange) =>
    arrayRange
  }.filter { offsetRange =>
    // Don't display empty ranges.
    offsetRange.fromOffset != offsetRange.untilOffset
  }.map { offsetRange =>
    s"topic: ${offsetRange.topic}\tqueueId: ${offsetRange.queueId}\t" +
      s"brokerName: ${offsetRange.brokerName}\t" +
      s"offsets: ${offsetRange.fromOffset} to ${offsetRange.untilOffset}"
  }.mkString("\n")
  // Expose the offset ranges as an unmodifiable view to prevent them from being modified by the user.
  val metadata = Map(
    "offsets" -> ju.Collections.unmodifiableMap(offsetRangeRdd),
    StreamInputInfo.METADATA_KEY_DESCRIPTION -> description)
  val inputInfo = StreamInputInfo(id, rdd.count, metadata)
  ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)
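  // Advance the consumed-offset cursor so the next batch starts where this one ended.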
  currentOffsets = collection.mutable.Map() ++ untilOffsets
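  // With auto-commit enabled, persist the new offsets to the broker right away; otherwise
  // commitAll() flushes whatever offsets the application has queued for commit.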
  if (autoCommit) {
    currentOffsets.foreach { case (tp, uo) =>
      uo.foreach { case (name, until) =>
        // The until offset is exclusive, so the last consumed offset is until - 1.
        val offset = until - 1
        val mq = new MessageQueue(tp.topic, name, tp.queueId)
        kc.commitConsumeOffset(mq, offset)
      }
    }
  } else {
    commitAll()
  }
  Some(rdd)
}