def shutdown()

in src/main/scala/org/apache/pekko/persistence/dynamodb/journal/DynamoDBHelper.scala [75:136]


  /** Shuts down the underlying `dynamoDB` client by delegating to its own `shutdown()`. */
  def shutdown(): Unit = {
    dynamoDB.shutdown()
  }

  // Destination for latency reports. Starts out as null (the `_` default for a reference
  // type) and stays that way until setReporter is called; send() skips reporting while null.
  private var reporter: ActorRef = _

  /**
   * Registers the actor that `send` notifies with a `LatencyReport` after each request.
   *
   * @param ref the actor to receive latency reports
   */
  def setReporter(ref: ActorRef): Unit = {
    reporter = ref
  }

  /**
   * Executes one AWS request asynchronously and retries it with a doubling backoff while the
   * failure is matched by `DynamoRetriableException` and the retry budget is not exhausted.
   *
   * Any failure that is not retried is delivered as a `DynamoDBJournalFailure`: either the one
   * produced by the handler ("failure while executing ...") or a wrapper created here
   * ("failed retry ...").
   *
   * If a reporter has been registered, a `LatencyReport` carrying the elapsed nanoseconds and
   * the number of retries consumed is sent to it once the returned future completes.
   *
   * @param aws  the request object; only used to derive a description for log messages
   * @param func starts the asynchronous call, given the handler that completes the result
   * @param d    describes the request for logging and error messages
   * @return a future completed with the response, or failed as described above
   */
  private def send[In <: AmazonWebServiceRequest, Out](aws: In, func: AsyncHandler[In, Out] => juc.Future[Out])(implicit
      d: Describe[_ >: In]): Future[Out] = {

    // Computed on demand: the description is only needed for logging and failure messages.
    def name = d.desc(aws)

    // Performs a single attempt and exposes its outcome as a Scala Future.
    def sendSingle(): Future[Out] = {
      val p = Promise[Out]()

      val handler = new AsyncHandler[In, Out] {
        override def onError(ex: Exception) =
          ex match {
            case DynamoRetriableException(_) =>
              // Passed through unwrapped so that `retry` below can recognise it.
              p.tryFailure(ex)
            case _ =>
              val n = name
              log.error(ex, "failure while executing {}", n)
              p.tryFailure(new DynamoDBJournalFailure("failure while executing " + n, ex))
          }
        override def onSuccess(req: In, resp: Out) = p.trySuccess(resp)
      }

      try {
        func(handler)
      } catch {
        // A synchronous failure while starting the call is turned into a failed future.
        // NOTE(review): this also captures fatal errors; consider NonFatal if that is not intended.
        case ex: Throwable =>
          log.error(ex, "failure while preparing {}", name)
          p.tryFailure(ex)
      }

      p.future
    }

    val state = new RetryStateHolder
    // Remember the starting budget so the number of retries used can be derived without
    // hard-coding the default of RetryStateHolder.
    val initialRetries = state.retries

    // `lazy` because the partial function refers to itself for subsequent attempts.
    lazy val retry: PartialFunction[Throwable, Future[Out]] = {
      case DynamoRetriableException(ex) if state.retries > 0 =>
        val backoff = state.backoff
        state.retries -= 1
        state.backoff *= 2
        log.warning("failure while executing {} but will retry! Message: {}", name, ex.getMessage())
        after(backoff, scheduler)(sendSingle().recoverWith(retry))
      case other: DynamoDBJournalFailure => Future.failed(other)
      case other =>
        val n = name
        Future.failed(new DynamoDBJournalFailure("failed retry " + n, other))
    }

    if (Tracing) log.debug("{}", name)

    // Read the mutable field exactly once: the start timestamp, the decision to report and the
    // target of the report must all refer to the same reporter, even if setReporter is called
    // while this request is in flight.
    val currentReporter = reporter
    val start = if (currentReporter ne null) System.nanoTime else 0L

    // backoff retries when sending too fast
    val f = sendSingle().recoverWith(retry)

    if (currentReporter ne null)
      f.onComplete(_ => currentReporter ! LatencyReport(System.nanoTime - start, initialRetries - state.retries))

    f
  }