in rocketmq-spark/src/main/scala/org/apache/spark/streaming/MQPullInputDStream.scala [457:477]
/**
 * Drains `commitQueue` and commits the consume offset of every queued offset range.
 *
 * For each polled range a `MessageQueue` is built from its topic, broker name and queue id,
 * and `untilOffset - 1` is committed through `kc` (the range's ending offset is exclusive).
 * Every offset committed this way is also recorded in a map keyed by message queue.
 *
 * Once draining stops — either because the queue is empty or because an `Exception` was
 * thrown while committing — the registered commit callback, if any, is notified exactly once
 * with the map of offsets committed so far and the exception (`null` on success).
 * The range being processed when an exception occurs has already been removed from the queue.
 */
protected def commitAll(): Unit = {
  val committed = new ju.HashMap[MessageQueue, jl.Long]

  val failure: Option[Exception] =
    try {
      // poll() is evaluated lazily, one element at a time, so ranges are committed in
      // queue order and draining stops at the first null (empty queue).
      Iterator.continually(commitQueue.poll()).takeWhile(_ != null).foreach { range =>
        // Exclusive ending offset
        val mq = new MessageQueue(range.topic, range.brokerName, range.queueId)
        val lastConsumed = range.untilOffset - 1
        kc.commitConsumeOffset(mq, lastConsumed)
        committed.put(mq, lastConsumed)
      }
      None
    } catch {
      case e: Exception => Some(e)
    }

  // Read the callback once so the null check and the invocation see the same instance.
  // The notification happens outside the try block: an exception thrown by the callback
  // itself must not be reported back to it as a commit failure via a second invocation.
  val callback = commitCallback.get
  if (callback != null) {
    callback.onComplete(committed, failure.orNull)
  }
}