private def rateLimit()

in core/src/main/scala/org/apache/spark/sql/eventhubs/EventHubsSource.scala [263:319]


  /**
   * Caps the ending sequence number of each partition so that the batch is prorated against
   * `limit` according to each partition's share of the total backlog.
   *
   * For every partition in `until`, the backlog is `end - begin`, where `begin` is taken from
   * `from` and, failing that, from `fromNew`. Partitions without a starting position, or with a
   * non-positive backlog, keep their `until` value unchanged. If the summed positive backlog is
   * below 1, `until` is returned as is.
   *
   * When `slowPartitionAdjustment` is enabled, each partition's prorated amount is additionally
   * multiplied by its performance percentage, and that factor is recorded in
   * `partitionsThrottleFactor`.
   *
   * @param limit   number of events the per-partition prorated amounts are scaled to
   * @param from    starting sequence numbers of already-known partitions
   * @param until   candidate ending sequence numbers for the batch
   * @param fromNew starting sequence numbers used when a partition is absent from `from`
   * @return the ending sequence number to use for every partition present in `until`; never
   *         greater than the corresponding value in `until`
   */
  private def rateLimit(
      limit: Long,
      from: Map[NameAndPartition, SequenceNumber],
      until: Map[NameAndPartition, SequenceNumber],
      fromNew: Map[NameAndPartition, SequenceNumber]): Map[NameAndPartition, SequenceNumber] = {

    // If slowPartitionAdjustment is on, use the latest partition performance percentages from
    // the tracker; fall back to the defaults when the tracker has none or the feature is off.
    val partitionsPerformancePercentage: Map[NameAndPartition, Double] =
      if (slowPartitionAdjustment) {
        partitionsStatusTracker.partitionsPerformancePercentage.getOrElse(
          defaultPartitionsPerformancePercentage)
      } else {
        defaultPartitionsPerformancePercentage
      }

    // Positive backlog per partition; partitions with no known begin or nothing to read are
    // left out and therefore keep their `until` value below.
    val sizes = until.flatMap {
      case (nameAndPartition, end) =>
        // If begin isn't defined, something's wrong, but let alert logic in getBatch handle it
        from.get(nameAndPartition).orElse(fromNew.get(nameAndPartition)).flatMap { begin =>
          val size = end - begin
          logDebug(s"rateLimit $nameAndPartition size is $size")
          if (size > 0) Some(nameAndPartition -> size) else None
        }
    }
    val total = sizes.values.sum.toDouble
    if (total < 1) {
      until
    } else {
      until.map {
        case (nameAndPartition, end) =>
          nameAndPartition -> sizes
            .get(nameAndPartition)
            .map { size =>
              // Safe lookup: a partition only appears in `sizes` if `from` or `fromNew` has it.
              val begin = from.getOrElse(nameAndPartition, fromNew(nameAndPartition))
              // Adjust by the performance percentage so that the batch uses as many events as
              // possible without waiting on slow partitions.
              val performanceFactor: Double =
                if (slowPartitionAdjustment) {
                  // A partition missing from the performance map must not fail the whole batch
                  // with a NoSuchElementException; treat it as not throttled.
                  val factor = partitionsPerformancePercentage.getOrElse(nameAndPartition, 1.0)
                  partitionsThrottleFactor(nameAndPartition) = factor
                  logInfo(
                    s"Slow partition adjustment is on, so prorate amount for $nameAndPartition will be adjusted by" +
                      s" the perfromanceFactor = $factor")
                  factor
                } else {
                  1.0
                }

              val prorate = limit * (size / total) * performanceFactor
              logDebug(s"rateLimit $nameAndPartition prorated amount is $prorate")
              // Don't completely starve small partitions: a fractional share below one event is
              // rounded up, anything else is rounded down.
              val off = begin + (if (prorate < 1) Math.ceil(prorate) else Math.floor(prorate)).toLong
              logDebug(s"rateLimit $nameAndPartition new offset is $off")
              // Paranoia, make sure not to return an offset that's past end
              Math.min(end, off)
            }
            .getOrElse(end)
      }
    }
  }