in rocketmq-spark/src/main/scala/org/apache/spark/streaming/MQPullInputDStream.scala [267:313]
/**
 * Computes, for the next batch, the maximum number of messages to pull from
 * each broker queue of each topic-queue id, honoring both the back-pressure
 * rate estimate (when the rate controller reports a positive rate) and the
 * static cap `maxRateLimitPerPartition`.
 *
 * The per-partition rate is split proportionally to each partition's share of
 * the total lag; each partition's budget is then split across its queues
 * proportionally to each queue's share of the partition lag.
 *
 * @param offsets latest known max offset per broker/queue name for each
 *                topic-queue id
 * @return per-queue message caps scaled to the batch duration, or None when
 *         no positive limit applies (rate limiting disabled for this batch)
 */
private def maxMessagesPerPartition(
offsets: Map[TopicQueueId, Map[String, Long]]): Option[Map[TopicQueueId, Map[String, Long]]] = {
  val estimatedRateLimit = rateController.map(_.getLatestRate().toInt)

  // Lag (messages not yet consumed) per queue, per topic-queue id. Negative
  // deltas (e.g. after an offset reset) are clamped to zero.
  val lagPerPartitionPerQueue: Map[TopicQueueId, Map[String, Long]] =
    offsets.map { case (tp, queueOffsets) =>
      tp -> queueOffsets.map { case (name, maxOffset) =>
        name -> Math.max(maxOffset - currentOffsets(tp)(name), 0L)
      }
    }
  val lagPerPartition: Map[TopicQueueId, Long] =
    lagPerPartitionPerQueue.map { case (tp, queues) => tp -> queues.values.sum }
  val totalLag: Long = lagPerPartition.values.sum

  // NOTE: zero-lag partitions/queues are mapped to 0 explicitly. Dividing by a
  // zero lag would yield 0f/0f = NaN; Math.ceil(NaN) = NaN would then poison
  // the sum below (NaN > 0 is false), silently disabling rate limiting for the
  // whole batch whenever any single partition is fully caught up.
  val effectiveRateLimitPerPartition: Map[TopicQueueId, Map[String, Double]] =
    estimatedRateLimit.filter(_ > 0) match {
      case Some(rate) =>
        lagPerPartitionPerQueue.map { case (tp, queues) =>
          val partitionLag = lagPerPartition(tp)
          // This partition's share of the estimated rate, by lag proportion.
          val backPressRate =
            if (totalLag > 0) Math.round(partitionLag / totalLag.toFloat * rate) else 0
          // Respect the static per-partition cap when one is configured.
          val partitionMessages =
            if (maxRateLimitPerPartition > 0) Math.min(backPressRate, maxRateLimitPerPartition)
            else backPressRate
          tp -> queues.map { case (name, count) =>
            val queueShare =
              if (partitionLag > 0) {
                Math.ceil(count / partitionLag.toFloat * partitionMessages)
              } else 0.0
            (name, queueShare)
          }
        }
      case None =>
        // No back-pressure estimate: fall back to the static per-partition cap
        // alone (a non-positive cap yields no positive limits, hence None below).
        lagPerPartitionPerQueue.map { case (tp, queues) =>
          val partitionLag = lagPerPartition(tp)
          tp -> queues.map { case (name, count) =>
            val queueShare =
              if (partitionLag > 0) {
                Math.ceil(count / partitionLag.toFloat * maxRateLimitPerPartition)
              } else 0.0
            (name, queueShare)
          }
        }
    }

  // Sum over all queue values directly. The previous flatMap(_._2) merged the
  // inner maps by queue name, letting entries from different partitions with
  // the same queue name overwrite each other before the check.
  if (effectiveRateLimitPerPartition.values.flatMap(_.values).sum > 0) {
    // Limits above are per-second rates; scale to the batch interval to get
    // absolute message counts for this batch.
    val secsPerBatch = context.graph.batchDuration.milliseconds.toDouble / 1000
    Some(effectiveRateLimitPerPartition.map { case (tp, limits) =>
      tp -> limits.map { case (name, count) => name -> (count * secsPerBatch).toLong }
    })
  } else {
    None
  }
}