def getUnprocessedItems()

in src/main/scala/org/apache/pekko/persistence/dynamodb/journal/DynamoDBRecovery.scala [485:510]


  def getUnprocessedItems(result: BatchGetItemResult, retriesRemaining: Int = 10): Future[BatchGetItemResult] = {
    val unprocessed = result.getUnprocessedKeys.get(JournalTable) match {
      case null => 0
      case x    => x.getKeys.size
    }
    if (unprocessed == 0) Future.successful(result)
    else if (retriesRemaining == 0) {
      Future.failed(
        new DynamoDBJournalFailure(
          s"unable to batch get ${result.getUnprocessedKeys.get(JournalTable).getKeys} after 10 tries"))
    } else {
      val rest = batchGetReq(result.getUnprocessedKeys)
      dynamo
        .batchGetItem(rest)
        .map { rr =>
          val items = rr.getResponses.get(JournalTable)
          val responses = result.getResponses.get(JournalTable)
          items.forEach(new Consumer[Item] {
            override def accept(item: Item): Unit = responses.add(item)
          })
          result.setUnprocessedKeys(rr.getUnprocessedKeys)
          result
        }
        .flatMap(getUnprocessedItems(_, retriesRemaining - 1))
    }
  }