in src/main/scala/org/apache/pekko/persistence/dynamodb/DynamoDBRequests.scala [41:63]
def batchWriteReq(writes: Seq[WriteRequest]): BatchWriteItemRequest =
batchWriteReq(Collections.singletonMap(Table, writes.asJava))
def batchWriteReq(items: JMap[String, JList[WriteRequest]]): BatchWriteItemRequest =
new BatchWriteItemRequest().withRequestItems(items).withReturnConsumedCapacity(ReturnConsumedCapacity.TOTAL)
/*
* Request execution helpers.
*/
/**
* Execute the given WriteRequests in batches of MaxBatchWrite, ignoring and
* logging all errors. The returned Future never fails.
*/
def doBatch(desc: Seq[WriteRequest] => String, writes: Seq[WriteRequest]): Future[Done] =
Future
.sequence {
writes.grouped(MaxBatchWrite).map { batch =>
dynamo.batchWriteItem(batchWriteReq(batch)).flatMap(sendUnprocessedItems(_)).recover {
case NonFatal(ex) => log.error(ex, "cannot " + desc(batch))
}
}
}