private def computePullFromWhere()

in rocketmq-spark/src/main/scala/org/apache/spark/streaming/MQPullInputDStream.scala [132:217]


  /**
   * Decides the offset from which pulling should start for the given queue.
   *
   * The decision combines the offset read from the consumer's offset store
   * (the "checkpoint"; a negative value is treated as "no checkpoint"), the
   * queue's current minimum offset, and the configured `consumerStrategy`:
   *
   *  - `LatestStrategy`: resume from the checkpoint when one exists. Without a
   *    checkpoint, start at 0 for topics whose name starts with
   *    `MixAll.RETRY_GROUP_TOPIC_PREFIX`, otherwise at the queue's max offset.
   *  - `EarliestStrategy`: resume from the checkpoint when one exists,
   *    otherwise start at the min offset.
   *  - `SpecificOffsetStrategy`: resume from the checkpoint when one exists and
   *    `forceSpecial` is not set. Otherwise use the offset configured for this
   *    queue (`ConsumerStrategy.LATEST` / `ConsumerStrategy.EARLIEST` markers
   *    or a concrete offset); when nothing is configured for the queue, fall
   *    back to the checkpoint and finally to the min offset.
   *
   * Whenever a checkpoint or a configured offset is smaller than the min
   * offset, `reportDataLoss` is invoked and a strategy-specific fallback is
   * used instead (max offset for `LatestStrategy`, min offset elsewhere).
   *
   * @param mq the message queue whose starting offset is being resolved
   * @return the offset to start pulling from
   */
  private def computePullFromWhere(mq: MessageQueue): Long = {
    val offsetStore = kc.getOffsetStore
    // Fetched once and reused for every comparison and fallback below, so the
    // value that is compared against is always the value that is returned.
    val minOffset = kc.minOffset(mq)
    val checkpointOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE)
    val hasCheckpoint = checkpointOffset >= 0

    // Resumes from the checkpoint unless it is older than the min offset, in
    // which case the loss is reported and `fallback` is used. `fallback` is
    // by-name so that it is only evaluated when needed, and only after
    // `reportDataLoss` has run. Callers must have checked `hasCheckpoint`.
    def resumeFromCheckpoint(fallback: => Long): Long =
      if (checkpointOffset < minOffset) {
        reportDataLoss(s"MessageQueue $mq's checkpointOffset $checkpointOffset is smaller than minOffset $minOffset")
        fallback
      } else {
        checkpointOffset
      }

    consumerStrategy match {
      case LatestStrategy =>
        if (hasCheckpoint) {
          // A lost checkpoint under the "latest" strategy skips to the newest data.
          resumeFromCheckpoint(kc.maxOffset(mq))
        } else if (mq.getTopic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
          // First start of a retry topic: begin at offset 0.
          0L
        } else {
          // First start, no stored offset.
          kc.maxOffset(mq)
        }

      case EarliestStrategy =>
        if (hasCheckpoint) resumeFromCheckpoint(minOffset) else minOffset

      case SpecificOffsetStrategy(queueToOffset) =>
        if (hasCheckpoint && !forceSpecial) {
          resumeFromCheckpoint(minOffset)
        } else {
          // Reached when there is no checkpoint or when `forceSpecial` is set.
          queueToOffset.get(mq) match {
            case Some(ConsumerStrategy.LATEST) =>
              kc.maxOffset(mq)
            case Some(ConsumerStrategy.EARLIEST) =>
              minOffset
            case Some(offset) =>
              if (offset < minOffset) {
                reportDataLoss(s"MessageQueue $mq's specific offset $offset is smaller than minOffset $minOffset")
                minOffset
              } else {
                offset
              }
            case None =>
              // Nothing configured for this queue: prefer the checkpoint
              // (still possible here when `forceSpecial` is set).
              if (hasCheckpoint) {
                resumeFromCheckpoint(minOffset)
              } else {
                logWarning(s"MessageQueue $mq's specific offset and checkpointOffset are none, then use the minOffset")
                minOffset
              }
          }
        }
    }
  }