in rocketmq-spark/src/main/scala/org/apache/spark/sql/rocketmq/RocketMQSource.scala [168:200]
/**
 * Caps how far each message queue may advance in one batch so that the total number of
 * messages is approximately `limit`, sharing the budget between queues in proportion to
 * the number of messages each one has pending.
 *
 * Queues whose pending size is not positive, or whose start offset cannot be determined,
 * keep their `until` offset unchanged. If nothing is pending at all, `until` is returned
 * as-is.
 *
 * @param limit maximum total number of messages to admit across all queues
 * @param from  start offset per queue; for queues missing here the start offset is taken
 *              from `offsetReader.fetchEarliestOffsets`
 * @param until end offset per queue before rate limiting
 * @return for every queue in `until`, the end offset to read up to; never greater than the
 *         corresponding offset in `until`
 */
private def rateLimit(
    limit: Long,
    from: Map[MessageQueue, Long],
    until: Map[MessageQueue, Long]): Map[MessageQueue, Long] = {
  // Lazy so the lookup only happens when some queue in `until` has no entry in `from`;
  // both uses below sit in by-name positions (`orElse` / `getOrElse`).
  lazy val fromNew = offsetReader.fetchEarliestOffsets(until.keySet.diff(from.keySet).toSeq)
  val sizes = until.flatMap {
    case (tp, end) =>
      // If begin isn't defined, something's wrong, but let alert logic in getBatch handle it
      from.get(tp).orElse(fromNew.get(tp)).flatMap { begin =>
        val size = end - begin
        logDebug(s"rateLimit $tp size is $size")
        if (size > 0) Some(tp -> size) else None
      }
  }
  val total = sizes.values.sum.toDouble
  if (total < 1) {
    until
  } else {
    until.map {
      case (tp, end) =>
        tp -> sizes.get(tp).map { size =>
          val begin = from.getOrElse(tp, fromNew(tp))
          val prorate = limit * (size / total)
          logDebug(s"rateLimit $tp prorated amount is $prorate")
          // Don't completely starve small topicpartitions
          val prorateLong = (if (prorate < 1) Math.ceil(prorate) else Math.floor(prorate)).toLong
          // size == end - begin, so clamping the advance to size keeps the result <= end
          // and prevents Long overflow in the addition when limit is very large
          // (e.g. Long.MaxValue), which previously could yield a negative offset.
          val off = begin + Math.min(prorateLong, size)
          logDebug(s"rateLimit $tp new offset is $off")
          off
        }.getOrElse(end)
    }
  }
}