override def compute()

in rocketmq-spark/src/main/scala/org/apache/spark/sql/rocketmq/RocketMQSourceRDD.scala [125:182]


  /**
   * Reads the messages of the offset range carried by `thePart`.
   *
   * A consumer is obtained for the partition's message queue (from the consumer cache when
   * `reuseRocketMQConsumer` is set, otherwise a dedicated uncached one), the offset range is
   * resolved against it, and the messages in `[fromOffset, untilOffset)` are returned lazily.
   * Each returned message is tagged with the broker name of its queue as a user property.
   *
   * The consumer is always handed back: a cached consumer is released to the cache, an uncached
   * one is closed. This happens when the iterator is closed, when the task completes, or
   * immediately if the resolved range is empty.
   *
   * @param thePart the partition to read; must be a `RocketMQSourceRDDPartition`
   * @param context the context of the running task, used to register the cleanup listener
   * @return an iterator over the messages of the resolved offset range
   */
  override def compute(
      thePart: Partition,
      context: TaskContext): Iterator[MessageExt] = {
    val sourcePartition = thePart.asInstanceOf[RocketMQSourceRDDPartition]
    val messageQueue = sourcePartition.offsetRange.messageQueue

    // Acquisition must mirror `releaseConsumer()` below: cached consumers are released back to
    // the cache, uncached consumers are owned by this task and must be closed.
    val consumer = if (reuseRocketMQConsumer) {
      CachedRocketMQConsumer.getOrCreate(messageQueue, executorRocketMQParams)
    } else {
      CachedRocketMQConsumer.createUncached(messageQueue, executorRocketMQParams)
    }

    // Hand the consumer back, either by closing it or by indicating we're no longer using it.
    def releaseConsumer(): Unit = {
      if (reuseRocketMQConsumer) {
        CachedRocketMQConsumer.releaseConsumer(messageQueue, executorRocketMQParams)
      } else {
        consumer.close()
      }
    }

    val range = resolveRange(consumer, sourcePartition.offsetRange)
    assert(
      range.fromOffset <= range.untilOffset,
      s"Beginning offset ${range.fromOffset} is after the ending offset ${range.untilOffset} for " +
          s"${range.messageQueue}. You either provided an invalid fromOffset, or the RocketMQ topic has been damaged")
    if (range.fromOffset == range.untilOffset) {
      logInfo(s"Beginning offset ${range.fromOffset} is the same as ending offset, " +
          s"skipping ${range.messageQueue}")
      // Nothing will be read and no iterator will ever be closed, so release the consumer now.
      releaseConsumer()
      Iterator.empty
    } else {
      val underlying = new NextIterator[MessageExt]() {
        // Offset of the next message to request from the consumer.
        private var requestOffset = range.fromOffset

        override def getNext(): MessageExt = {
          if (requestOffset >= range.untilOffset) {
            // Processed all offsets in this partition.
            finished = true
            null
          } else {
            val r = consumer.get(requestOffset, range.untilOffset, pollTimeoutMs, failOnDataLoss)
            if (r == null) {
              // Losing some data. Skip the rest offsets in this partition.
              finished = true
              null
            } else {
              // Continue after the message actually returned, which may be past the requested one.
              requestOffset = r.getQueueOffset + 1
              // The MessageExt structure does not contain any `brokerName` field, so put one
              // into the user properties.
              r.putUserProperty(RocketMQSource.PROP_BROKER_NAME, messageQueue.getBrokerName)
              r
            }
          }
        }

        override protected def close(): Unit = releaseConsumer()
      }
      // Release consumer, either by removing it or indicating we're no longer using it
      context.addTaskCompletionListener { _ =>
        underlying.closeIfNeeded()
      }
      underlying
    }
  }