in core/src/main/scala/org/apache/pekko/persistence/cassandra/snapshot/CassandraSnapshotStore.scala [324:356]
/**
 * Serializes a snapshot payload together with its optional metadata.
 *
 * The serializer for the payload is looked up first. If it is an `AsyncSerializer`, its
 * `toBinaryAsync` is invoked within `Serialization.withTransportInformation`; otherwise the
 * payload is serialized through `serialization.serialize` inside a `Future`. In both cases the
 * metadata (when present) is serialized only once the payload bytes are available.
 *
 * @param payload the snapshot payload to serialize
 * @param meta optional metadata to serialize alongside the payload
 * @return a future holding the serialized payload and metadata; a non-fatal exception thrown
 *         synchronously while setting up the serialization is returned as a failed future
 */
def serialize(payload: Any, meta: Option[Any]): Future[Serialized] =
  try {
    val payloadRef: AnyRef = payload.asInstanceOf[AnyRef]
    val payloadSerializer = serialization.findSerializerFor(payloadRef)
    val payloadManifest = Serializers.manifestFor(payloadSerializer, payloadRef)

    // Kept as a method so the metadata is serialized on demand, at the point where the
    // final Serialized value is assembled.
    def serializedMeta(): Option[SerializedMeta] =
      for (rawMeta <- meta) yield {
        val metaRef = rawMeta.asInstanceOf[AnyRef]
        val metaSerializer = serialization.findSerializerFor(metaRef)
        val metaManifest = Serializers.manifestFor(metaSerializer, metaRef)
        val metaBytes = ByteBuffer.wrap(serialization.serialize(metaRef).get)
        SerializedMeta(metaBytes, metaManifest, metaSerializer.identifier)
      }

    // Wraps the payload bytes and combines them with the manifest, serializer id and metadata.
    def assemble(bytes: Array[Byte]): Serialized = {
      val payloadBytes = ByteBuffer.wrap(bytes)
      Serialized(payloadBytes, payloadManifest, payloadSerializer.identifier, serializedMeta())
    }

    payloadSerializer match {
      case async: AsyncSerializer =>
        val extendedSystem = system.asInstanceOf[ExtendedActorSystem]
        Serialization.withTransportInformation(extendedSystem) { () =>
          async.toBinaryAsync(payloadRef).map(bytes => assemble(bytes))
        }
      case _ =>
        Future {
          // Serialization.serialize adds transport info
          assemble(serialization.serialize(payloadRef).get)
        }
    }
  } catch {
    case NonFatal(cause) => Future.failed(cause)
  }