in rocketmq-spark/src/main/scala/org/apache/spark/sql/rocketmq/RocketMQOffsetReader.scala [189:212]
/**
 * Evaluates `body` until it succeeds, making at most `maxOffsetFetchAttempts` attempts.
 *
 * The whole retry loop runs while holding this object's monitor. Each attempt that fails
 * with a non-fatal exception is logged and followed by `resetConsumer()`; between attempts
 * the calling thread sleeps for `offsetFetchAttemptIntervalMs` milliseconds. Exceptions
 * not matched by `NonFatal` (fatal errors, `InterruptedException`, ...) are not retried
 * and propagate immediately.
 *
 * @param body by-name computation of the offsets; re-evaluated on every attempt
 * @return the value produced by the first successful evaluation of `body`
 * @throws Throwable the exception raised by the last failed attempt once all attempts
 *                   are used up
 * @throws IllegalStateException if `maxOffsetFetchAttempts` is less than 1, so that
 *                               `body` was never evaluated
 */
private def withRetries(
    body: => Map[MessageQueue, Long]): Map[MessageQueue, Long] = synchronized {
  var result: Option[Map[MessageQueue, Long]] = None
  var attempt = 1
  var lastException: Option[Throwable] = None
  while (result.isEmpty && attempt <= maxOffsetFetchAttempts) {
    try {
      result = Some(body)
    } catch {
      case NonFatal(e) =>
        lastException = Some(e)
        logWarning(s"Error in attempt $attempt getting RocketMQ offsets: ", e)
        attempt += 1
        // Back off only when another attempt will follow; sleeping before the final
        // rethrow would just delay the failure while the monitor is still held.
        if (attempt <= maxOffsetFetchAttempts) {
          Thread.sleep(offsetFetchAttemptIntervalMs)
        }
        // The consumer is reset after every failure, including the last one.
        resetConsumer()
    }
  }
  result.getOrElse {
    // Reaching here means the loop ended without a result: either every attempt failed
    // (rethrow the most recent failure) or no attempt was made at all.
    throw lastException.getOrElse(
      new IllegalStateException(
        "No attempt was made to fetch RocketMQ offsets: " +
          s"maxOffsetFetchAttempts is $maxOffsetFetchAttempts"))
  }
}