override def getBatch()

in core/src/main/scala/org/apache/spark/sql/eventhubs/EventHubsSource.scala [329:420]


  /**
   * Builds the streaming DataFrame for the sequence numbers lying between `start` and `end`.
   *
   * When `start` is defined and equal to `end`, an empty streaming DataFrame is returned.
   * Otherwise one [[OffsetRange]] is created per partition present in `end`. A range whose
   * ending sequence number is lower than its starting one is handed to `reportDataLoss`
   * and replaced by an empty range (from == until).
   *
   * @param start offset this batch starts from (treated here as the end offset of the
   *              previous batch), or `None` when no such offset exists
   * @param end   offset this batch reads up to
   * @return a streaming DataFrame backed by an [[EventHubsRDD]] over the computed ranges
   * @throws IllegalStateException    if a partition in `end` has no starting sequence number
   * @throws IllegalArgumentException if the configured preferred-location strategy is neither
   *                                  `Hash` nor `BalancedHash`
   */
  override def getBatch(start: Option[Offset], end: Offset): DataFrame = {
    // Referenced only so that it gets evaluated up front; the value is not used here.
    // NOTE(review): presumably a lazy val being forced -- confirm against its declaration.
    initialPartitionSeqNos

    logInfo(s"getBatch for namespace $namespaceUri, Evenhub $ehName called with start = $start and end = $end")
    val endSeqNos = EventHubsSourceOffset.getPartitionSeqNos(end)
    // When recovering, getBatch can run before getOffset, so seed the current position here.
    if (currentSeqNos.isEmpty) {
      currentSeqNos = Some(endSeqNos)
    }

    if (start.contains(end)) {
      // Nothing to read: start and end are the same offset.
      sqlContext.internalCreateDataFrame(sqlContext.sparkContext.emptyRDD,
                                         schema,
                                         isStreaming = true)
    } else {
      // Look up the earliest sequence numbers once and keep them for later calls.
      if (earliestSeqNos.isEmpty) {
        val bounds = ehClient.allBoundedSeqNos
        earliestSeqNos = Some(bounds.map {
          case (partitionId, (earliest, _)) => (NameAndPartition(ehName, partitionId), earliest)
        }.toMap)
      }

      // Starting sequence numbers derived from a previous offset. If `end` knows more
      // partitions than the previous offset, the translated defaults fill the gap and the
      // previous values win wherever both exist.
      def seqNosAfter(prevBatchEnd: Offset) = {
        val previous = EventHubsSourceOffset.getPartitionSeqNos(prevBatchEnd)
        if (previous.size >= endSeqNos.size) {
          previous
        } else {
          logInfo(
            s"Number of partitions has increased from ${previous.size} to ${endSeqNos.size}")
          val defaults = ehClient.translate(ehConf, partitionCount).map {
            case (partitionId, seqNo) => NameAndPartition(ehName, partitionId) -> seqNo
          }
          defaults ++ previous
        }
      }

      val fromSeqNos = start match {
        case Some(prevBatchEnd) => adjustStartingOffset(seqNosAfter(prevBatchEnd))
        case None => adjustStartingOffset(initialPartitionSeqNos)
      }

      val partitions = endSeqNos.keySet.toSeq
      logDebug(partitions.mkString("Partitions: ", ", ", ""))
      val sortedExecutors = getSortedExecutorList(sc)
      val numExecutors = sortedExecutors.length
      logDebug(sortedExecutors.mkString("Sorted executors: ", ", ", ""))

      // One candidate range per partition, each with an optional preferred executor.
      val candidateRanges = for {
        partition <- partitions
        fromSeqNo = fromSeqNos.getOrElse(
          partition,
          throw new IllegalStateException(s"$partition doesn't have a fromSeqNo"))
        untilSeqNo = endSeqNos(partition)
        locationKey = ehConf.partitionPreferredLocationStrategy match {
          case PartitionPreferredLocationStrategy.Hash => partition.hashCode
          case PartitionPreferredLocationStrategy.BalancedHash =>
            partition.ehName.hashCode() + partition.partitionId
          case _ =>
            throw new IllegalArgumentException(
              "Unsupported partition strategy: " +
                ehConf.partitionPreferredLocationStrategy)
        }
        // floorMod keeps the index non-negative even when the key is negative.
        preferredLoc = if (numExecutors <= 0) {
          None
        } else {
          Some(sortedExecutors(Math.floorMod(locationKey, numExecutors)))
        }
      } yield OffsetRange(partition, fromSeqNo, untilSeqNo, preferredLoc)

      // A range that ends before it starts is reported and then collapsed to an empty range.
      val offsetRanges = candidateRanges.map {
        case lost if lost.untilSeqNo < lost.fromSeqNo =>
          reportDataLoss(
            s"Partition ${lost.nameAndPartition}'s sequence number was changed from " +
              s"${lost.fromSeqNo} to ${lost.untilSeqNo}, some data may have been missed")
          OffsetRange(lost.nameAndPartition, lost.fromSeqNo, lost.fromSeqNo, lost.preferredLoc)
        case intact => intact
      }.toArray

      // With slowPartitionAdjustment enabled, record this batch in the status tracker and
      // notify the throttling plugin, if one is set.
      if (slowPartitionAdjustment) {
        addCurrentBatchToStatusTracker(offsetRanges)
        throttlingStatusPlugin.foreach(
          _.onBatchCreation(partitionContext, localBatchId, offsetRanges, partitionsThrottleFactor))
      }

      val eventHubsRdd = new EventHubsRDD(sc, ehConf.trimmed, offsetRanges)
      val rdd = EventHubsSourceProvider.toInternalRow(eventHubsRdd)
      logInfo(
        s"GetBatch for namespace $namespaceUri, Evenhub $ehName generating RDD of offset range: " +
          offsetRanges.sortBy(_.nameAndPartition.toString).mkString(", "))
      sqlContext.internalCreateDataFrame(rdd, schema, isStreaming = true)
    }
  }