in src/main/scala/org/apache/pekko/persistence/dynamodb/DynamoDBRequests.scala [74:90]
/**
 * Re-submits the items that a batch write reported as unprocessed for `Table`.
 *
 * Each retry is scheduled through `after` on the actor system scheduler using the
 * current backoff, and the backoff is doubled for the following retry.
 *
 * @param result           batch write result whose unprocessed items should be retried
 * @param retriesRemaining number of retries still allowed; zero or a negative value
 *                         means no further retry is attempted
 * @param backoff          delay applied before the next retry
 * @return a future completed with the first result that has no unprocessed items for
 *         `Table`, or failed with a `RuntimeException` once the retries are exhausted
 */
private def sendUnprocessedItems(
    result: BatchWriteItemResult,
    retriesRemaining: Int = 10,
    backoff: FiniteDuration = 1.millis): Future[BatchWriteItemResult] = {

  // Unprocessed items for our table, or None when there is nothing left to send.
  // Option(...) absorbs both a missing map and a missing entry for Table.
  def pendingItems(r: BatchWriteItemResult) =
    Option(r.getUnprocessedItems)
      .flatMap(byTable => Option(byTable.get(Table)))
      .filter(_.size > 0)

  // `attempts` counts the retries already performed so the failure message reports
  // the real number instead of a hard-coded one.
  def loop(
      current: BatchWriteItemResult,
      remaining: Int,
      delay: FiniteDuration,
      attempts: Int): Future[BatchWriteItemResult] =
    pendingItems(current) match {
      case None =>
        Future.successful(current)

      case Some(items) if remaining <= 0 =>
        // Report exhaustion through the Future rather than throwing, so that the
        // very first (non-recursive) call fails the same way as the nested ones.
        Future.failed(new RuntimeException(s"unable to batch write $items after $attempts tries"))

      case Some(_) =>
        val rest = batchWriteReq(current.getUnprocessedItems)
        after(delay, context.system.scheduler)(
          dynamo.batchWriteItem(rest).flatMap(next => loop(next, remaining - 1, delay * 2, attempts + 1)))
    }

  loop(result, retriesRemaining, backoff, attempts = 0)
}