def getLatestException()

in connector/src/main/scala/com/datastax/spark/connector/writer/AsyncExecutor.scala [48:111]


  /**
    * Returns whatever `latestException` currently holds. `executeAsync` overwrites that field
    * with `Some(throwable)` each time a task fails with an error it does not retry.
    */
  def getLatestException(): Option[Throwable] = {
    latestException
  }

  /**
    * Executes task asynchronously or blocks if more than `maxConcurrentTasks` limit is reached.
    *
    * A semaphore permit is acquired before the action is started and is released only once the
    * action reaches a terminal outcome. Failures caused by `BusyConnectionException` (wrapped in
    * an `AllNodesFailedException`), `NoNodeAvailableException` or `OverloadedException` are not
    * terminal: the action is re-submitted immediately while the permit stays held. Any other
    * failure is logged, stored in `latestException` and propagated through the returned future.
    *
    * @param task input handed to `asyncAction`
    * @return a future completed with the action's result, or failed with its terminal error
    */
  def executeAsync(task: T): Future[R] = {
    val submissionTimestamp = System.nanoTime()
    semaphore.acquire()

    val promise = Promise[R]()
    pendingFutures.put(promise.future, true)

    // Taken after the permit was obtained, so the gap to submissionTimestamp is the time spent
    // blocked on the semaphore. It is not refreshed on retries.
    val executionTimestamp = System.nanoTime()

    def tryFuture(): Future[R] = {
      // A synchronous throw from asyncAction becomes an already-failed stage, so it flows
      // through the same completion callback as an asynchronous failure.
      val value = Try(asyncAction(task)) recover {
        case e =>
          val future = new CompletableFuture[R]()
          future.completeExceptionally(e)
          future
      }

      value.get.whenComplete(new BiConsumer[R, Throwable] {
        // Frees the permit and forgets the pending future; called exactly once per task,
        // on the terminal outcome only.
        private def release(): Unit = {
          semaphore.release()
          pendingFutures.remove(promise.future)
        }

        private def onSuccess(result: R): Unit = {
          release()
          promise.success(result)
          successHandler.foreach(_ (task, submissionTimestamp, executionTimestamp))
        }

        private def onFailure(throwable: Throwable): Unit = {
          throwable match {
            // Retryable failures: re-submit right away without releasing the permit.
            case e: AllNodesFailedException if e.getAllErrors.asScala.values.exists(_.isInstanceOf[BusyConnectionException]) =>
              logTrace("BusyConnectionException ... Retrying")
              tryFuture()
            case _: NoNodeAvailableException =>
              logTrace("No Nodes Available ... Retrying")
              tryFuture()
            case _: OverloadedException =>
              logTrace("Backpressure rejection ... Retrying")
              tryFuture()

            case otherException =>
              logError("Failed to execute: " + task, otherException)
              latestException = Some(throwable)
              release()
              promise.failure(throwable)
              failureHandler.foreach(_ (task, submissionTimestamp, executionTimestamp))
          }
        }

        override def accept(r: R, t: Throwable): Unit = {
          // Decide on the throwable alone: a stage may complete successfully with a null
          // result, and that case must still release the permit and complete the promise.
          Option(t) match {
            case Some(error) => onFailure(error)
            case None => onSuccess(r)
          }
        }
      })

      promise.future
    }

    tryFuture()
  }