override def onJobEnd()

in spark-doris-connector/src/main/scala/org/apache/doris/spark/listener/DorisTransactionListener.scala [36:81]


  /**
   * Finalizes all pre-committed Doris transactions when the Spark job ends:
   * commits every txn id on [[JobSucceeded]], aborts every txn id on any other
   * job result. Ids that still fail after retries are logged, never rethrown.
   */
  override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
    // Snapshot of the txn ids registered during the pre-commit phase
    // (accumulator holds a Java collection; asScala gives a mutable view).
    val txnIds: mutable.Buffer[Int] = preCommittedTxnAcc.value.asScala
    jobEnd.jobResult match {
      // if job succeeded, commit all pre-committed transactions
      case JobSucceeded =>
        finalizeTransactions(txnIds,
          outcome = "succeed",
          action = "committing",
          failurePrefix = "uncommitted",
          successMsg = "commit transaction success")(dorisStreamLoad.commit)
      // any other result means the job failed: abort all pre-committed transactions
      case _ =>
        finalizeTransactions(txnIds,
          outcome = "failed",
          action = "aborting",
          failurePrefix = "not aborted",
          successMsg = "abort transaction success")(dorisStreamLoad.abort)
    }
  }

  /**
   * Applies `op` (commit or abort) to every pre-committed transaction id,
   * retrying each id up to 3 times with a 1 second pause, then logs either
   * the ids that still failed or an overall success message.
   *
   * @param txnIds        pre-committed transaction ids to finalize
   * @param outcome       word describing the job result ("succeed" / "failed")
   * @param action        word describing the finalize step ("committing" / "aborting")
   * @param failurePrefix prefix for the error log listing leftover ids
   * @param successMsg    message logged when every id was finalized
   * @param op            the finalize operation, invoked once per id (per attempt)
   */
  private def finalizeTransactions(txnIds: mutable.Buffer[Int],
                                   outcome: String,
                                   action: String,
                                   failurePrefix: String,
                                   successMsg: String)(op: Int => Unit): Unit = {
    if (txnIds.isEmpty) {
      logger.warn(s"job run $outcome, but there is no pre-committed txn ids")
    } else {
      logger.info(s"job run $outcome, start $action transactions")
      // Keep only the ids whose operation still failed after all retries;
      // Utils.retry returns a Try, so isFailure means "exhausted retries".
      val failedTxnIds = txnIds.filter { txnId =>
        Utils.retry(3, Duration.ofSeconds(1), logger) {
          op(txnId)
        }.isFailure
      }
      if (failedTxnIds.nonEmpty) {
        // SLF4J parameterized logging: {} is substituted by the logger, not Scala.
        logger.error(s"$failurePrefix txn ids: {}", failedTxnIds.mkString(","))
      } else {
        logger.info(successMsg)
      }
    }
  }