in rocketmq-spark/src/main/scala/org/apache/spark/sql/rocketmq/RocketMQSourceProvider.scala [131:164]
/**
 * Writes `data` to RocketMQ and returns a placeholder relation.
 *
 * Only `SaveMode.Append` and `SaveMode.ErrorIfExists` are accepted; the other
 * modes are rejected before anything is written.
 *
 * @param sqlContext context whose `sparkSession` is handed to the writer
 * @param mode       requested save mode
 * @param parameters user-supplied options; keys are lower-cased before being
 *                   passed to `paramsForProducer`
 * @param data       the DataFrame whose query execution is written out
 * @return a `BaseRelation` whose members all throw `UnsupportedOperationException`
 * @throws AnalysisException if `mode` is `Overwrite` or `Ignore`
 */
override def createRelation(
    sqlContext: SQLContext,
    mode: SaveMode,
    parameters: Map[String, String],
    data: DataFrame): BaseRelation = {
  mode match {
    case SaveMode.Overwrite | SaveMode.Ignore =>
      throw new AnalysisException(s"Save mode $mode not allowed for RocketMQ. " +
        s"Allowed save modes are ${SaveMode.Append} and " +
        s"${SaveMode.ErrorIfExists} (default).")
    case _ => // good
  }

  val caseInsensitiveParams = parameters.map { case (k, v) => (k.toLowerCase(Locale.ROOT), v) }

  // Try the exact key first (keeps the previous behaviour, including for maps that
  // are already case-insensitive), then fall back to the normalised map so that a
  // differently-cased topic key in a plain Map is not silently ignored.
  val topicKey = RocketMQConf.PRODUCER_TOPIC
  val defaultTopic = parameters.get(topicKey)
    .orElse(caseInsensitiveParams.get(topicKey.toLowerCase(Locale.ROOT)))
    .map(_.trim)

  // A fresh UUID-based group id is generated on every call.
  val uniqueGroupId = s"spark-rocketmq-sink-${UUID.randomUUID}"
  val producerParams = paramsForProducer(caseInsensitiveParams, uniqueGroupId)

  RocketMQWriter.write(sqlContext.sparkSession, data.queryExecution, producerParams, defaultTopic)

  /* This method is supposed to return a relation that reads the data that was written.
   * We cannot support this for RocketMQ. Therefore, in order to make things consistent,
   * we return an empty base relation.
   */
  new BaseRelation {
    override def sqlContext: SQLContext = unsupportedException
    override def schema: StructType = unsupportedException
    override def needConversion: Boolean = unsupportedException
    override def sizeInBytes: Long = unsupportedException
    override def unhandledFilters(filters: Array[Filter]): Array[Filter] = unsupportedException

    // Single failure point shared by every member of the placeholder relation.
    private def unsupportedException: Nothing =
      throw new UnsupportedOperationException("BaseRelation from RocketMQ write " +
        "operation is not usable.")
  }
}