in spark-doris-connector/src/main/scala/org/apache/doris/spark/listener/DorisTransactionListener.scala [36:81]
/**
 * Finalizes the pre-committed transactions once a Spark job has ended.
 *
 * The transaction ids are read from `preCommittedTxnAcc`. When the job result is
 * `JobSucceeded` every id is committed through `dorisStreamLoad.commit`; for any
 * other result every id is aborted through `dorisStreamLoad.abort`. Each call is
 * wrapped in `Utils.retry(3, Duration.ofSeconds(1), logger)` (presumably up to
 * 3 attempts with a 1 second pause -- confirm against `Utils.retry`).
 *
 * Ids whose call still ends in `Failure` are only reported in an error log line;
 * nothing is rethrown from this callback. When there are no ids at all, a warning
 * is logged and no commit/abort call is made.
 *
 * @param jobEnd the job-end event; only its `jobResult` is inspected
 */
override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
  val txnIds: mutable.Buffer[Int] = preCommittedTxnAcc.value.asScala

  // Applies `action` to every pre-committed txn id, in order, under the retry
  // policy, and returns the ids for which the final outcome was a Failure.
  def failedAfterRetry(action: Int => Unit): mutable.Buffer[Int] =
    txnIds.filter { txnId =>
      Utils.retry(3, Duration.ofSeconds(1), logger) {
        action(txnId)
      } match {
        case Success(_) => false
        case Failure(_) => true
      }
    }

  jobEnd.jobResult match {
    // job succeeded but nothing was pre-committed: nothing to commit
    case JobSucceeded if txnIds.isEmpty =>
      logger.warn("job run succeed, but there is no pre-committed txn ids")

    // if job succeed, commit all transactions
    case JobSucceeded =>
      logger.info("job run succeed, start committing transactions")
      val uncommitted = failedAfterRetry(txnId => dorisStreamLoad.commit(txnId))
      if (uncommitted.nonEmpty) {
        logger.error("uncommitted txn ids: {}", uncommitted.mkString(","))
      } else {
        logger.info("commit transaction success")
      }

    // job failed but nothing was pre-committed: nothing to abort
    case _ if txnIds.isEmpty =>
      logger.warn("job run failed, but there is no pre-committed txn ids")

    // if job failed, abort all pre committed transactions
    case _ =>
      logger.info("job run failed, start aborting transactions")
      val notAborted = failedAfterRetry(txnId => dorisStreamLoad.abort(txnId))
      if (notAborted.nonEmpty) {
        logger.error("not aborted txn ids: {}", notAborted.mkString(","))
      } else {
        logger.info("abort transaction success")
      }
  }
}