in src/main/scala/org/apache/pekko/persistence/dynamodb/journal/DynamoDBJournalRequests.scala [262:299]
private def deleteLSItem(persistenceId: String, shard: Int): WriteRequest =
new WriteRequest().withDeleteRequest(new DeleteRequest().withKey(lowSeqKey(persistenceId, shard)))
private def putReq(item: Item): WriteRequest = new WriteRequest().withPutRequest(new PutRequest().withItem(item))
private def deleteReq(persistenceId: String, sequenceNr: Long): WriteRequest =
new WriteRequest().withDeleteRequest(new DeleteRequest().withKey(messageKey(persistenceId, sequenceNr)))
/*
* Request execution helpers.
*/
/**
* Sends the unprocessed batch write items, and sets the back-off.
* if no more retries remain (number of back-off retries exhausted), we throw a Runtime exception
*
* Note: the DynamoDB client supports automatic retries, however a batch will not fail if some of the items in the
* batch fail; that is why we need our own back-off mechanism here. If we exhaust OUR retry logic on top of
* the retries from the client, then we are hosed and cannot continue; that is why we have a RuntimeException here
*/
private def sendUnprocessedItems(
result: BatchWriteItemResult,
retriesRemaining: Int = 10,
backoff: FiniteDuration = 1.millis): Future[BatchWriteItemResult] = {
val unprocessed: Int = result.getUnprocessedItems.get(JournalTable) match {
case null => 0
case items => items.size
}
if (unprocessed == 0) Future.successful(result)
else if (retriesRemaining == 0) {
throw new RuntimeException(
s"unable to batch write ${result.getUnprocessedItems.get(JournalTable)} after 10 tries")
} else {
val rest = batchWriteReq(result.getUnprocessedItems)
after(backoff, context.system.scheduler)(
dynamo.batchWriteItem(rest).flatMap(r => sendUnprocessedItems(r, retriesRemaining - 1, backoff * 2)))
}
}