private def validateBatchOptions()

in rocketmq-spark/src/main/scala/org/apache/spark/sql/rocketmq/RocketMQSourceProvider.scala [194:231]


  private def validateBatchOptions(caseInsensitiveParams: Map[String, String]) {
    // Batch specific options
    RocketMQSourceProvider.getRocketMQOffsetRangeLimit(
      caseInsensitiveParams, STARTING_OFFSETS_OPTION_KEY, EarliestOffsetRangeLimit) match {
      case EarliestOffsetRangeLimit => // good to go
      case LatestOffsetRangeLimit =>
        throw new IllegalArgumentException("starting offset can't be latest " +
          "for batch queries on RocketMQ")
      case SpecificOffsetRangeLimit(partitionOffsets) =>
        partitionOffsets.foreach {
          case (mq, off) if off == RocketMQOffsetRangeLimit.LATEST =>
            throw new IllegalArgumentException(s"starting offsets for $mq can't be latest for batch queries on RocketMQ")
          case _ => // ignore
        }
    }

    RocketMQSourceProvider.getRocketMQOffsetRangeLimit(
      caseInsensitiveParams, ENDING_OFFSETS_OPTION_KEY, LatestOffsetRangeLimit) match {
      case EarliestOffsetRangeLimit =>
        throw new IllegalArgumentException("ending offset can't be earliest " +
          "for batch queries on RocketMQ")
      case LatestOffsetRangeLimit => // good to go
      case SpecificOffsetRangeLimit(partitionOffsets) =>
        partitionOffsets.foreach {
          case (mq, off) if off == RocketMQOffsetRangeLimit.EARLIEST =>
            throw new IllegalArgumentException(s"ending offset for $mq can't be " +
              "earliest for batch queries on RocketMQ")
          case _ => // ignore
        }
    }

    validateGeneralOptions(caseInsensitiveParams)

    // Don't want to throw an error, but at least log a warning.
    if (caseInsensitiveParams.get("maxoffsetspertrigger").isDefined) {
      logWarning("maxOffsetsPerTrigger option ignored in batch queries")
    }
  }