in rocketmq-spark/src/main/scala/org/apache/spark/sql/rocketmq/RocketMQSource.scala [207:309]
  override def getBatch(start: Option[Offset], end: Offset): DataFrame = {
    // Make sure initialPartitionOffsets is initialized
    initialPartitionOffsets

    logInfo(s"GetBatch called with start = $start, end = $end")
    val untilPartitionOffsets = RocketMQSourceOffset.getPartitionOffsets(end)

    // On recovery, getBatch will get called before getOffset
    if (currentPartitionOffsets.isEmpty) {
      currentPartitionOffsets = Some(untilPartitionOffsets)
    }
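
    // If the end offsets are the same as the start offsets there is nothing new to read,
    // so return an empty streaming DataFrame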
    if (start.isDefined && start.get == end) {
      return sqlContext.internalCreateDataFrame(
        sqlContext.sparkContext.emptyRDD, schema, isStreaming = true)
    }
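
    // The batch starts where the previous batch ended, or at the initial offsets for the
    // very first batch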
    val fromPartitionOffsets = start match {
      case Some(prevBatchEndOffset) =>
        RocketMQSourceOffset.getPartitionOffsets(prevBatchEndOffset)
      case None =>
        initialPartitionOffsets
    }

    // Find the new partitions, and get their earliest offsets
    val newPartitions = untilPartitionOffsets.keySet.diff(fromPartitionOffsets.keySet)
    val newPartitionOffsets = offsetReader.fetchEarliestOffsets(newPartitions.toSeq)
    if (newPartitionOffsets.keySet != newPartitions) {
      // We cannot get the from offsets for some partitions, which means they have been deleted
      val deletedPartitions = newPartitions.diff(newPartitionOffsets.keySet)
      reportDataLoss(
        s"Cannot find earliest offsets of $deletedPartitions. Some data may have been missed")
    }
logInfo(s"Partitions added: $newPartitionOffsets")
newPartitionOffsets.filter(_._2 != 0).foreach { case (p, o) =>
reportDataLoss(
s"Added partition $p starts from $o instead of 0. Some data may have been missed")
}
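
    // Partitions that existed in the previous batch but are missing from the end offsets
    // have been deleted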
    val deletedPartitions = fromPartitionOffsets.keySet.diff(untilPartitionOffsets.keySet)
    if (deletedPartitions.nonEmpty) {
      reportDataLoss(s"$deletedPartitions are gone. Some data may have been missed")
    }

    // Use the until partitions to calculate the offset ranges, so that partitions which have
    // been deleted are ignored
    val topicPartitions = untilPartitionOffsets.keySet.filter { tp =>
      // Ignore partitions whose from offsets we don't know
      newPartitionOffsets.contains(tp) || fromPartitionOffsets.contains(tp)
    }.toSeq
logDebug("TopicPartitions: " + topicPartitions.mkString(", "))
    val sortedExecutors = getSortedExecutorList(sc)
    val numExecutors = sortedExecutors.length
    logDebug("Sorted executors: " + sortedExecutors.mkString(", "))

    // Calculate offset ranges
    val offsetRanges = topicPartitions.map { tp =>
      val fromOffset = fromPartitionOffsets.getOrElse(tp, {
        newPartitionOffsets.getOrElse(tp, {
          // This should not happen since newPartitionOffsets contains all partitions not in
          // fromPartitionOffsets
          throw new IllegalStateException(s"$tp doesn't have a from offset")
        })
      })
      val untilOffset = untilPartitionOffsets(tp)
      val preferredLoc = if (numExecutors > 0) {
        // This allows cached RocketMQConsumers in the executors to be re-used to read the same
        // partition in every batch.
        Some(sortedExecutors(Math.floorMod(tp.hashCode, numExecutors)))
      } else None
      RocketMQSourceRDDOffsetRange(tp, fromOffset, untilOffset, preferredLoc)
    }.filter { range =>
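      // An until offset smaller than the from offset means the queue's offsets moved backwards;
      // skip this range and report possible data loss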
      if (range.untilOffset < range.fromOffset) {
        reportDataLoss(s"Partition ${range.messageQueue}'s offset was changed from " +
          s"${range.fromOffset} to ${range.untilOffset}, some data may have been missed")
        false
      } else {
        true
      }
    }.toArray

    // Create an RDD that reads messages from RocketMQ and converts each one into an InternalRow
    // that matches the source schema
    val rdd = new RocketMQSourceRDD(
      sc, executorRocketMQParams, offsetRanges, pollTimeoutMs, failOnDataLoss,
      reuseRocketMQConsumer = true).map { cr =>
      // Remove the `brokerName` property which we added ourselves; see `RocketMQSourceRDD.compute`
      val brokerName = cr.getProperties.remove(RocketMQSource.PROP_BROKER_NAME)
      InternalRow(
        UTF8String.fromString(cr.getTopic), // topic
        cr.getFlag, // flag
        cr.getBody, // body
        UTF8String.fromString(JsonUtils.messageProperties(cr.getProperties)), // properties
        UTF8String.fromString(brokerName), // brokerName
        cr.getQueueId, // queueId
        cr.getQueueOffset, // queueOffset
        DateTimeUtils.fromJavaTimestamp(new java.sql.Timestamp(cr.getBornTimestamp)), // bornTimestamp
        DateTimeUtils.fromJavaTimestamp(new java.sql.Timestamp(cr.getStoreTimestamp)) // storeTimestamp
      )
    }
logInfo("GetBatch generating RDD of offset range: " +
offsetRanges.sortBy(_.messageQueue.toString).mkString(", "))
sqlContext.internalCreateDataFrame(rdd, schema, isStreaming = true)
}