def writeMessages()

in src/main/scala/org/apache/pekko/persistence/dynamodb/journal/DynamoDBJournalRequests.scala [71:125]


  /**
   * Writes all events of one `AtomicWrite` to DynamoDB.
   *
   * Two paths are used:
   *  - A write containing exactly one event is sent with a plain `putItem`, unless the
   *    item's `Sort` attribute is `"0"`; in that case the event and the item produced by
   *    `toHSItem` are sent together in one `batchWriteItem` request.
   *  - A write containing several events stamps every item with `AtomIndex` (its position
   *    in the payload) and `AtomEnd` (`high - low`), appends an item from `toHSItem` when
   *    `(low - 1) / PartitionSize` differs from `high / PartitionSize`, and sends the
   *    requests in groups of at most `MaxBatchWrite`.
   *
   * Every `batchWriteItem` response is passed through `sendUnprocessedItems`, so items
   * that DynamoDB reports back as unprocessed are not silently dropped.
   *
   * NOTE(review): the item from `toHSItem` is presumably the highest-sequence-number
   * marker for the persistence id -- confirm against `toHSItem`.
   *
   * @param atomicWrite the batch of events to persist
   * @return a future holding `Success(())` when the write went through. Non-fatal
   *         exceptions that reach the `recover` handlers (or are thrown while building
   *         the single-event request) are logged and reported as a `Failure` wrapping a
   *         `DynamoDBJournalRejection`.
   */
  def writeMessages(atomicWrite: AtomicWrite): Future[Try[Unit]] = {

    // Logs a failed single-event write and converts it into a rejection result.
    def rejectSingleWrite(ex: Throwable): Try[Unit] = {
      log.error(ex, "Failure during message write preparation: {}", ex.getMessage)
      Failure(new DynamoDBJournalRejection(s"write rejected due to ${ex.getMessage}", ex))
    }

    // optimize the common case
    if (atomicWrite.size == 1) {
      toMsgItem(atomicWrite.payload.head)
        .flatMap { event =>
          // Building the request may throw synchronously; the try/catch turns that into a
          // rejection instead of letting it escape as a failed future.
          try {
            if (event.get(Sort).getN == "0") {
              val hs = toHSItem(atomicWrite.persistenceId, atomicWrite.lowestSequenceNr)
              val batch = batchWriteReq(putReq(event) :: putReq(hs) :: Nil)
              // A batch write can succeed while leaving items unprocessed; resend them,
              // exactly as the multi-event path below does.
              liftUnit(dynamo.batchWriteItem(batch).flatMap(r => sendUnprocessedItems(r)))
            } else {
              liftUnit(dynamo.putItem(putItem(event)))
            }
          } catch {
            case NonFatal(ex) => Future.successful(rejectSingleWrite(ex))
          }
        }
        .recover {
          case NonFatal(ex) => rejectSingleWrite(ex)
        }

    } else {
      Future
        .sequence(atomicWrite.payload.map(repr => toMsgItem(repr)))
        .flatMap { items =>
          // we created our writes successfully, send them off to DynamoDB
          val low = atomicWrite.lowestSequenceNr
          val high = atomicWrite.highestSequenceNr
          val id = atomicWrite.persistenceId
          val size = N(high - low)

          // Stamp each item with its position inside the atomic write and the shared end
          // marker, then append the toHSItem entry when the partition check below fires.
          val writes = items.iterator.zipWithIndex.map {
            case (item, index) =>
              item.put(AtomIndex, N(index))
              item.put(AtomEnd, size)
              putReq(item)
          } ++ (if ((low - 1) / PartitionSize != high / PartitionSize) Some(putReq(toHSItem(id, high))) else None)

          // Send in groups of at most MaxBatchWrite requests, resending unprocessed items.
          val futures = writes.grouped(MaxBatchWrite).map { batch =>
            dynamo.batchWriteItem(batchWriteReq(batch)).flatMap(r => sendUnprocessedItems(r))
          }

          // Squash all of the futures into a single result
          trySequence(futures).map(seq => Try(seq.foreach(_.get)))
        }
        .recover {
          case NonFatal(e) =>
            log.error(e, "Failure during message batch write preparation: {}", e.getMessage)
            val rej = new DynamoDBJournalRejection(s"AtomicWrite rejected as a whole due to ${e.getMessage}", e)
            Failure(rej)
        }
    }
  }