in core/src/main/scala/org/apache/pekko/persistence/cassandra/package.scala [59:125]
@InternalApi private[pekko] def serializeEvent(
p: PersistentRepr,
tags: Set[String],
uuid: UUID,
bucketSize: BucketSize,
serialization: Serialization,
system: ActorSystem)(implicit executionContext: ExecutionContext): Future[Serialized] =
try {
// use same clock source as the UUID for the timeBucket
val timeBucket = TimeBucket(Uuids.unixTimestamp(uuid), bucketSize)
def serializeMeta(): Option[SerializedMeta] =
// meta data, if any
p.metadata.map { m =>
val m2 = m.asInstanceOf[AnyRef]
val serializer = serialization.findSerializerFor(m2)
val serManifest = Serializers.manifestFor(serializer, m2)
val metaBuf = ByteBuffer.wrap(serialization.serialize(m2).get)
SerializedMeta(metaBuf, serManifest, serializer.identifier)
}
val event: AnyRef = p.payload.asInstanceOf[AnyRef]
val serializer = serialization.findSerializerFor(event)
val serManifest = Serializers.manifestFor(serializer, event)
serializer match {
case asyncSer: AsyncSerializer =>
Serialization.withTransportInformation(system.asInstanceOf[ExtendedActorSystem]) { () =>
asyncSer.toBinaryAsync(event).map { bytes =>
val serEvent = ByteBuffer.wrap(bytes)
Serialized(
p.persistenceId,
p.sequenceNr,
serEvent,
tags,
eventAdapterManifest = p.manifest,
serManifest = serManifest,
serId = serializer.identifier,
p.writerUuid,
serializeMeta(),
uuid,
timeBucket)
}
}
case _ =>
Future {
// Serialization.serialize adds transport info
val serEvent = ByteBuffer.wrap(serialization.serialize(event).get)
Serialized(
p.persistenceId,
p.sequenceNr,
serEvent,
tags,
eventAdapterManifest = p.manifest,
serManifest = serManifest,
serId = serializer.identifier,
p.writerUuid,
serializeMeta(),
uuid,
timeBucket)
}
}
} catch {
case NonFatal(e) => Future.failed(e)
}