in src/main/scala/org/apache/pekko/persistence/dynamodb/journal/DynamoDBJournalRequests.scala [175:225]
/**
 * Builds the journal item for a single persistent message.
 *
 * The payload is serialized with the serializer registered for its class: an
 * [[AsyncSerializer]] is invoked through `toBinaryAsync` with transport
 * information set, any other serializer runs inside a `Future`. Once the bytes
 * are available the estimated item size is checked against `MaxItemSize` and
 * the item attributes are populated.
 *
 * @param repr the persistent representation to convert
 * @return a future holding the populated item; the future is failed with
 *         [[DynamoDBJournalRejection]] when the estimated size exceeds
 *         `MaxItemSize`, or with whatever non-fatal exception was raised while
 *         looking up the serializer or serializing the payload
 */
private def toMsgItem(repr: PersistentRepr): Future[Item] = {
  // DynamoDB accounts for string attributes by their UTF-8 encoded length, so the
  // size estimate must not depend on the JVM's platform-default charset.
  def utf8Length(s: String): Int = s.getBytes(java.nio.charset.StandardCharsets.UTF_8).length

  try {
    val reprPayload: AnyRef = repr.payload.asInstanceOf[AnyRef]
    val serializer = serialization.serializerFor(reprPayload.getClass)
    val fut = serializer match {
      case aS: AsyncSerializer =>
        Serialization.withTransportInformation(context.system.asInstanceOf[ExtendedActorSystem]) { () =>
          aS.toBinaryAsync(reprPayload)
        }
      case _ =>
        Future {
          // NOTE(review): wrap(...).array() hands back the very array that was wrapped, so this
          // is equivalent to using the serialized bytes directly; kept unchanged on purpose.
          ByteBuffer.wrap(serialization.serialize(reprPayload).get).array()
        }
    }
    fut.map { serialized =>
      val eventData = B(serialized)
      val serializerId = N(serializer.identifier)
      val fieldLength = utf8Length(repr.persistenceId) + utf8Length(repr.sequenceNr.toString) +
        utf8Length(repr.writerUuid) + utf8Length(repr.manifest)
      val manifest = Serializers.manifestFor(serializer, reprPayload)
      // An empty manifest encodes to zero bytes, so no special case is required.
      val manifestLength = utf8Length(manifest)
      val itemSize = keyLength(
        repr.persistenceId,
        repr.sequenceNr) + eventData.getB.remaining + utf8Length(serializerId.getN) + manifestLength + fieldLength
      if (itemSize > MaxItemSize) {
        // Thrown inside `map`, hence surfaces to the caller as a failed future.
        throw new DynamoDBJournalRejection(s"MaxItemSize exceeded: $itemSize > $MaxItemSize")
      }
      val item: Item = messageKey(repr.persistenceId, repr.sequenceNr)
      item.put(PersistentId, S(repr.persistenceId))
      item.put(SequenceNr, N(repr.sequenceNr))
      item.put(Event, eventData)
      item.put(WriterUuid, S(repr.writerUuid))
      item.put(SerializerId, serializerId)
      // The two manifest attributes are only written when they carry a value.
      if (repr.manifest.nonEmpty) {
        item.put(Manifest, S(repr.manifest))
      }
      if (manifest.nonEmpty) {
        item.put(SerializerManifest, S(manifest))
      }
      item
    }
  } catch {
    // Failures raised synchronously (before a future exists) are reported through the
    // returned future as well, so callers have a single failure channel.
    case NonFatal(e) => Future.failed(e)
  }
}