in rocketmq-spark/src/main/scala/org/apache/spark/sql/rocketmq/RocketMQSourceRDD.scala [125:182]
/**
 * Produces the messages of a single RocketMQ offset range as an iterator.
 *
 * A consumer is obtained for the partition's message queue: the shared cached
 * instance when `reuseRocketMQConsumer` is set, otherwise a private uncached one.
 * Whichever path returns (empty range or message iterator) hands the consumer back
 * the same way: an uncached consumer is closed, a cached one is released.
 *
 * @param thePart the partition to read; must be a `RocketMQSourceRDDPartition`
 * @param context the running task's context, used to release the consumer when the
 *                task completes
 * @return an iterator over the messages in `[fromOffset, untilOffset)` of the
 *         resolved range; empty when both offsets are equal
 * @throws AssertionError if the resolved beginning offset is after the ending offset
 */
override def compute(
    thePart: Partition,
    context: TaskContext): Iterator[MessageExt] = {
  val sourcePartition = thePart.asInstanceOf[RocketMQSourceRDDPartition]
  val messageQueue = sourcePartition.offsetRange.messageQueue

  // Acquisition must mirror the release logic below: a reused consumer comes from
  // the cache (and is later released back), a non-reused one is created uncached
  // (and is later closed). The branches were previously swapped, which closed
  // cached consumers and never closed uncached ones.
  val consumer = if (reuseRocketMQConsumer) {
    CachedRocketMQConsumer.getOrCreate(messageQueue, executorRocketMQParams)
  } else {
    CachedRocketMQConsumer.createUncached(messageQueue, executorRocketMQParams)
  }

  // Single place that hands the consumer back, matching how it was acquired.
  def releaseOrCloseConsumer(): Unit = {
    if (reuseRocketMQConsumer) {
      // Indicate that this task no longer uses the cached consumer.
      CachedRocketMQConsumer.releaseConsumer(messageQueue, executorRocketMQParams)
    } else {
      // Uncached consumers are owned by this task and must be closed explicitly.
      consumer.close()
    }
  }

  val range = resolveRange(consumer, sourcePartition.offsetRange)
  assert(
    range.fromOffset <= range.untilOffset,
    s"Beginning offset ${range.fromOffset} is after the ending offset ${range.untilOffset} for " +
      s"${range.messageQueue}. You either provided an invalid fromOffset, or the RocketMQ topic has been damaged")

  if (range.fromOffset == range.untilOffset) {
    logInfo(s"Beginning offset ${range.fromOffset} is the same as ending offset, " +
      s"skipping ${range.messageQueue}")
    // No iterator will be created to own the consumer, so give it back right away
    // instead of leaking it.
    releaseOrCloseConsumer()
    Iterator.empty
  } else {
    val underlying = new NextIterator[MessageExt]() {
      // Next queue offset to request from the consumer.
      private var requestOffset = range.fromOffset

      override def getNext(): MessageExt = {
        if (requestOffset >= range.untilOffset) {
          // Processed all offsets in this partition.
          finished = true
          null
        } else {
          val r = consumer.get(requestOffset, range.untilOffset, pollTimeoutMs, failOnDataLoss)
          if (r == null) {
            // No message was returned: some data was lost, so skip the remaining
            // offsets in this partition.
            finished = true
            null
          } else {
            // Continue right after the message actually returned.
            requestOffset = r.getQueueOffset + 1
            // MessageExt has no `brokerName` field, so carry it as a user property.
            r.putUserProperty(RocketMQSource.PROP_BROKER_NAME, messageQueue.getBrokerName)
            r
          }
        }
      }

      override protected def close(): Unit = releaseOrCloseConsumer()
    }
    // Release consumer, either by removing it or indicating we're no longer using it
    context.addTaskCompletionListener { _ =>
      underlying.closeIfNeeded()
    }
    underlying
  }
}