in core/src/main/scala/org/apache/spark/eventhubs/PartitionsStatusTracker.scala [178:219]
def partitionsPerformancePercentage(): Option[Map[NameAndPartition, Double]] = {
// if there is no batch in the tracker, return None
if (batchesStatusList.isEmpty) {
logDebug(s"There is no batch in the tracker, so return None")
None
} else {
// find the latest batch with enough updates
// In Scala 2.13 this could be written with maxByOption (see the sketch after this method).
implicit val ordering: Ordering[(Long, BatchStatus)] = new Ordering[(Long, BatchStatus)] {
override def compare(x: (Long, BatchStatus), y: (Long, BatchStatus)): Int =
java.lang.Long.compare(x._1, y._1) // avoids Int overflow from (x._1 - y._1).toInt
}
val batchesWithEnoughUpdates = batchesStatusList.filter(b => b._2.receivedEnoughUpdates)
val latestUpdatedBatch: Option[BatchStatus] =
if (batchesWithEnoughUpdates.isEmpty) None else Some(batchesWithEnoughUpdates.max._2)
latestUpdatedBatch match {
case None => {
logDebug(
s"No batch has ${PartitionsStatusTracker.enoughUpdatesCount} partitions with updates (enough updates), " +
s"so return None")
None
}
case Some(batch) => {
logDebug(
s"Batch ${batch.batchId} is the latest batch with enough updates. Caculate and return its perforamnce.")
val performancePercentages = batch.getPerformancePercentages
PartitionsStatusTracker.throttlingStatusPlugin.foreach(
_.onPartitionsPerformanceStatusUpdate(
partitionContext,
batch.batchId,
batch.paritionsStatusList.map(par => (par._1, par._2.batchSize))(breakOut),
batch.paritionsStatusList
.map(par => (par._1, par._2.batchReceiveTimeInMillis))(breakOut),
performancePercentages
)
)
performancePercentages
}
}
}
}
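
A minimal sketch (not part of the source) of the Scala 2.13 alternative mentioned in the comment above: maxByOption can replace the explicit Ordering and the emptiness check. It assumes batchesStatusList maps a batch id (Long) to its BatchStatus, as used in the method.

// Sketch only: pick the most recent batch (highest batch id) among those that
// have received enough partition updates; None if no batch qualifies.
val latestUpdatedBatch: Option[BatchStatus] =
  batchesStatusList
    .filter { case (_, status) => status.receivedEnoughUpdates }
    .maxByOption { case (batchId, _) => batchId }
    .map { case (_, status) => status }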