private def maxMessagesPerPartition()

in rocketmq-spark/src/main/scala/org/apache/spark/streaming/MQPullInputDStream.scala [267:313]


  private def maxMessagesPerPartition(
     offsets: Map[TopicQueueId, Map[String, Long]]): Option[Map[TopicQueueId, Map[String, Long]]] = {
    val estimatedRateLimit = rateController.map(_.getLatestRate().toInt)

    var lagPerPartition = Map[TopicQueueId, Long]()
    var totalLag = 0L
    val lagPerPartitionPerQueue = offsets.map{ case (tp, value) =>
      val partitionTotal = value.map{ case (name, maxOffset) =>
        var count = Math.max(maxOffset - currentOffsets(tp)(name), 0)
        totalLag += count
        (name, count)
      }
      lagPerPartition += tp -> partitionTotal.values.sum
      tp -> partitionTotal
    }

    val effectiveRateLimitPerPartition = estimatedRateLimit.filter(_ > 0) match {
      case Some(rate) =>
        lagPerPartitionPerQueue.map { case (tp, queues) =>
          val backPressRate = Math.round(lagPerPartition(tp) / totalLag.toFloat * rate)
          val partitionMessages = (if (maxRateLimitPerPartition > 0) {
            Math.min(backPressRate, maxRateLimitPerPartition)} else backPressRate)
          tp -> queues.map{ case (name, count) =>
            (name, Math.ceil(count / lagPerPartition(tp).toFloat * partitionMessages))
          }
        }
      case None =>

        lagPerPartitionPerQueue.map { case (tp, queues) =>
          val partitionMessages = maxRateLimitPerPartition
          tp -> queues.map{ case (name, count) =>
            (name, Math.ceil(count / lagPerPartition(tp).toFloat * partitionMessages))
          }
        }
    }

    if (effectiveRateLimitPerPartition.flatMap(_._2).map(_._2).sum > 0) {
      val secsPerBatch = context.graph.batchDuration.milliseconds.toDouble / 1000
      Some(effectiveRateLimitPerPartition.map {
        case (tp, limit) => tp -> limit.map{ case (name, count) =>
          name -> (count * secsPerBatch).toLong
        }
      })
    } else {
      None
    }
  }