in connector/src/main/scala/com/datastax/spark/connector/writer/AsyncExecutor.scala [48:111]
/** Returns the most recent non-retryable exception recorded by [[executeAsync]], if any.
  *
  * NOTE(review): `latestException` is a field declared outside this chunk and is written from
  * driver completion callbacks — presumably it is `@volatile`; verify at the declaration site.
  */
def getLatestException(): Option[Throwable] = latestException
/** Executes `task` asynchronously, or blocks if more than `maxConcurrentTasks` are in flight.
  *
  * Acquires a semaphore permit before submitting (this is what provides the blocking
  * back-pressure), registers the resulting future in `pendingFutures`, and releases the
  * permit exactly once when the task reaches a terminal state (success or a
  * non-retryable failure). Busy-connection, no-node-available and overloaded errors are
  * retried by resubmitting while still holding the original permit.
  *
  * @param task the task to pass to `asyncAction`
  * @return a future completed with the task's result, or failed with the first
  *         non-retryable error (which is also recorded in `latestException`)
  */
def executeAsync(task: T): Future[R] = {
  val submissionTimestamp = System.nanoTime()
  semaphore.acquire()
  val promise = Promise[R]()
  pendingFutures.put(promise.future, true)
  val executionTimestamp = System.nanoTime()

  def tryFuture(): Future[R] = {
    // If asyncAction throws synchronously, convert the throwable into an already-failed
    // CompletableFuture so the whenComplete handler below runs on either path.
    val value = Try(asyncAction(task)) recover {
      case e =>
        val future = new CompletableFuture[R]()
        future.completeExceptionally(e)
        future
    }

    value.get.whenComplete(new BiConsumer[R, Throwable] {
      // Return the permit and drop the bookkeeping entry. Must run exactly once per
      // terminal completion; retries deliberately keep holding the permit.
      private def release(): Unit = {
        semaphore.release()
        pendingFutures.remove(promise.future)
      }

      private def onSuccess(result: R): Unit = {
        release()
        promise.success(result)
        successHandler.foreach(_ (task, submissionTimestamp, executionTimestamp))
      }

      private def onFailure(throwable: Throwable): Unit = {
        throwable match {
          // NOTE(review): if this driver version's getAllErrors returns
          // Map[Node, List[Throwable]], `values.exists(_.isInstanceOf[...])` tests the
          // *lists* and can never match — the check may need `.values.flatten`. Confirm
          // against the driver API before changing; behavior left as-is here.
          case e: AllNodesFailedException if e.getAllErrors.asScala.values.exists(_.isInstanceOf[BusyConnectionException]) =>
            logTrace("BusyConnectionException ... Retrying")
            tryFuture()
          case e: NoNodeAvailableException =>
            logTrace("No Nodes Available ... Retrying")
            tryFuture()
          case e: OverloadedException =>
            logTrace("Backpressure rejection ... Retrying")
            tryFuture()
          case otherException =>
            logError("Failed to execute: " + task, otherException)
            latestException = Some(throwable)
            release()
            promise.failure(throwable)
            failureHandler.foreach(_ (task, submissionTimestamp, executionTimestamp))
        }
      }

      override def accept(r: R, t: Throwable): Unit = {
        // BUG FIX: the previous Option-based dispatch (Option(t).foreach(onFailure);
        // Option(r).foreach(onSuccess)) did NOTHING when the stage completed normally
        // with a null value (both r and t null, e.g. R = Void) — the permit was never
        // released and the promise never completed, deadlocking once permits ran out.
        // Per CompletionStage semantics t is non-null iff the stage failed, so dispatch
        // on t alone and treat everything else (including a null r) as success.
        if (t != null) onFailure(t) else onSuccess(r)
      }
    })
    promise.future
  }
  tryFuture()
}