in core/src/main/scala/org/apache/spark/sql/eventhubs/EventHubsSource.scala [263:319]
/**
 * Caps the per-partition end sequence numbers of a batch so that roughly `limit` events are
 * read in total across all partitions.
 *
 * The limit is prorated over the partitions in proportion to how many events each one has
 * available (`until - begin`). When `slowPartitionAdjustment` is enabled, each partition's
 * share is further scaled by its performance percentage as reported by
 * `partitionsStatusTracker` (falling back to `defaultPartitionsPerformancePercentage`), and
 * the factor applied is recorded in `partitionsThrottleFactor`.
 *
 * @param limit   approximate maximum number of events for the whole batch
 * @param from    start sequence number per partition; consulted first
 * @param until   latest available sequence number per partition; upper bound of the result
 * @param fromNew fallback start sequence numbers for partitions missing from `from`
 *                (presumably newly discovered partitions — confirm against caller)
 * @return the end sequence number to use for every partition in `until`; never greater than
 *         the partition's `until` value
 */
private def rateLimit(
    limit: Long,
    from: Map[NameAndPartition, SequenceNumber],
    until: Map[NameAndPartition, SequenceNumber],
    fromNew: Map[NameAndPartition, SequenceNumber]): Map[NameAndPartition, SequenceNumber] = {
  // if slowPartitionAdjustment is on, get the latest partition performance percentages
  val partitionsPerformancePercentage: Map[NameAndPartition, Double] =
    if (slowPartitionAdjustment) {
      partitionsStatusTracker.partitionsPerformancePercentage.getOrElse(
        defaultPartitionsPerformancePercentage)
    } else {
      defaultPartitionsPerformancePercentage
    }
  // Number of available events per partition; partitions with nothing to read are dropped.
  val sizes = until.flatMap {
    case (nameAndPartition, end) =>
      // If begin isn't defined, something's wrong, but let alert logic in getBatch handle it
      from.get(nameAndPartition).orElse(fromNew.get(nameAndPartition)).flatMap { begin =>
        val size = end - begin
        logDebug(s"rateLimit $nameAndPartition size is $size")
        if (size > 0) Some(nameAndPartition -> size) else None
      }
  }
  val total = sizes.values.sum.toDouble
  if (total < 1) {
    // Nothing to read anywhere: keep every partition at its latest sequence number.
    until
  } else {
    until.map {
      case (nameAndPartition, end) =>
        nameAndPartition -> sizes
          .get(nameAndPartition)
          .map { size =>
            // Safe: `sizes` only holds partitions whose begin was found in `from` or `fromNew`.
            val begin = from.getOrElse(nameAndPartition, fromNew(nameAndPartition))
            // Scale the partition's share by its performance percentage so slow partitions
            // get a smaller slice of the batch. A partition without a recorded percentage is
            // not throttled (factor 1.0) rather than failing the batch with a missing-key error.
            val performanceFactor: Double =
              if (slowPartitionAdjustment) {
                partitionsPerformancePercentage.getOrElse(nameAndPartition, 1.0)
              } else {
                1.0
              }
            if (slowPartitionAdjustment) {
              partitionsThrottleFactor(nameAndPartition) = performanceFactor
              logInfo(
                s"Slow partition adjustment is on, so prorate amount for $nameAndPartition will be adjusted by" +
                  s" the performanceFactor = $performanceFactor")
            }
            val prorate = limit * (size / total) * performanceFactor
            logDebug(s"rateLimit $nameAndPartition prorated amount is $prorate")
            // Don't completely starve small partitions: a positive share below one event is
            // rounded up to one, larger shares are rounded down.
            val off = begin + (if (prorate < 1) Math.ceil(prorate) else Math.floor(prorate)).toLong
            logDebug(s"rateLimit $nameAndPartition new offset is $off")
            // Paranoia, make sure not to return an offset that's past end
            Math.min(end, off)
          }
          .getOrElse(end)
    }
  }
}