def validateQuery()

in rocketmq-spark/src/main/scala/org/apache/spark/sql/rocketmq/RocketMQWriter.scala [51:83]


  def validateQuery(
      schema: Seq[Attribute],
      options: ju.Map[String, String],
      topic: Option[String] = None): Unit = {
    schema.find(_.name == TOPIC_ATTRIBUTE_NAME).getOrElse(
      if (topic.isEmpty) {
        throw new AnalysisException(s"topic option required when no " +
            s"'$TOPIC_ATTRIBUTE_NAME' attribute is present. Use the " +
            s"${RocketMQConf.PRODUCER_TOPIC} option for setting a topic.")
      } else {
        Literal(topic.get, StringType)
      }
    ).dataType match {
      case StringType => // good
      case _ =>
        throw new AnalysisException(s"Topic type must be a String")
    }
    schema.find(_.name == TAGS_ATTRIBUTE_NAME).getOrElse(
      Literal(null, StringType)
    ).dataType match {
      case StringType => // good
      case _ =>
        throw new AnalysisException(s"$TAGS_ATTRIBUTE_NAME attribute type must be String")
    }
    schema.find(_.name == BODY_ATTRIBUTE_NAME).getOrElse(
      throw new AnalysisException(s"Required attribute '$BODY_ATTRIBUTE_NAME' not found")
    ).dataType match {
      case StringType | BinaryType => // good
      case _ =>
        throw new AnalysisException(s"$BODY_ATTRIBUTE_NAME attribute type " +
            s"must be a String or BinaryType")
    }
  }