in core/src/main/scala/org/apache/pekko/persistence/cassandra/journal/CassandraJournal.scala [882:935]
def deserializeEvent(row: Row, async: Boolean)(implicit ec: ExecutionContext): Future[DeserializedEvent] =
try {
def meta: OptionVal[AnyRef] = {
if (columnDefinitionCache.hasMetaColumns(row)) {
row.getByteBuffer("meta") match {
case null =>
OptionVal.None // no meta data
case metaBytes =>
// has meta data, wrap in EventWithMetaData
val metaSerId = row.getInt("meta_ser_id")
val metaSerManifest = row.getString("meta_ser_manifest")
serialization.deserialize(Bytes.getArray(metaBytes), metaSerId, metaSerManifest) match {
case Success(m) => OptionVal.Some(m)
case Failure(ex) =>
log.warning(
"Deserialization of event metadata failed (pid: [{}], seq_nr: [{}], meta_ser_id: [{}], meta_ser_manifest: [{}], ignoring metadata content. Exception: {}",
Array(
row.getString("persistence_id"),
row.getLong("sequence_nr"),
metaSerId,
metaSerManifest,
ex.toString))
OptionVal.None
}
}
} else {
// for backwards compatibility, when table was not altered, meta columns not added
OptionVal.None // no meta data
}
}
val bytes = Bytes.getArray(row.getByteBuffer("event"))
val serId = row.getInt("ser_id")
val manifest = row.getString("ser_manifest")
(serialization.serializerByIdentity.get(serId) match {
case Some(asyncSerializer: AsyncSerializer) =>
Serialization.withTransportInformation(system.asInstanceOf[ExtendedActorSystem]) { () =>
asyncSerializer.fromBinaryAsync(bytes, manifest)
}
case _ =>
def deserializedEvent: AnyRef =
// Serialization.deserialize adds transport info
serialization.deserialize(bytes, serId, manifest).get
if (async) Future(deserializedEvent)
else Future.successful(deserializedEvent)
}).map(event => DeserializedEvent(event, meta))(ExecutionContexts.parasitic)
} catch {
case NonFatal(e) => Future.failed(e)
}