override def getBatch()

in rocketmq-spark/src/main/scala/org/apache/spark/sql/rocketmq/RocketMQSource.scala [207:309]
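
For context, this method implements Spark's micro-batch `Source` API (the internal `org.apache.spark.sql.execution.streaming.Source` trait of the Spark 2.x line), where `getBatch` must return the rows whose offsets fall in the range `(start, end]`. A condensed view of that trait, shown here only for orientation:

  import org.apache.spark.sql.DataFrame
  import org.apache.spark.sql.execution.streaming.Offset
  import org.apache.spark.sql.types.StructType

  trait Source {
    def schema: StructType                                       // fixed schema of the rows this source produces
    def getOffset: Option[Offset]                                // latest available offset, or None if no data yet
    def getBatch(start: Option[Offset], end: Offset): DataFrame  // rows with offsets in (start, end]
    def commit(end: Offset): Unit                                // everything up to `end` has been processed
    def stop(): Unit                                             // release held resources
  }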


  override def getBatch(start: Option[Offset], end: Offset): DataFrame = {
    // Make sure initialPartitionOffsets is initialized
    initialPartitionOffsets

    logInfo(s"GetBatch called with start = $start, end = $end")
    val untilPartitionOffsets = RocketMQSourceOffset.getPartitionOffsets(end)
    // On recovery, getBatch is called before getOffset, so currentPartitionOffsets may still be unset here
    if (currentPartitionOffsets.isEmpty) {
      currentPartitionOffsets = Some(untilPartitionOffsets)
    }
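    // `start == end` means no new messages arrived since the last batch, so return an
    // empty (but still streaming) DataFrame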
    if (start.isDefined && start.get == end) {
      return sqlContext.internalCreateDataFrame(
        sqlContext.sparkContext.emptyRDD, schema, isStreaming = true)
    }
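    // `start` is None only for the very first batch; in that case, read from the initial
    // offsets resolved when the source was created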
    val fromPartitionOffsets = start match {
      case Some(prevBatchEndOffset) =>
        RocketMQSourceOffset.getPartitionOffsets(prevBatchEndOffset)
      case None =>
        initialPartitionOffsets
    }

    // Find the new partitions, and get their earliest offsets
    val newPartitions = untilPartitionOffsets.keySet.diff(fromPartitionOffsets.keySet)
    val newPartitionOffsets = offsetReader.fetchEarliestOffsets(newPartitions.toSeq)
    if (newPartitionOffsets.keySet != newPartitions) {
      // We cannot fetch the "from" offsets for some partitions, which means they were deleted.
      val deletedPartitions = newPartitions.diff(newPartitionOffsets.keySet)
      reportDataLoss(
        s"Cannot find earliest offsets of $deletedPartitions. Some data may have been missed")
    }
    logInfo(s"Partitions added: $newPartitionOffsets")
    newPartitionOffsets.filter(_._2 != 0).foreach { case (p, o) =>
      reportDataLoss(
        s"Added partition $p starts from $o instead of 0. Some data may have been missed")
    }

    // Conversely, partitions present in the previous batch but missing from `end` have been deleted
    val deletedPartitions = fromPartitionOffsets.keySet.diff(untilPartitionOffsets.keySet)
    if (deletedPartitions.nonEmpty) {
      reportDataLoss(s"$deletedPartitions are gone. Some data may have been missed")
    }

    // Calculate offset ranges based on the until partitions, so that partitions that
    // have been deleted are ignored
    val topicPartitions = untilPartitionOffsets.keySet.filter { tp =>
      // Ignore partitions whose "from" offsets are unknown.
      newPartitionOffsets.contains(tp) || fromPartitionOffsets.contains(tp)
    }.toSeq
    logDebug("TopicPartitions: " + topicPartitions.mkString(", "))

    // Sort the executor list so that the hash-based queue-to-executor assignment below
    // is deterministic across batches
    val sortedExecutors = getSortedExecutorList(sc)
    val numExecutors = sortedExecutors.length
    logDebug("Sorted executors: " + sortedExecutors.mkString(", "))

    // Calculate offset ranges
    val offsetRanges = topicPartitions.map { tp =>
      val fromOffset = fromPartitionOffsets.getOrElse(tp, {
        newPartitionOffsets.getOrElse(tp, {
          // This should not happen since newPartitionOffsets contains all partitions not in
          // fromPartitionOffsets
          throw new IllegalStateException(s"$tp doesn't have a from offset")
        })
      })
      val untilOffset = untilPartitionOffsets(tp)
      val preferredLoc = if (numExecutors > 0) {
        // This allows cached RocketMQConsumers in the executors to be re-used to read the same
        // partition in every batch.
        Some(sortedExecutors(Math.floorMod(tp.hashCode, numExecutors)))
      } else None
      RocketMQSourceRDDOffsetRange(tp, fromOffset, untilOffset, preferredLoc)
    }.filter { range =>
      if (range.untilOffset < range.fromOffset) {
        reportDataLoss(s"Partition ${range.messageQueue}'s offset was changed from " +
          s"${range.fromOffset} to ${range.untilOffset}, some data may have been missed")
        false
      } else {
        true
      }
    }.toArray

    // Create an RDD that reads from RocketMQ and maps each message to an InternalRow.
    val rdd = new RocketMQSourceRDD(
      sc, executorRocketMQParams, offsetRanges, pollTimeoutMs, failOnDataLoss,
      reuseRocketMQConsumer = true).map { cr =>
      // Remove the `brokerName` property that we added ourselves; see `RocketMQSourceRDD.compute`
      val brokerName = cr.getProperties.remove(RocketMQSource.PROP_BROKER_NAME)
      InternalRow(
        UTF8String.fromString(cr.getTopic), // topic
        cr.getFlag, // flag
        cr.getBody, // body
        UTF8String.fromString(JsonUtils.messageProperties(cr.getProperties)), // properties
        UTF8String.fromString(brokerName), // brokerName
        cr.getQueueId, // queueId
        cr.getQueueOffset, // queueOffset
        DateTimeUtils.fromJavaTimestamp(new java.sql.Timestamp(cr.getBornTimestamp)), // bornTimestamp
        DateTimeUtils.fromJavaTimestamp(new java.sql.Timestamp(cr.getStoreTimestamp)) // storeTimestamp
      )
    }

    logInfo("GetBatch generating RDD of offset range: " +
      offsetRanges.sortBy(_.messageQueue.toString).mkString(", "))

    sqlContext.internalCreateDataFrame(rdd, schema, isStreaming = true)
  }
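
For orientation, a minimal read-side sketch of how a source like this is typically wired up from user code. The `format` value and option keys below are assumptions for illustration (check the module's provider registration and README for the exact names); the resulting columns, though, follow directly from the `InternalRow` built above:

  // Hypothetical usage sketch; `format` value and option keys are assumed, not verified.
  import org.apache.spark.sql.SparkSession

  val spark = SparkSession.builder().appName("rocketmq-read-demo").getOrCreate()

  val stream = spark.readStream
    .format("org.apache.spark.sql.rocketmq.RocketMQSourceProvider") // assumed provider class
    .option("nameServer", "localhost:9876")                         // assumed option key
    .option("topic", "my-topic")                                    // assumed option key
    .load()
  // Resulting columns: topic, flag, body, properties, brokerName, queueId,
  // queueOffset, bornTimestamp, storeTimestamp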