in rocketmq-spark/src/main/scala/org/apache/spark/sql/rocketmq/RocketMQWriter.scala [51:83]
/**
 * Validates that a query's output schema can be written as RocketMQ messages.
 *
 * Checks are performed in this order, and the first failure is reported:
 *  1. Topic: if a `TOPIC_ATTRIBUTE_NAME` attribute exists it must be `StringType`;
 *     if it does not exist, `topic` must be defined.
 *  2. Tags: a `TAGS_ATTRIBUTE_NAME` attribute is optional, but when present it must
 *     be `StringType`.
 *  3. Body: a `BODY_ATTRIBUTE_NAME` attribute is required and must be `StringType`
 *     or `BinaryType`.
 *
 * When several attributes share a name, only the first one in `schema` is inspected.
 *
 * @param schema  output attributes of the query being written
 * @param options writer options; not consulted by this method
 * @param topic   topic used when the schema has no topic attribute
 * @throws AnalysisException if any of the checks above fails
 */
def validateQuery(
    schema: Seq[Attribute],
    options: ju.Map[String, String],
    topic: Option[String] = None): Unit = {
  // Data type of the first attribute called `name`, if there is one. The result type is
  // left to inference so that this block needs no names beyond those it already used.
  def declaredType(name: String) = schema.find(_.name == name).map(_.dataType)

  declaredType(TOPIC_ATTRIBUTE_NAME) match {
    case Some(StringType) => // good
    case Some(_) =>
      throw new AnalysisException("Topic type must be a String")
    case None if topic.isEmpty =>
      throw new AnalysisException("topic option required when no " +
        s"'$TOPIC_ATTRIBUTE_NAME' attribute is present. Use the " +
        s"${RocketMQConf.PRODUCER_TOPIC} option for setting a topic.")
    case None => // good: the topic comes from `topic`, which is already a String
  }

  declaredType(TAGS_ATTRIBUTE_NAME) match {
    case None | Some(StringType) => // good: tags may be omitted entirely
    case Some(_) =>
      throw new AnalysisException(s"$TAGS_ATTRIBUTE_NAME attribute type must be String")
  }

  declaredType(BODY_ATTRIBUTE_NAME) match {
    case Some(StringType | BinaryType) => // good
    case Some(_) =>
      throw new AnalysisException(s"$BODY_ATTRIBUTE_NAME attribute type " +
        "must be a String or BinaryType")
    case None =>
      throw new AnalysisException(s"Required attribute '$BODY_ATTRIBUTE_NAME' not found")
  }
}