in spark-doris-connector/spark-doris-connector-spark-3-base/src/main/scala/org/apache/doris/spark/write/DorisDataWriter.scala [82:119]
/**
 * Loads a single record through `writer`, retrying the attempt via `Retry.exec`
 * when it fails.
 *
 * Each attempt does the following, in order:
 *  1. If the previous attempt failed, re-loads the records held in `recordBuffer`
 *     so the writer again contains everything accepted for the current batch.
 *  2. If `writer.endOfBatch()` reports true, stops the batch, records the returned
 *     transaction id when two-phase commit is enabled, clears the buffer, resets the
 *     batch count and parks the thread for `batchIntervalMs`.
 *  3. Loads `record`.
 *
 * When the attempt finally succeeds and retries are enabled (`retries > 0`), `record`
 * is appended to `recordBuffer` so it can be replayed by a later retry.
 *
 * @param record the row to load
 * @throws Exception wrapping the last failure when `Retry.exec` returns a `Failure`
 */
private def loadBatchWithRetries(record: InternalRow): Unit = {
  var isRetrying = false
  Retry.exec[Unit, Exception](retries, Duration.ofMillis(retryIntervalMs.toLong), log) {
    if (isRetrying) {
      // Retrying: replay the buffered records into the writer.
      // A pre-checked `while` is used instead of `do … while` because the buffer can be
      // empty here (failure on the very first record, or right after the buffer was
      // cleared at a batch boundary); indexing an empty buffer would throw and hide the
      // original load error.
      // NOTE(review): termination relies on writer.getBatchCount advancing with each
      // writer.load call — confirm against the writer implementation.
      while (writer.getBatchCount < recordBuffer.size) {
        val idx = writer.getBatchCount
        writer.load(recordBuffer(idx))
      }
      isRetrying = false
    }
    if (writer.endOfBatch()) {
      // End of batch: stop the current batch write. A null result becomes None.
      val txnId = Option(writer.stop())
      if (twoPhaseCommitEnabled) {
        txnId match {
          case Some(id) => committedMessages += id
          case None => throw new Exception("Failed to end batch write")
        }
      }
      // The buffer is only populated when retry is enabled, so only clear it then.
      if (retries > 0) {
        recordBuffer.clear()
      }
      writer.resetBatchCount()
      // Pause between batches for the configured interval.
      LockSupport.parkNanos(Duration.ofMillis(batchIntervalMs.toLong).toNanos)
    }
    writer.load(record)
  } {
    // Attempt failed: flag the next attempt as a retry and reset the batch count
    // so the buffered records are replayed from index 0.
    isRetrying = true
    writer.resetBatchCount()
  } match {
    // Remember the record only after it was loaded successfully.
    case Success(_) => if (retries > 0) recordBuffer += record
    case Failure(exception) => throw new Exception(exception)
  }
}