private def deleteLSItem()

in src/main/scala/org/apache/pekko/persistence/dynamodb/journal/DynamoDBJournalRequests.scala [262:299]


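  // Builds a delete request for the low-sequence-number marker item of the given persistence-id shard.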
  private def deleteLSItem(persistenceId: String, shard: Int): WriteRequest =
    new WriteRequest().withDeleteRequest(new DeleteRequest().withKey(lowSeqKey(persistenceId, shard)))

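  // Wraps an item in a put request so it can be included in a batch write.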
  private def putReq(item: Item): WriteRequest = new WriteRequest().withPutRequest(new PutRequest().withItem(item))

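  // Builds a delete request for the message item at the given sequence number.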
  private def deleteReq(persistenceId: String, sequenceNr: Long): WriteRequest =
    new WriteRequest().withDeleteRequest(new DeleteRequest().withKey(messageKey(persistenceId, sequenceNr)))
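
  // A minimal sketch (not part of this excerpt): how the helpers above would be grouped
  // into a single batch call with the AWS SDK v1 API. `JournalTable` is the table-name
  // constant used below; the real batchWriteReq helper is defined elsewhere in this file,
  // and the method name here is made up for illustration. Assumes
  // scala.jdk.CollectionConverters._ is imported.
  private def batchOfWrites(writes: Seq[WriteRequest]): BatchWriteItemRequest = {
    val requestItems = new java.util.HashMap[String, java.util.List[WriteRequest]]
    requestItems.put(JournalTable, writes.asJava)
    new BatchWriteItemRequest().withRequestItems(requestItems)
  }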

  /*
   * Request execution helpers.
   */

  /**
   * Sends the unprocessed batch write items again, backing off exponentially between attempts.
   * If no retries remain (the number of back-off retries is exhausted), a RuntimeException is thrown.
   *
   * Note: the DynamoDB client supports automatic retries, but a batch request does not fail when only
   * some of the items in the batch fail; those items are returned as unprocessed instead. That is why
   * we need our own back-off mechanism here. If we exhaust OUR retry logic on top of the client's
   * retries, then we cannot make progress, which is why we throw a RuntimeException here.
   */
  private def sendUnprocessedItems(
      result: BatchWriteItemResult,
      retriesRemaining: Int = 10,
      backoff: FiniteDuration = 1.millis): Future[BatchWriteItemResult] = {
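    // DynamoDB reports items it could not process keyed by table name; a null entry means
    // everything in the batch was written.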
    val unprocessed: Int = result.getUnprocessedItems.get(JournalTable) match {
      case null  => 0
      case items => items.size
    }
    if (unprocessed == 0) Future.successful(result)
    else if (retriesRemaining == 0) {
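      // Thrown synchronously; callers that invoke this inside a Future combinator
      // (as in the recursive flatMap below) observe it as a failed Future.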
      throw new RuntimeException(
        s"unable to batch write ${result.getUnprocessedItems.get(JournalTable)} after exhausting all retries")
    } else {
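      // Re-batch only the leftover items and retry after the back-off delay,
      // doubling the delay for the next attempt.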
      val rest = batchWriteReq(result.getUnprocessedItems)
      after(backoff, context.system.scheduler)(
        dynamo.batchWriteItem(rest).flatMap(r => sendUnprocessedItems(r, retriesRemaining - 1, backoff * 2)))
    }
  }
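
  // A minimal usage sketch (assumption, not from the source): the journal issues the initial
  // batch write and hands the result to sendUnprocessedItems, which keeps retrying leftovers
  // with exponential back-off (1ms, 2ms, 4ms, ...). The method name is made up for
  // illustration; batchWriteReq is the helper referenced above.
  private def writeAll(
      items: java.util.Map[String, java.util.List[WriteRequest]]): Future[BatchWriteItemResult] =
    dynamo.batchWriteItem(batchWriteReq(items)).flatMap(result => sendUnprocessedItems(result))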