in src/main/scala/org/apache/pekko/persistence/dynamodb/journal/DynamoDBHelper.scala [75:136]
/** Shuts this helper down by delegating to `dynamoDB.shutdown()`. */
def shutdown(): Unit = {
  dynamoDB.shutdown()
}
// Actor that `send` notifies with a LatencyReport once a request completes. It starts at the
// default (null) value, which `send` checks for and treats as "no reporting".
private var reporter: ActorRef = _
/** Installs `ref` as the reporter, replacing whatever was assigned before. */
def setReporter(ref: ActorRef): Unit = {
  reporter = ref
}
/**
 * Submits one AWS request asynchronously and adapts the SDK's callback-style completion to a
 * Scala `Future`.
 *
 * Failures matching `DynamoRetriableException` are retried after a delay that doubles on each
 * attempt, for as long as the `RetryStateHolder` still has retries left. Any other failure, and a
 * retriable one once the budget is exhausted, fails the returned future with a
 * `DynamoDBJournalFailure`. If a reporter is set at the time of the call, it is sent a
 * `LatencyReport` carrying the elapsed nanoseconds and the number of retries consumed.
 *
 * @param aws  the request; used here only to derive a description for log and error messages
 * @param func submits the request, given the handler that must be notified on completion
 * @param d    produces the description of `aws`
 * @return a future completed with the response, or failed as described above
 */
private def send[In <: AmazonWebServiceRequest, Out](aws: In, func: AsyncHandler[In, Out] => juc.Future[Out])(implicit
    d: Describe[_ >: In]): Future[Out] = {

  // Evaluated on demand: the description is only needed for tracing and on failure paths.
  def name = d.desc(aws)

  // Performs exactly one attempt and exposes its outcome as a Future.
  def sendSingle(): Future[Out] = {
    val p = Promise[Out]()

    val handler = new AsyncHandler[In, Out] {
      override def onError(ex: Exception) =
        ex match {
          case DynamoRetriableException(_) =>
            // Passed through unwrapped so the retry handler below can still recognise it.
            p.tryFailure(ex)
          case _ =>
            val n = name
            log.error(ex, "failure while executing {}", n)
            p.tryFailure(new DynamoDBJournalFailure("failure while executing " + n, ex))
        }
      override def onSuccess(req: In, resp: Out) = p.trySuccess(resp)
    }

    try {
      func(handler)
    } catch {
      // Anything thrown synchronously while submitting is logged and surfaced through the
      // returned Future instead of being thrown at the caller.
      case ex: Throwable =>
        log.error(ex, "failure while preparing {}", name)
        p.tryFailure(ex)
    }

    p.future
  }

  val state = new RetryStateHolder
  // Remember the starting budget so the number of retries actually used can be derived later
  // without assuming what RetryStateHolder's initial value is.
  val retryBudget = state.retries

  // Lazy because the handler refers to itself for the next attempt.
  lazy val retry: PartialFunction[Throwable, Future[Out]] = {
    case DynamoRetriableException(ex) if state.retries > 0 =>
      // Wait for the current backoff, then double it for the attempt after this one.
      val backoff = state.backoff
      state.retries -= 1
      state.backoff *= 2
      log.warning("failure while executing {} but will retry! Message: {}", name, ex.getMessage())
      after(backoff, scheduler)(sendSingle().recoverWith(retry))
    case other: DynamoDBJournalFailure => Future.failed(other)
    case other =>
      val n = name
      Future.failed(new DynamoDBJournalFailure("failed retry " + n, other))
  }

  if (Tracing) log.debug("{}", name)

  // Read the mutable `reporter` exactly once: the start timestamp, the decision to report and
  // the recipient must all agree even if the reporter is reassigned while the request is in flight.
  val latencyReporter = reporter
  val start = if (latencyReporter ne null) System.nanoTime else 0L

  // backoff retries when sending too fast
  val f = sendSingle().recoverWith(retry)

  if (latencyReporter ne null)
    f.onComplete(_ => latencyReporter ! LatencyReport(System.nanoTime - start, retryBudget - state.retries))

  f
}