def get()

in rocketmq-spark/src/main/scala/org/apache/spark/sql/rocketmq/CachedRocketMQConsumer.scala [88:126]


  def get(
      offset: Long,
      untilOffset: Long,
      pollTimeoutMs: Long,
      failOnDataLoss: Boolean):
    MessageExt = {
    require(offset < untilOffset,
      s"offset must always be less than untilOffset [offset: $offset, untilOffset: $untilOffset]")
    logDebug(s"Get $groupId $queue requested $offset")
    // The following loop is basically for `failOnDataLoss = false`. When `failOnDataLoss` is
    // `false`, first, we will try to fetch the record at `offset`. If no such record exists, then
    // we will move to the next available offset within `[offset, untilOffset)` and retry.
    // If `failOnDataLoss` is `true`, the loop body will be executed only once.
    var toFetchOffset = offset
    var consumerRecord: MessageExt = null
    // We want to break out of the while loop on a successful fetch to avoid using "return"
    // which may cause a NonLocalReturnControl exception when this method is used as a function.
    var isFetchComplete = false

    while (toFetchOffset != UNKNOWN_OFFSET && !isFetchComplete) {
      try {
        consumerRecord = fetchData(toFetchOffset, untilOffset, pollTimeoutMs, failOnDataLoss)
        isFetchComplete = true
      } catch {
        case e: OffsetIllegalException =>
          // An error was thrown, so reset all fetched state before deciding whether to retry
          resetFetchedData()
          reportDataLoss(failOnDataLoss, s"Cannot fetch offset $toFetchOffset: ${e.toString}", e)
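          // The requested offset fell outside the broker's available range; move to the
          // earliest offset still available within [toFetchOffset, untilOffset), or
          // UNKNOWN_OFFSET if no overlap remains (which ends the loop above)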
          toFetchOffset = getEarliestAvailableOffsetBetween(toFetchOffset, untilOffset, e.availableOffsetRange)
      }
    }

    if (isFetchComplete) {
      consumerRecord
    } else {
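      // Reaching here means failOnDataLoss = false and no record in [offset, untilOffset)
      // could be fetched; signal "no data" to the caller with null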
      resetFetchedData()
      null
    }
  }
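
For context, here is a caller-side sketch of how `get` might be driven over a queue's offset range. This is illustrative only: the helper name `readRange`, the `process` callback, and the surrounding setup are assumptions, not part of the connector; the sketch only relies on `get` behaving as in the excerpt above (it returns the record at, or when `failOnDataLoss = false` possibly after, the requested offset, or `null` when nothing in the range could be fetched).

import org.apache.rocketmq.common.message.MessageExt

// Hypothetical driver loop over [startOffset, endOffset) for one message queue.
def readRange(
    consumer: CachedRocketMQConsumer,
    startOffset: Long,
    endOffset: Long,
    pollTimeoutMs: Long,
    failOnDataLoss: Boolean)(process: MessageExt => Unit): Unit = {
  var currentOffset = startOffset
  while (currentOffset < endOffset) {
    val msg = consumer.get(currentOffset, endOffset, pollTimeoutMs, failOnDataLoss)
    if (msg == null) {
      // Only possible with failOnDataLoss = false: the rest of the range is unavailable.
      currentOffset = endOffset
    } else {
      process(msg)
      // Resume after the message actually returned, which may sit past the requested
      // offset if earlier records were skipped as lost.
      currentOffset = msg.getQueueOffset + 1
    }
  }
}

Advancing from `msg.getQueueOffset + 1` rather than `currentOffset + 1` matters when `failOnDataLoss = false`, since the returned message may be later than the requested offset after lost records were skipped.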