in core/src/main/scala/org/apache/spark/sql/eventhubs/EventHubsRelation.scala [45:71]
override def buildScan(): RDD[Row] = {
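  // Resolve the configured start and end positions into concrete
  // per-partition sequence numbers (one map entry per partition).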
  val partitionCount: Int = eventHubClient.partitionCount
  val fromSeqNos = eventHubClient.translate(ehConf, partitionCount)
  val untilSeqNos = eventHubClient.translate(ehConf, partitionCount, useStart = false)
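  // Batch reads currently support only sequence-number positions, so every
  // translated value must be non-negative.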
  require(fromSeqNos.forall(f => f._2 >= 0L),
          "Currently only sequence numbers can be passed in your starting positions.")
  require(untilSeqNos.forall(u => u._2 >= 0L),
          "Currently only sequence numbers can be passed in your ending positions.")
  val offsetRanges = untilSeqNos.keySet.map { p =>
    val fromSeqNo = fromSeqNos
      .getOrElse(p, throw new IllegalStateException(s"$p doesn't have a fromSeqNo"))
    val untilSeqNo = untilSeqNos(p)
    OffsetRange(ehConf.name, p, fromSeqNo, untilSeqNo, None)
  }.toArray
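  // All ranges are resolved eagerly above, so the client can be closed
  // before the scan itself runs on the executors.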
  eventHubClient.close()
  logInfo(
    "GetBatch generating RDD with offsetRanges: " +
      offsetRanges.sortBy(_.nameAndPartition.toString).mkString(", "))
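  // Read the ranges through an EventHubsRDD, convert each event to an
  // InternalRow, and expose the result as an RDD[Row] with this relation's
  // schema; isStreaming = false marks this as a batch DataFrame.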
  val rdd = EventHubsSourceProvider.toInternalRow(
    new EventHubsRDD(sqlContext.sparkContext, ehConf.trimmed, offsetRanges))
  sqlContext.internalCreateDataFrame(rdd, schema, isStreaming = false).rdd
}