in src/main/scala/org/apache/pekko/persistence/dynamodb/journal/DynamoDBJournalRequests.scala [71:125]
/** Persists all events of one `AtomicWrite` to DynamoDB and reports the outcome.
 *
 *  The returned Future is successful in both the accepted and the rejected case:
 *  a rejection is signalled as `Failure(DynamoDBJournalRejection)` carried inside
 *  a *successful* Future. NOTE(review): per the persistence plugin SPI a rejection
 *  is a different outcome than a failed Future (hard journal failure) — confirm
 *  callers rely on this distinction before changing either `.recover` block.
 */
def writeMessages(atomicWrite: AtomicWrite): Future[Try[Unit]] =
// optimize the common case
if (atomicWrite.size == 1) {
// Fast path: a single event needs at most two item writes.
toMsgItem(atomicWrite.payload.head)
.flatMap { event =>
try {
// Sort == "0" marks the first slot of a storage partition; in that case
// also write the sequence-number marker item (toHSItem) in the same
// batch so both land together. (NOTE(review): exact marker semantics
// are defined by toHSItem elsewhere in this file — verify.)
if (event.get(Sort).getN == "0") {
val hs = toHSItem(atomicWrite.persistenceId, atomicWrite.lowestSequenceNr)
liftUnit(dynamo.batchWriteItem(batchWriteReq(putReq(event) :: putReq(hs) :: Nil)))
} else {
liftUnit(dynamo.putItem(putItem(event)))
}
} catch {
// Synchronous failures while building the request become a rejection:
// a successful Future carrying Failure, not a failed Future.
case NonFatal(ex) =>
log.error(ex, "Failure during message write preparation: {}", ex.getMessage)
Future.successful(Failure(new DynamoDBJournalRejection("write rejected due to " + ex.getMessage, ex)))
}
}
.recover {
// Failures surfacing through the composed Future (e.g. from toMsgItem)
// are likewise converted into a rejection. NOTE(review): depending on
// what liftUnit does with DynamoDB errors, this may also cover the
// write call itself — confirm against liftUnit's definition.
case NonFatal(ex) =>
log.error(ex, "Failure during message write preparation: {}", ex.getMessage)
Failure(new DynamoDBJournalRejection("write rejected due to " + ex.getMessage, ex))
}
} else {
// General case: serialize every event of the atom, then write them in batches.
Future
.sequence(atomicWrite.payload.map(repr => toMsgItem(repr)))
.flatMap { items =>
// we created our writes successfully, send them off to DynamoDB
val low = atomicWrite.lowestSequenceNr
val high = atomicWrite.highestSequenceNr
val id = atomicWrite.persistenceId
val size = N(high - low)
// Tag each item with its position in the atom (AtomIndex) and the atom's
// extent (AtomEnd) — presumably so the read side can detect a partially
// written atom; TODO confirm against the replay code.
val writes = items.iterator.zipWithIndex.map {
case (item, index) =>
item.put(AtomIndex, N(index))
item.put(AtomEnd, size)
putReq(item)
// When the atom spans a partition boundary, additionally store the
// sequence-number marker item for the new partition.
} ++ (if ((low - 1) / PartitionSize != high / PartitionSize) Some(putReq(toHSItem(id, high))) else None)
// DynamoDB caps BatchWriteItem size, so send MaxBatchWrite-sized chunks
// and re-send any items a batch response reports as unprocessed.
val futures = writes.grouped(MaxBatchWrite).map { batch =>
dynamo.batchWriteItem(batchWriteReq(batch)).flatMap(r => sendUnprocessedItems(r))
}
// Squash all of the futures into a single result
// (the first Failure wins because `_.get` rethrows it inside the Try).
trySequence(futures).map(seq => Try(seq.foreach(_.get)))
}
.recover {
// Any NonFatal failure rejects the AtomicWrite as a whole, again reported
// as a successful Future carrying the rejection. NOTE(review): the log
// message says "preparation" but this recover sits after the batch writes
// too — confirm the wording matches the intended coverage.
case NonFatal(e) =>
log.error(e, "Failure during message batch write preparation: {}", e.getMessage)
val rej = new DynamoDBJournalRejection(s"AtomicWrite rejected as a whole due to ${e.getMessage}", e)
Failure(rej)
}
}