in rocketmq-spark/src/main/scala/org/apache/spark/sql/rocketmq/RocketMQSourceProvider.scala [194:231]
/**
 * Validates the options supplied for a batch query on RocketMQ.
 *
 * The checks run in this order:
 *  1. the starting offsets must not be "latest", either as a whole or for any
 *     individual queue listed in a specific-offset specification;
 *  2. the ending offsets must not be "earliest", with the same whole/per-queue rule;
 *  3. the options are handed to `validateGeneralOptions`;
 *  4. a warning is logged when `maxoffsetspertrigger` is present, since it is
 *     ignored in batch queries.
 *
 * @param caseInsensitiveParams the query options; the trigger option is looked up
 *                              under the all-lowercase key `maxoffsetspertrigger`
 *                              (presumably the caller lower-cases keys — confirm)
 * @throws IllegalArgumentException if the starting offsets resolve to latest or the
 *                                  ending offsets resolve to earliest
 */
private def validateBatchOptions(caseInsensitiveParams: Map[String, String]): Unit = {
  // Batch specific options
  // NOTE(review): the third argument looks like the limit used when the option is
  // not set — confirm against getRocketMQOffsetRangeLimit.
  val startingLimit = RocketMQSourceProvider.getRocketMQOffsetRangeLimit(
    caseInsensitiveParams, STARTING_OFFSETS_OPTION_KEY, EarliestOffsetRangeLimit)
  startingLimit match {
    case EarliestOffsetRangeLimit => // good to go
    case LatestOffsetRangeLimit =>
      throw new IllegalArgumentException("starting offset can't be latest " +
        "for batch queries on RocketMQ")
    case SpecificOffsetRangeLimit(partitionOffsets) =>
      // Reject the first queue whose explicit starting offset is the LATEST marker.
      partitionOffsets.foreach {
        case (mq, off) if off == RocketMQOffsetRangeLimit.LATEST =>
          throw new IllegalArgumentException(s"starting offsets for $mq can't be latest " +
            "for batch queries on RocketMQ")
        case _ => // ignore
      }
  }

  // The ending limit is resolved only after the starting limit has been accepted,
  // so a bad starting offset is always reported first.
  val endingLimit = RocketMQSourceProvider.getRocketMQOffsetRangeLimit(
    caseInsensitiveParams, ENDING_OFFSETS_OPTION_KEY, LatestOffsetRangeLimit)
  endingLimit match {
    case EarliestOffsetRangeLimit =>
      throw new IllegalArgumentException("ending offset can't be earliest " +
        "for batch queries on RocketMQ")
    case LatestOffsetRangeLimit => // good to go
    case SpecificOffsetRangeLimit(partitionOffsets) =>
      // Reject the first queue whose explicit ending offset is the EARLIEST marker.
      partitionOffsets.foreach {
        case (mq, off) if off == RocketMQOffsetRangeLimit.EARLIEST =>
          throw new IllegalArgumentException(s"ending offset for $mq can't be " +
            "earliest for batch queries on RocketMQ")
        case _ => // ignore
      }
  }

  validateGeneralOptions(caseInsensitiveParams)

  // Don't want to throw an error, but at least log a warning.
  if (caseInsensitiveParams.contains("maxoffsetspertrigger")) {
    logWarning("maxOffsetsPerTrigger option ignored in batch queries")
  }
}