in rocketmq-spark/src/main/scala/org/apache/spark/sql/rocketmq/CachedRocketMQConsumer.scala [192:239]
/**
 * Returns the record at `offset` from the RocketMQ queue.
 *
 * The pre-fetched batch is reused when `offset` directly continues it. Otherwise a new
 * `consumer.pull` is issued starting at `offset`.
 *
 * RocketMQ treats the requested offset only as a starting point and may hand back the next
 * available record. The record actually returned can therefore lie past `offset`, for example
 * when messages have aged out. Such a gap is routed through `reportDataLoss` instead of being
 * rejected by an assertion.
 *
 * @param offset         the queue offset to fetch
 * @param untilOffset    exclusive upper bound of the offset range being read
 * @param pollTimeoutMs  timeout handed to `consumer.pull`, in milliseconds
 * @param failOnDataLoss when true, a gap is reported via `reportDataLoss(true, ...)`;
 *                       when false, the gap is reported as skipped and reading continues
 * @return the record at `offset`. If records are missing and `failOnDataLoss` is false, it
 *         returns the first record after the gap when that record is still before
 *         `untilOffset`, or `null` when the whole range `[offset, untilOffset)` is missing.
 * @throws OffsetIllegalException if the pull reports `OFFSET_ILLEGAL`; carries the
 *         min/max offsets from the pull result
 * @throws IllegalStateException  if the pull finds no new or matching message, if it returns
 *         no message list, or if the returned record precedes `offset`
 */
private def fetchData(
    offset: Long,
    untilOffset: Long,
    pollTimeoutMs: Long,
    failOnDataLoss: Boolean): MessageExt = {
  if (offset != nextOffsetInFetchedData || !fetchedData.hasNext) {
    // This is the first fetch, or the last pre-fetched data has been drained.
    val p = consumer.pull(queue, subExpression, offset, maxBatchSize, pollTimeoutMs)
    p.getPullStatus match {
      case PullStatus.OFFSET_ILLEGAL =>
        throw new OffsetIllegalException(AvailableOffsetRange(p.getMinOffset, p.getMaxOffset))
      case PullStatus.NO_MATCHED_MSG | PullStatus.NO_NEW_MSG =>
        throw new IllegalStateException(
          s"Cannot fetch record for offset $offset in $pollTimeoutMs milliseconds. " +
            s"status = ${p.getPullStatus.toString}")
      case _ =>
    }
    // Check explicitly rather than with `assert`: assertions can be elided at compile time, and
    // a null or empty list would otherwise surface as an NPE or NoSuchElementException below.
    val found = p.getMsgFoundList
    if (found == null || found.isEmpty) {
      throw new IllegalStateException(
        s"Cannot fetch record for offset $offset: pull returned no messages. " +
          s"status = ${p.getPullStatus.toString}")
    }
    fetchedData = found.iterator
  }
  val record = fetchedData.next()
  val recordOffset = record.getQueueOffset
  nextOffsetInFetchedData = recordOffset + 1
  // In general, RocketMQ uses the specified offset as the start point, and tries to fetch the next
  // available offset. Hence we need to handle offset mismatch here instead of asserting equality.
  if (recordOffset > offset) {
    // This may happen when some records aged out but their offsets already got verified
    if (failOnDataLoss) {
      reportDataLoss(true, s"Cannot fetch records in [$offset, $recordOffset)")
      // Expected to be unreachable: "reportDataLoss" is meant to throw when its first argument
      // is true.
      null
    } else if (recordOffset >= untilOffset) {
      // Everything in the requested range is gone; signal "no record" to the caller.
      reportDataLoss(false, s"Skip missing records in [$offset, $untilOffset)")
      null
    } else {
      reportDataLoss(false, s"Skip missing records in [$offset, $recordOffset)")
      record
    }
  } else if (recordOffset < offset) {
    // This should not happen. If it does happen, then we probably misunderstand RocketMQ internal
    // mechanism.
    throw new IllegalStateException(
      s"Tried to fetch $offset but the returned record offset was $recordOffset")
  } else {
    record
  }
}