override def createRelation()

in rocketmq-spark/src/main/scala/org/apache/spark/sql/rocketmq/RocketMQSourceProvider.scala [131:164]

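This is the batch-write entry point of the RocketMQ sink: Spark invokes CreatableRelationProvider.createRelation when a DataFrame is saved through this provider in batch mode. The method validates the save mode, writes the rows out via RocketMQWriter, and then returns a deliberately unusable BaseRelation, since data written to RocketMQ cannot be read back through this interface.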

  override def createRelation(
      sqlContext: SQLContext,
      mode: SaveMode,
      parameters: Map[String, String],
      data: DataFrame): BaseRelation = {
    mode match {
      case SaveMode.Overwrite | SaveMode.Ignore =>
        throw new AnalysisException(s"Save mode $mode not allowed for RocketMQ. " +
            s"Allowed save modes are ${SaveMode.Append} and " +
            s"${SaveMode.ErrorIfExists} (default).")
      case _ => // good
    }
    // Options are matched case-insensitively: normalize the user-supplied map once and
    // do all lookups (including the default topic) against the lowercased keys.
    val caseInsensitiveParams = parameters.map { case (k, v) => (k.toLowerCase(Locale.ROOT), v) }
    val defaultTopic = caseInsensitiveParams.get(RocketMQConf.PRODUCER_TOPIC.toLowerCase(Locale.ROOT)).map(_.trim)
    // A fresh producer group id for this write, so separate batch writes never share one.
    val uniqueGroupId = s"spark-rocketmq-sink-${UUID.randomUUID}"

    // Producer configuration derived from the user options; the write executes eagerly here.
    val producerParams = paramsForProducer(caseInsensitiveParams, uniqueGroupId)
    RocketMQWriter.write(sqlContext.sparkSession, data.queryExecution, producerParams, defaultTopic)

    /* This method is supposed to return a relation that reads back the data that was
     * written. We cannot support that for RocketMQ, so to keep the contract consistent
     * we return an empty base relation whose members all fail fast.
     */
    new BaseRelation {
      override def sqlContext: SQLContext = unsupportedException
      override def schema: StructType = unsupportedException
      override def needConversion: Boolean = unsupportedException
      override def sizeInBytes: Long = unsupportedException
      override def unhandledFilters(filters: Array[Filter]): Array[Filter] = unsupportedException
      private def unsupportedException =
        throw new UnsupportedOperationException("BaseRelation from RocketMQ write " +
            "operation is not usable.")
    }
  }
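
For reference, here is a minimal sketch of how this code path is exercised from user code. This is a hedged illustration: the option keys "nameServer" and "topic" are assumptions standing in for the real RocketMQConf producer option names, and the broker address is a placeholder.

  import org.apache.spark.sql.{SaveMode, SparkSession}

  val spark = SparkSession.builder().appName("rocketmq-sink-sketch").getOrCreate()
  import spark.implicits._

  val df = Seq(("k1", "hello"), ("k2", "world")).toDF("key", "body")

  // A batch write routes through createRelation() above. Only Append and the default
  // ErrorIfExists are accepted; Overwrite and Ignore throw AnalysisException.
  df.write
    .format("org.apache.spark.sql.rocketmq.RocketMQSourceProvider") // fully qualified provider class
    .option("nameServer", "localhost:9876") // assumed option key
    .option("topic", "demo-topic")          // assumed key behind RocketMQConf.PRODUCER_TOPIC
    .mode(SaveMode.Append)
    .save()

Note that the write itself happens eagerly inside createRelation(); the BaseRelation returned afterwards exists only to satisfy the CreatableRelationProvider contract and throws if anything tries to use it.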