in rocketmq-spark/src/main/scala/org/apache/spark/sql/rocketmq/CachedRocketMQConsumer.scala [88:126]
def get(
    offset: Long,
    untilOffset: Long,
    pollTimeoutMs: Long,
    failOnDataLoss: Boolean): MessageExt = {
  // Fetches the message at `offset`, falling back to later offsets below `untilOffset` when
  // the requested one is rejected with an OffsetIllegalException. Returns null when the
  // search ends on UNKNOWN_OFFSET without a successful fetch.
  require(offset < untilOffset,
    s"offset must always be less than untilOffset [offset: $offset, untilOffset: $untilOffset]")
  logDebug(s"Get $groupId $queue requested $offset")

  // The retry loop mostly matters for `failOnDataLoss = false`: each failed attempt moves
  // `toFetchOffset` to the earliest available offset in `[toFetchOffset, untilOffset)`.
  // With `failOnDataLoss = true` the loop body is expected to execute only once.
  var toFetchOffset = offset

  // Stays `None` until a fetchData call returns normally. `Some(...)` is used on purpose
  // (rather than `Option(...)`) so that a null result still counts as a completed fetch.
  // A mutable holder plus a loop condition is used instead of "return", which may surface as
  // a NonLocalReturnControl exception when this method is used as a function.
  var fetched: Option[MessageExt] = None

  while (toFetchOffset != UNKNOWN_OFFSET && fetched.isEmpty) {
    try {
      fetched = Some(fetchData(toFetchOffset, untilOffset, pollTimeoutMs, failOnDataLoss))
    } catch {
      case e: OffsetIllegalException =>
        // Drop all cached fetch state before reporting and choosing the next offset to try.
        resetFetchedData()
        reportDataLoss(failOnDataLoss, s"Cannot fetch offset $toFetchOffset: ${e.toString}", e)
        toFetchOffset =
          getEarliestAvailableOffsetBetween(toFetchOffset, untilOffset, e.availableOffsetRange)
    }
  }

  fetched match {
    case Some(record) =>
      record
    case None =>
      // No fetch completed: clear cached state and signal "nothing available" with null.
      resetFetchedData()
      null
  }
}