in src/main/scala/org/apache/pekko/persistence/dynamodb/journal/DynamoDBJournalRequests.scala [145:169]
/**
 * Builds one `deleteLSItem` request for every shard index in
 * `0 until SequenceShards` and hands the resulting requests to `doBatch`.
 *
 * @param persistenceId identifier whose lowest-sequence-number entries are targeted
 * @return the future produced by `doBatch`
 */
def removeLS(persistenceId: String): Future[Done] = {
  val shardDeletes =
    for (shard <- 0 until SequenceShards) yield deleteLSItem(persistenceId, shard)

  doBatch(_ => s"remove lowest sequence number entry for $persistenceId", shardDeletes)
}
/*
* Request and Item construction helpers.
*/
/**
 * Turns a sequence of `PersistentRepr` into one `BatchWriteItemRequest`.
 *
 * Every message contributes two put requests, in this order: the item built by
 * `toMsgItem`, followed by the item built by `toHSItem` from the message's
 * persistence id and sequence number. All puts are flattened, preserving
 * message order, into a single list keyed by `JournalTable`.
 *
 * @param msgs messages to convert
 * @return a future completing with the batch request once every `toMsgItem` future has completed
 */
private def toBatchWriteItemRequest(msgs: Seq[PersistentRepr]): Future[BatchWriteItemRequest] = {
  val putsPerMessage = Future.traverse(msgs) { repr =>
    toMsgItem(repr).map { msgItem =>
      Seq(putReq(msgItem), putReq(toHSItem(repr.persistenceId, repr.sequenceNr)))
    }
  }

  putsPerMessage.map { grouped =>
    val allPuts = grouped.flatten
    val itemsByTable = Collections.singletonMap(JournalTable, allPuts.asJava)
    batchWriteReq(itemsByTable)
  }
}