in src/main/scala/org/apache/pekko/persistence/dynamodb/journal/DynamoDBHelper.scala [44:101]
/** Shuts this helper down by delegating to `dynamoDB.shutdown()`. */
def shutdown(): Unit = {
  dynamoDB.shutdown()
}
// Actor that is sent a LatencyReport once a request issued through `send` completes.
// Stays null (meaning: no latency reporting) until setReporter is called.
private var reporter: ActorRef = _
/**
 * Installs (or replaces) the actor that receives latency reports.
 *
 * @param ref the actor to store in `reporter`
 */
def setReporter(ref: ActorRef): Unit = {
  reporter = ref
}
/**
 * Executes one asynchronous AWS request and exposes its outcome as a Scala `Future`.
 *
 * A request that fails with `ProvisionedThroughputExceededException` is re-sent after a delay
 * that doubles on every attempt, for as long as the retry budget of a fresh `RetryStateHolder`
 * lasts; once the budget is exhausted that exception is propagated unwrapped. Any other error
 * reported through the handler is logged and wrapped in a `DynamoDBJournalFailure`. A non-fatal
 * exception thrown synchronously by `func` is logged and fails the future unwrapped.
 *
 * If a reporter is installed at the moment the call starts, it is sent one `LatencyReport` with
 * the elapsed nanoseconds and the number of retries that were consumed.
 *
 * @param aws  the request; used here only to derive a descriptive name for logs and errors
 * @param func starts the request with the supplied handler (the Java future it returns is ignored)
 * @param d    produces the descriptive name of the request
 * @return a future completed with the response, or failed as described above
 */
private def send[In <: AmazonWebServiceRequest, Out](aws: In, func: AsyncHandler[In, Out] => juc.Future[Out])(implicit
    d: Describe[_ >: In]): Future[Out] = {
  import scala.util.control.NonFatal

  def name = d.desc(aws)

  // Performs exactly one attempt; the outcome is delivered through the AWS callback handler.
  def sendSingle(): Future[Out] = {
    val p = Promise[Out]()

    val handler = new AsyncHandler[In, Out] {
      override def onError(ex: Exception) =
        ex match {
          case _: ProvisionedThroughputExceededException =>
            // passed on unwrapped (and unlogged) so that `retry` below can recognise it
            p.tryFailure(ex)
          case _ =>
            val n = name
            log.error(ex, "failure while executing {}", n)
            p.tryFailure(new DynamoDBJournalFailure("failure while executing " + n, ex))
        }
      override def onSuccess(req: In, resp: Out) = p.trySuccess(resp)
    }

    try {
      func(handler)
    } catch {
      // only non-fatal problems are turned into a failed future; fatal VM errors must propagate
      case NonFatal(ex) =>
        log.error(ex, "failure while preparing {}", name)
        p.tryFailure(ex)
    }

    p.future
  }

  val state = new RetryStateHolder
  // remembered up front so the number of consumed retries can be reported without a magic number
  val initialRetries = state.retries

  // lazy because the partial function refers to itself for the next attempt
  lazy val retry: PartialFunction[Throwable, Future[Out]] = {
    case _: ProvisionedThroughputExceededException if state.retries > 0 =>
      val backoff = state.backoff
      state.retries -= 1
      state.backoff *= 2
      after(backoff, scheduler)(sendSingle().recoverWith(retry))
    case other => Future.failed(other)
  }

  if (Tracing) log.debug("{}", name)

  // Read the mutable field once: setReporter may change it while this request is in flight, and
  // the start timestamp, the guard and the callback must all agree on the same reporter.
  val currentReporter = reporter
  val start = if (currentReporter ne null) System.nanoTime else 0L

  // backoff retries when sending too fast
  val f = sendSingle().recoverWith(retry)

  if (currentReporter ne null)
    f.onComplete(_ => currentReporter ! LatencyReport(System.nanoTime - start, initialRetries - state.retries))

  f
}