private def loadBatchWithRetries()

in spark-doris-connector/spark-doris-connector-spark-3-base/src/main/scala/org/apache/doris/spark/write/DorisDataWriter.scala [82:119]

Loads a single record into the current batch. When a batch write fails, the method replays the records buffered since the last successful flush and retries, up to `retries` times with `retryIntervalMs` between attempts. With two-phase commit enabled, the transaction id returned when a batch is stopped is collected into `committedMessages` for the later commit step.

  private def loadBatchWithRetries(record: InternalRow): Unit = {
    var isRetrying = false
    // Run the whole load inside a retry wrapper: up to `retries` attempts,
    // waiting `retryIntervalMs` between them; a failed attempt runs the
    // recovery block below before the next one.
    Retry.exec[Unit, Exception](retries, Duration.ofMillis(retryIntervalMs.toLong), log) {
      if (isRetrying) {
        // Retrying after a failed batch: replay the buffered records.
        // writer.load advances the batch count, so the loop resumes at the
        // current count and stops once it has caught up with the buffer.
        do {
          val idx = writer.getBatchCount
          writer.load(recordBuffer(idx))
        } while (writer.getBatchCount < recordBuffer.size)
        isRetrying = false
      }
      if (writer.endOfBatch()) {
        // End of batch: stop the batch write and capture the transaction id.
        val txnId = Option(writer.stop())
        if (twoPhaseCommitEnabled) {
          if (txnId.isDefined) {
            // Under two-phase commit, keep the txn id so the commit phase
            // can finalize (or abort) it later.
            committedMessages += txnId.get
          } else {
            throw new Exception("Failed to end batch write")
          }
        }
        // The flushed records can no longer fail; drop the replay buffer.
        if (retries > 0) {
          recordBuffer.clear()
        }
        writer.resetBatchCount()
        // Pause between batches to pace the load requests.
        LockSupport.parkNanos(Duration.ofMillis(batchIntervalMs.toLong).toNanos)
      }
      // Load the current record into the (possibly fresh) batch.
      writer.load(record)
    } {
      // Recovery block: a batch write failed, so flag the retry and reset
      // the batch count before the next attempt replays the buffer.
      isRetrying = true
      writer.resetBatchCount()
    } match {
      // On success, buffer the record so a later failed batch can replay it;
      // once retries are exhausted, rethrow with the original as the cause.
      case Success(_) => if (retries > 0) recordBuffer += record
      case Failure(exception) => throw new Exception(exception)
    }
  }
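
The control flow above hinges on the `Retry.exec` utility, whose implementation is not shown here. From the call site one can infer its shape: run a body, invoke a recovery hook when it throws, wait a fixed interval between attempts, and return a `Try`. The following is a minimal sketch under those assumptions; the parameter names (`retryTimes`, `interval`, `logger`) are illustrative and the connector's actual `Retry` may differ:

  import java.time.Duration
  import org.slf4j.Logger
  import scala.annotation.tailrec
  import scala.reflect.ClassTag
  import scala.util.{Failure, Success, Try}

  object Retry {
    // Run `f`; on an exception of type E, invoke the recovery hook `h`,
    // sleep for `interval`, and try again, up to `retryTimes` retries.
    def exec[R, E <: Throwable: ClassTag](retryTimes: Int, interval: Duration, logger: Logger)
                                         (f: => R)(h: => Unit): Try[R] = {
      @tailrec
      def attempt(remaining: Int): Try[R] = Try(f) match {
        case s @ Success(_) => s
        case Failure(e: E) if remaining > 0 =>
          logger.warn(s"load failed, $remaining attempts left, retrying", e)
          h // let the caller reset state (e.g. batch counters) first
          Thread.sleep(interval.toMillis)
          attempt(remaining - 1)
        case failure => failure
      }
      attempt(retryTimes)
    }
  }

Note how this shape lines up with the method above: the recovery hook is where `DorisDataWriter` flips `isRetrying` and resets the batch count, so the next attempt re-enters the body and replays `recordBuffer` before loading the new record.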