def fetchSpecificOffsets()

in rocketmq-spark/src/main/scala/org/apache/spark/sql/rocketmq/RocketMQOffsetReader.scala [93:129]


  /**
   * Resolves user-requested per-queue offsets against the broker and returns
   * the offsets the consumer actually committed, wrapped in a
   * [[RocketMQSourceOffset]].
   *
   * Sentinel values (`LATEST`, `EARLIEST`) are translated to the queue's
   * current max/min offset before being committed. The committed offsets are
   * then read back; for any concrete (non-sentinel) request that the consumer
   * reset to a different position, `reportDataLoss` is invoked.
   *
   * @param partitionOffsets requested offset per message queue; must cover
   *                         exactly the queues assigned for the topic
   * @param reportDataLoss   callback used to surface offsets the consumer
   *                         did not honor
   */
  def fetchSpecificOffsets(
      partitionOffsets: Map[MessageQueue, Long],
      reportDataLoss: String => Unit): RocketMQSourceOffset = {
    // Everything touching the consumer is retried as one unit.
    val fetched = withRetries {
      val assigned = consumer.fetchSubscribeMessageQueues(topic)
      assert(assigned.asScala == partitionOffsets.keySet,
        "If startingOffsets contains specific offsets, you must specify all TopicPartitions.\n" +
          "Use -1 for latest, -2 for earliest, if you don't care.\n" +
          s"Specified: ${partitionOffsets.keySet} Assigned: ${assigned.asScala}")
      logDebug(s"Partitions assigned to consumer: $assigned. Seeking to $partitionOffsets")

      // Commit each requested position, translating sentinels to real offsets.
      for ((mq, requested) <- partitionOffsets) {
        val resolved = requested match {
          case RocketMQOffsetRangeLimit.LATEST => consumer.maxOffset(mq)
          case RocketMQOffsetRangeLimit.EARLIEST => consumer.minOffset(mq)
          case concrete => concrete
        }
        consumer.updateConsumeOffset(mq, resolved)
      }
      // Read back what the consumer actually stored for every queue.
      partitionOffsets.map { case (mq, _) =>
        mq -> consumer.fetchConsumeOffset(mq, false)
      }
    }

    // Sentinel requests cannot be validated; for concrete offsets, flag any
    // queue where the consumer ended up somewhere other than requested.
    for ((tp, off) <- partitionOffsets) {
      val isSentinel = off == RocketMQOffsetRangeLimit.LATEST ||
        off == RocketMQOffsetRangeLimit.EARLIEST
      if (!isSentinel && fetched(tp) != off) {
        reportDataLoss(
          s"startingOffsets for $tp was $off but consumer reset to ${fetched(tp)}")
      }
    }
    RocketMQSourceOffset(fetched)
  }