in rocketmq-spark/src/main/scala/org/apache/spark/sql/rocketmq/RocketMQRelation.scala [61:128]
/**
 * Builds the batch scan over RocketMQ.
 *
 * On the driver, the starting and ending offsets are resolved for every message queue with a
 * short-lived [[RocketMQOffsetReader]]. The method then checks that both ends cover the same set
 * of queues. It returns an RDD with one offset range per queue, whose records are converted into
 * rows with the columns topic, flag, body, properties, brokerName, queueId, queueOffset,
 * bornTimestamp and storeTimestamp.
 *
 * @throws IllegalStateException if the starting and ending offsets were resolved for different
 *                               sets of message queues
 */
override def buildScan(): RDD[Row] = {
  // Each running query should use its own group id. Otherwise, the query may be assigned only
  // partial data, because RocketMQ distributes queues among all consumers that share a group id.
  // Hence, a unique id is generated for each query.
  val uniqueGroupId = s"spark-rocketmq-relation-${UUID.randomUUID}"
  val offsetReader = new RocketMQOffsetReader(
    RocketMQSourceProvider.paramsForDriver(optionParams),
    sourceOptions,
    driverGroupIdPrefix = s"$uniqueGroupId-driver")

  // Resolve both ends of the scan with the driver-side reader, and always release it afterwards.
  val (fromPartitionOffsets, untilPartitionOffsets) = {
    try {
      (getPartitionOffsets(offsetReader, startingOffsets),
        getPartitionOffsets(offsetReader, endingOffsets))
    } finally {
      offsetReader.close()
    }
  }

  // The two lookups above are separate calls, so queues may have been added or removed in
  // between. Such queues are not silently dropped: the scan fails and reports both sets.
  if (fromPartitionOffsets.keySet != untilPartitionOffsets.keySet) {
    // Sort by the full queue identity (not only the topic) so the two lists line up when compared.
    val queueKey = (mq: MessageQueue) => (mq.getTopic, mq.getBrokerName, mq.getQueueId)
    val fromTopics = fromPartitionOffsets.keySet.toList.sortBy(queueKey).mkString(",")
    val untilTopics = untilPartitionOffsets.keySet.toList.sortBy(queueKey).mkString(",")
    throw new IllegalStateException("different topic partitions " +
      s"for starting offsets topics[$fromTopics] and " +
      s"ending offsets topics[$untilTopics]")
  }

  // One offset range per message queue, spanning from its starting to its ending offset.
  val offsetRanges = untilPartitionOffsets.map { case (mq, untilOffset) =>
    val fromOffset = fromPartitionOffsets.getOrElse(mq,
      // Defensive only: the key-set check above guarantees a starting offset for every queue.
      throw new IllegalStateException(s"$mq doesn't have a from offset"))
    RocketMQSourceRDDOffsetRange(mq, fromOffset, untilOffset, None)
  }.toArray
  logInfo("GetBatch generating RDD of offset range: " +
    offsetRanges.sortBy(_.messageQueue.toString).mkString(", "))

  // Create an RDD that reads the messages from RocketMQ and converts each one into an InternalRow.
  // The closure below deliberately references only its argument and singleton objects, so it does
  // not capture this relation.
  val executorRocketMQParams =
    RocketMQSourceProvider.paramsForExecutors(optionParams, uniqueGroupId)
  val rdd = new RocketMQSourceRDD(
    sqlContext.sparkContext, executorRocketMQParams, offsetRanges,
    pollTimeoutMs, failOnDataLoss, reuseRocketMQConsumer = false).map { cr =>
    // Remove the `brokerName` property which was added by us. See `RocketMQSourceRDD.compute`
    val brokerName = cr.getProperties.remove(RocketMQSource.PROP_BROKER_NAME)
    InternalRow(
      UTF8String.fromString(cr.getTopic), // topic
      cr.getFlag, // flag
      cr.getBody, // body
      UTF8String.fromString(JsonUtils.messageProperties(cr.getProperties)), // properties
      UTF8String.fromString(brokerName), // brokerName
      cr.getQueueId, // queueId
      cr.getQueueOffset, // queueOffset
      DateTimeUtils.fromJavaTimestamp(new java.sql.Timestamp(cr.getBornTimestamp)), // bornTimestamp
      DateTimeUtils.fromJavaTimestamp(new java.sql.Timestamp(cr.getStoreTimestamp)) // storeTimestamp
    )
  }
  sqlContext.internalCreateDataFrame(rdd, schema).rdd
}