def partitionsPerformancePercentage()

in core/src/main/scala/org/apache/spark/eventhubs/PartitionsStatusTracker.scala [178:219]


  /**
   * Reports the performance percentages of the most recent tracked batch that has
   * received enough partition updates.
   *
   * "Most recent" means the entry of `batchesStatusList` with the largest `Long` key
   * among those whose status reports `receivedEnoughUpdates`. When a throttling status
   * plugin is registered, it is notified with the selected batch's per-partition batch
   * sizes, receive times and the computed percentages before the result is returned.
   *
   * @return `None` when the tracker holds no batch, or when no batch has received enough
   *         updates; otherwise the value produced by `getPerformancePercentages` for the
   *         selected batch.
   */
  def partitionsPerformancePercentage(): Option[Map[NameAndPartition, Double]] = {
    if (batchesStatusList.isEmpty) {
      logDebug(s"There is no batch in the tracker, so return None")
      None
    } else {
      // Keep only the batches that already have enough partition updates.
      val batchesWithEnoughUpdates = batchesStatusList.filter {
        case (_, status) => status.receivedEnoughUpdates
      }

      // Select the entry with the largest key using the standard Ordering[Long].
      // A hand-written comparator of the form (a - b).toInt is unsafe here: the Long
      // difference can overflow or be truncated, which yields a wrong sign for keys
      // that are more than Int.MaxValue apart.
      // (Scala 2.13 could use maxByOption(_._1) instead of the explicit emptiness check.)
      val latestUpdatedBatch: Option[BatchStatus] =
        if (batchesWithEnoughUpdates.isEmpty) None
        else Some(batchesWithEnoughUpdates.maxBy(_._1)._2)

      latestUpdatedBatch match {
        case None =>
          logDebug(
            s"No batch has ${PartitionsStatusTracker.enoughUpdatesCount} partitions with updates (enough updates), " +
              s"so return None")
          None

        case Some(batch) =>
          logDebug(
            s"Batch ${batch.batchId} is the latest batch with enough updates. Caculate and return its perforamnce.")
          val performancePercentages = batch.getPerformancePercentages
          // Notify the optional plugin; breakOut builds the target collections directly
          // from the partition status entries without an intermediate collection.
          PartitionsStatusTracker.throttlingStatusPlugin.foreach(
            _.onPartitionsPerformanceStatusUpdate(
              partitionContext,
              batch.batchId,
              batch.paritionsStatusList.map(par => (par._1, par._2.batchSize))(breakOut),
              batch.paritionsStatusList
                .map(par => (par._1, par._2.batchReceiveTimeInMillis))(breakOut),
              performancePercentages
            )
          )
          performancePercentages
      }
    }
  }