private def fetchData()

in rocketmq-spark/src/main/scala/org/apache/spark/sql/rocketmq/CachedRocketMQConsumer.scala [192:239]


  private def fetchData(
      offset: Long,
      untilOffset: Long,
      pollTimeoutMs: Long,
      failOnDataLoss: Boolean): MessageExt = {
    if (offset != nextOffsetInFetchedData || !fetchedData.hasNext) {
      // This is the first fetch, or the last pre-fetched data has been drained.
      val p = consumer.pull(queue, subExpression, offset, maxBatchSize, pollTimeoutMs)
      if (p.getPullStatus == PullStatus.OFFSET_ILLEGAL){
        throw new OffsetIllegalException(AvailableOffsetRange(p.getMinOffset, p.getMaxOffset))
      } else if (p.getPullStatus == PullStatus.NO_MATCHED_MSG || p.getPullStatus == PullStatus.NO_NEW_MSG) {
        throw new IllegalStateException(s"Cannot fetch record for offset $offset in $pollTimeoutMs milliseconds. " +
            s"status = ${p.getPullStatus.toString}")
      }
      fetchedData = p.getMsgFoundList.iterator
      assert(fetchedData.hasNext)
    }

    val record = fetchedData.next()
    assert(record.getQueueOffset == offset,
      s"Got wrong record for $groupId ${queue.toString} even after seeking to offset $offset")
    nextOffsetInFetchedData = record.getQueueOffset + 1
    // In general, RocketMQ uses the specified offset as the start point, and tries to fetch the next
    // available offset. Hence we need to handle offset mismatch.
    if (record.getQueueOffset > offset) {
      // This may happen when some records aged out but their offsets already got verified
      if (failOnDataLoss) {
        reportDataLoss(true, s"Cannot fetch records in [$offset, ${record.getQueueOffset})")
        // Never happen as "reportDataLoss" will throw an exception
        null
      } else {
        if (record.getQueueOffset >= untilOffset) {
          reportDataLoss(false, s"Skip missing records in [$offset, $untilOffset)")
          null
        } else {
          reportDataLoss(false, s"Skip missing records in [$offset, ${record.getQueueOffset})")
          record
        }
      }
    } else if (record.getQueueOffset < offset) {
      // This should not happen. If it does happen, then we probably misunderstand RocketMQ internal
      // mechanism.
      throw new IllegalStateException(
        s"Tried to fetch $offset but the returned record offset was ${record.getQueueOffset}")
    } else {
      record
    }
  }