in rocketmq-spark/src/main/scala/org/apache/spark/sql/rocketmq/RocketMQOffsetReader.scala [93:129]
def fetchSpecificOffsets(
    partitionOffsets: Map[MessageQueue, Long],
    reportDataLoss: String => Unit): RocketMQSourceOffset = {
  // Resolve the caller-requested offsets against the broker, retrying on
  // transient failures, and return the offsets the consumer actually holds.
  val resolved = withRetries {
    val assigned = consumer.fetchSubscribeMessageQueues(topic)
    // The caller must cover every assigned queue when giving explicit offsets.
    assert(assigned.asScala == partitionOffsets.keySet,
      "If startingOffsets contains specific offsets, you must specify all TopicPartitions.\n" +
        "Use -1 for latest, -2 for earliest, if you don't care.\n" +
        s"Specified: ${partitionOffsets.keySet} Assigned: ${assigned.asScala}")
    logDebug(s"Partitions assigned to consumer: $assigned. Seeking to $partitionOffsets")
    // Translate sentinel values (LATEST / EARLIEST) into concrete broker
    // offsets, then persist each seek position on the consumer.
    for ((queue, requested) <- partitionOffsets) {
      val target = requested match {
        case RocketMQOffsetRangeLimit.LATEST   => consumer.maxOffset(queue)
        case RocketMQOffsetRangeLimit.EARLIEST => consumer.minOffset(queue)
        case explicit                          => explicit
      }
      consumer.updateConsumeOffset(queue, target)
    }
    // Read back what the consumer actually committed for each queue.
    partitionOffsets.keys.map(q => q -> consumer.fetchConsumeOffset(q, false)).toMap
  }
  // For explicitly requested offsets, verify the consumer did not reset us
  // elsewhere (e.g. the requested offset was already purged on the broker).
  partitionOffsets.foreach {
    case (tp, off) if off != RocketMQOffsetRangeLimit.LATEST &&
        off != RocketMQOffsetRangeLimit.EARLIEST =>
      val actual = resolved(tp)
      if (actual != off) {
        reportDataLoss(
          s"startingOffsets for $tp was $off but consumer reset to $actual")
      }
    case _ =>
      // Sentinel offsets: there is no meaningful expected value to compare.
  }
  RocketMQSourceOffset(resolved)
}