override def buildScan()

in rocketmq-spark/src/main/scala/org/apache/spark/sql/rocketmq/RocketMQRelation.scala [61:128]


  override def buildScan(): RDD[Row] = {
    // Each running query should use its own group id. Otherwise, the query may only be assigned
    // partial data, since RocketMQ distributes partitions among all consumers that share a group
    // id. Hence, we generate a unique group id for each query.
    val uniqueGroupId = s"spark-rocketmq-relation-${UUID.randomUUID}"

    val offsetReader = new RocketMQOffsetReader(
      RocketMQSourceProvider.paramsForDriver(optionParams),
      sourceOptions,
      driverGroupIdPrefix = s"$uniqueGroupId-driver")

    // Leverage the RocketMQOffsetReader to obtain the relevant partition offsets
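    // (`getPartitionOffsets`, a helper in this relation, resolves each configured offset-range
    // limit into a concrete offset per MessageQueue.)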
    val (fromPartitionOffsets, untilPartitionOffsets) = {
      try {
        (getPartitionOffsets(offsetReader, startingOffsets),
          getPartitionOffsets(offsetReader, endingOffsets))
      } finally {
        offsetReader.close()
      }
    }

    // Verify that the starting and ending offsets cover the same set of message queues,
    // failing if queues were added or deleted between the two calls above.
    if (fromPartitionOffsets.keySet != untilPartitionOffsets.keySet) {
      implicit val topicOrdering: Ordering[MessageQueue] = Ordering.by(t => t.getTopic)
      val fromTopics = fromPartitionOffsets.keySet.toList.sorted.mkString(",")
      val untilTopics = untilPartitionOffsets.keySet.toList.sorted.mkString(",")
      throw new IllegalStateException("different topic partitions " +
        s"for starting offsets topics[$fromTopics] and " +
        s"ending offsets topics[$untilTopics]")
    }

    // Calculate offset ranges
    val offsetRanges = untilPartitionOffsets.keySet.map { tp =>
      val fromOffset = fromPartitionOffsets.getOrElse(tp, {
          // This should not happen: we verified above that fromPartitionOffsets and
          // untilPartitionOffsets have the same key set
          throw new IllegalStateException(s"$tp doesn't have a from offset")
      })
      val untilOffset = untilPartitionOffsets(tp)
      RocketMQSourceRDDOffsetRange(tp, fromOffset, untilOffset, None)
    }.toArray
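    // Each RocketMQSourceRDDOffsetRange pairs a MessageQueue with its [fromOffset, untilOffset)
    // range; the trailing None presumably leaves the preferred executor location unset.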

    logInfo("GetBatch generating RDD of offset range: " +
      offsetRanges.sortBy(_.messageQueue.toString).mkString(", "))

    // Create an RDD that reads the raw messages from RocketMQ and maps each one to an InternalRow.
    val executorRocketMQParams =
      RocketMQSourceProvider.paramsForExecutors(optionParams, uniqueGroupId)
    val rdd = new RocketMQSourceRDD(
      sqlContext.sparkContext, executorRocketMQParams, offsetRanges,
      pollTimeoutMs, failOnDataLoss, reuseRocketMQConsumer = false).map { cr =>
      // Remove the `brokerName` property that was added by `RocketMQSourceRDD.compute`
      val brokerName = cr.getProperties.remove(RocketMQSource.PROP_BROKER_NAME)
      InternalRow(
        UTF8String.fromString(cr.getTopic), // topic
        cr.getFlag, // flag
        cr.getBody, // body
        UTF8String.fromString(JsonUtils.messageProperties(cr.getProperties)), // properties
        UTF8String.fromString(brokerName), // brokerName
        cr.getQueueId, // queueId
        cr.getQueueOffset, // queueOffset
        DateTimeUtils.fromJavaTimestamp(new java.sql.Timestamp(cr.getBornTimestamp)), // bornTimestamp
        DateTimeUtils.fromJavaTimestamp(new java.sql.Timestamp(cr.getStoreTimestamp)) // storeTimestamp
      )
    }
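    // Wrap the InternalRow RDD in a DataFrame to bind the schema, then expose it as the
    // RDD[Row] that the TableScan contract requires.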
    sqlContext.internalCreateDataFrame(rdd, schema).rdd
  }
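
For context, a batch read through this relation might look like the sketch below. The format
alias and connection option keys are assumptions (check RocketMQSourceProvider and the project
README for the authoritative names); `startingOffsets` and `endingOffsets` map to the limits
consumed by buildScan above, and the selected columns follow the fields of the InternalRow it
builds.

  import org.apache.spark.sql.SparkSession

  val spark = SparkSession.builder().appName("rocketmq-batch-read").getOrCreate()

  val df = spark.read
    .format("org.apache.spark.sql.rocketmq") // assumed format alias
    .option("nameServer", "localhost:9876")  // assumed option key
    .option("topic", "demo-topic")           // assumed option key
    .option("startingOffsets", "earliest")
    .option("endingOffsets", "latest")
    .load()

  // Columns implied by the InternalRow above: topic, flag, body, properties,
  // brokerName, queueId, queueOffset, bornTimestamp, storeTimestamp
  df.select("topic", "queueId", "queueOffset", "body").show()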