protected def commitAll()

in rocketmq-spark/src/main/scala/org/apache/spark/streaming/MQPullInputDStream.scala [457:477]


  protected def commitAll(): Unit = {
    val m = new ju.HashMap[MessageQueue, jl.Long]
    var osr = commitQueue.poll()
    try {
      while (null != osr) {
        //Exclusive ending offset
        val mq = new MessageQueue(osr.topic, osr.brokerName, osr.queueId)
        kc.commitConsumeOffset(mq, osr.untilOffset - 1)
        m.put(mq, osr.untilOffset - 1)
        osr = commitQueue.poll()
      }
      if (commitCallback.get != null) {
        commitCallback.get.onComplete(m, null)
      }
    } catch {
      case e: Exception => {
        if (commitCallback.get != null)
          commitCallback.get.onComplete(m, e)
      }
    }
  }