in src/main/scala/org/apache/pekko/persistence/dynamodb/journal/DynamoDBRecovery.scala [485:510]
def getUnprocessedItems(result: BatchGetItemResult, retriesRemaining: Int = 10): Future[BatchGetItemResult] = {
val unprocessed = result.getUnprocessedKeys.get(JournalTable) match {
case null => 0
case x => x.getKeys.size
}
if (unprocessed == 0) Future.successful(result)
else if (retriesRemaining == 0) {
Future.failed(
new DynamoDBJournalFailure(
s"unable to batch get ${result.getUnprocessedKeys.get(JournalTable).getKeys} after 10 tries"))
} else {
val rest = batchGetReq(result.getUnprocessedKeys)
dynamo
.batchGetItem(rest)
.map { rr =>
val items = rr.getResponses.get(JournalTable)
val responses = result.getResponses.get(JournalTable)
items.forEach(new Consumer[Item] {
override def accept(item: Item): Unit = responses.add(item)
})
result.setUnprocessedKeys(rr.getUnprocessedKeys)
result
}
.flatMap(getUnprocessedItems(_, retriesRemaining - 1))
}
}