def batchWriteReq()

in src/main/scala/org/apache/pekko/persistence/dynamodb/DynamoDBRequests.scala [41:63]


  def batchWriteReq(writes: Seq[WriteRequest]): BatchWriteItemRequest =
    batchWriteReq(Collections.singletonMap(Table, writes.asJava))

  def batchWriteReq(items: JMap[String, JList[WriteRequest]]): BatchWriteItemRequest =
    new BatchWriteItemRequest().withRequestItems(items).withReturnConsumedCapacity(ReturnConsumedCapacity.TOTAL)

  /*
   * Request execution helpers.
   */

  /**
   * Execute the given WriteRequests in batches of MaxBatchWrite, ignoring and
   * logging all errors. The returned Future never fails.
   */
  def doBatch(desc: Seq[WriteRequest] => String, writes: Seq[WriteRequest]): Future[Done] =
    Future
      .sequence {
        writes.grouped(MaxBatchWrite).map { batch =>
          dynamo.batchWriteItem(batchWriteReq(batch)).flatMap(sendUnprocessedItems(_)).recover {
            case NonFatal(ex) => log.error(ex, "cannot " + desc(batch))
          }
        }
      }