in src/main/scala/org/apache/pekko/persistence/dynamodb/journal/DynamoDBRecovery.scala [432:483]
def readPersistentRepr(item: JMap[String, AttributeValue], async: Boolean): Future[PersistentRepr] = {
val clazz = classOf[PersistentRepr]
if (item.containsKey(Event)) {
val serializerManifest = getValueOrEmptyString(item, SerializerManifest)
val pI = item.get(PersistentId).getS
val sN = item.get(SequenceNr).getN.toLong
val wU = item.get(WriterUuid).getS
val reprManifest = getValueOrEmptyString(item, Manifest)
val eventPayload = item.get(Event).getB
val serId = item.get(SerializerId).getN.toInt
val fut = serialization.serializerByIdentity.get(serId) match {
case Some(asyncSerializer: AsyncSerializer) =>
Serialization.withTransportInformation(system.asInstanceOf[ExtendedActorSystem]) { () =>
asyncSerializer.fromBinaryAsync(eventPayload.array(), serializerManifest)
}
case _ =>
def deserializedEvent: AnyRef = {
// Serialization.deserialize adds transport info
serialization.deserialize(eventPayload.array(), serId, serializerManifest).get
}
if (async) Future(deserializedEvent)
else
Future.successful(deserializedEvent)
}
fut.map { (event: AnyRef) =>
PersistentRepr(
event,
sequenceNr = sN,
persistenceId = pI,
manifest = reprManifest,
writerUuid = wU,
sender = null)
}
} else {
def deserializedEvent: PersistentRepr = {
// Serialization.deserialize adds transport info
serialization.deserialize(item.get(Payload).getB.array(), clazz).get
}
if (async) Future(deserializedEvent)
else
Future.successful(deserializedEvent)
}
}