in core/src/main/scala/org/apache/spark/sql/eventhubs/EventHubsSource.scala [329:420]
/**
 * Builds the streaming DataFrame for the batch delimited by `start` and `end`.
 *
 * The method, in order:
 *  - records the sequence numbers decoded from `end` in `currentSeqNos` when none are set yet;
 *  - returns an empty streaming DataFrame when `start` is defined and equal to `end`;
 *  - populates `earliestSeqNos` from the Event Hubs client when it is still empty;
 *  - derives the per-partition starting sequence numbers (from `start`, or from
 *    `initialPartitionSeqNos` when `start` is `None`) and passes them through
 *    `adjustStartingOffset`;
 *  - builds one `OffsetRange` per partition present in `end`, reporting data loss and
 *    collapsing the range to an empty one when the end sequence number is lower than
 *    the start sequence number;
 *  - wraps the ranges in an `EventHubsRDD` and converts it to a DataFrame.
 *
 * @param start end offset of the previous batch, or `None` when there is no previous offset
 * @param end   offset whose per-partition sequence numbers bound this batch
 * @return a streaming DataFrame with this source's `schema`
 * @throws IllegalStateException    if a partition listed in `end` has no starting sequence number
 * @throws IllegalArgumentException if the configured partition preferred-location strategy is
 *                                  neither `Hash` nor `BalancedHash`
 */
override def getBatch(start: Option[Offset], end: Offset): DataFrame = {
  // Bare reference evaluated only for its effect; presumably forces initialization of the
  // initial partition sequence numbers up front -- NOTE(review): confirm at its definition.
  initialPartitionSeqNos
  logInfo(s"getBatch for namespace $namespaceUri, Evenhub $ehName called with start = $start and end = $end")
  val untilSeqNos = EventHubsSourceOffset.getPartitionSeqNos(end)
  // On recovery, getBatch will be called before getOffset, so currentSeqNos may still be unset.
  if (currentSeqNos.isEmpty) {
    currentSeqNos = Some(untilSeqNos)
  }

  if (start.contains(end)) {
    // Previous end offset equals the requested end offset: nothing to read.
    sqlContext.internalCreateDataFrame(sqlContext.sparkContext.emptyRDD,
                                       schema,
                                       isStreaming = true)
  } else {
    // Fetch the earliest sequence numbers once; only the first element of each
    // (earliest, latest) pair is kept.
    if (earliestSeqNos.isEmpty) {
      val earliestAndLatest = ehClient.allBoundedSeqNos
      earliestSeqNos = Some(earliestAndLatest.map {
        case (p, (e, _)) => NameAndPartition(ehName, p) -> e
      }.toMap)
    }

    val fromSeqNos = start match {
      // A previous batch end offset was supplied (this is also the path taken on recovery).
      case Some(prevBatchEndOffset) =>
        val prevOffsets = EventHubsSourceOffset.getPartitionSeqNos(prevBatchEndOffset)
        val startingSeqNos = if (prevOffsets.size < untilSeqNos.size) {
          logInfo(
            s"Number of partitions has increased from ${prevOffsets.size} to ${untilSeqNos.size}")
          val defaultSeqNos = ehClient
            .translate(ehConf, partitionCount)
            .map {
              case (pId, seqNo) =>
                (NameAndPartition(ehName, pId), seqNo)
            }
          // Entries from the previous offset win over the translated defaults, so only
          // partitions missing from the previous offset receive a default starting point.
          defaultSeqNos ++ prevOffsets
        } else {
          prevOffsets
        }
        adjustStartingOffset(startingSeqNos)
      // No previous offset: start from the initial sequence numbers.
      case None => adjustStartingOffset(initialPartitionSeqNos)
    }

    val nameAndPartitions = untilSeqNos.keySet.toSeq
    logDebug("Partitions: " + nameAndPartitions.mkString(", "))
    val sortedExecutors = getSortedExecutorList(sc)
    val numExecutors = sortedExecutors.length
    logDebug("Sorted executors: " + sortedExecutors.mkString(", "))

    // Calculate offset ranges
    val offsetRanges = (for {
      np <- nameAndPartitions
      fromSeqNo = fromSeqNos.getOrElse(
        np,
        throw new IllegalStateException(s"$np doesn't have a fromSeqNo"))
      untilSeqNo = untilSeqNos(np)
      preferredPartitionLocation = ehConf.partitionPreferredLocationStrategy match {
        case PartitionPreferredLocationStrategy.Hash => np.hashCode
        case PartitionPreferredLocationStrategy.BalancedHash =>
          np.ehName.hashCode() + np.partitionId
        case _ =>
          throw new IllegalArgumentException(
            "Unsupported partition strategy: " +
              ehConf.partitionPreferredLocationStrategy)
      }
      // floorMod keeps the index non-negative even when the hash value is negative.
      preferredLoc = if (numExecutors > 0) {
        Some(sortedExecutors(Math.floorMod(preferredPartitionLocation, numExecutors)))
      } else None
    } yield OffsetRange(np, fromSeqNo, untilSeqNo, preferredLoc)).map { range =>
      if (range.untilSeqNo < range.fromSeqNo) {
        // The end sequence number moved behind the start: report it and substitute an
        // empty range (from == until) for this partition.
        reportDataLoss(
          s"Partition ${range.nameAndPartition}'s sequence number was changed from " +
            s"${range.fromSeqNo} to ${range.untilSeqNo}, some data may have been missed")
        OffsetRange(range.nameAndPartition, range.fromSeqNo, range.fromSeqNo, range.preferredLoc)
      } else {
        range
      }
    }.toArray

    // If slowPartitionAdjustment is on, add the current batch to the performance tracker
    // and notify the throttling status plugin, when one is configured.
    if (slowPartitionAdjustment) {
      addCurrentBatchToStatusTracker(offsetRanges)
      throttlingStatusPlugin.foreach(
        _.onBatchCreation(partitionContext, localBatchId, offsetRanges, partitionsThrottleFactor))
    }

    val rdd =
      EventHubsSourceProvider.toInternalRow(new EventHubsRDD(sc, ehConf.trimmed, offsetRanges))
    logInfo(
      s"GetBatch for namespace $namespaceUri, Evenhub $ehName generating RDD of offset range: " +
        offsetRanges.sortBy(_.nameAndPartition.toString).mkString(", "))
    sqlContext.internalCreateDataFrame(rdd, schema, isStreaming = true)
  }
}