private def withRetries()

in rocketmq-spark/src/main/scala/org/apache/spark/sql/rocketmq/RocketMQOffsetReader.scala [189:212]


  private def withRetries(
      body: => Map[MessageQueue, Long]): Map[MessageQueue, Long] = synchronized {
    var result: Option[Map[MessageQueue, Long]] = None
    var attempt = 1
    var lastException: Throwable = null
    while (result.isEmpty && attempt <= maxOffsetFetchAttempts) {
      try {
        result = Some(body)
      } catch {
        case NonFatal(e) =>
          lastException = e
          logWarning(s"Error in attempt $attempt getting RocketMQ offsets: ", e)
          attempt += 1
          Thread.sleep(offsetFetchAttemptIntervalMs)
          resetConsumer()
      }
    }
    if (result.isEmpty) {
      assert(attempt > maxOffsetFetchAttempts)
      assert(lastException != null)
      throw lastException
    }
    result.get
  }